Splunk Search

Join indexes on a field that contains another field's value

JandrevdM
Path Finder

Good day, It's been a while. 

I am trying to join two indexes together to see if a ticket has been logged based on the first search.

Search 1: 
Is used to gather all the results. Here we will be using title as our main source. (But we don't want to throw away all the other columns.)

index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json" 
"falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
| makemv delim="\n" "falcon_spotlight.remediation.entities{}.title"
| mvexpand "falcon_spotlight.remediation.entities{}.title"
| stats values("falcon_spotlight.host_info.hostname") as hosts dc("falcon_spotlight.host_info.hostname") as host_count by "falcon_spotlight.remediation.entities{}.title"
| eval hosts=mvsort(hosts)
| table "falcon_spotlight.remediation.entities{}.title" host_count


Search 2:
Is used to gather all the tickets logged to our ITSM. 

index=db_service_now sourcetype="snow:incident" status!="Resolved" status!="Closed" affect_dest="CrowdStrike Vulnerability Management"
| dedup number description
| table _time active "number" "affect_dest" "description" "dv_description" "dv_impact" "priority" "status" "assignment_group_name" "assignment_user_name" "close_code" "close_notes" "closed_at" "short_description"

 
In search 1 the title will be something like "Update Nvidea GeForce"

In search 2 the description will be something like "March: Low - Update Nvidea Geforce"

index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json"
"falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
| makemv delim="\n" "falcon_spotlight.remediation.entities{}.title"
| mvexpand "falcon_spotlight.remediation.entities{}.title"
| eval cs_title=trim('falcon_spotlight.remediation.entities{}.title')
| where len(cs_title)>0                                 
| stats values('falcon_spotlight.host_info.hostname') AS hosts
        dc('falcon_spotlight.host_info.hostname') AS host_count
  BY cs_title
| eval hosts=mvsort(hosts), lc_title=lower(cs_title), key=1
| join type=left max=0 key [
    search index=db_service_now sourcetype="snow:incident"
          status!="Resolved" status!="Closed"
          affect_dest="CrowdStrike Vulnerability Management"
    | dedup number                                   
    | eval desc=lower(coalesce(dv_description, description, short_description))
    | eval key=1
    | fields key number status priority assignment_group_name assignment_user_name desc
]
| eval matched = if(like(desc, "%" . lc_title . "%"), 1, 0)
| eval number                = if(matched=1, number, null())
| eval status                = if(matched=1, status, null())
| eval priority              = if(matched=1, priority, null())
| eval assignment_group_name = if(matched=1, assignment_group_name, null())
| eval assignment_user_name  = if(matched=1, assignment_user_name,  null())
| stats
    max(host_count)                       AS host_count
    values(hosts)                         AS hosts
    values(number)                        AS snow_numbers
    values(status)                        AS snow_status
    values(priority)                      AS snow_priority
    values(assignment_group_name)         AS snow_group
    values(assignment_user_name)          AS snow_assignee
    sum(matched)                          AS snow_count
  BY cs_title
| eval has_ticket=if(snow_count>0,"Yes","No")
| dedup snow_numbers
| table cs_title host_count has_ticket snow_count snow_numbers snow_status snow_priority snow_group snow_assignee
| sort -has_ticket -host_count


How can I combine my searches so that the second search is filtered to events whose description contains the title field from the first?
The last search above is what I tried, but it is not completely what I am looking for.
Any assistance would be greatly appreciated!



yuanliu
SplunkTrust

Let me reconstruct this step by step.  No matter what technique you choose, matching logic is the key question.

In search 1 the title will be something like "Update Nvidea GeForce"

In search 2 the description will be something like "March: Low - Update Nvidea Geforce"

The above is from the OP.  Is description always composed of some sort of classifier ("March: Low") followed by a dash, then the title?  I know that you mentioned a version of your implementation as "not a proper solution".  Can you elaborate why?  Because you have never described the matching logic in detail (including what kinds of characters/strings might throw off any specific implementation), this is the simplest I can think of.  So I will assume that it IS the proper solution - maybe it was your implementation that had a problem:

 

 

| eval match_key = coalesce(title,
  mvindex(split(description, " - "), 1, -1))

 

 

However, in another reply, you said

"falcon_spotlight.remediation.entities{}.title" will contain "Update Nvidea Geforce" And description might contain the same as the title but have more information, "March - Low Update Nvidea Geforce"

This example somewhat contradicts the example in the OP.  What is the real logic?  Is title always contained in description?  Your third illustrated SPL seems to suggest so.  This more generic logic can be expressed as follows:

 

| eval match_key = if(description LIKE "%" . title . "%", title, null())

 

 The second strategy is doable but much more expensive in practice.  So, I will focus on the simpler strategy.

The above two snippets are just concepts expressed in SPL, not a solution.  To really construct something usable, let me circle back to @PickleRick's comment about join: only use it as a last resort.

Next, let's try to find a usable solution.  I will first illustrate the most commonly recommended method, i.e., using one index search to include all necessary events.

You haven't told us what the desired result should look like, so I can only speculate based on the third SPL you illustrated: you want selected stats grouped by the short title.  Here is the first method:

 

(index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json" 
  "falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
  ``` of interest: falcon_spotlight.remediation.entities{}.title, falcon_spotlight.host_info.hostname ```
  ) OR (
    index=db_service_now sourcetype="snow:incident" status!="Resolved" status!="Closed" affect_dest="CrowdStrike Vulnerability Management"
    ``` of interest: number description priority status assignment_group_name assignment_user_name ```
  )
| rename falcon_spotlight.host_info.hostname as host
| eval title = coalesce(trim('falcon_spotlight.remediation.entities{}.title'),
    mvindex(split(description, " - "), 1, -1))
| mvexpand title
| stats
    dc(host)                              AS host_count
    values(host)                          AS hosts
    values(number)                        AS snow_numbers
    values(status)                        AS snow_status
    values(priority)                      AS snow_priority
    values(assignment_group_name)         AS snow_group
    values(assignment_user_name)          AS snow_assignee
    dc(number)                            AS snow_count
  BY title
| eval has_ticket=if(snow_count>0,"Yes","No")
| table title host_count has_ticket snow_count snow_numbers snow_status snow_priority snow_group snow_assignee
| sort -has_ticket -host_count

 

Here, I removed a lot of calculations that I believe are not necessary, such as makemv (flattened JSON array elements falcon_spotlight.remediation.entities{}.* are already multivalue; they are NOT newline delimited), mvsort (the output of stats values() is already sorted lexically), etc.

The only expensive calculation in the above is mvexpand.  You may get inconsistent (truncated) data if there are too many events, because mvexpand is subject to memory limits.

A second method is as @livehybrid suggested, using append.  It only marginally reduces the mvexpand problem, and it can introduce a different memory problem, although I believe db_service_now events are a lot sparser so that is probably not a concern.

 

(index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json" 
  "falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
  ``` of interest: falcon_spotlight.remediation.entities{}.title, falcon_spotlight.host_info.hostname ```
| rename falcon_spotlight.host_info.hostname as host
| eval title = trim('falcon_spotlight.remediation.entities{}.title')
| mvexpand title
| append
    [ search
      index=db_service_now sourcetype="snow:incident" status!="Resolved" status!="Closed" affect_dest="CrowdStrike Vulnerability Management"
      ``` of interest: number description priority status assignment_group_name assignment_user_name ```
    | dedup number
    | eval title = mvindex(split(description, " - "), 1, -1)
    ]
| stats
    dc(host)                              AS host_count
    values(host)                          AS hosts
    values(number)                        AS snow_numbers
    values(status)                        AS snow_status
    values(priority)                      AS snow_priority
    values(assignment_group_name)         AS snow_group
    values(assignment_user_name)          AS snow_assignee
    dc(number)                            AS snow_count
  BY title
| eval has_ticket=if(snow_count>0,"Yes","No")
| table title host_count has_ticket snow_count snow_numbers snow_status snow_priority snow_group snow_assignee
| sort -has_ticket -host_count

 

Note that in both approaches, you want to create that match key in the respective datasets first.

Of course, if service_now's "description" is free text and does not follow a predefined pattern for how it relates to the title in crowdstrike, you will need to use the second matching strategy.  However, that calculation will be even more expensive than the eventstats approach suggested by @bowesmana.  Let us know if you are this unfortunate.
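If you do land there, here is a rough sketch of that second strategy: the same append layout as above, followed by @bowesmana's eventstats/mvmap trick to keep only the titles that actually appear inside each description.  The field names simply follow the examples in this thread, and the memory caveats from both replies still apply.

index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json"
  "falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
| rename falcon_spotlight.host_info.hostname AS host
| eval cs_title = trim('falcon_spotlight.remediation.entities{}.title')
| fields host cs_title
| fields - _raw
| mvexpand cs_title
| append
    [ search index=db_service_now sourcetype="snow:incident" status!="Resolved" status!="Closed"
        affect_dest="CrowdStrike Vulnerability Management"
    | dedup number
    | eval desc = lower(coalesce(dv_description, description, short_description))
    | fields number desc status priority assignment_group_name assignment_user_name ]
``` copy every crowdstrike title onto the service_now events, then keep only the titles found inside each description ```
| eventstats values(cs_title) AS all_titles
| eval all_titles = if(isnotnull(cs_title), null(), all_titles)
| eval cs_title = coalesce(cs_title,
    mvmap(all_titles, if(like(desc, "%" . lower(all_titles) . "%"), all_titles, null())))
| fields - all_titles
| stats dc(host)           AS host_count
        values(host)       AS hosts
        values(number)     AS snow_numbers
        values(status)     AS snow_status
        dc(number)         AS snow_count
  BY cs_title
| eval has_ticket=if(snow_count>0,"Yes","No")
| sort -has_ticket -host_count

Unmatched service_now events keep a null cs_title and therefore drop out of the final stats on their own.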


bowesmana
SplunkTrust

I have used a technique which can work, depending on data volumes and cardinality. Use append to collect both datasets together, then you can use

| eventstats values(cs_title) as titles
| eval titles=if(isnotnull(cs_title), null(), titles)

which will collect ALL titles in q1 into every event from q2 - it collects every title into every event, then drops the field from q1 events. What this does is push all titles into a field on all q2 events, which can then be matched against, a bit like a dynamic runtime lookup.

and then you can follow that with a match, e.g.

| eval matching_title=mvmap(titles, if(match(description, "(?i)".titles), titles, null()))
| fields - titles

which will set a new field matching_title that will be non-null if a match can be found. NB: it does a case-insensitive match.

This can use a lot of memory if you have lots of titles, but works.

Note that there are some other optimisations you should do in your Q1 - ALWAYS limit the fields you want in the expanded events before using mvexpand. Particularly, make sure you exclude _raw, as the _raw JSON is generally unnecessary and will be duplicated for every expanded event.

You should not need to use mvsort on a field that has come from stats values(), as the output from values is always sorted.
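As a minimal illustration of that advice applied to the OP's first search (same field names as the OP; adjust as needed):

index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json"
  "falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
``` keep only the fields the expansion needs and drop _raw so the JSON is not copied into every expanded event ```
| fields "falcon_spotlight.remediation.entities{}.title" "falcon_spotlight.host_info.hostname"
| fields - _raw
| mvexpand "falcon_spotlight.remediation.entities{}.title"
| stats values("falcon_spotlight.host_info.hostname") AS hosts
        dc("falcon_spotlight.host_info.hostname") AS host_count
  BY "falcon_spotlight.remediation.entities{}.title"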

PickleRick
SplunkTrust

1. Since you're trying to join on a key field and you set this field to be equal to 1 everywhere (for every event in both sets), it will definitely not work properly.

2. Join is generally a limited command which should be avoided if possible.

3. @livehybrid's idea with append might be OK, but if you had only streaming commands you could use multisearch instead (see the sketch after this list)

4. But generally what you're trying to do is not available as such. You can't "join" by partial text matching. The only way of achieving something similar to what you're trying to do would be to create a lookup definition with a wildcard match type and then do two separate searches. With one search you'd create a set of patterns and output them to a lookup. And then with a separate search you'd use that lookup for matching. But it will not be efficient. Splunk will need to try to match each lookup entry to each event so you'd have O(m*n) complexity.
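On point 3, a minimal multisearch sketch, assuming each leg stays purely streaming (field names follow the thread); the actual matching and stats still have to happen afterwards:

| multisearch
    [ search index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json"
        "falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
      | eval cs_title = trim('falcon_spotlight.remediation.entities{}.title') ]
    [ search index=db_service_now sourcetype="snow:incident" status!="Resolved" status!="Closed"
        affect_dest="CrowdStrike Vulnerability Management"
      | eval desc = lower(coalesce(dv_description, description, short_description)) ]
``` followed by the same eventstats/mvmap matching suggested elsewhere in this thread ```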
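And on point 4, a rough sketch of that two-search lookup approach. The lookup name cs_title_patterns is hypothetical, and the wildcard behaviour comes from the lookup definition's configuration (match_type = WILDCARD(description_pattern), with max_matches raised as needed), not from the SPL itself:

First search - build the patterns:

index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json"
  "falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
| eval cs_title = trim('falcon_spotlight.remediation.entities{}.title')
| mvexpand cs_title
| eval description_pattern = "*" . cs_title . "*"
| dedup description_pattern
| table description_pattern cs_title
| outputlookup cs_title_patterns.csv

Second search - tag tickets whose description matches any pattern:

index=db_service_now sourcetype="snow:incident" status!="Resolved" status!="Closed"
  affect_dest="CrowdStrike Vulnerability Management"
| lookup cs_title_patterns description_pattern AS description OUTPUT cs_title
| where isnotnull(cs_title)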


livehybrid
SplunkTrust

Hi @JandrevdM 

I'm not usually a fan of append, however it could work here if your appended query has fewer than 50,000 results.

Not currently on a proper browser so it's hard to write out, but if both have the shared field "short_description" then you can do something like:

`query_1` 
| append [`query_2`]
| stats values(*) as * by short_description

You could then use a where command to filter out anything which does not have values in the expected fields, leaving just those which are present in both.
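For example (host_count and number here are just stand-ins for fields that only exist in one dataset each):

`query_1`
| append [`query_2`]
| stats values(*) as * by short_description
| where isnotnull(host_count) AND isnotnull(number)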


JandrevdM
Path Finder

Thank you for the assistance. Unfortunately I do not have a shared field, and I did try append.

"falcon_spotlight.remediation.entities{}.title" will contain "Update Nvidea Geforce"
And description might contain the same as the title but have more information, 
"March - Low Update Nvidea Geforce"

I even tried to remove certain words to make a matching field, but that will not be a proper solution.
So it is more "does the search 2 description contain the title" rather than an exact match.
