Dealing with complex variable level / depth JSON arrays

Hello there,

I am stuck trying to handle JSON data whose nested arrays vary in both length and depth.

An event looks like this:

{
    "timestamp": 1558527739980,
    "correlationId": "xyz",
    "circuitPath": [{
        "policy": "policy xyz",
        "execTime": 1983,
        "filters": [{
            "name": "set_context",
            "status": "Pass",
            "filterTime": 1558527737995,
            "execTime": 0
        }, {
            "name": "request_processing",
            "status": "Pass",
            "filterTime": 1558527737995,
            "execTime": 0,
            "subPaths": [{
                "policy": "do_nothing",
                "execTime": 0,
                "filters": [{
                    "name": "True Filter",
                    "status": "Pass",
                    "filterTime": 1558527737995,
                    "execTime": 0
                }]
            }]
        }, {
            "name": "connect_to_url",
            "status": "Pass",
            "filterTime": 1558527739978,
            "execTime": 1983
        }]
    }]
}

I am trying to build a simple table that reproduces the tree structure of the data:

policy                          status          exec_time   filter_time
policy xyz                      pass            1983            
_set_context                    pass            0           1558527737995
_request_processing             pass            0           1558527737995
___do_nothing                   pass            0           1558527737995
_connect_to_url                 pass            1983        1558527739978

This seems pretty straightforward at first, but since everything is variable here, I have only managed to build an overly complex query that still contains some 'static' parts. Here is a second, more deeply nested event:

{
    "timestamp": 1559054841705,
    "correlationId": "abc",
    "circuitPath": [{
        "policy": "policy a",
        "execTime": 4,
        "filters": [{
            "name": "hello",
            "status": "Pass",
            "filterTime": 1559054840171,
            "execTime": 4
        }]
    }, {
        "policy": "policy b",
        "execTime": 1533,
        "filters": [{
            "name": "set_context",
            "status": "Pass",
            "filterTime": 1559054840171,
            "execTime": 0
        }, {
            "name": "request_processing",
            "status": "Pass",
            "filterTime": 1559054840176,
            "execTime": 5,
            "subPaths": [{
                "policy": "policy c",
                "execTime": 5,
                "filters": [{
                    "name": "call_module_a",
                    "status": "Pass",
                    "filterTime": 1559054840176,
                    "execTime": 5,
                    "subPaths": [{
                        "policy": "extract",
                        "execTime": 5,
                        "filters": [{
                            "name": "security",
                            "status": "Pass",
                            "filterTime": 1559054840176,
                            "execTime": 5,
                            "subPaths": [{
                                "policy": "security",
                                "execTime": 5,
                                "filters": [{
                                    "name": "params",
                                    "status": "Pass",
                                    "filterTime": 1559054840171,
                                    "execTime": 0
                                }, {
                                    "name": "more_params",
                                    "status": "Pass",
                                    "filterTime": 1559054840176,
                                    "execTime": 0
                                }]
                            }]
                        }, {
                            "name": "append",
                            "status": "Pass",
                            "filterTime": 1559054840176,
                            "execTime": 0
                        }]
                    }]
                }, {
                    "name": "authorization",
                    "status": "Pass",
                    "filterTime": 1559054840176,
                    "execTime": 0
                }]
            }]
        }, {
            "name": "cnnect_to_url",
            "status": "Pass",
            "filterTime": 1559054841704,
            "execTime": 1528
        }]
    }]
}

The complex search looks like this:

index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*"
| table circuitPath.policy
| rename circuitPath.policy as policy
| mvexpand policy
| streamstats count as sort_policy
| append [search index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*" | table circuitPath.policy circuitPath.filters.name circuitPath.filters.status circuitPath.filters.execTime circuitPath.filters.filterTime | rename circuitPath.policy as policy circuitPath.filters.name as name circuitPath.filters.status as status circuitPath.filters.execTime as execution_time circuitPath.filters.filterTime as filter_time | eval count=mvrange(0,mvcount(name)) | mvexpand count | eval policy=mvindex(policy,count), name=mvindex(name,count), status=mvindex(status,count), execution_time=mvindex(execution_time,count), filter_time=mvindex(filter_time,count) | fields - count | streamstats count(eval(isstr(policy))) as sort_policy, count as sort_name, last(policy) as policy]
| sort + sort_policy
| append [search index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*" | table circuitPath.policy circuitPath.filters.name circuitPath.filters.status circuitPath.filters.execTime circuitPath.filters.filterTime | rename circuitPath.policy as policy circuitPath.filters.name as name circuitPath.filters.status as status circuitPath.filters.execTime as execution_time circuitPath.filters.filterTime as filter_time | eval count=mvrange(0,mvcount(name)) | mvexpand count | eval policy=mvindex(policy,count), name=mvindex(name,count), status=mvindex(status,count), execution_time=mvindex(execution_time,count), filter_time=mvindex(filter_time,count) | streamstats last(policy) as policy | fields name policy | join name [search index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*" | rex max_match=0 "(?<filters>\{\s\"name.+?\})" | table filters | mvexpand filters | spath input=filters | streamstats count as sort_name | fields name subPaths{}.policy sort_name | rename subPaths{}.policy as sub_policy name as name] | streamstats count as sort_policy by policy | search sub_policy="*" | streamstats count as sort_sub_policy]
| fields policy name sub_policy status execution_time filter_time sort_policy sort_name sort_sub_policy
| append [search index="main" sourcetype="test" correlationId="xyz" circuitPath.filters.name=* | table circuitPath.filters.subPaths.policy circuitPath.filters.subPaths.filters.name circuitPath.filters.subPaths.filters.status circuitPath.filters.subPaths.filters.execTime circuitPath.filters.subPaths.filters.filterTime | rename circuitPath.filters.subPaths.policy as sub_policy circuitPath.filters.subPaths.filters.name as sub_name circuitPath.filters.subPaths.filters.status as status circuitPath.filters.subPaths.filters.execTime as execution_time circuitPath.filters.subPaths.filters.filterTime as filter_time | eval count=mvrange(0,mvcount(sub_name)) | mvexpand count | eval sub_policy=mvindex(sub_policy,count), sub_name=mvindex(sub_name,count), status=mvindex(status,count), execution_time=mvindex(execution_time,count), filter_time=mvindex(filter_time,count) | streamstats last(sub_policy) as sub_policy | fields - count | streamstats count as sort_sub_name | join sub_policy [search index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*" | table circuitPath.policy circuitPath.filters.name circuitPath.filters.status circuitPath.filters.execTime circuitPath.filters.filterTime | rename circuitPath.policy as policy circuitPath.filters.name as name circuitPath.filters.status as status circuitPath.filters.execTime as execution_time circuitPath.filters.filterTime as filter_time | eval count=mvrange(0,mvcount(name)) | mvexpand count | eval policy=mvindex(policy,count), name=mvindex(name,count), status=mvindex(status,count), execution_time=mvindex(execution_time,count), filter_time=mvindex(filter_time,count) | streamstats last(policy) as policy | fields name policy | join name [search index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*" | rex max_match=0 "(?<filters>\{\s\"name.+?\})" | table filters | mvexpand filters | spath input=filters | streamstats count as sort_name | fields name subPaths{}.policy sort_name | rename subPaths{}.policy as 
sub_policy name as name] | streamstats count as sort_policy by policy | search sub_policy="*" | streamstats count as sort_sub_policy]]
| append [search index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*" | rex max_match=0 "(?<filters>\{\s\"name.+?\})" | table filters | mvexpand filters | spath input=filters | search subPaths{}.filters{}.name="*" | table subPaths{}.filters{}.name subPaths{}.filters{}.subPaths{}.policy | rename subPaths{}.filters{}.name as sub_name subPaths{}.filters{}.subPaths{}.policy as sub_path_policy | streamstats count as sort_sub_path_policy | join sub_name [search index="main" sourcetype="test" correlationId="xyz" circuitPath.filters.name=* | table circuitPath.filters.subPaths.policy circuitPath.filters.subPaths.filters.name circuitPath.filters.subPaths.filters.status circuitPath.filters.subPaths.filters.execTime circuitPath.filters.subPaths.filters.filterTime | rename circuitPath.filters.subPaths.policy as sub_policy circuitPath.filters.subPaths.filters.name as sub_name circuitPath.filters.subPaths.filters.status as status circuitPath.filters.subPaths.filters.execTime as execution_time circuitPath.filters.subPaths.filters.filterTime as filter_time | eval count=mvrange(0,mvcount(sub_name)) | mvexpand count | eval sub_policy=mvindex(sub_policy,count), sub_name=mvindex(sub_name,count), status=mvindex(status,count), execution_time=mvindex(execution_time,count), filter_time=mvindex(filter_time,count) | streamstats last(sub_policy) as sub_policy | fields - count | streamstats count as sort_sub_name | join sub_policy [search index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*" | table circuitPath.policy circuitPath.filters.name circuitPath.filters.status circuitPath.filters.execTime circuitPath.filters.filterTime | rename circuitPath.policy as policy circuitPath.filters.name as name circuitPath.filters.status as status circuitPath.filters.execTime as execution_time circuitPath.filters.filterTime as filter_time | eval count=mvrange(0,mvcount(name)) | mvexpand count | eval policy=mvindex(policy,count), name=mvindex(name,count), 
status=mvindex(status,count), execution_time=mvindex(execution_time,count), filter_time=mvindex(filter_time,count) | streamstats last(policy) as policy | fields name policy | join name [search index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*" | rex max_match=0 "(?<filters>\{\s\"name.+?\})" | table filters | mvexpand filters | spath input=filters | streamstats count as sort_name | fields name subPaths{}.policy sort_name | rename subPaths{}.policy as sub_policy name as name] | streamstats count as sort_policy by policy | search sub_policy="*" | streamstats count as sort_sub_policy] | fields policy name sub_policy sub_name sort*]]
| append [search index="main" sourcetype="test" correlationId="xyz" circuitPath.filters.name=* | table circuitPath.filters.subPaths.filters.subPaths.policy circuitPath.filters.subPaths.filters.subPaths.filters.name circuitPath.filters.subPaths.filters.subPaths.filters.status circuitPath.filters.subPaths.filters.subPaths.filters.execTime circuitPath.filters.subPaths.filters.subPaths.filters.filterTime | rename circuitPath.filters.subPaths.filters.subPaths.policy as sub_path_policy circuitPath.filters.subPaths.filters.subPaths.filters.name as sub_path_name circuitPath.filters.subPaths.filters.subPaths.filters.status as status circuitPath.filters.subPaths.filters.subPaths.filters.execTime as execution_time circuitPath.filters.subPaths.filters.subPaths.filters.filterTime as filter_time | eval count=mvrange(0,mvcount(sub_path_name)) | mvexpand count | eval sub_path_policy=mvindex(sub_path_policy,count), sub_path_name=mvindex(sub_path_name,count), status=mvindex(status,count), execution_time=mvindex(execution_time,count), filter_time=mvindex(filter_time,count) | streamstats last(sub_path_policy) as sub_path_policy | fields - count | streamstats count as sort_sub_path_name | join sub_path_policy [search index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*" | rex max_match=0 "(?<filters>\{\s\"name.+?\})" | table filters | mvexpand filters | spath input=filters | search subPaths{}.filters{}.name="*" | table subPaths{}.filters{}.name subPaths{}.filters{}.subPaths{}.policy | rename subPaths{}.filters{}.name as sub_name subPaths{}.filters{}.subPaths{}.policy as sub_path_policy | streamstats count as sort_sub_path_policy | join sub_name [search index="main" sourcetype="test" correlationId="xyz" circuitPath.filters.name=* | table circuitPath.filters.subPaths.policy circuitPath.filters.subPaths.filters.name circuitPath.filters.subPaths.filters.status circuitPath.filters.subPaths.filters.execTime circuitPath.filters.subPaths.filters.filterTime | rename 
circuitPath.filters.subPaths.policy as sub_policy circuitPath.filters.subPaths.filters.name as sub_name circuitPath.filters.subPaths.filters.status as status circuitPath.filters.subPaths.filters.execTime as execution_time circuitPath.filters.subPaths.filters.filterTime as filter_time | eval count=mvrange(0,mvcount(sub_name)) | mvexpand count | eval sub_policy=mvindex(sub_policy,count), sub_name=mvindex(sub_name,count), status=mvindex(status,count), execution_time=mvindex(execution_time,count), filter_time=mvindex(filter_time,count) | streamstats last(sub_policy) as sub_policy | fields - count | streamstats count as sort_sub_name | join sub_policy [search index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*" | table circuitPath.policy circuitPath.filters.name circuitPath.filters.status circuitPath.filters.execTime circuitPath.filters.filterTime | rename circuitPath.policy as policy circuitPath.filters.name as name circuitPath.filters.status as status circuitPath.filters.execTime as execution_time circuitPath.filters.filterTime as filter_time | eval count=mvrange(0,mvcount(name)) | mvexpand count | eval policy=mvindex(policy,count), name=mvindex(name,count), status=mvindex(status,count), execution_time=mvindex(execution_time,count), filter_time=mvindex(filter_time,count) | streamstats last(policy) as policy | fields name policy | join name [search index="main" sourcetype="test" correlationId="xyz" circuitPath.policy="*" | rex max_match=0 "(?<filters>\{\s\"name.+?\})" | table filters | mvexpand filters | spath input=filters | streamstats count as sort_name | fields name subPaths{}.policy sort_name | rename subPaths{}.policy as sub_policy name as name] | streamstats count as sort_policy by policy | search sub_policy="*" | streamstats count as sort_sub_policy] | fields policy name sub_policy sub_name sort*]]]
| fields policy name sub_policy sub_name sub_path_policy sub_path_name status execution_time filter_time sort_policy sort_name sort_sub_policy sort_sub_name sort_sub_path_policy sort_sub_path_name
| fillnull sort_name sort_sub_policy sort_sub_name sort_sub_path_policy sort_sub_path_name
| eval sort_merge=sort_policy.".".sort_name.".".sort_sub_policy.".".sort_sub_name.".".sort_sub_path_policy.".".sort_sub_path_name
| sort + sort_merge
| eval policy=case(isstr(sub_path_name),"__________".sub_path_name,isstr(sub_path_policy),"________".sub_path_policy,isstr(sub_name),"______".sub_name,isstr(sub_policy),"____".sub_policy,isstr(name),"__".name,1=1,policy)
| fields policy status execution_time filter_time
| eval filter_time=strptime(filter_time, "%s%3N")
| eval filter_time=strftime(filter_time,"%d %b %Y, %H:%M:%S.%3N")
| rename policy as filter execution_time as "execution time" filter_time as time
| eval filter=if(isnull(status),filter.":",filter)
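
As an aside on the two time-conversion steps above: a minimal sketch of a shorter form, assuming filterTime is always epoch milliseconds (as the sample values suggest). Dividing by 1000 gives epoch seconds with a fractional part, which strftime can format directly. The makeresults line only fabricates one value so the snippet can be tried without indexed data.

```spl
| makeresults
| eval filter_time=1558527737995
| eval time=strftime(filter_time/1000, "%d %b %Y, %H:%M:%S.%3N")
| table filter_time time
```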

It worked on other examples, but not on these two.

I am not even trying to fix it. I have tried several approaches and got stuck somewhere along the way each time, so I am looking for a simpler way that I have not thought of, a fresh look.

I have also experimented a little with the JMESPath add-on, which seems interesting, but without success so far.

Any help is appreciated!

Re: Dealing with complex variable level / depth JSON arrays

Have you looked at the spath command?
https://docs.splunk.com/Documentation/Splunk/7.2.6/SearchReference/Spath
I didn't read the whole query you're working with, but it looks far too complicated (in a bad way).

Re: Dealing with complex variable level / depth JSON arrays

Thanks David!

I knew about spath but thought I did not need it, since the data was already correctly extracted. I have now found a way using it 🙂!

Something like

index=...
| spath output=level1 path=circuitPath{}
| fields level1
| mvexpand level1
| rex field=level1 "^{\s\"policy\":\s\"(?<policy>.+?)\""
| fields policy
| append [search index=... | spath output=level_2 path=...
...
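
For readers following along, here is a sketch of how the first two levels might be unrolled with spath alone, in place of the rex. It is not the exact search used above: the makeresults line builds a trimmed copy of the second sample event so the snippet can be tried without indexed data, and the names level1, level2, execution_time and filter_time are illustrative. Deeper levels (subPaths{}) would need the same spath/mvexpand pair repeated once per level, so this alone does not handle arbitrary depth.

```spl
| makeresults
| eval _raw="{\"correlationId\":\"abc\",\"circuitPath\":[{\"policy\":\"policy a\",\"execTime\":4,\"filters\":[{\"name\":\"hello\",\"status\":\"Pass\",\"filterTime\":1559054840171,\"execTime\":4}]},{\"policy\":\"policy b\",\"execTime\":1533,\"filters\":[{\"name\":\"set_context\",\"status\":\"Pass\",\"filterTime\":1559054840171,\"execTime\":0},{\"name\":\"cnnect_to_url\",\"status\":\"Pass\",\"filterTime\":1559054841704,\"execTime\":1528}]}]}"
| spath output=level1 path=circuitPath{}
| fields level1
| mvexpand level1
| spath input=level1 output=policy path=policy
| spath input=level1 output=level2 path=filters{}
| mvexpand level2
| spath input=level2 output=name path=name
| spath input=level2 output=status path=status
| spath input=level2 output=execution_time path=execTime
| spath input=level2 output=filter_time path=filterTime
| table policy name status execution_time filter_time
```

Each mvexpand turns one array element into its own row, and the following spath calls read fields from that element only, so filters stay attached to the policy they belong to.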

Thanks!
