I have a log that contains records for tables processed in a database. For each table, a log entry is added showing the number of records to be processed. If processing fails for whatever reason, an ERROR is recorded. If processing succeeds, nothing is recorded.
Sample log
[ProcessId- 5459] [2020-08-29 06:22:34] [INFO] For tenant - test1_sales_nas_10, total number of records purged = 0
[ProcessId- 5459] [2020-08-29 06:22:34] [INFO] For tenant - test1_sales_nas_18, total number of records purged = 0
[ProcessId- 5459] [2020-08-29 06:22:34] [INFO] For tenant - test2_nas_01, total number of records purged = 0
[ProcessId- 5459] [2020-08-29 06:22:34] [INFO] For tenant - test3_nas_1113, total number of records purged = 0
[ProcessId- 5459] [2020-08-29 06:22:34] [ERROR] Error occurred during purging of records. Error code returned to shell script by DB function = -1.
[ProcessId- 5459] [2020-08-29 06:22:34] [INFO] For tenant - test3_nas_1112, total number of records purged = 0
[ProcessId- 5459] [2020-08-29 06:22:34] [ERROR] Error occurred during purging of records. Error code returned to shell script by DB function = -1.
Nothing links the ERROR record to the INFO record except the order in which they were logged. How can I create a search that returns records matching "Error occurred during purging of records" AND the previous record in the log to provide context for the error?
I realize that this makes a huge assumption -- that the ERROR always refers to the records immediately above it -- but it's unfortunately the only thing I have to go off of.
Any help is appreciated.
Unfortunately, this doesn't help me as the data is already imported as one-record-per-line in Splunk.
In the end, the resolution is really to fix the data. I've submitted a request to our engineering team to add the 'tenant' value to the ERROR lines rather than try to post-process the data.
Thanks for the help.
That appears to work, however:
test3_nas_*
Adding a | where clause to the records returned by streamstats did not appear to be the right approach. Meanwhile, adding NOT "test3_nas_*" to the base search replaces the INFO value with the ERROR _raw value instead, but still returns the same number of records.
This works with your example data and assumes that the logical sequence of your data is top (earliest) to bottom (latest), so it requires the 'reverse' and the reset_after option of streamstats.
| makeresults
| fields - _time
| eval _raw="
[ProcessId- 5459] [2020-08-29 06:22:34] [INFO] For tenant - test1_sales_nas_10, total number of records purged = 0|
[ProcessId- 5459] [2020-08-29 06:22:34] [INFO] For tenant - test1_sales_nas_18, total number of records purged = 0|
[ProcessId- 5459] [2020-08-29 06:22:34] [INFO] For tenant - test2_nas_01, total number of records purged = 0|
[ProcessId- 5459] [2020-08-29 06:22:34] [INFO] For tenant - test3_nas_1113, total number of records purged = 0|
[ProcessId- 5459] [2020-08-29 06:22:34] [ERROR] Error occurred during purging of records. Error code returned to shell script by DB function = -1.|
[ProcessId- 5459] [2020-08-29 06:22:34] [INFO] For tenant - test3_nas_1112, total number of records purged = 0|
[ProcessId- 5459] [2020-08-29 06:22:34] [ERROR] Error occurred during purging of records. Error code returned to shell script by DB function = -1."
| eval x=split(_raw,"|")
| rex field=x mode=sed "s/\n//"
| fields x
| mvexpand x
| eval _raw=x
| fields _raw
| eval COMMENT="----------Up to here is just setting up your data----------"
| reverse
| streamstats reset_after="(match(_raw,\"For tenant\"))" count list(_raw) as events
| where count=2
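The list(_raw) as events clause collects the _raw values seen since the last reset into a new multivalue field called events. With the sample data, each row kept by where count=2 therefore holds the ERROR line together with the INFO line that preceded it in the log.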
This works if you think in reverse. That is, once the order is reversed, look for the first event *after* the error, which is the event that came *before* it in the log.
| reverse
| streamstats reset_after="(match(_raw,\"Error occurred\"))" count
| where count=1
| table _raw
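Another way to express the same pairing is to have streamstats carry the previous event forward onto each row. This is only a sketch, not a tested solution: it assumes the events are ordered earliest to latest at that point in the pipeline (as the sample data is right after the setup section; results from a normal index search usually arrive newest first and would need a | reverse beforehand), and the field names previous_event and tenant are made up for illustration.

```
| eval COMMENT="Assumes events are ordered earliest to latest; previous_event and tenant are illustrative field names"
| streamstats current=f window=1 last(_raw) as previous_event
| where match(_raw, "Error occurred during purging of records")
| rex field=previous_event "For tenant - (?<tenant>[^,]+),"
| table _raw previous_event tenant
```

Here current=f window=1 restricts the window to the single event before the current one, so last(_raw) is that event's text. Since the ERROR and its presumed tenant end up on the same row, a later filter on tenant (for example | search NOT tenant="test3_nas_*") should drop the whole pair rather than only the INFO line. It still relies on the assumption that each ERROR refers to the line immediately above it.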