Two different sources return data in the format below.
Source 1 - determines the time range for a given date, based on the execution of a job that logically concludes the end of day in the application.
Source 2 - events generated in real time for various use cases in the application. EventID1 is generated as part of the job in Source 1.
Source 1
DATE | Start Time | End Time
Day 3 | 2023-09-12 01:12:12.123 | 2023-09-13 01:13:13.123
Day 2 | 2023-09-11 01:11:11.123 | 2023-09-12 01:12:12.123
Day 1 | 2023-09-10 01:10:10.123 | 2023-09-11 01:11:11.123
Source 2
Event type | Time | Others
EventID2 | 2023-09-11 01:20:20.123 |
EventID1 | 2023-09-11 01:11:11.123 |
EventID9 | 2023-09-10 01:20:30.123 |
EventID3 | 2023-09-10 01:20:10.123 |
EventID5 | 2023-09-10 01:10:20.123 |
EventID1 | 2023-09-10 01:10:10.123 |
There are no common fields available to join the two sources, other than the time at which the job is executed and the time at which EventID1 is generated.
The expectation is to logically group the events by date and derive the stats for each day.
I'm new to Splunk and would really appreciate any suggestions on how to handle this.
Expected Result
Date | Events | Count
Day 1 | EventID1 | 1
Day 2 | EventID1 | 1
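Before getting into SPL, here is a rough sketch in plain Python (not Splunk) of the grouping being asked for, using only the example data above: assign each Source 2 event to the Source 1 window that contains it, then count events per logical day. The half-open interval (start inclusive, end exclusive) is an assumption made so that an event landing exactly on a boundary goes to the later day, matching the expected result.

```python
from datetime import datetime

def parse(ts):
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")

# (logical day, start, end) windows from Source 1
windows = [
    ("Day 1", parse("2023-09-10 01:10:10.123"), parse("2023-09-11 01:11:11.123")),
    ("Day 2", parse("2023-09-11 01:11:11.123"), parse("2023-09-12 01:12:12.123")),
    ("Day 3", parse("2023-09-12 01:12:12.123"), parse("2023-09-13 01:13:13.123")),
]

# (event type, time) rows from Source 2
events = [
    ("EventID2", parse("2023-09-11 01:20:20.123")),
    ("EventID1", parse("2023-09-11 01:11:11.123")),
    ("EventID9", parse("2023-09-10 01:20:30.123")),
    ("EventID3", parse("2023-09-10 01:20:10.123")),
    ("EventID5", parse("2023-09-10 01:10:20.123")),
    ("EventID1", parse("2023-09-10 01:10:10.123")),
]

counts = {}  # (day, event type) -> count
for etype, etime in events:
    for day, start, end in windows:
        if start <= etime < end:  # half-open: boundary events go to the later day
            counts[(day, etype)] = counts.get((day, etype), 0) + 1
            break
```

Filtering `counts` down to EventID1 gives exactly the two rows in the Expected Result table.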
Actually it's not so hard to achieve.
Here is another example, where I have added another Day 0 and some event dates inside and outside any day.
| makeresults
| eval _raw="DATE,Start_Time,End_Time
Day_3,2023-09-12 01:12:12.003,2023-09-13 01:13:13.993
Day_2,2023-09-11 01:11:11.002,2023-09-12 01:12:12.992
Day_1,2023-09-10 01:10:10.001,2023-09-11 01:11:11.991
Day_0,2023-09-04 01:12:12.000,2023-09-06 17:22:13.990"
| multikv forceheader=1
| table DATE Start_Time End_Time
| eval _time = strptime(Start_Time, "%F %T.%Q")
| eval end = strptime(End_Time, "%F %T.%Q"), start=_time
| append [
| makeresults
| eval _raw="Event type,Time,Others
EventID2,2023-09-11 01:20:20.133, ``` INSIDE DAY 2 ```
EventID1,2023-09-11 01:11:11.132, ``` INSIDE DAY 2 ```
EventID9,2023-09-10 01:20:30.131, ``` INSIDE DAY 1 ```
EventID3,2023-09-10 01:20:10.130, ``` INSIDE DAY 1 ```
EventID5,2023-09-10 01:10:20.129, ``` INSIDE DAY 1 ```
EventID1,2023-09-10 01:10:10.128, ``` INSIDE DAY 1 ```
EventID4,2023-09-07 01:10:10.127, ``` OUTSIDE ANY ```
EventID3,2023-09-06 06:10:10.126, ``` INSIDE DAY 0 ```
EventID2,2023-09-05 19:10:10.125, ``` INSIDE DAY 0 ```
EventID1,2023-09-04 04:10:10.124, ``` INSIDE DAY 0 ```
EventID0,2023-09-04 01:10:10.123," ``` OUTSIDE ANY ```
| multikv forceheader=1
| table Event_type Time
| eval _time = strptime(Time, "%F %T.%Q")
| eval eventTime=_time
| fields - Time
]
| sort _time
| filldown DATE start end
| eval eventIsInside=case(isnull(Event_type), "YES", isnotnull(Event_type) AND _time>=start AND _time<=end, "YES", 1==1, "NO")
| where eventIsInside="YES"
| stats values(*_Time) as *_Time list(Event_type) as eventIDs list(eventTime) as eventTimes by DATE
| eval eventTimes=strftime(eventTimes, "%F %T.%Q")
| table DATE Start_Time End_Time eventIDs eventTimes
You can see that this works by making a common time, based on either the start time or the event time, and then sorting by that time.
Setting start and end epoch times for the Source 1 data means you can then 'filldown' those fields onto the subsequent event (Source 2) rows, until the next Source 1 day row resets them.
Then, as each Source 2 event now carries the preceding day's start/end times, it can compare them against its own time.
Hope this helps.
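The sort + filldown idea above can be sketched outside Splunk as well. This is a minimal Python analogue (assumed data taken from the Day 0 example, not a real search): merge both sources into one time-sorted stream, carry the most recent window's start/end down onto each event row, then keep only events that fall inside that window.

```python
from datetime import datetime

def parse(ts):
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")

# ("window", day, start, end) rows sort by their start time;
# ("event", type, time) rows sort by their event time.
rows = [
    ("window", "Day_1", parse("2023-09-10 01:10:10.001"), parse("2023-09-11 01:11:11.991")),
    ("window", "Day_0", parse("2023-09-04 01:12:12.000"), parse("2023-09-06 17:22:13.990")),
    ("event", "EventID4", parse("2023-09-07 01:10:10.127")),  # OUTSIDE ANY
    ("event", "EventID1", parse("2023-09-04 04:10:10.124")),  # INSIDE DAY 0
    ("event", "EventID5", parse("2023-09-10 01:10:20.129")),  # INSIDE DAY 1
]

rows.sort(key=lambda r: r[2])  # like | sort _time on the common time column

kept = []
day = start = end = None
for row in rows:
    if row[0] == "window":
        day, start, end = row[1], row[2], row[3]   # like | filldown DATE start end
    elif start is not None and start <= row[2] <= end:
        kept.append((day, row[1]))                 # event inherits the filled-down window
```

EventID4 is dropped because it lands after Day_0's end but before Day_1's start, which mirrors the `eventIsInside="NO"` case in the SPL.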
If you paste this into your search window, you can see it working with your example dataset:
| makeresults
| eval _raw="DATE,Start_Time,End_Time
Day_3,2023-09-12 01:12:12.123,2023-09-13 01:13:13.123
Day_2,2023-09-11 01:11:11.123,2023-09-12 01:12:12.123
Day_1,2023-09-10 01:10:10.123,2023-09-11 01:11:11.123"
| multikv forceheader=1
| table DATE Start_Time End_Time
| eval _time = relative_time(strptime(Start_Time, "%F %T.%Q"), "@d")
| append [
| makeresults
| eval _raw="Event type,Time,Others
EventID2,2023-09-11 01:20:20.123,
EventID1,2023-09-11 01:11:11.123,
EventID9,2023-09-10 01:20:30.123,
EventID3,2023-09-10 01:20:10.123,
EventID5,2023-09-10 01:10:20.123,
EventID1,2023-09-10 01:10:10.123,"
| multikv forceheader=1
| table Event_type Time
| eval _time = strptime(Time, "%F %T.%Q")
| fields - Time
]
| bin _time span=1d
| stats list(*) as * count by _time
but the way you should do this against your real data is
search source1 OR search source2
| eval _time = if(event=from_source_1,
relative_time(strptime(Start_Time, "%F %T.%Q"), "@d"),
strptime(Time, "%F %T.%Q"))
| bin _time span=1d
| stats list(*) as * count by _time
This creates a _time field for the Source 1 events set to the start of the day, creates a _time field from the Source 2 event times, then uses bin to make a 1-day grouping and stats list to collect them together. Count will always be one more than the number of Source 2 events, because the Source 1 row is included in each group.
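For comparison with the window-based approach, here is what the bin-by-day grouping amounts to, sketched in plain Python (not Splunk) with event rows from the question: snap every timestamp to its calendar day, then group rows by that day.

```python
from datetime import datetime

def day_bucket(ts):
    # analogue of | bin _time span=1d: truncate the timestamp to its calendar day
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f").strftime("%Y-%m-%d")

events = [
    ("EventID2", "2023-09-11 01:20:20.123"),
    ("EventID1", "2023-09-11 01:11:11.123"),
    ("EventID9", "2023-09-10 01:20:30.123"),
    ("EventID1", "2023-09-10 01:10:10.123"),
]

groups = {}  # calendar day -> list of event types, like stats list(*) by _time
for etype, ts in events:
    groups.setdefault(day_bucket(ts), []).append(etype)
```

Note this groups by calendar day, not by the job-defined window, which is exactly the limitation raised in the reply below when the application day and the calendar day diverge.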
@bowesmana thanks for your input.
The Source 2 events are not tied to the physical clock: a single day in the application could span multiple calendar days, or multiple application days could fit within a single calendar day.
I'm exploring the option of populating these two sources separately in a dashboard, passing the Source 1 date/time as inputs to Source 2, and getting the events for each logical date.
Thanks @bowesmana, this is closer to what I'm trying to achieve and has given me some idea of how to build Splunk searches of this complexity.
The if(event=from_source_1, ... is a test you will have to construct using whatever fields you have that indicate the data is a Source 1 event (source/sourcetype/index?).
If you need to take only Source 2 events that are inside the Source 1 window, or the window can span more than one day, you'll have to do it a bit differently.
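One possible shape for that "a bit differently" (a hypothetical sketch in Python, not a tested SPL search): rather than filldown, test every event against every window, so a window can span several calendar days and windows could even overlap. Data is from the Day 0 example.

```python
from datetime import datetime

def parse(ts):
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")

# job-defined windows; Day_0 spans more than one calendar day
windows = {
    "Day_0": (parse("2023-09-04 01:12:12.000"), parse("2023-09-06 17:22:13.990")),
}

events = [
    ("EventID3", parse("2023-09-06 06:10:10.126")),  # INSIDE DAY 0
    ("EventID0", parse("2023-09-04 01:10:10.123")),  # OUTSIDE ANY
]

# check each event against each window; an event can match zero, one,
# or (if windows overlap) several logical days
matches = [(day, etype)
           for etype, etime in events
           for day, (start, end) in windows.items()
           if start <= etime <= end]
```

The cross-check is O(events x windows), which is fine for a handful of job windows per day but worth noting if the window list grows large.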