Hi all,

I am new to Splunk and have been trying to work on a use case to detect anomalous switches from one type of account to another.

Index A: Has the list of switches i.e. has two columns: 'Old account', 'New account'.
Index B: Has the *type* of accounts. It has two columns: 'Accounts', 'Account_types'.

Till now, using commands like join (after renaming certain columns), I have been able to get to a point where I have a table of 4 columns: 'Old account', 'Old_account_type', 'New account', 'New_account_type'.

I need to implement logic to detect if old accounts switch to 'unusual' new accounts.

Idea so far:
I wish to create a dictionary of some sort where there is a list of new accounts and new_account_type(s) an old account has switched to. And then, if the old account switches to an account not in this dictionary, I wish to flag it up. Does this sound like a logical idea?

For example, if looking at past 4 switches, if an old account named A of the type 'admin', switches to new accounts named 1, 2, 3, 4 of type admin, user, admin, admin, then the dictionary should look like
A_switches = {
    "Old Account": "A",
    "New Account": [1, 2, 3, 4],
    "type": [admin, user]
}

This query needs to be run each hour to flag up unusual switches. Can someone suggest how I can implement the above logic i.e. create a dictionary and spot unusual activity?

Apologies for the long question and if something isn't clear.

Before tackling the question of anomaly, I made the following simulations in order to clarify premises. Please let me know if my understanding of the problem is correct: indexA is a regular audit log that contains an ID that cannot be changed, and account names associated with each ID that can change over time; each of these account names has an associated type that is stored in indexB; only the latest association is valid. Hence,


``` simulation of indexA ```
| makeresults
| eval inc = mvrange(0, 4)
| mvexpand inc
| eval _time=_time + inc*3600
| eval index = "indexA", id = "userX", account = case(inc==0, "john", inc==1, "jeff", inc==2, "joe", inc==3, "jack")
| fields - inc


_time                account  id     index
2021-11-19 00:27:02  john     userX  indexA
2021-11-19 01:27:02  jeff     userX  indexA
2021-11-19 02:27:02  joe      userX  indexA
2021-11-19 03:27:02  jack     userX  indexA

Note that the above depicts the progression of a single ID, userX, over a period of 4 hours.


``` simulation of indexB ```
| makeresults
| eval john="admin", jack="user", jeff="user", joe="robot", jim="admin"
| stats values(j*) as j*
| transpose
| eval _time = now(), index = "indexB"
| rename column as account, "row 1" as type


account  type   _time                index
jack     user   2021-11-19 00:32:27  indexB
jeff     user   2021-11-19 00:32:27  indexB
jim      admin  2021-11-19 00:32:27  indexB
joe      robot  2021-11-19 00:32:27  indexB
john     admin  2021-11-19 00:32:27  indexB

The above depicts the latest snapshot of the account table, therefore a single timestamp.

If the two assumptions look correct, I can think of two ways to combine the data, based on the labels you put in the question. First is a lookup (shown here as a CSV file lookup): you can dump the latest account table from indexB into a lookup table, then use lookup to associate accounts in indexA with types.


| makeresults
| eval john="admin", jack="user", jeff="user", joe="robot", jim="admin"
| stats values(j*) as j*
| transpose
| eval _time = now(), index = "indexB"
| rename column as account, "row 1" as type

``` dump latest snapshot of indexB into lookup ```
| dedup account type
| fields - _time
| outputlookup accounttype.csv


Then, use this lookup in indexA


| makeresults
| eval inc = mvrange(0, 4)
| mvexpand inc
| eval _time=_time + inc*3600
| eval index = "indexA", id = "userX", account = case(inc==0, "john", inc==1, "jeff", inc==2, "joe", inc==3, "jack")
| fields - inc

``` lookup accounttype.csv ```
| lookup accounttype.csv account


This method requires the lookup table to be maintained up to date.  This can be a disadvantage.
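
One way to ease that maintenance is a scheduled search that rebuilds the lookup from indexB. This is only a minimal sketch: it assumes real indexB events carry fields named account and type (as in my simulation), and that a 24-hour window is enough to see every account at least once. Adjust both to your data.

```spl
index=indexB earliest=-24h
``` keep only the most recent type seen for each account ```
| stats latest(type) as type by account
``` overwrite the lookup used in the search above ```
| outputlookup accounttype.csv
```

Because outputlookup replaces the file on each run, any account that does not appear in the chosen window drops out of the lookup, so pick the window with that in mind.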

Meanwhile, since you are also considering join, let me illustrate a different method, append, which is less expensive. (Many thanks to bowesmana who recently reminded me of this trick.)


| makeresults
``` simulate indexA ```
| eval inc = mvrange(0, 4)
| mvexpand inc
| eval _time=_time + inc*3600
| eval index = "indexA", id = "userX", account = case(inc==0, "john", inc==1, "jeff", inc==2, "joe", inc==3, "jack")
| fields - inc

``` less expensive than join ```
| append
    [| makeresults
    ``` simulate indexB ```
    | eval john="admin", jack="user", jeff="user", joe="robot", jim="admin"
    | stats values(j*) as j*
    | transpose
    | eval _time = now(), index = "indexB"
    | rename column as account, "row 1" as type

    ``` only use latest, not interested in _time in indexB ```
    | dedup account type
    | fields - _time]

``` associate account type with account ```
| fields - index
| stats values(*) as * values(_time) as _time by account
| where isnotnull(id) ``` only necessary in this simulation ```
| sort _time


This gives you

account  id     type   _time
john     userX  admin  2021-11-19 01:07:50
jeff     userX  user   2021-11-19 02:07:50
joe      userX  robot  2021-11-19 03:07:50
jack     userX  user   2021-11-19 04:07:50

With simulated data, userX's earliest account:type is john:admin, then it becomes jeff:user, then joe:robot, and most recently jack:user; in account type alone, the progression is admin->user->robot->user.

Note I added a filter "| where isnotnull(id)" because account jim in indexB is not associated with userX, which makes a funny display in the end result. In the real world, an account is always associated with an id, so the funny display should not happen.

Also, wildcard "values(*) as *"  does not work on _time, hence "values(_time) as _time" is necessary.
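
Against real data, the makeresults parts would be replaced by index searches. Here is a rough sketch of the same append pattern, assuming the field names from my simulation (id and account in indexA, account and type in indexB); your actual field names will likely differ.

```spl
index=indexA
| fields _time id account
``` append the latest type per account from indexB ```
| append
    [search index=indexB
    | stats latest(type) as type by account]
``` associate account type with account ```
| stats values(*) as * values(_time) as _time by account
| where isnotnull(id)
| sort _time
```

If the same account shows up in several indexA events, values(_time) becomes multivalued; swapping in max(_time) as _time is one option if you only care about the most recent occurrence.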

Would this gather the information you need?

| stats values(new_account) as new_accounts values(new_account_type) as new_account_types by old_account old_account_type
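
If that is the shape you are after, one possible way to turn it into an hourly check is to store the "dictionary" as a lookup and flag any switch that is not in it. The two searches below are only a sketch under several assumptions: indexA events carry fields named old_account and new_account, accounttype.csv (columns account, type) exists as in the earlier answer, and switch_baseline.csv is a hypothetical lookup name I made up for illustration.

First, a baseline search (run once, or on a slower schedule) that records which new account types each old account has switched to:

```spl
index=indexA earliest=-30d@h latest=-1h@h
| lookup accounttype.csv account as old_account OUTPUT type as old_account_type
| lookup accounttype.csv account as new_account OUTPUT type as new_account_type
``` one row per (old account, new account type) pair seen in the baseline period ```
| stats count as seen by old_account new_account_type
| outputlookup switch_baseline.csv
```

Then, an hourly scheduled search that flags switches with no match in the baseline:

```spl
index=indexA earliest=-1h@h latest=@h
| lookup accounttype.csv account as old_account OUTPUT type as old_account_type
| lookup accounttype.csv account as new_account OUTPUT type as new_account_type
``` seen stays null when this pair never occurred in the baseline ```
| lookup switch_baseline.csv old_account new_account_type OUTPUT seen
| where isnull(seen)
| table _time old_account old_account_type new_account new_account_type
```

This version flags on the new account type only. Adding new_account to the by clause of the baseline and to the last lookup would also flag switches to individual accounts not seen before, at the cost of more alerts.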
