Splunk Search

Extract Fields after dedup / consecutive events

willadams
Contributor

I have a new log source from which I am receiving data. There is no TA for the vendor (at least not for what I am trying to do with it). The logs are in CEF format and are received via a syslog server, which forwards them to my indexers, where I can see the data. The problem is that for every single event in the source system, three consecutive events appear in Splunk. For example:

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external

Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=bottom

Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=level

I tried writing a small query using a transaction, but unfortunately this doesn't get rid of the duplicated fields. If I run a transaction on the data, the single event looks as follows:

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external
Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=bottom
Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=level

So dedup with consecutive seemed to be the next best bet.

If I write a small query such as the following, I can dedup this into a single event ("report" being the common field across these 3 events):

index=wypm | dedup report consecutive=true

Splunk then combines this into a single event as follows:

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external section=Unknown

This achieves what I need it to do. However, I want to extract the fields as they are after the dedup. Can I do this? I also have other people who may search this index, so I am half tempted to write a macro that calls "index=wypm | dedup report consecutive=true" (and fixes the timestamp). I don't know whether this would work, especially with the extracted fields, but I would at least have the extracted fields be CIM compliant. This also looks like a messy workaround, and I am not sure of the correct path to follow to achieve what I am trying to do.
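
If I did go the macro route, a minimal sketch of what I have in mind would be something like the following in macros.conf (or the equivalent under Settings > Advanced search > Search macros); the macro name "wypm_combined" is just a placeholder:

[wypm_combined]
definition = index=wypm | dedup report consecutive=true
iseval = 0

Other users would then start their searches with `wypm_combined` and add their own terms after it.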

My intent is to use a search from the "combined data" to generate an alert.
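
As a rough sketch, the alert search would then be built on top of that combined view; the cat=external condition here is only an illustration, not the real alert logic, and the alert would trigger on "number of results > 0":

`wypm_combined`
| search cat=external
| table _time report cat begintime realtime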


to4kawa
Ultra Champion
| makeresults 
| eval _raw=" Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external

 Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=\"Log time\" report=wypm/reports/1234 section=bottom

 Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=\"Log time\" report=wypm/reports/1234 section=level" 
| makemv delim="
 " _raw 
| stats count by _raw 
`comment("this is your sample")`
| rex "^.*\|(?<cef>.*)"
| eval cef=replace(cef,"(\w+=)","#\1") 
| eval cef=split(cef,"#") 
| mvexpand cef 
| where cef!=""
| rename cef as _raw
| eval _raw=replace(_raw,"= (.*) ","=\1")
| eval _raw=replace(_raw,"=([^\"]+)","=\"\1\"")
| kv
| foreach * [ |eval <<FIELD>>=trim('<<FIELD>>')]
| stats values(*) as *
| fields - count

willadams
Contributor

Thanks. So I can't use "dedup consecutive" and then do the field extraction that way? This seems to imply that a transaction must be done first and then the above applied?


to4kawa
Ultra Champion

First: extract the fields.
Second: operate on the fields.
That should be the order.
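
For example, something like this (a sketch only; it assumes the key=value pairs extract cleanly on each event, so unquoted values such as customdateheading=Log time may still need the quoting fix from the sample above):

index=wypm
| kv
| stats min(_time) as _time values(cat) as cat values(section) as section values(begintime) as begintime values(realtime) as realtime by report

Extract per event first, then combine the three consecutive events on the shared report value. stats keeps the fields that only appear in one of the three events, which dedup would take from the first event only.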


nickhills
Ultra Champion

Can you provide an example of what your intended final result would be?

If my comment helps, please give it a thumbs up!

willadams
Contributor

The final "single" event would be similar to the following (there are more fields but I have truncated the log for simplicity sake. The fields follow on (for example) cv1 cv1label cv2 cv2label. Each single system event generates 3 seperate events in SPLUNK, hence the dedup with consecutive.

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external section=Unknown cv1=signature

Basically I would then rename each field along the following lines (these are examples, not the actual names I would use in the extraction; a sketch of the renames follows this list):

begintime would be something like 'logged time'
realtime would be something like 'system time'
report would be something like 'identifier'
cat would be something like 'category'; and so on.
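
A rough sketch of those renames applied after the combine (the target names are just the examples above, not the final CIM-compliant names):

index=wypm
| dedup report consecutive=true
| rename begintime as "logged time", realtime as "system time", report as identifier, cat as category

If other people searching the index need the same names, FIELDALIAS entries in props.conf would be the knowledge-object equivalent of an in-search rename.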

Each of the 3 Splunk-indexed events has some differences in its fields (i.e. cat doesn't appear in the first 2 events but does in the 3rd).
