Problem: I need to carry out a time-based correlation across three chained sourcetypes.
Specifically, I'm trying to correlate remote-access logon events in order to end up with the following information:
Logon timestamp - Username - Remote IP - Device hostname
(the aim is to be able to detect users accessing the corporate VPN from personal devices, which will trigger initially modest, and subsequently sterner and sterner finger-wagging)
Doing this requires searching across three sourcetypes, from each of which I can extract the following fields:
Remote-access auth: Logon timestamp, Username, Remote IP
VPN gateway: Username, Remote IP, Virtual IP
DHCP server: Virtual IP, Device hostname
Suffice it to say, the searches needed to extract the required data from any of the three sourcetypes are not simple.
Further, because the Virtual IPs issued to remote-access sessions are rapidly recycled, and because users are likely to connect from multiple places ("This cafe has rubbish Wi-Fi; I'll go across the road") using multiple devices ("My phone won't connect; let me try the tablet"), correlation can only be done across time... a narrow time window containing a remote-access auth event, the granting of a VPN connection and the issuing of a DHCP lease => user logging in remotely.
What I've come up with for a query structure has the form:
[search Remote-access auth sourcetype | with lots of piping | etc]
| append [search VPN gateway sourcetype | again with lots of piping | and field renames for matching with the previous search]
| transaction "Username" "Remote IP" maxspan=30s
At this point, I can output a table containing:
Logon timestamp - Username - Remote IP - Virtual IP
and everything is apparently tickety-boo.
The query then continues:
| append [search DHCP server sourcetype | yet more piping | and field renames | plus an inner transaction for good measure]
| transaction "Virtual IP" maxspan=30s
| table "Logon timestamp" Username "Remote IP" Hostname
and results in what looks like the last transaction command never having executed - the table ends up containing the output from the first transaction, with the Hostname field blank in all the rows, followed by the output from the DHCP search, where the only non-blank field is Hostname.
Help...?
I think the way to do this would be to create your own lookup from the results of sourcetype A and sourcetype B, containing the values that link sourcetype A to sourcetype B and sourcetype A to sourcetype C. Then you can use that lookup in your search to create a transaction between sourcetype A, sourcetype B and sourcetype C. First, add configuration for the lookup table you are about to create to your transforms.conf file:
[myRelationshipTable]
filename=myRelationshipTable.csv
Then use something along these lines to create your lookup table:
[search Remote-access auth sourcetype
| with lots of piping
| etc]
| append [search VPN gateway sourcetype
| again with lots of piping
| and field renames for matching with the previous search]
| stats first("Virtual IP") as "Virtual IP" by Username "Remote IP"
| dedup "Virtual IP" Username "Remote IP"
| outputlookup myRelationshipTable
This is essentially the same thing as what is written about in this blog post, except it would be maintaining a relationship table instead of a state table. Evaluate your use case before following everything in the post (for example, if this search is not needed for the long term, there is no need to schedule it to run every minute, or you might have different retention needs on the data you create in the lookup since this is a relationship table and not a state table, etc.).
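If you do decide to schedule it, a minimal savedsearches.conf sketch might look something like this (the stanza name, cron cadence and dispatch window are placeholders to adapt, and the search line stands in for the lookup-building search above):
[Maintain myRelationshipTable]
enableSched = 1
cron_schedule = */10 * * * *
dispatch.earliest_time = -15m
dispatch.latest_time = now
search = ... the lookup-building search above ... | outputlookup myRelationshipTable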
After creating the relationship table, you can use it in your search to correlate all of the data:
[search Remote-access auth sourcetype
| with lots of piping
| etc]
| inputlookup append=true myRelationshipTable
| append [search VPN gateway sourcetype
| again with lots of piping
| and field renames for matching with the previous search]
| append [search DHCP server sourcetype
| yet more piping
| and field renames but no inner transaction unless absolutely necessary]
| transaction "Virtual IP" maxspan=30s
| table "Logon timestamp" Username "Remote IP" Hostname
That definitely looks like a less resource-intensive way to do it - I'll give it a go starting tomorrow.
FWIW, after I twigged that a sort -_time is required after a subsearch append to make transaction work, I discovered that the search became flaky, with results I knew to be there not showing up...
...until I dropped my time interval to under an hour, because the number of results returned by the search of the DHCP logs was making Splunk barf.
That's going to have implications for how we actually go about tracking this sort of information - currently, I'm thinking of having the search run on a 10-minute interval and append to a CSV file, and then using that as the input to a report/dashboard panel.
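Something along these lines might handle the appending - outputlookup's append=true option adds rows to the file instead of overwriting it on each run (the CSV name is made up):
[search Remote-access auth sourcetype | with lots of piping | etc]
| ... correlation as above ...
| table "Logon timestamp" Username "Remote IP" Hostname
| outputlookup append=true vpn_device_tracking.csv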
Where that blog post will come into it is in expiring old entries from the head of the CSV file. Very helpful.
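For the expiry part, the read-filter-write pattern from the post should translate directly - something like the following, assuming an epoch _time column is written into the CSV (the 30-day retention window is arbitrary):
| inputlookup vpn_device_tracking.csv
| where _time >= relative_time(now(), "-30d")
| outputlookup vpn_device_tracking.csv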
Partial solution found by trial and error - each transaction command needs to be preceded by a sort -_time in order to work.
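For reference, the working structure ends up looking like this (placeholder pipelines as in the question; sort 0 -_time rather than a plain sort -_time avoids the sort command's default 10,000-event limit, which matters given the DHCP log volume):
[search Remote-access auth sourcetype | with lots of piping | etc]
| append [search VPN gateway sourcetype | again with lots of piping | and field renames]
| sort 0 -_time
| transaction "Username" "Remote IP" maxspan=30s
| append [search DHCP server sourcetype | yet more piping | and field renames]
| sort 0 -_time
| transaction "Virtual IP" maxspan=30s
| table "Logon timestamp" Username "Remote IP" Hostname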