Splunk Search

SPL OPTIMIZATION

Kenny_splunk
Explorer

Hey guys, I was wondering if anyone has any ideas on how to optimize this query to minimize the subsearches.

Honestly, my brain hurts just looking at it. SPL pros, please lend a hand if possible.

 

index=efg* EVENT_TYPE=FG_EVENTATTR AND ((NAME=ConsumerName AND VALUE=OneStream) OR NAME=ProducerFilename OR NAME=OneStreamSubmissionID OR NAME=ConsumerFileSize OR NAME=RouteID)
| where trim(VALUE)!=""
| eval keyValuePair=mvzip(NAME,VALUE,"=")
| eval efgTime=min(MODIFYTS)
```We need to convert EDT/EST timestamps to UTC time.```
| eval EST_time=strptime(efgTime,"%Y-%m-%d %H:%M:%S.%N")

```IMPORTANT STEP: During EDT you add 14400 to convert to UTC; during EST you add 18000. (We need to automate this step in the code.)```

| eval tempTime = EST_time
| eval UTC_time=strftime(tempTime, "%Y-%m-%d %H:%M:%S.%1N")
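```One possible way to automate the EDT/EST offset (a sketch, untested against this data, and it assumes the search head's timezone is US/Eastern): strftime's %z renders the UTC offset in effect at that instant, e.g. -0400 during EDT and -0500 during EST, so the offset can be derived instead of hard-coded:```
```| eval tz=strftime(EST_time, "%z") | eval utc_offset=tonumber(substr(tz,2,2))*3600 + tonumber(substr(tz,4,2))*60 | eval tempTime=EST_time + utc_offset```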
| stats values(*) as * by ARRIVEDFILE_KEY
| eval temptime3=min(UTC_time)
| eval keyValuePair=mvappend("EFG_Delivery_Time=".temptime3, keyValuePair)
| eval keyValuePair=mvsort(keyValuePair)
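```At this point keyValuePair is a sorted multivalue field of NAME=VALUE strings per ARRIVEDFILE_KEY, e.g. ConsumerFileSize=1024, EFG_Delivery_Time=2024-01-01 00:00:00.0, ProducerFilename=somefile.dat, RouteID=R1 (values here are purely illustrative).```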

```Let's extract our values now.```

| eval tempStr_1 = mvfilter(LIKE(keyValuePair, "%ConsumerFileSize=%"))
| eval tempStr_2 = mvfilter(LIKE(keyValuePair, "%EFG_Delivery_Time=%"))
| eval tempStr_3 = mvfilter(LIKE(keyValuePair, "%OneStreamSubmissionID=%"))
| eval tempStr_4 = mvfilter(LIKE(keyValuePair, "%ProducerFilename=%"))
| eval tempStr_5 = mvfilter(LIKE(keyValuePair, "%RouteID=%"))

```Now, let's assign the values to the right field names. (replace() with an anchored pattern strips the literal prefix; ltrim() would strip any leading characters in the given set and could eat the start of a value.)```

| eval "File Size"=replace(tempStr_1,"^ConsumerFileSize=","")
| eval "EFG Delivery Time"=replace(tempStr_2,"^EFG_Delivery_Time=","")
| eval "Submission ID"=substr(tempStr_3, -38)
| eval "Source File Name"=replace(tempStr_4,"^ProducerFilename=","")
| eval "Route ID"=replace(tempStr_5,"^RouteID=","")

```Bring it all together! (Join EFG data to the data in the OS lookup table.)```

| search keyValuePair="*OneStreamSubmissionID*"
| rename "Submission ID" as Submission_ID
| rename "Source File Name" as Source_File_Name

| join type=left max=0 Source_File_Name [ search index=asvsdp* source=Watcher_Delivery_Status sourcetype=c1_json event_code=SINK_DELIVERY_COMPLETION (sink_name=onelake-delta-table-sink OR sink_name=onelake-table-sink OR sink_name=onelake-direct-sink)
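```The next few lines recover the file name from session_id: drop a 5-character prefix, then strip a trailing id that is 35 characters long when it contains dashes and 33 otherwise.```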
| eval test0=session_id
| eval test1=substr(test0, 6)
| eval o=len(test1)
| eval Quick_Check=substr(test1, o-33, o)
| eval p=if(like(Quick_Check, "%-%"), 35, 33)
| eval File_Name_From_Session_ID=substr(test1, 1, o-p)
| rename File_Name_From_Session_ID as Source_File_Name
```| lookup DFS-EFG-SDP-lookup_table_03.csv local=true Source_File_Name AS Source_File_Name OUTPUT Submission_ID, OS_time, BAP, Status```
| join type=left max=0 Source_File_Name [ search index=asvexternalfilegateway_summary * | table Source_File_Name, Submission_ID, Processed_time, OS_time, BAP, Status ]
| table event_code, event_timestamp, session_id, sink_name, _time, Source_File_Name, Submission_ID, OS_time, BAP, Status | search "Source_File_Name" IN (*OS.AIS.COF.DataOne.PROD*, *fgmulti_985440_GHR.COF.PROD.USPS.CARD*, *COF-DFS*) ]

```| lookup DFS-EFG-SDP-lookup_table_03.csv Submission_ID AS Submission_ID OUTPUT Processed_time, OS_time, BAP, Status```

| join type=left max=0 Submission_ID [ search index=asvexternalfilegateway_summary * | table Submission_ID, Processed_time, OS_time, BAP, Status ]

| eval "Delivery Status"=if(event_code="SINK_DELIVERY_COMPLETION","DELIVERED","FAILED")
| eval BAP = upper(BAP)
```| rename Processed_time as "OL Delivery Time"
| eval "OL Delivery Time"=if('Delivery Status'="FAILED","Failed at OneStream",'OL Delivery Time')```
| rename OS_time as "OS Delivery Time"
```Display consolidated data in tabular format.```

| eval "OL Delivery Time"=strftime(event_timestamp/1000, "%Y-%m-%d %H:%M:%S.%3N")

``` Convert OS timestamp from UTC to EST/EDT ```
| eval OS_TC='OS Delivery Time'
| eval OS_UTC_time=strptime(OS_TC,"%Y-%m-%d %H:%M:%S.%3N")
```IMPORTANT STEP: During EDT subtract 14400 to convert from UTC to local time; during EST subtract 18000. (We need to automate this step in the code.)```
| eval tempTime_2 = OS_UTC_time - 18000
```| eval tempTime = EST_time```
| eval "OS Delivery Time"=strftime(tempTime_2, "%Y-%m-%d %H:%M:%S.%3N")

``` Convert OL timestamp from UTC to EST/EDT ```
| eval OL_UTC_time=strptime('OL Delivery Time',"%Y-%m-%d %H:%M:%S.%3N")
```IMPORTANT STEP: During EDT subtract 14400 to convert from UTC to local time; during EST subtract 18000. (We need to automate this step in the code.)```
| eval tempTime_3 = OL_UTC_time - 18000
```| eval tempTime = EST_time```
| eval "OL Delivery Time"=strftime(tempTime_3, "%Y-%m-%d %H:%M:%S.%3N")

| rename Source_File_Name as "Source File Name"
| rename Submission_ID as "Submission ID"
| fields BAP "Route ID" "Source File Name" "File Size" "EFG Delivery Time" "OS Delivery Time" "OL Delivery Time" "Delivery Status" "Submission ID"
``` | search Source_File_Name IN (*COF-DFS*)```

| append [ search index=efg* source=efg_prod_summary sourcetype=stash STATUS_MESSAGE=Failed ConsumerName=OneStream | eval BAP=upper("badiscoverdatasupport")
| eval "Delivery Status"="FAILED", "Submission ID"="--"
| rename RouteID as "Route ID", SourceFilename as "Source File Name", FILE_SIZE as "File Size", ArrivalTime as "EFG Delivery Time"
| table BAP "Route ID" "Source File Name" "File Size" "EFG Delivery Time" "OS Delivery Time" "OL Delivery Time" "Delivery Status" "Submission ID"
| search "Source File Name" IN (*OS.AIS.COF.DataOne.PROD*, *fgmulti_985440_GHR.COF.PROD.USPS.CARD*, *COF-DFS*) ]

| sort -"EFG Delivery Time"
| search "Source File Name" IN (*OS.AIS.COF.DataOne.PROD*, *fgmulti_985440_GHR.COF.PROD.USPS.CARD*, *COF-DFS*)
| dedup "Submission ID"

ITWhisperer
SplunkTrust

What sort of optimisation are you trying to do?

My guess is that you are trying to remove all the joins?

It would help immensely if you could share some raw events from your various sources that demonstrate the sort of result you are trying to achieve. It would also help to describe, in non-SPL terms, what you are trying to do: for example, what an example result would look like, and the relationship between the results and the various input events.

Also, what have you already tried in terms of "optimisation"?
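In the meantime, the usual way to avoid join in SPL is to search all the sources in one base search and let stats stitch the events together on a shared key. As a generic sketch (the index and field names here are placeholders, not tailored to your data):

(index=indexA ...) OR (index=indexB ...)
| eval key=coalesce(field_from_A, field_from_B)
| stats values(*) as * by key

Whether that works here depends on how your events actually relate to each other, hence the questions above.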
