Splunk Search

Need help formulating a query for our use case

kapoorsumit2020
Loves-to-Learn Everything

Team,

I'm a newbie at writing Splunk queries. Could you please give me some guidance on how to design an SPL query for the use case below?

Here are sample logs:

AIRFLOW_CTX_DAG_OWNER=Prathibha
AIRFLOW_CTX_DAG_ID=M_OPI_NPPV_NPPES
AIRFLOW_CTX_TASK_ID=NPPES_INSERT
AIRFLOW_CTX_EXECUTION_DATE=2021-12-08T18:57:24.419709+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-08T18:57:24.419709+00:00

[2021-12-08 19:12:59,923] {{cursor.py:696}} INFO - query: [INSERT OVERWRITE INTO IDRC_OPI_DEV.CMS_BDM_OPI_NPPES_DEV.OH_IN_PRVDR_NPPES SELEC...]
[2021-12-08 19:13:13,514] {{cursor.py:720}} INFO - query execution done
[2021-12-08 19:13:13,570] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,570] {{snowflake.py:277}} INFO - Rows affected: 1
[2021-12-08 19:13:13,592] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,592] {{snowflake.py:278}} INFO - Snowflake query id: 01a0d120-0000-12da-0000-0024028474a6
[2021-12-08 19:13:13,612] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,612] {{snowflake.py:277}} INFO - Rows affected: 7019070
[2021-12-08 19:13:13,632] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,632] {{snowflake.py:278}} INFO - Snowflake query id: 01a0d120-0000-12ce-0000-002402848486
[2021-12-08 19:13:13,811] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=M_OPI_NPPV_NPPES, task_id=NPPES_INSERT, execution_date=20211208T185724, start_date=20211208T191256, end_date=20211208T191313
[2021-12-08 19:13:13,868] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,867] {{local_task_job.py:146}} INFO - Task exited with return code

Expected output in tabular form:

DAG_ID              TASK_ID         STATUS    ROWS_AFFECTED
M_OPI_NPPV_NPPES    NPPES_INSERT    SUCCESS   1,7019070

Thanks,

Sumit


kapoorsumit2020
Loves-to-Learn Everything

When I run the following SPL without the \n, it doesn't yield any results.

| transaction startswith="AIRFLOW_CTX_DAG_OWNER" endswith="Marking task as FAILED"........

However:

| transaction startswith="\\nAIRFLOW_CTX_DAG_OWNER" endswith="Marking task as FAILED"........

or

| transaction startswith="\nAIRFLOW_CTX_DAG_OWNER" endswith="Marking task as FAILED"........

both yield the same results.

A literal '\n' is embedded in the Airflow logs (inside the JSON message field) when they are pushed to Splunk. That's why I added \\n before AIRFLOW_CTX_DAG_OWNER:

Airflow logs:

[2021-12-13 20:20:13,145] {{logging_mixin.py:104}} INFO - Running <TaskInstance: any_bash_command_dag.bash_command 2021-12-13T20:20:10.033766+00:00 [running]> on host ip-10-223-50-200.ec2.internal
[2021-12-13 20:20:13,567] {{taskinstance.py:1283}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=any_bash_command_dag
AIRFLOW_CTX_TASK_ID=bash_command
AIRFLOW_CTX_EXECUTION_DATE=2021-12-13T20:20:10.033766+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-13T20:20:10.033766+00:00

Airflow logs in Splunk:

2021-12-13T20:18:02.837Z 2b98fffe-7c1b-4dff-aedf-101c04bcd17d INFO Decoded payload: {
  "messageType": "DATA_MESSAGE",
  "owner": "765460745490",
  "logGroup": "airflow-OnePIAirflowEnvironment-DEV-Task",
  "logStream": "any_bash_command_dag/bash_command/2021-12-13T20_18_00.187744+00_00/1.log",
  "subscriptionFilters": [
    "airflow_2_splunk_task"
  ],
  "logEvents": [
    {
      "id": "36560436722252801980494271713377497999438999533834272768",
      "timestamp": 1639426682686,
      "message": "[2021-12-13 20:18:02,686] {{taskinstance.py:1283}} INFO - Exporting the following env vars:\nAIRFLOW_CTX_DAG_OWNER=airflow\nAIRFLOW_CTX_DAG_ID=any_bash_command_dag\nAIRFLOW_CTX_TASK_ID=bash_command\nAIRFLOW_CTX_EXECUTION_DATE=2021-12-13T20:18:00.187744+00:00\nAIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-13T20:18:00.187744+00:00"
    }
  ]
}
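To confirm what the message field actually contains (including the literal \n sequences), it can be pulled out of the JSON with spath. A rough, untested sketch, assuming the "Decoded payload: " prefix is always present:

index=cloud sourcetype=lambda:Airflow2Splunk "\"logGroup\"" "\"airflow-OnePIAirflowEnvironment-DEV-Task\""
| rex field=_raw "(?s)Decoded payload: (?<payload>\{.*\})" ```assumes every event carries the prefix; adjust if yours differ```
| spath input=payload path=logEvents{}.message output=message
| table message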


ITWhisperer
SplunkTrust

You could try something like this (the part before the blank lines just sets up some dummy data):

| makeresults 
| eval _raw="AIRFLOW_CTX_DAG_OWNER=Prathibha
AIRFLOW_CTX_DAG_ID=M_OPI_NPPV_NPPES
AIRFLOW_CTX_TASK_ID=NPPES_INSERT
AIRFLOW_CTX_EXECUTION_DATE=2021-12-08T18:57:24.419709+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-08T18:57:24.419709+00:00

[2021-12-08 19:12:59,923] {{cursor.py:696}} INFO - query: [INSERT OVERWRITE INTO IDRC_OPI_DEV.CMS_BDM_OPI_NPPES_DEV.OH_IN_PRVDR_NPPES SELEC...]
[2021-12-08 19:13:13,514] {{cursor.py:720}} INFO - query execution done
[2021-12-08 19:13:13,570] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,570] {{snowflake.py:277}} INFO - Rows affected: 1
[2021-12-08 19:13:13,592] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,592] {{snowflake.py:278}} INFO - Snowflake query id: 01a0d120-0000-12da-0000-0024028474a6
[2021-12-08 19:13:13,612] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,612] {{snowflake.py:277}} INFO - Rows affected: 7019070
[2021-12-08 19:13:13,632] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,632] {{snowflake.py:278}} INFO - Snowflake query id: 01a0d120-0000-12ce-0000-002402848486
[2021-12-08 19:13:13,811] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=M_OPI_NPPV_NPPES, task_id=NPPES_INSERT, execution_date=20211208T185724, start_date=20211208T191256, end_date=20211208T191313
[2021-12-08 19:13:13,868] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,867] {{local_task_job.py:146}} INFO - Task exited with return code"
| multikv noheader=t
| fields _raw



| transaction endswith="AIRFLOW_CTX_DAG_OWNER" keepevicted=t keeporphans=t mvraw=t
| fields _raw
| fields - _time
| rex "AIRFLOW_CTX_DAG_ID=(?<dag_id>.*)"
| rex "AIRFLOW_CTX_TASK_ID=(?<task_id>.*)"
| rex max_match=0 "Rows affected: (?<rows>.*)"
| rex "Marking task as (?<status>[^\.]+)"
| table dag_id task_id status rows
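A note on how this works: multikv noheader=t splits the single dummy event into one event per line, and the transaction groups those lines back into one event per task run, using the AIRFLOW_CTX_DAG_OWNER line as the boundary; keepevicted=t and keeporphans=t keep incomplete groups instead of discarding them, and mvraw=t keeps each original line in a multivalue _raw so the rex extractions (max_match=0 for the repeated "Rows affected" lines) return one value per match. If you want the two row counts joined into a single comma-separated cell, as in your expected output, mvjoin should do it:

| eval rows=mvjoin(rows, ",")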

kapoorsumit2020
Loves-to-Learn Everything

Thank you @ITWhisperer. Your SPL works as expected when I feed it real data.

I need to add two more columns to the table: dag_run_date and dag_run_time. Their values need to be extracted from the statement below, but I'm unable to form the corresponding rex command. Can you help me with that?

AIRFLOW_CTX_EXECUTION_DATE=2021-12-08T18:57:24.419709+00:00

 


ITWhisperer
SplunkTrust
| rex "AIRFLOW_CTX_EXECUTION_DATE=(?<dag_run_date>\d{4}\-\d{2}\-\d{2})T(?<dag_run_time>\d{2}:\d{2}:\d{2}\.\d{6})"

kapoorsumit2020
Loves-to-Learn Everything

Thanks again @ITWhisperer 

I'm seeing an issue during additional testing. Can you please review and let me know what I am doing wrong in SPL#2?

SPL#1: This correctly lists dags, task_ids, status (SUCCESS/FAILED), rows_affected, task start date, and task end date:

index=cloud sourcetype=lambda:Airflow2Splunk "\"logGroup\"" "\"airflow-OnePIAirflowEnvironment-DEV-Task\""
| transaction startswith="\\nAIRFLOW_CTX_DAG_OWNER" endswith="Marking task as"
| rex field=_raw "AIRFLOW_CTX_DAG_OWNER"=(?<dag_owner>\w+)
| rex field=_raw "AIRFLOW_CTX_DAG_ID=(?<dag_id>\w+)"
| rex field=_raw "AIRFLOW_CTX_TASK_ID=(?<task_id>\w+)"
| rex field=_raw max_match=0 "Rows affected: (?<rows_affected>\d+)"
| eval rows_affected=mvjoin(rows_affected, ",")
| rex field=_raw "Marking task as (?<status>[^\.]+)"
| rex field=_raw "start_date=(?<task_start_date>\d{8}T\d{6})"
| rex field=_raw "end_date=(?<task_end_date>\d{8}T\d{6})"
| eval new_start_task_date=strptime(task_start_date,"%Y%m%dT%H%M%S") | eval start_task_date=strftime(new_start_task_date,"%Y-%m-%d %H:%M:%S")
| eval new_end_task_date=strptime(task_end_date,"%Y%m%dT%H%M%S") | eval end_task_date=strftime(new_end_task_date,"%Y-%m-%d %H:%M:%S")
| table dag_owner dag_id task_id status rows_affected start_task_date end_task_date

SPL#2

This should list dags, task_ids, status (FAILED only), rows_affected, task start date, and task end date:

But this SPL is not working as expected: I can see some tasks whose status is SUCCESS in the list as well.

index=cloud sourcetype=lambda:Airflow2Splunk "\"logGroup\"" "\"airflow-OnePIAirflowEnvironment-DEV-Task\""
| transaction startswith="\\nAIRFLOW_CTX_DAG_OWNER" endswith="Marking task as FAILED"
| rex field=_raw "AIRFLOW_CTX_DAG_OWNER"=(?<dag_owner>\w+)
| rex field=_raw "AIRFLOW_CTX_DAG_ID=(?<dag_id>\w+)"
| rex field=_raw "AIRFLOW_CTX_TASK_ID=(?<task_id>\w+)"
| rex field=_raw max_match=0 "Rows affected: (?<rows_affected>\d+)"
| eval rows_affected=mvjoin(rows_affected, ",")
| rex field=_raw "Marking task as (?<status>[^\.]+)"
| rex field=_raw "start_date=(?<task_start_date>\d{8}T\d{6})"
| rex field=_raw "end_date=(?<task_end_date>\d{8}T\d{6})"
| eval new_start_task_date=strptime(task_start_date,"%Y%m%dT%H%M%S") | eval start_task_date=strftime(new_start_task_date,"%Y-%m-%d %H:%M:%S")
| eval new_end_task_date=strptime(task_end_date,"%Y%m%dT%H%M%S") | eval end_task_date=strftime(new_end_task_date,"%Y-%m-%d %H:%M:%S")
| table dag_owner dag_id task_id status rows_affected start_task_date end_task_date
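One possible workaround (an untested sketch) would be to keep the generic endswith from SPL#1 and filter on the extracted status field afterwards, rather than changing the transaction boundary:

| transaction startswith="\\nAIRFLOW_CTX_DAG_OWNER" endswith="Marking task as"
| rex field=_raw "Marking task as (?<status>[^\.]+)"
| where status="FAILED"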


ITWhisperer
SplunkTrust

Do you need the \\n before AIRFLOW_CTX_DAG_OWNER?


kapoorsumit2020
Loves-to-Learn Everything

AIRFLOW_CTX_DAG_OWNER is printed like this in the Splunk logs:

2021-12-13T20:18:02.837Z 2b98fffe-7c1b-4dff-aedf-101c04bcd17d INFO Decoded payload: {
  "messageType": "DATA_MESSAGE",
  "owner": "765460745490",
  "logGroup": "airflow-OnePIAirflowEnvironment-DEV-Task",
  "logStream": "any_bash_command_dag/bash_command/2021-12-13T20_18_00.187744+00_00/1.log",
  "subscriptionFilters": [
    "airflow_2_splunk_task"
  ],
  "logEvents": [
    {
      "id": "36560436722252801980494271713377497999438999533834272768",
      "timestamp": 1639426682686,
      "message": "[2021-12-13 20:18:02,686] {{taskinstance.py:1283}} INFO - Exporting the following env vars:\nAIRFLOW_CTX_DAG_OWNER=airflow\nAIRFLOW_CTX_DAG_ID=any_bash_command_dag\nAIRFLOW_CTX_TASK_ID=bash_command\nAIRFLOW_CTX_EXECUTION_DATE=2021-12-13T20:18:00.187744+00:00\nAIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-13T20:18:00.187744+00:00"
    }
  ]
}

The SPL without the \n doesn't yield any results:

index=cloud sourcetype=lambda:Airflow2Splunk "\"logGroup\"" "\"airflow-OnePIAirflowEnvironment-DEV-Task\""
| transaction startswith="AIRFLOW_CTX_DAG_OWNER" endswith="Marking task as FAILED".

....

....

....
