Splunk Search

Is it possible to do a conditional stats values command

twhite_splunk
Splunk Employee

A common use case I run into is wanting to join two sources of data together only if fields meet certain criteria. The common pattern is this:

source="A" OR source="B"
| stats values(field_from_A) as field_from_A values(field_from_B) as field_from_B by common_id
| mvexpand field_from_A
| mvexpand field_from_B
| where field_from_A > field_from_B

The nature of this data is that the stats output has very large mv fields, but the where filter removes most of them. Because of that, it'd be great if the indexers could apply that where filter.

Is there a way to conditionally do stats values across events?

Edit adding more details:

The problem I'm trying to solve is taking normalized event streams and denormalizing them. Below is an example of the kind of flow I'm working with:

sourceA - {time: 2018-08-01, username: jim, email: jim@email.com, userid: 582}
sourceB - {time: 2018-08-02, action: purchase, item: pen, userid: 582}
sourceB - {time: 2018-08-03, action: purchase, item: paper, userid: 582}
sourceA - {time: 2018-08-10, username: jim, email: james@email.com, userid: 582}
sourceB - {time: 2018-08-15, action: purchase, item: paper, userid: 582}

Then a question that would be asked is to display email and items purchased over the past 30 days.

The above is a simplified example, and in the real data there are a lot more fields in both the A and B streams, and they both get updated independently of each other (plus also more streams).


DalJeanis
Legend

TL;DR

@twhite - now that I've fully read your example use case, there is a better option. This is a job for a simple streamstats.

So, assuming that you want the most recent prior username and email for any given purchase, we have this:

 source="A" OR source="B"
| fields source userid email username time action item
| sort 0 userid _time source

| rename COMMENT as "Roll the data from the user record to the purchase record"   
| streamstats  last(username) as username last(email) as email by userid

| rename COMMENT as "Now keep only the purchase records"   
| where  source="B"

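That's it. Make sure the field names are spelled and capitalized correctly, that the _time field exists, and that source is still present after the fields command, since the final where depends on it. Also make sure that when a user record and a purchase record have the same time, the purchase record sorts second, not first.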
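
For anyone who wants to check the logic outside Splunk, here is a minimal plain-Python sketch of the carry-forward idea, run against the sample events from the question. It sorts by userid, time and source, remembers the most recent username and email per userid, and keeps only the source B rows. The function name denormalize and the dict layout are purely illustrative (nothing here is a Splunk API), and it assumes ISO-formatted dates so that string order matches time order.

```python
# Emulates: sort by userid/time/source, roll user fields forward per userid,
# then keep only the purchase (source B) records.
events = [
    {"source": "A", "time": "2018-08-01", "username": "jim", "email": "jim@email.com", "userid": 582},
    {"source": "B", "time": "2018-08-02", "action": "purchase", "item": "pen", "userid": 582},
    {"source": "B", "time": "2018-08-03", "action": "purchase", "item": "paper", "userid": 582},
    {"source": "A", "time": "2018-08-10", "username": "jim", "email": "james@email.com", "userid": 582},
    {"source": "B", "time": "2018-08-15", "action": "purchase", "item": "paper", "userid": 582},
]


def denormalize(events):
    # Sorting on source last puts an A record before a B record at the same time.
    ordered = sorted(events, key=lambda e: (e["userid"], e["time"], e["source"]))
    latest = {}  # userid -> most recent username/email seen so far
    purchases = []
    for event in ordered:
        state = latest.setdefault(event["userid"], {})
        for field in ("username", "email"):
            if field in event:
                state[field] = event[field]
        if event["source"] == "B":
            # Attach whatever user attributes were current at purchase time.
            purchases.append({**event, **state})
    return purchases


rows = denormalize(events)
for row in rows:
    print(row["time"], row["item"], row.get("email"))
```

The first two purchases pick up jim@email.com and the last one picks up james@email.com, which is the "most recent prior" behaviour described above.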


Okay, we believe that if you clarified your use case, we'd be able to help you develop a much better way of getting what you want.

Here is our statement of what your code would do -

For each value of field_from_B in source="B", find all records in source="A" with a matching common_id whose field_from_A is greater than that field_from_B, and report those values.

The following code would achieve that with less RAM than the multiple-mvexpand implementation...

 source="A" OR source="B"
| eval compare_value=case(source="A",field_from_A, source="B",field_from_B)
| sort 0 common_id - compare_value
| streamstats values(eval(case(source="A",field_from_A))) as greater_as by common_id
| where source="B" AND isnotnull(greater_as)
| rename greater_as as field_from_A
| mvexpand field_from_A
| where field_from_A > field_from_B

Brief explanation:

Streamstats sees only those records that have gone before, so we sort in descending compare_value order. Records which have come before are greater than or equal to the current record.

We copy the values from source="A" that are greater than (i.e. sorted before) the current value from source="B". (Incidentally, if the current record happens to be from source="A", it gets all the prior values as well, but we keep only the source="B" records after the next test, and only if they had at least one source="A" record that was greater.)

The final test is just in case a record has an equal value. Your use case was strictly greater, so we enforce that there.
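
The same idea can be modelled in plain Python to see why the descending sort matters. This is only a sketch under assumptions: the records are hypothetical (source, common_id, value) tuples, pairs_greater is an illustrative name, and a set stands in for the distinct values that streamstats values() accumulates.

```python
from collections import defaultdict

# Hypothetical sample data: (source, common_id, value)
records = [
    ("A", "x", 10),
    ("A", "x", 4),
    ("B", "x", 7),
    ("B", "x", 4),
    ("A", "y", 1),
    ("B", "y", 5),
]


def pairs_greater(records):
    # Descending by value within each common_id, so everything already seen
    # is greater than or equal to the current record.
    ordered = sorted(records, key=lambda r: (r[1], -r[2]))
    seen_a = defaultdict(set)  # common_id -> A values seen so far
    result = []
    for source, common_id, value in ordered:
        if source == "A":
            seen_a[common_id].add(value)
        else:
            # The strict comparison drops ties, like the final where clause.
            result.extend(
                (common_id, a, value)
                for a in sorted(seen_a[common_id])
                if a > value
            )
    return result


print(pairs_greater(records))
```

For common_id x, the B value 4 sees both 10 and 4 from A, and the strict comparison drops the tie. For common_id y, the only A value is smaller than the B value, so nothing is reported.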


updated sort to sort 0

adonio
Ultra Champion

hello there,
what is the use case? from your query, seems like you are dealing with numerical fields only (>).
maybe you can first |bin and then add the _time field after the by clause. that will reduce the mv plenty.
something like this:

(index = "A" source="A") OR (index = "B" source="B")
| bin span=5m _time
| stats values(field_A) as field_A values(field_B) as field_B by common_id _time
| mvexpand field_A
| mvexpand field_B
| where field_A > field_B

also, maybe use search instead of where
this answer explains the difference:
https://answers.splunk.com/answers/50659/whats-the-difference-between-where-and-search-in-the-pipeli...


twhite_splunk
Splunk Employee

Thank you for the suggestion about | bin !

Unfortunately, the sources won't necessarily have a reasonable time overlap to assume. The problem I'm trying to solve is taking normalized event streams and denormalizing them. Below is an example of the kind of flow I'm working with:

sourceA - {time: 2018-08-01, username: jim, email: jim@email.com, userid: 582}
sourceB - {time: 2018-08-02, action: purchase, item: pen, userid: 582}
sourceB - {time: 2018-08-03, action: purchase, item: paper, userid: 582}
sourceA - {time: 2018-08-10, username: jim, email: james@email.com, userid: 582}
sourceB - {time: 2018-08-15, action: purchase, item: paper, userid: 582}

Then a question that would be asked is to display email and items purchased over the past 30 days.

The above is a simplified example, and in the real data there are a lot more fields in both the A and B streams, and they both get updated independently of each other (plus also more streams).

Thank you again!


adonio
Ultra Champion

so, couple of things here
looking at very large sets of data for long periods of time can be heavy on resources regardless.
for this particular use case, although you can accomplish it with the | join command or, as you approach it, with | stats values, i would suggest a summary index: have it run every day and then you'll have very few events to consider when looking back 30 days
another way to simplify your search will be with a lookup.
i would probably run something like this: ... source = sourceA ... | fields ... username userid email ...| stats values(username) as username values(email) as email values(userid) as userid values(...) as ...
| outputlookup all_my_clients.csv

now you will run a simple search
... source = sourceB ... userid = * | lookup .... OUTPUT ... | stats values(item) by email

hope it helps


twhite_splunk
Splunk Employee

Ah, thank you for the follow up. Summary indexing "chunks" has been my general approach, but I was hopeful I had missed a way to compare events before they've been grouped.


jkat54
SplunkTrust

How about this?

... | stats values(eval(if(field_A>field_B,field_A,""))) as field_A_gt_field_B by common_id

Or

... | stats values(eval(if(field_A>field_B,field_A,null()))) as field_A_gt_field_B by common_id

twhite_splunk
Splunk Employee

Hi there!

I edited my original question to clarify - but field_A and field_B are in different events, so I don't think the above will work? I've tried a few different ways of putting the eval within the values command, but I think that ultimately is comparing fields within an event - is that the correct way to think about using eval inside of stats?
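
To make that per-event reading concrete, here is a rough Python model of the null() variant above. It is a sketch of how I understand eval inside stats to behave, not Splunk's implementation. The function name and the sample events are hypothetical.

```python
def values_if_a_gt_b(events):
    """Model of values(eval(if(field_A>field_B, field_A, null()))) for one group."""
    collected = set()
    for event in events:
        # The condition is checked against one event at a time.
        a = event.get("field_A")
        b = event.get("field_B")
        if a is not None and b is not None and a > b:
            collected.add(a)
    return sorted(collected)


# The two fields live in different events, as in my data: nothing matches.
split_events = [{"field_A": 9}, {"field_B": 3}]

# Both fields in the same event: the comparison can succeed.
same_event = [{"field_A": 9, "field_B": 3}]

print(values_if_a_gt_b(split_events))
print(values_if_a_gt_b(same_event))
```

If this model is right, the comparison never has both fields in hand when they arrive in separate events, so the stats result is empty for my data.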
