Using streamstats to track currently active values

vbumgarner
Contributor

Given input like this:

id,  action, message
 1,     add, Adding this thing
 2,     add, Adding this other thing
  ,        , I am a different message
 1, destroy, Remove this thing
  ,        , I am yet a different message
 2, destroy, Remove this other thing
  ,        , And I am yet a different message

I want to get:

 activeids, id,  action, message
         1,  1,     add, Adding this thing
       1;2,  2,     add, Adding this other thing
       1;2,   ,        , I am a different message
         2,  1, destroy, Remove this thing
         2,   ,        , I am yet a different message
          ,  2, destroy, Remove this other thing
          ,   ,        , And I am yet a different message

I've been fighting with streamstats global=false current=false window=1 last(activeids) as activeids and a load of eval statements, but streamstats doesn't seem to calculate the values when I think it should (in order of execution per event).

When does streamstats actually do its work vs. other statements in the pipeline?

Here's a query that illustrates what I'm trying to do:

| stats count 
| eval r="message=No_id_yet"
| eval r=mvappend(r,"id=1 action=add message=Adding_this_id")
| eval r=mvappend(r,"id=2 action=add message=Adding_this_other_id")
| eval r=mvappend(r,"message=Im_a_different_message")
| eval r=mvappend(r,"id=1 action=destroy message=Remove_this_thing")
| eval r=mvappend(r,"message=Im_yet_a_different_message")
| eval r=mvappend(r,"id=2 action=destroy message=Remove_this_other_thing")
| eval r=mvappend(r,"message=And_Im_yet_a_different_message")
| mvexpand r | rename r as _raw | extract 
| table id action message 

| streamstats window=1 current=false last(activeids) as activeids_prev 
| fillnull activeids_prev 

| eval add=if(action=="add", id, null) 
| eval remove=if(action=="destroy", id, null) 
| eval activeids=if(isnotnull(add), mvdedup(mvappend(activeids_prev,add)), activeids_prev)
| eval activeids=if(isnotnull(remove),split( replace( mvjoin(activeids, "IMPOSSIBLEDELIMITER") , remove, "") , "IMPOSSIBLEDELIMITER" ), activeids )

What I see in the results is that the last(activeids) as activeids_prev doesn't actually match anything. It seems that the eval statements are happening before the streamstats.

What am I missing?

woodcock
Esteemed Legend

OK, @vbumgarner, time to Accept the best one of these fine answers.


niketn
Legend

@vbumgarner, here is another approach, different from those of @somesoni2 and @woodcock. Based on the pattern in your data, the adds and destroys seem to behave like a queue, i.e. the id that comes in first goes out first.

In my approach I maintain running lists of the ids to be added and the ids to be destroyed (with a filldown). With nomv, the lists are converted into single values, so that the destroy list can be replaced within the add list to get the final list.

PS: I have added some additional data to cover add/delete sequences.

| makeresults
| eval r="message=No_id_yet" 
| eval r=mvappend(r,"id=1 action=add message=Adding_this_id") 
| eval r=mvappend(r,"id=2 action=add message=Adding_this_other_id") 
| eval r=mvappend(r,"id=3 action=add message=Adding_this_other_id") 
| eval r=mvappend(r,"message=Im_a_different_message") 
| eval r=mvappend(r,"id=1 action=destroy message=Remove_this_thing") 
| eval r=mvappend(r,"message=Im_yet_a_different_message") 
| eval r=mvappend(r,"id=2 action=destroy message=Remove_this_other_thing") 
| eval r=mvappend(r,"message=And_Im_yet_a_different_message") 
| eval r=mvappend(r,"id=4 action=add message=Adding_this_other_id")
| eval r=mvappend(r,"message=Im_yet_a_different_message")
| eval r=mvappend(r,"id=3 action=destroy message=Remove_this_other_thing") 
| eval r=mvappend(r,"id=5 action=add message=Adding_this_other_id")
| eval r=mvappend(r,"id=6 action=add message=Adding_this_other_id")
| eval r=mvappend(r,"id=4 action=destroy message=Remove_this_other_thing") 
| eval r=mvappend(r,"id=5 action=destroy message=Remove_this_other_thing") 
| eval r=mvappend(r,"id=6 action=destroy message=Remove_this_other_thing") 
| mvexpand r 
| rename r as _raw 
| extract 
| table id action message

| search action="add" OR action="destroy"
| eval addList=case(action=="add",id), destroyList=case(action="destroy",id)
| streamstats values(addList) as addList values(destroyList) as destroyList by action 
| nomv addList
| nomv destroyList
| filldown addList destroyList
| fillnull value=0 destroyList
| eval finalList=replace(replace(replace(addList,destroyList,""),"^\s",""),"\s",";")
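For readers who want to see why this relies on the queue-like ordering, here is a rough Python model of the last few steps. It is only a sketch: the function name final_list is made up for illustration, and it assumes nomv joins the values with a newline and that replace() treats the destroy list as a regular expression.

```python
import re


def final_list(add_ids, destroy_ids):
    """Rough model of: nomv + fillnull value=0 + the nested replace() calls."""
    add_list = "\n".join(add_ids)                        # nomv addList (assumed newline-joined)
    destroy_list = "\n".join(destroy_ids) if destroy_ids else "0"  # fillnull value=0
    result = re.sub(destroy_list, "", add_list)          # replace(addList, destroyList, "")
    result = re.sub(r"^\s", "", result)                  # drop one leading whitespace character
    return re.sub(r"\s", ";", result)                    # remaining whitespace becomes ";"


# Destroys arrive in the same order as adds: the destroy list is a
# contiguous piece of the add list, so it is removed cleanly.
print(final_list(["1", "2", "3"], ["1", "2"]))

# Destroys skip an id: the joined destroy list no longer appears
# inside the joined add list, so nothing is removed.
print(final_list(["1", "2", "3"], ["1", "3"]))
```

In this model the removal only works when the destroyed ids form one contiguous run inside the add list, which is the first-in, first-out pattern described above.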

Whether this works for you or not, I would like to seriously thank you for posting such a good brain-teaser. This should actually be listed as Smart Questions 🙂

____________________________________________
| makeresults | eval message= "Happy Splunking!!!"

Anam
Community Manager

Hi @vbumgarner

My name is Anam and I am the Community Content Specialist for Splunk Answers. Please go ahead and accept the answer that worked for you. If it is a comment, let me know and I can convert it to an answer and accept it.

Thanks


vbumgarner
Contributor

Though these answers are all interesting and full of great tricks, they don't quite get to where I was headed. I ended up writing some python to accomplish what is needed -- adding and removing values from a streaming set based on other field values.
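The script itself was not posted in this thread. Purely as an illustration of the idea (adding and removing values from a running set, one event at a time), a minimal sketch might look like the following. The function name track_active_ids and the dict-per-event input format are assumptions made for this example, not the original code.

```python
from typing import Iterable, Iterator


def track_active_ids(events: Iterable[dict]) -> Iterator[dict]:
    """Yield each event with an added 'activeids' field.

    'add' puts the event's id into the running list, 'destroy' removes it,
    and any other event just reports the current state.
    """
    active = []  # a list rather than a set, so the output keeps insertion order
    for event in events:
        event_id = event.get("id")
        action = event.get("action")
        if action == "add" and event_id is not None and event_id not in active:
            active.append(event_id)   # repeated adds are ignored
        elif action == "destroy" and event_id in active:
            active.remove(event_id)   # a destroy for an unknown id is ignored
        yield {**event, "activeids": ";".join(active)}


sample = [
    {"id": "1", "action": "add", "message": "Adding this thing"},
    {"id": "2", "action": "add", "message": "Adding this other thing"},
    {"message": "I am a different message"},
    {"id": "1", "action": "destroy", "message": "Remove this thing"},
    {"message": "I am yet a different message"},
    {"id": "2", "action": "destroy", "message": "Remove this other thing"},
    {"message": "And I am yet a different message"},
]

for row in track_active_ids(sample):
    print(row["activeids"], row.get("id", ""), row.get("action", ""), row["message"], sep=", ")
```

Because the state is updated row by row, duplicate adds, duplicate destroys, and a destroy that arrives before its add are all handled without special cases.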


Anam
Community Manager

That sounds great! Can you share what your solution was so other community members can benefit from it?

Thanks


woodcock
Esteemed Legend

Thank you @vbumgarner, for the fake data generator. I approached it COMPLETELY differently than @somesoni2 because I try to preserve my events and work with them, but his answer clearly works. I probably would not have even tried if you hadn't clarified the whole matter; I wish all OPs were as thorough as you! This was a VERY fun and interesting challenge! Try this (it should scale very nicely):

| gentimes start=-1 
| eval r="message=No_id_yet" 
| eval r=mvappend(r,"id=1 action=add message=Adding_this_id") 
| eval r=mvappend(r,"id=2 action=add message=Adding_this_other_id") 
| eval r=mvappend(r,"message=Im_a_different_message") 
| eval r=mvappend(r,"id=1 action=destroy message=Remove_this_thing") 
| eval r=mvappend(r,"message=Im_yet_a_different_message") 
| eval r=mvappend(r,"id=2 action=destroy message=Remove_this_other_thing") 
| eval r=mvappend(r,"message=And_Im_yet_a_different_message") 
| mvexpand r 
| rename r as _raw 
| extract 
| table id action message

| rename COMMENT AS "Everything above generates sample data; everything below is your solution"

| eval {action}=id
| streamstats values(add) AS add values(destroy) AS destroy
| nomv add
| rex field=add, mode=sed "s/[\r\n\s]+/,/g s/^/,/ s/$/,/"
| nomv destroy
| rex field=destroy, mode=sed "s/[\r\n\s]+/|/g s/^/,(?:/ s/$/)(?=,)/"
| eval activeids=split(if(coalesce(len(destroy),0)==0, add, replace(add, destroy, ",")), ",")

P.S. This would make a GREAT SmartAnSwerS post!

niketn
Legend

@woodcock, thanks for your answer... I learned something new today | eval {action}=id ... wow!!! I did not know 🙂


vbumgarner
Contributor

Wow, I didn't know | eval {action}=id either. That's golden!

Okay, to throw another wrench into this... I can't count on remove or add not being called multiple times for the same id, nor that I won't encounter removes before adds.

Add these two lines at line 3:

 | eval r=mvappend(r,"id=2 action=destroy message=Remove_this_other_thing") 
 | eval r=mvappend(r,"id=2 action=destroy message=Remove_this_other_thing") 

woodcock
Esteemed Legend

My solution should work with any combination of mis-sequencing or duplicates, so long as it is sorted in the order that the most up-to-date things are on top when the streamstats runs. The only thing is that if you do a "destroy" before an "add", then the "add" will never work because once a "destroy" is encountered, it persists forever in my solution.
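The caveat can be modeled in a few lines of Python. This is a sketch of the idea only (running distinct adds minus running distinct destroys), not a translation of the search. The function name is hypothetical, and the sorted output is meant to mimic values(), which returns distinct values in sorted order.

```python
def active_ids_cumulative(events):
    """Model of 'everything added so far, minus everything destroyed so far'."""
    adds, destroys = set(), set()
    out = []
    for event in events:
        action, event_id = event.get("action"), event.get("id")
        if action == "add":
            adds.add(event_id)
        elif action == "destroy":
            destroys.add(event_id)   # once recorded, a destroy is never forgotten
        out.append(sorted(adds - destroys))
    return out


in_order = [
    {"id": "1", "action": "add"},
    {"id": "2", "action": "add"},
    {},
    {"id": "1", "action": "destroy"},
]
print(active_ids_cumulative(in_order))

# A destroy seen before the add blocks that id for the rest of the stream.
destroy_first = [{"id": "2", "action": "destroy"}, {"id": "2", "action": "add"}]
print(active_ids_cumulative(destroy_first))
```

Duplicates are harmless here because both collections are sets, but the same property means that an id can never become active again after it has been destroyed.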


niketn
Legend

@vbumgarner then do a dedup on id and action after the table command:

  | table id action message
  | dedup id action
  |  <remainingSearchAsSuggested>

woodcock
Esteemed Legend

Are you telling me that you knew that replace will take RegEx from inside a field's value? I have never met anyone who knew that and that is really the main key to my approach.
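For anyone following along, the trick of building the regular expression out of the data itself can be sketched in Python. The helper name remove_destroyed is made up for this example, and the re.escape call is an addition that the search above does not have.

```python
import re


def remove_destroyed(add_ids, destroy_ids):
    """Remove destroyed ids from a comma-wrapped list using a pattern built from data."""
    add = "," + ",".join(add_ids) + ","            # e.g. ",1,2,"
    if not destroy_ids:
        return [i for i in add.split(",") if i]
    # e.g. ",(?:1|2)(?=,)" : a comma, one of the destroyed ids, then a comma
    # that is only looked at, so it can also start the next match.
    pattern = ",(?:" + "|".join(re.escape(i) for i in destroy_ids) + ")(?=,)"
    remaining = re.sub(pattern, ",", add)
    return [i for i in remaining.split(",") if i]  # drop the empty pieces


print(remove_destroyed(["1", "2", "12"], ["1"]))
```

Wrapping every id in commas is what keeps id 1 from matching inside id 12: the pattern only matches a whole id sitting between two delimiters.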


niketn
Legend

No, I did not know that | eval addList=case(action=="add",id), destroyList=case(action="destroy",id) can be done more easily with | eval {action}=id

I do know about replace() using RegEx, but I am sure I do not know even 10% of the tips and tricks you have up your sleeve :). I am actually bad with sed and try to avoid it most of the time by using replace() instead.


woodcock
Esteemed Legend

I got that tip both in conference sessions (10 little-used commands) and also in answers. The best "I didn't know that!" moments always come from this forum.

somesoni2
Revered Legend

See this

| gentimes start=-1 
| eval r="message=No_id_yet" 
| eval r=mvappend(r,"id=1 action=add message=Adding_this_id") 
| eval r=mvappend(r,"id=2 action=add message=Adding_this_other_id") 
| eval r=mvappend(r,"message=Im_a_different_message") 
| eval r=mvappend(r,"id=1 action=destroy message=Remove_this_thing") 
| eval r=mvappend(r,"message=Im_yet_a_different_message") 
| eval r=mvappend(r,"id=2 action=destroy message=Remove_this_other_thing") 
| eval r=mvappend(r,"message=And_Im_yet_a_different_message") 
| mvexpand r 
| rename r as _raw 
| extract 
| table id action message 

| streamstats count as rownum
| filldown id 
| filldown action 
| eval id="idnum".id 
| fillnull value=" " action
| eval temp=rownum."##".id."##".action."##".message 
| eval action1=if(action="add",1,-1) 
| chart values(action1) over temp by id 
| rex field=temp "(?<rownum>.*)##(?<id>.*)##(?<action>.*)##(?<message>.*)" 
| fields - temp 
| table rownum id action message * 
| filldown * 
| eval activeids="" 
| foreach idnum* [| eval activeids=if('<<FIELD>>'=1,activeids.":"."<<MATCHSTR>>",activeids)] 
| eval activeids=replace(activeids,"^:","") 
| eval action=if(NOT match(message,"Adding") AND NOT match(message,"Remove"),"",action) 
| table activeids id action message
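As a loose model of what this search does (one state column per id, carried down the rows, then collected into a list), here is a short Python sketch. The function name is hypothetical and the code follows the idea rather than the exact commands.

```python
def active_ids_by_state(events):
    """Keep a 1 / -1 state per id, carry it forward, and list the ids whose state is 1."""
    state = {}  # id -> 1 (added) or -1 (destroyed); persists across rows, like filldown
    out = []
    for event in events:
        event_id, action = event.get("id"), event.get("action")
        if event_id is not None:
            state[event_id] = 1 if action == "add" else -1
        out.append(":".join(i for i in sorted(state) if state[i] == 1))
    return out


events = [
    {"message": "No_id_yet"},
    {"id": "1", "action": "add"},
    {"id": "2", "action": "add"},
    {"message": "Im_a_different_message"},
    {"id": "1", "action": "destroy"},
    {"message": "Im_yet_a_different_message"},
    {"id": "2", "action": "destroy"},
    {"message": "And_Im_yet_a_different_message"},
]
print(active_ids_by_state(events))
```

Since each id keeps only its latest state, an id that is destroyed and later added again becomes active again in this model.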

elliotproebstel
Champion

The commands are executing in order. After the table command on line 11, you have a table with three columns/fields: id, action, and message. The next command is this:

| streamstats window=1 current=false last(activeids) as activeids_prev 

But that's asking streamstats to act on a field called activeids, which does not yet exist.
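One way to picture this, as a rough mental model rather than a description of Splunk internals, is that each command only sees the fields produced by the commands before it. The Python sketch below uses made-up stand-in functions for the two stages.

```python
def streamstats_prev(rows, field, out):
    """Stand-in for: streamstats window=1 current=false last(<field>) as <out>."""
    result, prev = [], None
    for row in rows:
        new_row = dict(row)
        if prev is not None:
            new_row[out] = prev
        prev = row.get(field)  # the field as it exists at THIS stage of the pipeline
        result.append(new_row)
    return result


def eval_activeids(rows):
    """Stand-in for the later eval that builds activeids from activeids_prev."""
    result = []
    for row in rows:
        new_row = dict(row)
        prev = new_row.get("activeids_prev") or []
        if new_row.get("action") == "add":
            new_row["activeids"] = prev + [new_row["id"]]
        else:
            new_row["activeids"] = prev
        result.append(new_row)
    return result


rows = [{"id": "1", "action": "add"}, {"id": "2", "action": "add"}, {}]
stage1 = streamstats_prev(rows, "activeids", "activeids_prev")  # activeids does not exist yet
stage2 = eval_activeids(stage1)
print([r["activeids"] for r in stage2])
```

In this model the first stage never finds an activeids value to carry forward, so the second stage starts from an empty list on every row and nothing accumulates.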


vbumgarner
Contributor

Right. What I need is for the eval statements to run per row as it’s going along. My latest attempt is to try to shove the eval statements inside the last() on the streamstats command, but it doesn’t seem to honor the field just created by itself, and a fillnull beforehand doesn’t make any difference.


vbumgarner
Contributor

And by “right”, I mean “you’re absolutely right, the eval statements AFTER the streamstats are irrelevant.”


ssadanala1
Contributor

Do you have an activeids field in your data?


vbumgarner
Contributor

Take a look at the example query.
