Hi,
We are ingesting Azure NSG flow logs and visualizing them with the Microsoft Azure App for Splunk (https://splunkbase.splunk.com/app/4882).
The data is in JSON format with multiple levels/records in a single event. Each record can have multiple flows, flow tuples, etc. Adding a few screenshots here to give the context.
Default extractions for the main JSON fields look fine. But for the values within the flow tuple field, i.e. records{}.properties.flows{}.flows{}.flowTuples{}, Splunk only keeps the values from the very first entry.
How can I make fields such as src_ip and dest_ip capture multiple values (across all records/flow tuples, etc.)?
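For reference, each flowTuples entry is a single comma-separated string whose positions match the FIELDS list below (time, src_ip, dst_ip, src_port, dst_port, protocol, traffic_flow, traffic_result). The values here are made up for illustration:

```json
"flowTuples": [
  "1695000001,10.0.0.4,52.239.184.100,44931,443,T,O,A",
  "1695000002,10.0.0.5,10.0.0.4,35370,22,T,I,D"
]
```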
Here is the extraction logic from this app.
[extract_tuple]
SOURCE_KEY = records{}.properties.flows{}.flows{}.flowTuples{}
DELIMS = ","
FIELDS = time,src_ip,dst_ip,src_port,dst_port,protocol,traffic_flow,traffic_result
Thanks,
You can give these evals a go. I would check and make sure you are getting everything as expected.
I don't have access to any sourcetype="mscs:nsg:flow" data at the moment, so I am just using simulated data based off of your screenshots.
If you are happy with the output, then you could add them as calculated fields in local/props.conf (I would make sure they don't step on any existing knowledge objects in the app, though).
| eval time=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 0), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 0))), 'time')
| eval src_ip=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1))), 'src_ip')
| eval dst_ip=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 2), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 2))), 'dst_ip')
| eval src_port=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 3), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 3))), 'src_port')
| eval dst_port=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 4), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 4))), 'dst_port')
| eval protocol=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 5), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 5))), 'protocol')
| eval traffic_flow=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 6), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 6))), 'traffic_flow')
| eval traffic_result=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 7), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 7))), 'traffic_result')
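As a rough sketch of the calculated-field approach (assuming the sourcetype is mscs:nsg:flow; adjust the stanza and logic to your environment), one of these could live in local/props.conf like so. This uses only the mvmap branch for brevity; you may want the full case() logic from the evals above if single-value events are a concern:

```
[mscs:nsg:flow]
# Hypothetical calculated field; mirror the remaining fields the same way
EVAL-src_ip = if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1)), 'src_ip')
```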
Also, I'm not sure whether events are ever formatted slightly differently when only a single flow occurred; in that case it would no longer be an array in the JSON, which would change the overall extracted field name to something like "records{}.properties.flows{}.flows.flowTuples{}". From a look at the microsoft_azure app configs, it only ever references "records{}.properties.flows{}.flows{}.flowTuples{}" in its extractions, so I made the assumption that events will always be formatted this way.
Thank you for the reply.
Tried these evals and the fields are getting extracted from the tuples, but it seems the association between them is lost.
For this one event there are 17 tuples in total, but after applying the evals the resulting stats show several other combinations of src_ip & dst_ip.
Stats for the field records{}.properties.flows{}.flows{}.flowTuples{}:
Stats on src_ip, dst_ip after applying the evals:
To retain the associations for any sort of analysis, you may need to mvexpand the "records{}.properties.flows{}.flows{}.flowTuples{}" field itself.
A stats aggregation using two multivalue fields as by-fields can be misleading: each multivalue field is expanded independently, so the output includes src/dst combinations that never occurred together in a single tuple.
Below is a table of the event you shared in the initial post, after using mvexpand and then extracting out the individual fields.
SPL to do this:
| mvexpand "records{}.properties.flows{}.flows{}.flowTuples{}"
| eval
time=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 0),
src_ip=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1),
dest_ip=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 2),
src_port=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 3),
dest_port=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 4),
protocol=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 5),
traffic_flow=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 6),
traffic_result=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 7)
Doing a stats count by src_ip and dest_ip should make more sense with the data formatted this way.
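To see why the association gets lost, here is the pitfall sketched outside Splunk in a few lines of Python (field positions match the tuple format; the values are made up):

```python
# Each NSG flow tuple keeps src and dst together in one string:
flow_tuples = [
    "1695000001,10.0.0.4,52.239.184.100,44931,443,T,O,A",
    "1695000002,10.0.0.5,10.0.0.4,35370,22,T,I,D",
]

# Splitting into independent multivalue fields (what the per-field
# evals do) drops the pairing between src_ip and dst_ip:
src_ips = [t.split(",")[1] for t in flow_tuples]
dst_ips = [t.split(",")[2] for t in flow_tuples]

# Aggregating by the two independent lists yields every combination,
# analogous to stats by two multivalue fields:
combos = {(s, d) for s in src_ips for d in dst_ips}

# Expanding tuple-by-tuple first (what mvexpand achieves) keeps only
# the pairs that actually occurred:
real_pairs = {(t.split(",")[1], t.split(",")[2]) for t in flow_tuples}

print(len(real_pairs))  # 2 real pairs
print(len(combos))      # 4 combinations once the association is lost
```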
Thanks @dtburrows3
This method worked perfectly. I'm able to extract the required fields while still keeping the associations intact.
However, when running this at scale, I am getting the following message:
command.mvexpand: output will be truncated at 2200 results due to excessive memory usage. Memory threshold of 500MB as configured in limits.conf / [mvexpand] / max_mem_usage_mb has been reached.
Are there any alternatives to mvexpand that would avoid these memory issues?
yea unfortunately mvexpand can be memory intensive.
I would say limit your fieldset as much as possible before using it and see if that helps.
It actually may work to just do a,
<base_search>
| stats count by "records{}.properties.flows{}.flows{}.flowTuples{}"
| eval
time=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 0),
src_ip=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1),
dst_ip=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 2),
src_port=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 3),
dst_port=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 4),
protocol=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 5),
traffic_flow=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 6),
traffic_result=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 7)
| stats
sum(count) as total
by src_ip, dst_ip
This should tally up all the individual flow tuples from events; the evals then split each tuple out, and the final stats sums it all up by src/dest IP.
I think this gets around the need for mvexpand.
Let me know if that works!
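If you ever do need mvexpand at scale, the limit it hit can also be raised in local/limits.conf, per the exact setting named in your message (this trades search-head memory for headroom, so raise it with care):

```
[mvexpand]
# Default is 500 (MB); this cap applies per mvexpand invocation
max_mem_usage_mb = 1024
```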
Thank you!!
This worked perfectly. No memory issues either.
Do you know if there is a way to apply these using props/transforms, or are they strictly in-line, search-time transformations?