Splunk Search

How to find a follow-up event after a certain interval

cyber_castle
Path Finder

Say a user connects to the VPN. The VPN runs a policy check (event: policy_checking), which should complete within 5 minutes (event: policy_checking_completed), after which the user is allowed to connect through the VPN. Until the device becomes compliant, policy_checking events keep being generated.

If the policy_checking_completed event is not generated within 5 minutes, we need to get an alert. If the VPN goes down because of a poor connection or anything else, it goes through the policy check again.

My search is like this:

index=vpn [search index=vpn policy="policy_checking" | eval earliest=_time | eval latest=_time+300 | fields earliest,latest, Name] | eval status = if(policy="policy_checking_completed",1,0) | eventstats sum(status) as statscount by Name  |  table _time Name policy statscount | search statscount=0

We have used bin span=5m, but it checks fixed buckets, for example 09:00 to 09:05 (not 09:03 to 09:08).
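The difference between a fixed 5-minute bucket and a window anchored at the event can be illustrated outside Splunk. This is a minimal Python sketch, not SPL; the function names `fixed_bucket` and `anchored_window` and the example timestamp are assumptions made for illustration.

```python
from datetime import datetime, timedelta

SPAN = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)

def fixed_bucket(ts, span=SPAN):
    """Bucket snapped to multiples of the span, as a fixed-span bin does."""
    index = (ts - EPOCH) // span          # timedelta // timedelta gives an int
    start = EPOCH + index * span
    return start, start + span

def anchored_window(ts, span=SPAN):
    """Window that starts at the event itself."""
    return ts, ts + span

# Hypothetical event at 09:03 (the date is arbitrary).
event = datetime(2019, 11, 22, 9, 3, 0)
print("fixed bucket:   ", fixed_bucket(event))
print("anchored window:", anchored_window(event))
```

An event at 09:03 falls in the fixed bucket 09:00 to 09:05, while the window the alert actually needs runs from 09:03 to 09:08.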

This search works fine if the system generates one set of events. If the user has multiple disconnections and reconnections within, say, 1 hour, the search breaks, because statscount is summed over all of that user's events.

Is there any other workaround?

woodcock
Esteemed Legend

You need streamstats; try this:

index=vpn AND (policy="policy_checking" OR policy="policy_checking_completed")
| streamstats count(eval(policy="policy_checking_completed")) AS sessionID BY Name
| stats min(_time) AS _time range(_time) AS duration values(policy) AS policy dc(policy) as policy_count BY sessionID Name
| where policy="policy_checking" AND policy_count==1
| eval waiting = now() - _time
| where waiting >= (5 * 60)

cyber_castle
Path Finder

Thanks a lot for your feedback.

Nearly perfect. The only issue I have found (please refer to the table I posted yesterday) is that the first set, which starts at 15:01:18 and ends at 15:02:45.72 (3 policy_checking events), did not get captured as a separate session; instead it ran until 15:15:36.827 (5 events: 4 policy_checking events, 1 policy_checking_completed).

sessionID  Name    _time                    duration  policy                                     policy_count
4          Laptop  2019-11-23 15:15:36.826  857.862   policy_checking policy_checking_completed  2
3          Laptop  2019-11-23 15:36:08.765  35.167    policy_checking policy_checking_completed  2
2          Laptop  2019-11-23 15:48:28.549  31.422    policy_checking policy_checking_completed  2
1          Laptop  2019-11-23 15:48:31.467  0.000     policy_checking_completed                  1
0          Laptop  2019-11-23 15:51:56.253  49.840    policy_checking                            1

woodcock
Esteemed Legend

Try this then:

index=vpn AND (policy="policy_checking" OR policy="policy_checking_completed")
| reverse
| streamstats count(eval(policy="policy_checking")) AS sessionID BY Name
| stats min(_time) AS _time range(_time) AS duration values(policy) AS policy dc(policy) as policy_count BY sessionID Name
| where policy="policy_checking" AND policy_count==1
| eval waiting = now() - _time
| where waiting >= (5 * 60)

cyber_castle
Path Finder

Thanks, woodcock. But it skews the last entries: my last 3 events are policy_checking and they are not getting picked up. Also, I feel the duration is not correct. Why is the duration 1196 for the event at 15:15:36?

15:01:18.965  722   policy_checking 
15:02:03.260        policy_checking 
15:02:45.723        policy_checking 
15:13:21.932        policy_checking 
15:15:36.827 1196   policy_checking_completed   
15:35:33.598        policy_checking 
15:36:08.765 708    policy_checking_completed

woodcock
Esteemed Legend

I just tested and I stand by my last answer. Here is a run-anywhere proof based on the data that you provided in your previous comment:

| makeresults 
| eval raw="time=15:01:18.965 foo=722 policy=policy_checking    
time=15:02:03.260 policy=policy_checking    
time=15:02:45.723 policy=policy_checking    
time=15:13:21.932 policy=policy_checking    
time=15:15:36.827 foo=1196 policy=policy_checking_completed    
time=15:35:33.598 policy=policy_checking    
time=15:36:08.765 foo=708 policy=policy_checking_completed" 
| makemv delim="
" raw
| mvexpand raw
| rename raw AS _raw
| kv
| eval _time = strptime(time, "%H:%M:%S.%3N"), Name = "Name"
| sort 0 - _time 
| table _time Name foo policy

| rename COMMENT AS "Everything above generates sample event data; everything below is your solution"

| reverse
| streamstats count(eval(policy="policy_checking")) AS sessionID BY Name
| stats min(_time) AS _time range(_time) AS duration values(policy) AS policy dc(policy) as policy_count count BY sessionID Name
| where policy="policy_checking" AND policy_count==1
| eval waiting = now() - _time
| where waiting >= (5 * 60)
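
For readers without a Splunk instance, the grouping step of the query above can be traced offline. This is a rough Python sketch of the same idea, not the SPL itself: the session ID is a per-Name running count of policy_checking events over time-ascending data, and a session is incomplete when policy_checking is its only policy value. The function name `sessionize` is an assumption, and the final `waiting` filter is left out.

```python
from collections import defaultdict

def sessionize(events):
    """Group time-ascending (name, time, policy) rows by a running count.

    The counter goes up on every policy_checking event, including the current
    one, so each policy_checking opens a new session and a following
    policy_checking_completed joins the most recent one.
    """
    running = defaultdict(int)
    sessions = defaultdict(list)
    for name, ts, policy in events:
        if policy == "policy_checking":
            running[name] += 1
        sessions[(name, running[name])].append(policy)
    return dict(sessions)

# Same sample rows as in the makeresults block, oldest first.
events = [
    ("Name", "15:01:18.965", "policy_checking"),
    ("Name", "15:02:03.260", "policy_checking"),
    ("Name", "15:02:45.723", "policy_checking"),
    ("Name", "15:13:21.932", "policy_checking"),
    ("Name", "15:15:36.827", "policy_checking_completed"),
    ("Name", "15:35:33.598", "policy_checking"),
    ("Name", "15:36:08.765", "policy_checking_completed"),
]

sessions = sessionize(events)
# Equivalent of: where policy="policy_checking" AND policy_count==1
incomplete = [sid for (name, sid), policies in sessions.items()
              if set(policies) == {"policy_checking"}]
print(incomplete)
```

On this sample the first three policy_checking events each form their own session with no completion, while the fourth and sixth are paired with the completion events that follow them.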

woodcock
Esteemed Legend

I am sure that I can meet your edge cases if you will supply appropriate sample events. Otherwise we will be doing this dance forever.


to4kawa
Ultra Champion
index=vpn policy="policy_checking" OR policy="policy_checking_completed"
| reverse
| streamstats count(eval(policy="policy_checking_completed")) as count by Name
| eventstats range(_time) as duration by count Name
| where duration > 300

Hi, it might be a little different, but what about this query?
It displays events that took more than 5 minutes to complete.


cyber_castle
Path Finder

Thanks a lot for your feedback. I appreciate your efforts. Please excuse me if I was not fully clear about my requirement.

I have seen a few problems. Please refer to the following table.

  1. After _time 15:02:45.723 it is not flagged as a failure, although per the requirement we need an alert for the 5-minute failure.
  2. For _time 15:13:21.932 and 15:15:36.827 the duration is 1196.771, but relative to 15:13:21.932 the policy is complied with and the user is allowed to connect to the VPN.
  3. At 15:47:57.127 it was policy_checking and at 15:48:28.549 it is compliant, so the count should be zero.

#    Date        _time         duration  count  policy
1    2019-10-23  15:01:18.965  722.967   0      policy_checking
2    2019-11-22  15:02:03.260  722.967   0      policy_checking
3    2019-11-22  15:02:45.723  722.967   0      policy_checking
4    2019-11-22  15:13:21.932  722.967   0      policy_checking
5    2019-11-22  15:15:36.827  1196.771  1      policy_checking_completed
6    2019-11-22  15:35:33.598  1196.771  1      policy_checking
7    2019-11-22  15:36:08.765  708.362   2      policy_checking_completed
8    2019-11-22  15:47:57.127  708.362   2      policy_checking
9    2019-11-22  15:48:28.549  0.000     3      policy_checking_completed
10   2019-11-22  15:48:31.468  204.786   4      policy_checking_completed
11   2019-11-22  15:51:06.414  204.786   4      policy_checking
12   2019-11-22  15:51:39.921  204.786   4      policy_checking
13   2019-11-22  15:51:56.254  204.786   4      policy_checking

My issue is that the search needs to start when it sees the first policy_checking event and wait 5 minutes for policy_checking_completed. If that does not arrive, send me an alert. Then the counter needs to reset to zero, and whatever comes after that must be treated as a fresh attempt.

Also, we have a large volume of data, so I am not sure whether the reverse command is efficient.
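
The rule described above (open a window at the first policy_checking, wait 5 minutes for policy_checking_completed, alert and reset if it does not arrive) can be pinned down as a small state machine. This is a Python sketch of the intended logic only, not an SPL solution; the function name `find_timeouts`, the helper `t`, and the assumed "now" of 16:00:00 are illustrative. Only the time of day from the table is used.

```python
from datetime import datetime, timedelta

TIMEOUT = timedelta(minutes=5)

def find_timeouts(events, now, timeout=TIMEOUT):
    """Return (name, window_start) for each check that did not complete in time.

    events: iterable of (name, timestamp, policy) in any order.
    """
    open_since = {}   # name -> start of the currently open check window
    alerts = []
    for name, ts, policy in sorted(events, key=lambda e: e[1]):
        start = open_since.get(name)
        # An expired window is reported once, then the state is reset.
        if start is not None and ts - start > timeout:
            alerts.append((name, start))
            del open_since[name]
            start = None
        if policy == "policy_checking":
            if start is None:
                open_since[name] = ts       # a fresh attempt starts here
        elif policy == "policy_checking_completed":
            open_since.pop(name, None)      # completed in time, or nothing was open
    # Windows still open at the end alert only once the timeout has passed.
    for name, start in open_since.items():
        if now - start >= timeout:
            alerts.append((name, start))
    return alerts

def t(s):
    return datetime.strptime(s, "%H:%M:%S.%f")

sample = [("Laptop", t(ts), p) for ts, p in [
    ("15:01:18.965", "policy_checking"),
    ("15:02:03.260", "policy_checking"),
    ("15:02:45.723", "policy_checking"),
    ("15:13:21.932", "policy_checking"),
    ("15:15:36.827", "policy_checking_completed"),
    ("15:35:33.598", "policy_checking"),
    ("15:36:08.765", "policy_checking_completed"),
    ("15:47:57.127", "policy_checking"),
    ("15:48:28.549", "policy_checking_completed"),
    ("15:48:31.468", "policy_checking_completed"),
    ("15:51:06.414", "policy_checking"),
    ("15:51:39.921", "policy_checking"),
    ("15:51:56.254", "policy_checking"),
]]

alerts = find_timeouts(sample, now=t("16:00:00.000"))
for name, start in alerts:
    print(name, start.time())
```

With the thirteen rows from the table and the assumed "now", this flags the attempts that started at 15:01:18.965 and 15:51:06.414. The attempts starting at 15:13:21.932, 15:35:33.598 and 15:47:57.127 complete in time and are not flagged.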
