Afternoon All,
I have been playing with a search that will eventually become a saved search within Splunk ES. The idea is for the search to run, pull in the observables that are populated in the ip_intel lookup file, and then populate an index.
So far I have this which works fine:
| inputlookup ip_intel append=true
| dedup ip
| collect index=backup_ti source=daily_ip_intel
The problem is that inputlookup automatically returns all rows within the file. I'm looking to have this search run once a day and only update the index with new observables, rather than duplicating or overwriting the existing data. Is there a way to do this? The timestamps against the indicators within that file are messy, so it's not possible to exclude them via a relative time subsearch, for example.
You don't actually say what your problems are; however, I think this should work:
index=backup_ti source=daily_ip_intel earliest=0
| stats count by ip threat_key description
| eval state=0
| append [
| inputlookup ip_intel
| dedup ip
| stats count by ip threat_key description
| eval state=1
]
| fields - count
| stats min(state) as ms by ip threat_key description
| where ms=1
| collect index=backup_ti source=daily_ip_intel
This first collects what is already present in the index (state=0), then appends all of the lookup data (state=1).
The final stats then collapses all entries using the group by, and the where clause retains only those not already present in the index (ms=1).
Hopefully this works
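To see the state trick without touching the real index or lookup, here is a minimal sketch that fakes both sides with makeresults. The three sample rows and their values are made up for illustration, and `format=csv data=...` assumes a reasonably recent Splunk version (9.0 or later, as far as I know). The first makeresults stands in for what is already in the index, the second for the lookup.

```spl
| makeresults format=csv data="ip,threat_key,description
1.1.1.1,feed_a,known bad
2.2.2.2,feed_a,scanner"
| eval state=0
| append [
    | makeresults format=csv data="ip,threat_key,description
1.1.1.1,feed_a,known bad
2.2.2.2,feed_a,scanner
3.3.3.3,feed_b,new c2"
    | eval state=1
]
| stats min(state) as ms by ip threat_key description
| where ms=1
```

This should return only the 3.3.3.3 row, since the other two appear with state=0 as well and so end up with ms=0. One thing to be aware of: stats drops any row where one of the by fields is null, so lookup rows with an empty threat_key or description would never be collected unless you fillnull them first.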
This is why I developed a solution to overcome the problem of lookups and/or indexes. My solution uses a Neo4j graph database in between the source of the information and Splunk. The good thing about a graph database is that it is capable of searching for relationships, or for the lack of relationships. So you can even combine several sources to give you the context that you need.
I wrote a post about that recently: SOCs: why they struggle with context
What's the purpose of having that data in an index? The index is time based, so what does 'new' mean? If it means new compared to ALL_TIME, then you would have to run the inputlookup, merge it with all the existing data in that index, and collect just the new entries.
If that's what you're trying to do, I don't understand why you would want it in an index. You might be better off having a second lookup which contains the 'first_seen' time of the data, but I would need to know more to give a better answer.
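As a rough sketch of the second-lookup idea, something along these lines could maintain it. The lookup name ip_intel_first_seen is hypothetical and is assumed to already exist as a lookup definition (CSV or KV store) before the first run; I'm also assuming ip is the key you care about.

```spl
| inputlookup ip_intel
| dedup ip
| eval first_seen=now()
| inputlookup append=true ip_intel_first_seen
| stats min(first_seen) as first_seen first(threat_key) as threat_key first(description) as description by ip
| outputlookup ip_intel_first_seen
```

Each run stamps the current lookup rows with now(), appends what was recorded previously, and keeps the earliest first_seen per ip, so an observable seen before keeps its original time and a new one gets today's. If ip_intel ever came back empty, the previously recorded rows would still be carried through from the appended lookup.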
Thanks for the response. So the purpose is just to have a place where the observables are backed up in this index, ready to be used if needed. We would also like to build dashboarding based on that index, which is a little tricky to do from the lookup file itself, for capturing metrics, ingestion rate of IoCs etc.
The ip_intel file is growing on a daily basis, being fed by various Threat Intel sources. So when I say "new", this would be new observables/new data ingested into that file.
I've got a little further with this but am getting some errors.
Seems like if that's just a backup and the existing lookup is a complete list, then it would be far easier to set a very short retention policy in the index and just ingest the lookup in its entirety every day.
In that way you benefit by only having to search the index for 1 day to get all observables, otherwise you have to search all time on the index, which would have to search more buckets over time.
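With that approach, reading the backup back would look roughly like the sketch below. It assumes the daily collect has run within the last 24 hours and that the same index and source names from earlier in the thread are used; the retention itself would be set on the index (frozenTimePeriodInSecs in indexes.conf), and since that is enforced per bucket, the cut-off is not exact.

```spl
index=backup_ti source=daily_ip_intel earliest=-24h
| stats latest(_time) as last_ingested by ip threat_key description
```

The stats is only there to collapse any overlap if two daily runs happen to fall inside the same 24 hour window.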
Just an additional piece of information: the ip_intel file is actually a KV store, so I'm wondering if that's causing some issues.
That's an interesting suggestion, and one that we probably would have gone for. However, I'm just going to provide some further context and the driver for this query.
The ip_intel lookup file was actually overwritten with blank data a while back, caused by the installation of an app. So whilst we have put extra checks in place to ensure that doesn't happen again, there is always a possibility.
So if the same thing happened here, it would just mean ingesting a blank lookup file into the index.
Agree this is a trickier way of doing it, although hoping there is some Splunk magic that could be used with the query we have been testing. Any thoughts there at all?
@lekanneer Appreciate the link to that finding - an interesting read sir
@bowesmana Apologies, the issues we were having were due to the formatting of the findings and the duplicates. However, your search does absolutely everything I was looking for here!
Thanks Kindly 🙂
Great Success!