Splunk Search

How to join 2 tables by field and closest table2 _time before table1 _time?

warlitos
Explorer

Hello,

Let's say I have the following tables

index=events

_time event_id ip

 

index=connections

_time ip_address user


When a user connects to the system, the connection is recorded in the connections table with the time of the connection, the user, and the ip_address assigned to that user. A user keeps that ip until they disconnect. Once a user disconnects and a new user connects, the same ip_address can be assigned to the new user.

Events are triggered and recorded in the events table, each associated with an ip, the _time of the event, and a unique event_id.

ip and ip_address hold the same address, under a different field name in each table.

 

If I want to find the user behind the ip that triggered a single event, identified by event_id (3000 in this example), meaning the last user connected to that ip before the event happened, I would do:

index=connections
[ search index=events event_id="3000"
| head 1
| eval latest=_time
| eval earliest=relative_time(latest, "-24h")
| eval ip_address=ip
| return earliest latest ip_address event_id
]
| sort by -_time
| head 1
| table _time event_id ip_address user

This returns a single row. For example, given these tables:

connections

22:00 10.0.0.5 margarita
19:00 10.0.0.17 charles
11:00 10.0.0.5 alice

 

events

23:00 3002 10.0.0.5
20:00 3001 10.0.0.17
18:00 3000 10.0.0.5

 

I would get:

18:00 3000 10.0.0.5 alice

 

Here the search returns the last user (alice) connected to the ip (10.0.0.5) before the event _time (18:00), even though a later connection is registered for that ip.

 

Now, I would like to obtain this result for every row of the events table, in the format:

_time event_id ip user

 

For example, for the two example tables above, I would like to get:

23:00 3002 10.0.0.5 margarita
20:00 3001 10.0.0.17 charles
18:00 3000 10.0.0.5 alice

 

It should work like a join on ip = ip_address, but taking into account that the event _time determines which row of connections to use, since there can be more than one row with the same ip_address.

I have tried different approaches, such as adding multiple subsearches, combining join with a subsearch, and using the foreach command, but I always run into the same problem: I can't return more than one "latest". I feel there should be an easy way to achieve this, but I am not very experienced with Splunk.

Any ideas or hints on how I could achieve this?

Thank you very much!

0 Karma
1 Solution

ITWhisperer
SplunkTrust

Try something like this

index=connections
| append
  [ search index=events
  | rename ip as ip_address]
| sort _time
| streamstats latest(user) as user by ip_address
| where isnotnull(event_id)
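
To check the logic without touching real indexes, here is a minimal sketch that rebuilds the sample data from the question with makeresults and then applies the same sort, streamstats and where steps. The date 2024-01-01 and the helper field row are placeholders made up for this illustration; only the times, IPs, users and event IDs come from the question. Event rows have no user, so latest(user) carries forward the most recent connection seen so far for that ip_address.

```spl
| makeresults
| eval row=split("11:00,10.0.0.5,alice;19:00,10.0.0.17,charles;22:00,10.0.0.5,margarita", ";")
| mvexpand row
| eval _time=strptime("2024-01-01 ".mvindex(split(row, ","), 0), "%Y-%m-%d %H:%M"),
       ip_address=mvindex(split(row, ","), 1),
       user=mvindex(split(row, ","), 2)
| fields _time ip_address user
| append
    [| makeresults
     | eval row=split("18:00,3000,10.0.0.5;20:00,3001,10.0.0.17;23:00,3002,10.0.0.5", ";")
     | mvexpand row
     | eval _time=strptime("2024-01-01 ".mvindex(split(row, ","), 0), "%Y-%m-%d %H:%M"),
            event_id=mvindex(split(row, ","), 1),
            ip_address=mvindex(split(row, ","), 2)
     | fields _time event_id ip_address]
| sort 0 _time
| streamstats latest(user) as user by ip_address
| where isnotnull(event_id)
| table _time event_id ip_address user
```

With the sample data this should return alice for event 3000, charles for 3001 and margarita for 3002. On larger data sets, keep in mind that sort truncates to 10,000 results unless a count of 0 is given (sort 0 _time), and that append runs a subsearch, so the usual subsearch limits apply.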


warlitos
Explorer

This worked perfectly, thank you!

0 Karma

mayurr98
Super Champion

Try this:

index=connections
| stats max(_time) as connection_time ip_address user
| join type=left ip_address
[ search index=events event_id=*
| rename ip as ip_address
| stats max(_time) as provoked_time values(event_id) as event_id by ip_address ]
| convert ctime(*_time)

Upvote/Accept if it works. 

0 Karma

warlitos
Explorer

Hello,

Thanks for the response.

On line:

 

| stats max(_time) as connection_time ip_address user

 

I'm getting:  Error in 'stats' command: The argument 'ip_address' is invalid. 

I tried solving it with

 

| stats max(_time) as connection_time values(ip_address) values(user) 

 

But I'm getting results as:

connection_time  values(ip_address)  values(user)  ip_address  provoked_time
23:00            10.0.0.6            alice
                 10.0.0.15           robert
                 10.0.0.129          margarita

 

I added some more information and simplified the example data.

0 Karma

mayurr98
Super Champion

try this:

 

 

index=connections
| stats max(_time) as connection_time values(user) as user by ip_address
| join type=left ip_address
[ search index=events event_id=*
| rename ip as ip_address
| stats max(_time) as provoked_time values(event_id) as event_id by ip_address ]
| convert ctime(*_time)

 

0 Karma