The problem: I want to identify users who log in over SSH with a login other than their own. I have two logs:
1) SSH log like
timestamp="1111120" yyy.yyy.yyy.yyy sshd: Accepted publickey for user1 from xxx.xxx.xxx.xxx port 41902 ssh2
2) Firewall log like
timestamp="1111111" login="user1" assigned_ip="xxx.xxx.xxx.xxx"
Now it works using the query below
sourcetype="sshd" "Accepted" | rex field=_raw " (?<srv_ip>\S+) sshd" | lookup dnslookup clientip AS srv_ip OUTPUT clienthost as fqdn | rex field=_raw "for (?<local_user>\S+) from" | rex field=_raw "from (?<src_ip>\S+)" | join type=inner src_ip [search sourcetype="firewall_logs" | rename assigned_ip as src_ip | stats max(_time) as fwtimestamp by login,src_ip | convert timeformat="%m/%d/%Y %H:%M:%S" ctime(fwtimestamp) as fwtime | sort -fwtimestamp] | convert timeformat="%m/%d/%Y %H:%M:%S" ctime(accesstimestamp) as accesstime | fields - accesstimestamp fwtimestamp | where fwtime<accesstime | where local_user!="root" | where login!=local_user
But it misses a lot of events because of the 'stats max'. So I need to match each SSH event to the closest preceding firewall event (IP lease) for the same IP. For example, given these events:
time=5 ip=ip1 login=man1
time=7 ip=ip2 login=man2
time=9 ip=ip2 login=man2
time=10 sship=ipssh sshlogin=evil ip=ip2
time=15 ip=ip2 login=man2
Result should be something like this:
fwtime=9 sshtime=10 ip=ip2 login=man2 sship=ipssh sshlogin=evil
How can I do it?
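To make the matching rule in the example concrete, here is a minimal sketch in Python rather than SPL. It only illustrates the logic (for each SSH event, take the latest firewall event with the same IP and a strictly earlier time). The function names `build_index` and `latest_lease_before` are made up for this illustration, and the data is the sample above.

```python
from bisect import bisect_left
from collections import defaultdict

# Firewall lease events from the example above: (time, ip, login).
fw_events = [(5, "ip1", "man1"), (7, "ip2", "man2"),
             (9, "ip2", "man2"), (15, "ip2", "man2")]
# SSH event from the example: (time, client ip, ssh server ip, ssh login).
ssh_events = [(10, "ip2", "ipssh", "evil")]


def build_index(fw_events):
    """Group firewall events by IP, each group sorted by time."""
    by_ip = defaultdict(list)
    for t, ip, login in sorted(fw_events):
        by_ip[ip].append((t, login))
    return by_ip


def latest_lease_before(index, ip, ssh_time):
    """Return (fwtime, login) of the latest lease strictly before ssh_time."""
    leases = index.get(ip, [])
    times = [t for t, _ in leases]
    pos = bisect_left(times, ssh_time)  # leases[:pos] all have time < ssh_time
    return leases[pos - 1] if pos > 0 else None


index = build_index(fw_events)
for sshtime, ip, sship, sshlogin in ssh_events:
    match = latest_lease_before(index, ip, sshtime)
    if match is not None:
        fwtime, login = match
        print(f"fwtime={fwtime} sshtime={sshtime} ip={ip} login={login} "
              f"sship={sship} sshlogin={sshlogin}")
# prints: fwtime=9 sshtime=10 ip=ip2 login=man2 sship=ipssh sshlogin=evil
```

The later lease at time=15 is ignored because it comes after the SSH event, which is exactly what a plain max over all firewall events gets wrong.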
I can use usetime=true earlier=true to make a time-based join. But when I do something like:
sourcetype="sshd" "Accepted" | rex field=_raw " (?<srv_ip>\S+) sshd" | lookup dnslookup clientip AS srv_ip OUTPUT clienthost as fqdn | rex field=_raw "for (?<local_user>\S+) from" | rex field=_raw "from (?<src_ip>\S+)" | stats max(_time) as accesstimestamp by src_ip,fqdn,local_user | join type=inner usetime=true earlier=true src_ip [search sourcetype="firewall_logs" | rename assigned_ip as src_ip | eval fwtimestamp=_time | table fwtimestamp,login,src_ip | convert timeformat="%m/%d/%Y %H:%M:%S" ctime(fwtimestamp) as fwtime | sort -fwtimestamp] | convert timeformat="%m/%d/%Y %H:%M:%S" ctime(accesstimestamp) as accesstime | fields - accesstimestamp fwtimestamp | where local_user!="root" | where login!=local_user
I see results where fwtime>accesstime, which should not happen. What is the reason?
To make it more lightweight, here is a similar, simpler query:
sourcetype="webaccess" | stats max(_time) as accesstime by srcip,weblogin | join type=inner usetime=true earlier=true [ search sourcetype="fwlogs" | eval fwtime=_time | table fwtime,login,srcip] | where weblogin!=login
and I still see results where fwtime>accesstime, which is weird.
Maybe the output of stats carries no time information?
Your problems are surely due to the limits on the number of events returned by subsearches. You can do the join without join (and thus without the subsearch and its limits), and I strongly encourage you to do so. This search will do the join and enhance the event data with the field I think you need:
(sourcetype="sshd" "Accepted") OR (sourcetype="firewall_logs") | rex " (?<srv_ip>\S+) sshd.*? for (?<local_user>\S+) from (?<src_ip>\S+)" | lookup dnslookup clientip AS srv_ip OUTPUT clienthost AS fqdn | rename assigned_ip as src_ip | eventstats dc(sourcetype) AS sourcetypes by src_ip | search sourcetypes>=2 | eventstats max(_time) AS maxTimeBySTandLogin BY sourcetype, login
At this point you only have events which share the same value of src_ip (events with any particular src_ip that do not show up in each sourcetype are filtered out), and you have enhanced each event with a field, maxTimeBySTandLogin, which contains the highest/latest/most-recent time among groups of events which share the same login. In other words, within each sourcetype dataset, each event possesses its own copy of the (shared) max(_time), which was assessed by comparing all events with the same login. From here, you should be able to modify the search until you get exactly what you would like. If you can't get there on your own, do explain exactly what the post-join logic is supposed to do (it is not at all clear to me, because parts of your explanation contradict each other).
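As a rough illustration of what the two eventstats steps compute, here is a small Python sketch over already-parsed events. It is not SPL and not how Splunk implements it. The sample events are hypothetical, and leaving events without a login field unannotated is an assumption of this sketch.

```python
from collections import defaultdict

# Hypothetical, already-parsed events; field names mirror the search above.
events = [
    {"_time": 7, "sourcetype": "firewall_logs", "src_ip": "ip2", "login": "man2"},
    {"_time": 9, "sourcetype": "firewall_logs", "src_ip": "ip2", "login": "man2"},
    {"_time": 5, "sourcetype": "firewall_logs", "src_ip": "ip1", "login": "man1"},
    {"_time": 10, "sourcetype": "sshd", "src_ip": "ip2", "local_user": "evil"},
]


def annotate(events):
    # Like "eventstats dc(sourcetype) AS sourcetypes BY src_ip".
    types_by_ip = defaultdict(set)
    for e in events:
        types_by_ip[e["src_ip"]].add(e["sourcetype"])

    # Like "search sourcetypes>=2": keep IPs seen in both sourcetypes.
    kept = []
    for e in events:
        count = len(types_by_ip[e["src_ip"]])
        if count >= 2:
            kept.append(dict(e, sourcetypes=count))

    # Like "eventstats max(_time) AS maxTimeBySTandLogin BY sourcetype, login".
    # Assumption: events that lack a login field are left unannotated.
    max_time = {}
    for e in kept:
        if "login" in e:
            key = (e["sourcetype"], e["login"])
            max_time[key] = max(max_time.get(key, e["_time"]), e["_time"])
    for e in kept:
        if "login" in e:
            e["maxTimeBySTandLogin"] = max_time[(e["sourcetype"], e["login"])]
    return kept


for e in annotate(events):
    print(e)
```

In this sketch the ip1 event is dropped (no SSH event shares its IP), and both ip2 firewall events carry maxTimeBySTandLogin=9. Note that this is a maximum over the whole group, not "the latest lease before a given SSH event", so further logic is still needed for that.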
Thank you for your answer. I'll try to be more accurate. I have two types of logs:
1) SSH access log (sourcetype="sshd"). Examples:
May 12 23:59:59 SRV1IP sshd: Accepted publickey for SSHLOGIN1 from SSHCLIENT1IP port 65093 ssh2
May 12 23:59:59 SRV2IP sshd: Accepted publickey for SSHLOGIN2 from SSHCLIENT2IP port 53704 ssh2
You can get the needed fields using | rex " (?<srv_ip>\S+) sshd.*? for (?<local_user>\S+) from (?<src_ip>\S+)" as you did.
2) Firewall logs (sourcetype="firewall_logs"). Examples:
timestamp="1431464386" event="ADD (post-auth)" login="LOGIN1" assigned_ip="CLIENT1IP"
timestamp="1431464446" event="ADD (post-auth)" login="LOGIN2" assigned_ip="CLIENT2IP"
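For anyone who wants to check the field extraction outside Splunk, here is a small Python sketch that parses both log formats. The helper names `parse_ssh` and `parse_firewall` are made up for this illustration. The firewall parser is generic and returns whatever key="value" pairs are on the line. The firewall sample uses the assigned_ip spelling from the first post.

```python
import re

# Python spells named groups (?P<name>...), where rex uses (?<name>...).
SSH_RE = re.compile(
    r" (?P<srv_ip>\S+) sshd.*? for (?P<local_user>\S+) from (?P<src_ip>\S+)"
)
# Matches key="value" pairs; the value may contain spaces but not quotes.
KV_RE = re.compile(r'(\w+)="([^"]*)"')


def parse_ssh(line):
    """Return srv_ip, local_user and src_ip from an sshd line, or None."""
    m = SSH_RE.search(line)
    return m.groupdict() if m else None


def parse_firewall(line):
    """Return all key="value" pairs of a firewall line as a dict."""
    return dict(KV_RE.findall(line))


ssh_line = ("May 12 23:59:59 SRV1IP sshd: Accepted publickey for SSHLOGIN1 "
            "from SSHCLIENT1IP port 65093 ssh2")
fw_line = ('timestamp="1431464386" event="ADD (post-auth)" '
           'login="LOGIN1" assigned_ip="CLIENT1IP"')

print(parse_ssh(ssh_line))
print(parse_firewall(fw_line))
```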
Users should use their own logins for SSH authentication, but they can use other ones. In such cases they may be using another laptop (whose IP is leased to a different login), or they may have been hacked. In both cases login!=sshlogin, but clientip==sshclientip. I want to detect and alert on such cases. The algorithm is:
1) Get the SSH events that show SSHCLIENTIP, SRVIP and SSHLOGIN. These are the accepted-connection events.
2) Find out which LOGIN holds SSHCLIENTIP. To do that, merge every event from step 1 with the firewall event for that particular SSHCLIENTIP (which equals CLIENTIP in the firewall logs) where SSHEVENTTIME > FWEVENTTIME and FWEVENTTIME is the nearest to SSHEVENTTIME.
3) After merging, keep the events where SSHLOGIN!=LOGIN.
In the first part of the original query, max() was used only to limit the number of events before joining. It is not the best solution because we can miss some events, but for the sake of simplicity let us ignore those.
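The three steps can be sketched as a single time-ordered sweep. The version below is plain Python, not SPL: both event types are put into one list, sorted by time, and the most recent lease per IP is carried forward until an SSH event needs it. The function name `find_mismatches`, the dict keys, and the `ignored_users` parameter (mirroring the root exclusion in the original query) are assumptions made for this illustration.

```python
def find_mismatches(ssh_events, fw_events, ignored_users=("root",)):
    """Return SSH logins whose client IP was last leased to another login.

    ssh_events: dicts with keys time, src_ip, srv_ip, local_user.
    fw_events:  dicts with keys time, src_ip, login.
    """
    # Tag 0 = SSH, 1 = firewall. At equal timestamps SSH events sort first,
    # so a lease only counts when it is strictly earlier (step 2).
    merged = [(e["time"], 0, i, e) for i, e in enumerate(ssh_events)]
    merged += [(e["time"], 1, i, e) for i, e in enumerate(fw_events)]
    merged.sort(key=lambda item: item[:3])

    last_lease = {}  # src_ip -> most recent firewall event seen so far
    alerts = []
    for _, kind, _, event in merged:
        if kind == 1:
            last_lease[event["src_ip"]] = event
            continue
        lease = last_lease.get(event["src_ip"])
        if lease is None or event["local_user"] in ignored_users:
            continue  # no earlier lease known, or a user we do not check
        if lease["login"] != event["local_user"]:  # step 3
            alerts.append({
                "fwtime": lease["time"],
                "sshtime": event["time"],
                "ip": event["src_ip"],
                "login": lease["login"],
                "sship": event["srv_ip"],
                "sshlogin": event["local_user"],
            })
    return alerts


fw = [
    {"time": 5, "src_ip": "ip1", "login": "man1"},
    {"time": 7, "src_ip": "ip2", "login": "man2"},
    {"time": 9, "src_ip": "ip2", "login": "man2"},
    {"time": 15, "src_ip": "ip2", "login": "man2"},
]
ssh = [
    {"time": 10, "src_ip": "ip2", "srv_ip": "ipssh", "local_user": "evil"},
    {"time": 16, "src_ip": "ip2", "srv_ip": "ipssh", "local_user": "man2"},
]
print(find_mismatches(ssh, fw))
```

With this sample data only the time=10 login is reported, matched to the lease at time=9. Every SSH event is checked, so nothing is lost to a max() pre-aggregation.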