Splunk Search

Can you help me with event time subtraction?

zacksoft
Contributor

Need help with the following scenario.

I want to know how many users were logged in during a specified time frame, and how long each user was logged in.

The steps could be :

a) Identify a user's (let's call the user 'X') log-in time from the event timestamp, by searching for the event with the keyword X and "log-in success".

b) Identify the log-out time of user X from the timestamp of the event containing the keyword X and "log-out success", OR X and "session-time-out", whichever comes FIRST AFTER the log-in event.

c) Then subtract a) from b) for each user and put the result in a chart.

I could use some suggestions on how to proceed with it.


DalJeanis
Legend

Okay, here's how you go about this:

1) identify a time when you yourself logged in, logged out, and timed out. Go search up the exact form of the records.
2) set up a search that gets records that look like that and properly extracts the userids. Validate that the userids are present on all three kinds of records.

Then, here's the pseudocode for one way you can do your search...

  (index=foo1 sourcetype=bar1 ... your search for login records) OR
  (index=foo2 sourcetype=bar2 ... your search for logout records) OR
  (index=foo3 sourcetype=bar3 ... your search for timeout records) 
  | fields .... list all fields you might need from any of the three record types
  | eval login_time=case(it/is/a/login/record, _time)
  | eval logout_time=case(it/is/a/logout/record, _time, 
        it/is/a/timeout/record, _time)
  | eval matchkey=case(it/is/a/login/record, login_userid_field,
        it/is/a/logout/record, logout_userid_field, 
        it/is/a/timeout/record, timeout_userid_field)
 | sort 0 _time
 | streamstats window=2 global=t last(login_time) as login_time by matchkey
 | where it/is/not/a/login/record
 | eval duration=logout_time - login_time
 | where isnotnull(duration)

Basically, we are using streamstats to roll the login time from the login records onto the logout records, then throwing away the login records. This ignores the question of whether there might be multiple logouts for a given login; any duplicate logouts after the first will end up with a null duration and be discarded.

What you are left with is the first logout for any given logon. Any logons without logoffs will not appear.
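If you want to see the roll-forward mechanics in isolation, here is a self-contained sketch you can paste into the search bar. Every value in it is made up (two fake users, fake epoch times); it just simulates the login/logout pairing.

  | makeresults count=1
  | eval raw="1000,AX90900,login;1060,AX90900,logout;1200,BX10101,login;1500,BX10101,timeout"
  | eval raw=split(raw,";")
  | mvexpand raw
  | eval _time=tonumber(mvindex(split(raw,","),0)),
        matchkey=mvindex(split(raw,","),1),
        rectype=mvindex(split(raw,","),2)
  | rename COMMENT as "fake epoch times and userids - only the differences matter"
  | eval login_time=case(rectype="login", _time)
  | eval logout_time=case(rectype="logout", _time, rectype="timeout", _time)
  | sort 0 _time
  | streamstats window=2 global=t last(login_time) as login_time by matchkey
  | where rectype!="login"
  | eval duration=logout_time - login_time
  | where isnotnull(duration)

It should come back with one row per session, with durations of 60 and 300 seconds respectively.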

1) You could reverse the sort order and make minor adjustments, if you wanted the start of the session rather than the end. Or, you could just subtract the duration off the _time and re-sort.
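For the subtract-and-resort option, it's just a couple of extra lines on the end of the search (a sketch, assuming duration has already been calculated as above):

 | rename COMMENT as "stamp each session at its start time instead of its end time"
 | eval _time=_time - duration
 | sort 0 _time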

2) If you want to generate reports of concurrency over time, then calculate the start and end times and add code similar to this at the end...

 | fields start_time end_time
 | eval myfan=mvrange(0,2)
 | mvexpand myfan
 | eval _time=case(myfan=0,start_time, myfan=1,end_time)
 | eval addOne=case(myfan=0,1)
 | eval killOne=case(myfan=1,1)
 | bin _time span=1m
 | stats sum(addOne) as addSome sum(killOne) as killSome by _time
 | eval netSessions=coalesce(addSome,0)-coalesce(killSome,0)
 | streamstats sum(netSessions) as activeSessions 
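If you want to sanity-check that block before pointing it at real data, here is a throwaway version that fakes two overlapping sessions with makeresults (every value is invented); activeSessions should climb to 2 and drop back to 0.

 | makeresults count=1
 | eval raw="1000,1180;1060,1300"
 | eval raw=split(raw,";")
 | mvexpand raw
 | eval start_time=tonumber(mvindex(split(raw,","),0)),
       end_time=tonumber(mvindex(split(raw,","),1))
 | rename COMMENT as "two made-up overlapping sessions in epoch seconds: 1000-1180 and 1060-1300"
 | fields start_time end_time
 | eval myfan=mvrange(0,2)
 | mvexpand myfan
 | eval _time=case(myfan=0,start_time, myfan=1,end_time)
 | eval addOne=case(myfan=0,1)
 | eval killOne=case(myfan=1,1)
 | bin _time span=1m
 | stats sum(addOne) as addSome sum(killOne) as killSome by _time
 | eval netSessions=coalesce(addSome,0)-coalesce(killSome,0)
 | rename COMMENT as "make sure the bins are in time order before taking the running sum"
 | sort 0 _time
 | streamstats sum(netSessions) as activeSessions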

zacksoft
Contributor

I am a bit confused by the case statement here; I am not sure how to implement this:

(sourcetype="bsa:phutan:alogs" AX90900 AND "login passed") OR
   (sourcetype="bsa:phutan:alogs" AX90900 AND "logged out") OR
   (sourcetype="bsa:phutan:alogs" AX90900 AND "timed out") 
   | fields _time  **<==== What fields should I fetch here**
   | eval login_time=case(it/is/a/login/record, _time) **<==  How is the 'it/is/a/login/record' part going to fetch the login event's time?**
   | eval logout_time=case(it/is/a/logout/record, _time, 
         it/is/a/timeout/record, _time)
   | eval matchkey=case(it/is/a/login/record, login_userid_field,
         it/is/a/logout/record, logout_userid_field, 
         it/is/a/timeout/record, timeout_userid_field)
  | sort 0 _time
  | streamstats window=2 global=t last(login_time) as login_time by matchkey
  | where it/is/not/a/login/record
  | eval duration=logout_time - login_time
  | where isnotnull(duration)

I am hardcoding the userid here (AX90900), but I think I will need some regex for dynamic extraction of user ids from the events. The login/logout/timeout events come in three different formats, so three different regex expressions may be required. I can build the regexes; I just need to know where to implement them in the code!


DalJeanis
Legend

Something like this, perhaps.

 sourcetype="bsa:phutan:alogs" AX90900 AND "login passed") OR
(sourcetype="bsa:phutan:alogs" AX90900 AND "logged out") OR
(sourcetype="bsa:phutan:alogs" AX90900 AND "timed out") 
| rex "(?<rectype>login passed|logged out|timed out)"
| fields _time rectype
| eval login_time=case(rectype="login passed", _time) 
| eval logout_time=case(rectype="logged out", _time, 
      rectype="timed out", _time)

| rename COMMENT as "extract your userid field here before this line"
| eval matchkey=case(rectype="login passed", login_userid_field,
      rectype="logged out", logout_userid_field, 
      rectype="timed out", timeout_userid_field)


| rename COMMENT as "copy forward the most recent logon to each record, then throw away the logon records."
| sort 0 _time
| streamstats window=2 global=t last(login_time) as login_time by matchkey
| where rectype!="login passed"

| rename COMMENT as "calculate the duration and throw away records without one."
| eval duration=logout_time - login_time
| where isnotnull(duration)
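On the userid extraction: since your three record types are in different formats, one way is a separate rex per format, dropped in right where the "extract your userid field here" comment sits. The patterns below are only placeholders (I haven't seen your actual events), so substitute your own regexes:

| rename COMMENT as "placeholder patterns - replace each one with a regex that matches your real event format"
| rex "login passed for user (?<login_userid_field>\S+)"
| rex "user (?<logout_userid_field>\S+) logged out"
| rex "session for user (?<timeout_userid_field>\S+) timed out"

An event that doesn't match a given pattern simply won't get that field, so the eval matchkey=case(...) above works unchanged. A one-line alternative is | eval matchkey=coalesce(login_userid_field, logout_userid_field, timeout_userid_field).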

zacksoft
Contributor

Thank you. I am going step by step, eradicating small bugs in the data and tuning the commands accordingly. Now I am stuck here.
What does the following statement mean, and how do we implement it?

| rename COMMENT as "copy forward the most recent logon to each record, then throw away the logon records."