Splunk Search

Extract Fields after dedup / consecutive events

willadams
Contributor

I have a new log source from which I am receiving data. There is no TA for the vendor (at least not for what I am trying to do with it). The logs are in CEF format; they are received via a syslog server and then sent to my indexers, where I can see the data. The problem is that every single event in the source system produces 3 consecutive events in Splunk. For example:

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external

Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=bottom

Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=level

I tried writing a small query using a transaction, but unfortunately this doesn't get rid of the duplication of the tags. If I run a transaction on the data, the single event looks as follows:

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external
Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=bottom
Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=level

So dedup with consecutive=true seemed to be the next best option.

With a small query such as the following, I can dedup these into a single event ("report" is the field common to all 3 events):

index=wypm | dedup report consecutive=true

Splunk will then combine this into a single event as follows:

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external section=Unknown

This achieves what I need it to do. However, I want to extract the fields as they are after the dedup. Can I do this? Other people may also search this index, so I am half tempted to write a macro that calls "index=wypm | dedup report consecutive=true" (and fixes the timestamp). I don't know whether this would work, especially with the extracted fields, but I would at least have extracted fields that are CIM compliant. This method also looks like a messy workaround to the problem, and I am not sure of the correct path to follow to achieve what I am trying to do.

My intent is to use a search from the "combined data" to generate an alert.
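
The macro idea mentioned above could look roughly like the following macros.conf stanza. This is only a sketch: the macro name wypm_combined is hypothetical, and the definition simply wraps the dedup search quoted in the question without fixing the timestamp.

```ini
# macros.conf (sketch; the stanza name wypm_combined is a hypothetical example)
[wypm_combined]
definition = index=wypm | dedup report consecutive=true
```

Other users would then start their searches with `wypm_combined` wrapped in backticks. Note that, as far as I know, dedup keeps only the first event of each run of duplicates, so fields that appear only in the other two events would not be carried over by this macro.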


to4kawa
Ultra Champion
| makeresults 
| eval _raw=" Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external

 Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=\"Log time\" report=wypm/reports/1234 section=bottom

 Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=\"Log time\" report=wypm/reports/1234 section=level" 
| makemv delim="
 " _raw 
| stats count by _raw 
`comment("this is your sample")`
| rex "^.*\|(?<cef>.*)"
| eval cef=replace(cef,"(\w+=)","#\1") 
| eval cef=split(cef,"#") 
| mvexpand cef 
| where cef!=""
| rename cef as _raw
| eval _raw=replace(_raw,"= (.*) ","=\1")
| eval _raw=replace(_raw,"=([^\"]+)","=\"\1\"")
| kv
| foreach * [ |eval <<FIELD>>=trim('<<FIELD>>')]
| stats values(*) as *
| fields - count

willadams
Contributor

Thanks. So I can't use "dedup consecutive" and then do the field extraction afterwards? This seems to imply that a transaction must be done first and then the above applied?


to4kawa
Ultra Champion

The order should be:
first: extract the fields
second: operate on the fields
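
The ordering described above (extract first, then operate on the fields) might be sketched as follows. This is a minimal sketch under assumptions: the regular expressions are written only against the three sample events in the question, and stats is swapped in for dedup so that values present in only one of the three events (cat, section) are kept rather than dropped.

```spl
index=wypm
| rex "begintime=\s*(?<begintime>\w{3} \d{1,2} \d{4} \d{2}:\d{2})"
| rex "realtime=(?<realtime>\w{3} \d{1,2} \d{4} \d{2}:\d{2})"
| rex "report=(?<report>\S+)"
| rex "cat=(?<cat>\S+)"
| rex "section=(?<section>\S+)"
| stats min(_time) as _time values(begintime) as begintime values(realtime) as realtime values(cat) as cat values(section) as section by report
```

Each report value then yields one result row. Where the three events disagree (for example section=bottom and section=level), the field becomes multivalued.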


nickhills
Ultra Champion

Can you provide an example of what your intended final result would be?


willadams
Contributor

The final "single" event would be similar to the following (there are more fields, but I have truncated the log for simplicity's sake). The remaining fields follow on, for example cv1, cv1label, cv2, cv2label. Each single system event generates 3 separate events in Splunk, hence the dedup with consecutive.

Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external section=Unknown cv1=signature

Basically, I would then name each field as follows (these are examples, not the actual fields I would use in the extraction):

begintime would be something like 'logged time'
realtime would be something like 'system time'
report would be something like 'identifier'
cat would be category; and so on

Each of the 3 Splunk indexed events has some differences in its fields (e.g. cat doesn't appear in the first 2 events but does in the 3rd).
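
The renaming described above could be sketched with the rename command. The example below builds one dummy result with makeresults so it can be run on its own; the target names (logged_time, system_time, identifier, category) are illustrative placeholders taken from the list above, not actual CIM field names.

```spl
| makeresults
| eval begintime="FEB 18 2020 03:43", realtime="FEB 18 2020 03:43", report="wypm/reports/1234", cat="external"
| rename begintime as logged_time realtime as system_time report as identifier cat as category
```

In a real search the rename would come after the fields have been extracted and the three events merged.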
