Splunk Search

How to create a list of literal values of strings with Splunk query language?

yshen
Communicator

The requirement is to find the event_A and event_B events such that:

  1. Some event_A occurs before the event_B, the TEXT fields of the two events have the same first character, and their second characters satisfy this condition:
    • read as a number, the second character of the event_B’s TEXT is equal to the second character of the event_A’s TEXT, or is 1 more or 1 less than it.
  2. One of the following holds for the event_B:
    1. It is after some event_A satisfying condition 1 with CATEGORY value “ALARM”, and not after such an event_A with CATEGORY value “CLEARED”, or
    2. It is after some event_A satisfying condition 1 with CATEGORY value “CLEARED”, but the event_B’s _time is within 60 minutes of the _time of that event_A (CATEGORY=CLEARED).
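
Condition 1 can be read as a prefix rule: each event_A defines up to three two-character prefixes that a later event_B may start with. Here is a minimal Python sketch of that rule. The names `expected_prefixes` and `matches` are hypothetical, and the sketch assumes the second character is always a single digit and that neighbours outside 0 to 9 are simply dropped, which the requirements do not specify.

```python
def expected_prefixes(text_a: str) -> set:
    """Return the two-character prefixes an event_B TEXT may start with."""
    letter, digit = text_a[0], int(text_a[1])
    # Assumption: neighbours outside 0..9 are discarded rather than wrapped.
    return {f"{letter}{d}" for d in (digit - 1, digit, digit + 1) if 0 <= d <= 9}


def matches(text_a: str, text_b: str) -> bool:
    """True if event_B's TEXT satisfies condition 1 against event_A's TEXT."""
    return text_b[:2] in expected_prefixes(text_a)


print(sorted(expected_prefixes("R20-A")))  # ['R1', 'R2', 'R3']
print(matches("K35-E", "K45JB"))           # True
print(matches("R20-A", "R65AG"))           # False
```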

Here are some sample data:

_time	                        CATEGORY	TYPE	TEXT
2020-12-29T05:20:32.710-0800	ADVISORY	event_B	K35JB
2020-12-29T05:37:54.462-0800	ADVISORY	event_B	A05KM
2020-12-29T05:57:50.164-0800	ADVISORY	event_B	K25CD
2020-12-29T05:59:06.004-0800	ALARM	    event_A	R20-A
2020-12-29T05:59:24.635-0800	ALARM	    event_A	K35-E
2020-12-29T05:59:37.200-0800	ALARM	    event_A	C15
2020-12-29T06:00:24.470-0800	CLEARED	    event_A	R20-A
2020-12-29T06:00:40.415-0800	CLEARED	    event_A	K35-E
2020-12-29T06:08:09.945-0800	ADVISORY	event_B	R65AG
2020-12-29T06:14:24.740-0800	ADVISORY	event_B	K35JB
2020-12-29T06:14:43.988-0800	ADVISORY	event_B	K45JB
2020-12-29T06:56:44.642-0800	ADVISORY	event_B	A77MD
2020-12-29T06:59:42.745-0800	ADVISORY	event_B	C87AB
2020-12-29T07:30:39.080-0800	ADVISORY	event_B	M97AF
2020-12-29T08:39:26.008-0800	ADVISORY	event_B	K25BA
2020-12-29T09:46:48.175-0800	ADVISORY	event_B	C25EG

Here is the illustration with the above sample data (with comment after # )

_time	                        CATEGORY	TYPE	TEXT
                                                                # all the event_B without event_A before are eliminated
2020-12-29T05:59:06.004-0800	ALARM	    event_A	R20-A   # expecting event_B with TEXT with prefix Ri where i = 1, 2, 3
2020-12-29T05:59:24.635-0800	ALARM	    event_A	K35-E   # expecting event_B with TEXT with prefix Ki where i = 2, 3, 4
2020-12-29T05:59:37.200-0800	ALARM	    event_A	C15     # expecting event_B with TEXT with prefix Ci where i = 0, 1, 2
2020-12-29T06:00:24.470-0800	CLEARED	    event_A	R20-A   # only expecting event_B with TEXT with prefix Ri where i = 1, 2, 3 with _time < 2020-12-29T06:00:24.470-0800 + 60 minutes
2020-12-29T06:00:40.415-0800	CLEARED	    event_A	K35-E   # only expecting event_B with TEXT with prefix Ki where i = 2, 3, 4 with _time < 2020-12-29T06:00:40.415-0800 + 60 minutes
2020-12-29T06:08:09.945-0800	ADVISORY	event_B	R65AG   # to be eliminated, not expected, as R6 does not match Ri, i=1, 2, 3
2020-12-29T06:14:24.740-0800	ADVISORY	event_B	K35JB   # kept, as K3 matched the expected prefix, and within the time windows
2020-12-29T06:14:43.988-0800	ADVISORY	event_B	K45JB   # kept, as K4 matched the expected prefix, and within the time windows
2020-12-29T06:56:44.642-0800	ADVISORY	event_B	A77MD   # to be eliminated, not expected, as A7 does not match any of the expected prefix
2020-12-29T06:59:42.745-0800	ADVISORY	event_B	C87AB   # to be eliminated, not expected, as C8 does not match Ci, i=0, 1, 2
2020-12-29T07:30:39.080-0800	ADVISORY	event_B	M97AF   # to be eliminated, not expected, as M9 does not match any of the expected prefix
2020-12-29T08:39:26.008-0800	ADVISORY	event_B	K25BA   # to be eliminated, not expected, as its _time is beyond the expected window
2020-12-29T09:46:48.175-0800	ADVISORY	event_B	C25EG   # kept, as C2 matched the expected prefix, and there is no time window limit for the prefix C2

I cannot wrap my head around how to solve this with a Splunk query.

I could only find a solution for the case where a single event_A is awaiting its corresponding event_B: streamstats keeps track of that one event_A’s TEXT prefix and _time while scanning for a matching event_B. Once multiple event_A’s are pending with different TEXT prefixes and _time values, I cannot find a way to remember all of their expectations and scan against them.

With a conventional programming language, say Python, I would keep track of the union of expected prefixes and time windows, and scan the events against that state.
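
For reference, the stateful scan described above might look roughly like the following Python sketch. It is only one reading of the requirements: the names `filter_events`, `expected_prefixes`, and `SAMPLE` are hypothetical, and the sketch assumes that a CLEARED event_A closes the ALARM with the identical TEXT, which the sample data suggests but the requirements do not state.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=60)
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"


def expected_prefixes(text_a):
    letter, digit = text_a[0], int(text_a[1])
    return {f"{letter}{d}" for d in (digit - 1, digit, digit + 1) if 0 <= d <= 9}


def filter_events(rows):
    """Scan rows in time order; return the TEXT of every event_B that is kept.

    rows: iterable of (timestamp string, CATEGORY, TYPE, TEXT).
    """
    # State: event_A TEXT -> None while the alarm is open,
    # or the datetime at which it was cleared (assumed pairing by TEXT).
    state = {}
    kept = []
    for stamp, category, event_type, text in rows:
        when = datetime.strptime(stamp, TIME_FORMAT)
        if event_type == "event_A":
            state[text] = when if category == "CLEARED" else None
            continue
        for text_a, cleared_at in state.items():
            if text[:2] not in expected_prefixes(text_a):
                continue
            if cleared_at is None or when - cleared_at <= WINDOW:
                kept.append(text)
                break
    return kept


SAMPLE = [
    ("2020-12-29T05:20:32.710-0800", "ADVISORY", "event_B", "K35JB"),
    ("2020-12-29T05:37:54.462-0800", "ADVISORY", "event_B", "A05KM"),
    ("2020-12-29T05:57:50.164-0800", "ADVISORY", "event_B", "K25CD"),
    ("2020-12-29T05:59:06.004-0800", "ALARM", "event_A", "R20-A"),
    ("2020-12-29T05:59:24.635-0800", "ALARM", "event_A", "K35-E"),
    ("2020-12-29T05:59:37.200-0800", "ALARM", "event_A", "C15"),
    ("2020-12-29T06:00:24.470-0800", "CLEARED", "event_A", "R20-A"),
    ("2020-12-29T06:00:40.415-0800", "CLEARED", "event_A", "K35-E"),
    ("2020-12-29T06:08:09.945-0800", "ADVISORY", "event_B", "R65AG"),
    ("2020-12-29T06:14:24.740-0800", "ADVISORY", "event_B", "K35JB"),
    ("2020-12-29T06:14:43.988-0800", "ADVISORY", "event_B", "K45JB"),
    ("2020-12-29T06:56:44.642-0800", "ADVISORY", "event_B", "A77MD"),
    ("2020-12-29T06:59:42.745-0800", "ADVISORY", "event_B", "C87AB"),
    ("2020-12-29T07:30:39.080-0800", "ADVISORY", "event_B", "M97AF"),
    ("2020-12-29T08:39:26.008-0800", "ADVISORY", "event_B", "K25BA"),
    ("2020-12-29T09:46:48.175-0800", "ADVISORY", "event_B", "C25EG"),
]

print(filter_events(SAMPLE))  # ['K35JB', 'K45JB', 'C25EG']
```

The dictionary plays the role of the "history state": it can hold any number of pending event_A expectations at once, which is the part that a single streamstats-tracked value cannot express.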

Could you kindly help me? Thanks in advance!


to4kawa
Ultra Champion
```
| makeresults 
| eval _raw="_time	                        CATEGORY	TYPE	TEXT
2020-12-29T05:20:32.710-0800	ADVISORY	event_B	K35JB
2020-12-29T05:37:54.462-0800	ADVISORY	event_B	A05KM
2020-12-29T05:57:50.164-0800	ADVISORY	event_B	K25CD
2020-12-29T05:59:06.004-0800	ALARM	event_A	R20-A
2020-12-29T05:59:24.635-0800	ALARM	event_A	K35-E
2020-12-29T05:59:37.200-0800	ALARM	event_A	C15
2020-12-29T06:00:24.470-0800	CLEARED	event_A	R20-A
2020-12-29T06:00:40.415-0800	CLEARED	event_A	K35-E
2020-12-29T06:08:09.945-0800	ADVISORY	event_B	R65AG
2020-12-29T06:14:24.740-0800	ADVISORY	event_B	K35JB
2020-12-29T06:14:43.988-0800	ADVISORY	event_B	K45JB
2020-12-29T06:56:44.642-0800	ADVISORY	event_B	A77MD
2020-12-29T06:59:42.745-0800	ADVISORY	event_B	C87AB
2020-12-29T07:30:39.080-0800	ADVISORY	event_B	M97AF
2020-12-29T08:39:26.008-0800	ADVISORY	event_B	K25BA
2020-12-29T09:46:48.175-0800	ADVISORY	event_B	C25EG" 
| multikv forceheader=1 
| eval _time=strptime(time,"%FT%T.%3Q%:z") 
| table _time CATEGORY TYPE TEXT 
| rex field=TEXT "(?<cat1>\w)(?<cat2>\w)" 
| eval cat2_range=mvrange(cat2-1,cat2+2) 
| streamstats count(eval(TYPE="event_A")) as session 
| where session > 0 
| eventstats values(eval(if(CATEGORY="ALARM",cat2_range,NULL))) as limit
    values(eval(if(CATEGORY="CLEARED",relative_time(_time,"+60m"),NULL))) as limit_time by cat1 
| where CATEGORY IN ("ALARM","CLEARED")
    OR (CATEGORY="ADVISORY" AND if(limit_time!="",match(limit,cat2) AND _time <= limit_time,match(limit,cat2)))
| table _time CATEGORY TYPE TEXT
```

I just filtered it as you asked.
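
For readers more at home in Python, the stages of the query can be paraphrased roughly as follows. This is an approximation, not a faithful emulation of SPL: the name `paraphrase_query` is hypothetical, the sketch assumes the second character of TEXT is always a digit, assumes at most one CLEARED deadline per first character (as in the sample), and uses exact membership where the query uses the regex test `match()`, so edge digits such as 0 and 9 may behave differently.

```python
from datetime import datetime, timedelta

TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"


def paraphrase_query(rows):
    """rows: list of (timestamp string, CATEGORY, TYPE, TEXT) in time order."""
    # rex: split TEXT into its first character (cat1) and second character (cat2).
    events = [
        {"time": datetime.strptime(s, TIME_FORMAT), "CATEGORY": c,
         "TYPE": t, "TEXT": x, "cat1": x[0], "cat2": int(x[1])}
        for s, c, t, x in rows
    ]
    # streamstats + where session > 0: drop everything before the first event_A.
    first_a = next(
        (i for i, e in enumerate(events) if e["TYPE"] == "event_A"), len(events)
    )
    events = events[first_a:]

    # eventstats ... by cat1: per first character, collect the allowed digits
    # from ALARM rows (mvrange(cat2-1, cat2+2)) and a deadline from CLEARED rows.
    limit, limit_time = {}, {}
    for e in events:
        if e["CATEGORY"] == "ALARM":
            limit.setdefault(e["cat1"], set()).update(
                range(e["cat2"] - 1, e["cat2"] + 2)
            )
        elif e["CATEGORY"] == "CLEARED":
            # Assumption: one deadline per first character, as in the sample.
            limit_time[e["cat1"]] = e["time"] + timedelta(minutes=60)

    # where: keep every ALARM/CLEARED row, plus ADVISORY rows that pass the checks.
    kept = []
    for e in events:
        if e["CATEGORY"] in ("ALARM", "CLEARED"):
            kept.append(e["TEXT"])
        elif e["cat2"] in limit.get(e["cat1"], set()):
            deadline = limit_time.get(e["cat1"])
            if deadline is None or e["time"] <= deadline:
                kept.append(e["TEXT"])
    return kept


rows = [
    ("2020-12-29T05:20:32.710-0800", "ADVISORY", "event_B", "K35JB"),  # before any event_A
    ("2020-12-29T05:59:24.635-0800", "ALARM", "event_A", "K35-E"),
    ("2020-12-29T06:00:40.415-0800", "CLEARED", "event_A", "K35-E"),
    ("2020-12-29T06:14:43.988-0800", "ADVISORY", "event_B", "K45JB"),  # inside the window
    ("2020-12-29T08:39:26.008-0800", "ADVISORY", "event_B", "K25BA"),  # past the deadline
]
print(paraphrase_query(rows))  # ['K35-E', 'K35-E', 'K45JB']
```

Unlike a row-by-row scan, this is a two-pass approach: the per-group aggregates are computed over all remaining events first and then attached to every row, which is how the query handles several pending event_A expectations at once.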

yshen
Communicator

Amazing! Powerful, and sophisticated!

Thanks a million!

to4kawa
Ultra Champion

Your question was very clear, and I just had to make it that way.
I hope it is the same for others.