Splunk Search

Search combine transaction with summing

rogueraider
Explorer

Goal:

To get a table summing the amount of data transferred between specified time ranges based on a transaction.

Sample Data:

12:00 01/01/2020Task 1020 Started
13:00 01/01/2020Task 1020 Finished
14:00 02/01/2020Task 3020 Started
15:00 02/01/2020Task 3020 Finished

 

12:00 01/01/2020Data Sent 50
12:01 01/01/2020Data Sent 50
12:02 01/01/2020Data Sent 50
14:10 02/01/2020Data Sent 50
14:11 02/01/2020Data Sent 50
14:12 02/01/2020Data Sent 50

 

Desired Outcome:

Task 1020Start 12:00 01/01/2020Finish 13:00 01/01/2020Data Sent 150
Task 3020Start 14:00 02/01/2020Finish 15:00 02/01/2020Data Sent 150

 

Approach:

This question requires two searches to be carried out. One to find the task start and finish times and a second to find and sum the data that was sent between those times.

I am really struggling with the nested search aspect of this. I can get a transaction search to produce the start and finish times quite easily, but I don't know how to feed those results in to a second search to calculate the amount of data sent.

Best Search

index=main "Started" OR "Finished"  
| rex "Task\s(?<task_id>[0-9]*)" 
| transaction task_id startswith="Started" endswith="Finished"
| eval latest = _time + duration 
| eval earliest = _time 
| fields earliest latest
| format  "multisearch " "[ search source=stream:netflow" "" "| stats sum(DataSent) as bytes_transferred ]" "" ""

 

This produces a bunch of subsearches that run a single search for each task time range

My problem is that I don't know to pass the identifying "task_id" value into the subsearches. My initial thought was to use something like "eval task_id=task_id" but because of the limits on the format command, I can't specify which fields to appear where.

I would appreciate anyone who could help with any ideas on how to approach this problem.

Labels (2)
0 Karma
1 Solution

Richfez
SplunkTrust
SplunkTrust

I believe you might find the `map` command useful.

https://docs.splunk.com/Documentation/Splunk/8.0.5/SearchReference/Map

I think the example in the docs, the last one with sudo?  That makes it really clear and is very similar to what you want to do.

 

But also there may be a more efficient way.  I know it feels like you need two searches, but if your sample data is fairly accurately how your real data is, then .. (And excuse me, this will be pure pseudocode, it'll take some eyeballing and fiddling to get it right)

(index=main "Started" OR "Finished") OR (source=stream:netflow AND OTHER THINGS HERE I BET)
| rex "Task\s(?<task_id>[0-9]*)" 
| transaction task_id startswith="Started" endswith="Finished"
| eventstats sum(DataSent) as bytes_transferred by _cd
| AND CLEANUP AND COLUMN MUNGING HERE

This deserves a bit of explanation, line by line because it probably won't work quite like I wrote it.

1) Toss all the stuff into one big pile.  We'll let transaction and eventstats sort it out later.

2) rex your task_id

3) Transaction as usual

4) FUN!  When you transaction there's a system field that defines each transaction called `_cd` so you can use that to only sum up the things you want.

And other notes:

Your transaction command should have maxspan, maxpause params, and probably you should check into keeporphans. 

Also, now that I've written that, I'm no longer convinced this will be better, but it's worth a try.  The problem with it will be that transaction now has a LOT more stuff to do, and if your timeframes are correct that extra work may include about a zillion extra events.  But still, might be worth it.

Or just switch from multisearch to map and ... use that.  🙂

 

Happy Splunking,

Rich

View solution in original post

Richfez
SplunkTrust
SplunkTrust

I believe you might find the `map` command useful.

https://docs.splunk.com/Documentation/Splunk/8.0.5/SearchReference/Map

I think the example in the docs, the last one with sudo?  That makes it really clear and is very similar to what you want to do.

 

But also there may be a more efficient way.  I know it feels like you need two searches, but if your sample data is fairly accurately how your real data is, then .. (And excuse me, this will be pure pseudocode, it'll take some eyeballing and fiddling to get it right)

(index=main "Started" OR "Finished") OR (source=stream:netflow AND OTHER THINGS HERE I BET)
| rex "Task\s(?<task_id>[0-9]*)" 
| transaction task_id startswith="Started" endswith="Finished"
| eventstats sum(DataSent) as bytes_transferred by _cd
| AND CLEANUP AND COLUMN MUNGING HERE

This deserves a bit of explanation, line by line because it probably won't work quite like I wrote it.

1) Toss all the stuff into one big pile.  We'll let transaction and eventstats sort it out later.

2) rex your task_id

3) Transaction as usual

4) FUN!  When you transaction there's a system field that defines each transaction called `_cd` so you can use that to only sum up the things you want.

And other notes:

Your transaction command should have maxspan, maxpause params, and probably you should check into keeporphans. 

Also, now that I've written that, I'm no longer convinced this will be better, but it's worth a try.  The problem with it will be that transaction now has a LOT more stuff to do, and if your timeframes are correct that extra work may include about a zillion extra events.  But still, might be worth it.

Or just switch from multisearch to map and ... use that.  🙂

 

Happy Splunking,

Rich

rogueraider
Explorer

Thanks for responding @Richfez

I took your approach because it seems to have much more potential than my approach of many searches.

(index=lighthouse ON_TASK OR OFF_TASK) OR (index=main source=stream:netflow)
| rex "task\s(?<task_id>[0-9]*)" 
| rex "reg\s(?<car_rego>ABC-XX[CDEF])"
| rex "(?<task_mode>O(N|FF)_TASK)" 
| rename sum(bytes_in) as data_in 
| transaction maxspan=4h keeporphans=1 car_rego, task_id startswith="ON_TASK" endswith="OFF_TASK"
| eval finishtime=duration + _time 
| where aircraft_rego="VH-XNE"
| table _time car_rego task_id data_in finishtime

The key changes:

`rename sum(bytes_in) as data_in` The field `sum(bytes_in)` is renamed to `data_in` because it's automatically generated by the netflow stream and causes problems when trying to work with it later.

`keeporphans=1` This was an important discovery because if it is not set I only get the transaction events returned. If the netflow results are not available, the `data_in` field disappears.

Now I get all the relevant results listed and populating into a table like so.

_timecar_regotask_iddata_infinishtime
2020-08-21 01:00:01ABC-XXC1111 020-08-21 01:00:59
2020-08-21 01:00:02  50 
2020-08-21 02:00:01  50 
2020-08-21 05:00:01ABC-XXC1112 020-08-21 05:30:00
2020-08-21 05:00:02  50 
2020-08-21 05:00:03  50 
2020-08-21 05:00:06  50 

There are obviously blank cells because the supporting events don't have those fields associated with them.

What I want to get to is this. Note that the `finishtime` field is used to exclude some fields in the summing process.

_timecar_regotask_iddata_totalfinishtime
2020-08-21 01:00:01ABC-XXC111150020-08-21 01:00:59
2020-08-21 05:00:01ABC-XXC1112150020-08-21 05:30:00

 

Do you have any pointers on how to achieve this?

0 Karma

ITWhisperer
SplunkTrust
SplunkTrust

Hi @rogueraider 

Given that you seem to know these transactions are 4hrs, can you add another bin and stats to your query?

(index=lighthouse ON_TASK OR OFF_TASK) OR (index=main source=stream:netflow)
| rex "task\s(?<task_id>[0-9]*)" 
| rex "reg\s(?<car_rego>ABC-XX[CDEF])"
| rex "(?<task_mode>O(N|FF)_TASK)" 
| rename sum(bytes_in) as data_in 
| transaction maxspan=4h keeporphans=1 car_rego, task_id startswith="ON_TASK" endswith="OFF_TASK"
| eval finishtime=duration + _time 
| where aircraft_rego="VH-XNE"
| table _time car_rego task_id data_in finishtime
| bin _time span=4h
| stats values(car_rego) as car_rego, values(task_id) as task_id, sum(data_in) as data_total, values(finishtime) as finishtime by _time
0 Karma

rogueraider
Explorer

I finally found a solution!

The key problem I was not grouping the results correctly into the transaction event. To do this I created the `car_group` variable to make the transaction enclose the other events. Once this common grouping had been created I was able to carry out `stats` as normal.

@Richfez  thanks for putting me on the right path.

(index=first ON_TASK OR OFF_TASK) OR (index=second source=stream:netflow src_ip=172.16* OR dest_ip=172.16*) 
| rex "task\s(?<task_id>[0-9]*)"  
| rex "reg\s(?<car_rego>ABC-XX[CDEF])" 
| rename sum(bytes_in) as data_in  
| eval car_group=case(     
    cidrmatch("172.16.1.1/32", src_ip) OR cidrmatch("172.16.1.1/32", dest_ip) OR car_rego="ABC-XXD", "ABC-XXD",
    cidrmatch("172.16.1.2/32", src_ip) OR cidrmatch("172.16.1.2/32", dest_ip) OR car_rego="ABC-XXF", "ABC-XXF",
    cidrmatch("172.16.1.3/32", src_ip) OR cidrmatch("172.16.1.3/32", dest_ip) OR car_rego="ABC-XXC", "ABC-XXC",
    cidrmatch("172.16.1.4/32", src_ip) OR cidrmatch("172.16.1.4/32", dest_ip) OR car_rego="ABC-XXE", "ABC-XXE")
| transaction car_group task_id startswith="ON_TASK" endswith="OFF_TASK" maxevents=-1 maxspan=4h
| eval finishtime=strftime(duration + _time, "%F %H:%M:%S") 
| eval starttime=strftime(_time, "%F %H:%M:%S")
| eval tasked_time=tostring(duration, "duration")
| stats sum(data_in) as bytes_transferred by car_group task_id starttime finishtime tasked_time
| eval megabytes = bytes_transferred * 0.000001
| table car_group task_id starttime finishtime tasked_time megabytes bytes_transferred

 

0 Karma
Get Updates on the Splunk Community!

Infographic provides the TL;DR for the 2024 Splunk Career Impact Report

We’ve been buzzing with excitement about the recent validation of Splunk Education! The 2024 Splunk Career ...

Enterprise Security Content Update (ESCU) | New Releases

In December, the Splunk Threat Research Team had 1 release of new security content via the Enterprise Security ...

Why am I not seeing the finding in Splunk Enterprise Security Analyst Queue?

(This is the first of a series of 2 blogs). Splunk Enterprise Security is a fantastic tool that offers robust ...