Splunk Search

Sub Searching multiple indexes and sourcetypes

jj39501
New Member

Hi team,

I would like a little help with a query I am having difficulty with. The objective is to use subsearches to combine results from 2 different indexes and sourcetypes. There are no 1-to-1 field mappings between the data sets, so there is some renaming involved, but I don't think this should impact the end result.

Search 1
index="index A" sourcetype="sourcetype A" "icmp"
| rename id.resp_h as Destination_Host
| search (Destination_Host!=10.0.0.0/8 AND Destination_Host!=172.16.0.0/12 AND Destination_Host!=xxx.xxx.0.0/16 AND Destination_Host!=xxx.xxx.0.0/16 AND Destination_Host!=xxx.xxx.*)
| where (orig_ip_bytes > 1000 AND orig_bytes > 1000)
| rename id.orig_h as Source_IP
| stats count earliest(_time) as earliest latest(_time) as latest by Source_IP, Destination_Host
| eval isOutlier=if(earliest >= relative_time(now(), "-1d@d"), 1, 0)
| where isOutlier=1
| convert ctime(earliest) ctime(latest)

Output
Source_IP       Destination_Host  count  earliest                    latest                      isOutlier
xxx.xxx.xxx.xx  52.112.3.145      1      10/29/2019 13:30:52.881125  10/29/2019 13:30:52.881125  1
xxx.xxx.xxx.xx  205.185.216.10    1      10/29/2019 08:42:51.725199  10/29/2019 08:42:51.725199  1
xxx.xxx.xxx.xx  8.247.48.126      1      10/29/2019 03:43:12.544464  10/29/2019 03:43:12.544464  1

Search 2
index="index B" sourcetype="sourcetype B" description=Assign OR description=Renew | rename ip as Source_IP, mac as "Mac Address", nt_host as Source_Host | dedup "Mac Address" | table Source_IP, Source_Host, "Mac Address"

Output
Source_IP       Source_Host  Mac Address
xxx.xxx.xxx.xx  xxxxxxx      xxxxxxxxx

Combined Search
index="index 2" sourcetype="sourcetype 2" description=Assign OR description=Renew | rename ip as Source_IP, mac as "Mac Address", nt_host as Host | table Source_IP, "Mac Address", Host
| join Source_IP
[search index=index 1 sourcetype="sourcetype 2" "icmp" | rename id.resp_h as Destination_Host | search (Destination_Host!=10.0.0.0/8 AND Destination_Host!=xxx.xxx.0.0/12 AND Destination_Host!=xxx.xxx.0.0/16 AND Destination_Host!=xxx.xxx.0.0/16 AND Destination_Host!=xxx.xxx.*) | where (orig_ip_bytes > 1000 AND orig_bytes > 1000) | rename id.orig_h as Source_IP | stats count earliest(_time) as earliest latest(_time) as latest by Source_IP, Destination_Host | eval isOutlier=if(earliest >= relative_time(now(), "-1d@d"), 1, 0) | where isOutlier=1 | convert ctime(earliest) ctime(latest)]
| dedup "Mac Address"

Desired Output with combined data
Source_IP       Source_Host  Mac Address  Destination_Host  count  earliest          latest            isOutlier
xxx.xxx.xxx.xx  ABC Host     xx:xx:xx     52.112.3.145      1      10/29/2019 12:30  10/29/2019 13:30  1

Instead, I am getting this output, with only Search 2 data:
Source_IP       Source_Host  Mac Address  Destination_Host  count  earliest          latest            isOutlier
xxx.xxx.xxx.xx  ABC Host     xx:xx:xx     52.112.3.145      1      10/29/2019 12:30  10/29/2019 13:30  1

arjunpkishore5
Motivator

I have combined your 2 searches into one. However, I have no way of testing since I do not have the base data. Try this out and let me know if this works.

(index="index 2" sourcetype="sourcetype 2" description=Assign OR description=Renew) OR (index="index 1" sourcetype="sourcetype 2" "icmp" NOT id.resp_h IN ("10.0.0.0/8", "xxx.xxx.*") )
| rename ip as Source_IP, mac as "Mac Address", nt_host as Source_Host , id.resp_h as Destination_Host, id.orig_h as Source_IP_2
| fillnull Destination_Host value="Unknown"
| eval Source_IP=coalesce(Source_IP, if((orig_ip_bytes > 1000 AND orig_bytes > 1000), Source_IP_2, null()))
| stats count(eval(if(index=="index 1", Source_IP, null()))) as total_count, min(eval(if(index=="index 1", _time, null()))) as earliest, max(eval(if(index=="index 1", _time, null()))) as latest, values("Mac Address") as "Mac Address", values(Source_Host) as Source_Host by Source_IP, Destination_Host
| eval Destination_Host=if(Destination_Host=="Unknown", null(), Destination_Host) 
| eval isOutlier=if(earliest >= relative_time(now(), "-1d@d"), 1, 0) 
| stats max(*) as * by Source_IP 
| where isOutlier=1 
| convert ctime(earliest) ctime(latest)

arjunpkishore5
Motivator

Welp, looks like @gcusello beat me to the answer 😛

jj39501
New Member

Not quite. ha!

I do appreciate you looking at this for me. I tried your search as well; however, I am getting the following error. Thoughts?

Error in 'stats' command: You must specify a rename for the aggregation specifier on the dynamically evaluated field 'count(eval(if(index=="index 1", Source_IP, null())))'.

arjunpkishore5
Motivator

@jj39501 Edited my original answer with corrections. Please try now. 🙂
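
For anyone hitting the same error: as far as I can tell, stats raises it whenever an aggregation wraps an eval expression without an AS clause, since it has no field name to give the result. A minimal sketch on generated events (the src field and its values are made up for illustration and are not part of the real data):

```spl
| makeresults count=4
| streamstats count as n
| eval src=if(n<=3, "icmp", "dhcp")
| stats count(eval(src=="icmp")) as icmp_count, count as total
```

This should return a single row with icmp_count=3 and total=4. Dropping "as icmp_count" should reproduce the error.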

jj39501
New Member

Thanks for the feedback. I am no longer getting the error message. However, my output is still omitting the Source_Host and MAC Address fields/data.

When I run the search up until the first stats command, the MAC Address and Source_Host fields show up, but they are blank. Thoughts?

gcusello
SplunkTrust

Hi jj39501,
First, you don't need separate search and where commands after the initial search: you can put all the filter conditions in the main search, which makes it more performant.
So your first search could be:

index="index A" sourcetype="sourcetype A" "icmp" (id.resp_h!=10.0.0.0/8 id.resp_h!=172.16.0.0/12 id.resp_h!=xxx.xxx.0.0/16 id.resp_h!=xxx.xxx.0.0/16 id.resp_h!=xxx.xxx.*) (orig_ip_bytes > 1000 orig_bytes > 1000) 
| rename id.resp_h as Destination_Host id.orig_h as Source_IP 
| stats count earliest(_time) as earliest latest(_time) as latest by Source_IP, Destination_Host 
| eval isOutlier=if(earliest >= relative_time(now(), "-1d@d"), 1, 0) 
| where isOutlier=1 
| convert ctime(earliest) ctime(latest)

For the combined search, I suggest not using the join command: it is very slow, and its subsearch is limited to 50,000 results. Try a different approach like the following:

(index="index A" sourcetype="sourcetype A" "icmp" (id.resp_h!=10.0.0.0/8 id.resp_h!=172.16.0.0/12 id.resp_h!=xxx.xxx.0.0/16 id.resp_h!=xxx.xxx.0.0/16 id.resp_h!=xxx.xxx.*) (orig_ip_bytes > 1000 orig_bytes > 1000) ) OR (index="index B" sourcetype="sourcetype B" description=Assign OR description=Renew)
| rename id.resp_h as Destination_Host id.orig_h as Source_IP  ip as Source_IP mac as "Mac Address" nt_host as Source_Host 
| stats count earliest(_time) AS earliest latest(_time) AS latest values(Destination_Host) AS Destination_Host values("Mac Address") AS "Mac Address" BY Source_IP
| dedup "Mac Address"
| stats count earliest(_time) AS earliest latest(_time) AS latest values("Mac Address") AS "Mac Address" values(Source_Host) AS Source_Host BY Source_IP Destination_Host 
| eval isOutlier=if(earliest >= relative_time(now(), "-1d@d"), 1, 0) 
| where isOutlier=1 
| convert ctime(earliest) ctime(latest)

The approach is to put both searches in the main search and correlate them with the stats command, grouped by the join key.
If I forgot a field, my apologies; add it using the values function.
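
To see the idea in isolation, here is a minimal sketch on two generated events that share a Source_IP, one standing in for each index. The address 192.0.2.10 and the other sample values are placeholders for illustration, not the real data:

```spl
| makeresults
| eval Source_IP="192.0.2.10", Destination_Host="52.112.3.145"
| append
    [| makeresults
    | eval Source_IP="192.0.2.10", Source_Host="ABC Host", mac="xx:xx:xx"]
| stats count(Destination_Host) as count
        values(Destination_Host) as Destination_Host
        values(Source_Host) as Source_Host
        values(mac) as "Mac Address"
        by Source_IP
```

This should give one row for 192.0.2.10 carrying the destination, host, and MAC together. Note that, as far as I know, stats skips events that are missing any of the BY fields, so adding Destination_Host to the by clause here would drop the second event and leave the host and MAC columns empty.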

Ciao.
Giuseppe

jj39501
New Member

Giuseppe,

Thank you for your time and willingness to help with this search. I tried your suggestion and, unfortunately, there seems to be an issue in the second stats command that I can't figure out. As is, I get the raw event data just fine, but the statistics are not being applied. Removing the second stats command gives me statistical data, but it looks like it is only being applied to index B.

gcusello
SplunkTrust

Hi jj39501,
Please try again, I made a copy error!

(index="index A" sourcetype="sourcetype A" "icmp" (id.resp_h!=10.0.0.0/8 id.resp_h!=172.16.0.0/12 id.resp_h!=xxx.xxx.0.0/16 id.resp_h!=xxx.xxx.0.0/16 id.resp_h!=xxx.xxx.*) (orig_ip_bytes > 1000 orig_bytes > 1000) ) OR (index="index B" sourcetype="sourcetype B" description=Assign OR description=Renew)
| rename id.resp_h as Destination_Host id.orig_h as Source_IP  ip as Source_IP mac as "Mac Address" nt_host as Source_Host 
| stats count earliest(_time) AS earliest latest(_time) AS latest values(Destination_Host ) AS Destination_Host values("Mac Address") AS "Mac Address" values(Source_Host) AS Source_Host BY Source_IP
| dedup "Mac Address"
| stats count earliest(_time) AS earliest latest(_time) AS latest values("Mac Address") AS "Mac Address" values(Source_Host) AS Source_Host BY Source_IP Destination_Host 
| eval isOutlier=if(earliest >= relative_time(now(), "-1d@d"), 1, 0) 
| where isOutlier=1 
| convert ctime(earliest) ctime(latest)

If there is still a problem, please debug it by checking which fields are present after the first stats.
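
One way to do that check is to list the field names that come out of a stats command with fieldsummary. A minimal sketch on generated events (the Source_IP value is a placeholder, and the stats clause is a shortened version of the one above):

```spl
| makeresults count=3
| eval Source_IP="192.0.2.10"
| stats count earliest(_time) as earliest latest(_time) as latest by Source_IP
| fieldsummary
| fields field
```

This should list only Source_IP, count, earliest, and latest. Since _time is not among them, a later stats that calls earliest(_time) or latest(_time) has nothing to aggregate; re-aggregating the carried values with min(earliest) and max(latest) would be one option to try.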

Ciao.
Giuseppe

arjunpkishore5
Motivator

The desired output and actual output look exactly the same. Did you intend to post a different dataset?
