So here is my search
index=someindex sourcetype=somesourcetype source="someloglocation*" eventtype="nix_kernel_attached" "\"outcome\":\"success\""
| multikv
| mvexpand _raw
| rex field=_raw "\"userId\":\"(?<userinfo>[^\"]+)\""
| eval eventtype=mvindex(eventtype,1)
| eval LoginType=case(eventtype == "nix_kernel_attached", "WebUI")
| search userinfo=userid* "\"message\":\"login\"" eventtype=nix_kernel_attached LoginType=*
| join type=inner max=0 userinfo
[search index=someindex sourcetype=somesourcetype source="someloglocation/*" eventtype="nix-all-logs" "\"outcome\":\"success\""
| multikv
| mvexpand _raw
| rex field=_raw "\"userId\":\"(?<userinfo>[^\"]+)\""
| eval eventtype=mvindex(eventtype,0)
| eval LoginType=case(eventtype=="nix-all-logs", "CLI")
| search userinfo=someuserid* "\"message\":\"login\"" eventtype=nix_kernel_attached LoginType=*]
I need to display the UserID and the LoginType in a table so that we can show how the user came in.
I've been messing with this for a while. One of the problems is that some of these events have an eventtype with 2 different values for the same event, hence the mvindex command to yank out the one that doesn't pertain to that particular search.
If there is a better method than mvindex for this, I am all ears. The problem is that if a user logs in with the CLI tools it shows up in both eventtypes, but if they log in with the UI it only registers with one eventtype, as you can tell from what my search is doing.
By the way, this join is "working" in that it does return results, but I don't trust the results because of the eventtype thing. It also looks like it's bringing back duplicates, which I can eliminate with a dedup, but I'm hoping there is a more sane method to this madness.
Oh, I also don't have access to the backends, so I can't make any changes to the way the data is being indexed.
Thank you all for your help with this very much.
We made the following assumptions -
1) userId is present exactly once on both kinds of records, and is in the exact same format when there are duplicate events.
2) Each of these two kinds of events represents a logon.
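3) When there are two different logons, one of each type in the same time frame, you want to keep only one of them. As written, the search below keeps the nix-all-logs one, since that is the one it moves earlier.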
4) These duplicates occur within 2-3 seconds of each other, but the order is not consistent.
Given the above assumptions, this should get you most of the way there...
index=someindex sourcetype=somesourcetype source="someloglocation*" "\"outcome\":\"success\""
(eventtype="nix_kernel_attached" OR eventtype="nix-all-logs")
| multikv
| mvexpand _raw
| rex field=_raw "\"userId\":\"(?<userinfo>[^\"]+)\""
| rename COMMENT as "move the one you want to keep earlier than the other one"
| eval mytime=if(eventtype=="nix-all-logs",_time-5,_time)
| sort 0 userinfo mytime
| rename COMMENT as "mark each with a running count, only those with nothing before them in 10 second range will be kept"
| streamstats time_window=10s count as dupcount by userinfo
| where dupcount==1
| eval LoginType=case(eventtype == "nix_kernel_attached", "WebUI",eventtype=="nix-all-logs", "CLI")
| sort 0 _time
Obviously, if you want the other one, then change the eval mytime statement.
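As a sketch of that swap, assuming the goal is to keep the nix_kernel_attached event whenever both show up for the same user, the 5 second shift would move to the other eventtype. Only this one line changes; everything else in the search stays as posted.

```spl
| eval mytime=if(eventtype=="nix_kernel_attached",_time-5,_time)
```

The 5 second offset only needs to be larger than the gap between the duplicate events (2-3 seconds under assumption 4) and smaller than the window used for the running count.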
So it's throwing this error: Error in 'streamstats' command: time_window can only be used on input that is sorted in time order (both ascending and descending order are ok)
Going to look at the streamstats usage but wanted to post this here in case you beat me to a solution 🙂
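Of course. Silly me. streamstats with time_window needs the events sorted on _time itself, so we will have to put the original _time aside in another field, overwrite _time with the shifted value, and restore it at the end.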
index=someindex sourcetype=somesourcetype source="someloglocation*" "\"outcome\":\"success\""
(eventtype="nix_kernel_attached" OR eventtype="nix-all-logs")
| multikv
| mvexpand _raw
| rex field=_raw "\"userId\":\"(?<userinfo>[^\"]+)\""
| rename COMMENT as "move the one you want to keep earlier than the other one"
| eval orig_time = _time
| eval _time=if(eventtype=="nix-all-logs",_time-5,_time)
| sort 0 _time
| rename COMMENT as "mark each with a running count, only those with nothing before them in 10 second range will be kept"
| streamstats time_window=10s count as dupcount by userinfo
| where dupcount==1
| eval LoginType=case(eventtype == "nix_kernel_attached", "WebUI",eventtype=="nix-all-logs", "CLI")
| eval _time = orig_time
| sort 0 _time
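If you want to sanity-check the logic without touching the real index, here is a minimal sketch that runs the same steps on three made-up events. Everything in it is hypothetical test data: the timestamps, the helper field n, and the user "userid1" are assumptions for illustration only. Events 1 and 2 are two seconds apart for the same user, one of each eventtype, and event 3 is a lone nix_kernel_attached logon ten minutes later. The final table line is an addition to show the UserID and LoginType side by side, as the question asks.

```spl
| makeresults count=3
| streamstats count as n
| rename COMMENT as "fake events: 1 and 2 are a duplicate pair, 3 stands alone"
| eval _time=case(n==1, 1700000000, n==2, 1700000002, n==3, 1700000600)
| eval eventtype=case(n==1, "nix_kernel_attached", n==2, "nix-all-logs", n==3, "nix_kernel_attached")
| eval userinfo="userid1"
| rename COMMENT as "from here down, same steps as the search above"
| eval orig_time = _time
| eval _time=if(eventtype=="nix-all-logs",_time-5,_time)
| sort 0 _time
| streamstats time_window=10s count as dupcount by userinfo
| where dupcount==1
| eval LoginType=case(eventtype == "nix_kernel_attached", "WebUI",eventtype=="nix-all-logs", "CLI")
| eval _time = orig_time
| sort 0 _time
| table _time userinfo LoginType
```

If the logic behaves as described, the pair that is two seconds apart should collapse into a single row and the lone event should survive on its own. Note that these fake events each carry a single eventtype value, so this does not exercise the case where one real event has both eventtypes attached.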
Yes, that did it. Thank you very much. This was a tricky one.