Hi all,
I am new to Splunk and have been trying to work on a use case to detect anomalous switches from one type of account to another.
Index A: has the list of switches, i.e. it has two columns: 'Old account', 'New account'.
Index B: Has the *type* of accounts. It has two columns: 'Accounts', 'Account_types'.
So far, using commands like join (after renaming certain columns), I have been able to get to a point where I have a table of 4 columns: 'Old account', 'Old_account_type', 'New account', 'New_account_type'.
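Roughly (with simplified field names; the index and field names here are placeholders), what I have looks something like this:
index=indexA
| rename "Old account" as old_account, "New account" as new_account
``` attach the type of the old account ```
| join type=left old_account
    [ search index=indexB | rename Accounts as old_account, Account_types as old_account_type ]
``` attach the type of the new account ```
| join type=left new_account
    [ search index=indexB | rename Accounts as new_account, Account_types as new_account_type ]
| table old_account old_account_type new_account new_account_type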
Aim:
I need to implement logic to detect if old accounts switch to 'unusual' new accounts.
Idea so far:
I wish to create a dictionary of some sort that maps each old account to the list of new accounts and new_account_type(s) it has switched to. Then, if the old account switches to an account not in this dictionary, I wish to flag it up. Does this sound like a logical idea?
For example, looking at the past 4 switches: if an old account named A of type 'admin' switches to new accounts named 1, 2, 3, 4 of types admin, user, admin, admin, then the dictionary should look like
A_switches = {
"old_account": "A",
"old_account_type": "admin",
"new_accounts": [1, 2, 3, 4],
"new_account_types": ["admin", "user"]
}
This query needs to be run each hour to flag up unusual switches. Can someone suggest how I can implement the above logic i.e. create a dictionary and spot unusual activity?
Apologies for the long question and if something isn't clear.
Before tackling the question of anomaly detection, I made the following simulations in order to clarify the premises. Please let me know if my understanding of the problem is correct: indexA is a regular audit log containing an ID that cannot be changed, plus account names associated with each ID that can change over time; each of these account names has an associated type that is stored in indexB, and only the latest association is valid. Hence,
``` simulation of indexA ```
| makeresults
| eval inc = mvrange(0, 4)
| mvexpand inc
| eval _time=_time + inc*3600
| eval index = "index1", id = "userA", account = case(inc==0, "john", inc==1, "jeff", inc==2, "joe", inc==3, "jack”)
| fields - inc
_time | account | id | index |
2021-11-19 00:27:02 | john | userX | indexA |
2021-11-19 01:27:02 | jeff | userX | indexA |
2021-11-19 02:27:02 | joe | userX | indexA |
2021-11-19 03:27:02 | jack | userX | indexA |
Note the above depicts progression of a single ID userX over a period of 4 hours.
``` simulation of indexB ```
| makeresults
| eval john="admin", jack="user", jeff="user", joe="robot", jim="admin"
| stats values(j*) as j*
| transpose
| eval _time = now(), index = "indexB"
| rename column as account, "row 1" as type
account | type | _time | index |
jack | user | 2021-11-19 00:32:27 | indexB |
jeff | user | 2021-11-19 00:32:27 | indexB |
jim | admin | 2021-11-19 00:32:27 | indexB |
joe | robot | 2021-11-19 00:32:27 | indexB |
john | admin | 2021-11-19 00:32:27 | indexB |
The above depicts the latest snapshot of the account table, therefore a single timestamp.
If the two assumptions look correct, I can think of two ways to combine the data, based on the labels you used in the question. The first is a KV lookup: dump the latest account table from indexB into a lookup file, then use the lookup command to associate accounts in indexA with their types.
| makeresults
| eval john="admin", jack="user", jeff="user", joe="robot", jim="admin"
| stats values(j*) as j*
| transpose
| eval _time = now(), index = "index2"
| rename column as account, "row 1" as type
``` dump latest snapshot of indexB into lookup ```
| dedup account type
``` keep only the mapping fields so the lookup does not clobber the events' index field ```
| fields account type
| outputlookup accounttype.csv
Then, use this lookup in indexA
| makeresults
| eval inc = mvrange(0, 4)
| mvexpand inc
| eval _time=_time + inc*3600
| eval index = "index1", id = "userA", account = case(inc==0, "john", inc==1, "jeff", inc==2, "joe", inc==3, "jack”)
| fields - inc
``` lookup accounttype.csv ```
| lookup accounttype.csv account
This method requires the lookup table to be kept up to date, which can be a disadvantage.
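One way to reduce that burden is to refresh the lookup on a schedule; a minimal sketch, assuming an hourly schedule and that a 24-hour window always contains the latest snapshot of indexB:
``` scheduled refresh of the account-type lookup ```
index=indexB earliest=-24h
``` events return newest first, so dedup keeps the latest type per account ```
| dedup account
| fields account type
| outputlookup accounttype.csv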
Meanwhile, since you are also considering join, let me illustrate a different method, append, which is less expensive. (Many thanks to bowesmana who recently reminded me of this trick.)
| makeresults
``` simulate indexA ```
| eval inc = mvrange(0, 4)
| mvexpand inc
| eval _time=_time + inc*3600
| eval index = "indexA", id = "userX", account = case(inc==0, "john", inc==1, "jeff", inc==2, "joe", inc==3, "jack")
| fields - inc
``` less expensive than join ```
| append
[
``` simulate indexB ```
| makeresults
| eval john="admin", jack="user", jeff="user", joe="robot", jim="admin"
| stats values(j*) as j*
| transpose
| eval _time = now(), index = "indexB"
| rename column as account, "row 1" as type
``` only use latest, not interested in _time in indexB ```
| dedup account type
| fields - _time]
``` associate account type with account ```
| fields - index
| stats values(*) as * values(_time) as _time by account
| where isnotnull(id) ``` only necessary in this simulation ```
| sort _time
This gives you
account | id | type | _time |
john | userX | admin | 2021-11-19 01:07:50 |
jeff | userX | user | 2021-11-19 02:07:50 |
joe | userX | robot | 2021-11-19 03:07:50 |
jack | userX | user | 2021-11-19 04:07:50 |
With the simulated data, userX's earliest account:type is john:admin, then it becomes jeff:user, then joe:robot, and as of late, jack:user; in account type alone, the progression is admin->user->robot->user.
Note I added the filter "| where isnotnull(id)" because account jim in indexB is not associated with userX, which makes for a funny display in the end result. In the real world, an account is always associated with an id, so the funny display should not happen.
Also, the wildcard "values(*) as *" does not apply to _time, hence "values(_time) as _time" is necessary.
Would this gather the information you need?
| stats values(new_account) as new_accounts values(new_account_type) as new_account_types by old_account old_account_type
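If it does, one way to turn it into the hourly flagging you describe is to persist the known switches in a lookup and compare each new hour against it. A minimal sketch, assuming the real events carry old_account and new_account and reusing accounttype.csv from above; switch_baseline.csv, the "seen" marker, and the 30-day/1-hour windows are all placeholders you would tune:
``` scheduled e.g. daily: build the baseline of known switches ```
index=indexA earliest=-30d
| lookup accounttype.csv account AS new_account OUTPUT type AS new_account_type
| stats count by old_account new_account new_account_type
| eval seen="yes"
| outputlookup switch_baseline.csv
``` scheduled hourly: flag switches absent from the baseline ```
index=indexA earliest=-1h
| lookup accounttype.csv account AS new_account OUTPUT type AS new_account_type
| lookup switch_baseline.csv old_account new_account OUTPUT seen
| where isnull(seen)
| table _time old_account new_account new_account_type
Anything the hourly search returns is a pairing that old_account has not made during the baseline window; to loosen the definition of "unusual", key the baseline (and the second lookup) on new_account_type instead of new_account.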