Splunk Search

Combine Data model data with another search for a match

geraldcontreras
Path Finder

Hi all,

I have CTI data that comes into Splunk, and I'd like to correlate it against my other indexes to find matches.
A further complication is that the CTI data can go back many years, whereas I may only want to search the last 24 hours of the network index.

Basically, I'm looking for events where a src value in index=network equals an indicator value in index=CTI.

The example below works, but it is very slow:

    (index=network src=* earliest=-24h latest=now) OR (index=cti indicator=* earliest=1 latest=now)
    | fields indicator index sourcetype src
    | eval splunk_indicator=coalesce(src,indicator)
    | stats dc(sourcetype) AS distinct_sourcetype values(*) AS * by splunk_indicator
    | where distinct_sourcetype > 1

So I created a data model for the CTI data.
But I need a way to combine the data model search with a "normal" search,
using either | tstats or | datamodel.
However, I can't seem to find a way to do this when there is no common field.

Is there a way I can either:
- combine a data model search with a normal search
- search the CTI data as a blob rather than by time (so that I can limit index=network to 24 hours and search for matches across all CTI data, regardless of the CTI event time)
- more efficiently search two indexes over different time frames for matches
- find a better way to correlate one index against another with different time constraints

Thanks for any input or direction

ololdach
Builder

Hi geraldcontreras,
as I understand your question, your challenge is to combine a query over a long historic time span (basically "All Time") with a very recent view of your network, looking for indicators (of compromise). The reason your first query is slow is that both parts of the query search through all the events. The terms earliest= and latest= filter the results, but don't affect the search time range.

Splunk is great, so there are multiple ways to solve this problem 🙂 ... this is how I would solve it for my customers:
I would create a lookup table of known indicators as a KV store and have a scheduled report populate and update it. The trick is to have a unique key that describes your indicator, so that you can easily update the field in the store. In my example I take md5(indicator) as the key:

    index=... | whatever it takes to calculate your indicators | eval key=md5(indicator_name) | table key, indicator_value | outputlookup indicator_kv key_field=key

Have this query run at regular intervals to update the lookup table. Whenever a value changes for a known indicator that's in the table, the key_field option will actually update the existing value. If the key does not yet exist, it is added.
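
A minimal sketch of such a scheduled search for the CTI case in this thread. It assumes the raw events in index=cti carry the indicator field from the original question, and that a KV store lookup definition named indicator_kv with an indicator_value field already exists (both are assumptions, adjust to your setup):

    index=cti indicator=* earliest=1 latest=now
    | stats count by indicator
    | eval key=md5(indicator)
    | rename indicator AS indicator_value
    | table key indicator_value
    | outputlookup indicator_kv key_field=key

The stats step collapses the CTI events to one row per distinct indicator before anything is written, so the lookup holds one record per indicator no matter how many years of events were scanned.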

Now, your dashboard query (the one that needs to be fast) would look like this:

    index=network src=* | eval key=md5(src) | lookup indicator_kv _key as key output indicator_value as splunk_indicator

(Hint: if you don't want the lookup to return NULL for splunk_indicator when the key is not found, you can assign a default value in the lookup definition under Advanced Options.)
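
To keep only the network events that actually matched, one option is to drop the rows where the lookup returned nothing. A minimal sketch, assuming indicator_kv is populated as described above and that no default value is configured on the lookup definition:

    index=network src=* earliest=-24h latest=now
    | eval key=md5(src)
    | lookup indicator_kv _key AS key OUTPUT indicator_value AS splunk_indicator
    | where isnotnull(splunk_indicator)

Because the lookup is not bound to a time range, the 24-hour window applies only to index=network, while the match runs against every indicator stored in the lookup.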

Hope it helps
Oliver

geraldcontreras
Path Finder

Hi ololdach,

Thanks for the answer.
Is there a limit on lookups, though? I thought there was a 50k limit. That's why I never went with a lookup, but if it works differently the way you describe, I will give it a go.

I have this working now: I created another data model for the indicators from Splunk (as opposed to the CTI indicators we ingest). The search below does exactly what I want, but there is a new problem: the Splunk indicators are not complete, and many values appear to be missing.

Data model = CSOC_IOC_Fields
This holds the data from Splunk, i.e. the Splunk indicators.
For this data model I basically pull a number of different fields and coalesce them all into one called indicator.

E.g. the root search:

    (index IN (msad,network_ise,network_fw,aws,nexus*,azure) sourcetype IN (aws:cloudtrail,aws:cloudwatch,aws:elb:accesslogs,cisco:estreamer:data,navitas:navigate:applicationlog,o365:management:activity,navitas:nginx:accesslog,cisco:ise:syslog,ms:o365:reporting:messagetrace) (http_x_forwarded_for=* OR src=* OR dest=* OR SenderAddress=* OR RecipientAddress=* OR ClientIP=* OR AttachmentData{}.SHA256=* OR alert_user=* Subject=* OR url=* OR alert_msg_subject=* OR alert_suspect_sender=* OR alert_zapped_url=* )) | fields http_x_forwarded_for,src,dest,ClientIP,Subject,url,SenderAddress,RecipientAddress,alert_msg_subject,alert_suspect_sender,alert_zapped_url _time index sourcetype ioc_indicator indicator

Then I created an evaluated field merging all the fields into one called "indicator", i.e. ioc_all_search.indicator.
This is the data model that is missing some events (it still contains a lot of events).

datamodel=CSOC_Falcon_Threat_Intelligence.CTI_All
Then I have the CTI database being pulled from our CTI into Splunk. It is purely CTI data with no correlation to anything in our environment (which is the whole point of my goal: to correlate data from our environment with the CTI data).

    | tstats count from datamodel=CSOC_IOC_Fields.ioc_all_search where earliest=-30d latest=now groupby ioc_all_search.indicator index sourcetype
    | append
        [| tstats values(CTI_All.malicious_confidence) values(CTI_All.labels_name) values(CTI_All.kill_chains) values(CTI_All.actors) values(CTI_All.malware_families) from datamodel=CSOC_Threat_Intelligence.CTI_All WHERE earliest=-90d latest=now groupby index sourcetype CTI_All.indicator]
    | eval splunk_indicator=coalesce('ioc_all_search.indicator','CTI_All.indicator')
    | stats dc(sourcetype) AS distinct_sourcetype values(*) AS * by splunk_indicator
    | where distinct_sourcetype > 1
    | search index=CTI AND index=*

I'm aware I can create tailored searches for each field type and that these would be quick, but I wanted to try a catch-all.

ololdach
Builder

Hi geraldcontreras,

the limits exist in some areas, but I correlate very complex data with 1.5M events per hour without getting into trouble. This is getting way too abstract, though. For us to help you, you may want to share some details: please give us some sample data to work with and some examples of what you'd like to achieve. To me it sounds as if you might be taking the long and scenic route to reach your goal.

The second comment I'd like to make is that you should only use data models to accelerate a search if you have a thorough understanding of all the performance implications and have ruled out all other solutions. Data models can't do magic. They are OK at what they do, but usually there is more to gain when you optimise the query to begin with or use your own (optimised) aggregation. (This is wisdom learned the hard way. Way back I (quite naively) started with data models until I had hit every single wall there is at least a dozen times.)
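
As one example of what optimising the query can mean here (a sketch only, assuming the indicator_kv lookup from my earlier reply and that only src needs to be matched): aggregate the network events down to one row per distinct src first, so that the lookup runs once per value instead of once per event.

    index=network src=* earliest=-24h latest=now
    | stats count min(_time) AS first_seen max(_time) AS last_seen by src
    | eval key=md5(src)
    | lookup indicator_kv _key AS key OUTPUT indicator_value AS splunk_indicator
    | where isnotnull(splunk_indicator)

The field names first_seen and last_seen are purely illustrative.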

Best regards
Oliver

to4kawa
SplunkTrust

How about running both as tstats searches and appending the results?

geraldcontreras
Path Finder

Hi to4kawa,

If I create a data model for the second set of data it somewhat works, but it takes a long time to parse and search and isn't as reliable as the slower search I have above.

I have used the search below, but it seems to miss some indicators from the data model CSOC_IOC_Fields, which is a combination of different indexes and contains just the fields I need. Is there a search limit on the append command?

    | tstats values(CTI_All.malicious_confidence) values(CTI_All.labels_name) values(CTI_All.kill_chains) values(CTI_All.actors) values(CTI_All.malware_families) from datamodel=CSOC_Threat_Intelligence.CTI_All groupby index sourcetype CTI_All.indicator 
    | append 
        [| tstats count from datamodel=CSOC_IOC_Fields.IOC_All_indexes groupby index sourcetype IOC_All_indexes.ioc_indicator 
                ] 
    | eval spunk_indicator=coalesce('IOC_All_indexes.ioc_indicator','CTI_All.indicator') 
    | stats dc(index) AS distinct_index values(*) AS * by spunk_indicator  
    | where distinct_index > 1
    | search index=CTI AND index=*