How to use search from summary index as a condition in another search?

spisiakmi
Contributor

Hi,
I have index="ekra_protokol" which has these events:

datum_zeit;meldung
2019-06-19 05:56:26.754: Test Drucken ...
2019-06-19 05:56:37.629: Test Drucken ...
2019-06-19 05:56:48.570: Test Drucken ...
2019-06-19 05:56:59.516: Test Drucken ...
2019-06-19 05:57:10.436: Test Drucken ...
2019-06-19 05:58:10.436: Test Drucken ...
2019-06-20 05:56:59.516: Test Drucken ...
2019-06-20 05:57:10.436: Test Drucken ...
2019-06-20 05:58:10.436: Test Drucken ...
2019-06-20 05:58:32.436: Test Drucken ...

I have a saved search that runs every 5 minutes and collects its output into a summary index. It saves the timestamp datum_zeit each time the count of meldung="Test Drucken" events reaches a multiple of 3.

index="ekra_protokol" meldung="Test Drucken ..." 
| sort 0 datum_zeit
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

The summary index ekra_protokol_rakel_summary has 3 rows.

datum_zeit_summary
2019-06-19 05:56:48.570
2019-06-19 05:58:10.436
2019-06-20 05:58:10.436
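
The autoregress and streamstats steps in the saved search above amount to numbering the events in time order and keeping every third one. A minimal sketch of the same counter, assuming that every event has the same meldung value (which the filter in the base search ensures); the threshold 3 is the test value from this post:

```
index="ekra_protokol" meldung="Test Drucken ..."
| sort 0 datum_zeit
| streamstats count as cnt
| where cnt%3==0
| table datum_zeit cnt
```

On the ten sample events this keeps the 3rd, 6th and 9th rows, which are the three timestamps listed in the summary index.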

These are offline data. But I want to switch to a setup where the events are indexed through a forwarder, the counterReset is 40000, and the saved search runs every hour.

I want each run of the saved search to start exactly from the point where the counter last reached its limit. So each run should search only events where datum_zeit is greater than max(datum_zeit_summary), called NewTime below.

The problem is that I don't know how to use a value returned by a search of the summary index inside the saved search.
I expect something like this:

index="ekra_protokol" meldung="Test Drucken ..." 
**| datum_zeit> stats max(datum_zeit_summary) as NewTime**
| sort 0 datum_zeit
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

Can anybody help, please?

1 Solution

spisiakmi
Contributor

Hi woodcock, thank you very much for your help. Finally I found a solution:

index="ekra_protokol_rakel_summary" 
| stats max(datum_zeit_summary) as NewTime 
| append [search index="ekra_protokol" meldung="Test Drucken ..." 
| sort 0 datum_zeit] 
| eventstats max(NewTime) as NewTime
| where datum_zeit>NewTime 
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"
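
The `where datum_zeit>NewTime` step compares the two values as strings. That works here because the timestamps are zero-padded and ordered year first. A minimal sketch of a numeric comparison instead, assuming both fields use the `YYYY-MM-DD HH:MM:SS.mmm` layout shown in the summary rows; the field names datum_epoch and NewTime_epoch are hypothetical:

```
index="ekra_protokol_rakel_summary"
| stats max(datum_zeit_summary) as NewTime
| append [search index="ekra_protokol" meldung="Test Drucken ..." | sort 0 datum_zeit]
| eventstats max(NewTime) as NewTime
| eval datum_epoch=strptime(datum_zeit, "%Y-%m-%d %H:%M:%S.%3N")
| eval NewTime_epoch=strptime(NewTime, "%Y-%m-%d %H:%M:%S.%3N")
| where datum_epoch>NewTime_epoch
```

The counting and collect steps would follow unchanged. As with the string comparison, no events pass the filter while the summary index is still empty, so the first run may need to be seeded separately.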

woodcock
Esteemed Legend

Kudos to you for coming back and sharing your solution.

spisiakmi
Contributor

Always my pleasure.

richgalloway
SplunkTrust

@spisiakmi If your problem is resolved, please accept an answer to help future readers.

woodcock
Esteemed Legend

Your translation to English is a little difficult for me to understand, but I think you are asking how to tokenize a saved search so that it accepts arguments, and that really is a thing! See if this helps:
https://docs.splunk.com/Documentation/Splunk/latest/SearchReference/Savedsearch
