Good Afternoon,
This is going to be fun to explain. In essence, I have a current report we use to review data transfers between hosts for excessive transfers that may be unexpected. This runs on an Enterprise Security server against an accelerated data model. Unfortunately it has to be manually reviewed, and during those reviews I noticed some of the devices repeat each week, which we will consider "expected" behavior.
Along these lines, I would like to see if there's any way to correlate against the previous week's transfers to get the delta and filter out anything that is within something like 5% of the previous week's values.
I tried to make something happen with "delta" and "timewrap", but honestly it seems to be beyond my expertise and a bit of a steep learning curve for me. Below is the query producing the current report. Ideally I would like the delta for gb_in or gb_out to be within the 5%, but if that's not feasible, perhaps just do it based on gb_total.
| tstats summariesonly=true values(All_Traffic.dest_ip) as dest_ip values(All_Traffic.src_ip) as src_ip values(All_Traffic.dest_port) as dest_port values(All_Traffic.bytes_in) as bytes_in values(All_Traffic.bytes_out) as bytes_out values(All_Traffic.user) as user values(All_Traffic.dest_zone) as dest_zone values(All_Traffic.src_zone) as src_zone values(All_Traffic.rule) as rule from datamodel=Network_Traffic where All_Traffic.bytes_out > 10000000000 groupby All_Traffic.user, All_Traffic.src_ip, All_Traffic.dest_port,All_Traffic.dest_zone,All_Traffic.src_zone, host sourcetype index _time span=1s
| eval _time=strftime(_time,"%Y-%m-%d %H:%M:%S")
| eval total_bytes_out = sum(bytes_out)
| eval total_bytes_in = sum(bytes_in)
| eval gb_out = round(total_bytes_out/1024/1024/1024,2)
| eval gb_in = round(total_bytes_in/1024/1024/1024,2)
| sort - _time
| streamstats count as No.
| rename host as firewall_ip
| lookup dnslookup clientip as dest_ip OUTPUT clienthost as dest_resolved_ip
| lookup dnslookup clientip as src_ip OUTPUT clienthost as src_resolved_ip
| eval dest_dns=if(dest_resolved_ip!="",dest_resolved_ip,dest)
| eval src_dns=if(src_resolved_ip!="",src_resolved_ip,src)
| eval gb_total=(gb_in+gb_out)
| fields _time, firewall_ip, src_ip, src_dns, dest_ip, dest_dns, dest_port, rule, src_zone, dest_zone, gb_in, gb_out, gb_total
The only other option I thought about was to export the results to a lookup (e.g. lookup.lastweek) and then compare it with the current week, which might be a bit cleaner.
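Something like this is what I had in mind, in case it helps (untested sketch - lastweek_transfers.csv would be a new lookup file keyed on src_ip/dest_ip). The weekly report would end with:

```
| outputlookup lastweek_transfers.csv
```

and the following week's run would pull those values back in and filter:

```
| lookup lastweek_transfers.csv src_ip, dest_ip OUTPUT gb_in AS last_gb_in, gb_out AS last_gb_out
| where isnull(last_gb_in) OR abs((gb_in - last_gb_in) / last_gb_in * 100) > 5 OR abs((gb_out - last_gb_out) / last_gb_out * 100) > 5
```

The isnull() check keeps pairings that didn't appear at all last week.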
OK, first some observations
1. This logic
... values(All_Traffic.bytes_in) as bytes_in ...
...
| eval total_bytes_in = sum(bytes_in)
...is wrong. Firstly, values() removes duplicate values of bytes_in, so you may not get an accurate sum; secondly, it's not necessary to do it that way. Just do
... sum(All_Traffic.bytes_in) as total_bytes_in ...
which will give you the correct sum.
2. | eval _time=strftime(_time,"%Y-%m-%d %H:%M:%S")
is not useful. There's no value in formatting time as a string before you want to render it for display, so leave it until the end.
As to comparing last week and this week - what exactly do you want to compare: the 1-second value for a particular host at the same 1-second time a week ago, or some other aggregated time band?
What about users or source/dest IPs - what else are you looking to compare, e.g. in vs in, out vs out, total vs total?
Without a clear understanding of exactly what you're trying to compare, here's an example that produces 20 events for 10 hosts over the week - 10 for this week and 10 for last week.
It then uses streamstats to pull last week's figure up to this week's row, compares the values, and retains only those results where ANY of the variances (in/out/total) is >5% for any host.
Note the use of foreach is simply a shorthand way of repeating the same formulas for multiple fields.
| makeresults count=20
| eval total_bytes_in=random() % 200000000000 + 10000000000, total_bytes_out=random() % 200000000000 + 10000000000
| eval dest_zone="External", src_zone="Internal"
| streamstats c
| eval host="host".mvindex(split("ABCDEFGHIJKLMNOPQRSTUVWXYZ",""), ((c - 1) % 10))
| eval _time=if(c<=10, now() - ((c - 1) % 10), now() - (86400*7) - ((c - 1) % 10))
| eval gb_out = round(total_bytes_out/1024/1024/1024,2)
| eval gb_in = round(total_bytes_in/1024/1024/1024,2)
| eval gb_total=(gb_in+gb_out)
| fields _time, host, src_zone, dest_zone, gb_in, gb_out, gb_total
``` This sorts the row of data for this week and last week into two rows by _time ```
| sort host _time
``` Now streamstats pulls forward the last week value to the current week row ```
| streamstats window=2 global=f first(gb_*) as last_week_gb_* by host
``` Calculate all the variances for in/out/total between the current and last week ```
| foreach gb_* [ eval diff_<<MATCHSTR>>='<<FIELD>>'-'last_week_<<FIELD>>', variance_<<MATCHSTR>> = round(diff_<<MATCHSTR>> / '<<FIELD>>' * 100, 2), alert=if(variance_<<MATCHSTR>>>5, "Alert", alert) ]
| fields - last_week_*
| where alert="Alert"

Hope this helps give you some ideas on how to achieve what you need
Thanks for the amazing corrections! I'll be cleaning that up ASAP! Honestly, it's a weekly report, so I'm not sure what value having any date in there provides, as it's a sum of the whole week.
The weekly comparison is the bytes_in and bytes_out between src_ip and dest_ip. Honestly, for my purposes those are the only fields of interest; the rest are just to aid in figuring out where the devices are and to validate them as expected.
So, for example, below is a simplified version of the report.

Previous week:

| src_ip | dst_ip | gb_in | gb_out |
|---|---|---|---|
| 1.1.1.1 | 2.2.2.2 | 3 | 4 |

Current week:

| src_ip | dst_ip | gb_in | gb_out |
|---|---|---|---|
| 1.1.1.1 | 2.2.2.2 | 3.2 | 3.9 |
In the above scenario, that src_ip/dst_ip pair would be left out of the report, as the values are pretty similar to what was already accepted as expected data transfer.
Ideally, I guess what I'm trying to do is baseline the report: if last week's report contains the same src_ip and dst_ip with close enough gb_in/gb_out (I'll adjust the threshold depending on risk appetite), then it won't make it to the report.
Once again this is super helpful! This is a great community!
Thanks for clarifying - then you still use streamstats - see this modified example. Most of the first part is getting your data into a form where you have
_time, src_ip, dest_ip, gb_in, gb_out
for each of the two comparison weeks for the IP pairings. You'll need to run it a few times to get include=1 in the results, as it randomises the byte counts.
The include field calculation looks for the 5% variance, or for whether the pairing is seen in only ONE of the weeks, in which case it also includes it.
If you want to change the 5% variance, I suggest that rather than editing the search each time, you create a macro holding the value, so your test would look like
if(abs('variance_%_<<MATCHSTR>>')>`get_variance_threshold`...
where your macro get_variance_threshold simply has the value you want to change.
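For example, the macro definition could be as simple as this (a sketch - create it under Settings » Advanced search » Search macros, or in macros.conf):

```
[get_variance_threshold]
definition = 5
```

Then changing the threshold is a one-line edit to the macro rather than to every search that references it.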
Hope this gets you closer to your target!
| makeresults count=28
| eval bytes_in=random() % 200000000000 + 10000000000, bytes_out=random() % 200000000000 + 10000000000
| streamstats c
| eval src_ip=mvindex(split("1.1.1.1,2.2.2.2", ","), c % 2), dest_ip=mvindex(split("3.3.3.3,4.4.4.4",","), c % 2)
| eval _time=if(c<=14, now(), now() - (86400*7))
| bin _time span=7d
| stats sum(bytes_*) as total_bytes_* by _time src_ip dest_ip
| eval gb_out = round(total_bytes_out/1024/1024/1024,2)
| eval gb_in = round(total_bytes_in/1024/1024/1024,2)
| fields _time, src_ip, dest_ip, gb_in, gb_out
``` Above creates a dataset for comparison - in your case, use tstats with a span of 1w and a time range covering the full period you are comparing ```
``` This sorts the row of data for this week and last week into two rows by _time ```
| sort src_ip dest_ip _time
``` Now streamstats pulls forward the last week value to the current week row ```
| streamstats window=2 global=f first(gb_*) as last_week_gb_* by src_ip dest_ip
``` Look how many times we have seen the pairing so we can detect count=1 ```
| eventstats count as seen_count by src_ip dest_ip
``` Calculate all the variances for in/out between the current and last week ```
| foreach gb_* [ eval diff_<<MATCHSTR>>='<<FIELD>>'-'last_week_<<FIELD>>', ``` Calculates difference in values from last week to this week ```
variance_%_<<MATCHSTR>> = round(diff_<<MATCHSTR>> / '<<FIELD>>' * 100, 2), ``` Calculate the variance % between the two weeks ```
include=if(abs('variance_%_<<MATCHSTR>>')>5 OR seen_count=1, 1, coalesce(include, 0)) ] ``` Looks at the 5% threshold and if it has only been seen one of the weeks ```
| fields - last_week_*
``` Here if include is set then it has exceeded the 5% variance or if it was only seen this week or last week ```
```| where include=1```