Splunk Search

How to correlate events from two sourcetypes where the correlation field from SourcetypeA is multivalued and SourcetypeB is a single value?

IngloriousSplun
Communicator

I have two different network sensors, Sensor A and Sensor B. Each has its own event format, and I aggregate both in Splunk. I'm trying to correlate events between the two sensor platforms and only produce a result if both sensors saw the same event.

The complication is that Sensor A sometimes sends multiple values in the field I want to correlate on, whereas Sensor B always sends a single value. For example, when correlating on file hash, Sensor A may send two different hashes in a single md5sum field, separated by what appears to be just a space. Sensor B always sends exactly one hash per event in that field.

What is the best way to correlate these events? I've thought about deduping the field for Sensor A first, then running it through transaction, but I'm concerned that I'm not doing this properly and not preserving the other event data I want to display with it. Another option I found was to skip transaction and use stats instead. I've tried both methods below and get no results, even though I've manually found events that should correlate. Any thoughts on the most accurate way to accomplish this? For each correlated event, I want to be able to list the hash, the event ID from both sensors, the timestamps from both sensors, etc.

Using transaction:

index=A OR index=B OR index=C sourcetype=SensorA OR sourcetype=SensorB | mvexpand sensorAFieldA | dedup sensorAFieldA | eval singleField=coalesce(sensorAFieldA, sensorBFieldB) | transaction singleField maxspan=1h | eval match=if(match(sensorAFieldA,sensorBFieldB),"1","0") | where match > 0 | table sensorAFieldA, sensorBFieldB, sensorAID, sensorBID

Using stats:

index=A OR index=B OR index=C sourcetype=sensorA OR sourcetype=sensorB | stats values(sensorAFieldA) AS fieldA , values(sensorBFieldB) AS fieldB | mvexpand fieldA | eval matching=if(match(fieldA,fieldB),"1","0") | where matching > 0 | table fieldA, fieldB

Thanks

1 Solution

sideview
SplunkTrust

Maybe it's not this simple, but how about just coalescing to a common field name, then asking stats to count distinct sourcetypes for each value and restricting to the rows that have 2 sourcetypes? Stats can work across a mix of single-value and multivalue rows so I don't think you need the mvexpand necessarily. It would look like this:

index=A OR index=B OR index=C sourcetype=sensorA OR sourcetype=sensorB | eval normalized=coalesce(sensorAFieldA,sensorBFieldB) | stats count dc(sourcetype) as sourcetypes by normalized | search sourcetypes>1 | rename normalized as file_hash
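
To try this without touching real indexes, here is a minimal sketch on dummy data. It assumes a Splunk version that has the makeresults command, and it assumes Sensor A's field arrives as one space-delimited string (as the question suggests), so makemv splits it before the coalesce. The values hash1, hash2 and hash3 and the helper field n are made up for illustration.

```spl
| makeresults count=3
| streamstats count AS n
| eval sourcetype=if(n<3, "sensorA", "sensorB")
| eval sensorAFieldA=case(n==1, "hash1 hash2", n==2, "hash3")
| eval sensorBFieldB=if(n==3, "hash2", null())
| makemv delim=" " sensorAFieldA
| eval normalized=coalesce(sensorAFieldA, sensorBFieldB)
| stats count dc(sourcetype) AS sourcetypes BY normalized
| search sourcetypes>1
| rename normalized AS file_hash
```

If coalesce carries the multivalue field through as the search above expects, hash2 is the only value reported by both sourcetypes. If the real field is already multivalued at search time, the makemv step is probably unnecessary.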

The maxspan=1h is interesting. To incorporate that sort of logic, you could do this:

index=A OR index=B OR index=C sourcetype=sensorA OR sourcetype=sensorB | eval normalized=coalesce(sensorAFieldA,sensorBFieldB) | stats values(_time) as timestamps max(_time) as latest min(_time) as earliest count dc(sourcetype) as sourcetypes by normalized | eval delta=latest-earliest | search sourcetypes>1 delta<3600 | rename normalized as file_hash

Although technically, if sensorA kept flagging a particular file for longer than an hour and sensorB flagged it only once, this search would incorrectly filter it out. In that case, transaction may be the better way to go:

index=A OR index=B OR index=C sourcetype=sensorA OR sourcetype=sensorB | eval file_hash=coalesce(sensorAFieldA,sensorBFieldB) | transaction file_hash maxspan=1h | search sourcetype=sensorA sourcetype=sensorB
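
Since the question also asks for event IDs and timestamps from both sensors, a stats-based variant might look like the sketch below. It is only an approximation: it compares the first sighting from each sensor, so a file that Sensor B flags more than an hour after Sensor A first saw it would still be dropped even if Sensor A was still reporting it. The output names first_seen_A and first_seen_B are invented for this example; sensorAID and sensorBID are the ID fields from the question.

```spl
index=A OR index=B OR index=C sourcetype=sensorA OR sourcetype=sensorB
| eval file_hash=coalesce(sensorAFieldA, sensorBFieldB)
| stats dc(sourcetype) AS sourcetypes
        values(sensorAID) AS sensorAID
        values(sensorBID) AS sensorBID
        min(eval(if(sourcetype=="sensorA", _time, null()))) AS first_seen_A
        min(eval(if(sourcetype=="sensorB", _time, null()))) AS first_seen_B
        BY file_hash
| where sourcetypes > 1 AND abs(first_seen_A - first_seen_B) <= 3600
| table file_hash, sensorAID, sensorBID, first_seen_A, first_seen_B
```

The first_seen_A and first_seen_B columns are epoch seconds; pass them through strftime if you need readable timestamps.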

martin_mueller
SplunkTrust

If any element of your MV field can match then you can just let transaction do its thing:

| stats count | eval field = "v1,v2 v2,v3 v3" | makemv field | mvexpand field | makemv delim="," field
| streamstats count | eval _time = now() 
| transaction field

The first two lines set up three events. The first has field=[v1, v2], the second [v2,v3], the third v3 only. transaction merges them all together.
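
On Splunk versions that have the makeresults command (newer than this thread), the same dummy data can be generated without the | stats count trick. This is a sketch of an equivalent setup, not a change to the approach. makeresults already stamps its row with the current time, so the eval _time = now() step is dropped.

```spl
| makeresults
| eval field="v1,v2 v2,v3 v3"
| makemv field
| mvexpand field
| makemv delim="," field
| streamstats count
| transaction field
```

The first makemv splits on spaces (its default delimiter) to get three rows after mvexpand, and the second splits each row on commas to produce the multivalue field.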

0 Karma

martin_mueller
SplunkTrust

Everything before the | transaction just sets up some dummy data that looks like this:

count  field  _time
1      v1     2015-03-20 18:48:29   
       v2
2      v2     2015-03-20 18:48:29   
       v3
3      v3     2015-03-20 18:48:29

Using this dummy data for | transaction field shows that Splunk will merge them all into one event, honouring the multivalue field. This should translate to your problem.

0 Karma

IngloriousSplun
Communicator

OK, that helps me visualize it much better. I realized what happened: someone created a single alias for this field on both sensors, and that seems to be what was causing the unusual results. Now if I run the query index=A OR index=B OR index=C sourcetype=A OR sourcetype=B | eval hashes=coalesce(fieldA,fieldB) | transaction hashes maxspan=* | ... I get the correct result.

Thanks for the help!

0 Karma

IngloriousSplun
Communicator

Can you expand on your answer, specifically eval field="v1,v2 v2,v3 v3"? I'm not sure what I should be putting between the quotes. Are those the field names?

From SensorA I have a field, file_hash, and from SensorB I have a field, File_Hash. SensorA field file_hash is a MV field, and can contain more than one hash at times. SensorB's File_Hash field only ever contains one hash. Are you saying I should make my query: | stats count | eval field="file_hash,File_Hash" | makemv field ...?

I should have two results from the data set I'm using; I've confirmed it manually.

0 Karma

IngloriousSplun
Communicator

I tried this search: index=A OR index=B OR index=C sourcetype=sensorA OR sourcetype=sensorB | eval file_hash=coalesce(sensorAFieldA,sensorBFieldB) | transaction file_hash maxspan=1h | search sourcetype=sensorA sourcetype=sensorB, which is similar to one I was trying previously, but I still get no results. I know I should have at least 2 results in the data set I'm using as I confirmed manually.

0 Karma

IngloriousSplun
Communicator

I realized what happened: someone created a single alias for this field on both sensors, and that seems to be what was causing the unusual results. Now if I run the query index=A OR index=B OR index=C sourcetype=A OR sourcetype=B | eval hashes=coalesce(fieldA,fieldB) | transaction hashes maxspan=* | ... I get the correct result.

Thanks for the help!

0 Karma

sideview
SplunkTrust

And for the file hash that you manually confirmed, are the value present in sourcetype A and the value present in sourcetype B within 1 hour of each other?

0 Karma

martin_mueller
SplunkTrust

The crucial question is "what's the meaning of that MV field in A?".

Does exactly one of its values correlate with the single value in B? If so, which value?
Could any value correlate with B?
Should all values correlate with B? If so, how to merge the values in A?

These should be answered before trying to express the answers in SPL.

0 Karma

IngloriousSplun
Communicator

Any value in A could correlate to B.

0 Karma