Splunk Search

Anomaly detection in Splunk

axm1295
New Member

Hi all,

I am new to Splunk and have been trying to work on a use case to detect anomalous switches from one type of account to another.

Index A: has the list of switches, i.e. it has two columns: 'Old account', 'New account'.
Index B: has the *type* of accounts. It has two columns: 'Accounts', 'Account_types'.

So far, using commands like join (after renaming certain columns), I have been able to get to a point where I have a table with 4 columns: 'Old account', 'Old_account_type', 'New account', 'New_account_type'.

Aim:
I need to implement logic to detect if old accounts switch to 'unusual' new accounts.

Idea so far:
I wish to create a dictionary of some sort that records, for each old account, the list of new accounts and new_account_type(s) it has switched to. Then, if the old account switches to an account not in this dictionary, I wish to flag it. Does this sound like a logical idea?

For example, looking at the past 4 switches: if an old account named A of type 'admin' switched to new accounts named 1, 2, 3, 4 of types admin, user, admin, admin, then the dictionary should look like
A_switches = {
"Old Account": "A",
"old_account_type": "admin",
"New Account": [1, 2, 3, 4],
"type": ["admin", "user"]
}

This query needs to be run each hour to flag up unusual switches. Can someone suggest how I can implement the above logic i.e. create a dictionary and spot unusual activity?
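To make my idea more concrete, here is a very rough sketch of what I am imagining, split into a baseline search and an hourly check (the index name, the field names old_account/new_account/new_account_type, and the lookup file switch_baseline.csv are all placeholders; I don't know whether this is idiomatic SPL):

``` baseline: record every old_account/new_account pair (and its types) seen over, say, the last 30 days ```
index=indexA earliest=-30d
| stats values(new_account_type) as known_new_account_types count by old_account new_account
| eval known="yes"
| outputlookup switch_baseline.csv

``` hourly check: flag switches whose old_account/new_account pair is missing from the baseline ```
index=indexA earliest=-1h
| lookup switch_baseline.csv old_account new_account OUTPUT known
| where isnull(known)

Something along those lines is what I mean by a "dictionary", though I am not sure it is the right way to do it in Splunk.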

Apologies for the long question and if something isn't clear.


yuanliu
SplunkTrust

Before tackling the question of anomaly, I made the following simulations in order to clarify premises.  Please let me know if my understanding of the problem is correct: indexA is a regular audit log that contains an ID that cannot be changed and account names, associated with each ID, that can change over time; each of these account names has an associated type that is stored in indexB, and only the latest association is valid.  Hence,

 

``` simulation of indexA ```
| makeresults
| eval inc = mvrange(0, 4)
| mvexpand inc
| eval _time=_time + inc*3600
| eval index = "index1", id = "userA", account = case(inc==0, "john", inc==1, "jeff", inc==2, "joe", inc==3, "jack”)
| fields - inc

 

 
_time                 account   id      index
2021-11-19 00:27:02   john      userX   indexA
2021-11-19 01:27:02   jeff      userX   indexA
2021-11-19 02:27:02   joe       userX   indexA
2021-11-19 03:27:02   jack      userX   indexA

Note that the above depicts the progression of a single ID, userX, over a period of 4 hours.

 

``` simulation of indexB ```
| makeresults
| eval john="admin", jack="user", jeff="user", joe="robot", jim="admin"
| stats values(j*) as j*
| transpose
| eval _time = now(), index = "indexB"
| rename column as account, "row 1" as type

 

account   type    _time                 index
jack      user    2021-11-19 00:32:27   indexB
jeff      user    2021-11-19 00:32:27   indexB
jim       admin   2021-11-19 00:32:27   indexB
joe       robot   2021-11-19 00:32:27   indexB
john      admin   2021-11-19 00:32:27   indexB

The above depicts the latest snapshot of the account table, therefore a single timestamp.

If the two assumptions look correct, I can think of two ways to combine the data, based on the labels you put on the question.  The first is a KV lookup: dump the latest account table from indexB into a lookup table, then use lookup to associate accounts in indexA with their types.

 

| makeresults
| eval john="admin", jack="user", jeff="user", joe="robot", jim="admin"
| stats values(j*) as j*
| transpose
| eval _time = now(), index = "index2"
| rename column as account, "row 1" as type

``` dump latest snapshot of indexB into lookup ```
| dedup account type
| fields - _time
| outputlookup accounttype.csv

 

Then, use this lookup in indexA

 

| makeresults
| eval inc = mvrange(0, 4)
| mvexpand inc
| eval _time=_time + inc*3600
| eval index = "index1", id = "userA", account = case(inc==0, "john", inc==1, "jeff", inc==2, "joe", inc==3, "jack”)
| fields - inc

``` lookup accounttype.csv ```
| lookup accounttype.csv account

 

This method requires the lookup table to be kept up to date, which can be a disadvantage.
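If you do go this route, one way to keep the lookup current is a scheduled search that rewrites it periodically. A minimal sketch, assuming indexB really does carry one event per account with account and type fields as in the simulation, keeping only the latest event per account:

``` scheduled (e.g. hourly) refresh of the account-type lookup ```
index=indexB
| dedup account
| table account type
| outputlookup accounttype.csv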

Meanwhile, since you are also considering join, let me illustrate a different method, append, which is less expensive. (Many thanks to bowesmana who recently reminded me of this trick.)

 

| makeresults
``` simulate indexA ```
| eval inc = mvrange(0, 4)
| mvexpand inc
| eval _time=_time + inc*3600
| eval index = "indexA", id = "userX", account = case(inc==0, "john", inc==1, "jeff", inc==2, "joe", inc==3, "jack")
| fields - inc

``` less expensive than join ```
| append
    [
    ``` simulate indexB ```
    | makeresults
    | eval john="admin", jack="user", jeff="user", joe="robot", jim="admin"
    | stats values(j*) as j*
    | transpose
    | eval _time = now(), index = "indexB"
    | rename column as account, "row 1" as type

    ``` only use latest, not interested in _time in indexB ```
    | dedup account type
    | fields - _time]

``` associate account type with account ```
| fields - index
| stats values(*) as * values(_time) as _time by account
| where isnotnull(id) ``` only necessary in this simulation ```
| sort _time

 

This gives you

account   id      type    _time
john      userX   admin   2021-11-19 01:07:50
jeff      userX   user    2021-11-19 02:07:50
joe       userX   robot   2021-11-19 03:07:50
jack      userX   user    2021-11-19 04:07:50

With the simulated data, userX's earliest account:type is john:admin, then it becomes jeff:user, then joe:robot, and most recently jack:user; in account type alone, the progression is admin -> user -> robot -> user.

Note that I added the filter "| where isnotnull(id)" because account jim in indexB is not associated with userX, which would otherwise produce an odd row in the end result.  In the real world, where an account is always associated with an id, this should not happen.

Also, the wildcard "values(*) as *" does not work on _time, hence "values(_time) as _time" is necessary.


ITWhisperer
SplunkTrust

Would this gather the information you need?

| stats values(new_account) as new_accounts values(new_account_type) as new_account_types by old_account old_account_type
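If it does, one possible next step (just a sketch, assuming the events carry old_account and new_account fields and that a 30-day window is a reasonable baseline; the index name is a placeholder) is the "first seen" pattern: flag pairs whose earliest occurrence falls within the last hour, i.e. switches to a new account never seen before for that old account.

``` flag old_account/new_account pairs first seen in the last hour ```
index=indexA earliest=-30d
| stats earliest(_time) as first_seen count by old_account new_account
| where first_seen >= relative_time(now(), "-1h@h")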