Splunk Search

How to correlate events from three sources based on time condition

spnewashik
New Member

I have one index which have events from 3 different sources (A, B & C). The value of CELL, CALLERNO & CALLEDNO are the phone no. The data of the sources are as below:

Source:A

time                    CELL           Agent
2019-06-11 09:50:00  12445678      Bob  
2019-06-11 09:46:00  12997865      Alex     
2019-06-11 09:45:00  12776547      Alice        

Source:B

time                    CALLERNO       STAFF            
2019-06-11 10:50:00 12445634        Jon 
2019-06-11 11:46:00 12997897        Alex        
2019-06-11 02:45:00 12776547        Alice       

Source:C

time                  CALLERNO   STAFFNO       CALLEDNO             
2019-06-11 08:50:00 12445678     Jon           12445634
2019-06-11 07:46:00 12737865     July         12453255         
2019-06-11 06:45:00 12776547     Alice         12997865
  1. I want to match the value of "CELL" from source "A" with the value of "CALLERNO" (source B) & "CALLEDNO" (source C).
  2. Here one condition for matching, if the value of "CALLERNO" (source B) & "CALLEDNO" (source C) are within past 10min from the value of "CELL" (source A).
  3. If match the condition, then it will show me the value of "STAFF" name from source B and "STAFFNO" from source C in one table along with the "CELL" & "Agent" value of source "A".
  4. If not match the condition then it will show "Not match"
  5. Here one problem I faced that is the same field name of source B & C - "CALLERNO". I don't care value of it from source C.
  6. For that, I create a single Field Alias named as "CONTACT" with value of "CELL" (source-A), "CALLERNO" (source-B) & "CALLEDNO" (source-C)

I applied below command which perfectly shows only for matching without my time condition. Here I just test whether matching is working or not and the time condition is not tested here:

index=cim source=A |join Contact type=left    [search  source=B OR source=C| fields _time,Contact,Agent,STAFF,STAFFNO] | fillnull value="not match" | table _time,Contact,Agent,STAFF,STAFFNO

For matching the time condition (10min ), I applied the below command which doesn't show me expected result:

index=cim source=A |join Contact type=left  usetime=true earlier=true  [search  source=B OR source=C earliest=-10m latest=now| fields _time,Contact,Agent,STAFF,STAFFNO] | fillnull value="not match" | table _time,Contact,Agent,STAFF,STAFFNO

Please note that it is working perfectly if I join only two source instead of three sources. Please help me.

The expected output I mention below

Expected Output:

time                    CONTACT Agent          STAFF         STAFFNO            
2019-06-11 08:50:00 12445678     Jon           Jon           Not-match
2019-06-11 07:46:00 12737865     July         Adam          July    
2019-06-11 06:45:00 12776547     Alice         Not-match     Jud
0 Karma

woodcock
Esteemed Legend

Try this:

|makeresults | eval raw="time=2019-06-11T09:50:00,sourcetype=A,CELL=12445678,Agent=Bob time=2019-06-11T09:40:00,sourcetype=A,CELL=12997865,Agent=Alex time=2019-06-11T09:20:00,sourcetype=A,CELL=12776547,Agent=Alice time=2019-06-11T09:10:00,sourcetype=A,CELL=12321456,Agent=Adam time=2019-06-11T10:50:00,sourcetype=B,CALLERNO=12445634,STAFF=Jon time=2019-06-11T10:46:00,sourcetype=B,CALLERNO=12997897,STAFF=Alex time=2019-06-11T09:30:00,sourcetype=B,CALLERNO=12997865,STAFF=Alice time=2019-06-11T08:55:00,sourcetype=B,CALLERNO=12321456,STAFF=Adam time=2019-06-11T09:40:00,sourcetype=C,CALLERNO=12995678,STAFFNO=Jon,CALLEDNO=12445678 time=2019-06-11T08:50:00,sourcetype=C,CALLERNO=12324356,STAFFNO=Adam,CALLEDNO=12321456 time=2019-06-11T07:46:00,sourcetype=C,CALLERNO=12737865,STAFFNO=July,CALLEDNO=12453255 time=2019-06-11T06:45:00,sourcetype=C,CALLERNO=12776547,STAFFNO=Alice,CALLEDNO=12098765"
| makemv raw
| mvexpand raw
| rename raw AS _raw
| kv
| fields - _raw

| rename COMMENT AS "Everything above generates sample event data; everything below is your solution"

| eval _time = strptime(time, "%Y-%m-%dT%H:%M:%S")
| sort 0 _time
| eval CONTACT = coalesce(CELL, CALLEDNO, CALLERNO)
| eval ABjoiner=if((sourcetype=="A" OR sourcetype=="B"), "AB", null())
| streamstats current=f last(_time) AS ABprev_time last(STAFF) AS ABprevSTAFF BY CONTACT ABjoiner
| eval ACjoiner=if((sourcetype=="A" OR sourcetype=="C"), "AC", null())
| streamstats current=f last(_time) AS ACprev_time last(STAFFNO) AS ACprevSTAFFNO BY CONTACT ACjoiner
| search sourcetype="A"
| eval STAFF = if((_time - ABprev_time) <= (10 * 60), ABprevSTAFF, null())
| eval STAFFNO = if((_time - ACprev_time) <= (10 * 60), ACprevSTAFFNO, null())
| table _time CONTACT Agent STAFF STAFFNO
| sort 0 - _time
| fillnull value="Not-match" STAFF STAFFNO
0 Karma

rslama
Path Finder

If I understand your question correctly, I think this is what you need:
index=smeindex sourcetype IN (A,B,C)
| bucket _time span=10m
| stats values(field1) as field1 values(field2) as field2 values(field3) as field3 by _time

then you can add any condition, like:
|where field1=field2

0 Karma

woodcock
Esteemed Legend

Your sample data matches neither your description (the times are not 10-minutes apart anywhere) nor your final Expected Output (there is no Adam nor Jud). Clean up the coherency of the details in your question and then maybe we can make this work for you.

0 Karma

spnewashik
New Member

Hi woodcock,
thanks for your reply. I just share this data as an example, thats why the time different is not accurate. The expected output is also the sample copy that the final output would be like this table.
However, for easy understanding I am sharing here the time match for source data and expected output:

Source:A

time                    CELL           Agent
2019-06-11 09:50:00  12445678      Bob  
2019-06-11 09:40:00  12997865      Alex     
2019-06-11 09:20:00  12776547      Alice
2019-06-11 09:10:00  12321456      Adam

Source:B

time                    CALLERNO       STAFF            
2019-06-11 10:50:00 12445634        Jon 
2019-06-11 10:46:00 12997897        Alex        
2019-06-11 09:30:00 12997865        Alice
2019-06-11 08:55:00  12321456      Adam 

Source:C

time                  CALLERNO   STAFFNO       CALLEDNO             
2019-06-11 09:40:00 12995678     Jon           12445678
2019-06-11 08:50:00 12324356     Adam         12321456
2019-06-11 07:46:00 12737865     July         12453255         
2019-06-11 06:45:00 12776547     Alice         12098765

Expected Output:

time                    CONTACT  Agent         STAFF         STAFFNO            
2019-06-11 09:50:00 12445678     Bob           Not-match     Jon
2019-06-11 09:40:00 12997865     Alex         Alice         Not-match    
2019-06-11 09:10:00 12776547     Alice         Not-match     Not-match
2019-06-11 08:10:00  12321456   Adam          Not-match     Not-match

I am sharing here three scenario based on my expected output:

  1. CELL=12445678 exist in Source-A (time 09:50) & within just past 10m (time 09:40) it is also exist in Source-C. This value is not exist in source-B. So the output shows value of "Agent" (Bob) from source-A, "STAFFNO" (Jon) from source-C and shows "Not-match" for "STAFF" of Source-B.

  2. CELL=12997865 exist in Source-A (time 09:40) & within just past 10m (time 09:30) it is also exist in Source-B. This value is not exist in source-C. So the output shows value of "Agent" (Alex) from source-A, "STAFF" (Alice) from source-B and shows "Not-match" for "STAFFNO" OF Source-C.

  3. CELL=12321456 exist in Source-A (time 09:10).It is exist in Source-B & Source-C but not within just past 10m compared with Source-A time. So the output shows value of "Agent" (Adam) from source-A & shows "Not-match" for "STAFF" & "STAFFNO".

0 Karma
Get Updates on the Splunk Community!

Earn a $35 Gift Card for Answering our Splunk Admins & App Developer Survey

Survey for Splunk Admins and App Developers is open now! | Earn a $35 gift card!      Hello there,  Splunk ...

Continuing Innovation & New Integrations Unlock Full Stack Observability For Your ...

You’ve probably heard the latest about AppDynamics joining the Splunk Observability portfolio, deepening our ...

Monitoring Amazon Elastic Kubernetes Service (EKS)

As we’ve seen, integrating Kubernetes environments with Splunk Observability Cloud is a quick and easy way to ...