Say a user connects to the VPN: a policy check starts (event --> policy_checking) and should complete within 5 minutes (event --> policy_checking_completed), at which point the user is allowed to connect through the VPN. Until the check completes, policy_checking events keep being generated.
If the policy_checking_completed event is not generated within 5 minutes, we need an alert. If the VPN goes down because of a poor connection or anything else, the user goes through the policy check again.
My search is like this:
index=vpn [ search index=vpn policy="policy_checking" | eval earliest=_time | eval latest=_time+300 | fields earliest, latest, Name ]
| eval status = if(policy="policy_checking_completed",1,0)
| eventstats sum(status) as statscount by Name
| table _time Name policy statscount
| search statscount=0
We have also tried bin span=5m, but it buckets on fixed wall-clock boundaries, e.g. 09:00 to 09:05, NOT relative to the first event (09:03 to 09:08).
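For example, a run-anywhere check of that snapping behaviour (the timestamp is made up):
| makeresults
| eval _time = strptime("2019-11-22 09:03:00", "%Y-%m-%d %H:%M:%S")
| bin _time span=5m
| eval bucket_start = strftime(_time, "%H:%M")
This returns a bucket_start of 09:00, not 09:03.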
This search works fine if the system generates only one set of events, but if the user has multiple disconnections and re-connections, say within 1 hour, the search goes for a toss because statscount is computed across all of them.
Is there any other workaround?
You need streamstats; try this:
index=vpn AND (policy="policy_checking" OR policy="policy_checking_completed")
| streamstats count(eval(policy="policy_checking_completed")) AS sessionID BY Name
| stats min(_time) AS _time range(_time) AS duration values(policy) AS policy dc(policy) as policy_count BY sessionID Name
| where policy="policy_checking" AND policy_count==1
| eval waiting = now() - _time
| where waiting >= (5 * 60)
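The idea, roughly: Splunk returns events newest-first, so the streamstats counter increments at each policy_checking_completed, and every event newer than the most recent completion lands in sessionID 0. Only that still-open session can consist purely of policy_checking events (policy_count==1), and the final where fires once its oldest event is at least five minutes old.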
Thanks a lot for your feedback.
Nearly perfect. The only issue I have found (please refer to the table I posted yesterday): the first set starts at 15:01:18 and ends at 15:02:45.72 (3 policy_checking events), but it did not get captured as a separate session; instead it ran on until 15:15:36.827 (5 events: 4 policy_checking, 1 policy_checking_completed).
sessionID | Name | _time | duration | policy | policy_count |
---|---|---|---|---|---|
4 | Laptop | 2019-11-23 15:15:36.826 | 857.862 | policy_checking policy_checking_completed | 2 |
3 | Laptop | 2019-11-23 15:36:08.765 | 35.167 | policy_checking policy_checking_completed | 2 |
2 | Laptop | 2019-11-23 15:48:28.549 | 31.422 | policy_checking policy_checking_completed | 2 |
1 | Laptop | 2019-11-23 15:48:31.467 | 0.000 | policy_checking_completed | 1 |
0 | Laptop | 2019-11-23 15:51:56.253 | 49.840 | policy_checking | 1 |
Try this then:
index=vpn AND (policy="policy_checking" OR policy="policy_checking_completed")
| reverse
| streamstats count(eval(policy="policy_checking")) AS sessionID BY Name
| stats min(_time) AS _time range(_time) AS duration values(policy) AS policy dc(policy) as policy_count BY sessionID Name
| where policy="policy_checking" AND policy_count==1
| eval waiting = now() - _time
| where waiting >= (5 * 60)
Thanks woodcock, but it skews the last entries: my last 3 events are policy_checking and they are not getting picked up. Also, I feel the duration is not correct. Why is the duration 1196 for the time 15:15:36?
15:01:18.965 722 policy_checking
15:02:03.260 policy_checking
15:02:45.723 policy_checking
15:13:21.932 policy_checking
15:15:36.827 1196 policy_checking_completed
15:35:33.598 policy_checking
15:36:08.765 708 policy_checking_completed
I just tested and I stand by my last answer. Here is a run-anywhere proof based on the data that you provided in your previous comment:
| makeresults
| eval raw="time=15:01:18.965 foo=722 policy=policy_checking
time=15:02:03.260 policy=policy_checking
time=15:02:45.723 policy=policy_checking
time=15:13:21.932 policy=policy_checking
time=15:15:36.827 foo=1196 policy=policy_checking_completed
time=15:35:33.598 policy=policy_checking
time=15:36:08.765 foo=708 policy=policy_checking_completed"
| makemv delim="
" raw
| mvexpand raw
| rename raw AS _raw
| kv
| eval _time = strptime(time, "%H:%M:%S.%3N"), Name = "Name"
| sort 0 - _time
| table _time Name foo policy
| rename COMMENT AS "Everything above generates sample event data; everything below is your solution"
| reverse
| streamstats count(eval(policy="policy_checking")) AS sessionID BY Name
| stats min(_time) AS _time range(_time) AS duration values(policy) AS policy dc(policy) as policy_count count BY sessionID Name
| where policy="policy_checking" AND policy_count==1
| eval waiting = now() - _time
| where waiting >= (5 * 60)
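(The rename COMMENT line is a no-op, since no COMMENT field exists; it serves purely as an inline comment separating the sample-data generation from the solution.)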
I am sure that I can meet your edge cases if you will supply appropriate sample events. Otherwise we will be doing this dance forever.
Hi, it might be a little different, but what about this query?
index=vpn policy="policy_checking" OR policy="policy_checking_completed"
| reverse
| streamstats count(eval(policy="policy_checking_completed")) as count by Name
| eventstats range(_time) as duration by count Name
| where duration > 300
It displays events that have taken more than 5 minutes to complete.
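One quirk to be aware of: streamstats includes the current event in its count by default, so each policy_checking_completed opens a new count group rather than closing the previous one. That is what pairs a completion with the first check of the following session and inflates its range (e.g. the completion at 15:15:36.827 grouped with the check at 15:35:33.598 gives a duration of almost 1200 seconds). A variant with current=false, under the same oldest-first ordering, groups each completion with the checks that precede it instead:
index=vpn policy="policy_checking" OR policy="policy_checking_completed"
| reverse
| streamstats current=false count(eval(policy="policy_checking_completed")) as count by Name
| eventstats range(_time) as duration by count Name
| where duration > 300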
Thanks a lot for your feedback. I appreciate your efforts. Please excuse me if I was not fully clear on my requirement.
I have seen a few challenges. Please refer to the following table:
# | _time | duration | count | policy |
---|---|---|---|---|
1 | 2019-10-23 15:01:18.965 | 722.967 | 0 | policy_checking |
2 | 2019-11-22 15:02:03.260 | 722.967 | 0 | policy_checking |
3 | 2019-11-22 15:02:45.723 | 722.967 | 0 | policy_checking |
4 | 2019-11-22 15:13:21.932 | 722.967 | 0 | policy_checking |
5 | 2019-11-22 15:15:36.827 | 1196.771 | 1 | policy_checking_completed |
6 | 2019-11-22 15:35:33.598 | 1196.771 | 1 | policy_checking |
7 | 2019-11-22 15:36:08.765 | 708.362 | 2 | policy_checking_completed |
8 | 2019-11-22 15:47:57.127 | 708.362 | 2 | policy_checking |
9 | 2019-11-22 15:48:28.549 | 0.000 | 3 | policy_checking_completed |
10 | 2019-11-22 15:48:31.468 | 204.786 | 4 | policy_checking_completed |
11 | 2019-11-22 15:51:06.414 | 204.786 | 4 | policy_checking |
12 | 2019-11-22 15:51:39.921 | 204.786 | 4 | policy_checking |
13 | 2019-11-22 15:51:56.254 | 204.786 | 4 | policy_checking |
The issue is that the search needs to start when it sees the first policy_checking and wait 5 minutes for a policy_checking_completed. If it does not arrive, send me an alert; then the counter needs to reset to zero, and whatever comes after it needs to be treated as a fresh session.
Also, we have a large volume of data, so I am not sure whether the reverse command is efficient or not.
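On the reverse question: reverse buffers the entire result set before emitting anything, so it can be costly on large volumes. A sketch that avoids it, relying on Splunk's default newest-first ordering (this is essentially the first answer above, minus the reverse): counting completions in descending time order groups each completion with the checks that led up to it, and any still-open checks land in sessionID 0 by themselves.
index=vpn (policy="policy_checking" OR policy="policy_checking_completed")
| streamstats count(eval(policy="policy_checking_completed")) AS sessionID BY Name
| stats min(_time) AS _time range(_time) AS duration values(policy) AS policy dc(policy) AS policy_count BY sessionID Name
| where policy="policy_checking" AND policy_count==1
| eval waiting = now() - _time
| where waiting >= (5 * 60)
Bounding the search window (e.g. earliest=-60m@m) also keeps the event volume that any of these approaches must process small.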