Splunk Search

Get all events in a 1-minute time window around an error event

db2
Explorer

I have a stream of logs from a system.

To filter for errors, I can perform a search like so:

index=project1 sourcetype=pc1 log_data="*error*"


This gets me the errors, but I also want the surrounding events as well: all events (not just errors) that occurred 1 minute before and 1 minute after each error.

What would be the best possible way to achieve this?


isoutamo
SplunkTrust

How about this, if you don't need to get those events immediately with your first search.

Just run your search. Then click the relevant event, open it with the > mark at the beginning of the event, then click the _time field and a menu opens for you:

[screenshot: the menu that opens on the _time field of an expanded event]

Then just select the correct time slot and run the search again without any "matching words" like 'log_data="*error*"'.
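
If you prefer typing it out, the equivalent explicit search after picking the window would look something like this (the timestamps here are hypothetical, standing in for one minute either side of your error):

index=project1 sourcetype=pc1 earliest="04/02/2025:21:31:00" latest="04/02/2025:21:33:00"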

bowesmana
SplunkTrust

Technically there is a 3rd option (and often with Splunk there may be a 4th), but this example shows you how to first detect errors and then mark the events that fall within the required window around each error.

It creates 40 random events with an occasional error, then copies the error time up and down across the non-error events, and finally flags those that fall within the time window of the closest error.

| makeresults count=40
| streamstats c
| eval _time=now() - c*20
| eval log_data=if(c % (random() % 30) = 0, "bla error message bla", "normal event message")
| fields - c
``` The above creates a simple 40 event data set with an occasional error ```

``` Ensure time descending order and mark the events that have an error ```
| sort - _time
| streamstats window=1 values(eval(if(match(log_data,"error"), _time, null()))) as error_time

``` Save the error time and copy the error time down to all following records until the next error ```
| eval start_time=error_time
| filldown error_time
``` Now filter events within 60 seconds prior to the error ```
| eval INCLUDE=if(_time>=(error_time-60) AND _time<=error_time, "YES", "NO")

``` Now do the same in reverse, i.e. time ascending order ```
| sort _time
| filldown start_time
``` and filter events that are within 60 seconds AFTER the error ```
| eval INCLUDE=if(_time<=(start_time+60) AND _time>=start_time, "YES", INCLUDE)
| fields - start_time error_time

Bear in mind that this could be an expensive search, as it does two sorts plus streamstats and filldown passes over all events, but in your case you could do

index=project1 sourcetype=pc1

followed by the SPL after the data setup above.
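
Putting it together, a sketch of the complete search against your own data (assuming the index and sourcetype from your question, and keeping only the flagged events at the end):

index=project1 sourcetype=pc1
``` Time descending: mark errors and flag the 60 seconds before each one ```
| sort - _time
| streamstats window=1 values(eval(if(match(log_data,"error"), _time, null()))) as error_time
| eval start_time=error_time
| filldown error_time
| eval INCLUDE=if(_time>=(error_time-60) AND _time<=error_time, "YES", "NO")
``` Time ascending: flag the 60 seconds after each error ```
| sort _time
| filldown start_time
| eval INCLUDE=if(_time<=(start_time+60) AND _time>=start_time, "YES", INCLUDE)
| fields - start_time error_time
| search INCLUDE=YES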

 

db2
Explorer

Thank you for the help bowesmana. This solution works, but it seems to cap my results at 10k events. Is this an inherent Splunk limit, or am I missing a piece of the puzzle?

I did add a final search for only the INCLUDE=YES events:

``` Ensure time descending order and mark the events that have an error ```
| sort - _time
| streamstats window=1 values(eval(if(match(log_data,"error"), _time, null()))) as error_time

``` Save the error time and copy the error time down to all following records until the next error ```
| eval start_time=error_time
| filldown error_time
``` Now filter events within 60 seconds prior to the error ```
| eval INCLUDE=if(_time>=(error_time-60) AND _time<=error_time, "YES", "NO")

``` Now do the same in reverse, i.e. time ascending order ```
| sort _time
| filldown start_time
``` and filter events that are within 60 seconds AFTER the error ```
| eval INCLUDE=if(_time<=(start_time+60) AND _time>=start_time, "YES", INCLUDE)
| fields - start_time error_time

| search INCLUDE=YES

 


bowesmana
SplunkTrust

Actually, it's the sort command that is capping the results at 10k (it always bites me): if you want to sort ALL results you must do sort 0 - ...
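
That is, both sorts in the pipeline above become the unlimited form, where the leading 0 removes the default 10,000-result cap:

| sort 0 - _time   ``` unlimited descending sort ```
| sort 0 _time     ``` unlimited ascending sort ```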

Glad to hear it worked.

As @yuanliu said, recommending map is rare here, as the map command runs its searches sequentially; but if you have few errors, the map will not have to make many iterations. Note that by default it will only run over 10 results unless you override the parameters.

 

PickleRick
SplunkTrust

max_stream_window = <integer>
* For the streamstats command, the maximum allow window size.
* Default: 10000

This is probably the cause.
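
For reference, a minimal sketch of the override, assuming you have filesystem access to the search head (the stanza placement is my reading of limits.conf.spec; verify it against your version, and note a restart may be needed):

# $SPLUNK_HOME/etc/system/local/limits.conf
[default]
max_stream_window = 20000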

db2
Explorer

Thank you PickleRick, this was probably the reason; unfortunately I couldn't edit max_stream_window.


PickleRick
SplunkTrust

Yes. There is this mark-and-select approach, but it requires Splunk not only to scan all events from the initial search time range, but also to hold them as intermediate results for the purpose of reversing. So it's not really a practical solution. But yes, it can be done this way.


yuanliu
SplunkTrust

It is rare that I, or anyone else here, recommend the map command, but this seems to be an appropriate use case if errors are rare and far between.

index=project1 sourcetype=pc1 log_data="*error*"
| eval early = _time - 60, late = _time + 60
| map search="search index=project1 sourcetype=pc1 earliest=$early$ latest=$late$"
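
If the default limit of 10 map iterations that @bowesmana mentioned gets in the way, a variant with the maxsearches parameter raised (the value here is arbitrary, and the dedup is my addition to drop duplicates where the windows of nearby errors overlap):

index=project1 sourcetype=pc1 log_data="*error*"
| eval early = _time - 60, late = _time + 60
| map maxsearches=100 search="search index=project1 sourcetype=pc1 earliest=$early$ latest=$late$"
| dedup _time _raw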

 

PickleRick
SplunkTrust

There are two ways to go about it. One is the map command as shown by @yuanliu. Another is using a subsearch.

The subsearch has its limitations and can be silently finalized early, producing incomplete results. The map command, on the other hand, is one of the risky commands, and a normal user can be forbidden from running it.
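
A minimal sketch of the subsearch variant (my illustration, not a tested search): the inner search finds the errors and emits earliest/latest pairs, which format turns into OR'ed time-window terms for the outer search. The default subsearch limits on result count and runtime still apply.

index=project1 sourcetype=pc1
    [ search index=project1 sourcetype=pc1 log_data="*error*"
      | eval earliest=_time-60, latest=_time+60
      | fields earliest latest
      | format ]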
