Splunk Search

How can I count failures in the neighborhood events matching a rex

virgilg
Explorer

I have a question similar to:
https://answers.splunk.com/answers/2602
and
https://answers.splunk.com/answers/448796

I would like to get a search match (for which I define a field) and also search the subsequent daemon log for another search. If the second search repeats x count, then save this field as an Error; otherwise (if search contains < x count but > 0), it's a Warning. If the next line does not contain an Error or a Warning, then it’s a Pass.

The daemon is atftpd and its logs of interest are:

Sep 25 10:58:07 caffeine atftpd[6596]: Serving kernels/vmlinuz to IP:1668
Sep 25 10:58:07 caffeine atftpd[6596]: Serving kernels/vmlinuz to IP:1669
Sep 25 10:58:23 caffeine atftpd[6596]: timeout: retrying...
Sep 25 10:58:28 caffeine atftpd[6596]: timeout: retrying...
Sep 25 10:58:33 caffeine atftpd[6596]: timeout: retrying...
Sep 25 10:58:38 caffeine atftpd[6596]: timeout: retrying...
Sep 25 10:58:43 caffeine atftpd[6596]: timeout: retrying...
Sep 25 11:08:07 caffeine atftpd[6596]: Serving kernels/vmlinuz to anotherIP:1211

There is a deterministic pattern to the timeout: retrying... entries (every 5 seconds) and also a configurable count (5).
So if I see a Serving... line followed by exactly 5 retrying... I know for sure it's a failure.

My search so far saves the IPs and the errors in some fields, but the transaction facility in Splunk returns only the first hit of "timeout":

sourcetype=syslog AND atftpd AND caffeine | rex field=_raw "Serving.* to (?<ip_address>[0-9]*.[0-9]*.[0-9]*.[0-9]*)" | rex field=_raw ".* (?<error>timeout).*" | transaction endswith=(: timeout: retrying...) maxcount=5

I would have assumed that maxcount=5 gave the count of the transaction search match, not the total line count of the previous search.

0 Karma

DalJeanis
SplunkTrust
SplunkTrust

Try something like this...

   | your search that gets the data  with either "serving" or "timeout" records

   | rename COMMENT as "Put in time order, mark timeout records, copy each onto next record"
   | sort 0 _time
   | eval Timeout=if(match(_raw,"timeout"),1,0)
   | streamstats current=f last(Timeout) as priorTimeout 

   | rename COMMENT as "Mark as new group if it is the first record or the timeout value changes, calculate the group number" 
   | eval newgroup=case(isnull(priorTimeout),1, priorTimeout!=Timeout,1, true(),0 )
   | streamstats sum(newgroup) as groupno

   | rename COMMENT as "Determine how many timeout records are in the group, set to zero if not a timeout group" 
   | eventstats count as groupcount by groupno
   | eval groupcount=if(Timeout=1,groupcount,0)

   | rename COMMENT as "Run backwards through the data to copy the number of timeouts onto the PRECEDING serving record." 
   | reverse
   | streamstats current=f last(groupcount) as timeoutCount 

   | rename COMMENT as "Drop the Timeout records, set the flag ." 
   | where Timeout=0
   | eval Flag=case(timeoutCount>=5,"Error", timeoutCount>0,"Warning", timeoutCount=0,"Pass", true(),"Unknown")
0 Karma
Get Updates on the Splunk Community!

Splunk Forwarders and Forced Time Based Load Balancing

Splunk customers use universal forwarders to collect and send data to Splunk. A universal forwarder can send ...

NEW! Log Views in Splunk Observability Dashboards Gives Context From a Single Page

Today, Splunk Observability releases log views, a new feature for users to add their logs data from Splunk Log ...

Last Chance to Submit Your Paper For BSides Splunk - Deadline is August 12th!

Hello everyone! Don't wait to submit - The deadline is August 12th! We have truly missed the community so ...