Hi All,
While trying to build a correlation search, I have run into a standpoint, where I need some help. I have two indexes and source types: index1, src1 and index2, src2. Here is what the fields in them look like.
What I am trying to achieve here is to build a correlation search, which:
This search will throw a notable event if it produces an output as above.
Any help is highly appreciated,
Thank you
Like this:
|makeresults | eval raw="sourcetype=src1,server_name=win7server,logon_user=abc,time_created=12/30/2018T3:47:55AM,file_path=c:\Program_files(x86)\abc sourcetype=src1,server_name=win8server,logon_user=cde,time_created=12/31/2018T4:44:51AM,file_path=c:\Program_files\Google\Chrome sourcetype=src1,server_name=win10server,logon_user=sam,time_created=12/31/2018T8:48:51AM,file_path=c:\Program_files(x86)\Microsoft_Office sourcetype=src2,dest=win7server,user=abc,detected_timestamp=2018-12-30T09:57:32.0,file_name=c:\Program_files(x86)\abc sourcetype=src2,dest=win8server,user=cde,detected_timestamp=2018-12-31T08:55:57.0,file_name=c:\Program_files\Google\Chrome sourcetype=src2,dest=win10server,user=sam,detected_timestamp=2018-12-31T07:07:18.0,file_name=c:\Program_files(x86)\Microsoft_Office sourcetype=src2,dest=win10server,user=karla,detected_timestamp=2018-12-31T07:07:18.0,file_name=c:\Program_files(x86)\Activision sourcetype=src2,dest=windows2012,user=Pam,detected_timestamp=2018-12-31T07:07:18.0,file_name=c:\Program_files(x86)\Tencent sourcetype=src2,dest=windows2016,user=Pat,detected_timestamp=2018-12-31T07:07:18.0,file_name=c:\Program_files(x86)\any"
| makemv raw
| mvexpand raw
| rename raw AS _raw
| kv
| fields - _raw _time
| rex field=time_created mode=sed "s/T/ / s/([AP]M)/ \1/"
| rex field=detected_timestamp mode=sed "s/T/ /"
| rename COMMENT AS "Everything above generates sample event data; everything below is your solution"
| rename COMMENT AS "%I (12-hour clock) is required with %p; %H would misread PM hours"
| eval _time = if((sourcetype="src1"), strptime(time_created, "%m/%d/%Y %I:%M:%S %p"), strptime(detected_timestamp, "%Y-%m-%d %H:%M:%S.%1n"))
| eval file_name = coalesce(file_name, file_path)
| eval user = coalesce(user, logon_user)
| eval dest = coalesce(dest, server_name)
| fields - file_path detected_timestamp time_created logon_user server_name
| stats list(*) AS * list(_time) AS time range(_time) AS distance_seconds dc(sourcetype) AS num_sourcetypes BY dest user file_name
| where num_sourcetypes>1 AND distance_seconds < (12 * 60 * 60)
Also, I'm a little confused by the line | eval _time = if((sourcetype="src1"), strptime(time_created, "%m/%d/%Y %H:%M:%S %p"), strptime(detected_timestamp, "%Y-%m-%d %H:%M:%S.%1n")). Can you please tell me how the eval strptime command will run here on both the fields time_created and detected_timestamp, as they are from different indexes and sourcetypes? We are only mentioning one sourcetype, src1, in the if command, so how is the eval command calling the field detected_timestamp of the other sourcetype, src2, here?
The sourcetype src2 is in the else part of the if (the 3rd argument).
Awesome. Thank you. One more question. Can we modify this query to compare the fields file_path and file_name, and find the paths which are present in both the fields, even if their _time is different by up to 12 hours, or is it doing that right now as well?
What I was trying to achieve was finding the common paths from both the fields, irrespective of their timestamp difference, and then trigger a notable event.
Open a new conversation with a new question.
Hi Woodcock,
Thank you for your response. I'll try this and will let you know my findings soon.
Use ISO format, or let Splunk convert the time into a single format for you through props.
You can try something like this -
index=<your index name> sourcetype IN(src1,src2)
| eval time_created = strptime(time_created, "%m/%d/%Y %I:%M:%S %p")
| eval detected_timestamp = strptime(detected_timestamp, "%Y-%m-%d %H:%M:%S.%N")
| rename time_created AS detected_timestamp, logon_user AS user, server_name AS dest, file_path AS file_name
| stats dc(sourcetype) AS src, range(detected_timestamp) AS diff_seconds BY dest, user, file_name
| where diff_seconds <= 43200 AND src>1
Hi Vijeta,
Thank you for replying. Perhaps I missed a couple of things in the question. These sourcetypes belong to two different indexes, and the timestamp fields time_created and detected_timestamp aren't normalized. They are a pile of timestamps in various timezones. If src1 detects and records a file_path, and the same value gets recorded into file_name of src2 within 12 hours irrespective of their timezones, my search fetches the results of the dest, file_name and user. I tried your search, tweaked it according to my requirements, but couldn't do it.
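As an added example that is not part of any answer in this thread, the Python script below models what the accepted SPL pipeline does outside Splunk: it parses the two timestamp formats (12-hour clock with AM/PM for src1, ISO-like for src2), coalesces the differently named fields into one schema, groups by dest, user and file_name, and flags a group as notable when both sourcetypes appear within 12 hours. It also covers the two follow-ups above: `window_seconds=None` matches common paths regardless of the time difference, and the per-sourcetype timezone map is an assumption standing in for the TZ settings in props.conf that the last comments mention. The sample events are constructed from the makeresults data in the answer; `SRC1_AT_UTC_PLUS_8` is a constructed misconfiguration showing how an unnormalized timezone changes which pairs fall inside the window.

```python
"""Offline model of the correlation search in this thread (added example)."""
from collections import defaultdict
from datetime import datetime, timedelta, timezone

WINDOW_SECONDS = 12 * 60 * 60  # the 12-hour window from the question

# Constructed sample events mirroring the makeresults data in the answer.
# src1 names its fields server_name/logon_user/time_created/file_path;
# src2 names them dest/user/detected_timestamp/file_name.
EVENTS = [
    {"sourcetype": "src1", "server_name": "win7server", "logon_user": "abc",
     "time_created": "12/30/2018T3:47:55AM", "file_path": r"c:\Program_files(x86)\abc"},
    {"sourcetype": "src1", "server_name": "win8server", "logon_user": "cde",
     "time_created": "12/31/2018T4:44:51AM", "file_path": r"c:\Program_files\Google\Chrome"},
    {"sourcetype": "src1", "server_name": "win10server", "logon_user": "sam",
     "time_created": "12/31/2018T8:48:51AM", "file_path": r"c:\Program_files(x86)\Microsoft_Office"},
    {"sourcetype": "src2", "dest": "win7server", "user": "abc",
     "detected_timestamp": "2018-12-30T09:57:32.0", "file_name": r"c:\Program_files(x86)\abc"},
    {"sourcetype": "src2", "dest": "win8server", "user": "cde",
     "detected_timestamp": "2018-12-31T08:55:57.0", "file_name": r"c:\Program_files\Google\Chrome"},
    {"sourcetype": "src2", "dest": "win10server", "user": "sam",
     "detected_timestamp": "2018-12-31T07:07:18.0", "file_name": r"c:\Program_files(x86)\Microsoft_Office"},
    {"sourcetype": "src2", "dest": "win10server", "user": "karla",
     "detected_timestamp": "2018-12-31T07:07:18.0", "file_name": r"c:\Program_files(x86)\Activision"},
    {"sourcetype": "src2", "dest": "windows2012", "user": "Pam",
     "detected_timestamp": "2018-12-31T07:07:18.0", "file_name": r"c:\Program_files(x86)\Tencent"},
    {"sourcetype": "src2", "dest": "windows2016", "user": "Pat",
     "detected_timestamp": "2018-12-31T07:07:18.0", "file_name": r"c:\Program_files(x86)\any"},
]

# Assumed timezone per sourcetype: in Splunk this is what TZ in props.conf (or an
# offset inside the timestamp) decides. Both are treated as UTC by default.
DEFAULT_TZ = {"src1": timezone.utc, "src2": timezone.utc}
# Constructed misconfiguration: src1 hosts logging local time at UTC+8 while src2 logs UTC.
SRC1_AT_UTC_PLUS_8 = {"src1": timezone(timedelta(hours=8)), "src2": timezone.utc}


def parse_src1(value, tz):
    """'12/30/2018T3:47:55AM' -> aware datetime. %I (12-hour clock) is needed
    because of the AM/PM suffix; %H would silently misread PM hours."""
    cleaned = value.replace("T", " ").replace("AM", " AM").replace("PM", " PM")
    return datetime.strptime(cleaned, "%m/%d/%Y %I:%M:%S %p").replace(tzinfo=tz)


def parse_src2(value, tz):
    """'2018-12-30T09:57:32.0' -> aware datetime (%f accepts 1-6 fraction digits)."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=tz)


def normalize(event, tz_by_sourcetype):
    """Mirror the coalesce() steps of the SPL: one schema for both sourcetypes."""
    st = event["sourcetype"]
    tz = tz_by_sourcetype[st]
    if st == "src1":
        when = parse_src1(event["time_created"], tz)
    else:
        when = parse_src2(event["detected_timestamp"], tz)
    return {
        "sourcetype": st,
        "dest": event.get("dest") or event.get("server_name"),
        "user": event.get("user") or event.get("logon_user"),
        "file_name": event.get("file_name") or event.get("file_path"),
        "_time": when.timestamp(),  # epoch seconds, timezone already applied
    }


def correlate(events, window_seconds=WINDOW_SECONDS, tz_by_sourcetype=DEFAULT_TZ):
    """Equivalent of `stats ... BY dest user file_name | where ...`.
    window_seconds=None drops the time constraint (same path seen by both
    sourcetypes no matter how far apart, as asked in the follow-up)."""
    groups = defaultdict(list)
    for ev in events:
        n = normalize(ev, tz_by_sourcetype)
        groups[(n["dest"], n["user"], n["file_name"])].append(n)
    notables = []
    for (dest, user, file_name), rows in sorted(groups.items()):
        if len({r["sourcetype"] for r in rows}) < 2:  # dc(sourcetype) > 1
            continue
        times = [r["_time"] for r in rows]
        distance = max(times) - min(times)  # range(_time)
        if window_seconds is not None and distance >= window_seconds:
            continue
        notables.append({"dest": dest, "user": user, "file_name": file_name,
                         "distance_seconds": distance})
    return notables


if __name__ == "__main__":
    print("both sourcetypes in UTC:", correlate(EVENTS))
    print("src1 actually at UTC+8:", correlate(EVENTS, tz_by_sourcetype=SRC1_AT_UTC_PLUS_8))
```

With both sourcetypes read as UTC the three overlapping paths (abc, cde, sam) are all inside the window; if src1 really logged local time at UTC+8, only sam's pair still fits, which is the effect the last comment describes and why the timestamps have to be normalized (TZ in props.conf or an offset in the event) before the range check means anything.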