Splunk Search

Extract Fields after dedup / consecutive events

willadams
Contributor

I have a new log source from which I am receiving data. There is no vendor TA for this source (at least for what I am trying to do with it). The logs are CEF format, received via a syslog server and then sent to my indexers, where I can see the data. The problem is that for every one event on the source system, three consecutive events arrive in Splunk. For example:

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external

Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=bottom

Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=level

I tried writing a small query using a transaction, but unfortunately this doesn't get rid of the duplication of the tags. If I run a transaction on the data, the single event looks as follows:

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external
Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=bottom
Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=level

So dedup with consecutive=true seemed to be the next best bet.

If I write a small query such as the following, I can dedup this down to a single event ("report" being the common field across these 3 events):

index=wypm | dedup report consecutive=true

Splunk will then reduce this to a single event as follows:

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external section=Unknown

This achieves what I need it to do. However, I want the fields extracted as they are post-dedup. Can I do this? I also have other people who may search this index, so I am half tempted to write a macro that calls "index=wypm | dedup report consecutive=true" (and fixes the timestamp). I don't know whether this would work, especially with the extracted fields. I would at least have the extracted fields be CIM compliant. This method also looks like a messy workaround to the problem, but I am not sure of the correct path to follow to achieve what I am trying to do.
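Something like the following macro definition is what I have in mind (the stanza name is illustrative, not an existing object):

```
# macros.conf -- sketch of the macro I am considering
[wypm_combined]
definition = index=wypm | dedup report consecutive=true
```

Other users would then run `wypm_combined` followed by their own search terms, so everyone gets the same deduplicated view.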

My intent is to use a search from the "combined data" to generate an alert.


to4kawa
Ultra Champion
| makeresults 
| eval _raw=" Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external

 Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=\"Log time\" report=wypm/reports/1234 section=bottom

 Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=\"Log time\" report=wypm/reports/1234 section=level" 
| makemv delim="
 " _raw 
| stats count by _raw 
`comment("this is your sample")`
| rex "^.*\|(?<cef>.*)"
| eval cef=replace(cef,"(\w+=)","#\1") 
| eval cef=split(cef,"#") 
| mvexpand cef 
| where cef!=""
| rename cef as _raw
| eval _raw=replace(_raw,"= (.*) ","=\1")
| eval _raw=replace(_raw,"=([^\"]+)","=\"\1\"")
| kv
| foreach * [ |eval <<FIELD>>=trim('<<FIELD>>')]
| stats values(*) as *
| fields - count

willadams
Contributor

Thanks. So I can't use "dedup consecutive" and then do the field extraction that way? This seems to imply that a transaction must be done first and then the above applied?


to4kawa
Ultra Champion

The order should be:
first, field extraction;
second, operating on the fields.
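A sketch of that order against your index (note that `kv` may mis-split the unquoted, space-containing values in your sample, which is why my answer above extracts with rex instead; `report` is assumed to identify each triplet of events):

```
index=wypm
`comment("1. extract the key=value fields first")`
| kv
`comment("2. then operate: merge the three consecutive events per report")`
| stats earliest(_time) as _time values(*) as * by report
```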


nickhills
Ultra Champion

Can you provide an example of what your intended final result would be?

If my comment helps, please give it a thumbs up!

willadams
Contributor

The final "single" event would be similar to the following (there are more fields, but I have truncated the log for simplicity's sake). The fields follow on, for example: cv1, cv1label, cv2, cv2label. Each single system event generates 3 separate events in Splunk, hence the dedup with consecutive.

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external section=Unknown cv1=signature

Basically I would then map each field as follows (these are examples, not the actual fields I would use in the extraction):

begintime would be something like 'logged time'
realtime would be something like 'system time'
report would be something like 'identifier'
cat would be category; and so on

Each of the 3 Splunk-indexed events has some differences in its fields (e.g. cat appears only in the first event, while section appears only in the other two).
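Given that the fields are complementary across the three events, a stats-based merge keyed on report may suit this better than dedup, since dedup consecutive=true keeps only the first event and drops the fields unique to the other two. A sketch, assuming the fields are already extracted and the renamed aliases are just the examples above:

```
index=wypm
`comment("merge each triplet of events on the common report field")`
| stats earliest(_time) as _time values(*) as * by report
`comment("alias into the friendlier names (illustrative)")`
| rename begintime as logged_time, realtime as system_time, cat as category
```

An alert could then be driven off this combined result set.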
