Splunk Search

Return previous row when record matches

marc_ukg
Explorer

I have a log that contain records for tables processed in a database. For each table, a log entry is added showing the number of records to be processed. If processing fails for whatever reason, an ERROR is recorded. If processing succeeds, nothing is recorded.

Sample log

 

[ProcessId- 5459] [2020-08-29 06:22:34]  [INFO] For tenant - test1_sales_nas_10, total number of records purged = 0
[ProcessId- 5459] [2020-08-29 06:22:34]  [INFO] For tenant - test1_sales_nas_18, total number of records purged = 0
[ProcessId- 5459] [2020-08-29 06:22:34]  [INFO] For tenant - test2_nas_01, total number of records purged = 0
[ProcessId- 5459] [2020-08-29 06:22:34]  [INFO] For tenant - test3_nas_1113, total number of records purged = 0
[ProcessId- 5459] [2020-08-29 06:22:34] [ERROR] Error occurred during purging of records. Error code returned to shell script by DB function = -1. 
[ProcessId- 5459] [2020-08-29 06:22:34]  [INFO] For tenant - test3_nas_1112, total number of records purged = 0
[ProcessId- 5459] [2020-08-29 06:22:34] [ERROR] Error occurred during purging of records. Error code returned to shell script by DB function = -1. 

 

There is nothing that links the ERROR record to the INFO record except for the order it happened. How can I create a search that returns records matching "Error occurred during purging of records" AND the previous record in the log to provide context for the error?

I realize that this makes a huge assumption -- that the ERROR always refers to the records immediately above it -- but it's unfortunately the only thing I have to go off of.

Any help is appreciated.

Labels (1)
0 Karma

marc_ukg
Explorer

Unfortunately, this doesn't help me as the data is already imported as one-record-per-line in Splunk.

In the end, the resolution is really to fix the data. I've submitted a request to our engineering team to add the 'tenant' value to the ERROR lines rather than try to post-process the data.

Thanks for the help.

0 Karma

marc_ukg
Explorer

That appears to work, however:

  1. how would I go about returning the ERROR entry alongside the following INFO? It doesn't have to be combined into a single row but should return both rows (in case the ERROR line contains useful details to the cause of the failure).
    I researched 'streamstats' but could not see how to achieve this.

  2. How can I filter the returned records? Using the example data above, how would I exclude ERRORs on 

 

test3_nas_*

Adding a | where clause to the records returned by streamstats did not appear to be the right approach. Meanwhile, adding NOT "test3_nas_*" to the base search replaces the INFO value with the ERROR _raw value instead, but still returns the same number of records.

 

0 Karma

bowesmana
SplunkTrust
SplunkTrust

This works with your example data and assumes that the logical sequence of your data is top (earliest) to bottom (latest), so requires the 'reverse' and match_after 

| makeresults
| fields - _time
| eval _raw="
[ProcessId- 5459] [2020-08-29 06:22:34]  [INFO] For tenant - test1_sales_nas_10, total number of records purged = 0|
[ProcessId- 5459] [2020-08-29 06:22:34]  [INFO] For tenant - test1_sales_nas_18, total number of records purged = 0|
[ProcessId- 5459] [2020-08-29 06:22:34]  [INFO] For tenant - test2_nas_01, total number of records purged = 0|
[ProcessId- 5459] [2020-08-29 06:22:34]  [INFO] For tenant - test3_nas_1113, total number of records purged = 0|
[ProcessId- 5459] [2020-08-29 06:22:34] [ERROR] Error occurred during purging of records. Error code returned to shell script by DB function = -1.|
[ProcessId- 5459] [2020-08-29 06:22:34]  [INFO] For tenant - test3_nas_1112, total number of records purged = 0|
[ProcessId- 5459] [2020-08-29 06:22:34] [ERROR] Error occurred during purging of records. Error code returned to shell script by DB function = -1."
| eval x=split(_raw,"|")
| rex field=x mode=sed "s/\n//"
| fields x
| mvexpand x
| eval _raw=x
| fields _raw
| eval COMMENT="----------Up to here is just setting up your data----------"
| reverse
| streamstats reset_after="(match(_raw,\"For tenant\"))" count list(_raw) as events
| where count=2

This collects the items - list(_raw) as events - to a new field called events

 

 

0 Karma

richgalloway
SplunkTrust
SplunkTrust

This works if you think in reverse.  That is, look for the first event *after* (before) the error.

| reverse
| streamstats reset_after="(match(_raw,\"Error occurred\"))" count
| where count=1 
| table _raw

 

---
If this reply helps you, Karma would be appreciated.
0 Karma
Get Updates on the Splunk Community!

Detecting Brute Force Account Takeover Fraud with Splunk

This article is the second in a three-part series exploring advanced fraud detection techniques using Splunk. ...

Buttercup Games: Further Dashboarding Techniques (Part 9)

This series of blogs assumes you have already completed the Splunk Enterprise Search Tutorial as it uses the ...

Buttercup Games: Further Dashboarding Techniques (Part 8)

This series of blogs assumes you have already completed the Splunk Enterprise Search Tutorial as it uses the ...