Team,
I'm a newbie at writing Splunk queries. Could you please give me some guidance on how to design an SPL query for the use case below?
Here are sample logs:
AIRFLOW_CTX_DAG_OWNER=Prathibha
AIRFLOW_CTX_DAG_ID=M_OPI_NPPV_NPPES
AIRFLOW_CTX_TASK_ID=NPPES_INSERT
AIRFLOW_CTX_EXECUTION_DATE=2021-12-08T18:57:24.419709+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-08T18:57:24.419709+00:00
[2021-12-08 19:12:59,923] {{cursor.py:696}} INFO - query: [INSERT OVERWRITE INTO IDRC_OPI_DEV.CMS_BDM_OPI_NPPES_DEV.OH_IN_PRVDR_NPPES SELEC...]
[2021-12-08 19:13:13,514] {{cursor.py:720}} INFO - query execution done
[2021-12-08 19:13:13,570] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,570] {{snowflake.py:277}} INFO - Rows affected: 1
[2021-12-08 19:13:13,592] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,592] {{snowflake.py:278}} INFO - Snowflake query id: 01a0d120-0000-12da-0000-0024028474a6
[2021-12-08 19:13:13,612] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,612] {{snowflake.py:277}} INFO - Rows affected: 7019070
[2021-12-08 19:13:13,632] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,632] {{snowflake.py:278}} INFO - Snowflake query id: 01a0d120-0000-12ce-0000-002402848486
[2021-12-08 19:13:13,811] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=M_OPI_NPPV_NPPES, task_id=NPPES_INSERT, execution_date=20211208T185724, start_date=20211208T191256, end_date=20211208T191313
[2021-12-08 19:13:13,868] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,867] {{local_task_job.py:146}} INFO - Task exited with return code
Expected output in tabular form:
DAG_ID TASK_ID STATUS ROWS_AFFECTED
M_OPI_NPPV_NPPES NPPES_INSERT SUCCESS 1,7019070
Thanks,
Sumit
When I run the following SPL without \n, it doesn't yield any results.
| transaction startswith="AIRFLOW_CTX_DAG_OWNER" endswith="Marking task as FAILED"........
However :
| transaction startswith="\\nAIRFLOW_CTX_DAG_OWNER" endswith="Marking task as FAILED"........
or
| transaction startswith="\nAIRFLOW_CTX_DAG_OWNER" endswith="Marking task as FAILED"........
both yield the same results.
The '\n' is added to the Airflow logs (in the JSON payload) when they are pushed to Splunk. That's why I added \\n before AIRFLOW_CTX_DAG_OWNER:
Airflow logs:
[2021-12-13 20:20:13,145] {{logging_mixin.py:104}} INFO - Running <TaskInstance: any_bash_command_dag.bash_command 2021-12-13T20:20:10.033766+00:00 [running]> on host ip-10-223-50-200.ec2.internal
[2021-12-13 20:20:13,567] {{taskinstance.py:1283}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=any_bash_command_dag
AIRFLOW_CTX_TASK_ID=bash_command
AIRFLOW_CTX_EXECUTION_DATE=2021-12-13T20:20:10.033766+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-13T20:20:10.033766+00:00
Airflow logs in Splunk:
2021-12-13T20:18:02.837Z 2b98fffe-7c1b-4dff-aedf-101c04bcd17d INFO Decoded payload: {
"messageType": "DATA_MESSAGE",
"owner": "765460745490",
"logGroup": "airflow-OnePIAirflowEnvironment-DEV-Task",
"logStream": "any_bash_command_dag/bash_command/2021-12-13T20_18_00.187744+00_00/1.log",
"subscriptionFilters": [
"airflow_2_splunk_task"
],
"logEvents": [
{
"id": "36560436722252801980494271713377497999438999533834272768",
"timestamp": 1639426682686,
"message": "[2021-12-13 20:18:02,686] {{taskinstance.py:1283}} INFO - Exporting the following env vars:\nAIRFLOW_CTX_DAG_OWNER=airflow\nAIRFLOW_CTX_DAG_ID=any_bash_command_dag\nAIRFLOW_CTX_TASK_ID=bash_command\nAIRFLOW_CTX_EXECUTION_DATE=2021-12-13T20:18:00.187744+00:00\nAIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-13T20:18:00.187744+00:00"
}
]
}
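Why the escaping matters: in the indexed event the newline is stored as the two literal characters \ and n (JSON escaping), so the startswith string has to match that literal two-character sequence, and in an SPL double-quoted string \\ stands for a single literal backslash. As a quick sanity check, something along these lines (a sketch, reusing the index/sourcetype from elsewhere in this thread; the quadruple backslash is needed because rex strips one level of escaping before the regex engine sees the pattern) should show the marker being matched:

index=cloud sourcetype=lambda:Airflow2Splunk "\"logGroup\"" "\"airflow-OnePIAirflowEnvironment-DEV-Task\""
| rex "\\\\nAIRFLOW_CTX_DAG_OWNER=(?<dag_owner>\w+)"
| stats count by dag_owner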
You could try something like this (the makeresults/eval/multikv part just sets up some dummy data):
| makeresults
| eval _raw="AIRFLOW_CTX_DAG_OWNER=Prathibha
AIRFLOW_CTX_DAG_ID=M_OPI_NPPV_NPPES
AIRFLOW_CTX_TASK_ID=NPPES_INSERT
AIRFLOW_CTX_EXECUTION_DATE=2021-12-08T18:57:24.419709+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-08T18:57:24.419709+00:00
[2021-12-08 19:12:59,923] {{cursor.py:696}} INFO - query: [INSERT OVERWRITE INTO IDRC_OPI_DEV.CMS_BDM_OPI_NPPES_DEV.OH_IN_PRVDR_NPPES SELEC...]
[2021-12-08 19:13:13,514] {{cursor.py:720}} INFO - query execution done
[2021-12-08 19:13:13,570] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,570] {{snowflake.py:277}} INFO - Rows affected: 1
[2021-12-08 19:13:13,592] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,592] {{snowflake.py:278}} INFO - Snowflake query id: 01a0d120-0000-12da-0000-0024028474a6
[2021-12-08 19:13:13,612] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,612] {{snowflake.py:277}} INFO - Rows affected: 7019070
[2021-12-08 19:13:13,632] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,632] {{snowflake.py:278}} INFO - Snowflake query id: 01a0d120-0000-12ce-0000-002402848486
[2021-12-08 19:13:13,811] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=M_OPI_NPPV_NPPES, task_id=NPPES_INSERT, execution_date=20211208T185724, start_date=20211208T191256, end_date=20211208T191313
[2021-12-08 19:13:13,868] {{logging_mixin.py:104}} INFO - [2021-12-08 19:13:13,867] {{local_task_job.py:146}} INFO - Task exited with return code"
| multikv noheader=t
| fields _raw
| transaction endswith="AIRFLOW_CTX_DAG_OWNER" keepevicted=t keeporphans=t mvraw=t
| fields _raw
| fields - _time
| rex "AIRFLOW_CTX_DAG_ID=(?<dag_id>.*)"
| rex "AIRFLOW_CTX_TASK_ID=(?<task_id>.*)"
| rex max_match=0 "Rows affected: (?<rows>.*)"
| rex "Marking task as (?<status>[^\.]+)"
| table dag_id task_id status rows
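Note that rows comes back as a multivalue field. To get the comma-separated form in your expected output (1,7019070), you can join the values, for example:

| eval rows=mvjoin(rows, ",")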
Thank you @ITWhisperer. Your SPL works as expected when I feed it real data.
I need to add two more columns to the table: dag_run_date and dag_run_time. Their values need to be extracted from the statement below, but I'm unable to form the corresponding rex command. Can you help me with that?
AIRFLOW_CTX_EXECUTION_DATE=2021-12-08T18:57:24.419709+00:00
| rex "AIRFLOW_CTX_EXECUTION_DATE=(?<dag_run_date>\d{4}\-\d{2}\-\d{2})T(?<dag_run_time>\d{2}:\d{2}:\d{2}\.\d{6})"
Thanks again @ITWhisperer
I'm seeing an issue during additional testing. Can you please review and let me know what I am doing wrong in SPL#2?
SPL#1 : This correctly lists dag_owner, dag_id, task_id, status (SUCCESS/FAILED), rows_affected, task_start_date, and task_end_date:
index=cloud sourcetype=lambda:Airflow2Splunk "\"logGroup\"" "\"airflow-OnePIAirflowEnvironment-DEV-Task\""
| transaction startswith="\\nAIRFLOW_CTX_DAG_OWNER" endswith="Marking task as"
| rex field=_raw "AIRFLOW_CTX_DAG_OWNER"=(?<dag_owner>\w+)
| rex field=_raw "AIRFLOW_CTX_DAG_ID=(?<dag_id>\w+)"
| rex field=_raw "AIRFLOW_CTX_TASK_ID=(?<task_id>\w+)"
| rex field=_raw max_match=0 "Rows affected: (?<rows_affected>\d+)"
| eval rows_affected=mvjoin(rows_affected, ",")
| rex field=_raw "Marking task as (?<status>[^\.]+)"
| rex field=_raw "start_date=(?<task_start_date>\d{8}T\d{6})"
| rex field=_raw "end_date=(?<task_end_date>\d{8}T\d{6})"
| eval new_start_task_date=strptime(task_start_date,"%Y%m%dT%H%M%S") | eval start_task_date=strftime(new_start_task_date,"%Y-%m-%d %H:%M:%S")
| eval new_end_task_date=strptime(task_end_date,"%Y%m%dT%H%M%S") | eval end_task_date=strftime(new_end_task_date,"%Y-%m-%d %H:%M:%S")
| table dag_owner dag_id task_id status rows_affected start_task_date end_task_date
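(As a side note, each strptime/strftime pair here could be collapsed into a single eval, which keeps the pipeline a little shorter:

| eval start_task_date=strftime(strptime(task_start_date,"%Y%m%dT%H%M%S"),"%Y-%m-%d %H:%M:%S")
| eval end_task_date=strftime(strptime(task_end_date,"%Y%m%dT%H%M%S"),"%Y-%m-%d %H:%M:%S")
)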
SPL#2
This should list only the FAILED tasks, with the same columns (dag_owner, dag_id, task_id, status, rows_affected, task_start_date, task_end_date):
But this SPL is not working as expected. I can see some tasks whose status is SUCCESS in the list as well.
index=cloud sourcetype=lambda:Airflow2Splunk "\"logGroup\"" "\"airflow-OnePIAirflowEnvironment-DEV-Task\""
| transaction startswith="\\nAIRFLOW_CTX_DAG_OWNER" endswith="Marking task as FAILED"
| rex field=_raw "AIRFLOW_CTX_DAG_OWNER"=(?<dag_owner>\w+)
| rex field=_raw "AIRFLOW_CTX_DAG_ID=(?<dag_id>\w+)"
| rex field=_raw "AIRFLOW_CTX_TASK_ID=(?<task_id>\w+)"
| rex field=_raw max_match=0 "Rows affected: (?<rows_affected>\d+)"
| eval rows_affected=mvjoin(rows_affected, ",")
| rex field=_raw "Marking task as (?<status>[^\.]+)"
| rex field=_raw "start_date=(?<task_start_date>\d{8}T\d{6})"
| rex field=_raw "end_date=(?<task_end_date>\d{8}T\d{6})"
| eval new_start_task_date=strptime(task_start_date,"%Y%m%dT%H%M%S") | eval start_task_date=strftime(new_start_task_date,"%Y-%m-%d %H:%M:%S")
| eval new_end_task_date=strptime(task_end_date,"%Y%m%dT%H%M%S") | eval end_task_date=strftime(new_end_task_date,"%Y-%m-%d %H:%M:%S")
| table dag_owner dag_id task_id status rows_affected start_task_date end_task_date
Do you need the \\n before AIRFLOW_CTX_DAG_OWNER?
AIRFLOW_CTX_DAG_OWNER is printed in the Splunk logs exactly as in the Decoded payload shown earlier in this thread: embedded in the message field behind a literal \n.
The SPL without the \n doesn't yield any results:
index=cloud sourcetype=lambda:Airflow2Splunk "\"logGroup\"" "\"airflow-OnePIAirflowEnvironment-DEV-Task\""
| transaction startswith="AIRFLOW_CTX_DAG_OWNER" endswith="Marking task as FAILED".
....
....
....
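Coming back to the SPL#2 question: a likely cause is that when several tasks run concurrently their events interleave in time, so a transaction anchored on one task's FAILED marker can sweep in another task's "Marking task as SUCCESS" line, and the first "Marking task as" match then wins in the status rex. Rather than relying on endswith to do the filtering, a more robust approach is to keep the broader transaction from SPL#1 and filter on the extracted status field afterwards. A sketch:

index=cloud sourcetype=lambda:Airflow2Splunk "\"logGroup\"" "\"airflow-OnePIAirflowEnvironment-DEV-Task\""
| transaction startswith="\\nAIRFLOW_CTX_DAG_OWNER" endswith="Marking task as"
| rex field=_raw "AIRFLOW_CTX_DAG_ID=(?<dag_id>\w+)"
| rex field=_raw "AIRFLOW_CTX_TASK_ID=(?<task_id>\w+)"
| rex field=_raw max_match=0 "Rows affected: (?<rows_affected>\d+)"
| eval rows_affected=mvjoin(rows_affected, ",")
| rex field=_raw "Marking task as (?<status>[^\.]+)"
| search status=FAILED
| table dag_id task_id status rows_affected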