Hello there,
I have two sets of data in two different indexes. The fields in each index are [customer_id, datetime] and [customer_id, date_of_creation, motive], respectively.
I would like to join them on the field "customer_id" so that each line gets its motive. The problem is that the second index can contain multiple lines with the same "customer_id", so to perform the join I also need to check that the date fields are consistent (a difference of 5 minutes at most).
Any idea how I could do that? Thanks in advance 😄
It can be done in a lot of ways. One would be something like this...
Assuming you want the first B record after each A record...
(index=A search terms) OR (index=B search terms)
| fields index customer_id datetime date_of_creation motive
| eval mydate=if(index="A", strptime(datetime, "<datetime format>"), strptime(date_of_creation, "<date_of_creation format>"))
| eval recordA=if(index="A",1,0)
| sort 0 customer_id mydate
| streamstats sum(recordA) as recordA by customer_id
| streamstats count as countA by customer_id recordA
| where countA <= 2
| stats min(mydate) as Atime max(mydate) as Btime range(mydate) as duration
values(index) as index values(motive) as motive
by customer_id recordA
Assuming you want the last B record before each A record, change the sort line to this...
| sort 0 customer_id - mydate
...and the stats line to this...
| stats max(mydate) as Atime min(mydate) as Btime range(mydate) as duration
values(index) as index values(motive) as motive
by customer_id recordA
@kcollori - you could also add a test at the end to set the motive to "unknown" if it wasn't within your five-minute window, or if no motive record was found...
| eval motive=case(duration=0,"unknown",
mvcount(index)=1 AND index="A","unknown",
duration<=300,motive,
true(),"unknown")
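To sanity-check the search results on a small sample outside Splunk, the matching rule described above (first B record after each A record, motive kept only within 300 seconds) can be sketched in Python. This is only an illustration of the logic, not the search itself: the timestamp layout in TIME_FORMAT, the helper names to_epoch and match_motives, and the sample rows are all assumptions made for the example.

```python
from datetime import datetime, timezone

WINDOW_SECONDS = 300  # five-minute window from the question
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed timestamp layout; adjust to your data


def to_epoch(text):
    """Parse a timestamp string into epoch seconds (treated as UTC)."""
    parsed = datetime.strptime(text, TIME_FORMAT)
    return parsed.replace(tzinfo=timezone.utc).timestamp()


def match_motives(a_rows, b_rows):
    """Pair each A row with the first B row of the same customer that follows it.

    The motive is kept only if that B row is later than the A row by at most
    WINDOW_SECONDS; otherwise it stays "unknown", as in the case() test.
    """
    events = [(r["customer_id"], to_epoch(r["datetime"]), 0, r) for r in a_rows]
    events += [(r["customer_id"], to_epoch(r["date_of_creation"]), 1, r) for r in b_rows]
    events.sort(key=lambda e: e[:3])  # by customer, then time, A before B on ties

    results = []
    pending = None  # the A row still waiting for its first B row
    for customer_id, when, is_b, row in events:
        if not is_b:
            pending = {"customer_id": customer_id, "Atime": when, "motive": "unknown"}
            results.append(pending)
        elif pending is not None and pending["customer_id"] == customer_id:
            if 0 < when - pending["Atime"] <= WINDOW_SECONDS:
                pending["motive"] = row["motive"]
            pending = None  # only the first B row after an A row is considered
    return results


# Hypothetical sample data, for illustration only.
a_rows = [
    {"customer_id": "c1", "datetime": "2021-03-01 10:00:00"},
    {"customer_id": "c1", "datetime": "2021-03-01 12:00:00"},
    {"customer_id": "c2", "datetime": "2021-03-01 09:00:00"},
]
b_rows = [
    {"customer_id": "c1", "date_of_creation": "2021-03-01 10:03:20", "motive": "billing"},
    {"customer_id": "c1", "date_of_creation": "2021-03-01 12:30:00", "motive": "refund"},
]

for result in match_motives(a_rows, b_rows):
    print(result["customer_id"], result["motive"])
```

In the sample, only the first c1 row gets a motive: its B record arrives 200 seconds later, while the second c1 row's B record is 30 minutes away and c2 has no B record at all.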
Thanks a lot Dal! It does what I was looking for. I have just one more issue: my index A is no longer an index. For various reasons I decided to work with a CSV located in Splunk's csv directory.
How could I adapt your search, knowing that A is now a CSV and B is still an index?
Thanks in advance 😄
@kcollori - not much difference. As long as you have fewer than 50K events in the csv, just add inputcsv between lines 1 and 2 of the original search and drop the index A search from the initial part. Leave the fields command both before and after the inputcsv.
(index=B search terms)
| fields index customer_id datetime date_of_creation motive
| inputcsv append=t usedtobeindexA.csv
| fields index customer_id datetime date_of_creation motive
| ...remainder of search...