Hello,
I am aware that Splunk is a very versatile application which allows users to manipulate data in many ways. I have extracted the fields event_name, task_id, and event_id. I am trying to create an alert if there is an increment in the event_id for the same task_id & event_name when the latest event arrives in Splunk.
For example, the event at 3:36:40.395 PM has task_id 3 and event_id 1223680, AND the latest event, which arrived at 3:52:40.395 PM, has task_id 3 and event_id 1223681. I am trying to create an alert because, for the same task_id (3) and event_name (server_state), there is an increment in event_id.
I believe this is only possible if we store the previous event_id in a variable for the same event_name & task_id so that we can compare it with the new event_id. However, we have four different task_id's, and I am not sure how to save the event_id for all those different task_id's. Any help would be appreciated.
Log File Explanation:
8/01/2023 3:52:40.395 PM server_state|3 1223681 5
Date Timestamp event_name|task_id event_id random_number
Sample Log file:
8/01/2023 3:52:40.395 PM server_state|3 1223681 5
8/01/2023 3:50:40.395 PM server_state|2 1201257 3
8/01/2023 3:45:40.395 PM server_state|1 1135465 2
8/01/2023 3:41:40.395 PM server_state|0 1545468 5
8/01/2023 3:36:40.395 PM server_state|3 1223680 0
8/01/2023 3:25:40.395 PM server_state|2 1201256 2
8/01/2023 3:15:40.395 PM server_state|1 1135464 3
8/01/2023 3:10:40.395 PM server_state|0 1545467 8
Thank You
Your original post says "create an alert if there is an increment." If you want to alert when there is no change, i.e., no increment or decrement, the formula is simpler because we don't have to calculate whether a change is an increment or a decrement.
| stats list(_time) as _time list(event_id) as event_id by event_name task_id
| where mvindex(event_id, 0) = mvindex(event_id, -1)
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")
This is a modified emulation where task_id 0 has no change in event_id
| makeresults
| eval data = split("8/01/2023 3:52:40.395 PM server_state|3 1223681 5
8/01/2023 3:50:40.395 PM server_state|2 1201257 3
8/01/2023 3:45:40.395 PM server_state|1 1135465 2
8/01/2023 3:41:40.395 PM server_state|0 1545467 5
8/01/2023 3:36:40.395 PM server_state|3 1223680 0
8/01/2023 3:25:40.395 PM server_state|2 1201256 2
8/01/2023 3:15:40.395 PM server_state|1 1135464 3
8/01/2023 3:10:40.395 PM server_state|0 1545467 8", "
")
| mvexpand data
| rename data as _raw
| rex "(?<ts>(\S+\s){3}) (?<event_name>\w+)\|(?<task_id>\d+) (?<event_id>\d+)"
| eval _time = strptime(ts, "%m/%d/%Y %I:%M:%S.%3Q %p")
``` data emulation above ```
This gives you the exact output you asked for.
I apologize for the confusion.
I found out that the Splunk field extraction feature failed to extract the fields because of the two different delimiters, pipe and space. It looks like I need to change the log format to all pipes or all spaces so that Splunk can extract the fields correctly.
I am not sure why Splunk cannot handle spaces and a pipe as delimiters. Have you tried the rex command in my emulation?
| rex "(?<ts>(\S+\s){3}) (?<event_name>\w+)\|(?<task_id>\d+) (?<event_id>\d+)"
In your case, you probably do not need ts extraction because Splunk already gives you _time.
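For instance, a trimmed version without the ts group (just a sketch, assuming your sample format) would be:
| rex "(?<event_name>\w+)\|(?<task_id>\d+) (?<event_id>\d+)"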
Hi @yuanliu
The reason Splunk is unable to extract the fields using the built-in field extraction is that it only allows extracting on either pipe, comma, or space, but not when the event contains multiple delimiters such as pipe and space.
Thank you for sharing the rex command. I have tried it; however, I have since changed the log format to include additional details. I played with the regex you shared by making changes, but it only extracts a portion of the event_name. I believe this is mainly because the new format has a longer event_name than the previous log format. For example, for abc-pendingcardtransfer-networki it only extracts "abc" as the event name. I am also trying to update the regex to exclude events that start with [INFO], as they are not required.
(?<event_name>\w+)\|(?<task_id>\d+) (?<event_id>\d+)
New sample log format:
abc-pendingcardtransfer-networki|30 77784791 1547
logs-incomingtransaction-datainpu|3 7876821 1458
[INFO] 2019-09-01 13:52:38.22 [main] Apache - Number of netwrok events is 25
dog-acceptedtransactions-incoming|1 746566 1887
sfgd_SGDJELE|2 0 0
es009874_e026516516|28 455255555 785
You can use
\b(?<event_name>[^|]+)\|(?<task_id>\d+) (?<event_id>\d+)
You know you don't have to use a delimiter to extract fields; in the field extraction UI you can select the regular expression method instead, and the regex above is one way to do it.
Alternatively, you can use the selector. (Most of the time, Splunk will come up with a good regex on its own.)
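For example, applied inline in a search (just a sketch, assuming the new sample lines above are the raw events), the same pattern extracts the three fields, and the [INFO] lines can be dropped because they never match it:
| rex "\b(?<event_name>[^|]+)\|(?<task_id>\d+) (?<event_id>\d+)"
| where isnotnull(event_id) ``` [INFO] lines contain no pipe, so nothing is extracted and they are filtered out ```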
Thank you for the help. I was able to extract the fields now.
When I run query 1, I found that event_name "pending-transfer" with a task_id of 3 has event_id "1274856" repeated three times in a row, which means there is no increment in the event_id. However, when I run query 2 for the same event_name "pending-transfer", it doesn't give any output. Technically, query 2 should send an alert (I have set the alert to run every minute, but still no alert was triggered) because there is no change in the event_id for the events that were triggered at 9/4/22 10:02:39 PM and 9/4/22 09:57:39 PM.
Not sure if I am missing something.
Query 1 : Alert if there is an increment
| stats list(_time) as _time list(event_id) as event_id by event_name task_id
| where mvindex(_time, 0) > mvindex(_time, -1) AND mvindex(event_id, 0) > mvindex(event_id, -1)
OR mvindex(_time, 0) < mvindex(_time, -1) AND mvindex(event_id, 0) < mvindex(event_id, -1)
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")
Below is the output that I am getting when I run query 1:
Time | event_name | task_id | event_id |
9/4/22 10:02:39 PM | pending-transfer | 3 | 1274856 |
9/4/22 09:57:39 PM | pending-transfer | 3 | 1274856 |
9/4/22 09:52:39 PM | pending-transfer | 3 | 1274856 |
9/4/22 09:47:39 PM | pending-transfer | 3 | 1274851 |
9/4/22 09:37:39 PM | pending-transfer | 3 | 1274849 |
Query 2 : Alert if there is NO increment
| stats list(_time) as _time list(event_id) as event_id by event_name task_id
| where mvindex(event_id, 0) = mvindex(event_id, -1)
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")
Thank You
Ah, the original design did not consider the possibility of mixed increment and no-increment. To deal with this, you will need to tell us whether you want to catch any duplicate regardless of interleaving, or whether you want to catch only "consecutive" events that duplicate event_id, because the two use cases are very different.
If only consecutive duplicate event_id should trigger alert, you can do
| delta event_id as delta
| stats list(_time) as _time values(delta) as delta by event_id event_name task_id
| where delta == "0"
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")
To test this use case, I constructed the following extended test dataset based on your illustration.
Time | _time | event_id | event_name | task_id |
9/4/22 10:03:39 PM | 2022-09-04 22:03:39 | 1274851 | pending-transfer | 3 |
9/4/22 10:02:39 PM | 2022-09-04 22:02:39 | 1274856 | pending-transfer | 3 |
9/4/22 09:57:39 PM | 2022-09-04 21:57:39 | 1274856 | pending-transfer | 3 |
9/4/22 09:52:39 PM | 2022-09-04 21:52:39 | 1274856 | pending-transfer | 3 |
9/4/22 09:47:39 PM | 2022-09-04 21:47:39 | 1274851 | pending-transfer | 3 |
9/4/22 09:37:39 PM | 2022-09-04 21:37:39 | 1274849 | pending-transfer | 3 |
And the result is a single row
event_id | event_name | task_id | _time | delta |
1274856 | pending-transfer | 3 | 2022-09-04 22:02:39.000,2022-09-04 21:57:39.000,2022-09-04 21:52:39.000 | 0 5 |
If, on the other hand, the alert should be triggered no matter which other event_id's are in between, you should do
| stats list(_time) as _time by event_id event_name task_id
| where mvcount(_time) > 1
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")
Using the same test dataset as illustrated above, you should see two outputs
event_id | event_name | task_id | _time |
1274851 | pending-transfer | 3 | 2022-09-04 22:03:39.000,2022-09-04 21:47:39.000 |
1274856 | pending-transfer | 3 | 2022-09-04 22:02:39.000,2022-09-04 21:57:39.000,2022-09-04 21:52:39.000 |
Here is a data emulation that you can play with and compare with real data.
| makeresults
| eval _raw = "Time event_name task_id event_id
9/4/22 10:03:39 PM pending-transfer 3 1274851
9/4/22 10:02:39 PM pending-transfer 3 1274856
9/4/22 09:57:39 PM pending-transfer 3 1274856
9/4/22 09:52:39 PM pending-transfer 3 1274856
9/4/22 09:47:39 PM pending-transfer 3 1274851
9/4/22 09:37:39 PM pending-transfer 3 1274849"
| multikv
| eval _time = strptime(Time, "%m/%d/%y %I:%M:%S %p")
| fields - linecount _raw
``` data emulation above ```
I apologize for the confusion. I will try my best to explain it better.
For example,
Event_name = pending-transfer
The event_name (pending-transfer) has three task_id's.
The table below contains the event_id's received by "pending-transfer" for the different task_id's around 9:30 PM.
Table 1
Time | event_name | task_id | event_id |
9/4/22 09:40:39 PM | pending-transfer | 1 | 1274856 |
9/4/22 09:35:39 PM | pending-transfer | 2 | 1274856 |
9/4/22 09:30:39 PM | pending-transfer | 3 | 1274817 |
At 10:00 PM, there are new event_id's for different task_id's for "pending-transfer" as shown below.
Table 2
Time | event_name | task_id | event_id |
9/4/22 10:10:39 PM | pending-transfer | 1 | 1274856 |
9/4/22 10:05:39 PM | pending-transfer | 2 | 1274748 |
9/4/22 10:00:39 PM | pending-transfer | 3 | 1274902 |
For task_id = 1, there is no change in the event_id (1274856) for the event that arrived at 10:10 PM compared to the previous event_id at 9:40 PM, whereas for the other task_id's (task_id=2, task_id=3) there is a change in the event_id. Therefore, an alert needs to be generated, since there is no change in event_id for task_id=1.
So, the logic needs to check whether there is a change in event_id for ALL task_id's in an event_name, and if there is NO change in event_id for ANY of the task_id's in that event_name, then an alert needs to be triggered.
I will be creating the alert for each event_name by using a where clause.
splunk query | where event_name = "pending-transfer"
However, I am not planning to create an alert for each specific task_id in the event_name, as it would lead to too many alerts.
splunk query | where event_name = "pending-transfer" task_id=1
splunk query | where event_name = "pending-transfer" task_id=2
splunk query | where event_name = "pending-transfer" task_id=3
Thank You
Each time you are making very different statements of the requirements. I am not sure it is worth my time to propose a search until you can define your requirements in terms of data examples (use cases).
So, let me clarify one more time with data. This first set has two task_id's (out of three) that changed event_id (let's forget about increment or decrement for now) during the time period. Therefore this set should not alarm:
_time | event_id | event_name | task_id |
2022-09-04 21:40:39 | 1274856 | pending-transfer | 1 |
2022-09-04 21:35:39 | 1274856 | pending-transfer | 2 |
2022-09-04 21:30:39 | 1274817 | pending-transfer | 3 |
2022-09-04 22:10:39 | 1274856 | pending-transfer | 1 |
2022-09-04 22:05:39 | 1274748 | pending-transfer | 2 |
2022-09-04 22:00:39 | 1274902 | pending-transfer | 3 |
Let me construct a slightly different set in which every task_id has an unchanging event_id.
_time | event_id | event_name | task_id |
2022-09-04 21:40:39 | 1274856 | pending-transfer | 1 |
2022-09-04 21:35:39 | 1274748 | pending-transfer | 2 |
2022-09-04 21:30:39 | 1274902 | pending-transfer | 3 |
2022-09-04 22:10:39 | 1274856 | pending-transfer | 1 |
2022-09-04 22:05:39 | 1274748 | pending-transfer | 2 |
2022-09-04 22:00:39 | 1274902 | pending-transfer | 3 |
With the second set, you want an alert.
Does the above sufficiently capture the use case requirements? Here are emulations of the two sets.
| makeresults
| eval _raw = "Time event_name task_id event_id
9/4/22 09:40:39 PM pending-transfer 1 1274856
9/4/22 09:35:39 PM pending-transfer 2 1274856
9/4/22 09:30:39 PM pending-transfer 3 1274817
9/4/22 10:10:39 PM pending-transfer 1 1274856
9/4/22 10:05:39 PM pending-transfer 2 1274748
9/4/22 10:00:39 PM pending-transfer 3 1274902"
| multikv
| eval _time = strptime(Time, "%m/%d/%y %I:%M:%S %p")
| fields - linecount _raw
``` data emulation set 1 ```
| makeresults
| eval _raw = "Time event_name task_id event_id
9/4/22 10:10:39 PM pending-transfer 1 1274856
9/4/22 10:05:39 PM pending-transfer 2 1274748
9/4/22 10:00:39 PM pending-transfer 3 1274902
9/4/22 09:40:39 PM pending-transfer 1 1274856
9/4/22 09:35:39 PM pending-transfer 2 1274748
9/4/22 09:30:39 PM pending-transfer 3 1274902"
| multikv
| eval _time = strptime(Time, "%m/%d/%y %I:%M:%S %p")
| fields - linecount _raw
``` data emulation set 2 ```
If there are additional cases to be differentiated, please play with the emulations and construct differentiating examples.
Again, forget how many alerts you want to send. Just focus on the data input and whether or not a given dataset should trigger an alert.
Hi @yuanliu
I appreciate all your responses and I hope the below flowchart makes the requirement clear.
Thank You
Hi @yuanliu
Please note that the task_id's for an event_name are constant and only the event_id's are going to change. Moreover, even if one task_id has an unchanged event_id, it should trigger an alert. The alert should be triggered only if there is an unchanged event_id for the corresponding task_id. To your question, the alert should be triggered with both set-1 and set-2 because set-1 has one unchanged event_id whereas set-2 has three unchanged event_id's.
Thank You
To your question, the alert should be triggered with both set-1 and set-2 because set-1 has one unchanged event_id whereas set-2 has three unchanged event_id's.
In that case,
| stats list(_time) as _time by event_id event_name task_id
| where mvcount(_time) > 1
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")
should suffice. The emulated dataset 1 gives
event_id | event_name | task_id | _time |
1274856 | pending-transfer | 1 | 2022-09-04 21:40:39.000,2022-09-04 22:10:39.000 |
Emulated dataset 2 gives
event_id | event_name | task_id | _time |
1274748 | pending-transfer | 2 | 2022-09-04 22:05:39.000,2022-09-04 21:35:39.000 |
1274856 | pending-transfer | 1 | 2022-09-04 22:10:39.000,2022-09-04 21:40:39.000 |
1274902 | pending-transfer | 3 | 2022-09-04 22:00:39.000,2022-09-04 21:30:39.000 |
Can you show a dataset for which the above does not meet the requirement? (Just modify the emulations so we are on the same page.)
If I understand the requirements correctly, an alert sequence is one in which the task_id is the same but a later event_id is greater than an earlier one. You can say that, conceptually, you need temporary storage; but as in most languages, SPL commands use memory for such transient needs, so you do not need to save the previous event_id in a variable yourself.
From your sample log, is the following what you are looking for?
event_name | task_id | _time | event_id |
server_state | 0 | 2023-08-01 15:41:40.395,2023-08-01 15:10:40.395 | 1545468 1545467 |
server_state | 1 | 2023-08-01 15:45:40.395,2023-08-01 15:15:40.395 | 1135465 1135464 |
server_state | 2 | 2023-08-01 15:50:40.395,2023-08-01 15:25:40.395 | 1201257 1201256 |
server_state | 3 | 2023-08-01 15:52:40.395,2023-08-01 15:36:40.395 | 1223681 1223680 |
You can achieve this with the following assuming that event_name, task_id, and event_id are already extracted:
| stats list(_time) as _time list(event_id) as event_id by event_name task_id
| where mvindex(_time, 0) > mvindex(_time, -1) AND mvindex(event_id, 0) > mvindex(event_id, -1)
OR mvindex(_time, 0) < mvindex(_time, -1) AND mvindex(event_id, 0) < mvindex(event_id, -1)
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")
Here is an emulation of your sample data that you can play with and compare with real data.
| makeresults
| eval data = split("8/01/2023 3:52:40.395 PM server_state|3 1223681 5
8/01/2023 3:50:40.395 PM server_state|2 1201257 3
8/01/2023 3:45:40.395 PM server_state|1 1135465 2
8/01/2023 3:41:40.395 PM server_state|0 1545468 5
8/01/2023 3:36:40.395 PM server_state|3 1223680 0
8/01/2023 3:25:40.395 PM server_state|2 1201256 2
8/01/2023 3:15:40.395 PM server_state|1 1135464 3
8/01/2023 3:10:40.395 PM server_state|0 1545467 8", "
")
| mvexpand data
| rename data as _raw
| rex "(?<ts>(\S+\s){3}) (?<event_name>\w+)\|(?<task_id>\d+) (?<event_id>\d+)"
| eval _time = strptime(ts, "%m/%d/%Y %I:%M:%S.%3Q %p")
``` data emulation above ```
Hope this helps.
Hi @yuanliu
Thank you for the reply. The table view is great. What I am trying to achieve is to trigger an alert when, for example in the table below, the latest event_id is "1545467" and the last/previous event_id for the same task_id and event_name within the last 2 hours is also 1545467. Since there is no change in the event_id, an alert should be triggered.
event_name | task_id | _time | event_id |
server_state | 0 | 2023-08-01 15:41:40.395,2023-08-01 15:10:40.395 | 1545467 1545467 |
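Something along these lines might be a starting point (just a sketch, assuming the three fields are already extracted and the alert search runs over the last 2 hours): it compares only the two most recent events per event_name and task_id within the search window.
| sort 0 - _time
| streamstats count as recency by event_name task_id
| where recency <= 2 ``` keep only the two most recent events per event_name/task_id ```
| stats list(_time) as _time dc(event_id) as distinct_ids count as num_events by event_name task_id
| where num_events = 2 AND distinct_ids = 1 ``` two events were seen and the event_id did not change ```
| fieldformat _time = strftime(_time, "%F %H:%M:%S.%3Q")
If this matches the requirement, the same search could be saved as the alert, with the names above (recency, distinct_ids, num_events) being illustrative only.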