Splunk Search

How to extract json with filter

MirrorCraze
Explorer

I have data like this.

{
   env: prod
   host: prod01
   name: appName
   info: {
     data: [ ...
     ]

     indicators: [
       {
         details: {
           A.runTime: 434
           A.Count: 0
           B.runTime: 0
           B.Count: 0
           ....
         }
         name: timeCountIndicator
         status: UP
       }
       {
         details: {
           A.downCount: 2
           A.nullCount: 0
           B.downCount: 0
           B.nullCount: 0
           ....
         }
         name: downCountIndicator
         status: UP
       }
     ]

     status: DOWN
   }

   metrics: { ...
   }

   ping: 1

}

I want to extract the fields in info.indicators{}.details only when the info.indicators{}.name of that same array element is "timeCountIndicator". I tried combining spath with table, mvexpand, and where:

... | spath path=info.indicators{} output=indicators | table indicators |mvexpand indicators| where match(indicators,"timeCountIndicator")

However, it returns each record as a string, and converting that string back into fields is hard. (Technically extract/rex can deal with it, but it takes a REALLY long time to extract every field in details when I only need some of them.)

Is there an easier way to deal with this?


dtburrows3
Builder

There are two ways I can think of to parse this.

The first uses mvexpand:

<base_search>
| spath path=info.indicators{} output=indicators 
    | table indicators 
    | eval
        time_count_indicator_json=case(
            isnull(indicators), null(),
            mvcount(indicators)==1, if(spath(indicators, "name")=="timeCountIndicator", 'indicators', null()),
            mvcount(indicators)>1, mvmap(indicators, if(spath(indicators, "name")=="timeCountIndicator", 'indicators', null()))
            )
    | fields - indicators
    
    ``` Method 1 using MVExpand ```
    
    | mvexpand time_count_indicator_json
    | spath input=time_count_indicator_json
    | fields - time_count_indicator_json
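
Since the question mentions needing only some of the fields in details, a follow-up step could trim the Method 1 output. This is a minimal sketch, not part of the original answer. It assumes the automatic spath extraction names the nested fields with a details. prefix (for example details.A.runTime), and the choice of A.runTime and B.runTime as the fields to keep is purely illustrative. The sample value is one array element copied from the replication data.

```spl
| makeresults
| eval time_count_indicator_json="{\"details\": {\"A.runTime\": 434, \"A.Count\": 0, \"B.runTime\": 0, \"B.Count\": 0}, \"name\": \"timeCountIndicator\", \"status\": \"UP\"}"
| spath input=time_count_indicator_json
| rename "details.*" as *
| table name, status, "A.runTime", "B.runTime"
```

In a real search, only the last two lines would be appended after the spath input=time_count_indicator_json step. The wildcard rename strips the details. prefix so the remaining field names match the keys in the raw event.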





The second parses an array of json_objects containing only the elements that match your criterion (name is "timeCountIndicator"):

<base_search>
| spath path=info.indicators{} output=indicators 
    | table indicators 
    | eval
        time_count_indicator_json=case(
            isnull(indicators), null(),
            mvcount(indicators)==1, if(spath(indicators, "name")=="timeCountIndicator", 'indicators', null()),
            mvcount(indicators)>1, mvmap(indicators, if(spath(indicators, "name")=="timeCountIndicator", 'indicators', null()))
            )
    | fields - indicators
    
    ``` Method 2 parsing MV Field as array of json_objects ```
    
    | eval
        time_count_indicator_json_array="[".mvjoin(time_count_indicator_json, ",")."]"
    | spath input=time_count_indicator_json_array
    | fields - time_count_indicator_json, time_count_indicator_json_array
    | rename
        "{}.*" as *



I personally find that the mvexpand method gives much cleaner output to work with.

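Method 2 could leave the multivalue fields misaligned if any of the json_objects has a null value for one of the fields, because the values for each field are collected into one list with no record of which object they came from.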

That said, the right choice depends on your use case and the volume of data you are parsing, because mvexpand can be memory intensive.
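
The alignment concern with Method 2 can be reproduced with a small, made-up array. This is only a sketch: the field sample_array and the keys name, runTime, and count are hypothetical, and the second object simply omits count rather than carrying an explicit null.

```spl
| makeresults
| eval sample_array="[{\"name\": \"first\", \"runTime\": 434, \"count\": 0}, {\"name\": \"second\", \"runTime\": 333}]"
| spath input=sample_array
| rename "{}.*" as *
| table name, runTime, count
```

With this input, name and runTime should each come back as two-value multivalue fields, while count holds a single value. Nothing in the result says that the lone count belongs to the first object, which is why Method 1 (one row per object) is easier to reason about when objects can have different keys.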

SPL used to replicate:

| makeresults
    | eval
        _raw="{\"env\": \"prod\", \"host\": \"prod\", \"name\": \"appName\", \"info\": {\"data\": [], \"indicators\": [{\"details\": {\"A.runTime\": 434, \"A.Count\": 0, \"B.runTime\": 0, \"B.Count\": 0}, \"name\": \"timeCountIndicator\", \"status\": \"UP\"}, {\"details\": {\"A.downCount\": 2, \"A.nullCount\": 0, \"B.downCount\": 0, \"B.nullCount\": 0}, \"name\": \"downCountIndicator\", \"status\": \"UP\"}, {\"details\": {\"A.runTime\": 333, \"A.Count\": 2, \"B.runTime\": 21, \"B.Count\": 4}, \"name\": \"timeCountIndicator\", \"status\": \"UP\"}], \"status\": \"DOWN\"}, \"metrics\": {}, \"ping\": 1}"
    | spath path=info.indicators{} output=indicators 
    | table indicators 
    | eval
        time_count_indicator_json=case(
            isnull(indicators), null(),
            mvcount(indicators)==1, if(spath(indicators, "name")=="timeCountIndicator", 'indicators', null()),
            mvcount(indicators)>1, mvmap(indicators, if(spath(indicators, "name")=="timeCountIndicator", 'indicators', null()))
            )
    | fields - indicators
    
    ``` Method 1 using MVExpand ```
    
    | mvexpand time_count_indicator_json
    | spath input=time_count_indicator_json
    | fields - time_count_indicator_json
    
    
    ``` Method 2 parsing MV Field as array of json_objects ```
    ```
    | eval
        time_count_indicator_json_array="[".mvjoin(time_count_indicator_json, ",")."]"
    | spath input=time_count_indicator_json_array
    | fields - time_count_indicator_json, time_count_indicator_json_array
    | rename
        "{}.*" as *
    ```