Hello,
Let's say I have the following tables
index=events
_time | event_id | ip |
index=connections
_time | ip_address | user |
When users connect to the system, it gets registered on table connections, with the time of the connection, user and ip_address asigned to that user. Users are attached to this ip until they disconnect. When a user disconnects and a new user connects to the system, the same ip_address can be asigned to the new user.
Events are provoked and registered on table events, associated to an ip, also with the _time of the event and a unique event_id.
ip and ip_address are the same address but with different names on each table.
If I want to obtain the user behind the ip that provoked a single event based on event_id (3000 on this example), which means the last user that was connected to the ip before the event happened, I would do:
index=connections
[ search index=events event_id="3000"
| head 1
| eval latest=_time
| eval earliest=relative_time(latest, "-24h")
| eval ip_address=ip
| return earliest latest ip_address event_id
]
| sort by -_time
| head 1
| table _time event_id ip_address user
This gives a table with a single row, for example, with tables:
connections
22:00 | 10.0.0.5 | margarita |
19:00 | 10.0.0.17 | charles |
11:00 | 10.0.0.5 | alice |
events
23:00 | 3002 | 10.0.0.5 |
20:00 | 3001 | 10.0.0.17 |
18:00 | 3000 | 10.0.0.5 |
I would get:
18:00 | 3000 | 10.0.0.5 | alice |
Where the search gets the last user (alice) that was connected to the ip (10.0.0.5) before the event _time (18:00), although there is a connection registered later with that ip.
Now, I would like to obtain this result for every row on events table, with fomat:
_time | event_id | ip |
user |
For example, for both tables examples above, I would like to get:
23:00 | 3002 | 10.0.0.5 | margarita |
20:00 | 3001 | 10.0.0.17 | charles |
18:00 | 3000 | 10.0.0.5 |
alice |
It should be like a join ip - ip_address but having into account that the event _time defines which row of connections to be used, as there would be more than one row with the same ip_address.
I have thought and tried different approachs, like adding multiple subsearches, using both JOIN and subsearch and using foreach command, but I always encounter a problem where I can't return more than one "latest", but I feel like there should be an easy way to achieve this, but I am not very expert with Splunk.
Any ideas/hints of how could I achieve this?
Thank you very much!
Try something like this
index=connections
| append
[ search index=events
| rename ip as ip_address]
| sort _time
| streamstats latest(user) as user by ip_address
| where isnotnull(event_id)
Try something like this
index=connections
| append
[ search index=events
| rename ip as ip_address]
| sort _time
| streamstats latest(user) as user by ip_address
| where isnotnull(event_id)
This worked perfectly, thank you!
Try this:
index=connections
| stats max(_time) as connection_time ip_address user
| join type=left ip_address
[ search index=events event_id=*
| rename ip as ip_address
| stats max(_time) as provoked_time values(event_id) as event_id by ip_address ]
| convert ctime(*_time)
Upvote/Accept if it works.
Hello,
Thanks for the response.
On line:
| stats max(_time) as connection_time ip_address user
I'm getting: Error in 'stats' command: The argument 'ip_address' is invalid.
I tried solving it with
| stats max(_time) as connection_time values(ip_address) values(user)
But I'm getting results as:
connection_time | values(ip_address) | values(user) | ip_address | provoked_time |
23:00 | 10.0.0.6 | alice | ||
10.0.0.15 | robert | |||
10.0.0.129 | margarita |
I added some more information and simplified the example data.
try this:
index=connections
| stats max(_time) as connection_time values(user) as user by ip_address
| join type=left ip_address
[ search index=events event_id=*
| rename ip as ip_address
| stats max(_time) as provoked_time values(event_id) as event_id by ip_address ]
| convert ctime(*_time)