How to create an alert for multiple alerts (fires when too many of specific alerts are activated)

spluzer
Communicator

Newbie here. I'm trying to set up an alert that runs every 5 minutes and looks back over the past hour. It would trigger when "too many" of certain important alerts (a "heartbeat alert", for example) are firing. The tricky part is getting a "baseline" over the past 30 days or so to figure out what an appropriate "trigger number" might be. I'm filtering out certain alerts using a lookup.

Right now I'm thinking of using a z_score, but subsearches are limited to 10k results, which I'm assuming would skew the data too much. Does anybody have any cool ideas?
Here is my current search outline, indicating the path I'm trying to go down. I'm open to any suggestions if you think the "z_score route" isn't going to get me there...

My search for fired_alert NOT [| inputlookup nonessential_alerts] earliest=-30d@d

| stats count by fired_alert_name
| eventstats mean(count) as mean_count stdev(count) as stdev_count
| eval z_score=round((count-mean_count)/stdev_count)
| eval bad_times=case(z_score >,<,= blah blah)

Then,

| appendcols [same query for the last day or so, to get another z_score to compare to the "baseline"]

Then,

| eval bad_times=case(z_score >,<,= blah blah)

And the alert will fire letting us know that too many "important alerts" are going off, thus something is seriously wrong
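
For comparison, the z-score idea can be sketched without a subsearch or appendcols by letting eventstats attach the baseline to every hourly row. The block below is only a sketch on synthetic data: the first five lines fabricate 720 hourly counts for one alert, with an artificial spike in the most recent hour, and the cutoff of 3 is an arbitrary assumption. Against real data, those five lines would be replaced by the base search followed by `| bin _time span=1h | stats count by _time fired_alert_name`.

```spl
| makeresults count=720
| streamstats count as n
| eval _time = relative_time(now(), "@h") - n * 3600
| eval fired_alert_name = "heartbeat alert"
| eval count = if(n == 1, 40, 10 + (n % 5))
| eventstats avg(count) as mean_count stdev(count) as stdev_count by fired_alert_name
| eval z_score = round((count - mean_count) / stdev_count, 2)
| where n == 1 AND z_score > 3
```

Only the spiked hour should survive the final where clause. One caveat with real data: hours in which an alert never fired produce no row from stats, so the mean here is taken over hours with at least one firing unless the gaps are filled with zeros first.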

Ultimately, I'm looking for a clever way to get a smart and accurate count of alerts in the past to get the baseline, then set up a search to alert when something is wrong because too many important alerts are going off. Sorta predictive analytics question I guess. As of right now, I can't download any "apps" that might do this... I have done it "old school" (for lack of better words)

Thanks all!

1 Solution

rbechtold
Communicator

Hold on to your hat, this is about to get a bit ridiculous. Long needlessly complicated searches are my specialty!

I have an alert I spent a good amount of time setting up to alert me on "atypical" data rates for all of my indexes daily. I've modified it fairly extensively to give you a search I'm really hoping will accomplish what you're trying to do.

To start out, let's make sure the data we're working with looks the same.

Copy and paste this into any Splunk instance. It generates a fake dataset of what I'm assuming your data looks like. If your data is not in the same format, the rest of the search will likely not function correctly:

| gentimes start=6/23/2019 end=7/24/2019 increment=1m 
| rename starttime AS _time
| eval id = random()%7
| eval fired_alert_name= if(id = "0", "Unique Alert A",  if(id = "1", "Unique Alert B",   if(id = "2", "Unique Alert C",  if(id = "3", "Unique Alert D",  if(id = "4", "Unique Alert E",  if(id = "5", "Unique Alert F", "Unique Alert G"))))))
| fields  _time fired_alert_name
| search NOT [|inputlookup nonessential_alerts.csv]

Note-
This is what I'm using for my non-essential alerts lookup:

 | makeresults count=2 
 | streamstats count AS id
 | eval fired_alert_name= if(id= "1", "Unique Alert C", "Unique Alert E") 
 | fields - _time id
 | outputlookup nonessential_alerts.csv

Basically, this lookup just filters any instances of "Unique Alert C" and "Unique Alert E" from my main search.
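
To see what the NOT [...] subsearch actually hands to the outer search, the lookup can be run through the format command by itself. This is a small sketch that assumes the nonessential_alerts.csv lookup created above already exists:

```spl
| inputlookup nonessential_alerts.csv
| format
```

This should return a single search field containing an OR of fired_alert_name="..." terms, one per row of the lookup. A subsearch is expanded in the same way implicitly, which is why the column name in the lookup has to match the field name in the events.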

If this all looks good, we can progress to the real search.

...BASE SEARCH...
| rename fired_alert_name AS title 
    `comment("We'll need to rename whatever field you're using for your alert names to 'title' so it will work with the rest of the search")` 
| bin _time span=60m aligntime=@m
    `comment("Puts our time in 60-minute 'buckets' counted from the time the search is run")` 
| rex mode=sed field=title "s/\W/_/g" 
    `comment("We'll be using the foreach command extensively during this search. Since we know this ahead of time, we'll need to prepare the fields so that this is possible. Here, the foreach command only works on fields whose names contain alphanumeric characters and underscores. Each alert will become a new field after we use the chart command in the next pipe. Since some of the alert names may have spaces or special characters, we'll need to replace those characters with underscores so we can perform aggregate functions on the fields later on.")` 
| chart count OVER _time by title useother=f limit=0
    `comment("Finds the count of alerts fired in each bucket")` 
| timewrap 1day 
    `comment("We're wrapping time to compare our current bucket count with bucket counts in the same timeframe for each day over the last 30 days")` 
| fillnull value="0" 
    `comment("Since we may not have any alerts fired for certain times, we want to replace null values with "0" so that we can perform aggregate functions on the data")` 
| fields - *_30days_before 
    `comment("We remove the oldest day (30 days before) from our search because it will end up being a partial day, and as a result will be missing data for some of the buckets. This would skew the average for buckets in those time ranges, so we'll just remove this day altogether.")` 
| foreach *_29days_before 
    [| eval <<MATCHSTR>>_avg=<<FIELD>> ] 
| foreach *_28days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_27days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_26days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_25days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_24days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_23days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_22days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_21days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_20days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_19days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_18days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_17days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_16days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_15days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_14days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_13days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_12days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_11days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_10days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_9days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_8days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_7days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_6days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_5days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_4days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_3days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_2days_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_1day_before 
    [| eval <<MATCHSTR>>_avg=<<MATCHSTR>>_avg+<<FIELD>> ] 
| foreach *_avg 
    [| eval <<FIELD>>=<<FIELD>>/29] 
    `comment("Calculating the average number of alerts fired for each bucket and alert over the 29 full days before the latest day")` 
| foreach *_29days_before 
    [| eval <<MATCHSTR>>_stddev=abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_28days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_27days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_26days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_25days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_24days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_23days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_22days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_21days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_20days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_19days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_18days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_17days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_16days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_15days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_14days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_13days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_12days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_11days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_10days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_9days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_8days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_7days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_6days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_5days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_4days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_3days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_2days_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_1day_before 
    [| eval <<MATCHSTR>>_stddev=<<MATCHSTR>>_stddev+abs(<<MATCHSTR>>_avg-<<FIELD>>) ] 
| foreach *_stddev 
    [| eval <<FIELD>>=<<FIELD>>/29] 
    `comment("Calculating the average absolute deviation from the mean (used below as the 'stddev' value) for each bucket and each alert over the same 29 days")` 
| fields *_latest_day *_avg *_stddev 
| eval hidev=2, minimum=2
    `comment("IMPORTANT: Play around with these numbers to set your alerting criteria. These numbers will be used in the subsequent foreach command. They also determine whether or not the alert should go off. 

If you find your alert is going off too frequently, you can raise hidev to set a higher bar for what counts as an 'atypical' number of alerts going off. This works by multiplying the standard deviation by that amount. (1 = 100% of the standard deviation, 2 = 200% of the standard deviation, etc.)
The minimum field prevents alerts with infrequent firing patterns from triggering this alert. For example, if you have an alert that goes off once every few days, the average fire rate for that alert is probably less than 1. This would mean any time we saw that alert fire, it would most likely trigger this alert. To get around that, we set the minimum to 2 so that there must be at least 2 firings in the bucket to trigger this alert.")` 
| head 1
    `comment("We only care about the last hour, so we'll remove all other buckets from our search")` 
| foreach *_latest_day 
    [| eval <<MATCHSTR>>_upperBound = <<MATCHSTR>>_avg + <<MATCHSTR>>_stddev * hidev
        `comment("This calculates our alerting threshold by adding the average to our standard deviation multiplied by our custom multiplier")` 
    | eval <<MATCHSTR>>_concern = if(<<MATCHSTR>>_latest_day > <<MATCHSTR>>_upperBound AND <<MATCHSTR>>_latest_day >= minimum, <<MATCHSTR>>_latest_day, null())
        `comment("We'll then compare our current alert count to our threshold and our minimum. If it is over our threshold AND at or above our minimum, it will return a value.")` 
    | eval <<MATCHSTR>>_concern = if(isnotnull(<<MATCHSTR>>_concern), "Alert Count: ".<<MATCHSTR>>_latest_day.",Average Alert Count: ".round(<<MATCHSTR>>_avg,1), null())]
    `comment("This just makes it pretty.")` 
| streamstats count as temp_id 
| stats values(*) by temp_id 
| rename values(*) as * 
| fields - temp_id 
    `comment("These four pipes go through and look for any null fields and remove them from the search")` 
| table *_concern 
| foreach *_concern 
    [| eval <<MATCHSTR>>_concern = split(<<MATCHSTR>>_concern, ",")] 
    `comment("Also to make it look pretty")` 
| rename *_concern AS *
    `comment("Renaming alerts to their original names")` 
| table *
    `comment("Since we're not sure which fields will be returned, we'll return them all!")`

I've added comments along the way to try to help identify what each piece is doing and why each piece is important. You can break it down if you'd like to understand it better.
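
As one way to break it down, the accumulate-then-divide pattern used by the long chain of foreach stanzas can be tried on a toy row. This is a sketch with hypothetical field names (A_3days_before and so on) and only three days of history instead of 29; it uses a single wildcard foreach per step, which is a compact variant and not the exact form used in the search above:

```spl
| makeresults
| eval A_3days_before=2, A_2days_before=4, A_1day_before=6
| foreach A_*_before [ eval A_sum = coalesce(A_sum, 0) + '<<FIELD>>' ]
| eval A_avg = A_sum / 3
| foreach A_*_before [ eval A_dev = coalesce(A_dev, 0) + abs(A_avg - '<<FIELD>>') ]
| eval A_dev = round(A_dev / 3, 2)
```

With these values A_avg comes out as 4 and A_dev as 1.33, since the absolute deviations from the mean are 2, 0 and 2. Note that this figure is an average absolute deviation, which is what the main search stores in its _stddev fields.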

You can also play with the fake data if you just add the main search onto my fake dataset.

The most important parts that you'll need to change are the hidev and the minimum. Go ahead and remove the | head 1 from the search to get a better idea of what has been considered "atypical" over the last 24 hours. Tweak the hidev to make concern values appear more or less frequently.
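
To get a feel for how hidev and minimum interact, the threshold arithmetic can be run on made-up numbers. The field names and values below (latest_count, avg_count, dev) are assumptions for illustration only, not fields produced by the search above:

```spl
| makeresults
| eval latest_count=7, avg_count=3.2, dev=1.5, hidev=2, minimum=2
| eval upperBound = avg_count + dev * hidev
| eval concern = if(latest_count > upperBound AND latest_count >= minimum, "yes", "no")
```

Here upperBound works out to about 6.2, so a count of 7 is flagged. Raising hidev to 3 moves the bound to about 7.7 and the same count is no longer flagged, which is the effect to expect when tuning a noisy alert.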

I'm open to assist with any problems you run into as my schedule allows. Good luck!

ramgnisiv
Path Finder

Amazing explanation, awarded with 25 karma points


spluzer
Communicator

YOWZA! Thanks! I'm going to have to take some time to unpack this. Thanks again!

spluzer
Communicator

Sorry, got a little side tracked on this task. This is amazing. Accepted accordingly, cheers!
