Splunk Search

Need help getting grandparent/parent/child relationships to table

ngperf
Explorer

Hi,

I have data in the following format from Microsoft Windows OS process executions:

FileName,ProcessID,ParentProcessID
child1.exe,126,108
parent1.exe,108,93
grandparent1.exe,93,24
child2.exe,276,92
parent2.exe,92,24
...

As you can see, for example, the process hierarchies here would be as follows:

grandparent1.exe -> parent1.exe -> child1.exe
grandparent1.exe -> parent2.exe -> child2.exe
And there could be many more relationships with various parents and grandparents, as you would expect.

I would like to output these relationships in the following manner:

FileName,ParentFileName,GrandparentFileName
child1.exe,parent1.exe,grandparent1.exe
child2.exe,parent2.exe,grandparent1.exe
...

The limitations here are that, frustratingly, I have no permissions to use lookup tables on the hosted Splunk environment I'm using.

Currently, I can quite easily get the parent information using the following:

| inputcsv dispatch=t procs.csv
| append [
    search event=ProcessExecution earliest=-1y latest=now [
        search event=ProcessExecution FileName="cmd.exe"
        | rename ParentProcessID AS ProcessID
        | outputcsv dispatch=t procs.csv
        | fields ProcessID
    ]
    | rename FileName AS ParentFileName
    | fields ParentFileName
]
| stats values(FileName) as FileName
    values(ParentFileName) as ParentFileName
    by ProcessID

But I'm totally lost on how I would get the grandparent information into this.

I'd like to stay away from using 'join' because I'll sometimes be processing a lot more than 50,000 records. As you can see above, I'm limiting my first subsearch to 'FileName' matching 'cmd.exe" and only querying for the parent processes of those records. This way, the search is efficient and will never hit 50,000.

Any help is much appreciated, thank you.

0 Karma
1 Solution

ngperf
Explorer

I figured it out, it was simpler than I realised.

Aggregate parent & child together, output aggregated results to CSV and then use the parent's ParentProcessID as the ProcessID to query to get the grandparent. Finally, aggregate everything again to join the grandparent into the previous parent/child aggregation.

For each query, you can preserve the original process IDs with a simple eval ThisProcessID=ProcessID and then use them in the final aggregation/table view. You also need to rename the fields for the parent and grandparent queries so they don't get merged into the same field during aggregation, i.e. 'FileName' becomes 'ParentFileName'.

In my use-case I had a lot more fields than below and I needed to zip some values and expand them later to prevent multi-value aggregation, so I've included that too.

See example query below.

| inputcsv dispatch=t child_with_parent_processes_aggregated.csv
| append [
    search event=ProcessExecution earliest=-1y latest=now [
        | inputcsv dispatch=t child_processes.csv
        | append [
            search event=ProcessExecution earliest=-1y latest=now [
                search event=ProcessExecution FileName="cmd.exe"
                | eval ChildProcessID=ProcessID
                | rename ParentProcessID AS ProcessID
                | outputcsv dispatch=t child_processes.csv
                | fields ProcessID
            ]
            | rename FileName AS ParentFileName
        ]
        | eval zipped=mvzip(FileName,ChildProcessID,"!!!!!cpid=")
        | stats values(*) as * by ProcessID
        | mvexpand zipped
        | rex field=zipped "^(?<FileName>.*)!!!!!cpid=(?<ChildProcessID>.*)$"
        | eval ParentProcessID=ProcessID
        | rename ParentProcessID as ProcessID
        | table ParentFileName ParentProcessID FileName ChildProcessID ProcessID
        | outputcsv dispatch=t child_with_parent_processes_aggregated.csv
        | fields TargetProcessId_decimal
    ]
    | rename FileName AS GrandParentFileName
    | eval GrandParentProcessID=ProcessID
]
| eval zipped=mvzip(mvzip(mvzip(FileName,ChildProcessID,"!!!!!cpid="),ParentFileName,"!!!!!pname="),ParentProcessID,"!!!!!ppid=")
| stats values(*) as * by ProcessID
| mvexpand zipped
| rex field=zipped "^(?<FileName>.*)!!!!!cpid=(?<ChildProcessID>.*)!!!!!pname=(?<ParentFileName>.*)!!!!!ppid=(?<ParentProcessID>.*)$"
| rename ChildProcessID as ProcessID
| table GrandParentFileName GrandParentProcessID ParentFileName ParentProcessID FileName ProcessID

View solution in original post

ngperf
Explorer

I figured it out, it was simpler than I realised.

Aggregate parent & child together, output aggregated results to CSV and then use the parent's ParentProcessID as the ProcessID to query to get the grandparent. Finally, aggregate everything again to join the grandparent into the previous parent/child aggregation.

For each query, you can preserve the original process IDs with a simple eval ThisProcessID=ProcessID and then use them in the final aggregation/table view. You also need to rename the fields for the parent and grandparent queries so they don't get merged into the same field during aggregation, i.e. 'FileName' becomes 'ParentFileName'.

In my use-case I had a lot more fields than below and I needed to zip some values and expand them later to prevent multi-value aggregation, so I've included that too.

See example query below.

| inputcsv dispatch=t child_with_parent_processes_aggregated.csv
| append [
    search event=ProcessExecution earliest=-1y latest=now [
        | inputcsv dispatch=t child_processes.csv
        | append [
            search event=ProcessExecution earliest=-1y latest=now [
                search event=ProcessExecution FileName="cmd.exe"
                | eval ChildProcessID=ProcessID
                | rename ParentProcessID AS ProcessID
                | outputcsv dispatch=t child_processes.csv
                | fields ProcessID
            ]
            | rename FileName AS ParentFileName
        ]
        | eval zipped=mvzip(FileName,ChildProcessID,"!!!!!cpid=")
        | stats values(*) as * by ProcessID
        | mvexpand zipped
        | rex field=zipped "^(?<FileName>.*)!!!!!cpid=(?<ChildProcessID>.*)$"
        | eval ParentProcessID=ProcessID
        | rename ParentProcessID as ProcessID
        | table ParentFileName ParentProcessID FileName ChildProcessID ProcessID
        | outputcsv dispatch=t child_with_parent_processes_aggregated.csv
        | fields TargetProcessId_decimal
    ]
    | rename FileName AS GrandParentFileName
    | eval GrandParentProcessID=ProcessID
]
| eval zipped=mvzip(mvzip(mvzip(FileName,ChildProcessID,"!!!!!cpid="),ParentFileName,"!!!!!pname="),ParentProcessID,"!!!!!ppid=")
| stats values(*) as * by ProcessID
| mvexpand zipped
| rex field=zipped "^(?<FileName>.*)!!!!!cpid=(?<ChildProcessID>.*)!!!!!pname=(?<ParentFileName>.*)!!!!!ppid=(?<ParentProcessID>.*)$"
| rename ChildProcessID as ProcessID
| table GrandParentFileName GrandParentProcessID ParentFileName ParentProcessID FileName ProcessID

woodcock
Esteemed Legend

The way to do it is to dump the information to a temporary lookup file using the outputlookup command and then use it multiple times in a row with | lookup like I demonstrate in this Q&A:
https://answers.splunk.com/answers/778755/multiple-joinouter-within-same-search.html#answer-778767

0 Karma

ngperf
Explorer

Unfortunately as I mentioned, I don’t have permission on the hosted instance I’m using to utilise lookup tables. Is there another way to do this?

0 Karma

woodcock
Esteemed Legend

Are you sure? You are unable to use |outputlookup SomeFileNameHere.csv? It is possible to block this but it requires sophisticated extra admin to do so and I've never see an environment locked down that much. Have you tried it?

0 Karma

ngperf
Explorer

I get the following error below when using either outputlookup or outputcsv (without dispatch=t). I believe this is due to the "output_file" permission being disabled in authorize.conf.

Error in 'outputlookup' command: You have insufficient privileges to perform this operation.

0 Karma
Get Updates on the Splunk Community!

Enterprise Security Content Update (ESCU) | New Releases

In December, the Splunk Threat Research Team had 1 release of new security content via the Enterprise Security ...

Why am I not seeing the finding in Splunk Enterprise Security Analyst Queue?

(This is the first of a series of 2 blogs). Splunk Enterprise Security is a fantastic tool that offers robust ...

Index This | What are the 12 Days of Splunk-mas?

December 2024 Edition Hayyy Splunk Education Enthusiasts and the Eternally Curious!  We’re back with another ...