filter events based on time proximity of the same events

aleksanderkamen
New Member

I have a list of results with many duplicates on field time1.

I need to leave only one event of each time1 based on the following conditions:

  • The latest time2, that is earlier than time1
  • If no such time2 exists, leave any one event, preferably the one with the earliest time2
  • If time2 is "null" (string "null"), leave it (as it implies a unique time1 already)

Here's an example of what I mean, input:

time1,    time2,    key1, key2
00:14:00, 00:15:00, qwe,  uiop
00:14:00, 00:13:30, asd,  hjkl
00:14:00, 00:13:00, zxc,  vbnm
00:13:00, 00:15:00, rty,  zxcv
00:13:00, 00:14:00, fgh,  asdf
00:12:00, null,     tyu,  qwer

Result:

time1,    time2,    key1, key2
00:14:00, 00:13:30, asd,  hjkl
00:13:00, 00:14:00, fgh,  asdf
00:12:00, null,     tyu,  qwer
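
For reference, the three conditions can be written out as a small model outside Splunk and checked against the sample above. This is only a sketch in Python, not SPL: the names `parse` and `pick` are made up for illustration, and it assumes a "null" row is the only row for its time1 (as stated in the third condition) and that all times parse as HH:MM:SS.

```python
from datetime import datetime

def parse(t):
    """Parse HH:MM:SS; the literal string "null" becomes None."""
    return None if t == "null" else datetime.strptime(t, "%H:%M:%S")

def pick(rows):
    """Keep one row per time1, following the three conditions in the question."""
    groups = {}
    for row in rows:
        groups.setdefault(row[0], []).append(row)  # group by time1, input order kept
    kept = []
    for time1, group in groups.items():
        t1 = parse(time1)
        nulls = [r for r in group if parse(r[1]) is None]
        earlier = [r for r in group if parse(r[1]) is not None and parse(r[1]) < t1]
        if nulls:
            # Condition 3: a "null" time2 is kept as-is (assumed unique per time1).
            kept.append(nulls[0])
        elif earlier:
            # Condition 1: the latest time2 that is still earlier than time1.
            kept.append(max(earlier, key=lambda r: parse(r[1])))
        else:
            # Condition 2: nothing earlier than time1, so prefer the earliest time2.
            kept.append(min(group, key=lambda r: parse(r[1])))
    return kept

rows = [
    ("00:14:00", "00:15:00", "qwe", "uiop"),
    ("00:14:00", "00:13:30", "asd", "hjkl"),
    ("00:14:00", "00:13:00", "zxc", "vbnm"),
    ("00:13:00", "00:15:00", "rty", "zxcv"),
    ("00:13:00", "00:14:00", "fgh", "asdf"),
    ("00:12:00", "null", "tyu", "qwer"),
]
result = pick(rows)
for row in result:
    print(", ".join(row))
```

Run on the sample input, this keeps the same three rows as the Result table, so it can serve as a yardstick for any search-side solution.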

The time difference between time1 and time2, as well as between different time2 values, can be anywhere from seconds to weeks. I haven't figured out how to use stats for this, though it sounds simple enough.

somesoni2
Revered Legend

Give this a try. For your second requirement, it keeps the latest record, not the earliest.

your current search giving fields time1 time2 key1 key2
| convert dur2sec(time*) as seconds*
| eval shouldkeep=case(isnull(seconds2),"Yes", seconds2<seconds1,"Yes", 1=1,"No")
| eventstats values(shouldkeep) as shouldkeep1 by time1
| where NOT (mvcount(shouldkeep1)=2 AND shouldkeep="No")
| dedup time1 
| table time1 time2 key1 key2

woodcock
Esteemed Legend

Like this:

| makeresults 
| eval raw="00:14:00, 00:15:00, qwe,  uiop
00:14:00, 00:13:30, asd,  hjkl
00:14:00, 00:13:00, zxc,  vbnm
00:13:00, 00:15:00, rty,  zxcv
00:13:00, 00:14:00, fgh,  asdf
00:12:00, null,     tyu,  qwer"
| makemv delim="
" raw
| mvexpand raw
| rename raw AS _raw
| rex "^(?<time1>\S+),\s+(?<time2>\S+),\s+(?<key1>\S+),\s+(?<key2>\S+)$"
| eval time1=strptime(time1, "%H:%M:%S"), time2=strptime(time2, "%H:%M:%S")
| fieldformat time1=strftime(time1, "%H:%M:%S")
| fieldformat time2=strftime(time2, "%H:%M:%S")

| rename COMMENT AS "Everything above creates test event data; everything below is your solution"

| fillnull time2 value="null"
| sort 0 -time1 time2
| rename COMMENT AS "I did not understand the sorting for your 2nd condition; you might need to change this to 'sort 0 -time1 -time2'"
| eventstats values(time2) AS time2s BY time1
| fields - _*
| streamstats count AS serial
| mvexpand time2s
| eval time2s=if((time2s<time1), time2s . "YES", time2s . "NO")
| stats list(*) AS * BY serial key1 key2 time1 time2
| eval time2s=replace(mvindex(mvfilter(match(time2s, "YES$")),0), "YES", "")
| where isnull(time2s) OR time2=time2s
| dedup time1
| fields - time2s

aleksanderkamen
New Member

This does not quite work: for time1 00:14:00 the result has time2 00:13:00, not 00:13:30 as required.

aleksanderkamen
New Member

eventstats() is what I was missing. I made my own solution using separate queries and then combining them but this is much better. Thanks!

It's not that important but would be nice to know how to fulfill the second requirement as well.

somesoni2
Revered Legend

See if this works for your second requirement:

your current search giving fields time1 time2 key1 key2
| convert dur2sec(time*) as seconds*
| eval shouldkeep=case(isnull(seconds2),"Yes", seconds2<seconds1,"Yes", 1=1,"No")
| eventstats values(shouldkeep) as shouldkeep1 by time1
| where NOT (mvcount(shouldkeep1)=2 AND shouldkeep="No")
| reverse
| dedup time1
| table time1 time2 key1 key2

aleksanderkamen
New Member

Just reversing the results won't work, as this will screw up the records where there is a greater time2 and will return a smaller time2 than time1. Here's a complete example that works, apart from the second condition, and that one can copy and paste:

| makeresults 
| eval raw="00:14:00, 00:15:00, qwe,  uiop
 00:14:00, 00:13:30, asd,  hjkl
 00:14:00, 00:13:00, zxc,  vbnm
 00:13:00, 00:15:00, rty,  zxcv
 00:13:00, 00:14:00, fgh,  asdf
 00:12:00, null,     tyu,  qwer" 
| makemv delim="
 " raw 
| mvexpand raw 
| rename raw AS _raw 
| rex "^(?<time1>\S+),\s+(?<time2>\S+),\s+(?<key1>\S+),\s+(?<key2>\S+)$" 
| eval time1=strptime(time1, "%H:%M:%S"), time2=strptime(time2, "%H:%M:%S") 
| table time1 time2 key1 key2 
| eval shouldkeep=case(isnull(time2),"No", time1 > time2,"Yes", 1=1,"No") 
| eventstats values(shouldkeep) as shouldkeep1 by time1 
| where NOT (mvcount(shouldkeep1)=2 AND shouldkeep="No") 
| sort -time2
| dedup time1 sortby -time2 
| table time1 time2 key1 key2 
| fieldformat time1=strftime(time1, "%H:%M:%S") 
| fieldformat time2=strftime(time2, "%H:%M:%S")
| sort -time1

Notes: I'm using dedup sortby to make sure the results are deduplicated in the order I need. The extra sort is left in there intentionally to avoid the buggy dedup sortby behaviour I otherwise experience. And the shouldKeep1 in mvcount() is fixed to be shouldkeep1 (all lower case). Thanks to woodcock for the makeresults.
