I have one set of logs showing authentication events which contain timestamps, user names, and IP addresses (source 1). I'd like to leverage that data against a different set of logs where I have timestamps and IPs but no user names (source 2). One of the challenges is that after the initial authentication event in source 1 there isn't a one-to-one correlation between events. In other words, the system recognizes the authentication event happened and no longer produces subsequent authentication logs for that user. I can't simply do a stats command where I split by IP address, because for a particular IP and time period there might be multiple user names.
It feels like the solution lies somewhere in streamstats, but I might also have to do a join/append somehow. Has anyone tackled this? I've looked at the streamstats example Splunk blog post, but that only solves for, in my case, source 1.
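To make the stats problem concrete, something like

<your authentication logs>
| stats values(user) as users values(_time) as logins by src_ip

just hands back every user and every login time seen from each IP, with nothing tying an individual source 2 event to the right user.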
So after much noodling and prayer I came up with a search, but it is somewhat ugly, goes around both elbows to get to your butt, and doesn't scale overly well. There has GOT to be a better way. I'm wondering about using a subsearch to spit out a CSV to be referenced later, or map, or even searchtxn. Needs sleep first. The query is pretty much as follows.
<web data search> src_ip=<ip of interest>
| eval web_time = _time
| table web_time src_ip <other fields of interest>
| join src_ip type=left max=0
    [ search <your authentication logs> src_ip=<ip of interest>
      | streamstats current=true last(user) as user last(_time) as login by src_ip
      | stats values(_time) as logins by src_ip
      | fields logins src_ip ]
| makemv logins
| mvexpand logins
| where logins < web_time
| stats values(logins) as logins by web_time src_ip <other fields of interest>
| makemv logins
| stats max(logins) as login by web_time src_ip <other fields of interest>
| join type=left login src_ip
    [ search <your authentication logs> src_ip=<ip of interest>
      | streamstats current=false last(user) as user last(_time) as login by src_ip
      | table src_ip login user ]
| table src_ip login web_time <other fields of interest>
So what's that business all about?
0 - I'm starting with the event of interest in my web logs and the IP address of interest
1 - I know I'm going to be using other time fields, so go ahead and rename this time field plus put it in epoch time for maths later on (that I don't reference here)
2 - don't really need this /shrug
3 - remember, every time you use join in Splunk a cat gets what's coming to it - it makes joining data easy though. I use max=0 because in this case I'm going to have the same data coming out of the subsearch applied to all of my data (at least where the IP matches)
4 - join subsearch - go into your authentication logs, use streamstats to get the time each user logged in from your IP of interest, then collapse all of those values into one field to pass up to your main query. I used this to try to get past the 50k/1 min default subsearch limitation. Note I'm not passing up the users.
5 - since every web event has every auth time (in one field), we first want to make this a multivalue field and then
6 - expand it.
7 - now that you have the expanded data - an event for every login time per web event, related by IP - you can do a where to trim back to only the login times that come BEFORE each web event of interest
8 - collapse all of those login times back down to one field per IP and web event time
9 - now make that a multi-valued field so that you can
10 - get the most recent login event relative to each web event by using max(). In retrospect I probably don't need 8 & 9
11 - throw more cats under the bus - the intent here is that, since I already have a login time from the previous join, I'm going to match those back up with the user name using the events the login time values originally came from. I'm wondering now if I could use list() instead of values() in my first join, which might allow me to keep the user name (either as 2 fields or by doing a concat prior to the stats command - see the sketch after this list)
12 - basically the core of the previous subsearch but this time I'm passing back up the src_ip, login, and user name
13 - table of all fields
14 - Profit
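Following up on 11, the list()/concat idea might look something like this (untested sketch - mvzip pairs each login time with its user so the user name survives the first join, and the second join goes away entirely; it assumes user names never contain the pipe delimiter):

<web data search> src_ip=<ip of interest>
| eval web_time = _time
| join src_ip type=left max=0
    [ search <your authentication logs> src_ip=<ip of interest>
      | stats list(_time) as logins list(user) as users by src_ip
      | eval login_user = mvzip(logins, users, "|")
      | fields src_ip login_user ]
| mvexpand login_user
| eval login = tonumber(mvindex(split(login_user, "|"), 0))
| eval user = mvindex(split(login_user, "|"), 1)
| where login < web_time
| eventstats max(login) as last_login by web_time src_ip <other fields of interest>
| where login = last_login
| table src_ip login user web_time <other fields of interest>

One catch: list() only keeps 100 values per IP by default, so this trades the 50k join limit for a different ceiling.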
Where this fell down on me is when I removed the specific IP I used and ran this over 24 hrs of data. I have a lot of data, but there are some fields and things I can trim it down with that I'm not including in this thread. I tried some subsearches within my subsearches to further limit the IPs searched in each subsearch, but ultimately what I ran into was the 1 min default limit.
This DOES work, but as mentioned it isn't pretty. Someone should be able to do this much more efficiently/elegantly than I did.
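One direction I haven't actually tried at my data volumes: skip join entirely, pull both sources in one search, sort ascending, and let streamstats fill the user down per IP. A sketch, with the searches as placeholders:

(<your authentication logs>) OR (<web data search>)
| sort 0 _time
| streamstats current=true last(user) as user by src_ip
| search <whatever identifies the web events>
| table _time src_ip user <other fields of interest>

Because the web events carry no user field, last(user) should skip them and carry the most recent login's user forward for each src_ip - which is exactly the "most recent login before each web event" behavior. The sort 0 over a large window isn't free either, so no promises on how it scales.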
What is the time difference between the authentication event for an IP/user in source 1 and the IP data in source 2?
The time difference can vary in terms of when the initial authentication event happens versus the particular event being investigated from the second log type/source. What can happen is:
user 1 authenticates from IP 1.1.1.1 at X (source 1)
event of interest happens involving 1.1.1.1 at X+10min (source 2)
user 2 authenticates from IP 1.1.1.1 at X+12min (source 1)
event of interest happens involving 1.1.1.1 at X+14min (source 2)
What I'm trying to do then is show which user is associated with the 2 events - potentially dynamically and at scale.
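Laid out as the table I'd want back, that's:

web_time    src_ip     user
X+10min     1.1.1.1    user 1
X+14min     1.1.1.1    user 2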
It would be tough to correlate without a common correlation field (the user name, which is missing in source 2). The only option would be that there is a certain pattern and a non-overlapping distribution of events between those two sources. So if I understand correctly, here are the events related based on time:
Bucket 1, Source 1 - authentication for IP1 by user1 - time 11
Bucket 1, Source 2 - one or more entries for IP1... these are assumed to relate only to user1
Bucket 2, Source 1 - authentication for IP1/IP2 by user2 - time 12
Bucket 2, Source 2 - one or more entries for IP1/IP2... these are assumed to relate only to user2
User1's entry for IP1 will not appear in Bucket 2. Is this correct?
That is correct. Think of it in terms of source 1 being authentication and source 2 being web logs. In ArcSight I would use a construct similar to a lookup table, but those are implemented differently than in Splunk. A lookup table of this size wouldn't scale, though.
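For what it's worth, the closest native Splunk analogue is probably a time-bounded lookup: write the auth events out to a CSV on a schedule, and give the lookup a time_field so each web event matches the entry closest in time at or before it (if I'm reading the docs right, temporal lookups default to max_matches = 1 for exactly this reason). All names below are made up, and at my volumes the CSV itself would become the bottleneck, but for smaller environments it avoids the joins entirely.

In transforms.conf:

[auth_by_ip]
filename = auth_by_ip.csv
time_field = login_time
time_format = %s
max_offset_secs = 86400

Scheduled search to populate it:

<your authentication logs>
| eval login_time = _time
| table login_time src_ip user
| outputlookup auth_by_ip.csv

Then at search time:

<web data search>
| lookup auth_by_ip src_ip OUTPUT user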