The requirements is to find the event_A and event_B such that
Here are some sample data:
_time CATEGORY TYPE TEXT 2020-12-29T05:20:32.710-0800 ADVISORY event_B K35JB 2020-12-29T05:37:54.462-0800 ADVISORY event_B A05KM 2020-12-29T05:57:50.164-0800 ADVISORY event_B K25CD 2020-12-29T05:59:06.004-0800 ALARM event_A R20-A 2020-12-29T05:59:24.635-0800 ALARM event_A K35-E 2020-12-29T05:59:37.200-0800 ALARM event_A C15 2020-12-29T06:00:24.470-0800 CLEARED event_A R20-A 2020-12-29T06:00:40.415-0800 CLEARED event_A K35-E 2020-12-29T06:08:09.945-0800 ADVISORY event_B R65AG 2020-12-29T06:14:24.740-0800 ADVISORY event_B K35JB 2020-12-29T06:14:43.988-0800 ADVISORY event_B K45JB 2020-12-29T06:56:44.642-0800 ADVISORY event_B A77MD 2020-12-29T06:59:42.745-0800 ADVISORY event_B C87AB 2020-12-29T07:30:39.080-0800 ADVISORY event_B M97AF 2020-12-29T08:39:26.008-0800 ADVISORY event_B K25BA 2020-12-29T09:46:48.175-0800 ADVISORY event_B C25EG
Here is the illustration with the above sample data (with comment after # )
_time CATEGORY TYPE TEXT # all the event_B without event_A before are eliminated 2020-12-29T05:59:06.004-0800 ALARM event_A R20-A # expecting event_B with TEXT with prefix Ri where i = 1, 2, 3 2020-12-29T05:59:24.635-0800 ALARM event_A K35-E # expecting event_B with TEXT with prefix Ki where i = 2, 3, 4 2020-12-29T05:59:37.200-0800 ALARM event_A C15 # expecting event_B with TEXT with prefix Ci where i = 0, 1, 2 2020-12-29T06:00:24.470-0800 CLEARED event_A R20-A # only expecting event_B with TEXT with prefix Ri where i = 1, 2, 3 with _time < 2020-12-29T06:00:24.470-0800 + 60 minutes 2020-12-29T06:00:40.415-0800 CLEARED event_A K35-E # only expecting event_B with TEXT with prefix Ki where i = 2, 3, 4 with _time < 2020-12-29T06:00:40.415-0800 + 60 minutes 2020-12-29T06:08:09.945-0800 ADVISORY event_B R65AG # to be eliminated, not expected, as R6 does not match Ri, i=1, 2, 3 2020-12-29T06:14:24.740-0800 ADVISORY event_B K35JB # kept, as K3 matched the expected prefix, and within the time windows 2020-12-29T06:14:43.988-0800 ADVISORY event_B K45JB # kept, as K4 matched the expected prefix, and within the time windows 2020-12-29T06:56:44.642-0800 ADVISORY event_B A77MD # to be eliminated, not expected, as A7 does not match any of the expected prefix 2020-12-29T06:59:42.745-0800 ADVISORY event_B C87AB # to be eliminated, not expected, as C8 does not match Ci, i=0, 1, 2 2020-12-29T07:30:39.080-0800 ADVISORY event_B M97AF # to be eliminated, not expected, as M9 does not match any of the expected prefix 2020-12-29T08:39:26.008-0800 ADVISORY event_B K25BA # to be eliminated, not expected, as its _time is beyond the expected window 2020-12-29T09:46:48.175-0800 ADVISORY event_B C25EG # kept, as C2 matched the expected prefix, and there is no time window limit for the prefx C2
I cannot wrap my head to figure a solution with Splunk query.
I could only find a solution when there is only one event_A expecting the corresponding event_B, using streamstats to keep of track the only one expecting event_A’s TEXT prefix, and _time to scan for the satisfying event_B, but once there are multiple event_A’s expecting with different TEXT prefixes and _time’s, then I cannot find a way to remember and perform the scan for the multiple event_A’s expectations.
With a conventional programming language, say Python, I’ll keep track of the union of expectant prefixes, and time windows, and scan the events against such history state.
Could you kindly help me! Thanks in advance!
| makeresults
| eval _raw="_time CATEGORY TYPE TEXT
2020-12-29T05:20:32.710-0800 ADVISORY event_B K35JB
2020-12-29T05:37:54.462-0800 ADVISORY event_B A05KM
2020-12-29T05:57:50.164-0800 ADVISORY event_B K25CD
2020-12-29T05:59:06.004-0800 ALARM event_A R20-A
2020-12-29T05:59:24.635-0800 ALARM event_A K35-E
2020-12-29T05:59:37.200-0800 ALARM event_A C15
2020-12-29T06:00:24.470-0800 CLEARED event_A R20-A
2020-12-29T06:00:40.415-0800 CLEARED event_A K35-E
2020-12-29T06:08:09.945-0800 ADVISORY event_B R65AG
2020-12-29T06:14:24.740-0800 ADVISORY event_B K35JB
2020-12-29T06:14:43.988-0800 ADVISORY event_B K45JB
2020-12-29T06:56:44.642-0800 ADVISORY event_B A77MD
2020-12-29T06:59:42.745-0800 ADVISORY event_B C87AB
2020-12-29T07:30:39.080-0800 ADVISORY event_B M97AF
2020-12-29T08:39:26.008-0800 ADVISORY event_B K25BA
2020-12-29T09:46:48.175-0800 ADVISORY event_B C25EG"
| multikv forceheader=1
| eval _time=strptime(time,"%FT%T.%3Q%:z")
| table _time CATEGORY TYPE TEXT
| rex field=TEXT "(?<cat1>\w)(?<cat2>\w)"
| eval cat2_range=mvrange(cat2-1,cat2+2)
| streamstats count(eval(TYPE="event_A")) as session
| where session > 0
| eventstats values(eval(if(CATEGORY="ALARM",cat2_range,NULL))) as limit
values(eval(if(CATEGORY="CLEARED",relative_time(_time,"+60m"),NULL))) as limit_time by cat1
| where CATEGORY IN ("ALARM","CLEARED")
OR (CATEGORY="ADVISORY" AND if(limit_time!="",match(limit,cat2) AND _time <= limit_time,match(limit,cat2)))
```
| table _time CATEGORY TYPE TEXT
```
I just filtered it as you asked.
| makeresults
| eval _raw="_time CATEGORY TYPE TEXT
2020-12-29T05:20:32.710-0800 ADVISORY event_B K35JB
2020-12-29T05:37:54.462-0800 ADVISORY event_B A05KM
2020-12-29T05:57:50.164-0800 ADVISORY event_B K25CD
2020-12-29T05:59:06.004-0800 ALARM event_A R20-A
2020-12-29T05:59:24.635-0800 ALARM event_A K35-E
2020-12-29T05:59:37.200-0800 ALARM event_A C15
2020-12-29T06:00:24.470-0800 CLEARED event_A R20-A
2020-12-29T06:00:40.415-0800 CLEARED event_A K35-E
2020-12-29T06:08:09.945-0800 ADVISORY event_B R65AG
2020-12-29T06:14:24.740-0800 ADVISORY event_B K35JB
2020-12-29T06:14:43.988-0800 ADVISORY event_B K45JB
2020-12-29T06:56:44.642-0800 ADVISORY event_B A77MD
2020-12-29T06:59:42.745-0800 ADVISORY event_B C87AB
2020-12-29T07:30:39.080-0800 ADVISORY event_B M97AF
2020-12-29T08:39:26.008-0800 ADVISORY event_B K25BA
2020-12-29T09:46:48.175-0800 ADVISORY event_B C25EG"
| multikv forceheader=1
| eval _time=strptime(time,"%FT%T.%3Q%:z")
| table _time CATEGORY TYPE TEXT
| rex field=TEXT "(?<cat1>\w)(?<cat2>\w)"
| eval cat2_range=mvrange(cat2-1,cat2+2)
| streamstats count(eval(TYPE="event_A")) as session
| where session > 0
| eventstats values(eval(if(CATEGORY="ALARM",cat2_range,NULL))) as limit
values(eval(if(CATEGORY="CLEARED",relative_time(_time,"+60m"),NULL))) as limit_time by cat1
| where CATEGORY IN ("ALARM","CLEARED")
OR (CATEGORY="ADVISORY" AND if(limit_time!="",match(limit,cat2) AND _time <= limit_time,match(limit,cat2)))
```
| table _time CATEGORY TYPE TEXT
```
I just filtered it as you asked.
Amazing! Powerful, and sophisticated!
Thanks a million!
Your question was very clear, and I just had to make it that way.
I hope it is the same for others.