Splunk Search

How to join on optional date range without map?

apps_inpaytech
Explorer

Hi,

I am trying to report on access requests to actual logins.

I have a list of events from our systems of when users have logged in:

| table _time os host user clientName clientAddress signature logonType

I have a list of requests which cover a time frame and potentially multiple logins to multiple systems:

| table key host reporterName reporterEmail summary changeStartDate changeEndDate

So i want a list of events, with any corresponding requests (could be none, so i can alert the user/IT) joining on host, user, and _time between changeStartDate and changeEndDate.

I do have this working by using map (see below), but it's very slow and not operable over large datasets/times. There must be a better way.

I had issues with matching on the time range, and where it may not have a match, and optional username matching based on OS.

Does anyone have any ideas?

Existing search:

...search...
| table _time os host user clientName clientAddress signature logonType 
| convert mktime(_time) as epoch 
| sort -_time 
| map maxsearches=9999 search="
| inputlookup Request_admin_access.csv
| eval os=\"$os$\"
        | eval outerHost=\"$host$\"
        | eval user=\"$user$\"
        | eval clientName=\"$clientName$\"
        | eval clientAddress=\"$clientAddress$\"
        | eval signature=\"$signature$\"
        | eval logonType=\"$logonType$\"
        | eval startCheck=if(tonumber($epoch$)>=tonumber(changeStartDate), 1, 0) 
| eval endCheck=if(tonumber($epoch$)<=tonumber(changeEndDate), 1, 0) 
| eval userCheck=if(normalisedReporterName==\"$normalisedUserName$\", 1, 0)
| where host=outerHost
| eval match=case(
  os==\"Windows\" AND startCheck==1 AND endCheck==1,1,
  os==\"Linux\" AND startCheck==1 AND endCheck==1 AND userCheck==1,1)
| appendpipe [
  | makeresults format=csv data=\"_time,os,host,user,clientName,clientAddress,signature,logonType,wimMatch
$epoch$,$os$,$host$,$user$,$clientName$,$clientAddress$,$signature$,$logonType$,1\"
          ]
| where match==1 
| eval _time=$epoch$
| head 1
| convert ctime(changeStartDate) timeformat=\"%F %T\" | convert ctime(changeEndDate) timeformat=\"%F %T\"
| fields _time os host user clientName clientAddress signature logonType key reporterName reporterEmail summary changeStartDate changeEndDate"

 

Labels (2)
Tags (3)
0 Karma
1 Solution

bowesmana
SplunkTrust
SplunkTrust

This is an example based on your example dataset

It assumes that there is a lookup file requests.csv (which I generated using the second code snipped below)

The makeresults stuff just sets up your data, so assume your search runs up to the inputlookup statement below.

| makeresults 
| eval _raw=split(replace("time,os,host,user
1/10/2023 9:00,Linux,Server1,UserA
1/10/2023 11:00,Linux,Server1,UserA
1/10/2023 12:00,Linux,Server2,UserA
1/10/2023 9:00,Linux,Server2,UserB
1/10/2023 14:00,Linux,Server1,UserA","\n","###"),"###")
| multikv forceheader=1 
| eval _time=strptime(time, "%d/%m/%Y %k:%M")
| table _time,os,host,user
| inputlookup append=t requests.csv
| eval user=coalesce(user, reporterName)
| foreach change* [ eval <<FIELD>>=strptime('<<FIELD>>', "%d/%m/%Y %k:%M") ]
| stats list(_time) as _time values(key) as key values(reporterEmail) as reporterEmail values(summary) as summary values(changeStartDate) as changeStartDate values(changeEndDate) as changeEndDate by user host
| eval isInside=mvmap(_time, if(_time>=changeStartDate AND _time<changeEndDate, _time.":1", _time.":0"))
| mvexpand isInside
| rex field=isInside "(?<_time>[^:]*):(?<isInside>\d)"

the logic is then that it appends the contents of the lookup file to the end of the data and makes the common name (user or reporterName) and then converts the change time fields to epoch.

Then the stats function joins all the items together - there is an assumption that there is only one requests in requests.csv for each user/server - if more then the logic will need to change.

After the stats, the mvmap just compares the times and then expands out the results with isInside showing if the event is inside the request period 

 

Here's the csv generation so you can test if needed.

| makeresults 
| eval _raw=split(replace("key,host,reporterName,reporterEmail,summary,changeStartDate,changeEndDate
REQ-1000,Server1,UserA,UserA@dummy.com,Investigate error,1/10/2023 8:00,1/10/2023 13:00
REA-1001,Server2,UserB,UserB@dummy.com,Reset service,1/10/2023 8:00,1/10/2023 10:00","\n","###"),"###")
| multikv forceheader=1 
| table key,host,reporterName,reporterEmail,summary,changeStartDate,changeEndDate
| outputlookup requests.csv

 

View solution in original post

0 Karma

bowesmana
SplunkTrust
SplunkTrust

The starting point for correlating two datasets together is to combine them into a single search then uses stats to combine them through a common field.

So, you should start with this approach

...search...
| table _time os host user clientName clientAddress signature logonType 
| convert mktime(_time) as epoch 
| sort -_time 
| inputlookup append=t Request_admin_access.csv
... now use eval+stats to join and collapse the data events and the lookup events together, e.g.
| stats values(*) as * by host

You seem to have host in both data and lookup

Map is certainly not the right tool for this job.

apps_inpaytech
Explorer

@bowesmana , thanks for your response, but i am still having trouble with how to 'now use eval+stats to join and collapse the data events and the lookup events together'.

I don't see how i can apply these rules:

  1. Optional joining, i.e. i want every record in the events list
  2. Event date is between the request changeStartDate and changeEndDate

For example i could have these events (some columns removed for brevity)

_timeoshostuser
1/10/2023 9:00LinuxServer1UserA
1/10/2023 11:00LinuxServer1UserA
1/10/2023 12:00LinuxServer2UserA
1/10/2023 9:00LinuxServer2UserB
1/10/2023 14:00LinuxServer1UserA

and these requests

keyhostreporterNamereporterEmailsummarychangeStartDatechangeEndDate
REQ-1000Server1UserAUserA@dummy.comInvestigate error1/10/2023 8:001/10/2023 13:00
REA-1001Server2UserBUserB@dummy.comReset service1/10/2023 8:001/10/2023 10:00

and i would like this result

_timeoshostuserkeyreporterNamereporterEmailsummarychangeStartDatechangeEndDate
1/10/2023 9:00LinuxServer1UserAREQ-1000UserAUserA@dummy.comReset service1/10/2023 8:001/10/2023 13:00
1/10/2023 11:00LinuxServer1UserAREQ-1000UserAUserA@dummy.comReset service1/10/2023 8:001/10/2023 13:00
1/10/2023 12:00LinuxServer2UserA      
1/10/2023 9:00LinuxServer2UserBREA-1001UserBUserB@dummy.comInvestigate error1/10/2023 8:001/10/2023 10:00
1/10/2023  14:00:00 PMLinuxServer1UserA      

So, UserA raised 1 request and matches to 2 of the events, but the last event does not match as it's outside the date/time range.

Thanks in advance

0 Karma

bowesmana
SplunkTrust
SplunkTrust

This is an example based on your example dataset

It assumes that there is a lookup file requests.csv (which I generated using the second code snipped below)

The makeresults stuff just sets up your data, so assume your search runs up to the inputlookup statement below.

| makeresults 
| eval _raw=split(replace("time,os,host,user
1/10/2023 9:00,Linux,Server1,UserA
1/10/2023 11:00,Linux,Server1,UserA
1/10/2023 12:00,Linux,Server2,UserA
1/10/2023 9:00,Linux,Server2,UserB
1/10/2023 14:00,Linux,Server1,UserA","\n","###"),"###")
| multikv forceheader=1 
| eval _time=strptime(time, "%d/%m/%Y %k:%M")
| table _time,os,host,user
| inputlookup append=t requests.csv
| eval user=coalesce(user, reporterName)
| foreach change* [ eval <<FIELD>>=strptime('<<FIELD>>', "%d/%m/%Y %k:%M") ]
| stats list(_time) as _time values(key) as key values(reporterEmail) as reporterEmail values(summary) as summary values(changeStartDate) as changeStartDate values(changeEndDate) as changeEndDate by user host
| eval isInside=mvmap(_time, if(_time>=changeStartDate AND _time<changeEndDate, _time.":1", _time.":0"))
| mvexpand isInside
| rex field=isInside "(?<_time>[^:]*):(?<isInside>\d)"

the logic is then that it appends the contents of the lookup file to the end of the data and makes the common name (user or reporterName) and then converts the change time fields to epoch.

Then the stats function joins all the items together - there is an assumption that there is only one requests in requests.csv for each user/server - if more then the logic will need to change.

After the stats, the mvmap just compares the times and then expands out the results with isInside showing if the event is inside the request period 

 

Here's the csv generation so you can test if needed.

| makeresults 
| eval _raw=split(replace("key,host,reporterName,reporterEmail,summary,changeStartDate,changeEndDate
REQ-1000,Server1,UserA,UserA@dummy.com,Investigate error,1/10/2023 8:00,1/10/2023 13:00
REA-1001,Server2,UserB,UserB@dummy.com,Reset service,1/10/2023 8:00,1/10/2023 10:00","\n","###"),"###")
| multikv forceheader=1 
| table key,host,reporterName,reporterEmail,summary,changeStartDate,changeEndDate
| outputlookup requests.csv

 

0 Karma

apps_inpaytech
Explorer

@bowesmana I understand the technique now!

After some tweaking, I now get the expected results, some important points for anyone who is reading this...

  1. group by parent table so that all the child records are in multi-value columns
  2. find the multi-value index that matches
  3. remove the non-matching multi-value records
  4. list() does not dedup results, this is needed when filtering the mv results by index
| convert mktime(_time) as epoch
| inputlookup append=t Request_admin_access.csv
``` need the same use column name for stats to group on ```
| eval userForMatching=coalesce(normalisedUserName, normalisedReporterName)
``` group by events, so that all the possible child requests are in mv columns ```
| stats list(epoch) as epoch list(key) as key values(reporterName) as reporterName values(reporterEmail) as reporterEmail list(summary) as summary list(changeStartDate) as changeStartDate list(changeEndDate) as changeEndDate values(user) as user values(os) as os values(clientName) as clientName values(clientAddress) as clientAddress values(signature) as signature values(logonType) as logonType by host userForMatching 
``` expand the events ```
| mvexpand epoch
``` find the mv index where event._time between request.start and request.end dates ```
| eval isAfterStart=mvmap(changeStartDate, if(epoch>=changeStartDate, 1, 0))
| eval isBeforeEnd=mvmap(changeEndDate, if(epoch<changeEndDate, 1, 0))
| eval idx=mvfind(mvzip(isAfterStart, isBeforeEnd), "1,1")
| rename epoch as _time
``` filter to just the matching request ```
| eval key=mvindex(key, idx)
| eval reporterName=if(isnull(idx),"",reporterName)
| eval reporterEmail=if(isnull(idx),"",reporterEmail)
| eval summary=mvindex(summary, idx)
| eval changeStartDate=mvindex(changeStartDate, idx)
| eval changeEndDate=mvindex(changeEndDate, idx)
``` human readable times ```
| convert ctime(changeStartDate) timeformat="%F %T" | convert ctime(changeEndDate) timeformat="%F %T"
| table _time os host user clientName clientAddress signature logonType key reporterName reporterEmail summary changeStartDate changeEndDate
| sort -_time

Many thanks

Get Updates on the Splunk Community!

.conf24 | Day 0

Hello Splunk Community! My name is Chris, and I'm based in Canberra, Australia's capital, and I travelled for ...

Enhance Security Visibility with Splunk Enterprise Security 7.1 through Threat ...

 (view in My Videos)Struggling with alert fatigue, lack of context, and prioritization around security ...

Troubleshooting the OpenTelemetry Collector

  In this tech talk, you’ll learn how to troubleshoot the OpenTelemetry collector - from checking the ...