Splunk Search

How to join 2 indexes

alexspunkshell
Contributor

Hi All,

I want to join two indexes and get a result. 

Search Query -1

index=Microsoft
| eval Event_Date=mvindex('eventDateTime',0)
| eval UPN=mvindex('userStates{}.userPrincipalName',0)
| eval Logon_Location=mvindex('userStates{}.logonLocation',0)
| eval Event_Title=mvindex('title',0)
| eval Event_Severity=mvindex('severity',0)
| eval AAD_Acct=mvindex('userStates{}.aadUserId',0)
| eval LogonIP=mvindex('userStates{}.logonIp',0)
| eval Investigate=+RiskyUsersBlade/userId".AAD_Acct
| stats count by LogonIP Event_Date, Event_Title, Event_Severity UPN Logon_Location Investigate

 

Search Query - 2

index=o365 "Result of Query-1 LogonIP" earliest=-30d | stats dc(user) as "Distinct users"

 

If the Search Query-2 "Distinct users" results are greater than 20 then, I want to ignore the result.

 

@ITWhisperer @scelikok @soutamo @saravanan90 @thambisetty @bowesmana   @to4kawa @woodcock  @venkatasri 

 

Labels (3)
0 Karma
1 Solution

gcusello
SplunkTrust
SplunkTrust

Hi @alexspunkshell,

in this case you have two choices:

  • join command, but I try to avoid it because it's very slow and I use it only when I don't find any other solution,
  • stats command.

Probably your use case is one situation when it isn't possible use other than join, so please try this:

index=o365 earliest=-30d 
| join ClientIPAddress [ search 
     index=Microsoft
     | eval 
          Event_Date=mvindex('eventDateTime',0),
          UPN=mvindex('userStates{}.userPrincipalName',0),
          Logon_Location=mvindex('userStates{}.logonLocation',0),
          Event_Title=mvindex('title',0),
          Event_Severity=mvindex('severity',0),
          AAD_Acct=mvindex('userStates{}.aadUserId',0),
          LogonIP=mvindex('userStates{}.logonIp',0),
          Investigate = "https://portal.azure.com/#blade/Microsoft_AAD_IAM/RiskyUsersBlade/userId".AAD_Acct
     | stats count by LogonIP Event_Date, Event_Title, Event_Severity UPN Logon_Location Investigate
     | rename LogonIP AS ClientIPAddress
     ]
| stats values(Event_Date) AS Event_Date values(Event_Title) AS Event_Title values(Event_Severity) AS Event_Severity values(UPN) AS UPN values(Logon_Location) AS Logon_Location values(Investigatecount) AS Investigatecount dc(user) as "Distinct users"
| where "Distinct users"<20

Anyway, please try also something like this:

(index=o365 earliest=-30d) OR index=Microsoft
| eval 
     Event_Date=mvindex('eventDateTime',0),
     UPN=mvindex('userStates{}.userPrincipalName',0),
     Logon_Location=mvindex('userStates{}.logonLocation',0),
     Event_Title=mvindex('title',0),
     Event_Severity=mvindex('severity',0),
     AAD_Acct=mvindex('userStates{}.aadUserId',0),
     LogonIP=mvindex('userStates{}.logonIp',0),
     Investigate = "https://portal.azure.com/#blade/Microsoft_AAD_IAM/RiskyUsersBlade/userId".AAD_Acct
| rename LogonIP AS ClientIPAddress
| stats values(Event_Date) AS Event_Date values(Event_Title) AS Event_Title values(Event_Severity) AS Event_Severity values(UPN) AS UPN values(Logon_Location) AS Logon_Location values(Investigatecount) AS Investigatecount values(user) AS user by ClientIPAddress 
| stats values(Event_Date) AS Event_Date values(Event_Title) AS Event_Title values(Event_Severity) AS Event_Severity values(UPN) AS UPN values(Logon_Location) AS Logon_Location values(Investigatecount) AS Investigatecount values(ClientIPAddress ) AS ClientIPAddress dc(user) as "Distinct users"
| where "Distinct users"<20

Ciao.

Giuseppe 

View solution in original post

gcusello
SplunkTrust
SplunkTrust

Hi @alexspunkshell,

you don't need a join but you need to filter quey2 events using query1 results.

In this case, please, try something like this:

index=o365 "Result of Query-1 LogonIP" earliest=-30d 
[ search index=Microsoft
     | eval 
          Event_Date=mvindex('eventDateTime',0),
          UPN=mvindex('userStates{}.userPrincipalName',0),
          Logon_Location=mvindex('userStates{}.logonLocation',0),
          Event_Title=mvindex('title',0),
          Event_Severity=mvindex('severity',0),
          AAD_Acct=mvindex('userStates{}.aadUserId',0),
          LogonIP=mvindex('userStates{}.logonIp',0),
          Investigate = "https://portal.azure.com/#blade/Microsoft_AAD_IAM/RiskyUsersBlade/userId".AAD_Acct
     | stats count by LogonIP Event_Date, Event_Title, Event_Severity UPN Logon_Location Investigate
     | fields LogonIP
     ]
| stats dc(user) as "Distinct users"
| where "Distinct users"<20

Ciao.

Giuseppe

alexspunkshell
Contributor

@gcusello  Thanks for your reply.

I tried with below query and i also tried with other logics. But no luck. 

0 Karma

gcusello
SplunkTrust
SplunkTrust

Hi @alexspunkshell,

let me understand: which kind of error have you?

no results, too results, only a part of results?,

are you sure to have the LogonIP field in both the searches, with the same name (fields are case sensitive)?

did you manually matched results of the main search and the subsearch?

Ciao.

Giuseppe

alexspunkshell
Contributor

@gcusello Thanks for your reply.

I didn't get any results.

Here IP addresses are same in both indexes but the field name is different.

In Query-1 "LogonIP" is the field. And in Query-2 "ClientIPAddress" is the field.

I want to check query-1 "LogonIP" field with query-2 "ClientIPAddress" field. If the query - 2 "ClientIPAddress" is used by 20+ users then, I want to filter those results.

 

Yes, i manually matched the results of the main search and the sub search and i can able to verify.

0 Karma

gcusello
SplunkTrust
SplunkTrust

Hi @alexspunkshell,

this means that you have to rename the field in subsearch to match the field name in the main search, so please try this:

index=o365 earliest=-30d [ search 
     index=Microsoft
     | eval 
          Event_Date=mvindex('eventDateTime',0),
          UPN=mvindex('userStates{}.userPrincipalName',0),
          Logon_Location=mvindex('userStates{}.logonLocation',0),
          Event_Title=mvindex('title',0),
          Event_Severity=mvindex('severity',0),
          AAD_Acct=mvindex('userStates{}.aadUserId',0),
          LogonIP=mvindex('userStates{}.logonIp',0),
          Investigate = "https://portal.azure.com/#blade/Microsoft_AAD_IAM/RiskyUsersBlade/userId".AAD_Acct
     | stats count by LogonIP Event_Date, Event_Title, Event_Severity UPN Logon_Location Investigate
     | rename LogonIP AS ClientIPAddress
     | fields ClientIPAddress
     ]
| stats dc(user) as "Distinct users"
| where "Distinct users"<20

in this way all the LogonIP of the subsearch will match the ClientIPAddress of the main search.

 Ciao.

Giuseppe

alexspunkshell
Contributor

@gcusello Thanks for your reply.

Now I am getting results. But I also want to get the below result of the 1st query.

| stats count by LogonIP Event_Date, Event_Title, Event_Severity UPN Logon_Location Investigate

 

0 Karma

gcusello
SplunkTrust
SplunkTrust

Hi @alexspunkshell,

in this case you have two choices:

  • join command, but I try to avoid it because it's very slow and I use it only when I don't find any other solution,
  • stats command.

Probably your use case is one situation when it isn't possible use other than join, so please try this:

index=o365 earliest=-30d 
| join ClientIPAddress [ search 
     index=Microsoft
     | eval 
          Event_Date=mvindex('eventDateTime',0),
          UPN=mvindex('userStates{}.userPrincipalName',0),
          Logon_Location=mvindex('userStates{}.logonLocation',0),
          Event_Title=mvindex('title',0),
          Event_Severity=mvindex('severity',0),
          AAD_Acct=mvindex('userStates{}.aadUserId',0),
          LogonIP=mvindex('userStates{}.logonIp',0),
          Investigate = "https://portal.azure.com/#blade/Microsoft_AAD_IAM/RiskyUsersBlade/userId".AAD_Acct
     | stats count by LogonIP Event_Date, Event_Title, Event_Severity UPN Logon_Location Investigate
     | rename LogonIP AS ClientIPAddress
     ]
| stats values(Event_Date) AS Event_Date values(Event_Title) AS Event_Title values(Event_Severity) AS Event_Severity values(UPN) AS UPN values(Logon_Location) AS Logon_Location values(Investigatecount) AS Investigatecount dc(user) as "Distinct users"
| where "Distinct users"<20

Anyway, please try also something like this:

(index=o365 earliest=-30d) OR index=Microsoft
| eval 
     Event_Date=mvindex('eventDateTime',0),
     UPN=mvindex('userStates{}.userPrincipalName',0),
     Logon_Location=mvindex('userStates{}.logonLocation',0),
     Event_Title=mvindex('title',0),
     Event_Severity=mvindex('severity',0),
     AAD_Acct=mvindex('userStates{}.aadUserId',0),
     LogonIP=mvindex('userStates{}.logonIp',0),
     Investigate = "https://portal.azure.com/#blade/Microsoft_AAD_IAM/RiskyUsersBlade/userId".AAD_Acct
| rename LogonIP AS ClientIPAddress
| stats values(Event_Date) AS Event_Date values(Event_Title) AS Event_Title values(Event_Severity) AS Event_Severity values(UPN) AS UPN values(Logon_Location) AS Logon_Location values(Investigatecount) AS Investigatecount values(user) AS user by ClientIPAddress 
| stats values(Event_Date) AS Event_Date values(Event_Title) AS Event_Title values(Event_Severity) AS Event_Severity values(UPN) AS UPN values(Logon_Location) AS Logon_Location values(Investigatecount) AS Investigatecount values(ClientIPAddress ) AS ClientIPAddress dc(user) as "Distinct users"
| where "Distinct users"<20

Ciao.

Giuseppe 

alexspunkshell
Contributor

@gcusello Now it works. But different results are merged and give in single results.

Here I am supposed to get 3 results. But all 3 results merged and displayed in 1 result.

 

alexspunkshell_0-1627038296708.png

 

0 Karma

gcusello
SplunkTrust
SplunkTrust

Hi @alexspunkshell,

instead of values(Event_Dates), use min(Event_Date) if you want the first date or max(Event_Date) if you want the last.

Ciao.

Giuseppe

Get Updates on the Splunk Community!

Extending Observability Content to Splunk Cloud

Register to join us !   In this Extending Observability Content to Splunk Cloud Tech Talk, you'll see how to ...

What's new in Splunk Cloud Platform 9.1.2312?

Hi Splunky people! We are excited to share the newest updates in Splunk Cloud Platform 9.1.2312! Analysts can ...

What’s New in Splunk Security Essentials 3.8.0?

Splunk Security Essentials (SSE) is an app that can amplify the power of your existing Splunk Cloud Platform, ...