Hi,
I have data in the following format from Microsoft Windows OS process executions:
FileName,ProcessID,ParentProcessID
child1.exe,126,108
parent1.exe,108,93
grandparent1.exe,93,24
child2.exe,276,92
parent2.exe,92,93
...
As you can see, for example, the process hierarchies here would be as follows:
grandparent1.exe -> parent1.exe -> child1.exe
grandparent1.exe -> parent2.exe -> child2.exe
And there could be many more relationships with various parents and grandparents, as you would expect.
I would like to output these relationships in the following manner:
FileName,ParentFileName,GrandparentFileName
child1.exe,parent1.exe,grandparent1.exe
child2.exe,parent2.exe,grandparent1.exe
...
The limitation is that, frustratingly, I have no permission to use lookup tables on the hosted Splunk environment I'm using.
Currently, I can quite easily get the parent information using the following:
| inputcsv dispatch=t procs.csv
| append [
search event=ProcessExecution earliest=-1y latest=now [
search event=ProcessExecution FileName="cmd.exe"
| rename ParentProcessID AS ProcessID
| outputcsv dispatch=t procs.csv
| fields ProcessID
]
| rename FileName AS ParentFileName
| fields ParentFileName
]
| stats values(FileName) as FileName
values(ParentFileName) as ParentFileName
by ProcessID
But I'm totally lost on how I would get the grandparent information into this.
I'd like to stay away from using 'join' because I'll sometimes be processing a lot more than 50,000 records. As you can see above, I'm limiting my first subsearch to 'FileName' matching 'cmd.exe' and only querying for the parent processes of those records. This way, the search is efficient and will never hit 50,000.
Any help is much appreciated, thank you.
I figured it out; it was simpler than I realised.
Aggregate the parent and child together, output the aggregated results to a CSV, and then use the parent's ParentProcessID as the ProcessID to query for the grandparent. Finally, aggregate everything again to bring the grandparent into the previous parent/child aggregation.
For each query, you can preserve the original process IDs with a simple eval ThisProcessID=ProcessID and then use them in the final aggregation/table view. You also need to rename the fields for the parent and grandparent queries so they don't get merged into the same field during aggregation, i.e. 'FileName' becomes 'ParentFileName'.
In my use case I had many more fields than shown below, and I needed to zip some values together and expand them later so that values belonging to different processes were not merged into a single multivalue field, so I've included that too.
See example query below.
| inputcsv dispatch=t child_with_parent_processes_aggregated.csv
| append [
search event=ProcessExecution earliest=-1y latest=now [
| inputcsv dispatch=t child_processes.csv
| append [
search event=ProcessExecution earliest=-1y latest=now [
search event=ProcessExecution FileName="cmd.exe"
| eval ChildProcessID=ProcessID
| rename ParentProcessID AS ProcessID
| outputcsv dispatch=t child_processes.csv
| fields ProcessID
]
| rename FileName AS ParentFileName
]
| eval zipped=mvzip(FileName,ChildProcessID,"!!!!!cpid=")
| stats values(*) as * by ProcessID
| mvexpand zipped
| rex field=zipped "^(?<FileName>.*)!!!!!cpid=(?<ChildProcessID>.*)$"
| eval ThisParentProcessID=ProcessID
| rename ParentProcessID as ProcessID
| rename ThisParentProcessID as ParentProcessID
| table ParentFileName ParentProcessID FileName ChildProcessID ProcessID
| outputcsv dispatch=t child_with_parent_processes_aggregated.csv
| fields ProcessID
]
| rename FileName AS GrandParentFileName
| eval GrandParentProcessID=ProcessID
]
| eval zipped=mvzip(mvzip(mvzip(FileName,ChildProcessID,"!!!!!cpid="),ParentFileName,"!!!!!pname="),ParentProcessID,"!!!!!ppid=")
| stats values(*) as * by ProcessID
| mvexpand zipped
| rex field=zipped "^(?<FileName>.*)!!!!!cpid=(?<ChildProcessID>.*)!!!!!pname=(?<ParentFileName>.*)!!!!!ppid=(?<ParentProcessID>.*)$"
| rename ChildProcessID as ProcessID
| table GrandParentFileName GrandParentProcessID ParentFileName ParentProcessID FileName ProcessID
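The two aggregation hops described above can be tried in isolation with the following minimal sketch. It assumes synthetic rows built with makeresults in place of the real searches and CSV files, uses the child1.exe / parent1.exe / grandparent1.exe values from the question, and hard-codes the renamed fields (ChildProcessID, ParentFileName, GrandParentFileName) that the real query produces with rename. ThisParentProcessID is a hypothetical temporary field name. This is an illustration of the rename-then-stats idea, not a replacement for the full query.

```
| makeresults
| eval FileName="child1.exe", ChildProcessID=126, ProcessID=108
| append [
    | makeresults
    | eval ParentFileName="parent1.exe", ProcessID=108, ParentProcessID=93
  ]
| stats values(FileName) as FileName
        values(ChildProcessID) as ChildProcessID
        values(ParentFileName) as ParentFileName
        values(ParentProcessID) as ParentProcessID
        by ProcessID
| eval ThisParentProcessID=ProcessID
| rename ParentProcessID as ProcessID
| rename ThisParentProcessID as ParentProcessID
| append [
    | makeresults
    | eval GrandParentFileName="grandparent1.exe", ProcessID=93
  ]
| stats values(FileName) as FileName
        values(ChildProcessID) as ChildProcessID
        values(ParentFileName) as ParentFileName
        values(ParentProcessID) as ParentProcessID
        values(GrandParentFileName) as GrandParentFileName
        by ProcessID
| rename ProcessID as GrandParentProcessID
| rename ChildProcessID as ProcessID
| table GrandParentFileName GrandParentProcessID ParentFileName ParentProcessID FileName ProcessID
```

The first stats pairs the child row with its parent because both carry the parent's PID in ProcessID. The eval/rename step then keeps the parent's own PID and moves the grandparent's PID into ProcessID, so the second stats can pair the result with the grandparent row. The sketch is intended to return a single row linking child1.exe to parent1.exe and grandparent1.exe.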
The way to do it is to dump the information to a temporary lookup file using the outputlookup command and then use it multiple times in a row with | lookup, as I demonstrate in this Q&A:
https://answers.splunk.com/answers/778755/multiple-joinouter-within-same-search.html#answer-778767
Unfortunately, as I mentioned, I don’t have permission on the hosted instance I’m using to utilise lookup tables. Is there another way to do this?
Are you sure that you are unable to use | outputlookup SomeFileNameHere.csv? It is possible to block this, but it requires sophisticated extra admin work to do so, and I've never seen an environment locked down that much. Have you tried it?
I get the error below when using either outputlookup or outputcsv (without dispatch=t). I believe this is due to the "output_file" permission being disabled in authorize.conf.
Error in 'outputlookup' command: You have insufficient privileges to perform this operation.
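One way to check the guess about the "output_file" capability is to list the capabilities of the current user. The following is a rough sketch, assuming the rest command is permitted for your role on the hosted instance and that the current-context endpoint returns a capabilities field there; if either assumption does not hold, an admin would need to confirm the role settings instead.

```
| rest /services/authentication/current-context splunk_server=local
| fields username roles capabilities
| mvexpand capabilities
| search capabilities="output_file"
```

If the search returns no rows, the current user's roles most likely do not include output_file, which would be consistent with the error above.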