I'm trying to create a dashboard that shows the count of new vulnerabilities between this month and last month, using scan_id as the differentiator. The vulnerability data comes in as a CSV with asset_id, scan_id, and vuln_id. The dashboard will have separate queries for unique and total values, based on vuln_id. My current logic for total vulns is:
this_month=count(List of vulns from scan 'x' not in list of vulns from scan 'y')
last_month=count(List of vulns from scan 'y' not in list of vulns from scan 'z')
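The intended logic is a plain set difference per scan pair. A minimal Python sketch, using made-up vulnerability IDs (`scan_x`, `scan_y`, `scan_z` are hypothetical):

```python
# Illustrative only: hypothetical vuln_id sets for three consecutive scans.
scan_x = {"v1", "v2", "v3", "v4"}  # latest scan
scan_y = {"v1", "v3"}              # previous scan
scan_z = {"v1"}                    # scan before that

this_month = len(scan_x - scan_y)  # vulns in X but not in Y -> 2
last_month = len(scan_y - scan_z)  # vulns in Y but not in Z -> 1
```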
index=vulns sourcetype=vuln_scan scan_id=XXXXXX NOT [ search index=vulns sourcetype=vuln_scan scan_id=YYYYYY ]
| stats count(vuln_id) as events
| eval period="this_month"
| append [ search index=vulns sourcetype=vuln_scan scan_id=YYYYYY NOT [ search index=vulns sourcetype=vuln_scan scan_id=ZZZZZZ ]
    | stats count(vuln_id) as events
    | eval period="last_month" ]
| fields events, period
I feel like there is a more efficient way to do this, especially since I'm also going to report on persistent and remediated vulnerabilities. I tried using case (or nested if) to create fields for "this_month", "last_month", and "previous_month", but can't figure out how to subtract Y from X without using multiple searches. The subtraction has to happen before the count, since not all vulns are 'new'. Any help would be greatly appreciated. Thanks!
The first part generates some sample data. Run it on its own and you will see that there are 3 scans, each with 8 assets. The vulnerabilities are split across the assets, with 2 vulnerabilities in the first scan (0 and 4), 4 in the second scan (0, 2, 4, 6), and 8 in the third scan (0-7).
So, the first thing to do is combine the asset and vulnerability IDs, then create lists for each scan. Then use streamstats to copy the list from one scan to the next (this assumes your data is in scan order, so you may want to add a sort if it isn't).
Now we convert the list into a regex by joining with a pipe (OR in regex), then we remove any asset/vulnerability combination that appears in the previous scan.
Finally, count the remaining asset/vulnerability combinations, as these should be the new vulnerabilities found in each scan.
| gentimes start=-1 increment=1h
| bin span=8h starttime
| rename starttime as scan_id
| streamstats count as asset_id by scan_id
| bin span=3h endtime
| rename endtime as vuln_id
| eventstats min(vuln_id) as base
| eval scan_id=(scan_id-base)/1800/16
| eval vuln_id=floor((asset_id-1)/(4/pow(2,scan_id)))*(4/pow(2,scan_id))
| table asset_id vuln_id scan_id
| eval asset_vuln=asset_id.":".vuln_id
| stats list(asset_vuln) as asset_vuln by scan_id
| streamstats values(asset_vuln) as prev_asset_vuln window=1 current=f
| eval prev_asset_vuln=mvjoin(prev_asset_vuln,"|")
| eval asset_vuln=mvmap(asset_vuln,if(match(asset_vuln,prev_asset_vuln),null(),asset_vuln))
| eval new_vuln=mvcount(asset_vuln)
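As a sanity check, the same "new vulnerabilities" logic can be reproduced outside Splunk. This Python sketch (my own cross-check, not part of the search) rebuilds the synthetic data above with plain integers and counts the asset:vuln pairs absent from the previous scan:

```python
# Rebuild the sample data: 3 scans, 8 assets; scan 0 has vulns {0,4},
# scan 1 has {0,2,4,6}, scan 2 has {0..7}, assigned by the same formula
# as the SPL: vuln_id = floor((asset_id-1)/step) * step, step = 4/2^scan.
scans = []
for scan_id in range(3):
    step = 4 // 2 ** scan_id  # 4, 2, 1
    pairs = {(asset, ((asset - 1) // step) * step) for asset in range(1, 9)}
    scans.append(pairs)

# New vulns in each scan = asset:vuln pairs absent from the previous scan.
new_counts = [len(curr - prev) for prev, curr in zip(scans, scans[1:])]
print(new_counts)  # [4, 4]
```

Four new asset:vuln combinations appear in each of scans 1 and 2, which is what the search's new_vuln field should report.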
OK, the first part sets up some dummy data (a bit more than last time; I am not sure I got the representation I wanted, but your real data might work better).
Assuming the scans are sorted earliest first, we look for the boundaries between different scans based on their ids, assign all the events to numbered groups, and determine whether each event is in an odd- or even-numbered group. This is so that we can compare adjacent groups, either odd-even or even-odd.
Then we count the occurrences of each asset/vulnerability combination within the odd and even groups. A count of 2 means there was no change between the scans, so the change between the scans is the sum of the (2 - count) values.
Depending on whether this is an odd or even group, we take the corresponding change count.
| gentimes start=-2 increment=1h
| bin span=8h starttime
| rename starttime as scan_id
| streamstats count as asset_id by scan_id
| bin span=3h endtime
| rename endtime as vuln_id
| eventstats min(vuln_id) as base
| eval scan_id=(scan_id-base)/1800/16
| eval vuln_id=floor((asset_id-1)/(4/pow(2,scan_id)))*(4/pow(2,scan_id))
| eval vuln_id="Vulnerability_".mvindex(split("HIJKLMNO",""),vuln_id)
| eval asset_id="Asset_".mvindex(split("PQRSTUVW",""),asset_id-1)
| eval scan_id="Scan_".mvindex(split("ABCDEF",""),scan_id)
| table asset_id vuln_id scan_id
| streamstats window=1 current=f values(scan_id) as previous_scan_id
| eval scan_boundary=if(previous_scan_id!=scan_id,1,0)
| streamstats sum(scan_boundary) as scan_group
| eval even_group=floor(scan_group/2)
| eval odd_group=floor((scan_group+1)/2)
| eventstats count as even_count by asset_id vuln_id even_group
| eventstats count as odd_count by asset_id vuln_id odd_group
| eval even_changes=2-even_count
| eval odd_changes=2-odd_count
| stats sum(even_changes) as changes_even sum(odd_changes) as changes_odd by scan_id scan_group
| eval changes_to_previous=if(scan_group%2==0,changes_odd,changes_even)
| table scan_id changes_to_previous
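The grouping trick at the heart of this can be sketched in Python (the scan IDs here are illustrative). Enumerating scans via boundary markers and a running sum, then pairing them two ways, means every scan shares a group with its predecessor in exactly one of the two pairings:

```python
# Events arrive in scan order; a boundary is any change of scan_id.
scan_ids = ["A", "A", "B", "B", "C", "C"]

groups, g, prev = [], 0, scan_ids[0]
for s in scan_ids:
    if s != prev:          # scan boundary -> start a new group
        g += 1
    groups.append(g)
    prev = s

even = [x // 2 for x in groups]        # even pairing: A+B share group 0, C alone
odd = [(x + 1) // 2 for x in groups]   # odd pairing: A alone, B+C share group 1
```

Counting each asset/vulnerability combination within a pairing then reveals whether it occurs in one scan of the pair (a change) or both (no change).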
Let me know if this approach works for your real data.
It took me a bit to understand what you were doing, which is very clever. The scans are imported earliest first; however, the vulnerability data doesn't seem to have a specific ordering, and we weren't able to get the data to count properly. We're currently focusing on this month vs. last, but are considering expanding to a longer trend, such as the last 6 months.
Knowing that we also want to calculate the number of vulnerabilities that remained between scans, and the number that are no longer in the latest scan, is there a way to filter on that?
That is: if in Scan X but not Scan Y, label it "new"; if in both Scan X and Scan Y, label it "persistent"; else (in Scan Y but not Scan X) label it "remediated", and do a count on those labels?
Sort the results from your search/inputlookup in reverse time order (latest first), then find the boundaries between scans, sum the boundaries to enumerate the scans, keep just the first two groups, and increment the group number (to give 1 or 2). Then sum the group number by asset and vulnerability: a sum of 1 means only in the first group (latest scan), i.e. new; a sum of 2 means only in the second group (previous scan), i.e. remediated; a sum of 3 means in both groups, i.e. persistent.
| streamstats window=1 current=f values(scan_id) as previous_scan_id
| eval scan_boundary=if(previous_scan_id!=scan_id,1,0)
| streamstats sum(scan_boundary) as scan_group
| where scan_group < 2
| eval scan_group=scan_group+1
| eventstats sum(scan_group) as status by asset_id vuln_id
| eval status=mvindex(split("new,remediated,persistent",","),status-1)
| stats count by status
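The 1/2/3 sum trick can be sanity-checked in Python (asset/vuln pairs below are made up): tag latest-scan rows with 1 and previous-scan rows with 2, sum per asset/vulnerability pair, and the sum encodes the label.

```python
from collections import Counter

# Hypothetical (asset_id, vuln_id) pairs from the two most recent scans.
latest = {("a1", "v1"), ("a1", "v2"), ("a2", "v3")}
previous = {("a1", "v1"), ("a2", "v4")}

status_sum = Counter()
for pair in latest:
    status_sum[pair] += 1     # latest scan contributes 1
for pair in previous:
    status_sum[pair] += 2     # previous scan contributes 2

labels = {1: "new", 2: "remediated", 3: "persistent"}
counts = Counter(labels[s] for s in status_sum.values())
# counts: new=2, remediated=1, persistent=1
```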
You can allow more in $SPLUNK_HOME/etc/system/local/limits.conf by raising list_maxsize (the list() aggregation is capped at 100 values by default).
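For reference, list_maxsize lives in the [stats] stanza of limits.conf; the value below is just an example, sized for your largest scan:

```ini
[stats]
# Maximum number of values list() will retain per field (default 100).
list_maxsize = 10000
```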
Indeed, I used subsearches to deal with the same problem. Converting the values into a filter is a lot more efficient.