Alerting

How to create an hourly alert when never-before-seen events with certain unique characteristics appear in an index?

gesman
Communicator

I need to build an hourly alert that fires when never-before-seen events (with certain unique characteristics) appear in an index.
In essence, this is the logic:

  1. Get the list of fresh events (with 3 fields of interest: field1, field2, field3): index=mydata earliest=-1h@h | dedup field1, field2, field3
  2. For each event found, run this pseudo search: index=mydata latest=-1h@h field1 (field2 OR field3) | stats count
  3. Alert if count=0. In other words, alert if no earlier events match field1 AND (field2 OR field3) for an event found in step 1.
  4. Return the original event that generated the alert.
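
The per-event check in steps 2 and 3 can be sketched for a single fresh event as follows. This is only an illustration: AAA, bbb and 123 are hypothetical values standing in for that event's field1, field2 and field3, and it assumes the three fields are extracted at search time so they can be matched as field=value pairs.

```spl
index=mydata latest=-1h@h field1="AAA" (field2="bbb" OR field3="123")
| head 1
| stats count AS previous_match_found
```

A result of previous_match_found=0 would mean no earlier event matches this combination within the search's time range. The head 1 only lets the search stop early once one match exists.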

Suggestions are welcome.

gesman
Communicator

I found the solution to the task.

First, to simplify the task definition: be able to execute a fully custom query for each event found and collect the results.

Lookups allow you to find matches within a lookup source based on field1 AND field2 AND field3 logic.
Subsearches allow you to pull subsearch results and then run the outer query in either a field1 AND field2 AND field3 or a field1 OR field2 OR field3 manner. You can control the "outer" behavior of subsearch results somewhat with the ... | format ... command, but not to the extent of having a custom-crafted outer query based on the returned results.

So the solution to the task is based on the ability to craft a fully custom search as a string and then return it to the outer search as a single field named search.
The outer search will take it as is and execute it.
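
A minimal sketch of that mechanism, separate from the full blueprint: the subsearch builds one string, stores it in a field named search, and returns only that field. The values AAA, bbb and 123 are hypothetical, and makeresults is used here only to produce a single placeholder row.

```spl
index=mydata
    [| makeresults
     | eval search="field1=\"AAA\" (field2=\"bbb\" OR field3=\"123\")"
     | fields search]
```

Under these assumptions the outer search should run roughly as index=mydata field1="AAA" (field2="bbb" OR field3="123"), an AND/OR mix that the default subsearch output does not produce.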

The trick was to prevent Splunk from post-tweaking the search and getting confused by some elements of it.
For example, Splunk would not allow the search string to contain ...earliest=... latest=... elements. Splunk would also get confused if the returned search string contained aliases.
The solution to both was to code them within macros and then include the macro within the search= string returned to the outer search.

This is the blueprint of the solution.
Notes: index=NONEXISTENT is, again, a trick to prevent Splunk from getting confused. Without it, Splunk would try to search for everything before running the main subsearch. index=NONEXISTENT solves this by causing Splunk to quickly return zero results and focus on the subsearch.

Below you'll notice that for each pre-found event a mind-boggling custom search query is crafted, which itself contains double-nested subsearches. While it looks a bit scary, what it does is first create an empty event with previous_match_found=0. Then it runs the custom search and, if any results are found (it grabs only the first one to save time), the previous_match_found value becomes 1.

Once the whole monster executes, it's easy to filter on ... | where previous_match_found=0 ... to accomplish the final task: finding unique events that never occurred before.

`index=NONEXISTENT
[search ...searching for special events... | fields field1, field2, field3

| eval COMMENT="Here we got events. Now per each event - craft custom search query:"

| eval search_this="
| append [|stats count AS previous_match_found | eval _time=\"" + _time + "\" | eval field1 =\"" + field1 + "\" | appendcols override=1 [search index=myindex ... field1=\"" + field1 + "\" (field2=\"" + field2 + "\" OR field3=\"" + field3 + "\" ) | head 1 | eventstats count AS previous_match_found] ]
"

| eval COMMENT="Glue all queries together into the single one:"

| stats values(search_this) AS all_searches
| eval search=mvjoin(all_searches, " ")

| eval COMMENT="Finally, return main combined search query back to the outer search :"

| fields search
]
| where previous_match_found=0
...
`
On a final note, this search apparently runs pretty fast considering the volume of data.

dwaddle
SplunkTrust

Following up on @tpflicke's answer, lookups are a good way to go about this. Really what you need to know are two foundational things.

[1] The minimum value of _time for events matching a specific set of unique fields.
[2] Is the minimum _time for a specific set of unique fields "recent" ?

So, let's approach it like this. Start by searching over all time:

index=foo sourcetype=mysourcetype
| stats min(_time) as _time by field1, field2, field3
| outputlookup mysourcetypetracker.csv

Now we should have a simple lookup table with the earliest value of _time for combinations of those three fields. We can incrementally update this with a saved search:

index=foo sourcetype=mysourcetype
| stats min(_time) as _time by field1, field2, field3
| inputlookup append=true mysourcetypetracker.csv
| stats min(_time) as _time by field1, field2, field3
| outputlookup mysourcetypetracker.csv

You can run this saved search every hour, every 15 minutes - whatever works for you. Calling stats twice here gives us the earliest _time for the "current window" as well as the "overall" lowest time for all time. We output this back into our tracker lookup file, which now always has the state of the earliest time seen for combinations of field1, field2, field3.

Now, it becomes pretty easy to look for "new arrivals". We can run a search like

| inputlookup mysourcetypetracker.csv
| where _time >= (now() - 3600)

This finds any combinations newly appearing in the past hour. You can probably use a subsearch or two to retrieve the raw events for the newly appearing combinations. I will have to experiment with that a little.
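
One way this might look, as a sketch that reuses the index, sourcetype and tracker file from the searches above and assumes the tracker has already been updated for the current hour: the subsearch returns the combinations first seen in the past hour, and the outer search pulls the matching raw events from the same window.

```spl
index=foo sourcetype=mysourcetype earliest=-1h
    [| inputlookup mysourcetypetracker.csv
     | where _time >= (now() - 3600)
     | table field1, field2, field3]
```

Each row returned by the subsearch becomes a field1 AND field2 AND field3 condition, with the rows ORed together, so this matches on all three fields. Subsearch result limits still apply if many new combinations arrive at once.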

gesman
Communicator

Thank you for your comments.
Condition #2 in my question states that uniqueness is determined by the query field1 AND (field2 OR field3), not by field1 AND field2 AND field3. That's an important difference that makes the task more complicated than just using a simple lookup.
If lookups allowed more complex matching logic, that would be great, but I think more work is required to tweak your suggestions.
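
One possible tweak, sketched under assumptions: field1 AND (field2 OR field3) is equivalent to (field1 AND field2) OR (field1 AND field3), so two pair-wise lookups could stand in for the single three-field one. The lookup names seen_f1_f2 and seen_f1_f3 and their first_seen column are hypothetical, and both tables would need to be maintained separately.

```spl
index=mydata earliest=-1h@h latest=@h
| dedup field1, field2, field3
| lookup seen_f1_f2 field1 field2 OUTPUT first_seen AS seen_via_field2
| lookup seen_f1_f3 field1 field3 OUTPUT first_seen AS seen_via_field3
| where isnull(seen_via_field2) AND isnull(seen_via_field3)
```

An event survives the where clause only if neither pair has been recorded before, which is the "no previous match" condition from the question.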

dwaddle
SplunkTrust

I missed that detail, so my apologies. It probably does add a good bit of complexity - I will think about it and maybe come back with an answer.

tpflicke
Path Finder

Here's a suggestion that tries to get to the same result via a different approach, one that would not effectively require an all-time search every hour.

To remember already discovered combinations of field1, field2, field3 long term, even beyond your normal data retention period, you could use either a summary index or a lookup table.
In general I find working with lookup tables easier, so here's a suggestion using one:

In CSV format the table, which I named discovered_combinations, could look like:

discovered_timestamp,field1,field2,field3
1423922204,AAA,"",123
1423923303,"",bbb,435
1423924444,RRR,bbb,""

The timestamp would provide a return value but might also be used in queries and generally help keep track of when field combinations were discovered.
A search scheduled every hour would find events that don't match a field combination in the lookup table, dedup and send the alert so you get the first event for any new combination.

index=mydata earliest=-1h@h latest=@h
| dedup field1, field2, field3
| lookup discovered_combinations field1 field2 field3 OUTPUT discovered_timestamp
| where isnull(discovered_timestamp)

A second search scheduled afterwards would then update the lookup table using ... | outputlookup discovered_combinations append=true.
There might be a way of doing this all in a single query which would be neater.
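
A rough sketch of a single-query variant, assuming discovered_combinations is a lookup definition that outputlookup can write to. It relies on outputlookup passing its results on down the pipeline, so the same scheduled search could both record new combinations and trigger the alert when it returns any rows.

```spl
index=mydata earliest=-1h@h latest=@h
| dedup field1, field2, field3
| lookup discovered_combinations field1 field2 field3 OUTPUT discovered_timestamp
| where isnull(discovered_timestamp)
| eval discovered_timestamp=_time
| table discovered_timestamp, field1, field2, field3
| outputlookup append=true discovered_combinations
```

The table command limits what is written to the four lookup columns, so the alert would carry the field combination rather than the full original event.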

gesman
Communicator

Thank you for your writeup and suggestions.
The problem is that in my case the lookup table could potentially hold about a million entries, which would exceed the recommended limits for this approach.
In my example mydata is a summary index already.

The challenge is to implement "intelligent negative lookbehind" with Splunk.
Where

  1. intelligent = more complex matching logic than just single AND or OR on all fields.
  2. negative = discovering the non-existence of events matching the intelligent query
  3. lookbehind = searching within past data.