Knowledge Management

How to make my query efficient ?

dmenon84
Path Finder

Hi,

Here is my query that I am currently running. Is there a way to make it more efficient? I am joining 2 sourcetypes from different indexes on a common field value. sourcetype=mwb_feeds are feeds that come in via rss feeds. I only want the inner sub query to look for data that came in for last 14 days to get more current feeds. Please guide on how to proceed.

index=pan_logs sourcetype="pan:threat" | fields src_ip,dest_ip,url,dest_hostname,_time | eval NEWTIME=strftime(_time, "%b %d %H:%M:%S") | join dest_hostname [search sourcetype=mwb_feeds | fields dest_hostname,Classification | replace "PSH" WITH "PHISHING", "EMD" WITH "ENGAGED IN MALWARE DISTRIBUTION", "PUP" WITH "POTENTIALLY UNWANTED PROGRAM", "EXP" WITH "EXPLOITS", "FSA" WITH "FRAUDULENT SERVICES AND APPLICATIONS" IN Classification] | dedup src_ip | rename dest_hostname AS HOST | stats values(src_ip) as "SOURCE IP",values(dest_ip) AS "DESTINATION IP",values(url) AS "WEBSITE", values(NEWTIME) AS "TIME CONNECTED" by Classification,HOST

Thanks in advance for all the help !

Tags (1)
0 Karma
1 Solution

somesoni2
Revered Legend

Give this a try
Updated field name for Classification

(index=pan_logs sourcetype="pan:threat") OR (sourcetype=mwb_feeds)
| fields src_ip,dest_ip,url,dest_hostname,_time Classification
| eventstats values(Classification) as Classificationby dest_hostname
| where sourcetype="pan:threat" AND isnotnull(Classification)
| dedup src_ip
| replace "PSH" WITH "PHISHING", "EMD" WITH "ENGAGED IN MALWARE DISTRIBUTION", "PUP"  WITH "POTENTIALLY UNWANTED PROGRAM", "EXP" WITH "EXPLOITS", "FSA" WITH "FRAUDULENT SERVICES AND APPLICATIONS" IN Classification
| rename dest_hostname AS HOST | stats values(src_ip) as "SOURCE IP",values(dest_ip) AS "DESTINATION IP",values(url) AS "WEBSITE", values(NEWTIME) AS "TIME CONNECTED" by Classification,HOST

View solution in original post

0 Karma

DalJeanis
SplunkTrust
SplunkTrust

be sure to mark your code with the "code" button when you post. It's the button marked 101 010. Except if you're using Internet Explorer, then you're stuck with using a grave accent (`) before and after.

0 Karma

somesoni2
Revered Legend

Another option is select the whole query and press Ctrl + K. Be sure to leave a new line between your text para and code/query block.

0 Karma

somesoni2
Revered Legend

Give this a try
Updated field name for Classification

(index=pan_logs sourcetype="pan:threat") OR (sourcetype=mwb_feeds)
| fields src_ip,dest_ip,url,dest_hostname,_time Classification
| eventstats values(Classification) as Classificationby dest_hostname
| where sourcetype="pan:threat" AND isnotnull(Classification)
| dedup src_ip
| replace "PSH" WITH "PHISHING", "EMD" WITH "ENGAGED IN MALWARE DISTRIBUTION", "PUP"  WITH "POTENTIALLY UNWANTED PROGRAM", "EXP" WITH "EXPLOITS", "FSA" WITH "FRAUDULENT SERVICES AND APPLICATIONS" IN Classification
| rename dest_hostname AS HOST | stats values(src_ip) as "SOURCE IP",values(dest_ip) AS "DESTINATION IP",values(url) AS "WEBSITE", values(NEWTIME) AS "TIME CONNECTED" by Classification,HOST

View solution in original post

0 Karma

DalJeanis
SplunkTrust
SplunkTrust

And, just in case anyone is interested in HOW this works, and WHY somesoni's version is more efficient, here's the blow by blow.

1) It's usually more efficient in splunk to read all the records at the same time rather than joining...

 (index=pan_logs sourcetype="pan:threat") OR (sourcetype=mwb_feeds)

2) It's always more efficient to dump all the data you don't need at the earliest possible point, so this command keeps only the fields that you would need on any kind of record... although with the "fields" command, there's no need to explicitly list _time...

 | fields src_ip,dest_ip,url,dest_hostname,_time Classification sourcetype

3) This command calculates all the Classifications for each host and attaches them to every event with that host on it...

 | eventstats values(Classification) as Classification by dest_hostname

4) This eliminates the records from the mwb_feeds, since their info has already been rolled onto the other events, and keeps only the pan:threat records that are against a desthost with a threat Classification....

 | where sourcetype="pan:threat" AND isnotnull(Classification)

5) and back to your original reporting, which was just fine as it was...

 | dedup src_ip
 | replace "PSH" WITH "PHISHING", "EMD" WITH "ENGAGED IN MALWARE DISTRIBUTION", "PUP"  WITH "POTENTIALLY UNWANTED PROGRAM", "EXP" WITH "EXPLOITS", "FSA" WITH "FRAUDULENT SERVICES AND APPLICATIONS" IN Classification
 | rename dest_hostname AS HOST 
 | stats values(src_ip) as "SOURCE IP",values(dest_ip) AS "DESTINATION IP",values(url) AS "WEBSITE", values(NEWTIME) AS "TIME CONNECTED" by Classification,HOST

updated capitalization of the fieldname Classification.
added sourcetype to the fields list

dmenon84
Path Finder

Thanks a lot for quick response and solution and wonderful explanation. I ran the query you suggested but it doesn't return any data. Just to clarify I am getting in rss threat feeds every 4 hours . I modified my query a bit and when I run it I find 1 match

index=pan_logs sourcetype="pan:threat"  |  fields src_ip,dest_ip,url,dest_hostname,_time | eval NEWTIME=strftime(_time, "%b %d %H:%M:%S") | join dest_hostname [search sourcetype=mwb_feeds earliest=-24h@h  | fields dest_hostname,Classification | replace "PSH" WITH "PHISHING", "EMD" WITH "ENGAGED IN MALWARE DISTRIBUTION", "PUP"  WITH "POTENTIALLY UNWANTED PROGRAM", "EXP" WITH "EXPLOITS", "FSA" WITH "FRAUDULENT SERVICES AND APPLICATIONS" IN Classification] | dedup src_ip | rename dest_hostname AS HOST | stats values(src_ip) as "SOURCE IP",values(dest_ip) AS "DESTINATION IP",values(url) AS "WEBSITE", values(NEWTIME) AS "TIME CONNECTED" by Classification,HOST

Result returned for last 60 minutes
ENGAGED IN MALWARE DISTRIBUTION computersecuritybreached- callsupport18885691655.xyz src_ipvalues dest_ipvalues computersecuritybreached-callsupport18885691655.xyz/favicon.ico Mar 31 14:26:53

I ran query recommended by you but it did not return anything. Is it because they are in difference indexes. Also the firewall data is a lot so even first line takes a lot of time to run. I also tried

(index=pan_logs sourcetype="pan:threat" earliest=-1h@h ) OR (sourcetype=mwb_feeds earliest=-24h@h )

but no luck.

Classification is field in sourcetype=mwb_feeds but doesnt exist in sourcetype="pan:threat". dest_hostname is common field and can sometimes have similar data and thats what I want to match on. sourcetype=mwb_feeds has very few events compared to pan threat logs.

Thanks in advance !

0 Karma

somesoni2
Revered Legend

Give this a try. In this version, we've a filter based on dest_hostname retrieved from mwb_feeds, so only relevant firewall records are loaded. If it still doesn't give any records, could you run the query part by part and see on which step you loose all the data?

 (index=pan_logs sourcetype="pan:threat" [search sourcetype=mwb_feeds earliest=-24h@h | stats count by dest_hostname | table dest_hostname] earliest=-1h@h ) OR (sourcetype=mwb_feeds earliest=-24h@h )
 | fields src_ip,dest_ip,url,dest_hostname,_time, Classification
 | eventstats values(Classification) as Classification by dest_hostname
 | where sourcetype="pan:threat" AND isnotnull(Classification)
 | dedup src_ip
 | replace "PSH" WITH "PHISHING", "EMD" WITH "ENGAGED IN MALWARE DISTRIBUTION", "PUP"  WITH "POTENTIALLY UNWANTED PROGRAM", "EXP" WITH "EXPLOITS", "FSA" WITH "FRAUDULENT SERVICES AND APPLICATIONS" IN Classification
 | rename dest_hostname AS HOST | stats values(src_ip) as "SOURCE IP",values(dest_ip) AS "DESTINATION IP",values(url) AS "WEBSITE", values(NEWTIME) AS "TIME CONNECTED" by Classification,HOST

dmenon84
Path Finder

You are awesome !!!! - this works I took out the "where" clause since that was not returning anything

(index=pan_logs sourcetype="pan:threat" [search sourcetype=mwb_feeds earliest=-24h@h | stats count by dest_hostname | table dest_hostname] earliest=-1h@h ) OR (sourcetype=mwb_feeds earliest=-24h@h ) | fields src_ip,dest_ip,url,dest_hostname,_time, Classification  | eventstats values(Classification) as Classification by dest_hostname   | dedup src_ip
  | replace "PSH" WITH "PHISHING", "EMD" WITH "ENGAGED IN MALWARE DISTRIBUTION", "PUP"  WITH "POTENTIALLY UNWANTED PROGRAM", "EXP" WITH "EXPLOITS", "FSA" WITH "FRAUDULENT SERVICES AND APPLICATIONS" IN Classification
  | rename dest_hostname AS HOST | stats values(src_ip) as "SOURCE IP",values(dest_ip) AS "DESTINATION IP",values(url) AS "WEBSITE", values(NEWTIME) AS "TIME CONNECTED" by Classification,HOST
0 Karma

DalJeanis
SplunkTrust
SplunkTrust

Okay, wait. now that it's working, why are we deduping the src_ip but keeping _time??? That's not intuitively obvious.

I believe, based on how splunk is defined as returning the most recent records first, that it will give you the most recent time connected for each host, but those values will not be aligned with the src_ip values that they are returned alongside. Each set of values will have been sorted semantically.

0 Karma

DalJeanis
SplunkTrust
SplunkTrust

ROFL. We screened out sourcetype in the fields command. No wonder the where was giving you nothing.

Yes, the where clause was unneeded once the mwb_feeds was moved to a subsearch prior to the search, eliminating all records from unneeded src_ips before they were even selected.

0 Karma

dmenon84
Path Finder

I did something like this for time

(index=pan_logs sourcetype="pan:threat" [search sourcetype=mwb_feeds earliest=-168h@h | stats count by dest_hostname | table dest_hostname] earliest=-24h@h ) OR (sourcetype=mwb_feeds earliest=-168h@h ) | fields src_ip,dest_ip,url,dest_hostname,_time, Classification   | eventstats values(Classification) as Classification by dest_hostname   | dedup src_ip  | eval NEWTIME=strftime(_time, "%b %d")
   | replace "PSH" WITH "PHISHING", "EMD" WITH "ENGAGED IN MALWARE DISTRIBUTION", "PUP"  WITH "POTENTIALLY UNWANTED PROGRAM", "EXP" WITH "EXPLOITS", "FSA" WITH "FRAUDULENT SERVICES AND APPLICATIONS" IN Classification
   | rename dest_hostname AS HOST | rename NEWTIME as "TIME CONNECTED" | stats values(src_ip) as "SOURCE IP",values(dest_ip) AS "DESTINATION IP",values(url) AS "WEBSITE" by "TIME CONNECTED",Classification,HOST | sort -"TIME CONNECTED"
0 Karma

DalJeanis
SplunkTrust
SplunkTrust

Given that code, deduping by src_ip at that point makes no sense to me. We have records that each connect a src_ip to a dest_ip at some particular point in time, and we're going to randomly take the first one for any given src_ip and discard the rest?

Also, just as a general practice, you should do all your renaming and reformatting at the very end, after any stats-type commands (other than prepwork for the stats, like binning _time etc). If you reformat each record, and then do stats, then you've cost the computer extra work changing details that it could have just changed once after the sums.

One last thing - get in the habit of using sort 0 rather than just sort. Splunk has a default number of records to return from a sort, the top 100 or something, and if you don't explicitly tell it otherwise, you may lose some results without ever noticing.

Compare the output from this to the output from the dedup and sort version, and see whether anything significant was lost in the other version.

(index=pan_logs sourcetype="pan:threat" 
    [ search sourcetype=mwb_feeds earliest=-168h@h 
    | stats count by dest_hostname 
    | table dest_hostname] 
    earliest=-24h@h ) 
OR (sourcetype=mwb_feeds earliest=-168h@h ) 
| table src_ip, dest_ip, url, dest_hostname, _time, Classification   
| eventstats values(Classification) as Classification by dest_hostname  
| bin _time span=1d
| stats count as hitcount, values(url) as url, values(Classification) as Classification, values(src_ip) as src_ip , values(dest_ip) as dest_ip by dest_hostname _time
| replace "PSH" WITH "PHISHING", "EMD" WITH "ENGAGED IN MALWARE DISTRIBUTION", "PUP"  WITH "POTENTIALLY UNWANTED PROGRAM", "EXP" WITH "EXPLOITS", "FSA" WITH "FRAUDULENT SERVICES AND APPLICATIONS" IN Classification
| eval _time=strftime(_time, "%b %d")
| rename dest_hostname AS HOST, _time as "TIME CONNECTED", src_ip as "SOURCE IP", dest_ip AS "DESTINATION IP",  url AS "WEBSITE"
| sort 0 - "TIME CONNECTED"
0 Karma

DalJeanis
SplunkTrust
SplunkTrust

capitalization error. in somesoni2's code, change all classification to Classification.

0 Karma

somesoni2
Revered Legend

I couldn't have explained it better.

0 Karma

DalJeanis
SplunkTrust
SplunkTrust

Strong, the pedantic mode, in this one is. - Yoda

somesoni2
Revered Legend

Upvote this, I must.

0 Karma
.conf21 Now Fully Virtual!
Register for FREE Today!

We've made .conf21 totally virtual and totally FREE! Our completely online experience will run from 10/19 through 10/20 with some additional events, too!