Splunk Search

How to create a search for moving average for email sent per user

jwalzerpitt
Influencer

Thx to DalJeanis, I have the following search that establishes a baseline of emails sent per user by subject, then looks for anomalies (e.g., a user sends 3.2 emails on average but is now sending four times the threshold).

Right now, I'm limited to an average dictated by the time picker, but I'd like to have a moving average for the number of emails sent per user to smooth out any peaks/valleys and return more relevant events.

How do I create a search in which the user's average emails sent is computed over X amount of time (3 months, 6 months, all-time, etc.)?

index=email NOT Status=Quarantined NOT Status=Failed SenderAddress=*.xyz.com
| stats count by SenderAddress Subject
| search count > 1
| eventstats avg(count) as AvgOfCount, stdev(count) as StdevOfCount, max(count) as MaxOfCount by SenderAddress
| eval AvgOfCount = ceiling(AvgOfCount)
| eval StdevOfCount=ceiling(StdevOfCount)
| eval threshold = max(AvgOfCount + 4 * StdevOfCount,10)
| eval KeepMe = if(count >= threshold, 1,0)
| search KeepMe=1

1 Solution

DalJeanis
SplunkTrust
SplunkTrust

First, the subsearch that's going to calculate across the time range.

We add a pipe and the search keyword, surround it with brackets, change the eventstats to stats, set the time frame, and feed only the SenderAddress and threshold back to the main search. That looks like this:

[
  | search earliest=-180d@d latest=-0h@h
    index=email NOT Status=Quarantined NOT Status=Failed SenderAddress=*.xyz.com 
  | stats count by SenderAddress Subject 
  | search count > 1
  | stats avg(count) as AvgOfCount, stdev(count) as StdevOfCount, max(count) as MaxOfCount by SenderAddress 
  | eval AvgOfCount = ceiling(AvgOfCount)
  | eval StdevOfCount = ceiling(StdevOfCount)
  | eval threshold = max(AvgOfCount + 4 * StdevOfCount,10)
  | table SenderAddress threshold
]

You can smoke-test the code by leaving off the square brackets and the search command.
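For example, a standalone smoke-test (same index and field names as above, with the time bounds moved inline into the base search) would look like this:

```
index=email NOT Status=Quarantined NOT Status=Failed SenderAddress=*.xyz.com earliest=-180d@d latest=@h
| stats count by SenderAddress Subject
| search count > 1
| stats avg(count) as AvgOfCount, stdev(count) as StdevOfCount, max(count) as MaxOfCount by SenderAddress
| eval AvgOfCount = ceiling(AvgOfCount)
| eval StdevOfCount = ceiling(StdevOfCount)
| eval threshold = max(AvgOfCount + 4 * StdevOfCount, 10)
| table SenderAddress threshold
```

If the SenderAddress/threshold pairs look sane here, the bracketed version should behave the same way inside the join.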

Next, we add your main search (which will presumably get its time range from the time picker) and the join, and afterwards we screen for the bad boys the same way we did originally.

 index=email NOT Status=Quarantined NOT Status=Failed SenderAddress=*.xyz.com 
  | stats count by SenderAddress Subject 
  | search count > 1

  | join type=left SenderAddress 
    [
    | search earliest=-180d@d latest=@h
      index=email NOT Status=Quarantined NOT Status=Failed SenderAddress=*.xyz.com 
    | stats count by SenderAddress Subject 
    | search count > 1
    | stats avg(count) as AvgOfCount, stdev(count) as StdevOfCount, max(count) as MaxOfCount by SenderAddress 
    | eval AvgOfCount = ceiling(AvgOfCount)
    | eval StdevOfCount = max(1,ceiling(StdevOfCount))
    | eval threshold = max(AvgOfCount + 4 * StdevOfCount,10)
    | table SenderAddress threshold
    ]  

  | eval threshold=coalesce(threshold,10) 
  | eval KeepMe = if(count >= threshold, 1,0)
  | search KeepMe=1

Notice I've set a default threshold in case the person has literally never sent an email to more than one recipient before.


Update: switched to -180d@d rather than -6m@m (note to self: %m in a format string is month, but @m in a time offset is minute, NOT month), and added the eval keyword to a calculation.


jwalzerpitt
Influencer

Thx

When I run the search I get an error message:

'Error in 'search' command: Unable to parse the search: Invalid time bounds in search: start=1485283860 > end=1485280800.'


DalJeanis
SplunkTrust
SplunkTrust

My bad, see the updated code. Use earliest=-180d@d for six months back.

jwalzerpitt
Influencer

Ran the search and got 16 events (I lowered the time frame to 30 days for a quicker search, but will expand it in the other searches I plan to run).

Here's info for the first few users returned:

SenderAddress   Subject          count   KeepMe   threshold
user@xyz.com    Blah Blah Blah   493     1        398
user@xyz.com    Blah Blah Blah   267     1        194
user@xyz.com    Blah Blah Blah   218     1        155

In my original search I cut down on users by using an inputlookup command that references a whitelist of users (bounce lists, SMTP mailers, and so on). Is it possible to add a second list for filtering? I have a list of keywords a subject might contain that indicate spam.

Also, I have a separate search that looks for the count in special characters in the subject as follows:

index=exchange 
| eval SubjectLen=length(Subject)  
| eval SubjectCopy=Subject 
| rex field=SubjectCopy mode=sed "s/[!@#$%^&*()_+]//g"  
| eval specialCharCount = SubjectLen - length(SubjectCopy)  
| fields - SubjectCopy
| stats count by specialCharCount | sort -count

Would it be possible to inject this search as well, with a weight on subjects that have a higher count of special characters?

Thx again


DalJeanis
SplunkTrust
SplunkTrust

Sure, that test would happen after the join but before the threshold test... except, is the index exchange or email?

    ]  

| eval threshold=coalesce(threshold,10) 
| eval SubjectCopy=Subject
| rex field=SubjectCopy mode=sed "s/[!@#$%^&*()_+]//g"
| eval specialCharCount = len(Subject) - len(SubjectCopy)
| eval KeepMe = case(count >= threshold, 1,
                     specialCharCount > 4, 1,
                     true(), 0)
| search KeepMe=1

You can weight the test however you want.
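For instance, a simple weighted score (hypothetical weights, tune to taste) could replace the binary case(), so the volume signal counts for more than the special-character signal:

```
| eval score = if(count >= threshold, 2, 0) + if(specialCharCount > 4, 1, 0)
| search score >= 2
```

With this cutoff, exceeding the volume threshold alone keeps the event, while a high special-character count alone does not.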


edited to add comma between 1 and true().

DalJeanis
SplunkTrust
SplunkTrust

This version checks the sender's email history, including single emails, for that sender's typical use of special characters. It passes out a value called "specialpass" that is the higher of (a) their 75th-percentile use overall or (b) their 75th-percentile use in multi-recipient emails. It defaults to 2 when empty, but you can modify that code however you want.

index=exchange NOT Status=Quarantined NOT Status=Failed SenderAddress=*.xyz.com 
   | stats count by SenderAddress Subject 
   | search count > 1

   | join type=left SenderAddress 
     [
     | search earliest=-180d@d latest=@h
       index=email NOT Status=Quarantined NOT Status=Failed SenderAddress=*.xyz.com 
     | stats count by SenderAddress Subject 
     | eval SubjectCopy=Subject 
     | rex field=SubjectCopy mode=sed "s/[!@#$%^&*()_+]//g"  
     | eval specialCharCount = len(Subject)  - len(SubjectCopy)  
     | eval multicount=if(count > 1,1,0)
     | eventstats avg(specialCharCount) as GrandAvgSpecial,
                  upperperc75(specialCharCount) as GrandPct75Special 
                  by SenderAddress
     | stats avg(count) as AvgOfCount, 
             stdev(count) as StdevOfCount, 
             max(count) as MaxOfCount, 
             avg(specialCharCount) as AvgSpecial, 
             upperperc75(specialCharCount) as Pct75Special,
             max(GrandAvgSpecial) as GrandAvgSpecial,
             max(GrandPct75Special) as GrandPct75Special
             by SenderAddress multicount
     | eval AvgOfCount = ceiling(AvgOfCount)
     | eval StdevOfCount = max(1,ceiling(StdevOfCount))
     | eval GrandAvgSpecial = max(1,ceiling(GrandAvgSpecial))
     | eval GrandPct75Special = max(1,ceiling(GrandPct75Special))
     | eval AvgSpecial = max(1,ceiling(AvgSpecial))
     | eval Pct75Special = max(1,ceiling(Pct75Special))
     | search multicount=1  
     | eval threshold = max(AvgOfCount + 4 * StdevOfCount,10)
     | eval specialpass = coalesce(max(Pct75Special,GrandPct75Special), max(AvgSpecial,GrandAvgSpecial))
     | table SenderAddress threshold specialpass
     ]  

 | eval threshold=coalesce(threshold,10) 
 | eval specialpass=coalesce(specialpass,2)
 | eval SubjectCopy=Subject
 | rex field=SubjectCopy mode=sed "s/[!@#$%^&*()_+]//g"
 | eval specialCharCount = len(Subject) - len(SubjectCopy)
 | eval KeepMe = case(count >= threshold, 1,
                      specialCharCount > specialpass, 1,
                      true(), 0)
 | search KeepMe=1

edited to correct length() to len(), fix missing comma, add eval.

jwalzerpitt
Influencer

Also, I changed:
| threshold = max(AvgOfCount + 4 * StdevOfCount,10)

to:
| eval threshold = max(AvgOfCount + 4 * StdevOfCount,10)


DalJeanis
SplunkTrust
SplunkTrust

got it. fixed comment.


jwalzerpitt
Influencer

Thx - still getting, "Error in 'eval' command: The expression is malformed. Expected )."

Combing through the eval commands to see where a ) is missing...


DalJeanis
SplunkTrust
SplunkTrust

It needs a comma between 1 and true().
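With that comma in place, the statement parses:

```
| eval KeepMe = case(count >= threshold, 1,
                     specialCharCount > 4, 1,
                     true(), 0)
```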

jwalzerpitt
Influencer

If I wanted to put this search through the ML toolkit, should I feed it a less specific search like the one below to see what ML will tag as a potential anomaly?

index=exchange NOT Status=Quarantined NOT Status=Failed SenderAddress=*.xyz.com 
    | stats count by SenderAddress Subject 
    | search count > 1

Thx


DalJeanis
SplunkTrust
SplunkTrust

Sure, it couldn't hurt. I'd also calculate the specialCharCount and see what it says about that.
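A minimal sketch of that MLTK run, assuming the Machine Learning Toolkit's DensityFunction algorithm is installed (exact syntax and the outlier output field can vary by MLTK version):

```
index=exchange NOT Status=Quarantined NOT Status=Failed SenderAddress=*.xyz.com
| stats count by SenderAddress Subject
| search count > 1
| fit DensityFunction count by "SenderAddress"
| search "IsOutlier(count)"=1
```

This fits a per-sender distribution of count and flags rows MLTK considers outliers, which you can then compare against the manual avg + 4*stdev threshold.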


jwalzerpitt
Influencer

Was wondering if you could take a look at a new post I have concerning email analytics - posted under, "Need help finding average email sent per user on a daily basis"

Thx


DalJeanis
SplunkTrust
SplunkTrust

responded to it.


jwalzerpitt
Influencer

Got it / re-running search

Thx


jwalzerpitt
Influencer

Sorry about that / index=exchange

I tagged on the following:

| eval threshold=coalesce(threshold,10)
   | eval SubjectLen=length(Subject)
   | eval SubjectCopy=Subject 
   | rex field=SubjectCopy mode=sed "s/[!@#$%^&*()_+]//g"  
   | eval specialCharCount = SubjectLen - length(SubjectCopy)  
   | eval KeepMe = 
      case(count >= threshold, 1,
               specialCharCount>4, 1
                true(),0)
 | search KeepMe=1

but I get an error: "Error in 'eval' command: The expression is malformed. Expected )."


DalJeanis
SplunkTrust
SplunkTrust

Try running the case statement all together on the line with KeepMe=.

I don't see any obvious issues with it.

Ah, in Splunk the second and fourth evals would be len(Subject) and len(SubjectCopy); length() is not a Splunk eval function.
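That is, the special-character block becomes:

```
| eval SubjectLen=len(Subject)
| eval SubjectCopy=Subject
| rex field=SubjectCopy mode=sed "s/[!@#$%^&*()_+]//g"
| eval specialCharCount = SubjectLen - len(SubjectCopy)
```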

jwalzerpitt
Influencer

Got it - thx / running search now - will let you know results shortly
