Splunk Search

Recursive Query over Time

rneel
Explorer

I am looking for the best way to build a timechart from queries that each have to evaluate data over a period of time. The item I am counting is vulnerability data, and that data is built from scan outputs that occur at different times across different assets throughout the week. So for instance:

If I ran this query over the past 7 days for today:

 
index="qualys" sourcetype="qualys:hostdetection" TYPE="CONFIRMED"  OS=[OS]  PATCHABLE=YES | dedup HOST_ID QID  sortby -_time | search NOT STATUS=FIXED | stats count by severity
 
I would get back a count of all vulnerabilities that are considered open, by severity (critical, high, medium, low).
 
I now need to show that trend over a 14-day period in a timechart. The issue is that each day needs its own 7-day lookback to get an accurate total. I thought of using a macro and then doing an append, but that seems expensive. I also considered running the query over and over with earliest=-7d@d latest=[-appropriate day count].
 
I am sure there is a more elegant way though.  Any advice is greatly appreciated. 
 
1 Solution

tscroggins
Builder

It helps me to visualize the summary time ranges using earliest and latest. E.g. For a rolling window of three days covering the last three days:

1: earliest=-3d@d latest=-0d@d
2: earliest=-4d@d latest=-1d@d
3: earliest=-5d@d latest=-2d@d

-0d@d is equivalent to @d, but using -0d@d keeps the formatting consistent.
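As a rough illustration of how those three windows line up, here is a small Python sketch (not SPL). The function name rolling_windows and the fixed example date are assumptions made for the illustration; it treats "-Nd@d" as "N days back, snapped to midnight".

```python
from datetime import datetime, timedelta

def rolling_windows(now, window_days, num_windows):
    """Return (earliest, latest) pairs that mimic earliest=-Nd@d latest=-Md@d.

    Window i (0-based) ends i days before today's midnight and spans
    window_days whole days.
    """
    # Snap to midnight, like the @d modifier.
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    windows = []
    for i in range(num_windows):
        latest = midnight - timedelta(days=i)
        earliest = latest - timedelta(days=window_days)
        windows.append((earliest, latest))
    return windows

# Hypothetical "now" chosen only to make the output reproducible.
now = datetime(2024, 1, 10, 15, 30)
windows = rolling_windows(now, window_days=3, num_windows=3)
for earliest, latest in windows:
    print(earliest.date(), "to", latest.date())
```

The smallest earliest value and the largest latest value across the three pairs are five days back and today's midnight, respectively.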

The base search earliest and latest values are the range (the total span) of the desired earliest and latest values:

tag=report tag=vulnerability earliest=-5d@d latest=-0d@d

At the end of the search, we'll add a where command to truncate search results to the last three summarized days:

| where _time>=relative_time(relative_time(now(), "-0d@d"), "-3d@d")

We first summarize the distinct count of dest by day, signature, and severity. (Substitute signature with another field or fields as needed to uniquely identify a vulnerability.) That is, we count each dest once per day for each signature and severity combination:

tag=report tag=vulnerability earliest=-5d@d latest=-0d@d
| bin _time span=1d
| stats dc(dest) as subtotal by _time signature severity

We use streamstats to summarize the subtotal by severity over a span of three days:

tag=report tag=vulnerability earliest=-5d@d latest=-0d@d
| bin _time span=1d
| stats dc(dest) as subtotal by _time signature severity
| streamstats time_window=3d sum(subtotal) as total by severity

I previously used timechart to pivot the results over _time, but you can also use xyseries:

tag=report tag=vulnerability earliest=-5d@d latest=-0d@d
| bin _time span=1d
| stats dc(dest) as subtotal by _time signature severity
| streamstats time_window=3d sum(subtotal) as total by severity
| xyseries _time severity total

Finally, add the where command we prepared earlier:

tag=report tag=vulnerability earliest=-5d@d latest=-0d@d
| bin _time span=1d
| stats dc(dest) as subtotal by _time signature severity
| streamstats time_window=3d sum(subtotal) as total by severity
| xyseries _time severity total
| where _time>=relative_time(relative_time(now(), "-0d@d"), "-3d@d")

The result is a rolling three-day total by severity over _time, built from the daily distinct count of dest values per signature and severity.

If you'd like, you can add a SUBTOTAL column for each day and a TOTAL row for all days:

tag=report tag=vulnerability earliest=-5d@d latest=-0d@d
| bin _time span=1d
| stats dc(dest) as subtotal by _time signature severity
| streamstats time_window=3d sum(subtotal) as total by severity
| xyseries _time severity total
| where _time>=relative_time(relative_time(now(), "-0d@d"), "-3d@d")
| addtotals fieldname=SUBTOTAL
| addcoltotals labelfield=_time label=TOTAL

You can also refactor the base search and stats to use the Vulnerabilities data model and tstats.

With or without acceleration:

| tstats dc(Vulnerabilities.dest) as subtotal from datamodel=Vulnerabilities where earliest=-5d@d latest=-0d@d by _time span=1d Vulnerabilities.signature Vulnerabilities.severity
| streamstats time_window=3d sum(subtotal) as total by Vulnerabilities.severity
| xyseries _time Vulnerabilities.severity total
| where _time>=relative_time(relative_time(now(), "-0d@d"), "-3d@d")
| addtotals fieldname=SUBTOTAL
| addcoltotals labelfield=_time label=TOTAL

With accelerated summaries only:

| tstats summariesonly=t dc(Vulnerabilities.dest) as subtotal from datamodel=Vulnerabilities where earliest=-5d@d latest=-0d@d by _time span=1d Vulnerabilities.signature Vulnerabilities.severity
| streamstats time_window=3d sum(subtotal) as total by Vulnerabilities.severity
| xyseries _time Vulnerabilities.severity total
| where _time>=relative_time(relative_time(now(), "-0d@d"), "-3d@d")
| addtotals fieldname=SUBTOTAL
| addcoltotals labelfield=_time label=TOTAL


rneel
Explorer

Thank you for the help and introducing me to the untable command.  This was very close and I am playing with it to see if I can solve the one remaining issue.  The summary over the 7 days is perfect, but I need to dedup within that 7 days and then aggregate.  For instance:

Day 1: a,b,c

Day 2: a,b,c,d

Day 3: a,b,f

Day 4-7: a,b,c

I should end up across those 7 days with a distinct count of 5 (a,b,c,d,f).  I don't think I can reduce after the timechart so am playing with eventstats there.  Much appreciated if you have further time/guidance.  


rneel
Explorer

I slightly misspoke in the last post. While it would be a count of 5 total, it would actually be a subcount by severity. So for instance:

a=critical

b=high

c=high

d=medium

f=low

Would then actually give me a single set for that view across the 7 days (which would be an accurate picture of the current state) of 1 (critical), 2 (high), 1 (medium), 1 (low) 
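To make the target numbers concrete, here is a small Python sketch (not SPL) of that example. The day-to-issue data and the severity mapping are copied from the two posts above; the variable names are made up for the illustration.

```python
from collections import Counter

# Issues seen on each of the 7 days (days 4-7 all report a, b, c).
daily_issues = {
    1: {"a", "b", "c"},
    2: {"a", "b", "c", "d"},
    3: {"a", "b", "f"},
    4: {"a", "b", "c"},
    5: {"a", "b", "c"},
    6: {"a", "b", "c"},
    7: {"a", "b", "c"},
}
severity = {"a": "critical", "b": "high", "c": "high", "d": "medium", "f": "low"}

# Summing the daily counts counts an issue once for every day it appears.
summed = sum(len(issues) for issues in daily_issues.values())

# Deduplicating across the whole window first gives the intended total.
distinct = set().union(*daily_issues.values())
by_severity = Counter(severity[issue] for issue in distinct)

print(summed, len(distinct), dict(by_severity))
```

The gap between the summed figure and the distinct figure is the over-count that a rolling sum of daily subtotals would produce on this data.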


tscroggins
Builder

@rneel 

If I understand correctly, that's actually much simpler:

index=_internal sourcetype=splunkd source=*/splunkd.log* earliest=-7d@d latest=@d
| bin _time span=1d
| stats dc(_time) as days by log_level

In this example, I've binned _time into days and then counted the distinct number of days per log level. Just replace the base search and log_level field with your data.

If your severity field is indexed, you can use tstats for better performance. Here's an example using the source field:

| tstats values(source) as source where index=_internal sourcetype=splunkd earliest=-7d@d latest=@d by _time span=1d
| mvexpand source
| stats dc(_time) by source

If your severity is not indexed but does exist in raw data as e.g. severity=critical, you can combine tstats with TERM and PREFIX for similar performance gains:

| tstats count where index=_internal sourcetype=scheduler TERM(priority=*) earliest=-7d@d latest=@d by _time span=1d PREFIX(priority=)
| rename "priority=" as priority
| stats dc(_time) as days by priority

The key is the raw data containing some field name and some value separated by a minor breaker. For example, raw data containing severity_critical could be parsed with:

| tstats count where index=main TERM(severity_*) earliest=-7d@d latest=@d by _time span=1d PREFIX(severity_)
| rename "severity_" as severity
| stats dc(_time) as days by severity

PREFIX is very powerful!

 


rneel
Explorer

So I would like to mark all of these as answers because I learned something in each bit, then you could mark me as a poor explainer for not being crisp on the challenge!  If you are up for it I wanted to take one more shot.

There are agents across machines that bring in data every day, multiple times a day. There are also scans that take place across the environment that bring back similar data, but only occur every few days. A view of any one system is the aggregate of the distinct data for that system, collected from both the host and the network. All of this data is organized in the same manner in Splunk. Because data comes in different groupings, an accurate view at any moment in time requires that you look back several days. This is so you can be sure to count those issues that show up in the data less frequently.

As an example, data for 4 days might look like this (using a 3-day lookback to keep the amount of data small):

Day 1 – only information from host scans is fed in; the data in Splunk would contain:

System A (e.g. system name), issue A (e.g. vuln xxx), severity (e.g. high/medium/low), status (e.g. opened, closed etc..)

System B, issue A, severity, status

System C, issue C, severity, status

System A, issue A, severity, status

System B, issue A, severity, status

System C, issue C, severity, status

 

** Note that the data is collected multiple times

 

Day 2 – similar result with systems reporting in multiple times a day

System A, issue A, severity, status

System B, issue A, severity, status

System C, issue C, severity, status

System A, issue A, severity, status

System B, issue A, severity, status

System C, issue C, severity, status

 

Day 3 – Similar, but we now have something introduced from the network scan (the last four lines, issues D through G):

System A, issue A, severity, status

System B, issue A, severity, status

System C, issue C, severity, status

System A, issue A, severity, status

System B, issue A, severity, status

System C, issue C, severity, status

System A, issue D, severity, status

System A, issue E, severity, status

System B, issue F, severity, status

System C, issue G, severity, status

 

Day 4 – similar result with systems reporting in multiple times a day

System A, issue A, severity, status

System B, issue A, severity, status

System C, issue C, severity, status

System A, issue A, severity, status

System B, issue A, severity, status

System C, issue C, severity, status

 

The goal is to go across 3 days to get the number of distinct issues on a system then aggregate the issue counts across all systems into counts by severity.  On any one system you should end up with an issue counted only once, but across systems the issue may show multiple times.   I can accomplish this with no problem when I am looking back to create an output for any single day.  But my skills quickly deteriorate if I want to show a historical view of open issues in the environment in a time chart.  Your first answer:

index=_internal sourcetype=splunkd source=*/splunkd.log* earliest=-14d@d latest=@d
| timechart fixedrange=f span=1d count as subtotal by log_level
| untable _time log_level subtotal
| streamstats time_window=7d sum(subtotal) as total by log_level
| timechart span=1d max(total) as total by log_level
| where _time>=relative_time(relative_time(now(), "@d"), "-7d@d")

worked beautifully, with the one exception that it would count a single issue on a host multiple times as it rolled back across the days.  I did some digging and did not see a way to deduplicate later in the process so that for any one set of days you are counting each open system issue only once.  So in the new example:

  • Day one would be a lookback of days 1-3. For each system you would only count issue xx once across that time span. You would then count each individual system's issues and group that by severity. So System A would only show Issue A once even though it appeared on all 3 days. System B might also have issue A, in which case it would be counted once as well.
  • Day two would be a lookback of days 2-4.
  • Etc….
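The counting rule described in the list above can be sketched in Python (not SPL) as follows. The records are loosely based on the four example days; the severity assigned to each issue is hypothetical, since the post does not give any, and the function name open_counts is made up for the illustration.

```python
from collections import Counter

# (system, issue) pairs reported by the host scans. Each pair appears
# twice a day on purpose, because data is collected multiple times.
host_scan = [("A", "A"), ("B", "A"), ("C", "C")] * 2
# Extra pairs that only the network scan on day 3 reports.
network_scan = [("A", "D"), ("A", "E"), ("B", "F"), ("C", "G")]

records = []
for day in (1, 2, 3, 4):
    records += [(day, system, issue) for system, issue in host_scan]
records += [(3, system, issue) for system, issue in network_scan]

# Hypothetical severities, one per issue.
severity = {"A": "high", "C": "medium", "D": "critical",
            "E": "low", "F": "high", "G": "medium"}

def open_counts(records, first_day, last_day):
    """Count each (system, issue) pair once within the window, by severity."""
    pairs = {(system, issue) for day, system, issue in records
             if first_day <= day <= last_day}
    return Counter(severity[issue] for _, issue in pairs)

window_1 = open_counts(records, 1, 3)   # lookback over days 1-3
window_2 = open_counts(records, 2, 4)   # lookback over days 2-4
host_only = open_counts(records, 1, 2)  # too short to see the network scan
print(dict(window_1), dict(window_2), dict(host_only))
```

Both three-day windows include day 3, so both pick up the network scan findings, while a window covering only days 1-2 misses them. Issue A is counted once for System A and once for System B, no matter how many times it was reported.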

tscroggins
Builder

@rneel 

You can use streamstats to generate rolling summarizations. I'll use the _internal index in this example, but you can modify it to use your base search:

index=_internal sourcetype=splunkd source=*/splunkd.log* earliest=-14d@d latest=@d
| timechart fixedrange=f span=1d count as subtotal by log_level
| untable _time log_level subtotal
| streamstats time_window=7d sum(subtotal) as total by log_level
| timechart span=1d max(total) as total by log_level
| where _time>=relative_time(relative_time(now(), "@d"), "-7d@d")

I want a rolling count for the last 7 days, so I've expanded my time range to 14 days to ensure day 1 includes 7 days of prior data.

From there, I've reduced the initial result set to a daily summary using timechart followed by untable.

Then I've used streamstats to generate a rolling 7 day total from the daily subtotal.

Finally, I've summarized the total by the field of interest and truncated the results to the last 7 days. (I've used the max aggregation for simplicity. There should only be one total value per log_level per day.)
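The rolling-total step can be modeled with a short Python sketch (not SPL). It assumes the 7-day window covers the current day plus the six days before it; the daily subtotals are hypothetical numbers for a single log_level, and the function name rolling_totals is made up for the illustration.

```python
def rolling_totals(daily_subtotals, window_days):
    """Trailing sum over the current day and the window_days - 1 days before it."""
    totals = []
    for i in range(len(daily_subtotals)):
        start = max(0, i - window_days + 1)
        totals.append(sum(daily_subtotals[start:i + 1]))
    return totals

# Hypothetical daily event counts for one log_level over 14 days.
subtotals = [5, 3, 4, 6, 2, 7, 1, 4, 4, 3, 5, 6, 2, 8]
totals = rolling_totals(subtotals, window_days=7)

# Keep the last 7 days; each has a full 7 days of data in its window.
last_week = totals[-7:]
print(last_week)
```

The first six totals are built from fewer than seven days of data, which is why the search range is wider than the range that is finally displayed.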


rneel
Explorer

Thank you for all the time you spent helping with this!