I am receiving data from a new log source. There is no TA for the vendor (at least not for what I am trying to do with it). The logs are in CEF format; they are received via a syslog server and then sent to my indexers, where I can see the data. The problem is that every single event in the source system arrives as 3 consecutive events. For example:
Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external
Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=bottom
Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=level
I tried writing a small query using transaction, but unfortunately this doesn't get rid of the duplicated tags. If I run a transaction on the data, the single event looks as follows:
Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external
Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=bottom
Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading="Log time" report=wypm/reports/1234 section=level
So dedup with consecutive seemed to be the next best option.
If I write a small query such as the following, I can dedup this into a single event (where "report" is the field common to these 3 events):
index=wypm | dedup report consecutive=true
Splunk will then combine this into a single event as follows
Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external section=Unknown
This achieves what I need it to do. However, I want to extract the fields as they are after the dedup. Can I do this? Other people may also search this index, so I am half tempted to write a macro that calls "index=wypm | dedup report consecutive=true" (and fixes the timestamp). I don't know whether this would work, especially with the extracted fields. I would at least like the extracted fields to be CIM compliant. This method also looks like a messy workaround to the problem, but I am not sure of the correct path to follow to achieve what I am trying to do.
My intent is to use a search from the "combined data" to generate an alert.
| makeresults 
| eval _raw=" Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external
 Feb 18 03:43:00 WYPM [2020-02-18T03:43:920517] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=\"Log time\" report=wypm/reports/1234 section=bottom
 Feb 18 03:43:00 WYPM [2020-02-18T03:43:920346] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=\"Log time\" report=wypm/reports/1234 section=level" 
| makemv delim="
 " _raw 
| stats count by _raw 
`comment("this is your sample")`
| rex "^.*\|(?<cef>.*)"
| eval cef=replace(cef,"(\w+=)","#\1") 
| eval cef=split(cef,"#") 
| mvexpand cef 
| where cef!=""
| rename cef as _raw
| eval _raw=replace(_raw,"= (.*) ","=\1")
| eval _raw=replace(_raw,"=([^\"]+)","=\"\1\"")
| kv
| foreach * [ |eval <<FIELD>>=trim('<<FIELD>>')]
| stats values(*) as *
| fields - count
Thanks. So I can't use "dedup consecutive" and then do the field extraction that way? This seems to imply that a transaction must be done first and then the above applied?
The order should be:
first: extract the fields
second: operate on the fields
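A minimal sketch of that ordering, using the field names from the sample events (report, cat, section). The rex patterns assume these values contain no spaces, keeping the earliest _time is only one possible choice, and grouping by report assumes that report identifies exactly one system event within the search window. Adjust all three to the real data:

```
index=wypm
| rex "report=(?<report>\S+)"
| rex "cat=(?<cat>\S+)"
| rex "section=(?<section>\S+)"
| stats min(_time) as _time values(cat) as cat values(section) as section by report
```

Because the fields are extracted before the three events are combined, values that appear in only one of the events (cat or section) still end up on the merged row.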
Can you provide an example of what your intended final result would be?
The final "single" event would be similar to the following (there are more fields, but I have truncated the log for simplicity's sake; the fields continue with, for example, cv1 cv1label cv2 cv2label). Each single system event generates 3 separate events in Splunk, hence the dedup with consecutive.
Feb 18 03:43:00 WYPM [2020-02-18T03:43:962684] INFO -- : CEF:0|wypm|Column|2.0|Match|1|begintime= FEB 18 2020 03:43 realtime=FEB 18 2020 03:43 customdate=FEB 18 2020 03:43 customdateheading=Log time report=wypm/reports/1234 cat=external section=Unknown cv1=signature
Basically I would then rename each field as follows (these are examples, not the actual field names I would use in the extraction):
begintime would be something like 'logged time'
realtime would be something like 'system time'
report would be something like 'identifier'
cat would be 'category', and so on
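As a rough sketch, a rename along those lines could look like the following in SPL. The target names are only the placeholder examples above with underscores in place of spaces, not the final CIM field names, and the search assumes the source fields have already been extracted:

```
index=wypm
| rename begintime AS logged_time realtime AS system_time report AS identifier cat AS category
```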
Each of the 3 events indexed in Splunk has some differences in its fields (e.g. cat doesn't appear in the first 2 events but does in the 3rd).
