Splunk Search

Streamstat reset_after resets for all users

ecanmaster
Explorer

I found this search by the user woodcock. It looks for a successful login that follows several failed attempts:

 index=* sourcetype=linux_secure tag=authentication action="failure" OR action="success"
 | reverse
 | streamstats sum(eval(match(action,"failure"))) AS action_count reset_after="("match(action,\"success\")")" BY user
 | reverse 
 | where match(action,"success") AND action_count>=3

(In this case, the query looks for at least 3 failed logins followed by one successful login.)
action_count is a running count of the failed attempts, and this works quite well.
For example, if root has 5 failed logins, action_count reaches 5; when a successful login occurs, the count is reset and starts again at 1 with the next failed login.

However, this only works if you restrict the search to a particular user (in this case user=root).
If you run the query without that restriction, it still counts the failed logins per user, but a reset for one user also resets the count for all other users.
Is it possible to reset the count per user instead of for all users?


DalJeanis
Legend

The trivial answer is | sort 0 _time user instead of reverse.

  index=* sourcetype=linux_secure tag=authentication action="failure" OR action="success"
  | sort 0 _time user
  | streamstats sum(eval(match(action,"failure"))) AS action_count reset_after="("match(action,\"success\")")" BY user
  | where match(action,"success") AND action_count>=3

The later reverse doesn't really seem to be necessary anyway...

I'm still looking for the elegant answer...


As a general case I'd usually do something like this...

  | rename COMMENT as "The above just creates test data"
  | sort 0 _time user

  | rename COMMENT as "break the events into groups based on change of action"
  | streamstats current=f last(action) as prioraction by user
  | eval newgroup=case(isnull(prioraction),1, action!=prioraction,1)
  | streamstats sum(newgroup) as groupno by user

  | rename COMMENT as "identify what I'm counting for each group and count it up"
  | eval countable=if(action="failure",1,0)
  | eventstats sum(countable) as groupcount by user groupno

  | rename COMMENT as "in this case, we want the value from the prior failure group only on the first record of the new success group"
  | streamstats current=f last(groupcount) as priorcount by user
  | where match(action,"success") AND priorcount>=3

Run anywhere test code...

  | makeresults 
  | eval mydata="user1,success user1,success user1,failure user1,failure user2,failure user2,success user1,failure user1,failure user1,success user3,failure user3,failure user3,failure user3,success"
  | makemv mydata 
  | mvexpand mydata
  | makemv delim="," mydata 
  | eval user=mvindex(mydata,0)
  | eval action=mvindex(mydata,1)
  | streamstats count as recno
  | eval time=relative_time(now(),"@d")+recno
  | fields - mydata

  | rename COMMENT as "The above just creates test data"
  | sort 0 _time user
  | streamstats sum(eval(match(action,"failure"))) AS action_count reset_after="("match(action,\"success\")")" BY user
  | where match(action,"success") AND action_count>=3
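
To see where the counter resets, it may help to list the running count on every event before applying the final where filter. The sketch below reuses the test-data pattern above with a shorter, hypothetical event list, and writes the synthetic timestamp to _time (an assumption, so that the sort follows event order). If the reset is global, as the question describes, user1's final success would show action_count=1; with a per-user reset it would show 3.

```spl
| makeresults
| eval mydata="user1,failure user1,failure user2,failure user2,success user1,failure user1,success"
| makemv mydata
| mvexpand mydata
| makemv delim="," mydata
| eval user=mvindex(mydata,0)
| eval action=mvindex(mydata,1)
| streamstats count as recno
| eval _time=relative_time(now(),"@d")+recno
| fields - mydata

| rename COMMENT as "The above just creates hypothetical test data with interleaved users"
| sort 0 _time
| streamstats sum(eval(match(action,"failure"))) AS action_count reset_after="("match(action,\"success\")")" BY user

| rename COMMENT as "Show the running count on every event instead of filtering"
| table recno user action action_count
```

Comparing the action_count column for user1 before and after user2's success row shows whether a reset for one user leaks into another user's count.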

somesoni2
Revered Legend

Give this a try

index=* sourcetype=linux_secure tag=authentication action="failure" OR action="success"
  | sort 0 user _time
  | streamstats sum(eval(match(action,"failure"))) AS action_count reset_after="("match(action,\"success\")")" BY user
  | sort 0 -_time
  | where match(action,"success") AND action_count>=3
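
Another way to sidestep the question of how reset_after interacts with BY is to avoid reset_after altogether. The following is only a sketch, not something proposed in the answers above: it numbers each user's runs by counting that user's earlier successes, then counts the failures inside each run. The field names successes_so_far, run_id and failures_in_run are made up for illustration.

```spl
index=* sourcetype=linux_secure tag=authentication action="failure" OR action="success"
| sort 0 _time

| rename COMMENT as "Running count of successes per user, including the current event"
| streamstats count(eval(action="success")) AS successes_so_far BY user

| rename COMMENT as "A success closes the run of failures before it, so it keeps the same run_id as those failures"
| eval run_id=successes_so_far-if(action="success",1,0)

| rename COMMENT as "Count the failures in each run, separately for each user"
| eventstats count(eval(action="failure")) AS failures_in_run BY user run_id
| where action="success" AND failures_in_run>=3
```

Because every step is grouped BY user, the events of different users can stay interleaved in time order; no counter is shared across users.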