Good day, It's been a while.
I am trying to join two indexes together to see if a ticket has been logged based on the first search.
Search 1:
This is used to gather all the results. Here we will be using the title as our main source (but we don't want to throw away all the other columns).
index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json"
"falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
| makemv delim="\n" "falcon_spotlight.remediation.entities{}.title"
| mvexpand "falcon_spotlight.remediation.entities{}.title"
| stats values("falcon_spotlight.host_info.hostname") as hosts dc("falcon_spotlight.host_info.hostname") as host_count by "falcon_spotlight.remediation.entities{}.title"
| eval hosts=mvsort(hosts)
| table "falcon_spotlight.remediation.entities{}.title" host_count
Search 2:
Is used to gather all the tickets logged to our ITSM.
index=db_service_now sourcetype="snow:incident" status!="Resolved" status!="Closed" affect_dest="CrowdStrike Vulnerability Management"
| dedup number description
| table _time active "number" "affect_dest" "description" "dv_description" "dv_impact" "priority" "status" "assignment_group_name" "assignment_user_name" "close_code" "close_notes" "closed_at" "short_description"
In search 1, the title will be something like "Update Nvidea GeForce".
In search 2, the description will be something like "March: Low - Update Nvidea Geforce".
index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json"
"falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
| makemv delim="\n" "falcon_spotlight.remediation.entities{}.title"
| mvexpand "falcon_spotlight.remediation.entities{}.title"
| eval cs_title=trim('falcon_spotlight.remediation.entities{}.title')
| where len(cs_title)>0
| stats values('falcon_spotlight.host_info.hostname') AS hosts
dc('falcon_spotlight.host_info.hostname') AS host_count
BY cs_title
| eval hosts=mvsort(hosts), lc_title=lower(cs_title), key=1
| join type=left max=0 key [
search index=db_service_now sourcetype="snow:incident"
status!="Resolved" status!="Closed"
affect_dest="CrowdStrike Vulnerability Management"
| dedup number
| eval desc=lower(coalesce(dv_description, description, short_description))
| eval key=1
| fields key number status priority assignment_group_name assignment_user_name desc
]
| eval matched = if(like(desc, "%" . lc_title . "%"), 1, 0)
| eval number = if(matched=1, number, null())
| eval status = if(matched=1, status, null())
| eval priority = if(matched=1, priority, null())
| eval assignment_group_name = if(matched=1, assignment_group_name, null())
| eval assignment_user_name = if(matched=1, assignment_user_name, null())
| stats
max(host_count) AS host_count
values(hosts) AS hosts
values(number) AS snow_numbers
values(status) AS snow_status
values(priority) AS snow_priority
values(assignment_group_name) AS snow_group
values(assignment_user_name) AS snow_assignee
sum(matched) AS snow_count
BY cs_title
| eval has_ticket=if(snow_count>0,"Yes","No")
| dedup snow_numbers
| table cs_title host_count has_ticket snow_count snow_numbers snow_status snow_priority snow_group snow_assignee
| sort -has_ticket -host_count
How can I combine my searches to search the second one where it contains the first title field?
The third search above (the one using join) is what I tried, but it is not quite what I am looking for.
Any assistance would be greatly appreciated!
Let me reconstruct this step by step. No matter what technique you choose, matching logic is the key question.
in search 1 the title will be something like "Update Nvidea GeForce"
in search two description will be something like "March: Low - Update Nvidea Geforce"
The above is from the OP. Is description always composed of some sort of classifier ("March: Low") followed by a dash and then the title? I know that you described a version of your implementation as "not a proper solution". Can you elaborate on why? Because you have never described the matching logic in detail (including what kind of characters/strings could throw off a specific implementation), this is the simplest I can think of. So I will assume that it IS the proper solution; maybe it was your implementation that had a problem:
| eval match_key = coalesce(title,
mvindex(split(description, " - "), 1, -1))
However, in another reply, you said
"falcon_spotlight.remediation.entities{}.title" will contain "Update Nvidea Geforce" And description might contain the same as the title but have more information, "March - Low Update Nvidea Geforce"
This example quite contradicts the example in the OP. What is the real logic? Is title always contained in description? Your third illustrated SPL seems to suggest so. This more generic logic can be expressed in the following:
| eval match_key = if(description LIKE "%" . title . "%", title, null())
The second strategy is doable but much more expensive in practice. So, I will focus on the simpler strategy.
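To see how the two strategies differ on the two sample descriptions quoted so far, a throwaway search along these lines may help. It is only a sketch built on makeresults with the sample strings from this thread; the field names by_split, split_match and contains_match are made up for illustration, and both sides are lowercased because the samples differ in case ("GeForce" vs "Geforce").

```spl
| makeresults
| eval title="Update Nvidea GeForce"
| eval description=split("March: Low - Update Nvidea Geforce|March - Low Update Nvidea Geforce", "|")
| mvexpand description
| eval by_split=mvindex(split(description, " - "), 1, -1) ```strategy 1: text after the first " - "```
| eval split_match=if(lower(by_split)=lower(title), "yes", "no")
| eval contains_match=if(like(lower(description), "%" . lower(title) . "%"), "yes", "no") ```strategy 2: title contained in description```
| table description by_split split_match contains_match
```

With these samples, the first description should match under both strategies, while the second should only match under the "contains" test, because the split leaves "Low Update Nvidea Geforce" as the candidate key.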
The above two snippets are just concepts expressed in SPL, not a solution. To really construct something usable, let me circle back to @PickleRick's comment about join: only use as a last resort.
Next, let's try to find a usable solution. I will first illustrate the most commonly recommended method, i.e., use one index search to include all necessary events.
You haven't told us what the desired result should look like. So, I can only speculate based on the third SPL you illustrated: you want selected stats grouped by the short title. Here is the first method:
(index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json"
"falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
``` of interest: falcon_spotlight.remediation.entities{}.title, falcon_spotlight.host_info.hostname ```
) OR (
index=db_service_now sourcetype="snow:incident" status!="Resolved" status!="Closed" affect_dest="CrowdStrike Vulnerability Management"
``` of interest: number description priority status assignment_group_name assignment_user_name ```
)
| rename falcon_spotlight.host_info.hostname as host
| eval title = coalesce(trim('falcon_spotlight.remediation.entities{}.title'),
mvindex(split(description, " - "), 1, -1))
| mvexpand title
| stats
dc(host) AS host_count
values(host) AS hosts
values(number) AS snow_numbers
values(status) AS snow_status
values(priority) AS snow_priority
values(assignment_group_name) AS snow_group
values(assignment_user_name) AS snow_assignee
dc(number) AS snow_count
BY title
| eval has_ticket=if(snow_count>0,"Yes","No")
| table title host_count has_ticket snow_count snow_numbers snow_status snow_priority snow_group snow_assignee
| sort -has_ticket -host_count
Here, I removed a lot of calculations that I believe are not necessary, such as makemv (flattened JSON array elements falcon_spotlight.remediation.entities{}.* are already multivalue; they are NOT newline delimited), mvsort (any values output is already sorted lexically), etc.
The only expensive calculation in the above is mvexpand. If there are too many events, it can run into its memory limit and truncate its output, so you may get incomplete or inconsistent data.
A second method is, as @livehybrid suggested, to use append. It only marginally reduces the mvexpand problem, and it can introduce a different memory problem, although I believe db_service_now events are a lot sparser, so this is probably immaterial.
index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json"
"falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
``` of interest: falcon_spotlight.remediation.entities{}.title, falcon_spotlight.host_info.hostname ```
| rename falcon_spotlight.host_info.hostname as host
| eval title = trim('falcon_spotlight.remediation.entities{}.title')
| mvexpand title
| append
[ search
index=db_service_now sourcetype="snow:incident" status!="Resolved" status!="Closed" affect_dest="CrowdStrike Vulnerability Management"
``` of interest: number description priority status assignment_group_name assignment_user_name ```
| dedup number
| eval title = mvindex(split(description, " - "), 1, -1)
]
| stats
dc(host) AS host_count
values(host) AS hosts
values(number) AS snow_numbers
values(status) AS snow_status
values(priority) AS snow_priority
values(assignment_group_name) AS snow_group
values(assignment_user_name) AS snow_assignee
dc(number) AS snow_count
BY title
| eval has_ticket=if(snow_count>0,"Yes","No")
| table title host_count has_ticket snow_count snow_numbers snow_status snow_priority snow_group snow_assignee
| sort -has_ticket -host_count
Note that in both approaches, you create the match key in each dataset first, before the stats that groups by it.
Of course, if service_now's "description" is free text and does not follow a predefined pattern as to how they match title in crowdstrike, you will need to use the second matching strategy. However, the calculation will be even more expensive than just eventstats suggested by @bowesmana . Let us know if you are this unfortunate.
I have used a technique which can work, depending on data volumes and cardinality. Use append to collect both datasets together, then use
| eventstats values(cs_title) as titles
| eval titles=if(isnotnull(cs_title), null(), titles)
which will collect ALL titles in q1 into every event from q2: it copies every title to every event and then drops the field from the q1 events. What this does is push all titles into a field on all q2 events, which can then be matched against, a bit like a dynamic runtime lookup.
and then you can follow that with a match, e.g.
| eval matching_title=mvmap(titles, if(match(description, "(?i)".titles), titles, null()))
| fields - titles
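which will set a new field matching_title that is non-null if a match can be found. NB: it does a case-insensitive match, and because match() treats each title as a regular expression, titles containing regex metacharacters may need escaping.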
This can use a lot of memory if you have lots of titles, but works.
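Putting the pieces above together, the whole search might look roughly like this. This is an untested sketch rather than something taken from the original posts: the names cs_title, host and title, and the final stats, are assumptions modelled on the searches earlier in the thread, and mvmap needs a Splunk version that has it (8.0 or later, as far as I know).

```spl
index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json"
    "falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
| rename "falcon_spotlight.remediation.entities{}.title" AS cs_title, "falcon_spotlight.host_info.hostname" AS host
| fields cs_title host
| fields - _raw
| mvexpand cs_title
| stats dc(host) AS host_count BY cs_title ```q1: one row per title```
| append
    [ search index=db_service_now sourcetype="snow:incident" status!="Resolved" status!="Closed" affect_dest="CrowdStrike Vulnerability Management"
    | dedup number
    | fields number description status priority ]
| eventstats values(cs_title) AS titles ```copy every title onto every row```
| eval titles=if(isnotnull(cs_title), null(), titles) ```keep the list on ticket rows only```
| eval matching_title=mvmap(titles, if(match(description, "(?i)" . titles), titles, null()))
| fields - titles
| eval title=coalesce(cs_title, matching_title) ```assumed common key for the final stats```
| stats max(host_count) AS host_count values(number) AS snow_numbers dc(number) AS snow_count BY title
| eval has_ticket=if(snow_count>0, "Yes", "No")
```

Tickets whose description matches no title end up with a null title and drop out of the final stats, which seems to be what the question asks for; titles without a ticket should come out with snow_count=0.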
Note that there are some other optimisations you should do in your Q1: ALWAYS limit the fields you want in the expanded events before using mvexpand. Particularly, make sure you exclude _raw, as the _raw JSON is almost always unnecessary and will be duplicated for every expanded event.
You should not need to use mvsort on a field that has come from stats values(), as the output from values is always sorted.
1. Since you're trying to join on a key field and you set this field to be equal to 1 everywhere (for every event in both sets) it will definitely not work properly.
2. Join is generally a limited command which should be avoided if possible.
3. @livehybrid's idea with append might be OK, but if you had only streaming commands you could use multisearch instead.
4. But generally what you're trying to do is not available as such. You can't "join" by partial text matching. The only way of achieving something similar to what you're trying to do would be to create a lookup definition with a wildcard match type and then do two separate searches. With one search you'd create a set of patterns and output them to a lookup. And then with a separate search you'd use that lookup for matching. But it will not be efficient. Splunk will need to try to match each lookup entry to each event so you'd have O(m*n) complexity.
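The wildcard-lookup route from point 4 could be sketched roughly as follows. Everything named here is hypothetical: the file cs_title_patterns.csv, the field pattern, and a lookup definition called cs_title_patterns that points at that file with match type WILDCARD(pattern) and case-sensitive matching turned off. The first search builds the patterns:

```spl
index="db_crowdstrike" sourcetype="crowdstrike:spotlight:vulnerability:json"
    "falcon_spotlight.status"!="closed" "falcon_spotlight.suppression_info.is_suppressed"="false"
| rename "falcon_spotlight.remediation.entities{}.title" AS cs_title
| fields cs_title
| fields - _raw
| mvexpand cs_title
| stats count BY cs_title
| eval pattern="*" . cs_title . "*" ```wildcards on both sides, so the title may sit anywhere in the description```
| fields pattern cs_title
| outputlookup cs_title_patterns.csv
```

The second search then runs the ticket descriptions through that lookup definition:

```spl
index=db_service_now sourcetype="snow:incident" status!="Resolved" status!="Closed" affect_dest="CrowdStrike Vulnerability Management"
| dedup number
| lookup cs_title_patterns pattern AS description OUTPUT cs_title ```hypothetical lookup definition with WILDCARD(pattern)```
| where isnotnull(cs_title)
| table number description cs_title status priority
```

The first search has to be run (or scheduled) before the second, and the m*n matching cost mentioned above still applies.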
Hi @JandrevdM
I'm not usually a fan of append; however, it could work here if your appended query has fewer than 50,000 results.
I'm not currently on a proper browser, so it is hard to write out, but if both have the shared field "short_description" then you can do something like:
`query_1`
| append [`query_2`]
| stats values(*) as * by short_description
You could then use a where command to filter out anything which does not have values in the expected fields, leaving just those which are present in both.
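As a rough illustration of that where filter, assuming query_1 produces a host_count field and query_2 a number field (both field names are assumptions taken from the searches in the question):

```spl
`query_1`
| append [`query_2`]
| stats values(*) AS * BY short_description
| where isnotnull(host_count) AND isnotnull(number) ```keep only rows that have data from both queries```
```

A row survives only when at least one event from each query shares the same short_description.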
Thank you for the assistance, unfortunately I do not have a shared field and did try append.
"falcon_spotlight.remediation.entities{}.title" will contain "Update Nvidea Geforce"
And description might contain the same as the title but have more information,
"March - Low Update Nvidea Geforce"
I even tried to remove certain words to make a matching field, but that will not be a proper solution.
So it is more a case of "search 2's description contains the title" than an exact match.