Hi!
My goal is to tie together Linux events and Windows events in order to track Windows users logging in as root or admin users on Linux machines. Both indexes share a common field, src_ip, and I was trying to use transaction to get the events closest to the time a specific user logs on to Windows and then SSHes to a Linux machine. Multiple users can log on to one Windows machine, which is why I thought transaction might work: tie every SSH session that follows a given user's logon to that user, and then, when a new user logs on, tie every SSH session after that to the new user.
Index os = linux index
Index windows = windows index
Transaction startswith = some text for windows events
Transaction endswith = some text for linux events
index=os host=SomeLinuxMachine*
| rex field=_raw "from\s+(?<src_ip>.*?)\s+port"
| rex field=_raw "for\s+(?<ssh_user>.*?)\s+from"
| transaction startswith="An account was successfully logged on." endswith="sshd"
| table _time src_ip ssh_user host
| append [search index=windows eventtype="windows_logon_success" NOT(Account_Name="*$" OR Account_Name="ANONYMOUS LOGON")
| rex field=_raw "Account\sName:\s+(?<Account_Name_fixed>\S{4,})"
| rex field=_raw "Logon\sID:\s+(?<Logon_ID_fixed>\S{6,})"
| rex field=_raw "Message=(?<Message_fixed>.{3,})\s+Subject"
| rename Account_Name_fixed AS src_user
| stats latest(src_user) AS src_user by src_ip]
Using the event times, we were able to get the latest user who logged on to the Windows workstation and match that user to the SSH logs. This was tested successfully with one user logging on to a Windows machine and then logging right off, followed by another user logging on to the same Windows machine and SSHing to the same Linux machine.
Example log for index=os sourcetype=os:secure
LinuxHostName sshd[36115]: Accepted password for AdminUser from x.x.x.x port 50069 ssh2
Example log for index=SomeWindowsIndex eventtype=windows_logon_success
02/06/2018 04:43:10 PM
LogName=Security
SourceName=Microsoft Windows security auditing.
EventCode=4624
EventType=0
Type=Information
ComputerName=SomeDomainController.Domain
TaskCategory=Logon
OpCode=Info
RecordNumber=463894150
Keywords=Audit Success
Message=An account was successfully logged on.
Subject:
Security ID: NT AUTHORITY\SYSTEM
Account Name: SomeDomainController$
Account Domain: Domain
Logon ID: 0x3e7
Logon Type: 3
New Logon:
Security ID: Domain\username
Account Name: username
Account Domain: Domain
Logon ID: 0xb9ff3c71b
Logon GUID: {00000000-0000-0000-0000-000000000000}
The search
index=os host=SomeLinuxHost* sourcetype="os:secure"
punct="_[]:______...___"
| rex field=_raw "sshd\[\d+\]\:\s+(?<status>Accepted|Failed)"
| rex field=_raw "from\s+(?<src_ip>.*?)\s+port"
| rex field=_raw "for\s+(?<ssh_user>.*?)\s+from"
| eval eventtime =_time
| eval modtime = _time - 28800
| map maxsearches=1000 search="search earliest=$modtime$ latest=$eventtime$ index=SomeWindowsIndex eventtype=windows_logon_success NOT(Account_Name=\"*$\" OR Account_Name=\"ANONYMOUS LOGON\") $src_ip$ | rex field=_raw \"Account\sName:\s+(?<Account_Name_fixed>\S{4,})\"
| rename Account_Name_fixed AS win_src_user
| stats latest(_time) AS _time latest(win_src_user) AS win_src_user latest(ssh_user) AS ssh_user latest(host) AS host by src_ip
| eval ssh_user= \"$ssh_user$\"
| eval host = \"$host$\"
| eval _time=\"$eventtime$\""
|table _time, host, src_ip, ssh_user, win_src_user
End result successful
_time                host           src_ip                  ssh_user  win_src_user
2018-02-06 15:56:11  SameLinuxHost  SameWindowsWorkstation  SSH_root  user A
2018-02-06 15:58:02  SameLinuxHost  SameWindowsWorkstation  SSH_root  user B
2018-02-06 15:58:16  SameLinuxHost  SameWindowsWorkstation  SSH_root  user A
Not bad! Now, this method is fine for SMALL numbers of events. As a general case, map is a resource hog, so you wouldn't want to do it for a thousand different searches as you have it coded there.
Don't use transaction when you just need to connect things. Usually stats or eventstats will get you what you want. When there's a time factor, your go-to tool should be streamstats.
Always state your problem first, accurately, without saying HOW you are going to do it. This avoids coding based merely on the first tool or method you thought of.
PROBLEM DEFINITION: For each SSH on index=os, you want to find the src_user from the most recent index=windows login within a certain length of time (let's say 10m) for that same src_ip.
First, pull every single record you need from both indexes.
So this...
(index=os host=SomeLinuxMachine*)
| head 10
| rex field=_raw "from\s+(?<src_ip>.*?)\s+port"
| rex field=_raw "for\s+(?<ssh_user>.*?)\s+from"
| rex field=_raw "An account was successfully (?<myevent>logged) on."
| rex field=_raw "(?<myevent>sshd)"
| table _time _raw index src_ip ssh_user src_user myevent host
...and this...
(index=windows eventtype="windows_logon_success" NOT (Account_Name="*$" OR Account_Name="ANONYMOUS LOGON"))
| head 10
| rex field=_raw "Account\sName:\s+(?<src_user>\S{4,})"
| fields _time _raw index src_ip ssh_user src_user myevent host
...together make this...
(index=os host=SomeLinuxMachine*) OR
(index=windows eventtype="windows_logon_success" NOT (Account_Name="*$" OR Account_Name="ANONYMOUS LOGON"))
| rex field=_raw "from\s+(?<src_ip>.*?)\s+port"
| rex field=_raw "for\s+(?<ssh_user>.*?)\s+from"
| rex field=_raw "An account was successfully (?<myevent>logged) on."
| rex field=_raw "(?<myevent>sshd)"
| rex field=_raw "Account\sName:\s+(?<src_user>\S{4,})"
| fields _time _raw index src_ip ssh_user src_user myevent host
NEXT, in this case, you want to roll the data from the index=windows logon FORWARD to the index=os SSH records. The records will be coming out in backwards order, so we need to reverse them first, then use streamstats to copy the windows data. Let's arbitrarily say we are only going to count windows logons within 10m of the ssh.
| reverse
| streamstats current=f last(src_user) as src_user time_window=10m BY src_ip
Now each index=os record with a src_ip has the windows data, if it existed, so we can kill the windows data, reverse the records again, and fill in the empty ones...
| search index=os
| reverse
| eval src_user = coalesce(src_user,"((UNKNOWN))")
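If it helps to see the roll-forward logic outside of SPL, here is a minimal Python sketch of what the reverse / streamstats / search steps above are meant to do. It is not Splunk code and only illustrates the idea: walk the events oldest first, remember the latest Windows logon per src_ip, and stamp each SSH event with that user if the logon falls within the window. The function name, the sample IPs, users and timestamps are all made up for illustration, and treating the 10-minute boundary as inclusive is an assumption rather than a statement about how streamstats handles the edge.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)  # same arbitrary 10m window as in the search


def attribute_ssh_logins(events, window=WINDOW):
    """Attach the most recent Windows user (per src_ip) to each SSH event.

    `events` is a list of dicts with keys: time, index, src_ip, and either
    src_user (index == "windows") or ssh_user (index == "os").
    """
    last_logon = {}  # src_ip -> (logon time, Windows user)
    results = []
    # Oldest first, like the events after the first `reverse`.
    for ev in sorted(events, key=lambda e: e["time"]):
        if ev["index"] == "windows":
            last_logon[ev["src_ip"]] = (ev["time"], ev["src_user"])
            continue  # Windows events are dropped from the output
        seen = last_logon.get(ev["src_ip"])
        if seen is not None and ev["time"] - seen[0] <= window:
            user = seen[1]
        else:
            user = "((UNKNOWN))"  # same filler as the coalesce step
        results.append({**ev, "src_user": user})
    return results


def t(hms):
    """Helper: build a timestamp on a fixed (made-up) day."""
    return datetime.strptime("2018-02-06 " + hms, "%Y-%m-%d %H:%M:%S")


# Hypothetical sample data, not taken from real logs.
sample_events = [
    {"time": t("15:55:00"), "index": "windows", "src_ip": "10.0.0.5", "src_user": "userA"},
    {"time": t("15:56:11"), "index": "os", "src_ip": "10.0.0.5", "ssh_user": "root"},
    {"time": t("15:57:30"), "index": "windows", "src_ip": "10.0.0.5", "src_user": "userB"},
    {"time": t("15:58:02"), "index": "os", "src_ip": "10.0.0.5", "ssh_user": "root"},
    {"time": t("15:58:10"), "index": "os", "src_ip": "10.0.0.9", "ssh_user": "root"},
    {"time": t("16:30:00"), "index": "os", "src_ip": "10.0.0.5", "ssh_user": "root"},
]

results = attribute_ssh_logins(sample_events)
for row in results:
    print(row["time"], row["src_ip"], row["ssh_user"], row["src_user"])
```

In this sample, the SSH from 10.0.0.9 and the late SSH at 16:30:00 both come out as ((UNKNOWN)), because there is no Windows logon for that src_ip inside the window. That is the same case the coalesce in the search is there to cover.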
Finally, you still have multiple records for each ssh that you want to roll together. Ideally, you'll use either a stats or eventstats here as well. If you post some dummy data for one set, then we can help you code that. (I'm not sure how many records make up the "transaction" or I'd have air-coded it for you already.)
Oh, and one BIG efficiency note - table brings all the records to the search head. Use fields to cut down the list of streamable fields at your earliest convenience, and only use table after everything is already at the search head, such as after the first non-streamable command.
DalJeanis thank you for your reply! This was a very informative post and I'll keep your suggestions in mind when posting more questions.