Splunk Search

How to dynamically change earliest & latest in subsearch to fill summary data gaps with live data?

ft_kd02
Path Finder

Hi all,

I'm working on a dashboard in which I populate a panel with summary data. The summary search runs once per hour over a very large dataset, looking back over the past 60 minutes, and generates an | xyseries of response codes split by another field (field1).

What I'd like to accomplish is to fill the gap in summary data with live data using a subsearch, if possible. As it stands, the panel is only partly filled during the gap between summary runs: e.g., if the summary search ran at 10 AM and you're looking at the panel at 10:30 AM, the most recent 30 minutes are missing. It also works poorly for small time ranges, say 15 minutes. For now, I'm limited by resources and cannot schedule the summary search to run more frequently, although I expect that would fix the problem (say, if it ran every 5 minutes). Is it possible to dynamically generate a time range for a live search based on where the summary data has a gap?

Within the panel, I pull summary data like so: 

index=summary source="summaryName" sourcetype=stash search_name="summaryName" field1=*
| stats
sum(resp*) AS resp*
BY _time
| untable _time responseCode Volume
``` this part is just processing to apply the description I need to the response code ```
| eval trimmedCode=substr(responseCode, -2)
| lookup responseLookup.csv ResponseCode AS trimmedCode OUTPUT ResponseDescription
| eval ResponseDescription=if(isnull(ResponseDescription), " Description Not Available", ResponseDescription)
| eval Response=trimmedCode+" "+ResponseDescription
``` since volume is established by the summary index, sum(Volume) here ```
| timechart partial=f limit=0 span=15m sum(Volume) AS Volume BY Response

I then append a subsearch here to pull the same info from live data, using another timechart after the processing: 

| timechart span=15m limit=0 count AS Volume by Response 
``` presumably here, I would need to sum(Volume) AS Volume to add up the totals```

I looked into some other searches that pull a single event from a summary index, and tried to generate timestamps based on that, but I haven't had much luck. Like so:

index=summary source=summaryName sourcetype=stash
| head 1
| eval SourceData="SI"
| append
[| makeresults
| eval SourceData="gentimes"]
| head 1
| addinfo
| eval earliest=if(SourceData="SI",if(_time>info_min_time,_time,info_min_time),info_min_time)
| eval latest=info_max_time
| table earliest,latest

Any ideas? Is this a possibility or am I limited by the summary data?

1 Solution

ITWhisperer
SplunkTrust

You can limit your index search to just the time periods missing from your summary index with something like this:

<your index search> [
  search index=summary source="summaryName" sourcetype=stash search_name="summaryName" field1=*
  | stats count by _time
  | streamstats window=2 range(_time) as interval
  | where interval > 60 * 15
  | eval earliest=_time-interval+900, latest=_time
  | fields earliest latest ]

ft_kd02
Path Finder

Hi @ITWhisperer

Thanks for your reply. Do you mind expanding on the interval statement? This looks like 60s x 15 to correspond to the 15 minute span in the timechart. I failed to mention that the original summary index data is split into 5 minute BINs, so the interval is consistently evaluating to 300. 

| bin span=5m _time

Edit: to clarify, this gives me an initial earliest/latest value while the search runs, but I believe the | where statement is causing no results to be found due to the time interval. 

ITWhisperer
SplunkTrust

Yes, you'll need to use 300.

<your index search> [
  search index=summary source="summaryName" sourcetype=stash search_name="summaryName" field1=*
``` Get the times of the events in the summary index ```
  | stats count by _time
``` Get the difference (range) between two (window=2) consecutive times ```
  | streamstats window=2 range(_time) as interval
``` Consecutive times should be 5 minutes (300 seconds) apart, so keep only the larger intervals, which mark gaps ```
  | where interval > 300
``` Each gap runs from 5 minutes after the previous summary event (earliest = _time - interval + 300) up to the summary event that follows the gap (latest = _time) ```
  | eval earliest=_time-interval+300, latest=_time
  | fields earliest latest ]
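
To see what the subsearch hands back without touching the summary index, the same gap logic can be tried on synthetic timestamps. This is only a minimal sketch and not part of the original answer: it assumes 5-minute buckets, and the six bucket times generated with makeresults (and the helper field n) are made up for illustration, with two buckets deliberately skipped.

```spl
| makeresults count=6 ```six synthetic summary buckets (illustrative only, not real events)```
| streamstats count AS n ```n is a hypothetical row counter, 1 through 6```
| eval _time=relative_time(now(), "@h") + if(n<=3, n-1, n+1)*300 ```offsets 0, 300, 600, then 1500, 1800, 2100 seconds; buckets 900 and 1200 are skipped```
| streamstats window=2 range(_time) AS interval ```difference between each time and the one before it```
| where interval > 300 ```keep only rows that follow a gap larger than one 5-minute bucket```
| eval earliest=_time-interval+300, latest=_time ```gap starts one bucket after the previous time and ends at the current time```
| fields earliest latest
```

Run as-is, this should return a single row whose earliest is 15 minutes past the top of the current hour and whose latest is 25 minutes past, which covers exactly the two skipped buckets. Because streamstats only compares rows that exist, a gap after the newest summary bucket (between the last summary run and now) produces no row here and would need to be handled separately.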