Splunk Search

How to join 2 tables by field and closest table2 _time before table1 _time?

warlitos
Explorer

Hello,

Let's say I have the following tables

index=events

_time event_id ip

 

index=connections

_time ip_address user


When users connect to the system, it gets registered on table connections, with the time of the connection, user and ip_address asigned to that user. Users are attached to this ip until they disconnect. When a user disconnects and a new user connects to the system, the same ip_address can be asigned to the new user.

Events are provoked and registered on table events, associated to an ip, also with the _time of the event and a unique event_id.

ip and ip_address are the same address but with different names on each table.

 

If I want to obtain the user behind the ip that provoked a single event based on event_id (3000 on this example), which means the last user that was connected to the ip before the event happened, I would do:

index=connections
[ search index=events event_id="3000"
| head 1
| eval latest=_time
| eval earliest=relative_time(latest, "-24h")
| eval ip_address=ip
| return earliest latest ip_address event_id
]
| sort by -_time
| head 1
| table _time event_id ip_address user

This gives a table with a single row, for example, with tables:

connections

22:00 10.0.0.5 margarita
19:00 10.0.0.17 charles
11:00 10.0.0.5 alice

 

events

23:00 3002 10.0.0.5
20:00 3001 10.0.0.17
18:00 3000 10.0.0.5

 

I would get:

18:00 3000 10.0.0.5 alice

 

Where the search gets the last user (alice) that was connected to the ip (10.0.0.5) before the event _time (18:00), although there is a connection registered later with that ip.

 

Now, I would like to obtain this result for every row on events table, with fomat:

_time event_id ip

user

 

For example, for both tables examples above, I would like to get:

23:00 3002 10.0.0.5 margarita
20:00 3001 10.0.0.17 charles
18:00 3000 10.0.0.5

alice

 

It should be like a join ip - ip_address but having into account that the event _time defines which row of connections to be used, as there would be more than one row with the same ip_address.

I have thought and tried different approachs, like adding multiple subsearches, using both JOIN and subsearch and using foreach command, but I always encounter a problem where I can't return more than one "latest", but I feel like there should be an easy way to achieve this, but I am not very expert with Splunk.

Any ideas/hints of how could I achieve this?

Thank you very much!

Labels (2)
Tags (3)
0 Karma
1 Solution

ITWhisperer
SplunkTrust
SplunkTrust

Try something like this

index=connections
| append
  [ search index=events
  | rename ip as ip_address]
| sort _time
| streamstats latest(user) as user by ip_address
| where isnotnull(event_id)

View solution in original post

ITWhisperer
SplunkTrust
SplunkTrust

Try something like this

index=connections
| append
  [ search index=events
  | rename ip as ip_address]
| sort _time
| streamstats latest(user) as user by ip_address
| where isnotnull(event_id)

warlitos
Explorer

This worked perfectly, thank you!

0 Karma

mayurr98
Super Champion

Try this:

index=connections
| stats max(_time) as connection_time ip_address user
| join type=left ip_address
[ search index=events event_id=*
| rename ip as ip_address
| stats max(_time) as provoked_time values(event_id) as event_id by ip_address ]
| convert ctime(*_time)

Upvote/Accept if it works. 

0 Karma

warlitos
Explorer

Hello,

Thanks for the response.

On line:

 

| stats max(_time) as connection_time ip_address user

 

I'm getting:  Error in 'stats' command: The argument 'ip_address' is invalid. 

I tried solving it with

 

| stats max(_time) as connection_time values(ip_address) values(user) 

 

But I'm getting results as:

connection_timevalues(ip_address)values(user)ip_addressprovoked_time
23:0010.0.0.6alice  
 10.0.0.15robert  
 10.0.0.129margarita  

 

I added some more information and simplified the example data.

0 Karma

mayurr98
Super Champion

try this:

 

 

index=connections
| stats max(_time) as connection_time values(user) as user by ip_address
| join type=left ip_address
[ search index=events event_id=*
| rename ip as ip_address
| stats max(_time) as provoked_time values(event_id) as event_id by ip_address ]
| convert ctime(*_time)

 

0 Karma
Get Updates on the Splunk Community!

Now Available: Cisco Talos Threat Intelligence Integrations for Splunk Security Cloud ...

At .conf24, we shared that we were in the process of integrating Cisco Talos threat intelligence into Splunk ...

Preparing your Splunk Environment for OpenSSL3

The Splunk platform will transition to OpenSSL version 3 in a future release. Actions are required to prepare ...

Easily Improve Agent Saturation with the Splunk Add-on for OpenTelemetry Collector

Agent Saturation What and Whys In application performance monitoring, saturation is defined as the total load ...