Splunk Search

How to find a follow-up event after certain interval

cyber_castle
Path Finder

Say, when a user connects his VPN, it will do policy checking (event--> policy_checking) and within 5 minutes will be completed (event--> policy_checking_completed) and allows the user to connect through the VPN, till then it becomes complied policy_checking events will be generated.

If the policy_checking_completed event is not generated in 5 minutes we need to get an alert. If the VPN goes down because of the poor connection or anything else; then it will go through the policy check again.

My search is like this:

index=vpn [search index=vpn policy="policy_checking" | eval earliest=_time | eval latest=_time+300 | fields earliest,latest, Name] | eval status = if(policy="policy_checking_completed",1,0) | eventstats sum(status) as statscount by Name  |  table _time Name policy statscount | search statscount=0

We have used bin span=5m but it checks the values between 09:00 and 09:05 (NOT 09:03 and 09:08).

This search works fine for if the system generates has one set of events, if the user has multiple disconnection and re-connection say in 1 hr, the search will go for a toss because of statscount.

Is there any other workaround?

woodcock
Esteemed Legend

You need streamstats; try this:

index=vpn AND (policy="policy_checking" OR policy="policy_checking_completed")
| streamstats count(eval(policy="policy_checking_completed")) AS sessionID BY Name
| stats min(_time) AS _time range(_time) AS duration values(policy) AS policy dc(policy) as policy_count BY sessionID Name
| where policy="policy_checking" AND policy_count==1
| eval waiting = now() - _time
| where waiting >= (5 * 60)

cyber_castle
Path Finder

Thanks a lot for your feedback.

Nearly perfect.. the only issue which i have found was that.. (pls refer the table which i have posted yesterday) for the first set it starts at 15:01:18 and ends at 15:02:45.72 (3 policy_checking events) it didnt get captured as a separate one; instead it went till 15:15:36.827 (5 events: 4 policy_checking event, 1 policy_checking_completed).

sessionID Name _ time duration policy policy_count
4 Laptop 2019-11-23 15:15:36.826 857.862 policy_checking policy_checking_completed 2
3 Laptop 2019-11-23 15:36:08.765 35.167 policy_checking policy_checking_completed 2
2 Laptop 2019-11-23 15:48:28.549 31.422 policy_checking policy_checking_completed 2
1 Laptop 2019-11-23 15:48:31.467 0.000 policy_checking_completed 1
0 Laptop 2019-11-23 15:51:56.253 49.840 policy_checking 1

woodcock
Esteemed Legend

Try this then:

index=vpn AND (policy="policy_checking" OR policy="policy_checking_completed")
| reverse
| streamstats count(eval(policy="policy_checking")) AS sessionID BY Name
| stats min(_time) AS _time range(_time) AS duration values(policy) AS policy dc(policy) as policy_count BY sessionID Name
| where policy="policy_checking" AND policy_count==1
| eval waiting = now() - _time
| where waiting >= (5 * 60)
0 Karma

cyber_castle
Path Finder

Thanks woodcock. But it will skew the last entries. I have last 3 events as policy_checking and its not getting picked up. Also, I feel duration is also not correct. Why the duration is 1196 for the time 15:15:36

15:01:18.965  722   policy_checking 
15:02:03.260        policy_checking 
15:02:45.723        policy_checking 
15:13:21.932        policy_checking 
15:15:36.827 1196   policy_checking_completed   
15:35:33.598        policy_checking 
15:36:08.765 708    policy_checking_completed
0 Karma

woodcock
Esteemed Legend

I just tested and I stand by my last answer. Here is a run-anywhere proof based on the data that you provided in previous comment:

| makeresults 
| eval raw="time=15:01:18.965 foo=722 policy=policy_checking    
time=15:02:03.260 policy=policy_checking    
time=15:02:45.723 policy=policy_checking    
time=15:13:21.932 policy=policy_checking    
time=15:15:36.827 foo=1196 policy=policy_checking_completed    
time=15:35:33.598 policy=policy_checking    
time=15:36:08.765 foo=708 policy=policy_checking_completed" 
| makemv delim="
" raw
| mvexpand raw
| rename raw AS _raw
| kv
| eval _time = strptime(time, "%H:%M:%S.%3N"), Name = "Name"
| sort 0 - _time 
| table _time Name foo policy

| rename COMMENT AS "Everything above generates sample event data; everything below is your solution"

| reverse
| streamstats count(eval(policy="policy_checking")) AS sessionID BY Name
| stats min(_time) AS _time range(_time) AS duration values(policy) AS policy dc(policy) as policy_count count BY sessionID Name
| where policy="policy_checking" AND policy_count==1
| eval waiting = now() - _time
| where waiting >= (5 * 60)
0 Karma

woodcock
Esteemed Legend

I am sure that I can meet your edge cases if you will supply appropriate sample events. Otherwise we will be doing this dance forever.

0 Karma

to4kawa
Ultra Champion
index=vpn policy="policy_checking" OR policy="policy_checking_completed"
| reverse
| streamstats count(eval(policy="policy_checking_completed")) as count by Name
| eventstats range(_time) as duration by count Name
| where duration > 300

Hi, It might be a little different, but what about this query?
Displays events that have taken more than 5 minutes to complete.

0 Karma

cyber_castle
Path Finder

Thanks a lot for your feedback. I appreciate your efforts. Pls excuse, if I was not fully clear on my requirement.

I have seen few challenges. Pls refer the following table

  1. After _time:15:02:45:723 it is not failing because as per the requirement we need to get alert the 5minutes failure.
  2. If we look for _time 15:13:21.932 & 15:15:36.827 duration is 1196.771 but in reference to 15:13:21.932, policy is complied and the user is allowed to connect to VPN.
  3. If we look for 15:47:57.127 it was policy_checking and 15:48:28.549 it is complied so the count should be zero.

Date _time duration count policy
1.2019-10-23 15:01:18.965 722.967 0 policy_checking
2. 2019-11-22 15:02:03.260 722.967 0 policy_checking
3. 2019-11-22 15:02:45.723 722.967 0 policy_checking
4. 2019-11-22 15:13:21.932 722.967 0 policy_checking
5. 2019-11-22 15:15:36.827 1196.771 1 policy_checking_completed
6. 2019-11-22 15:35:33.598 1196.771 1 policy_checking
7. 2019-11-22 15:36:08.765 708.362 2 policy_checking_completed
8. 2019-11-22 15:47:57.127 708.362 2 policy_checking
9. 2019-11-22 15:48:28.549 0.000 3 policy_checking_completed
10. 2019-11-22 15:48:31.468 204.786 4 policy_checking_completed
11. 2019-11-22 15:51:06.414 204.786 4 policy_checking
12. 2019-11-22 15:51:39.921 204.786 4 policy_checking
13. 2019-11-22 15:51:56.254 204.786 4 policy_checking

The issue with me is that, we need to start the search when it sees the first policy_checking and wait for 5 minutes to see policy_checking_completed. If not send me an alert. Then the counter need to reset to Zero. Whatever comes after it need to be considered as a fresh one.

Also, we have large volume of data so I am not sure whether reverse command is efficient or not?

0 Karma
Get Updates on the Splunk Community!

3 Ways to Make OpenTelemetry Even Better

My role as an Observability Specialist at Splunk provides me with the opportunity to work with customers of ...

What's New in Splunk Cloud Platform 9.2.2406?

Hi Splunky people! We are excited to share the newest updates in Splunk Cloud Platform 9.2.2406 with many ...

Enterprise Security Content Update (ESCU) | New Releases

In August, the Splunk Threat Research Team had 3 releases of new security content via the Enterprise Security ...