Getting Data In

How do we extract multi-value fields from nested JSON?

att35
Builder

Hi,

We are ingesting Azure NSG flow logs and visualizing them with the Microsoft Azure App for Splunk: https://splunkbase.splunk.com/app/4882

The data is in JSON format with multiple levels/records in a single event. Each record can have multiple flows, flow tuples, etc. Adding a few screenshots here to give context.

Default extractions for the main JSON fields look fine. But when it comes to values within the flow tuple field, i.e. records{}.properties.flows{}.flows{}.flowTuples{}, Splunk only keeps values from the very first entry.

How can I make these src_ip, dest_ip fields also get multiple values (across all records/flow tuples, etc.)?

flowlogs_records.png

 

Splunk extracts values only from that first highlighted entry

Here is the extraction logic from this app. 

 

[extract_tuple]
SOURCE_KEY = records{}.properties.flows{}.flows{}.flowTuples{}
DELIMS = ","
FIELDS = time,src_ip,dst_ip,src_port,dst_port,protocol,traffic_flow,traffic_result
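For context on what this stanza does: each flow tuple is a single comma-delimited string, and the DELIMS extraction maps positions to the names in FIELDS. A minimal sketch of that mapping in Python (the sample tuple values are made up for illustration):

```python
# Each NSG flow tuple is one comma-delimited string; position determines
# the field name, matching the FIELDS list in the stanza above.
FIELDS = ["time", "src_ip", "dst_ip", "src_port",
          "dst_port", "protocol", "traffic_flow", "traffic_result"]

# Made-up sample tuple, shaped like a real NSG flow tuple.
sample = "1542110377,10.0.0.4,52.1.2.3,44931,443,T,O,A"

# Split on commas and pair each value with its positional field name.
record = dict(zip(FIELDS, sample.split(",")))
print(record["src_ip"], record["dst_port"])  # → 10.0.0.4 443
```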

 

 

Thanks,


dtburrows3
Builder

You can give these evals a go. I would check to make sure everything is being extracted as expected.

I don't have access to any sourcetype="mscs:nsg:flow" data at the moment, so I am just using simulated data based on your screenshots.

If you are happy with the output, you could add these as calculated fields in local/props.conf (I would make sure they don't step on any existing knowledge objects in the app, though).

| eval time=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 0), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 0))), 'time')
    | eval src_ip=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1))), 'src_ip')
    | eval dst_ip=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 2), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 2))), 'dst_ip')
    | eval src_port=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 3), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 3))), 'src_port')
    | eval dst_port=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 4), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 4))), 'dst_port')
    | eval protocol=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 5), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 5))), 'protocol')
    | eval traffic_flow=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 6), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 6))), 'traffic_flow')
    | eval traffic_result=if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 7), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 7))), 'traffic_result')

 
Also, I am not sure if there are ever events formatted slightly differently because only a single flow occurred, in which case it would no longer be an array in the JSON event, changing the extracted field name to something like "records{}.properties.flows{}.flows.flowTuples{}". From a look at the microsoft_azure app configs, it only ever references "records{}.properties.flows{}.flows{}.flowTuples{}" in its extractions, so I assumed events will always be formatted this way.
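If the evals check out, the calculated-fields idea mentioned above could look roughly like this in local/props.conf. This is only a sketch: the stanza name assumes the sourcetype="mscs:nsg:flow" mentioned earlier, only src_ip is shown (the other fields would follow the same pattern), and it should be tested against real events before relying on it.

```ini
# local/props.conf -- illustrative sketch only; verify the sourcetype
# stanza name and test against real events in your environment.
[mscs:nsg:flow]
EVAL-src_ip = if(isnotnull('records{}.properties.flows{}.flows{}.flowTuples{}'), case(mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')==1, mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1), mvcount('records{}.properties.flows{}.flows{}.flowTuples{}')>1, mvmap('records{}.properties.flows{}.flows{}.flowTuples{}', mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1))), 'src_ip')
```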


att35
Builder

@dtburrows3 

Thank you for the reply. 

Tried these evals, and the fields are getting extracted from the tuples, but it seems the association between them is lost.

For this one event, there are 17 tuples in total. But after applying the evals, the resulting stats shows many other combinations of src_ip and dst_ip.

Stats for field records{}.properties.flows{}.flows{}.flowTuples{}

flow_tuples.png

stats on src_ip,dst_ip after applying eval

flow_tuple_afterextraction.png


dtburrows3
Builder

To retain the associations for any sort of analysis you may need to mvexpand the "records{}.properties.flows{}.flows{}.flowTuples{}" field itself.

A stats aggregation using two multivalue fields as by-fields can produce a misleading final output.

Below is a table of the event you shared on the initial post after using the mvexpand and then extracting out the individual fields after.

dtburrows3_0-1702948883195.png

SPL to do this:

| mvexpand "records{}.properties.flows{}.flows{}.flowTuples{}"
    | eval 
        time=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 0),
        src_ip=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1),
        dest_ip=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 2),
        src_port=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 3),
        dest_port=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 4),
        protocol=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 5),
        traffic_flow=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 6),
        traffic_result=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 7)

 

 

Doing a stats count by src_ip and dst_ip should make more sense using the data formatted in this way.
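To see why expanding first preserves the pairing, the same logic can be sketched outside Splunk in plain Python (the tuples below are made-up samples; only the shape matters):

```python
# Simulate mvexpand + split: expand each tuple into its own "row" first,
# so src/dest pairs stay associated, then count by (src_ip, dest_ip).
from collections import Counter

# One event whose flowTuples field holds several comma-delimited tuples
# (values are made up for illustration).
flow_tuples = [
    "1542110377,10.0.0.4,52.1.2.3,44931,443,T,O,A",
    "1542110379,10.0.0.4,52.1.2.3,44932,443,T,O,A",
    "1542110380,10.0.0.5,52.9.9.9,55000,80,T,I,D",
]

counts = Counter()
for row in flow_tuples:                      # each row is now a single tuple
    t, src_ip, dest_ip, *_ = row.split(",")  # positional fields, as in the SPL
    counts[(src_ip, dest_ip)] += 1

print(counts[("10.0.0.4", "52.1.2.3")])  # → 2
```

Because each tuple becomes its own row before splitting, no cross-product pairs like ("10.0.0.4", "52.9.9.9") appear in the counts.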

 

 


att35
Builder

Thanks @dtburrows3 

This method worked perfectly. Able to extract the required fields while still keeping associations intact. 

Although running this at scale, I am getting the following message:

 command.mvexpand: output will be truncated at 2200 results due to excessive memory usage. Memory threshold of 500MB as configured in limits.conf / [mvexpand] / max_mem_usage_mb has been reached.

Are there any alternatives to mvexpand that would avoid these memory issues? 


dtburrows3
Builder

Yeah, unfortunately mvexpand can be memory intensive.

I would say limit your fieldset as much as possible before using it and see if that helps.

It actually may work to just do a,

 

<base_search>
    | stats count by "records{}.properties.flows{}.flows{}.flowTuples{}"
    | eval 
        time=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 0),
        src_ip=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 1),
        dst_ip=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 2),
        src_port=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 3),
        dst_port=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 4),
        protocol=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 5),
        traffic_flow=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 6),
        traffic_result=mvindex(split('records{}.properties.flows{}.flows{}.flowTuples{}', ","), 7)
    | stats
        sum(count) as total
            by src_ip, dst_ip

 




This should tally up all the individual flow tuples from the events; then we can eval to split them out and sum it all up by src/dest IP.

I think this gets around the need for an mvexpand.
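As a rough illustration of why this avoids mvexpand's memory cost, the same two-stats pattern can be sketched in Python (made-up tuples): the first pass collapses duplicate tuples into counts, so the splitting work only happens once per distinct tuple.

```python
from collections import Counter

# Raw tuples as they might appear across many events (made-up values).
all_tuples = [
    "1542110377,10.0.0.4,52.1.2.3,44931,443,T,O,A",
    "1542110377,10.0.0.4,52.1.2.3,44931,443,T,O,A",  # duplicate tuple
    "1542110380,10.0.0.5,52.9.9.9,55000,80,T,I,D",
]

# First stats: count by the tuple string, collapsing duplicates.
by_tuple = Counter(all_tuples)

# Eval step: split each distinct tuple; second stats: sum counts by IP pair.
totals = Counter()
for tup, count in by_tuple.items():
    _, src_ip, dst_ip, *_ = tup.split(",")
    totals[(src_ip, dst_ip)] += count

print(totals[("10.0.0.4", "52.1.2.3")])  # → 2
```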

Let me know if that works!

att35
Builder

@dtburrows3 

 

Thank you!!

This worked perfectly. No memory issues either.

Do you know if there is a way to apply these using props/transforms or are these strictly in-line search time transformations?
