I have an alert which filters process creation Windows logs. I'm attempting to add the grandparent process and command line to those logs. I've been mostly successful in this part by utilizing a lookup table to temporarily store all the event logs prior to filtering and then calling the table again via the lookup command once the events have been filtered down.
This is what the search looks like roughly
index=winEvents EventCode=4688
| eval user=coalesce(User,Account_Name)
| eval Parent_Image=coalesce(ParentImage, Creator_Process_Name)
| eval ParentPID=coalesce(ParentProcessId, tonumber(Creator_Process_ID, 16))
| eval ProcessPID=coalesce(ProcessId, tonumber(New_Process_ID, 16))
| eval Process_Image=coalesce(Image,New_Process_Name)
| eval command=coalesce(CommandLine, Process_Command_Line)
| eval processInfo = 'Parent_Image' + "," + 'ParentPID' + "," + 'command' + "," + '_time'
| table *
| outputlookup temp_table.csv create_empty=true allow_updates=false
<<<<<<Filtering>>>>>>
| lookup temp_table.csv ComputerName AS ComputerName ProcessPID AS ParentPID Process_Image AS Parent_Image OUTPUT Parent_Image AS GrandParent_Image ParentPID AS GrandParentPID command AS Parent_CommandLine _time AS grand_time
| table *
This mostly works great, however, there's a flaw. The alert runs over a time span of a single hour, so the lookup table has all the process creation events for that time period, and when the lookup command goes into the table and searches the events which match the ComputerName, ProcessPID, and ParentPID for an event, it occasionally finds *multiple* events rather than a single one. Because of that, I can't be sure which is the actual grandparent process without checking.
I've been trying to fix this by leveraging the _time field(outputted as grand_time) for each event in the lookup table and matching it against the _time for each event in the search. The correct parent should be the one closet to zero when subtracting grand_time from the _time in each event of the search.
I haven't been able to get the logic to work, though. I know it's possible, I just haven't been able to wrap my mind around it.
I've also looked into seeing if I could make a lookup definition for the lookup table and leverage some kind of filtering on that end, but I'm not sure if that'd work either. It'd be a lot easier if the lookup command allowed you to do some form of filtering like inputlookup, but that doesn't seem to be the case.
Apologies it took so long to get back to this question. Thank you both for your enlightening responses. Fortunately, after reading through them, I managed to come across a working solution.
| eval Parent_User=coalesce(ParentUser,null())
| eval user=coalesce(User,Account_Name)
| eval parent_image=coalesce(ParentImage, Creator_Process_Name)
| eval ParentCMD=coalesce(ParentCommandLine, null())
| eval parent_pid=coalesce(ParentProcessId, tonumber(Creator_Process_ID, 16))
| eval process_pid=coalesce(ProcessId, tonumber(New_Process_ID, 16))
| eval process_image=coalesce(Image,New_Process_Name)
| eval command=coalesce(CommandLine, Process_Command_Line, script_content)
| eval processInfo = '_time' + "|,|" + 'parent_image' + "|,|" + 'parent_pid' + "|,|" + 'command'
| convert timeformat="%F %T" ctime(_time) AS time
| table *
| outputlookup tempEvents.csv create_empty=true allow_updates=false output_format=splunk_mv_csv
I already posted this above, but ultimately, all I'm doing is sending all my events to a lookup table before filtering it down. The important part is the processInfo field which a basic concatination of the above fields and split by a unique delimitor. The lookup overwrites itself each run.
| lookup tempEvents.csv ComputerName AS ComputerName process_pid AS parent_pid process_image AS parent_image OUTPUT processInfo AS grand_processInfo _time AS grand_time
| eval min_grand_time = mvindex(mvsort(mvmap(grand_time, _time - grand_time)), 0)
| eval grand_processInfo = mvdedup(grand_processInfo)
| eval grand_processInfo = mvappend(grand_processInfo, "")
| eval true_grandparent=null()
| foreach mode=multivalue grand_processInfo [ eval split_mv = split(<<ITEM>>, "|,|"), true_grandparent = if((_time - tonumber(mvindex(split_mv, 0))) = min_grand_time, mvappend(true_grandparent, tostring(<<ITEM>>)), true_grandparent)]
| rex field=true_grandparent "^\d+\|,\|(?<grandparent_image>.+?)\|,\|(?<grandparent_pid>.+?)\|,\|(?<parent_commandline>.+$)"
Here, the lookup command re-adds events from the lookup table which match the ComputerName to the lookup's ComputerName fields, the process_id as the lookup's parent_id, and the process_image as the lookup's parent_image.
The original issue was caused due to the fact that the lookup command would add multiple events where there should idealy only be a single event which matches for all three of those fields(ComputerName, process_id, and process_image). This is just a consequences of how Windows uses process_ids. They're only unique for as a long as a process is open. As soon as a process is ended, its process id can be recycled.
As such, my solution hinged on comparing the _time field for the search events and the lookup events.
In the first line after the lookup command, I declare a new field called min_grand_time and use mvmap to itterate over the grand_time field from the lookup table. I subtract each value in grand_time from the current search event's _time field to get a positive integer(time doesn't move backwards, after all). The resulting mv field is then sorted using mvsort(this isn't actually a sort based on number values, but it works out regardless). After the sort, I can use mvindex to return the value at the first index to get the value closest to 0.
The next line is a dedup which I noticed was needed in grand_processInfo. I only realized after I added the dedup that the extra events are caused by me querying both sysmon and windows_event logs(a log from each exists for each process created on a system). I'll adjust the seach later by changing out the first table command(just prior to the initial outputlookup) and using stats to filter it down before sending it to the lookup table.
After the dedup, grand_processInfo is single value field......however, I need it register as a mv field for the purpose of the following foreach command. To do so without actually adding anything, I use mvappend to add an empty string.
I then create new field called true_grandparent for use in the foreach command. I may not need to declare a field prior to using it in a foreach, but I did so anyway.
The foreach is where the magic happens which I realized I could use from ITWhisperer(thank you for that). Using it, I itterate over the grand_processInfo field(which is why I needed to use mvappend earlier in the case that grand_processInfo was a single value field). In the loop, I declare a new field named split_mv where I use the split command to split the current <<ITEM>> along its delimiter which I created at the start of the search. After it's been split into a new mv field with the _time, parent_image, parent_id, and command fields, I declare a new field called true_grandparent where if the _time for a search event minus the _time for a lookup event equals the value found in the field min_grand_time which I declared earlier, then I use mvappend to add the current <<ITEM>> to true_grandparent. Otherwise, it simply stays the same.
By running this foreach over each value of grand_processInfo, I'm guarnteed to only get only one value appended to true_grandparent.
Finally, I simply use a rex command on true_grandparent to split out the fields again. It isn't shown in the above code snippets, but I also add a fillnull command to 'fill in' the blank spots that could occur if a process's grandparent process isn't found in the lookup table.
| fillnull value="N/A" great_grandparent_image, grandparent_image, great_grandparent_pid, grandparent_pid, grandparent_commandline, parent_commandline
Hopefully the above explanation makes sense. I've been testing over the past few days, and fortunately, it does seem to work exactly as intended. With any luck, others attempting to do the same thing will be able to follow some variation of the above steps to achieve the same result.
Thank you both, ITWhisperer and PickleRick for your expertise.
Where are the duplicate coming from? Can you remove them before adding them to the lookup file?
The duplicates aren't errors or anything. It's just due to how process ID's work in Windows. Process ID"s are ephemeral and get reused by the system for new processes once the ID in question is made available. For example, you can open notepad.exe on Windows at 12:30pm, and it might get assigned process ID 18932(0x49F4). For the time notepad.exe is open, no other process can have process id 18932(0x49F4). However, if you close the program at 12:35pm, that ID becomes available again, and it's possible for Windows to assign it to a new program, such as cmd.exe at 1:45pm.
That's what's happening in this case. The process ID is being recycled and just happens to be getting reassigned to the same program that used it last time. That's why I'm trying to filter on the time somehow. Obviously the parent for a process can't come *after* a process is created, and the parent must be the one closet to it chronologically. Sorry if it's confusing.
As for filtering it down prior to using the lookup.....perhaps I could run a dedup before pushing it all to the lookup....but I'm not sure how I'd ensure I don't remove the wrong processes
1. Dedup is a tricky command and while it works as designed and described in docs it's often not what users expect.
2. There is an option for time-based lookups which seem to be more or less what you need (lookup-based lookup takes into account the difference between time from the event and the time stored in the lookup). But the settings must be defined during lookup creation and the time margins can't be dynamically set during a search - they are defined on a per-lookup basis. I haven't used time-based lookups myself though.
See https://help.splunk.com/en/splunk-enterprise/manage-knowledge-objects/knowledge-management-manual/9.... for more details
Can you provide some sample events demonstrating the issue, de-personalised of course?
Of course,
The below picture is an example of production data taken from my lookup table, so it's not fabricated(aside from some light obfuscation). Here you can see the the child process I want to find the grandparent process for.
In this second picture, I've queried the lookup table by checking for events matching the ComputerName, the Process_Image, and the ProcessPID of the child process. This is exactly what the lookup command does in my search.
| lookup tempEvents.csv ComputerName AS ComputerName ProcessPID AS ParentPID Process_Image AS Parent_Image OUTPUT Parent_Image AS GrandParent_Image ParentPID AS GrandParentPID command AS Parent_CommandLine _time AS grand_time
Ideally, there should be only one given the lookup table only has the process creation logs for a single hour. Unfortunately, though....
There are two process, and as such, both get outputted by the lookup command despite only one of them being the grandparent process. If you check the time for each, the actual grandparent is going to be the one occurring at 2025-11-14 19:30:36 because that's the one closest in time. Unfortunately, I'm not sure how to programmatically make that deduction.
Try something like this (instead of writing and reading from a lookup file):
``` Put events in chronological order ```
| sort 0 _time
``` Duplicate events using row to distinguish the copies ``
| eval row=mvrange(0,2)
| mvexpand row
``` Set row zero to hold Parent info, and row one to hold Process ```
| eval PID=if(row=0,ParentPID,ProcessPID)
| eval Image=if(row=0,Parent_Image,Process_Image)
``` List ParentPID, Parent_Image and _time for all events with the same PID and Image (filtering out known Parent info) ```
``` Using list() instead of values() maintains the order of info (as the sorted order) ```
| eventstats list(eval(if(PID=ParentPID,null(),ParentPID))) as ParentPIDs list(eval(if(PID=ParentPID,null(),Parent_Image))) as Parent_Images list(eval(if(PID=ParentPID,null(),_time))) as times by PID Image
``` Create a multi-value field of indexes into lists ```
| eval possibles=mvrange(0,mvcount(times))
``` For each index, (over)write the Grandparent info if the corresponding time is earlier than event time. This ensures that information for subsequent events is not used, and that the last valid value is used. ```
| foreach mode=multivalue possibles
[| eval GrandparentPID=if(mvindex(times,<<ITEM>>) < _time,mvindex(ParentPIDs,<<ITEM>>),GrandparentPID), Grandparent_Image=if(mvindex(times,<<ITEM>>) < _time,mvindex(Parent_Images,<<ITEM>>),Grandparent_Image)]
``` Remove redundant events ```
| where row=0
I think I see where you're going with this. For the sake of making sure I understand each part, though, could you give a brief reasoning for each part? Specifically, does all this happen after I insert the data via the lookup command, or does it come before? I'm assuming it comes after but want to be sure. Additionally, what purpose does the 'row' field fill? I can see how you're using it to set the PID and Image fields, but why in that manner?
It is instead of the outputlookup and lookup. The row field allows the event to be duplicated with one event having the process id and one having the parent process id in the same field. Once the eventstats is done, the duplicated event is no longer required because all the required information is in just one of the events. The image and parent image are not strictly required for your usecase but serve as an additional check that the right match has been found.
I have added comments to the suggested solution.
I'm wondering if it couldn't be done with streamstats. Yes, duplicate the events create a "join field" from either a PID or PPID depending on which duplicated row you use and then do something like
| streamstats current=f last(PPID) by joinfield
The problem with the streamstats approach is that it doesn't deal with recycling of process ids correctly. For example, if cmd.exe has a process id of 1000, and is later reused so a subsequent process id of 1000 is also cmd.exe, streamstats can ignore the current/second instance, but last() will pick up the previous/first instance and incorrectly report this as being the grandparent of the current process id 1000, which obviously is not correct.
I'm not sure I get what you mean 🙂
| sort 0 _time
| eval row=mvrange(0,2)
| mvexpand row
| eval GPID=if(row=0,PPID,null())
| eval PID=if(row=0,null(),PID)
| eval PPID=if(row=0,PID,PPID)
| streamstats current=f last(GPID) as lastGPID by PPID
| where row=1
| rename lastGPID as GPIDWhat am I missing?
Try with this data
| makeresults format=csv data="time,PID,PPID,Process_Image,Parent_Image,ComputerName
1,2,0,C,A,X
2,2,1,C,B,X
3,3,2,D,C,X
4,4,2,E,C,X
5,5,4,F,E,X
11,2,0,C,A,X
12,2,1,C,B,X
13,3,2,D,C,X
14,4,2,E,C,X
15,5,4,F,E,X"
| eval _time=now()-50+time+(time/100)
But your data seems to be flawed. You can't have (PID,PPID)=(2,0) and then in the next event (PID,PPID)=(2,1). How did the PPID change? Where did the PID=1 come from? There's no telling where the event with PID=1 come from since we have no information about this PID at all.
The whole exercise doesn't make sense if we don't have information about all process spawns. Otherwise it's not analysing data, it's fortune telling. Same goes for 11 and 12 - again you have 2,0 and 2,1. The same PID with two different PPIDs? Something's fishy here.
The image shared by OP shows the same image and process id being used multiple times, which was also the point of the problem. This is what my sample data is replicating. To be specific, (PID,PPID)=(2,0) represents the first time the PID 2 is used i.e. the process was spawned by process id 0. This process then dies and the process id is recycled when process id 1 spawns the same image and happens to reuse the same id (PID,PPID)=(2,1). This is the scenario described by OP. This second process id 2 then spawns another process id 3 and the issue is how to determine which is the grandfather process for process 3, is it 0 or 1? The same goes for process id 4 and to some extent 5. I then repeat the data at a later time to show the recycling happening again. Any solution should deal with this situation too.
Ideally, the OP should hare some real examples of process ids being recycled with events that can be copied into makeresults, but since these haven't been forthcoming, I have invented what I understand to be the scenario in question.
Yes. There are border cases but generally if we have a continuous time range and the same PPID means two different processes without an event of spawning a new process with this PID it means that we have incomplete data and no method of (stream/event)stats will yield correct results because we simply miss information.
Streamstats should be enough because the processes are spawned in a chronological way and the dependency is one-way - you must already have a parent process to be able to spawn a child process from there.
As I understand the original question it's a problem when you have a process with PID=2 (let's say it's cmd.exe) spawning a process with PID=3 and PPID=2 (for example, certutil.exe). And then after some time the PID=2 is reused for iis.exe which spawns malware.exe with PID=67,PPID=2.
So simple stats over all data would yield either a single of those PID=2 processes, depending on whether we use first/last/earliest/latest or both of them if we use list/values, for both PID=3 as well as PID=67.
If we use streamstats, we can "notice" that a process with PID=2 changed. But we must have that info about PID=2 being spawned anew. Otherwise how are we supposed to tell that it has changed?
You are correct, this is about border cases - to quote from OP " it occasionally finds *multiple* events rather than a single one". That is the essence of the problem.
The other thing to bear in mind is the nature of the events being worked with, i.e. "process creation Windows logs" i.e. we don't have other events such as process termination events to know when a process completes (freeing up its PID), so we have to assume that if a new process creation event occurs with the same PID as previously seen, the previous instance of the PID has (at some point) terminated.
So, to explain my sample data:
1,2,0,C,A,XAt time point 1, PID-2 is created by PID-0 (PPID) with image name "C" and parent image name "A" on computer "X"
2,2,1,C,B,XAt time point 2, which could be at an indeterminate point in time after time point 1 (although I have set this to be 1 second later), PID-2 is created by PID-1 (PPID) again with image name "C", but this time with parent image name "B" (although it could be the same image name, I just used a different one to ensure the correct instance of the process was being found), again on computer "X". (Point to note is that my solution doesn't take different computers into account but that is simple to add to the by clauses where PID and Image are used.)
3,3,2,D,C,XStill on computer "X", at time point 3, which could be at an indeterminate point in time after time point 2 (although I have set this to be 1 second later), PID-3 is created by PID-2 (PPID) with image name "D", and with parent image name "C".
The issue for the OP is that the lookup file process originally used matches to both the first of my events, so they wanted a way to programmatically determine what the grandparent process id (GPID) was.
Your streamstats methods does work in this situation, but only if you switch a couple of the lines around!
Yes. It occasionally finds duplicates because the lookup (which works kinda like stats values() here) is gathering both occurrences of the PPID.
And we don't need the process termination event (that would only make the situation murkier - we'd have to track whether the parent PID has already ended and would be confused if we had incomplete data because we got a newly spawned proces from a PPID which had already died; I don't think it's what we want ;-))
Generally - we need _all_ relevant process creation events to be able to get a reasonable output, whatever method we use. And we will still _not_ get correct info for the events at the beginning of our search period since we'll probably not have any info about processes which had been spawned before our search time range.
I'm not saying my solution is 100% correct (it was a sketch of an idea; it might indeed need tweaking; just wanted to share an approach to solving the problem - create a "join field" and "carry over" the information about GPID from earlier events). And yes, you can easily extend that solution to tracking different computers of course by adding additional BY field.
Apologies it took so long to get back to this question. Thank you both for your enlightening responses. Fortunately, after reading through them, I managed to come across a working solution.
| eval Parent_User=coalesce(ParentUser,null())
| eval user=coalesce(User,Account_Name)
| eval parent_image=coalesce(ParentImage, Creator_Process_Name)
| eval ParentCMD=coalesce(ParentCommandLine, null())
| eval parent_pid=coalesce(ParentProcessId, tonumber(Creator_Process_ID, 16))
| eval process_pid=coalesce(ProcessId, tonumber(New_Process_ID, 16))
| eval process_image=coalesce(Image,New_Process_Name)
| eval command=coalesce(CommandLine, Process_Command_Line, script_content)
| eval processInfo = '_time' + "|,|" + 'parent_image' + "|,|" + 'parent_pid' + "|,|" + 'command'
| convert timeformat="%F %T" ctime(_time) AS time
| table *
| outputlookup tempEvents.csv create_empty=true allow_updates=false output_format=splunk_mv_csv
I already posted this above, but ultimately, all I'm doing is sending all my events to a lookup table before filtering it down. The important part is the processInfo field which a basic concatination of the above fields and split by a unique delimitor. The lookup overwrites itself each run.
| lookup tempEvents.csv ComputerName AS ComputerName process_pid AS parent_pid process_image AS parent_image OUTPUT processInfo AS grand_processInfo _time AS grand_time
| eval min_grand_time = mvindex(mvsort(mvmap(grand_time, _time - grand_time)), 0)
| eval grand_processInfo = mvdedup(grand_processInfo)
| eval grand_processInfo = mvappend(grand_processInfo, "")
| eval true_grandparent=null()
| foreach mode=multivalue grand_processInfo [ eval split_mv = split(<<ITEM>>, "|,|"), true_grandparent = if((_time - tonumber(mvindex(split_mv, 0))) = min_grand_time, mvappend(true_grandparent, tostring(<<ITEM>>)), true_grandparent)]
| rex field=true_grandparent "^\d+\|,\|(?<grandparent_image>.+?)\|,\|(?<grandparent_pid>.+?)\|,\|(?<parent_commandline>.+$)"
Here, the lookup command re-adds events from the lookup table which match the ComputerName to the lookup's ComputerName fields, the process_id as the lookup's parent_id, and the process_image as the lookup's parent_image.
The original issue was caused due to the fact that the lookup command would add multiple events where there should idealy only be a single event which matches for all three of those fields(ComputerName, process_id, and process_image). This is just a consequences of how Windows uses process_ids. They're only unique for as a long as a process is open. As soon as a process is ended, its process id can be recycled.
As such, my solution hinged on comparing the _time field for the search events and the lookup events.
In the first line after the lookup command, I declare a new field called min_grand_time and use mvmap to itterate over the grand_time field from the lookup table. I subtract each value in grand_time from the current search event's _time field to get a positive integer(time doesn't move backwards, after all). The resulting mv field is then sorted using mvsort(this isn't actually a sort based on number values, but it works out regardless). After the sort, I can use mvindex to return the value at the first index to get the value closest to 0.
The next line is a dedup which I noticed was needed in grand_processInfo. I only realized after I added the dedup that the extra events are caused by me querying both sysmon and windows_event logs(a log from each exists for each process created on a system). I'll adjust the seach later by changing out the first table command(just prior to the initial outputlookup) and using stats to filter it down before sending it to the lookup table.
After the dedup, grand_processInfo is single value field......however, I need it register as a mv field for the purpose of the following foreach command. To do so without actually adding anything, I use mvappend to add an empty string.
I then create new field called true_grandparent for use in the foreach command. I may not need to declare a field prior to using it in a foreach, but I did so anyway.
The foreach is where the magic happens which I realized I could use from ITWhisperer(thank you for that). Using it, I itterate over the grand_processInfo field(which is why I needed to use mvappend earlier in the case that grand_processInfo was a single value field). In the loop, I declare a new field named split_mv where I use the split command to split the current <<ITEM>> along its delimiter which I created at the start of the search. After it's been split into a new mv field with the _time, parent_image, parent_id, and command fields, I declare a new field called true_grandparent where if the _time for a search event minus the _time for a lookup event equals the value found in the field min_grand_time which I declared earlier, then I use mvappend to add the current <<ITEM>> to true_grandparent. Otherwise, it simply stays the same.
By running this foreach over each value of grand_processInfo, I'm guarnteed to only get only one value appended to true_grandparent.
Finally, I simply use a rex command on true_grandparent to split out the fields again. It isn't shown in the above code snippets, but I also add a fillnull command to 'fill in' the blank spots that could occur if a process's grandparent process isn't found in the lookup table.
| fillnull value="N/A" great_grandparent_image, grandparent_image, great_grandparent_pid, grandparent_pid, grandparent_commandline, parent_commandline
Hopefully the above explanation makes sense. I've been testing over the past few days, and fortunately, it does seem to work exactly as intended. With any luck, others attempting to do the same thing will be able to follow some variation of the above steps to achieve the same result.
Thank you both, ITWhisperer and PickleRick for your expertise.