How to use search from summary index as a condition in another search?

spisiakmi
Contributor

Hi,
I have index="ekra_protokol" which has these events:

datum_zeit;meldung
2019-06-19 05:56:26.754: Test Drucken ...
2019-06-19 05:56:37.629: Test Drucken ...
2019-06-19 05:56:48.570: Test Drucken ...
2019-06-19 05:56:59.516: Test Drucken ...
2019-06-19 05:57:10.436: Test Drucken ...
2019-06-19 05:58:10.436: Test Drucken ...
2019-06-20 05:56:59.516: Test Drucken ...
2019-06-20 05:57:10.436: Test Drucken ...
2019-06-20 05:58:10.436: Test Drucken ...
2019-06-20 05:58:32.436: Test Drucken ...

I have a saved search that runs every 5 minutes and collects its output into a summary index. It saves the timestamp datum_zeit each time the running count of meldung="Test Drucken ..." events reaches a multiple of 3.

index="ekra_protokol" meldung="Test Drucken ..." 
| sort 0 datum_zeit
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

The summary index ekra_protokol_rakel_summary has 3 rows.

datum_zeit_summary
2019-06-19 05:56:48.570
2019-06-19 05:58:10.436
2019-06-20 05:58:10.436
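
For readers following the counting logic: the autoregress/eval chain in the query above appears to amount to a running count of the matching events, keeping every third one. A minimal sketch of that same idea, assuming datum_zeit is an extracted field whose string values sort chronologically (the field name cnt is reused from the query; nothing else is new):

```spl
index="ekra_protokol" meldung="Test Drucken ..."
| sort 0 datum_zeit
| streamstats count as cnt
| where cnt % 3 == 0
| table datum_zeit cnt
```

Here streamstats count numbers the events 1, 2, 3, ... in sorted order, and the where clause keeps the rows whose number is a multiple of 3. On the ten sample events this would keep the 3rd, 6th and 9th timestamps, which are the three values listed in the summary index above. This is only a sketch of the counting step, not a replacement for the sitop/collect part.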

This is offline data. Later, the events will be indexed through a forwarder, the counter limit (counterReset) will be 40000, and the saved search will run every hour.

I want each run of the saved search to start exactly at the point where the counter last reached its limit. So each run should consider only events where datum_zeit is greater than NewTime, where NewTime is the result of stats max(datum_zeit_summary) on the summary index.

The problem is that I don't know how to use the result of a search on the summary index inside the saved search.
I expect something like this:

index="ekra_protokol" meldung="Test Drucken ..." 
**| datum_zeit> stats max(datum_zeit_summary) as NewTime**
| sort 0 datum_zeit
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

Can anybody help, please?

0 Karma
1 Solution

spisiakmi
Contributor

Hi woodcock, thank you very much for your help. Finally I found a solution:

index="ekra_protokol_rakel_summary" 
| stats max(datum_zeit_summary) as NewTime 
| append [search index="ekra_protokol" meldung="Test Drucken ..." 
| sort 0 datum_zeit] 
| eventstats max(NewTime) as NewTime
| where datum_zeit>NewTime 
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"
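
A possible variation on the solution above, offered as a sketch rather than a tested replacement: subsearches passed to append are subject to result and runtime limits (by default, as far as I know, 50,000 rows and 60 seconds, depending on limits.conf). With a counter limit of 40000 and hourly runs, it may be safer to keep the large event set in the base search and put only the small summary lookup in the subsearch. The counting step is shortened to streamstats count here, and the isnotnull/isnull guards are assumptions added to drop the appended summary row and to handle an empty summary index on the first run:

```spl
index="ekra_protokol" meldung="Test Drucken ..."
| append
    [ search index="ekra_protokol_rakel_summary"
      | stats max(datum_zeit_summary) as NewTime ]
| eventstats max(NewTime) as NewTime
| where isnotnull(datum_zeit) AND (isnull(NewTime) OR datum_zeit>NewTime)
| sort 0 datum_zeit
| streamstats count as cnt
| where cnt % 3 == 0
| table datum_zeit cnt
```

The eventstats step copies the single NewTime value onto every event, exactly as in the solution above, so the where clause can compare each datum_zeit against it. The comparison is a string comparison, which works here only because the timestamp format sorts lexicographically in chronological order.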

woodcock
Esteemed Legend

Kudos to you for coming back and sharing your solution.

0 Karma

spisiakmi
Contributor

Always my pleasure.

0 Karma

richgalloway
SplunkTrust

@spisiakmi If your problem is resolved, please accept an answer to help future readers.

---
If this reply helps you, Karma would be appreciated.
0 Karma

woodcock
Esteemed Legend

Your translation to English is a little difficult for me to understand but I think you are asking how to tokenize a saved search to accept arguments and that really is a thing! See here if this helps:
https://docs.splunk.com/Documentation/Splunk/latest/SearchReference/Savedsearch
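
To illustrate what passing arguments to a saved search can look like, here is a minimal sketch. The saved search name ekra_rakel_counter and the token name new_time are hypothetical and do not appear anywhere in this thread. Assume the saved search is defined as:

```spl
index="ekra_protokol" meldung="Test Drucken ..."
| where datum_zeit>"$new_time$"
| sort 0 datum_zeit
```

It could then be invoked with a value for the token:

```spl
| savedsearch ekra_rakel_counter new_time="2019-06-19 05:58:10.436"
```

The savedsearch command replaces $new_time$ with the supplied string before the search runs. Note that the value still has to be supplied by the caller, so on its own this does not read the latest datum_zeit_summary from the summary index.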

0 Karma