
Concurrent users per time bucket from transactions

rjthibod
Champion

The objective is to take events that indicate user activity, break the data down into segments of time, and then figure out which segments/bins should be marked to indicate that user activity took place during that time. Here is an example of the data I am looking at:

Start | End  |  Field_Name  |  Unique_Visitor_ID
 a1   |  a1  |     AAA      |      ZZZ
 a1   |  a2  |     AAA      |      YYY
 a2   |  a2  |     AAA      |      YYY
 a3   |  a4  |     AAA      |      ZZZ
 a4   |  a4  |     AAA      |      ZZZ
 a5   |  a6  |     AAA      |      YYY
 a6   |  a6  |     AAA      |      ZZZ

In the table above, "Start" and "End" values indicate the start and end time segments that define a window of time where user "Unique_Visitor_ID" was using resource "Field_Name". Think of it kind of like the specification of a Gantt Chart, where "Start" and "End" define the range of time windows during which a user was accessing a resource.

What we want to do is create a plot where we know how many users were using each resource during each time segment. The trick is that each event in the table can span multiple time segments, and each event is generated from a grouping of events into a transaction.

I have been able to generate a query that populates the chart the way I want, but it is extremely inefficient due to how a combination of 'map' and 'gentimes' is being used. Any help simplifying this would be greatly appreciated.

Here is the end of the query, where I try to produce a chartable result after forming the table above. Basically, it puts all start and end segments into increments of 15 minutes, and then runs a map/gentimes command that breaks up all of the transaction events that span multiple segments into individual events that cover only a single segment.

GENERATE_TRANSACTION_TABLE 
    | fields start end FieldName UniqueID 
    | bin start span=15m 
    | bin end span=15m
    | stats max(end) as lasttime by UniqueID FieldName start  
    | stats min(start) as starttime by UniqueID FieldName lasttime 
    | stats values(UniqueID) as UniqueID by starttime lasttime FieldName  (<-- filter and group like events)
    | eval starttime=strftime(starttime, "%m/%d/%Y:%H:%M:%S") 
    | eval lasttime=lasttime+1    (<-- this is a workaround to make gentimes work for events with same start and end)
    | eval lasttime=strftime(lasttime, "%m/%d/%Y:%H:%M:%S") 
    | map maxsearches=1000 
       search="gentimes start=$starttime$ end=$lasttime$ increment=15m 
       | eval FieldName=$FieldName$ 
       | eval UniqueID=$UniqueID$"
    | fields starttime FieldName UniqueID
    | dedup starttime FieldName UniqueID 
    | makemv delim=" " UniqueID 
    | mvexpand UniqueID 
    | rename starttime as _time 
    | timechart span=15m dc(UniqueID) as Count by FieldName

Here is some example data to show the challenge. Assume this is what comes out of the transaction process.

    Start         End      Field_Name   Unique_Visitor_ID
1434355312  1434355421     AAA            ZZZ
1434355534  1434357109     AAA            ZZZ
1434357201  1434358920     AAA            ZZZ
1434362435  1434378784     BBB            YYY

This is what the same data looks like after assigning time buckets with a span of 30 minutes

         Start                End           Field_Name   Unique_Visitor_ID
06/15/2015:09:00:00    06/15/2015:09:00:00     AAA            ZZZ
06/15/2015:09:00:00    06/15/2015:09:30:00     AAA            ZZZ
06/15/2015:09:30:00    06/15/2015:10:00:00     AAA            ZZZ
06/15/2015:11:00:00    06/15/2015:15:30:00     BBB            YYY

This is what the end result would look like after calling timechart.

           _time              AAA               BBB
06/15/2015:09:00:00           1                     0 
06/15/2015:09:30:00           1                     0
06/15/2015:10:00:00           1                     0
06/15/2015:10:30:00           0                     0
06/15/2015:11:00:00           0                     1
06/15/2015:11:30:00           0                     1
06/15/2015:12:00:00           0                     1
06/15/2015:12:30:00           0                     1
06/15/2015:13:00:00           0                     1
06/15/2015:13:30:00           0                     1
06/15/2015:14:00:00           0                     1
06/15/2015:14:30:00           0                     1
06/15/2015:15:00:00           0                     1
06/15/2015:15:30:00           0                     1
06/15/2015:16:00:00           0                     0
1 Solution

rjthibod
Champion

This is the more generic approach that is used heavily in one of my apps. It splits the time ranges marked by Start and End into 1-minute (60-second) time buckets, with the duration2 field indicating how much of each time bucket the event occupied. Assume Start and End are epoch timestamps in seconds (fractional seconds are fine).

Note: @sideview was very close and much more detailed in terms of options for simpler cases (which is why I upvoted him). My case was more complicated and required the most generic approach I could come up with; i.e., transaction and concurrency couldn't handle it.

 BASE SEARCH ... 
 | eval earliest=Start
 | eval latest=End
 | eval duration = latest - earliest
 | eval start_time_min = floor(earliest - (earliest % 60))
 | eval end_time_min   = floor(latest - (latest % 60))
 | eval time_bins = mvrange(start_time_min, end_time_min + 1, 60)
 | mvexpand time_bins 
 | eval duration2 = if(start_time_min == end_time_min, duration, if(start_time_min == time_bins, round(start_time_min + 60 - earliest, 3), if(end_time_min == time_bins, round(latest - end_time_min, 3), 60))) 
 | rename time_bins as _time
 | table _time duration2 Field_Name Unique_Visitor_ID
 | eval _span = 60
 | ... do stats or whatever you need
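For the chart in the original question (distinct users per resource per time bucket), the final "do stats" step could be as simple as appending something like the line below to the pipeline above. This is just a sketch of one possible aggregation, not part of the app's macros; the 1-minute span matches the buckets built above, and the field names are the ones from the sample data.

 | timechart span=1m dc(Unique_Visitor_ID) as Count by Field_Name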


sideview
SplunkTrust

1) First, it's worth saying that if it weren't for the AAA vs BBB thing, this would be a very straightforward use case for the concurrency command.

eval duration=End-Start | concurrency start=Start duration=duration | timechart max(concurrency)

2) But you need a split by. The need to "split by" while calculating the concurrency makes it complicated. Fortunately this is fairly well-trod, if extremely advanced, territory. And this approach will be much faster than the kind of approach that needs things like map, gentimes, and makecontinuous.

eval duration=End-Start 
| eval increment = mvappend("1","-1") 
| mvexpand increment 
| eval _time = if(increment==1, _time, _time + duration) 
| sort 0 + _time 
| fillnull Field_Name value="NULL" 
| streamstats sum(increment) as post_concurrency by Field_Name 
| eval concurrency = if(increment==-1, post_concurrency+1, post_concurrency) 
| timechart bins=400 max(concurrency) as max_concurrency last(post_concurrency) as last_concurrency by Field_Name limit=30 
| filldown last_concurrency* 
| foreach "max_concurrency: *" [eval <<MATCHSTR>>=coalesce('max_concurrency: <<MATCHSTR>>','last_concurrency: <<MATCHSTR>>')] 
| fields - last_concurrency* max_concurrency*
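(Purely for illustration, here is a self-contained toy run of that +1/-1 expansion, using makeresults to fabricate two rows from the question's sample data. The makeresults/streamstats scaffolding and the final table are not part of the real search, whose tail is the timechart/filldown/foreach block above.)

| makeresults count=2 
| streamstats count as row 
| eval Start=if(row==1, 1434355312, 1434362435), End=if(row==1, 1434355421, 1434378784) 
| eval Field_Name=if(row==1, "AAA", "BBB"), Unique_Visitor_ID=if(row==1, "ZZZ", "YYY") 
| eval _time=Start, duration=End-Start 
| eval increment=mvappend("1","-1") 
| mvexpand increment 
| eval _time=if(increment==1, _time, _time + duration) 
| sort 0 + _time 
| streamstats sum(increment) as post_concurrency by Field_Name 
| eval concurrency=if(increment==-1, post_concurrency+1, post_concurrency) 
| table _time Field_Name increment concurrency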

3) But I think you have a third wrinkle: if the same user shows up for the same resource more than once in a given time period, you don't want to double-count them. Assuming I'm right, you need to be a little more careful about how the counting gets done. I've taken a crack at that here, with an extra streamstats and an extra eval to cut away the double counting.

eval duration=End-Start 
| eval increment = mvappend("1","-1") 
| mvexpand increment 
| eval _time = if(increment==1, _time, _time + duration) 
| sort 0 + _time 
| fillnull Field_Name value="NULL" 
| streamstats sum(increment) as post_concurrency_per_user by Field_Name Unique_Visitor_ID 
| eval post_concurrency=if(post_concurrency_per_user>1,1,0) 
| streamstats sum(post_concurrency) as post_concurrency by Field_Name 
| eval concurrency = if(increment==-1, post_concurrency+1, post_concurrency) 
| timechart bins=400 max(concurrency) as max_concurrency last(post_concurrency) as last_concurrency by Field_Name limit=30 
| filldown last_concurrency* 
| foreach "max_concurrency: *" [eval <<MATCHSTR>>=coalesce('max_concurrency: <<MATCHSTR>>','last_concurrency: <<MATCHSTR>>')] 
| fields - last_concurrency* max_concurrency*

rjthibod
Champion

Thanks @sideview.

This is the newer path I took a while ago in our app, but it was even more complicated than what you came up with in scenario 3). If you are interested, you can check out some of the macros in our app (Layer8Insight App for Splunk, splunkbase.splunk.com/app/3171).

We pre-calculate the time bins down to the minute during which a user touches the resources, use mvexpand to expand that set of time bins into events, and then run the various aggregations over those events.
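Here is a self-contained toy version of that expansion step (makeresults fabricates one event from the sample data above; the final stats line is just for illustration, not one of the app's macros):

| makeresults 
| eval Start=1434355312, End=1434357109, Field_Name="AAA", Unique_Visitor_ID="ZZZ" 
| eval start_time_min = floor(Start - (Start % 60)) 
| eval end_time_min   = floor(End - (End % 60)) 
| eval time_bins = mvrange(start_time_min, end_time_min + 1, 60) 
| mvexpand time_bins 
| eval _time = time_bins 
| stats dc(Unique_Visitor_ID) as Users by _time Field_Name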

I would say come to my talk at .conf, but it got rejected. I was trying to present a generic approach to handling durational events/data instead of the discrete/sampled data most people are used to in Splunk. 😞


rjthibod
Champion

This is the old, non-scalable answer for my very specific use case. Look at the accepted answer for the more useful, generic approach.

My solution in the comments of the original post did work, but it was extremely inefficient because gentimes is invoked so many times. Instead of massaging gentimes and map to work in an odd manner, I just wrote my own command.

Below is the Python code for my new command that does what I wanted.

Assume the command is called tranexpand. It takes two arguments, span and fields, where span is the size of the time buckets for the events parsed by the command and fields is a comma-separated list of fields that need to be maintained in the output data. An example invocation looks like tranexpand span=30m fields="field1,field2". The command assumes the events passed to it include the fields starttime and lasttime, which indicate the first and last time bucket for the event. For example, if a transaction/event started at 8:15:12am and ended at 8:37:35am and the bucket size is 30 minutes, the starttime field would be the equivalent of 8:00:00am and the lasttime field would be the equivalent of 8:30:00am.

import re
import splunk.Intersplunk

def getSpan(val):
    if not val:
        return None
    match = re.findall(r"(\d+)([smhd])", val)

    if len(match) > 0:
        val = int(match[0][0])
        units = match[0][1]
        # don't do anything for units == 's', val doesn't need to change
        if units == 'm':
            val *= 60
        elif units == 'h':
            val *= 3600
        elif units == 'd':
            val *= (24 * 3600)
        return val
    return None


def generateNewEvents(results, settings):
    try:
        keywords, argvals = splunk.Intersplunk.getKeywordsAndOptions()
        spanstr = argvals.get("span", None)
        span = getSpan(spanstr)
        fields = argvals.get("fields", None)

        if not span:
            return splunk.Intersplunk.generateErrorResults(
                "generateNewEvents requires span=val[s|m|h|d]")

        if not fields:
            return splunk.Intersplunk.generateErrorResults(
                "generateNewEvents requires comma separated" +
                " field list wrapped in quotes: fields=\"A[,B[...]]\"")

        fields = fields.split(',')

        new_results = []

        # for each result, add fields set to message
        for r in results:
            start = r.get("starttime", None)
            end = r.get("lasttime", None)

            if (start is not None) and (end is not None):
                try:
                    start = int(float(start))
                    end = int(float(end)) + 1  # +1 so range() below includes the final bucket

                    for x in range(start, end, span):
                        new_event = {}
                        new_event['_time'] = str(x)
                        for y in fields:
                            new_event[y] = r.get(y, None)
                        new_results.append(new_event)
                except:
                    pass

        results = new_results

    except Exception, e:
        import traceback
        stack = traceback.format_exc()
        results = splunk.Intersplunk.generateErrorResults(
            str(e) + ". Traceback: " + str(stack))

    return results

results, dummyresults, settings = splunk.Intersplunk.getOrganizedResults()
results = generateNewEvents(results, settings)
splunk.Intersplunk.outputResults(results)
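For context, here is a hypothetical end-to-end invocation that would replace the map/gentimes portion of the original search, assuming the script is registered in commands.conf as a custom search command named tranexpand. This pipeline is a sketch, not the production search:

GENERATE_TRANSACTION_TABLE 
    | fields start end FieldName UniqueID 
    | bin start span=30m 
    | bin end span=30m 
    | rename start as starttime, end as lasttime 
    | tranexpand span=30m fields="FieldName,UniqueID" 
    | timechart span=30m dc(UniqueID) as Count by FieldName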

lguinn2
Legend

As an alternative, you might look at this question, which has a couple of suggestions for how to build a gantt chart in Splunk

Building a gantt chart

and this free app

Gantt chart visualization

I haven't used either of these...


rjthibod
Champion

Thank you, I have seen those before.

The need I am addressing here is not to build a gantt chart; I could do that with some JS and my original data. Instead, I need to count concurrent users across the specific field values in the data.


lguinn2
Legend

[Answer edited to correct some typos and places where I used the wrong field names]

First, I have not used map and gentimes much, but I think this is what you want:

| map maxsearches=1000 
      search="gentimes start=$start_time$ end=$end_time$ increment=15m 
     | fields starttime endtime $FieldName$ $UniqueID$" 

I have played around with this, and I think that the mvcombine and mvexpand could be a problem - but I would need to see the output of your GENERATE_TRANSACTION_TABLE to figure it out.

I would probably do it this way:

GENERATE_TRANSACTION_TABLE 
| fields start end FieldName UniqueID  
| dedup start end FieldName UniqueID 
| bin start span=15m 
| bin end span=15m
| sort UniqueID FieldName start
| streamstats current=f window=2 earliest(timestamp) as last_time by UniqueID FieldName
| eval timestamp=if(isnotnull(last_time),relative(last_time,"+15m"),start)
| rename timestamp as _time 
| timechart span=15m dc(UniqueID) as Count by FieldName

Finally, I don't know how you are generating your transactions, but it will almost certainly be faster if you can use the stats command rather than the transaction command, if you can. Also, the table command (on line 2) is not accomplishing anything in your original search; if you are trying to optimize field extraction, use fields instead, or just leave it out.

Kenshiro70
Path Finder

This worked for me in diagnosing concurrent API calls from customers.

I consider myself reasonably good at Splunk, but wow.

(bows in the presence of a Splunk master)


rjthibod
Champion

I appreciate your help and the suggestions.

  • The comment about table versus fields is correct, thank you for that.
  • The transactions cannot be generated using stats. Also, that is not the bottleneck in my testing; it is always gentimes.
  • Your suggestion for changing gentimes is not helping speed it up. In fact, if I trim it down to just run gentimes start=$start$ end=$end$ increment=15m, it still takes nearly a second for each invocation (see comment above).
  • I am still testing out your suggested approach. Will come back with results in a bit.

rjthibod
Champion

Can you confirm the streamstats command in your example? Are you sure you meant to use the field "timestamp" in the call to earliest? That field does not yet exist at this point in the pipeline.


lguinn2
Legend

| streamstats current=f window=1 earliest(timestamp) as last_time by UniqueID FieldName

Try changing the window to 1

And yes, I meant to use the field timestamp - it won't exist for the first event, but it will for subsequent events. Note that I test for a null value of last_time in the following command.

Sorry about the typos in the original answer; I have updated it. My only excuse is that I had minor surgery the other day, so I was taking pain meds. 🙂


rjthibod
Champion

I am sorry, but the streamstats command you provided does not produce a timestamp field when one doesn't already exist. I just tested it to confirm.

Separately, I don't quite understand what you are trying to accomplish with that command. Please clarify. If it helps, use the sample data I posted in the original question to illustrate your intention.


rjthibod
Champion

Playing around with what you provided, I can't quite get there. If you look at the original post, I have added some sample data that should show the extent of the issue.


woodcock
Esteemed Legend

Given the sample data, give me the desired output (I am not at all following what you are explaining).


rjthibod
Champion

Sorry, I hope this can help.

The following is what a segment of events looks like when the transactions have been made

    start               end             FieldName       UniqueID        
1434283638.50   1434283944.90   Resource_AA     USER_YY
1434284009.43   1434284172.20   Resource_AA     USER_YY
1434284178.02   1434284240.24   Resource_AA     USER_YY
1434353495.28   1434353616.58   Resource_BB     USER_XX
1434353671.64   1434353753.25   Resource_BB     USER_XX
1434353833.93   1434353872.11   Resource_BB     USER_ZZ
1434353852.65   1434353868.99   Resource_AA     USER_ZZ
1434353861.75   1434353884.71   Resource_BB     USER_ZZ
1434353852.60   1434353931.20   Resource_AA     USER_ZZ
1434353902.17   1434353915.57   Resource_BB     USER_ZZ
1434353907.88   1434354047.30   Resource_AA     USER_ZZ
1434354040.52   1434354077.74   Resource_BB     USER_XX 

This is what that series of data should look like after the call to 'dedup'

     start              end                   FieldName         UniqueID
06/14/2015:13:00:00     06/14/2015:13:00:00     Resource_AA     USER_YY
06/14/2015:13:00:00     06/14/2015:13:15:00     Resource_AA     USER_YY
06/14/2015:13:15:00     06/14/2015:13:15:00     Resource_AA     USER_YY
06/15/2015:08:30:00     06/15/2015:08:30:00     Resource_BB     USER_XX
06/15/2015:08:30:00     06/15/2015:08:30:00     Resource_BB     USER_ZZ
06/15/2015:08:30:00     06/15/2015:08:30:00     Resource_AA     USER_ZZ

This is what the end result would look like (the unique user count for each resource in a given time bucket)

_time                    Resource_AA      Resource_BB
06/14/2015:13:00:00             1                 0
06/14/2015:13:15:00             1                 0
06/15/2015:08:30:00             1                 2

rjthibod
Champion

Basically, distill the data down to show the unique user count per resource per time bucket. The trick is that each time bucket can have multiple complete transactions for a single resource from the same user, and I can't really change that fact.


rjthibod
Champion

I looked at search.log, and I see error messages like the one below every time it invokes 'gentimes'. After this message appears, the time until the next cycle is almost a second, as seen below.

06-16-2015 02:32:57.375 INFO DispatchThread - Error reading runtime settings: File :C:\Program Files\Splunk\var\run\splunk\dispatch\subsearch___search3_1434418371.2490_1434418377.5\runtime.csv does not exist
06-16-2015 02:32:57.375 INFO DispatchThread - Disk quota = 10485760000
06-16-2015 02:32:58.281 INFO script - Invoked script gentimes with 213 input bytes (0 events). Returned 2 output bytes in 891 ms.
