Splunk Search

How to use search from summary index as a condition in another search?

spisiakmi
Communicator

Hi,
I have index="ekra_protokol" which has these events:

datum_zeit;meldung
2019-06-19 05:56:26.754: Test Drucken ...
2019-06-19 05:56:37.629: Test Drucken ...
2019-06-19 05:56:48.570: Test Drucken ...
2019-06-19 05:56:59.516: Test Drucken ...
2019-06-19 05:57:10.436: Test Drucken ...
2019-06-19 05:58:10.436: Test Drucken ...
2019-06-20 05:56:59.516: Test Drucken ...
2019-06-20 05:57:10.436: Test Drucken ...
2019-06-20 05:58:10.436: Test Drucken ...
2019-06-20 05:58:32.436: Test Drucken ...

I have a saved search, which is running every 5min. This saved search collects all outputs into a summary search. In fact, it is saving the timestamp datum_zeit when the counter meldung="Test Drucken" multiples 3.

index="ekra_protokol" meldung="Test Drucken ..." 
| sort 0 datum_zeit
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

The summary index ekra_protokol_rakel_summary has 3 rows.

datum_zeit_summary
2019-06-19 05:56:48.570
2019-06-19 05:58:10.436
2019-06-20 05:58:10.436

These are offline data. But I want to switch into a situation, when the events will be indexed through forwarder and the counterReset will be 40000 and the saved search will run each hour.

I want to run the saved search exactly from the point, where the counter reached his limit. So each saved search should search only events where datum_zeit> stats max(datum_zeit_summary) as NewTime.

The problem is, that I don´t know, how to use the value of any search of the summary index in the saved search.
I expect somethin like this:

index="ekra_protokol" meldung="Test Drucken ..." 
**| datum_zeit> stats max(datum_zeit_summary) as NewTime**
| sort 0 datum_zeit
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

Can anybody help, please?

0 Karma
1 Solution

spisiakmi
Communicator

Hi woodcock, thank you very much for your help. Finally I found a solution:

index="ekra_protokol_rakel_summary" 
| stats max(datum_zeit_summary) as NewTime 
| append [search index="ekra_protokol" meldung="Test Drucken ..." 
| sort 0 datum_zeit] 
| eventstats max(NewTime) as NewTime
| where datum_zeit>NewTime 
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

View solution in original post

spisiakmi
Communicator

Hi woodcock, thank you very much for your help. Finally I found a solution:

index="ekra_protokol_rakel_summary" 
| stats max(datum_zeit_summary) as NewTime 
| append [search index="ekra_protokol" meldung="Test Drucken ..." 
| sort 0 datum_zeit] 
| eventstats max(NewTime) as NewTime
| where datum_zeit>NewTime 
| autoregress meldung as meldung_old
| eval cnt=0
| autoregress cnt as cnt_old
| eval cnt = if(meldung=meldung_old,cnt_old+1,cnt_old)
| table datum_zeit meldung cnt cnt_old
| streamstats count(eval(cnt==1)) as conecutive_change
| fields - cnt cnt_old
| eval cnt=conecutive_change+1
| fields - conecutive_change
| eval decimal=(cnt%3)/3
| eval action=if(decimal=0,1,0)
| where action==1
| sitop datum_zeit
| stats max(datum_zeit) as datum_zeit_summary 
| fields datum_zeit_summary
| addinfo
| collect index=ekra_protokol_rakel_summary testmode=false marker="populated_ekra_protokol_rakel_summary"

woodcock
Esteemed Legend

Kudos to you for coming back and sharing your solution.

0 Karma

spisiakmi
Communicator

Always my pleasure.

0 Karma

richgalloway
SplunkTrust
SplunkTrust

@spisiakmi If your problem is resolved, please accept an answer to help future readers.

---
If this reply helps you, Karma would be appreciated.
0 Karma

woodcock
Esteemed Legend

Your translation to English is a little difficult for me to understand but I think you are asking how to tokenize a saved search to accept arguments and that really is a thing! See here if this helps:
https://docs.splunk.com/Documentation/Splunk/latest/SearchReference/Savedsearch

0 Karma
Get Updates on the Splunk Community!

Index This | I am a number, but when you add ‘G’ to me, I go away. What number am I?

March 2024 Edition Hayyy Splunk Education Enthusiasts and the Eternally Curious!  We’re back with another ...

What’s New in Splunk App for PCI Compliance 5.3.1?

The Splunk App for PCI Compliance allows customers to extend the power of their existing Splunk solution with ...

Extending Observability Content to Splunk Cloud

Register to join us !   In this Extending Observability Content to Splunk Cloud Tech Talk, you'll see how to ...