Splunk Search

Multiple logins from same source IP with dynamic source IP

gadepoonam
Explorer

I am trying to implement a security use case to detect multiple logins from the same source IP. The source IP is dynamic: every time a user logs in, he gets a different source IP, and his previous source IP might be assigned to a different user. I am using FortiGate as the firewall, and the OS is RHEL. To track multiple logins from the same IP, I am following the approach below:
1. Get the firewall access details for each user: the source IP and the interval for which the firewall session is active.
2. For that source IP and time interval, get the list of users that have a matching session on RHEL.

Even though there are valid sessions, the search query below does not return any data once the where clause for the time interval is applied. The outer query and the subquery each return correct data when run independently.

index=firewall sourcetype=fgt_event (tunneltype="ssl-tunnel" AND msg="SSL tunnel established") OR (tunneltype="ssl-tunnel" AND msg"SSL tunnel shutdown") | transaction user tunnelid startswith=("ssl-tunnel" AND "SSL tunnel established") endswith=("ssl-tunnel" AND "SSL tunnel shutdown") | eval start_firewall=_time | eval end_firewall=_time+duration
|  convert ctime(start_firewall) ctime(end_firewall)
| rename user as firewall_user 
|  rename  tunnelip as src_ip 
|  table  firewall_user start_firewall end_firewall  src_ip | sort  by start_firewall | join src_ip    [ search   index=rhel sourcetype=linux_secure process=sshd  | transaction pid startswith="Accepted password" endswith="session closed"  |  eval start_secure=_time | eval end_secure=_time+duration |  convert ctime(start_secure) ctime(end_secure)  
|  where start_firewall<=start_secure AND end_firewall >= end_secure 
    |  table  user  start_secure  end_secure src_ip dest by  start_secure ]
| table  firewall_user  src_ip start_firewall start_secure end_firewall  end_secure user dest

DalJeanis
Legend

Try this -

 index=firewall sourcetype=fgt_event 
(tunneltype="ssl-tunnel" AND msg="SSL tunnel established") OR 
(tunneltype="ssl-tunnel" AND msg="SSL tunnel shutdown") 
| rename COMMENT as "match up the firewall transactions"
| transaction user tunnelid 
    startswith=("ssl-tunnel" AND "SSL tunnel established") 
    endswith=("ssl-tunnel" AND "SSL tunnel shutdown") 

| rename COMMENT as "reformat the firewall transaction fields"
| eval start_firewall=_time 
| eval end_firewall=_time+duration
| rename user as firewall_user 
| rename  tunnelip as src_ip 
| stats count by src_ip firewall_user start_firewall end_firewall  

| rename COMMENT as "join every firewall transaction to all matching RHEL transactions"
| join type=left max=0 src_ip 
    [ search index=rhel sourcetype=linux_secure process=sshd  
    | rename COMMENT as "match up the RHEL transactions"
    | transaction pid startswith="Accepted password" endswith="session closed"  
    | rename COMMENT as "reformat the RHEL transaction fields"
    | eval start_secure=_time 
    | eval end_secure=_time+duration
    | stats count by src_ip user start_secure end_secure dest  
    ]

| rename COMMENT as "kill all copies where the time doesn't match"
| where start_firewall<=start_secure AND end_firewall >= start_secure 
| table  firewall_user  src_ip start_firewall start_secure end_firewall  end_secure user dest
| convert ctime(start_firewall) ctime(end_firewall)  ctime(start_secure) ctime(end_secure)
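
One reason the search above leaves `convert ctime(...)` until the very last line: once a field holds a formatted date string, comparisons on it are no longer numeric. A small Python illustration of the effect (not SPL), assuming the `%m/%d/%Y %H:%M:%S` layout that `convert ctime` is understood to produce by default; the two epoch values are made up for the example:

```python
import time

# Layout assumed to match the default output of `convert ctime(...)`.
CTIME_FORMAT = "%m/%d/%Y %H:%M:%S"

def ctime(epoch):
    """Format an epoch timestamp as a string (UTC keeps the example deterministic)."""
    return time.strftime(CTIME_FORMAT, time.gmtime(epoch))

start_firewall = 1703980800  # 2023-12-31 00:00:00 UTC (illustrative value)
start_secure = 1704153600    # 2024-01-02 00:00:00 UTC (illustrative value)

# Numeric comparison on epoch seconds gives the chronological answer.
numeric_ok = start_firewall <= start_secure

# After formatting, the comparison runs character by character,
# so "12/31/2023 ..." sorts after "01/02/2024 ...".
string_ok = ctime(start_firewall) <= ctime(start_secure)

print(numeric_ok, string_ok)  # True False
```

The same pair of timestamps passes the test as numbers and fails it as strings, which is enough to empty a result set when the formatting happens before the `where`.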

Okay, here are some general notes and explanation -

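1) Don't reformat fields that you may not use. Hold the ctime until the end. Also, convert actually replaces the epoch number in each field with a formatted string, so as a general rule date/time fields can't be compared reliably afterwards. Compare that with fieldformat and eval: fieldformat in particular changes only how a field is displayed and leaves the underlying value alone.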

2) Don't require one time range to fit completely within the other. If the RHEL session starts while a firewall session is in place, then it's a match. Let's not have any accidents where one application takes a little longer to log off than the other.

3) There was a by start_secure on the end of the table command inside the subsearch. table has no by clause, so the by will create a blank field called by; I'm not sure what the duplicate start_secure will do. Just delete those. Also, don't worry about sorting either side before the join.

4) Oh, and by the way, get in the habit of always having a number after the keyword sort. Use sort 0 to keep all your results; otherwise sort defaults to truncating at 10,000 results. Crazy stupid default, coming from a relational database and mainframe perspective, but you get used to it.

5) You have to join the left side src_ip with EVERY matching src_ip on the right side before you eliminate by time range. Use join max=0; by default join keeps only the first matching subsearch row (max=1). (This WOULD have been the real culprit, except the next one had already eliminated every subsearch result.)

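6) Inside the subsearch, you are comparing against fields that don't exist inside the subsearch (start_firewall and end_firewall). Those fields only exist on the outer side of the join, so the where clause is never true there and drops every subsearch row.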
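
Notes 2, 5 and 6 together amount to this: pair every firewall session with every RHEL session on the same src_ip first, and only then keep the pairs where the RHEL session starts inside the firewall window. A Python model of that logic (not SPL); the session tuples, user names and hosts are invented for illustration:

```python
from collections import namedtuple

Firewall = namedtuple("Firewall", "user src_ip start end")
Secure = namedtuple("Secure", "user src_ip start end dest")

def join_then_filter(firewall_sessions, secure_sessions):
    """Pair sessions on src_ip (like join max=0), then filter on time (like where)."""
    matches = []
    for fw in firewall_sessions:
        for ssh in secure_sessions:
            if fw.src_ip != ssh.src_ip:
                continue
            # Only the RHEL start has to fall inside the firewall window;
            # the RHEL session may log off slightly after the tunnel closes.
            if fw.start <= ssh.start <= fw.end:
                matches.append((fw.user, ssh.user, ssh.dest))
    return matches

firewall_sessions = [
    Firewall("alice", "10.0.0.5", 1000, 1360),
    Firewall("bob", "10.0.0.5", 2000, 2360),  # same IP, reassigned later
]
secure_sessions = [
    Secure("root", "10.0.0.5", 1100, 1370, "rhel01"),  # ends 10s after alice's tunnel
    Secure("oracle", "10.0.0.5", 2100, 2300, "rhel02"),
]

result = join_then_filter(firewall_sessions, secure_sessions)
print(result)  # [('alice', 'root', 'rhel01'), ('bob', 'oracle', 'rhel02')]
```

With a strict containment test (end_firewall >= end_secure) the root session would be lost, because it closes ten seconds after the tunnel does. If only the first RHEL row per src_ip were kept before filtering, bob's row would be paired with the root session and then discarded by the time test.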


NOW, about your overall strategy - these files can both get pretty big, and we are doing a cross-join by src_ip, so you either need to keep the timeframe pretty tight, or you'll need to implement something else to reduce the geometric size of the join.

I'm thinking that there is probably a simple way to make that work, by creating a match_time at a carefully chosen span.

First, find out the typical duration of the firewall events. Let's suppose that they typically last 6 minutes. For that average, a useful choice of span might be anything between 2m and 12m; you can test the search at various levels and see which one chokes least.

We bucket each RHEL transaction's start time to that span, and then duplicate each firewall transaction once for every match_time bucket it overlaps. Here are sample code changes for 10m (600s):

...
| rename COMMENT as "create match_time for firewall transactions at span=10m"
| eval match_time=mvrange(600*floor(start_firewall/600),end_firewall+1,600)
| mvexpand match_time

| rename COMMENT as "join every firewall transaction to all matching RHEL transactions"
| join type=left max=0 src_ip match_time
      [
       ...
       | stats count by src_ip user start_secure end_secure dest  
       | eval match_time = 600*floor(start_secure/600)
      ]

| rename COMMENT as "kill all copies where the time doesn't match, stats together all dups if any"
| where start_firewall<=start_secure AND end_firewall >= start_secure 
| stats count by firewall_user  src_ip start_firewall start_secure end_firewall  end_secure user dest
| convert ctime(start_firewall) ctime(end_firewall)  ctime(start_secure) ctime(end_secure)
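
To check that adding match_time to the join keys does not lose any pairs, here is a Python model of the two key calculations (not SPL). It mirrors the `mvrange(...)` and `floor(...)` expressions above for a single src_ip; the data is random and synthetic, and the helper names are made up for the sketch:

```python
import random

SPAN = 600  # seconds, mirrors the 10m span in the sample search

def firewall_buckets(start, end, span=SPAN):
    """Mirror mvrange(span*floor(start/span), end+1, span)."""
    return list(range(span * (start // span), end + 1, span))

def secure_bucket(start, span=SPAN):
    """Mirror span*floor(start_secure/span)."""
    return span * (start // span)

def plain_matches(firewall, secure):
    """Cross join on src_ip only, then the time filter."""
    return {(f, s) for f in firewall for s in secure if f[0] <= s <= f[1]}

def bucketed_matches(firewall, secure, span=SPAN):
    """Join on the match_time bucket as well, then the same time filter."""
    by_bucket = {}
    for s in secure:
        by_bucket.setdefault(secure_bucket(s, span), []).append(s)
    found = set()
    for f in firewall:
        for b in firewall_buckets(f[0], f[1], span):
            for s in by_bucket.get(b, []):
                if f[0] <= s <= f[1]:
                    found.add((f, s))
    return found

rng = random.Random(0)
firewall = []
for _ in range(200):
    start = rng.randrange(0, 172800)
    firewall.append((start, start + rng.randrange(60, 900)))  # (start, end)
secure = [rng.randrange(0, 172800) for _ in range(200)]       # start_secure values

same = plain_matches(firewall, secure) == bucketed_matches(firewall, secure)
print(same)  # True
```

The two sets agree because any start_secure that lies between start_firewall and end_firewall falls in a bucket that is at or after the firewall's first bucket and no later than end_firewall, so it is always one of the expanded match_time values.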

TL;DR: Please test a couple of spans from about 10% to 200% of your typical firewall transaction length and see what the actual results are, then let us know.

Musings -

Why did I choose 10m for my example, with 6m typical firewall transactions?

I'm pretty sure the "best" span in theory is going to require calculus, and prior knowledge of the avg and stdev of the firewall duration, but my gut says that a good number for span has to be between half and twice the typical duration, with larger numbers having a slight advantage due to the upward outliers.

If you choose half the average duration as the span, then each firewall transaction will typically break up into three parts: one starting before the transaction, one in the middle, and one ending after it. The part that ends after will almost always contain time when that IP was allocated to someone else, so you will commonly have two matches, one of which is wrong, among the three copies. As well, LONG outliers in the firewall data will create many copies, and most of them will be wrong... although they also won't match anything, which is good, even though they will try, which is bad.

At the other end, going twice as long as the average span, then each firewall transaction will break up into two parts only half the time. The match on overlap will go both ways, though, which is still geometric (bad) so actually I may have gut-checked the wrong way and shorter spans are better.
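
The "three parts" and "two parts only half the time" figures can be checked with a little arithmetic. The Python sketch below (not SPL) counts how many match_time values the mvrange expression would emit for a 6-minute transaction, trying every one-second start offset within a span; the function names are made up for the example:

```python
def bucket_count(start, duration, span):
    """Number of match_time values mvrange would emit for one firewall transaction."""
    end = start + duration
    return len(range(span * (start // span), end + 1, span))

def distribution(duration, span):
    """Share of start offsets (one per second within a span) giving each bucket count."""
    counts = {}
    for offset in range(span):
        n = bucket_count(offset, duration, span)
        counts[n] = counts.get(n, 0) + 1
    return {n: c / span for n, c in sorted(counts.items())}

DURATION = 360  # the 6-minute typical firewall transaction from the example

half = distribution(DURATION, 180)   # span = half the duration
twice = distribution(DURATION, 720)  # span = twice the duration
print(half)   # {3: 1.0}
print(twice)  # {1: 0.5, 2: 0.5}
```

For a fixed 6-minute transaction, a 3-minute span always yields three copies, while a 12-minute span yields one copy half the time and two copies the other half. Real durations vary, so the actual mix will be less tidy.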

BOTTOM LINE - test a couple of spans from about 10% to 200% of your typical firewall transaction and see what the actual results are, then let us know.

gadepoonam
Explorer

Thanks a lot for search query and detailed explanation!

DalJeanis
Legend

Results - shorter is much better, to a point. With an average firewall duration of 6 minutes and 500 records for a single src_ip (a simulated two-day pull), I found a span of 100-120 seconds to be more or less optimal, about 40 times faster than the unspanned join. The time difference was consistent when I scaled it up to 4 different IPs.
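
A rough way to see why shorter spans help only up to a point is to count rows in a toy model. The sketch below is Python, not SPL, and uses synthetic data loosely shaped like the test described above: 500 firewall transactions of roughly 6 minutes on one src_ip over two days. The number of RHEL sessions is an arbitrary assumption. For each span it reports how many rows mvexpand would create and how many candidate pairs the join would produce before the where filter. It models row counts only, not actual Splunk run time:

```python
import random

def simulate(span, firewall, secure):
    """Return (expanded firewall rows, candidate join pairs) for a given span."""
    per_bucket = {}
    for s in secure:
        b = span * (s // span)
        per_bucket[b] = per_bucket.get(b, 0) + 1
    expanded = 0
    pairs = 0
    for start, end in firewall:
        # One expanded row per match_time value, as mvexpand would produce.
        for b in range(span * (start // span), end + 1, span):
            expanded += 1
            pairs += per_bucket.get(b, 0)
    return expanded, pairs

rng = random.Random(42)
TWO_DAYS = 2 * 24 * 3600
firewall = []
for _ in range(500):
    start = rng.randrange(TWO_DAYS)
    firewall.append((start, start + rng.randrange(240, 480)))  # roughly 6 minutes
secure = [rng.randrange(TWO_DAYS) for _ in range(500)]  # assumed volume

# Without match_time, every firewall row pairs with every RHEL row on the IP.
unspanned_pairs = len(firewall) * len(secure)
print("unspanned", unspanned_pairs)

results = {}
for span in (30, 110, 360, 720, 3600):
    results[span] = simulate(span, firewall, secure)
    print(span, *results[span])
```

In this model the two costs pull in opposite directions: a smaller span multiplies the firewall rows through mvexpand, while a larger span lets more unrelated RHEL rows share a bucket. Where the balance actually lands depends on the data and on Splunk itself, so it has to be measured.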


DalJeanis
Legend

I've marked your code as code. Doesn't look like anything was lost, but it's best practice to mark it before you post, either with the 101 010 button, by indenting 4 or more spaces, surrounding it with grave accents (`) or a couple of other methods.
