Splunk Search

How to use search from summary index as a condition in another search?

spisiakmi
Communicator

Hi,
I have index="ekra_protokol" which has these events:

datum_zeit;meldung
2019-06-19 05:56:26.754: Test Drucken ...
2019-06-19 05:56:37.629: Test Drucken ...
2019-06-19 05:56:48.570: Test Drucken ...
2019-06-19 05:56:59.516: Test Drucken ...
2019-06-19 05:57:10.436: Test Drucken ...
2019-06-19 05:58:10.436: Test Drucken ...
2019-06-20 05:56:59.516: Test Drucken ...
2019-06-20 05:57:10.436: Test Drucken ...
2019-06-20 05:58:10.436: Test Drucken ...
2019-06-20 05:58:32.436: Test Drucken ...

I have a saved search, which is running every 5min. This saved search collects all outputs into a summary search. In fact, it is saving the timestamp datum_zeit when the counter meldung="Test Drucken" multiples 3.

index="ekra_protokol" meldung="Test Drucken ..." 
| sort 0 datum_zeit
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

The summary index ekra_protokol_rakel_summary has 3 rows.

datum_zeit_summary
2019-06-19 05:56:48.570
2019-06-19 05:58:10.436
2019-06-20 05:58:10.436

These are offline data. But I want to switch into a situation, when the events will be indexed through forwarder and the counterReset will be 40000 and the saved search will run each hour.

I want to run the saved search exactly from the point, where the counter reached his limit. So each saved search should search only events where datum_zeit> stats max(datum_zeit_summary) as NewTime.

The problem is, that I don´t know, how to use the value of any search of the summary index in the saved search.
I expect somethin like this:

index="ekra_protokol" meldung="Test Drucken ..." 
**| datum_zeit> stats max(datum_zeit_summary) as NewTime**
| sort 0 datum_zeit
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

Can anybody help, please?

0 Karma
1 Solution

spisiakmi
Communicator

Hi woodcock, thank you very much for your help. Finally I found a solution:

index="ekra_protokol_rakel_summary" 
| stats max(datum_zeit_summary) as NewTime 
| append [search index="ekra_protokol" meldung="Test Drucken ..." 
| sort 0 datum_zeit] 
| eventstats max(NewTime) as NewTime
| where datum_zeit>NewTime 
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

View solution in original post

spisiakmi
Communicator

Hi woodcock, thank you very much for your help. Finally I found a solution:

index="ekra_protokol_rakel_summary" 
| stats max(datum_zeit_summary) as NewTime 
| append [search index="ekra_protokol" meldung="Test Drucken ..." 
| sort 0 datum_zeit] 
| eventstats max(NewTime) as NewTime
| where datum_zeit>NewTime 
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

woodcock
Esteemed Legend

Kudos to you for coming back and sharing your solution.

0 Karma

spisiakmi
Communicator

Always my pleasure.

0 Karma

richgalloway
SplunkTrust
SplunkTrust

@spisiakmi If your problem is resolved, please accept an answer to help future readers.

---
If this reply helps you, Karma would be appreciated.
0 Karma

woodcock
Esteemed Legend

Your translation to English is a little difficult for me to understand but I think you are asking how to tokenize a saved search to accept arguments and that really is a thing! See here if this helps:
https://docs.splunk.com/Documentation/Splunk/latest/SearchReference/Savedsearch

0 Karma
Get Updates on the Splunk Community!

Webinar Recap | Revolutionizing IT Operations: The Transformative Power of AI and ML ...

The Transformative Power of AI and ML in Enhancing Observability   In the realm of IT operations, the ...

.conf24 | Registration Open!

Hello, hello! I come bearing good news: Registration for .conf24 is now open!   conf is Splunk’s rad annual ...

ICYMI - Check out the latest releases of Splunk Edge Processor

Splunk is pleased to announce the latest enhancements to Splunk Edge Processor.  HEC Receiver authorization ...