I have a stream of logs from a system.
To filter for errors, I can perform a search like so:
index=project1 sourcetype=pc1 log_data="*error*"
This gets me the errors, but I also want the events surrounding each error: all events (not just errors) that occurred within 1 minute before and 1 minute after it.
What would be the best possible way to achieve this?
Technically there is a 3rd option (and with Splunk there is often a 4th), but this example shows how to first detect the errors and then mark the events that fall within the required window around each error.
It creates 40 random events with an occasional error, copies each error's time up and down through the surrounding non-error events, and then flags the events that fall within the time window of the nearest error.
| makeresults count=40
| streamstats c
| eval _time=now() - c*20
| eval log_data=if(c % (random() % 30) = 0, "bla error message bla", "normal event message")
| fields - c
``` The above creates a simple 40 event data set with an occasional error ```
``` Ensure time descending order and mark the events that have an error ```
| sort - _time
| streamstats window=1 values(eval(if(match(log_data,"error"), _time, null()))) as error_time
``` Save the error time and copy the error time down to all following records until the next error ```
| eval start_time=error_time
| filldown error_time
``` Now filter events within 60 seconds prior to the error ```
| eval INCLUDE=if(_time>=(error_time-60) AND _time<=error_time, "YES", "NO")
``` Now do the same in reverse, i.e. time ascending order ```
| sort _time
| filldown start_time
``` and filter events that are within 60 seconds AFTER the error ```
| eval INCLUDE=if(_time<=(start_time+60) AND _time>=start_time, "YES", INCLUDE)
| fields - start_time error_time
Bear in mind that this could be an expensive search, as it does 2 sorts and 2 streamstats. In your case you would do
index=project1 sourcetype=pc1
followed by the SPL that comes after the data setup above.
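The two-pass filldown logic above is easier to see outside of SPL. Below is a minimal Python sketch of the same idea (hypothetical data and function name, not part of Splunk): one pass walks newest-to-oldest and carries each error's time down to older events (like filldown after sort - _time), the second pass walks oldest-to-newest, and an event is kept if it sits within 60 seconds on either side of the nearest error.

```python
def mark_window(events, window=60):
    """events: list of (time, message) sorted ascending; returns set of kept times."""
    keep = set()
    # Pass 1: descending time, carry each error's time down to older events,
    # keeping events no more than `window` seconds BEFORE the error.
    error_time = None
    for t, msg in reversed(events):
        if "error" in msg:
            error_time = t
        if error_time is not None and error_time - window <= t <= error_time:
            keep.add(t)
    # Pass 2: ascending time, carry each error's time up to newer events,
    # keeping events no more than `window` seconds AFTER the error.
    error_time = None
    for t, msg in events:
        if "error" in msg:
            error_time = t
        if error_time is not None and error_time <= t <= error_time + window:
            keep.add(t)
    return keep

events = [(0, "normal"), (20, "normal"), (40, "bla error bla"),
          (60, "normal"), (110, "normal"), (200, "normal")]
print(sorted(mark_window(events)))  # → [0, 20, 40, 60]
```

The events at t=0, 20 fall within 60 s before the error at t=40, and t=60 falls within 60 s after it; t=110 and t=200 are outside both windows, just as INCLUDE=NO would mark them in the SPL version.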
Here is another option, if you don't need to get those events immediately with your first search.
Run your search, then expand the relevant error event (the > marker at the beginning of the event) and click its _time field; a dialog opens for that timestamp.
Then just select the appropriate time range and run the search again without any matching terms like log_data="*error*".
Thank you for the help bowesmana. This solution works, but it seems to cap my results at 10k events. Is this an inherent Splunk limit, or am I missing a piece of the puzzle?
I did do a search for only the INCLUDE=YES events
``` Ensure time descending order and mark the events that have an error ```
| sort - _time
| streamstats window=1 values(eval(if(match(log_data,"error"), _time, null()))) as error_time
``` Save the error time and copy the error time down to all following records until the next error ```
| eval start_time=error_time
| filldown error_time
``` Now filter events within 60 seconds prior to the error ```
| eval INCLUDE=if(_time>=(error_time-60) AND _time<=error_time, "YES", "NO")
``` Now do the same in reverse, i.e. time ascending order ```
| sort _time
| filldown start_time
``` and filter events that are within 60 seconds AFTER the error ```
| eval INCLUDE=if(_time<=(start_time+60) AND _time>=start_time, "YES", INCLUDE)
| fields - start_time error_time
| search INCLUDE=YES
Actually, it's the sort command that is capping the results at 10k - this always bites me. If you want to sort ALL results you must use sort 0 - ...
Glad to hear it worked.
As @yuanliu said, recommending map is not often done here, as the map command runs its searches sequentially. But if you have few errors, map will not have to make many iterations - just note that by default it only runs over 10 results unless you override its parameters.
max_stream_window = <integer>
* For the streamstats command, the maximum allowed window size.
* Default: 10000
This is probably the cause.
Thank you PickleRick, this was probably the reason. Unfortunately, I wasn't able to change max_stream_window.
Yes. There is this mark-and-select approach, but it requires Splunk not only to scan all events from the initial search time range, but also to hold them as intermediate results for the purpose of reversing. So it's not really a practical solution. But yes, it can be done this way.
It is rare that I, or anyone here, recommend the map command, but this seems to be an appropriate use case if errors are few and far between.
index=project1 sourcetype=pc1 log_data="*error*"
| eval early = _time - 60, late = _time + 60
| map search="search index=project1 sourcetype=pc1 earliest=$early$ latest=$late$"
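Conceptually, map does the following: for each error event it runs an inner search over [t-60, t+60] and unions the results. A hypothetical Python sketch (the inner Splunk search is replaced with a plain list filter; names are illustrative), including map's default cap of 10 inner searches (maxsearches):

```python
def surrounding_events(all_events, window=60, max_searches=10):
    """all_events: list of (time, message); mimics what map does with the
    inner search, including its default maxsearches limit of 10."""
    errors = [t for t, msg in all_events if "error" in msg]
    merged = {}
    for t in errors[:max_searches]:      # map stops after maxsearches iterations
        # stand-in for: search index=... earliest=t-window latest=t+window
        for ev_t, ev_msg in all_events:
            if t - window <= ev_t <= t + window:
                merged[ev_t] = ev_msg    # dedupe events from overlapping windows
    return sorted(merged.items())

events = [(0, "ok"), (30, "disk error"), (50, "ok"),
          (200, "ok"), (250, "io error"), (320, "ok")]
print(surrounding_events(events))
# keeps 0, 30, 50 (around t=30) and 200, 250 (around t=250); 320 is outside
```

If errors can be frequent, remember that default of 10: in SPL you would raise it with map maxsearches=N, and you may also want to dedup the combined results, since overlapping windows return the same events more than once.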
There are two ways to go about it. One is the map command, as shown by @yuanliu. The other is using a subsearch.
A subsearch has its limitations and can be silently finalized early, producing incomplete results; the map command, on the other hand, is one of the risky commands, and a normal user can be forbidden from running it.