Splunk Enterprise Security

Splunk ES threat matching src: How to solve this multisearch issue?

MarkusM
Loves-to-Learn

Hi,

I am facing a strange issue on a SIEM installation (Splunk 9.0.2 / ES 7.0.1) with the multisearch command that the Threat Intel Framework uses for threatmatch src.

The customer complains that he does not get notables for Threat Activity coming from IP intel matches.

I was able to find out that the threat match is not running properly anymore...

Running the search manually I get:

Error in 'multisearch' command: Multisearch subsearches might only contain purely streaming operations (subsearch 1 contains a non-streaming command)

Even after expanding all the macros, I don't see what is wrong with the multisearch.

It all looks fine to me:

| multisearch
    [| tstats prestats=true summariesonly=true values("sourcetype"),values("DNS.dest") from datamodel="Network_Resolution"."DNS" by "DNS.query"
    | eval SEGMENTS=split(ltrim('DNS.query', "."), "."),SEG1=mvindex(SEGMENTS, -1),SEG2=mvjoin(mvindex(SEGMENTS, -2, -1), "."),SEG3=mvjoin(mvindex(SEGMENTS, -3, -1), "."),SEG4=mvjoin(mvindex(SEGMENTS, -4, -1), ".")
    | lookup mozilla_public_suffix_lookup domain AS SEG1 OUTPUTNEW length AS SEG1_LENGTH
    | lookup mozilla_public_suffix_lookup domain AS SEG2 OUTPUTNEW length AS SEG2_LENGTH
    | lookup mozilla_public_suffix_lookup domain AS SEG3 OUTPUTNEW length AS SEG3_LENGTH
    | lookup mozilla_public_suffix_lookup domain AS SEG4 OUTPUTNEW length AS SEG4_LENGTH
    | lookup cim_http_tld_lookup tld AS SEG1 OUTPUT tld AS TLD
    | eval TGT_LENGTH=coalesce(SEG4_LENGTH,SEG3_LENGTH,SEG2_LENGTH,SEG1_LENGTH, if(cidrmatch("0.0.0.0/0", 'DNS.query'), if(1==0, 4, 0), if(isnull(TLD), 2, 0))),SRC_LENGTH=mvcount(SEGMENTS),"DNS.query_truncated"=if(TGT_LENGTH==0, null, if(SRC_LENGTH>=TGT_LENGTH, mvjoin(mvindex(SEGMENTS, -TGT_LENGTH, -1), "."), null))
    | fields - TLD SEG1 SEG2 SEG3 SEG4 SEG1_LENGTH SEG2_LENGTH SEG3_LENGTH SEG4_LENGTH TGT_LENGTH SRC_LENGTH SEGMENTS
    | eval "DNS.query_truncated"=if("DNS.query_truncated"="DNS.query" OR 'DNS.query_truncated'='DNS.query', null(), 'DNS.query_truncated')
    | lookup "threatintel_by_cidr" value as "DNS.query" OUTPUT threat_collection as tc0,threat_collection_key as tck0
    | lookup "threatintel_by_domain" value as "DNS.query" OUTPUT threat_collection as tc1,threat_collection_key as tck1
    | lookup "threatintel_by_domain" value as "DNS.query_truncated" OUTPUT threat_collection as tc2,threat_collection_key as tck2
    | lookup "threatintel_by_system" value as "DNS.query" OUTPUT threat_collection as tc3,threat_collection_key as tck3
    | where isnotnull('tck0') OR isnotnull('tck1') OR isnotnull('tck2') OR isnotnull('tck3')
    | eval intelzip0=mvzip('tc0','tck0',"@@")
    | eval intelzip1=mvzip('tc1','tck1',"@@")
    | eval intelzip2=mvzip('tc2','tck2',"@@")
    | eval intelzip3=mvzip('tc3','tck3',"@@")
    | eval threat_collection_key=mvappend(intelzip0,intelzip1,intelzip2,intelzip3)
    | eval "psrsvd_ct_sourcetype"=if(isnull('psrsvd_ct_sourcetype'),'psrsvd_ct_sourcetype','psrsvd_ct_sourcetype')
    | eval "psrsvd_nc_sourcetype"=if(isnull('psrsvd_nc_sourcetype'),'psrsvd_nc_sourcetype','psrsvd_nc_sourcetype')
    | eval "psrsvd_vm_sourcetype"=if(isnull('psrsvd_vm_sourcetype'),'psrsvd_vm_sourcetype','psrsvd_vm_sourcetype')
    | eval "psrsvd_ct_dest"=if(isnull('psrsvd_ct_dest'),'psrsvd_ct_DNS.dest','psrsvd_ct_dest')
    | eval "psrsvd_nc_dest"=if(isnull('psrsvd_nc_dest'),'psrsvd_nc_DNS.dest','psrsvd_nc_dest')
    | eval "psrsvd_vm_dest"=if(isnull('psrsvd_vm_dest'),'psrsvd_vm_DNS.dest','psrsvd_vm_dest')
    | eval "threat_match_field"=if(isnull('threat_match_field'),"src",'threat_match_field')
    | eval "threat_match_value"=if(isnull('threat_match_value'),'DNS.query','threat_match_value')
        ]
    [| tstats prestats=true summariesonly=true values("sourcetype"),values("All_Traffic.dest"),values("All_Traffic.user") from datamodel="Network_Traffic"."All_Traffic" where "All_Traffic.action"="allowed" by "All_Traffic.src"
    | eval SEGMENTS=split(ltrim('All_Traffic.src', "."), "."),SEG1=mvindex(SEGMENTS, -1),SEG2=mvjoin(mvindex(SEGMENTS, -2, -1), "."),SEG3=mvjoin(mvindex(SEGMENTS, -3, -1), "."),SEG4=mvjoin(mvindex(SEGMENTS, -4, -1), ".")
    | lookup mozilla_public_suffix_lookup domain AS SEG1 OUTPUTNEW length AS SEG1_LENGTH
    | lookup mozilla_public_suffix_lookup domain AS SEG2 OUTPUTNEW length AS SEG2_LENGTH
    | lookup mozilla_public_suffix_lookup domain AS SEG3 OUTPUTNEW length AS SEG3_LENGTH
    | lookup mozilla_public_suffix_lookup domain AS SEG4 OUTPUTNEW length AS SEG4_LENGTH
    | lookup cim_http_tld_lookup tld AS SEG1 OUTPUT tld AS TLD
    | eval TGT_LENGTH=coalesce(SEG4_LENGTH,SEG3_LENGTH,SEG2_LENGTH,SEG1_LENGTH, if(cidrmatch("0.0.0.0/0", 'All_Traffic.src'), if(1==0, 4, 0), if(isnull(TLD), 2, 0))),SRC_LENGTH=mvcount(SEGMENTS),"All_Traffic.src_truncated"=if(TGT_LENGTH==0, null, if(SRC_LENGTH>=TGT_LENGTH, mvjoin(mvindex(SEGMENTS, -TGT_LENGTH, -1), "."), null))
    | fields - TLD SEG1 SEG2 SEG3 SEG4 SEG1_LENGTH SEG2_LENGTH SEG3_LENGTH SEG4_LENGTH TGT_LENGTH SRC_LENGTH SEGMENTS
    | eval "All_Traffic.src_truncated"=if("All_Traffic.src_truncated"="All_Traffic.src" OR 'All_Traffic.src_truncated'='All_Traffic.src', null(), 'All_Traffic.src_truncated')
    | lookup "threatintel_by_cidr" value as "All_Traffic.src" OUTPUT threat_collection as tc0,threat_collection_key as tck0
    | lookup "threatintel_by_domain" value as "All_Traffic.src" OUTPUT threat_collection as tc1,threat_collection_key as tck1
    | lookup "threatintel_by_domain" value as "All_Traffic.src_truncated" OUTPUT threat_collection as tc2,threat_collection_key as tck2
    | lookup "threatintel_by_system" value as "All_Traffic.src" OUTPUT threat_collection as tc3,threat_collection_key as tck3
    | where isnotnull('tck0') OR isnotnull('tck1') OR isnotnull('tck2') OR isnotnull('tck3')
    | eval intelzip0=mvzip('tc0','tck0',"@@")
    | eval intelzip1=mvzip('tc1','tck1',"@@")
    | eval intelzip2=mvzip('tc2','tck2',"@@")
    | eval intelzip3=mvzip('tc3','tck3',"@@")
    | eval threat_collection_key=mvappend(intelzip0,intelzip1,intelzip2,intelzip3)
    | eval "psrsvd_ct_sourcetype"=if(isnull('psrsvd_ct_sourcetype'),'psrsvd_ct_sourcetype','psrsvd_ct_sourcetype')
    | eval "psrsvd_nc_sourcetype"=if(isnull('psrsvd_nc_sourcetype'),'psrsvd_nc_sourcetype','psrsvd_nc_sourcetype')
    | eval "psrsvd_vm_sourcetype"=if(isnull('psrsvd_vm_sourcetype'),'psrsvd_vm_sourcetype','psrsvd_vm_sourcetype')
    | eval "psrsvd_ct_dest"=if(isnull('psrsvd_ct_dest'),'psrsvd_ct_All_Traffic.dest','psrsvd_ct_dest')
    | eval "psrsvd_nc_dest"=if(isnull('psrsvd_nc_dest'),'psrsvd_nc_All_Traffic.dest','psrsvd_nc_dest')
    | eval "psrsvd_vm_dest"=if(isnull('psrsvd_vm_dest'),'psrsvd_vm_All_Traffic.dest','psrsvd_vm_dest')
    | eval "psrsvd_ct_user"=if(isnull('psrsvd_ct_user'),'psrsvd_ct_All_Traffic.user','psrsvd_ct_user')
    | eval "psrsvd_nc_user"=if(isnull('psrsvd_nc_user'),'psrsvd_nc_All_Traffic.user','psrsvd_nc_user')
    | eval "psrsvd_vm_user"=if(isnull('psrsvd_vm_user'),'psrsvd_vm_All_Traffic.user','psrsvd_vm_user')
    | eval "threat_match_field"=if(isnull('threat_match_field'),"src",'threat_match_field')
    | eval "threat_match_value"=if(isnull('threat_match_value'),'All_Traffic.src','threat_match_value')
        ]
| mvexpand threat_collection_key
| stats values("dest") as "dest",values("sourcetype") as "sourcetype",values("user") as "user" by threat_match_field,threat_match_value,threat_collection_key
| rex field=threat_collection_key "^(?<threat_collection>.*)@@(?<threat_collection_key>.*)$"
| eval "dest"=mvindex('dest',0,10-1)
| eval "sourcetype"=mvindex('sourcetype',0,10-1)
| eval "user"=mvindex('user',0,10-1)
| eval certificate_intel_key=if(threat_collection="certificate_intel",'threat_collection_key',null())
| eval email_intel_key=if(threat_collection="email_intel",'threat_collection_key',null())
| eval file_intel_key=if(threat_collection="file_intel",'threat_collection_key',null())
| eval http_intel_key=if(threat_collection="http_intel",'threat_collection_key',null())
| eval ip_intel_key=if(threat_collection="ip_intel",'threat_collection_key',null())
| eval process_intel_key=if(threat_collection="process_intel",'threat_collection_key',null())
| eval registry_intel_key=if(threat_collection="registry_intel",'threat_collection_key',null())
| eval service_intel_key=if(threat_collection="service_intel",'threat_collection_key',null())
| eval user_intel_key=if(threat_collection="user_intel",'threat_collection_key',null())
| lookup "certificate_intel" _key as "certificate_intel_key" OUTPUTNEW "description","threat_key","weight","disabled"
| lookup "email_intel" _key as "email_intel_key" OUTPUTNEW "description","threat_key","weight","disabled"
| lookup "file_intel" _key as "file_intel_key" OUTPUTNEW "description","threat_key","weight","disabled"
| lookup "http_intel" _key as "http_intel_key" OUTPUTNEW "description","threat_key","weight","disabled"
| lookup "ip_intel" _key as "ip_intel_key" OUTPUTNEW "description","threat_key","weight","disabled"
| lookup "process_intel" _key as "process_intel_key" OUTPUTNEW "description","threat_key","weight","disabled"
| lookup "registry_intel" _key as "registry_intel_key" OUTPUTNEW "description","threat_key","weight","disabled"
| lookup "service_intel" _key as "service_intel_key" OUTPUTNEW "description","threat_key","weight","disabled"
| lookup "user_intel" _key as "user_intel_key" OUTPUTNEW "description","threat_key","weight","disabled"
| lookup threat_group_intel _key as threat_key OUTPUTNEW description,weight
| eval weight=if(isnum(weight),weight,60)
| fields - intelzip*,"certificate_intel_key","email_intel_key","file_intel_key","http_intel_key","ip_intel_key","process_intel_key","registry_intel_key","service_intel_key","user_intel_key"
| where NOT match(disabled, "1|[Tt]|[Tt][Rr][Uu][Ee]")
| dedup threat_match_field,threat_match_value,threat_key


Has anyone faced similar problems?

BR,
Markus


Micheal_S
Path Finder

@MarkusM

Did you ever find a fix for this? I ran into the same thing recently. While tinkering with the search I found that the lookups were causing it, but since the search is autogenerated I don't know how to fix it.
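
For anyone who wants to repeat that kind of tinkering, here is a rough sketch of one way to narrow it down. It is not a fix and has not been verified on this setup. Both subsearches are cut back to their tstats line (copied unchanged from the original post), and a single lookup is added back to the first one. Starting with threatintel_by_cidr is an arbitrary choice. If even the tstats-only form fails, the lookups are not the only cause.

```spl
| multisearch
    [| tstats prestats=true summariesonly=true values("sourcetype"),values("DNS.dest") from datamodel="Network_Resolution"."DNS" by "DNS.query"
    | lookup "threatintel_by_cidr" value as "DNS.query" OUTPUT threat_collection as tc0,threat_collection_key as tck0
        ]
    [| tstats prestats=true summariesonly=true values("sourcetype"),values("All_Traffic.dest"),values("All_Traffic.user") from datamodel="Network_Traffic"."All_Traffic" where "All_Traffic.action"="allowed" by "All_Traffic.src"
        ]
```

Swapping in the other lookup lines one at a time should show which lookup definition makes the subsearch non-streaming. The error message names the subsearch by number, which helps confirm where to look.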

richgalloway
SplunkTrust

Checking the list of streaming commands at https://docs.splunk.com/Documentation/Splunk/9.0.3/SearchReference/Commandsbytype#Streaming_commands we see that tstats is not on the list, which should explain the error message.
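
As a minimal sketch of the restriction (the index names web and mail are made up for illustration), the following multisearch uses only streaming commands inside each subsearch, and the transforming stats runs after the subsearches rather than inside them:

```spl
| multisearch
    [ search index=web | eval origin="web" ]
    [ search index=mail | eval origin="mail" ]
| stats count by origin
```

Moving a transforming command such as stats into either subsearch should produce the same "might only contain purely streaming operations" error.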

---
If this reply helps you, Karma would be appreciated.

MarkusM
Loves-to-Learn

Thank you for your input. As far as I understand, tstats is a generating command that works on either indexed fields in TSIDX files or fields defined in a data model, especially with the prestats option set.

Also, the SPL in my post is the out-of-the-box search that ships with Enterprise Security 7.0.1 as part of the Threat Intel Framework. No changes were made to it.

That's why I don't understand what made it stop working.

BR
Markus
