
Real-time search using the Python SDK export command

esharf
Engager

Here is my code:

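import splunklib.client as client
import splunklib.results as results

# connect() returns the Service object used below, so keep a reference to it
service = client.connect(**connection_args)
job_kwargs = {"search_mode": "realtime", "earliest_time": "rt", "latest_time": "rt", "output_mode": "json"}
# export() returns a raw stream; JSONResultsReader (available in recent SDK versions)
# parses it into Message objects and result dicts
reader = results.JSONResultsReader(service.jobs.export(my_query, **job_kwargs))
for item in reader:
    if isinstance(item, results.Message):
        print(item.message)
    else:
        print(item)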

 

When I run this code with a general query such as

 

my_query = "search index=main"

 

it works properly.
But when I run it with

 

my_query = "search `notable` | eval rule_name=if(isnull(rule_name),source,rule_name) | eval rule_title=if(isnull(rule_title),rule_name,rule_title) | `get_urgency` | `risk_correlation` | eval rule_description=if(isnull(rule_description),source,rule_description) | eval security_domain=if(isnull(security_domain),source,security_domain)"

 

I get a lot of events that I cannot see in the regular search.
I also get almost every event multiple times: some copies differ slightly (such as dest_ip=8.8.8.8 and dest_ip=8.8.8.9), and some are completely identical.

Note: when I tested this, the regular search returned about 9 events per 5 minutes on average, but with the real-time search I get almost 130 on average.
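
To put numbers on the duplication described above, the streamed results could be fingerprinted and counted on the client side. The following is only a minimal sketch: the helper names `fingerprint` and `summarize_duplicates` are hypothetical, the sample events are made up, and it assumes each result arrives as a plain dict (as a results reader would yield). It does not explain why the real-time search returns extra events; it only measures how many are exact repeats and how many differ in a chosen field such as dest_ip.

```python
import hashlib
import json
from collections import Counter


def fingerprint(event, ignore_fields=()):
    """Return a stable hash of an event dict, skipping the ignored fields."""
    kept = {k: v for k, v in event.items() if k not in ignore_fields}
    payload = json.dumps(kept, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def summarize_duplicates(events, ignore_fields=()):
    """Count total events, distinct events, and fingerprints seen more than once."""
    counts = Counter(fingerprint(e, ignore_fields) for e in events)
    total = sum(counts.values())
    distinct = len(counts)
    repeated = sum(1 for n in counts.values() if n > 1)
    return {"total": total, "distinct": distinct, "repeated": repeated}


# Hypothetical sample standing in for the dicts printed by the loop above.
sample = [
    {"rule_name": "A", "dest_ip": "8.8.8.8"},
    {"rule_name": "A", "dest_ip": "8.8.8.8"},
    {"rule_name": "A", "dest_ip": "8.8.8.9"},
]

# Exact duplicates only.
summary_all = summarize_duplicates(sample)
# Treat events that differ only in dest_ip as the same event.
summary_no_ip = summarize_duplicates(sample, ignore_fields=("dest_ip",))
print(summary_all)
print(summary_no_ip)
```

Comparing the "distinct" count with the count from the regular search over the same window would show whether the extra events are repeats of the same notables or genuinely different events.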


srijondas
Explorer

I am facing a similar problem.
