Splunk Search

Temporary output storage

pukka
Loves-to-Learn Everything

Hello,

I know that Splunk is a very versatile application that lets users manipulate data in many ways. I have extracted the fields event_name, task_id, and event_id. I am trying to create an alert if there is an increment in the event_id for the same task_id and event_name when the latest event arrives in Splunk.

For example, the event at 3:36:40.395 PM has task_id 3 and event_id 1223680, and the latest event, which arrived at 3:52:40.395 PM, has task_id 3 and event_id 1223681. I am trying to create an alert because, for the same task_id (3) and event_name (server_state), there is an increment in event_id.

I believe this is only possible if we store the previous event_id in a variable for the same event_name and task_id so that we can compare it with the new event_id. However, we have four different task_id's, and I am not sure how to save the event_id for each of them. Any help would be appreciated.

 

Log File Explanation:

 

8/01/2023 3:52:40.395 PM server_state|3 1223681 5

Date       Timestamp      event_name|task_id event_id random_number
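The layout described above can also be checked outside Splunk. The following is a minimal Python sketch, not SPL: the pattern `LINE_RE`, the helper `parse_line`, and the month/day/year reading of the date are assumptions made for illustration only.

```python
import re
from datetime import datetime

# Layout from the post: Date Timestamp event_name|task_id event_id random_number
# LINE_RE and parse_line are hypothetical names used only in this sketch.
LINE_RE = re.compile(
    r"^(?P<ts>\S+ \S+ [AP]M)\s+"
    r"(?P<event_name>[^|\s]+)\|(?P<task_id>\d+) "
    r"(?P<event_id>\d+) (?P<random_number>\d+)$"
)

def parse_line(line):
    """Split one log line into its fields, or return None if it does not fit."""
    m = LINE_RE.match(line.strip())
    if m is None:
        return None
    rec = m.groupdict()
    # Assumption: the date is month/day/year with a 12-hour clock.
    rec["time"] = datetime.strptime(rec.pop("ts"), "%m/%d/%Y %I:%M:%S.%f %p")
    for key in ("task_id", "event_id", "random_number"):
        rec[key] = int(rec[key])
    return rec

record = parse_line("8/01/2023 3:52:40.395 PM  server_state|3 1223681 5")
print(record)
```

Lines that do not follow the layout return None, so they can be skipped rather than mis-parsed.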

 

 

Sample Log file:

 

8/01/2023 3:52:40.395 PM  server_state|3 1223681 5
8/01/2023 3:50:40.395 PM  server_state|2 1201257 3
8/01/2023 3:45:40.395 PM  server_state|1 1135465 2
8/01/2023 3:41:40.395 PM  server_state|0 1545468 5
8/01/2023 3:36:40.395 PM  server_state|3 1223680 0
8/01/2023 3:25:40.395 PM  server_state|2 1201256 2
8/01/2023 3:15:40.395 PM  server_state|1 1135464 3
8/01/2023 3:10:40.395 PM  server_state|0 1545467 8

 

 

Thank You

yuanliu
SplunkTrust

Your original post says "create an alert if there is an increment."  If you want to alert when there is no change, i.e., no increment or decrement, the formula would be simpler because we don't have to calculate whether a change is an increment or a decrement.

| stats list(_time) as _time list(event_id) as event_id by event_name task_id
| where mvindex(event_id, 0) = mvindex(event_id, -1)
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")

This is a modified emulation in which task_id 0 has no change in event_id:

| makeresults
| eval data = split("8/01/2023 3:52:40.395 PM  server_state|3 1223681 5
8/01/2023 3:50:40.395 PM  server_state|2 1201257 3
8/01/2023 3:45:40.395 PM  server_state|1 1135465 2
8/01/2023 3:41:40.395 PM  server_state|0 1545467 5
8/01/2023 3:36:40.395 PM  server_state|3 1223680 0
8/01/2023 3:25:40.395 PM  server_state|2 1201256 2
8/01/2023 3:15:40.395 PM  server_state|1 1135464 3
8/01/2023 3:10:40.395 PM  server_state|0 1545467 8", "
")
| mvexpand data
| rename data as _raw
| rex "(?<ts>(\S+\s){3}) (?<event_name>\w+)\|(?<task_id>\d+) (?<event_id>\d+)"
| eval _time = strptime(ts, "%m/%d/%Y %I:%M:%S.%3Q %p")
``` data emulation above ```

This gives you the exact output you asked for.


pukka
Loves-to-Learn Everything

I apologize for the confusion. 

I found out that the Splunk field extraction feature failed to extract the fields because of the two different delimiters, pipe and space. It looks like I need to change the log format to all pipes or all spaces so that Splunk can extract the fields correctly.

yuanliu
SplunkTrust

I am not sure why Splunk cannot handle spaces and pipes as delimiters.  Have you tried the rex command in my emulation?

| rex "(?<ts>(\S+\s){3}) (?<event_name>\w+)\|(?<task_id>\d+) (?<event_id>\d+)"

 In your case, you probably do not need ts extraction because Splunk already gives you _time.


pukka
Loves-to-Learn Everything

Hi @yuanliu 

The reason Splunk is unable to extract the fields using the built-in field extraction is that it only allows extracting by a single delimiter (pipe, comma, or space), not when the event contains multiple delimiters such as pipe and space.

Thank you for sharing the rex command. I have tried it; however, I have since changed the log format to include additional details. I have experimented with the regex you shared, but it extracts only a portion of event_name. I believe this is mainly because the new format has longer event_name values than the previous log format. For example, for abc-pendingcardtransfer-networki it extracts only "abc" as the event name. I am also trying to update the regex to exclude events that start with [INFO], as they are not required.

(?<event_name>\w+)\|(?<task_id>\d+) (?<event_id>\d+)

 

New sample log format:

abc-pendingcardtransfer-networki|30 77784791 1547
logs-incomingtransaction-datainpu|3 7876821 1458
[INFO] 2019-09-01 13:52:38.22 [main] Apache - Number of netwrok events is 25
dog-acceptedtransactions-incoming|1 746566 1887
sfgd_SGDJELE|2 0 0
es009874_e026516516|28 455255555 785

yuanliu
SplunkTrust

You can use

 

\b(?<event_name>[^|]+)\|(?<task_id>\d+) (?<event_id>\d+)

 

Note that you don't have to use delimiters to extract fields.  You can select regex instead.  This is one way to do it:

extract-prefer-regex.png

extract-regex.png

Alternatively, you can use the selector. (Most of the time, Splunk will come up with a good regex.)
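The difference between `\w+` and `[^|]+` for event_name can be reproduced outside Splunk. This is a minimal Python sketch run against the new sample lines. Python spells named groups `(?P<name>...)` where rex uses `(?<name>...)`; the names `NARROW`, `WIDE`, and `extract` are hypothetical and exist only for this illustration.

```python
import re

# \w does not match "-", so the narrow pattern can only capture the last
# hyphen-free fragment before the pipe. [^|]+ takes everything up to the pipe.
NARROW = re.compile(r"(?P<event_name>\w+)\|(?P<task_id>\d+) (?P<event_id>\d+)")
WIDE = re.compile(r"\b(?P<event_name>[^|]+)\|(?P<task_id>\d+) (?P<event_id>\d+)")

lines = [
    "abc-pendingcardtransfer-networki|30 77784791 1547",
    "logs-incomingtransaction-datainpu|3 7876821 1458",
    "[INFO] 2019-09-01 13:52:38.22 [main] Apache - Number of netwrok events is 25",
    "dog-acceptedtransactions-incoming|1 746566 1887",
    "sfgd_SGDJELE|2 0 0",
    "es009874_e026516516|28 455255555 785",
]

def extract(pattern, line):
    """Return the named groups of the first match, or None when nothing matches."""
    m = pattern.search(line)
    return m.groupdict() if m else None

narrow = [extract(NARROW, line) for line in lines]
wide = [extract(WIDE, line) for line in lines]

for line, fields in zip(lines, wide):
    print(line, "->", fields)
```

In this sketch the [INFO] line yields no match with either pattern because it contains no pipe followed by digits, so such events simply end up without these fields.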


pukka
Loves-to-Learn Everything

Thank you for the help. I was able to extract the fields now.

When I run query 1, I find that event_name "pending-transfer" with task_id 3 has event_id "1274856" repeated three times in a row, which means there is no increment in the event_id. However, when I run query 2 for the same event_name "pending-transfer", it gives no output. Technically, query 2 should send an alert (I have scheduled the alert to run every minute, but still no alert was triggered) because there is no change in the event_id between the events at 9/4/22 10:02:39 PM and 9/4/22 09:57:39 PM.

Not sure if I am missing something.

 

Query 1 : Alert if there is an increment

| stats list(_time) as _time list(event_id) as event_id by event_name task_id
| where mvindex(_time, 0) > mvindex(_time, -1) AND mvindex(event_id, 0) > mvindex(event_id, -1)
  OR mvindex(_time, 0) < mvindex(_time, -1) AND mvindex(event_id, 0) < mvindex(event_id, -1)
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")

 

Below is the output that I am getting when I run the query 1:

Time                event_name        task_id  event_id
9/4/22 10:02:39 PM  pending-transfer  3        1274856
9/4/22 09:57:39 PM  pending-transfer  3        1274856
9/4/22 09:52:39 PM  pending-transfer  3        1274856
9/4/22 09:47:39 PM  pending-transfer  3        1274851
9/4/22 09:37:39 PM  pending-transfer  3        1274849

 

 

Query 2 : Alert if there is NO increment

| stats list(_time) as _time list(event_id) as event_id by event_name task_id
| where mvindex(event_id, 0) = mvindex(event_id, -1)
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")

 

Thank You

 

yuanliu
SplunkTrust

Ah, the original design did not consider the possibility of mixed increment and no-increment.  Now, to deal with this, you will need to tell us whether you want to catch any duplicate regardless of interleave, or whether you want to catch only "consecutive" events that duplicate event_id, because the two use cases are very different.

If only consecutive duplicate event_id should trigger alert, you can do

 

| delta event_id as delta
| stats list(_time) as _time values(delta) as delta by event_id event_name task_id
| where delta == "0"
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")

 

To test this use case, I construct the following extended test dataset based on your illustration.

Time                _time                event_id  event_name        task_id
9/4/22 10:03:39 PM  2022-09-04 22:03:39  1274851   pending-transfer  3
9/4/22 10:02:39 PM  2022-09-04 22:02:39  1274856   pending-transfer  3
9/4/22 09:57:39 PM  2022-09-04 21:57:39  1274856   pending-transfer  3
9/4/22 09:52:39 PM  2022-09-04 21:52:39  1274856   pending-transfer  3
9/4/22 09:47:39 PM  2022-09-04 21:47:39  1274851   pending-transfer  3
9/4/22 09:37:39 PM  2022-09-04 21:37:39  1274849   pending-transfer  3

And the result is a single row

event_id  event_name        task_id  _time                                                                    delta
1274856   pending-transfer  3        2022-09-04 22:02:39.000,2022-09-04 21:57:39.000,2022-09-04 21:52:39.000  0,5

If, on the other hand, the alert should be triggered no matter which other event_id's are in between, you should do

 

| stats list(_time) as _time by event_id event_name task_id
| where mvcount(_time) > 1
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")

 

Using the same test dataset as illustrated above, you should see two outputs

event_id  event_name        task_id  _time
1274851   pending-transfer  3        2022-09-04 22:03:39.000,2022-09-04 21:47:39.000
1274856   pending-transfer  3        2022-09-04 22:02:39.000,2022-09-04 21:57:39.000,2022-09-04 21:52:39.000

Here is data emulation that you can play with and compare with real data

 

| makeresults
| eval _raw = "Time	event_name	task_id	event_id
9/4/22 10:03:39 PM	pending-transfer	3	1274851
9/4/22 10:02:39 PM	pending-transfer	3	1274856
9/4/22 09:57:39 PM	pending-transfer	3	1274856
9/4/22 09:52:39 PM	pending-transfer	3	1274856
9/4/22 09:47:39 PM	pending-transfer	3	1274851
9/4/22 09:37:39 PM	pending-transfer	3	1274849"
| multikv
| eval _time = strptime(Time, "%m/%d/%y %I:%M:%S %p")
| fields - linecount _raw
``` data emulation above ```
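To make the two use cases concrete, here is a minimal Python sketch (not SPL) of "any duplicate" versus "consecutive duplicate" on the same test dataset. The function names are hypothetical, and unlike the delta-based search this sketch groups events by (event_name, task_id) before comparing neighbours, which is an assumption about the intent.

```python
from collections import defaultdict
from itertools import groupby

# Test dataset from the post: (time, event_name, task_id, event_id), newest first.
rows = [
    ("2022-09-04 22:03:39", "pending-transfer", 3, 1274851),
    ("2022-09-04 22:02:39", "pending-transfer", 3, 1274856),
    ("2022-09-04 21:57:39", "pending-transfer", 3, 1274856),
    ("2022-09-04 21:52:39", "pending-transfer", 3, 1274856),
    ("2022-09-04 21:47:39", "pending-transfer", 3, 1274851),
    ("2022-09-04 21:37:39", "pending-transfer", 3, 1274849),
]

def any_duplicates(rows):
    """event_ids seen more than once per (event_name, task_id), in any position."""
    times = defaultdict(list)
    for time, name, task, event_id in rows:
        times[(event_id, name, task)].append(time)
    return {key: ts for key, ts in times.items() if len(ts) > 1}

def consecutive_duplicates(rows):
    """event_ids repeated in adjacent events of the same (event_name, task_id)."""
    by_series = defaultdict(list)
    for time, name, task, event_id in rows:
        by_series[(name, task)].append((time, event_id))
    found = {}
    for (name, task), series in by_series.items():
        series.sort()  # ISO-style timestamps sort chronologically as strings
        for event_id, run in groupby(series, key=lambda item: item[1]):
            run_times = [t for t, _ in run]
            if len(run_times) > 1:
                found.setdefault((event_id, name, task), []).extend(run_times)
    return found

print(any_duplicates(rows))
print(consecutive_duplicates(rows))
```

On this dataset the first function reports both 1274851 and 1274856, while the second reports only 1274856, because the two 1274851 events are separated by other values.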

 

 


pukka
Loves-to-Learn Everything

I apologize for the confusion. I will try my best to explain it better.

For example,

 

Event_name = pending-transfer

Number of task_id's that event_name (pending-transfer) has = 3

 

The table below contains the event_id's received by "pending-transfer" for the different task_id's around 9:30 PM.

Table 1

Time                event_name        task_id  event_id
9/4/22 09:40:39 PM  pending-transfer  1        1274856
9/4/22 09:35:39 PM  pending-transfer  2        1274856
9/4/22 09:30:39 PM  pending-transfer  3        1274817

 

 

At 10:00 PM, there are new event_id's for different task_id's for "pending-transfer" as shown below.

Table 2

Time                event_name        task_id  event_id
9/4/22 10:10:39 PM  pending-transfer  1        1274856
9/4/22 10:05:39 PM  pending-transfer  2        1274748
9/4/22 10:00:39 PM  pending-transfer  3        1274902

 

For task_id = 1, there is no change in the event_id (1274856) between the event that arrived at 10:10 PM and the previous event at 9:40 PM, whereas for the other task_id's (task_id=2, task_id=3) the event_id changed.  Therefore, an alert needs to be generated, since there is no change in event_id for task_id=1.

So the logic needs to check whether the event_id changed for ALL task_id's of an event_name, and if there is NO change in event_id for ANY task_id of that event_name, the alert needs to be triggered.

 

I will be creating the alert for each event_name by using a where clause.

splunk query | where event_name = "pending-transfer"

 

However, I am not planning to create an alert for each specific task_id in the event_name, as that would lead to too many alerts.

splunk query | where event_name = "pending-transfer" AND task_id=1
splunk query | where event_name = "pending-transfer" AND task_id=2
splunk query | where event_name = "pending-transfer" AND task_id=3

 

 

Thank You

yuanliu
SplunkTrust

Each time, you are making very different statements of the requirements.  I am not sure it is worth my time to propose a search until you can define your requirements in terms of data examples (use cases).

So, let me clarify one more time with data.  This first set has two task_id's (out of three) that changed event_id (let's forget about increment or decrement for now) during the time period.  Therefore this set should not alarm:

_time                event_id  event_name        task_id
2022-09-04 21:40:39  1274856   pending-transfer  1
2022-09-04 21:35:39  1274856   pending-transfer  2
2022-09-04 21:30:39  1274817   pending-transfer  3
2022-09-04 22:10:39  1274856   pending-transfer  1
2022-09-04 22:05:39  1274748   pending-transfer  2
2022-09-04 22:00:39  1274902   pending-transfer  3

Let me construct a slightly different set in which every task_id has an unchanging event_id:

_time                event_id  event_name        task_id
2022-09-04 21:40:39  1274856   pending-transfer  1
2022-09-04 21:35:39  1274748   pending-transfer  2
2022-09-04 21:30:39  1274902   pending-transfer  3
2022-09-04 22:10:39  1274856   pending-transfer  1
2022-09-04 22:05:39  1274748   pending-transfer  2
2022-09-04 22:00:39  1274902   pending-transfer  3

With the second set, you want an alert.

Does the above sufficiently capture the use case requirements?  Here are emulations of the two sets:

 

| makeresults
| eval _raw = "Time	event_name	task_id	event_id
9/4/22 09:40:39 PM	pending-transfer	1	1274856
9/4/22 09:35:39 PM	pending-transfer	2	1274856
9/4/22 09:30:39 PM	pending-transfer	3	1274817
9/4/22 10:10:39 PM	pending-transfer	1	1274856
9/4/22 10:05:39 PM	pending-transfer	2	1274748
9/4/22 10:00:39 PM	pending-transfer	3	1274902"
| multikv
| eval _time = strptime(Time, "%m/%d/%y %I:%M:%S %p")
| fields - linecount _raw
``` data emulation set 1 ```
| makeresults
| eval _raw = "Time	event_name	task_id	event_id
9/4/22 10:10:39 PM	pending-transfer	1	1274856
9/4/22 10:05:39 PM	pending-transfer	2	1274748
9/4/22 10:00:39 PM	pending-transfer	3	1274902
9/4/22 09:40:39 PM	pending-transfer	1	1274856
9/4/22 09:35:39 PM	pending-transfer	2	1274748
9/4/22 09:30:39 PM	pending-transfer	3	1274902"
| multikv
| eval _time = strptime(Time, "%m/%d/%y %I:%M:%S %p")
| fields - linecount _raw
``` data emulation set 2 ```

 

If there are additional cases to be differentiated, please play with the emulations and construct differentiation.

Again, forget how many alerts you want to send.  Just focus on data input and whether or not a given dataset should trigger alert.


pukka
Loves-to-Learn Everything

hi @yuanliu 

I appreciate all your responses and I hope the below flowchart makes the requirement clear.

flowchart_2.png

 

 

Thank You


pukka
Loves-to-Learn Everything

Hi @yuanliu 

Please note that the task_id's for an event_name are constant and only the event_id's change. Moreover, even if only one task_id has an unchanged event_id, it should trigger an alert.  The alert should be triggered only if there is an unchanged event_id for the corresponding task_id. To your question, the alert should be triggered with both set-1 and set-2, because set-1 has one unchanged event_id whereas set-2 has three unchanged event_id's.

 

Thank You

yuanliu
SplunkTrust

"To your question, the alert should be triggered with both set-1 and set-2 because set-1 have one unchanged event_id whereas set-2 have three unchanged event_id's."

In that case,

 

| stats list(_time) as _time by event_id event_name task_id
| where mvcount(_time) > 1
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")

 

should suffice.  The emulated dataset 1 gives

event_id  event_name        task_id  _time
1274856   pending-transfer  1        2022-09-04 21:40:39.000,2022-09-04 22:10:39.000

Emulated dataset 2 gives

event_id  event_name        task_id  _time
1274748   pending-transfer  2        2022-09-04 22:05:39.000,2022-09-04 21:35:39.000
1274856   pending-transfer  1        2022-09-04 22:10:39.000,2022-09-04 21:40:39.000
1274902   pending-transfer  3        2022-09-04 22:00:39.000,2022-09-04 21:30:39.000

Can you show a dataset for which the above does not meet the requirement? (Just modify the emulations so we are on the same page.)
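For comparison outside Splunk, the stated requirement (alert when the newest event_id equals the one immediately before it for the same event_name and task_id) can be written as a minimal Python sketch. This is an illustration, not SPL. `stale_tasks` is a hypothetical helper, and it checks only the two most recent events per task_id, which is narrower than flagging every repeated event_id in the search window.

```python
from collections import defaultdict

def stale_tasks(rows):
    """Return {(event_name, task_id): event_id} where the newest event_id
    equals the one immediately before it for the same event_name and task_id."""
    series = defaultdict(list)
    for time, name, task, event_id in rows:
        series[(name, task)].append((time, event_id))
    stale = {}
    for key, events in series.items():
        events.sort()  # oldest first; timestamps are ISO-style strings
        if len(events) >= 2 and events[-1][1] == events[-2][1]:
            stale[key] = events[-1][1]
    return stale

# The two emulated datasets from the thread: (time, event_name, task_id, event_id).
set_1 = [
    ("2022-09-04 21:40:39", "pending-transfer", 1, 1274856),
    ("2022-09-04 21:35:39", "pending-transfer", 2, 1274856),
    ("2022-09-04 21:30:39", "pending-transfer", 3, 1274817),
    ("2022-09-04 22:10:39", "pending-transfer", 1, 1274856),
    ("2022-09-04 22:05:39", "pending-transfer", 2, 1274748),
    ("2022-09-04 22:00:39", "pending-transfer", 3, 1274902),
]
set_2 = [
    ("2022-09-04 22:10:39", "pending-transfer", 1, 1274856),
    ("2022-09-04 22:05:39", "pending-transfer", 2, 1274748),
    ("2022-09-04 22:00:39", "pending-transfer", 3, 1274902),
    ("2022-09-04 21:40:39", "pending-transfer", 1, 1274856),
    ("2022-09-04 21:35:39", "pending-transfer", 2, 1274748),
    ("2022-09-04 21:30:39", "pending-transfer", 3, 1274902),
]

stale_1 = stale_tasks(set_1)
stale_2 = stale_tasks(set_2)
print(stale_1)
print(stale_2)
```

A non-empty result means the alert condition holds: set 1 flags task_id 1 only, and set 2 flags all three task_id's.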

yuanliu
SplunkTrust

If I understand the requirements correctly, an alert sequence is one in which task_id is equal but a later event_id is greater than an earlier one.  Conceptually, you do need temporary storage.  But like most languages, SPL uses memory (RAM) for such transient needs.

From your sample log, is the following what you are looking for?

event_name    task_id  _time                                            event_id
server_state  0        2023-08-01 15:41:40.395,2023-08-01 15:10:40.395  1545468,1545467
server_state  1        2023-08-01 15:45:40.395,2023-08-01 15:15:40.395  1135465,1135464
server_state  2        2023-08-01 15:50:40.395,2023-08-01 15:25:40.395  1201257,1201256
server_state  3        2023-08-01 15:52:40.395,2023-08-01 15:36:40.395  1223681,1223680

You can achieve this with the following assuming that event_name, task_id, and event_id are already extracted:

 

| stats list(_time) as _time list(event_id) as event_id by event_name task_id
| where mvindex(_time, 0) > mvindex(_time, -1) AND mvindex(event_id, 0) > mvindex(event_id, -1)
  OR mvindex(_time, 0) < mvindex(_time, -1) AND mvindex(event_id, 0) < mvindex(event_id, -1)
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")

 

Here is an emulation of your sample data that you can play with and compare with real data.

 

| makeresults
| eval data = split("8/01/2023 3:52:40.395 PM  server_state|3 1223681 5
8/01/2023 3:50:40.395 PM  server_state|2 1201257 3
8/01/2023 3:45:40.395 PM  server_state|1 1135465 2
8/01/2023 3:41:40.395 PM  server_state|0 1545468 5
8/01/2023 3:36:40.395 PM  server_state|3 1223680 0
8/01/2023 3:25:40.395 PM  server_state|2 1201256 2
8/01/2023 3:15:40.395 PM  server_state|1 1135464 3
8/01/2023 3:10:40.395 PM  server_state|0 1545467 8", "
")
| mvexpand data
| rename data as _raw
| rex "(?<ts>(\S+\s){3}) (?<event_name>\w+)\|(?<task_id>\d+) (?<event_id>\d+)"
| eval _time = strptime(ts, "%m/%d/%Y %I:%M:%S.%3Q %p")
``` data emulation above ```
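The "temporary storage" idea can be illustrated outside Splunk with an in-memory dictionary keyed by (event_name, task_id). This is a minimal Python sketch over the sample log, not SPL; `incremented` is a hypothetical helper, and the timestamps are rewritten in 24-hour form for simple sorting.

```python
from collections import defaultdict

# Sample log from the original post: (time, event_name, task_id, event_id).
rows = [
    ("2023-08-01 15:52:40.395", "server_state", 3, 1223681),
    ("2023-08-01 15:50:40.395", "server_state", 2, 1201257),
    ("2023-08-01 15:45:40.395", "server_state", 1, 1135465),
    ("2023-08-01 15:41:40.395", "server_state", 0, 1545468),
    ("2023-08-01 15:36:40.395", "server_state", 3, 1223680),
    ("2023-08-01 15:25:40.395", "server_state", 2, 1201256),
    ("2023-08-01 15:15:40.395", "server_state", 1, 1135464),
    ("2023-08-01 15:10:40.395", "server_state", 0, 1545467),
]

def incremented(rows):
    """Map (event_name, task_id) to (oldest_id, newest_id) when event_id grew."""
    series = defaultdict(list)  # plays the role of the temporary storage
    for time, name, task, event_id in rows:
        series[(name, task)].append((time, event_id))
    result = {}
    for key, events in series.items():
        oldest, newest = min(events), max(events)  # ordered by timestamp string
        if newest[1] > oldest[1]:
            result[key] = (oldest[1], newest[1])
    return result

increments = incremented(rows)
print(increments)
```

Because the comparison orders events by timestamp explicitly, it does not depend on the order in which events are read.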

 

 Hope this helps.


pukka
Loves-to-Learn Everything

Hi @yuanliu 

 

Thank you for the reply.  The table view is great. What I am trying to achieve is to trigger an alert when, for example in the table below, the latest event_id is "1545467" and the previous event_id for the same task_id and event_name within the last 2 hours is also 1545467. Since there is no change in the event_id, it should trigger an alert.

 

event_name    task_id  _time                                            event_id
server_state  0        2023-08-01 15:41:40.395,2023-08-01 15:10:40.395  1545467,1545467