Alerting

Alert if field combination contains a new (never-before-seen) value

Explorer

I have a data set that's essentially access logs for an application that uses client/session tokens. I want to be alerted when any token is used from a new IP address.

I'm completely lost on how to do this...

There are two fields that matter to me: request.client_token, the client/session token and request.remote_address, the request's source IP. I also want to filter to only requests (not responses), which I can do with spath type | search type=request.

I've figured out that I can use something like this:

index="main"
| spath type | search type=request
| spath "request.client_token"
| stats values("request.remote_address") as addresses by "request.client_token"
| rename "request.client_token" as token

Which gives me a table-like output containing every client token (data from all time) and a list of IP addresses that each token has been used from (across all time).

Now I want to find (and alert on) any new IP addresses for a token (say, in the last 5 minutes).

The logical way I'd describe this is I want to be alerted every time any token is used from a new IP address (i.e. a remote_address which that token hasn't been used from before).

Any help would be greatly appreciated. I'm new to Splunk and using the free trial of Splunk Cloud to try and figure out if it can handle some of these requirements, or whether we need to develop a custom solution.

Thanks!

1 Solution

Explorer

So unfortunately, about three and a half hours after asking this (which was already about four hours into my search for a solution), I came up with something that works. I highly doubt this is as optimized as it could be - in fact it's probably awful - but it's the only way I could figure out how to do this...

Note that I determined that for the needs of my application, we don't actually have to look at all history, we only need to check over the past 31 days...

index="main" earliest=-31d latest=now
| spath type | search type=request
| spath "request.client_token" | search "request.client_token"!=""
| stats values("request.remote_address") as all_addresses by "request.client_token"
| eval all_count=mvcount(all_addresses)
| join "request.client_token" type=outer
    [search index="main" earliest=-31d latest=-5m
    | spath type | search type=request
    | spath "request.client_token" | search "request.client_token"!=""
    | stats values("request.remote_address") as old_addresses by "request.client_token"
    | eval old_count=mvcount(old_addresses)]
| fillnull value=0 all_count old_count
| eval new_ip_count=(all_count - old_count)
| where new_ip_count > 0 AND all_count > 1
  • both searches are based on search index="main" earliest=-31d latest=<<>> | spath type | search type=request | spath "request.client_token" | search "request.client_token"!="" | stats values("request.remote_address") as old_addresses by "request.client_token"; this finds events which:
    • are in the "main" search index
    • are within a specified time range: earliest=-31d latest=<<>>
    • have a type field: spath type
    • are of type request: search type=request
    • have a client_token that isn't empty/null: spath "request.client_token" | search "request.client_token"!=""
    • collects statistics of remote addresses grouped by client token: stats values("request.remote_address") as old_addresses by "request.client_token"
  • the first search looks at events from 31 days ago to now, and puts the remote addresses in an all_addresses field; this represents ALL remote IPs seen for each token in the last 31 days (which should encompass the default max_ttl of 30 days, so it should get all data for all non-expired tokens).
  • we add a count of the distinct addresses to the results: all_count=mvcount(all_addresses)
  • we then perform an outer join on the client_token field, with another search run with the same parameters/filters but against records that are more than 5 minutes old
  • we end up with data that tells us, for each client_token seen in the last 31 days, what IPs it came from over all that time (all_addresses), what IPs it came from prior to the last 5 minutes (old_addresses), and the count of each (all_count and old_count, respectively)
  • The mvcount() function returns null for a field with no values, but we want to do math with this, so we convert the nulls to zero in those fields: fillnull value=0 all_count old_count
  • the total IP count minus the old IP count becomes the count of new IPs seen in the last 5 minutes: eval new_ip_count=(all_count - old_count)
  • we use where new_ip_count > 0 to limit the results to tokens which came from new IP addresses in the last 5 minutes
  • At this point, we'd get results for any token used from a new IP address within the last 5 minutes. That includes tokens created within the last 5 minutes and used from multiple IPs, which we want, but it also includes tokens created within the last 5 minutes that have only been used once (a brand-new token's first IP always counts as "new"). We filter those single-use tokens out by appending AND all_count > 1 to the end.
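The set logic this search implements can be sketched outside Splunk. Here is a minimal Python model of the join-based approach (event layout, function name, and variable names are illustrative, not part of the SPL above):

```python
# Model of the join-based search: for each token, compare the set of IPs
# seen over the full 31-day window against the set seen before the last
# 5 minutes. Events are (timestamp, token, ip) tuples.
from collections import defaultdict

def new_ip_tokens(events, now, window=300):
    all_addresses = defaultdict(set)   # token -> IPs over the whole range
    old_addresses = defaultdict(set)   # token -> IPs before the last `window` seconds
    for ts, token, ip in events:
        all_addresses[token].add(ip)
        if ts < now - window:
            old_addresses[token].add(ip)
    results = []
    for token, ips in all_addresses.items():
        all_count = len(ips)
        old_count = len(old_addresses.get(token, set()))  # like fillnull value=0
        new_ip_count = all_count - old_count
        # like: where new_ip_count > 0 AND all_count > 1
        if new_ip_count > 0 and all_count > 1:
            results.append((token, new_ip_count))
    return results
```

In this model, a token seen from a second IP inside the window is reported, while a brand-new token used once is suppressed by the all_count > 1 condition, mirroring the behavior described in the bullets above.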


SplunkTrust

This is actually a relatively common pattern in searches. When you are thinking about state, think lookups. Here is a slight restatement of your problem definition - please let me know if it is accurate.

You want to know the earliest value of _time for a given combination of client_ip and token. This will enable you to tell if a particular combination is 'new' relative to 'now'.

I know your source data is slightly more complex, but let me approach it as simply as you have a series of events with fields _time, client_ip, and token. (Ignoring your spath and other complexity for the time being). Because Splunk stores time as a time_t, it's an integer and easily comparable. So let's start with:

sourcetype=mylog 
| stats min(_time) as earliest, max(_time) as latest by client_ip, token

This is relatively easy, and we can tack some simple evaluation criteria onto the end to find 'really old' and/or 'really new' combinations, like:

<search_above> 
| eval status = case( now() - earliest < 300 , "really_new",  now() - latest > 86400 * 90, "really_old", 1=1, "normal") 
| search status = "really_new"
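The stats-plus-case() classification above can be modeled outside Splunk. Here is a minimal Python sketch (the event layout and function name are illustrative, not part of the SPL):

```python
# Sketch of the stats + case() logic: compute earliest/latest _time per
# (client_ip, token) pair, then classify each pair. Thresholds mirror the
# SPL above: < 300 s old is "really_new", idle > 90 days is "really_old".
def classify(events, now):
    spans = {}  # (client_ip, token) -> [earliest, latest]
    for ts, ip, token in events:
        key = (ip, token)
        if key not in spans:
            spans[key] = [ts, ts]
        else:
            spans[key][0] = min(spans[key][0], ts)
            spans[key][1] = max(spans[key][1], ts)
    out = {}
    for key, (earliest, latest) in spans.items():
        if now - earliest < 300:
            out[key] = "really_new"
        elif now - latest > 86400 * 90:
            out[key] = "really_old"
        else:
            out[key] = "normal"
    return out
```

Because timestamps are plain integers, min() and max() are all that is needed to maintain the earliest/latest pair, which is exactly what makes the lookup caching below work.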

This works, but requires you to search over a fairly large time range. We can compress that some by caching results in a lookup. There's a 3-step process to setting this up and using it.

Step 1 - Generate the lookup the first time. Run this search over a long time period (like all time):

sourcetype=mylog 
| stats min(_time) as earliest, max(_time) as latest by client_ip, token
| outputlookup token_cache.csv

Step 2 - Set up something to run over a shorter interval to maintain the lookup. Schedule it to run every 5 minutes, searching over the preceding 5 minutes.

sourcetype=mylog 
| stats min(_time) as earliest, max(_time) as latest by client_ip, token
| inputlookup append=true token_cache.csv
| stats min(earliest) as earliest, max(latest) as latest by client_ip, token
| outputlookup token_cache.csv

What we've done here is make changes to our lookup incremental. We run the main data collection search over a smaller time window, but then suck in the cached data and smash it together. Because _time is an integer, the primitives of min() and max() update the lookup in the sanest way possible.
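The incremental merge step can be sketched the same way. Here is a minimal Python model of what the inputlookup append=true plus second stats pass accomplishes (names are illustrative):

```python
# Sketch of the incremental lookup update: fresh per-pair min/max values
# from the last window are merged into the cached ones with min()/max(),
# so the cached "earliest" only ever moves back and "latest" only forward.
def merge_cache(cache, fresh):
    """Both dicts map (client_ip, token) -> (earliest, latest)."""
    merged = dict(cache)
    for key, (e, l) in fresh.items():
        if key in merged:
            ce, cl = merged[key]
            merged[key] = (min(ce, e), max(cl, l))
        else:
            merged[key] = (e, l)
    return merged
```

A pair that already exists in the cache keeps its original earliest value (so it never looks "new" again), while a never-before-seen pair enters the cache with a recent earliest value and trips the alert in Step 3.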

Step 3 - Set up something to use the cached data for alerting purposes. We've made a cached result, so now it should be easy to use.

| inputlookup token_cache.csv | where now() - earliest < 300

Searches 2 and 3 get scheduled to run - search 2 is a background maintainer and search 3 is an alert.

Assuming I have properly understood what you were trying to do, this should be much cheaper to use and maintain over time. And as a bonus, you get an indication of when a given IP address stops using a given token too.


Splunk Employee

Hi @jantman - Do you want to resolve your post by clicking "Accept" below your answer? Or do you want to leave it open and wait until another user shares a possible alternative solution for you?


Legend

How about something like this? It doesn't compare against previously used addresses, but it checks whether a token has been used from more than one IP within a 5-minute (adjustable) range.

index="main" earliest=-5m@m
| spath type | search type=request
| spath "request.client_token"
| stats values("request.remote_address") as addresses by "request.client_token"
| rename "request.client_token" as token
| where mvcount(addresses)>1
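The check in this simpler search reduces to grouping the recent window's events by token and flagging any token with more than one distinct IP. A minimal Python sketch of that logic (names illustrative):

```python
# Model of the simpler check: within a recent window, flag any token that
# was used from more than one distinct IP. Events are (token, ip) pairs
# already filtered to the window.
from collections import defaultdict

def multi_ip_tokens(window_events):
    addresses = defaultdict(set)
    for token, ip in window_events:
        addresses[token].add(ip)
    return {t for t, ips in addresses.items() if len(ips) > 1}
```

Note the limitation the answer itself calls out: this only catches a token using two or more IPs inside the same window, not a token switching to a new IP it has never used historically.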

Community Manager

@jantman @jantmantest

Please do not downvote users on Splunk Answers who are just attempting to help you solve your issue. Downvoting should be reserved for answers/suggestions that could be potentially harmful to your Splunk environment. If something didn't directly answer your question, simply comment with more information. This will encourage people to actually provide more help and feedback to find you a solution that works. Please read how community etiquette works in this forum here:
https://answers.splunk.com/answers/244111/proper-etiquette-and-timing-for-voting-here-on-ans.html

Also, do not create multiple accounts on Answers. I'm going to have to suspend your jantmantest account now. Please only use your jantman account moving forward.


Explorer

@ppablo Ok, apologies. I guess the format of this site being a direct ripoff of StackOverflow.com led me to expect the same standards of moderation and voting. I've upvoted the answer to return it to 0.

I didn't intentionally create multiple accounts on Answers. @jantman is my Splunk.com account (which I created in 2008, and have used with the Knowledge Base, etc. since then). @jantmantest is an account I created on Splunk.com to use the Splunk Cloud Free Trial. Until I posted that comment, I was unaware that logging in to my Splunk Cloud instance would result in me posting to answers.splunk.com with that temporary username.


New Member

I downvoted this post because this is significantly less complex than what I'm asking... I'm looking for new IPs.
