I am developing a custom streaming command. During tests and debugging I noticed the command works fine in this search:
index="_internal" | head 1 | table host | customcommand
and produces the following result:
<class 'generator'>
But when I use the command in the following search it produces no results:
index="_internal" | head 1 | customcommand
This is the code:
@Configuration()
class CustomCommand(StreamingCommand):
    def stream(self, events):
        yield {"event": str(type(events))}
and this is commands.conf:
[customcommand]
chunked = true
filename = customcommand.py
python.version = python3
requires_srinfo = true
streaming = true
How can I fix that?
Hi @wipark
Your command yields a single event with the type of the events parameter, but does not process or yield the actual input events. In a streaming command, you must iterate over the incoming events and yield output for each one. Your current implementation only yields once, regardless of input.
Try this updated code:
@Configuration()
class CustomCommand(StreamingCommand):
    def stream(self, events):
        for event in events:
            event["event"] = str(type(events))
            yield event
The stream method receives an iterator of events. You need to loop over the events and yield each one (usually you would modify each event to perform your command's intended function).
Your original code only yielded once, so unless the search pipeline expected a single event, nothing was passed downstream.
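For illustration, here is a minimal sketch of that loop-and-yield pattern that runs without Splunk. The function name `stream_records` and the sample record are hypothetical; in a real command this loop would be the body of `stream`.

```python
def stream_records(records):
    """Hypothetical stand-in for the body of StreamingCommand.stream."""
    for record in records:
        # Modify the incoming record in place so its existing fields are kept.
        record["event"] = str(type(records))
        yield record


# Simulated input: a single record, as `| head 1` would produce (values are made up).
sample = iter([{"_time": "1748073052.114", "host": "example-host"}])
results = list(stream_records(sample))
print(results)
```

Because the original record is yielded back, fields such as `_time` and `host` are still present in the output alongside the new `event` field.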
This is simple code to reproduce the problem. My actual code iterates over the events. The problem is that when I don't use the table command before my customcommand, there are no events to iterate over. In this case, without table, there are no results, not even one. Also, my search in this example uses head 1, so there is only one input result.
Thanks, that solved the problem. It seems that for the code to work correctly, I need to yield the original record, or at least some fields of it. I haven’t tested thoroughly yet, but today I started testing based on this example.
The following code works as expected for both of these queries:
index=_internal | head 1 | table host | streamingcsc
index=_internal | head 1 | streamingcsc
#!/usr/bin/env python
import os, sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "lib"))
from splunklib.searchcommands import dispatch, StreamingCommand, Configuration, Option, validators

@Configuration()
class StreamingCSC(StreamingCommand):
    def stream(self, records):
        for record in records:
            record["event"] = str(type(records))
            yield record

dispatch(StreamingCSC, sys.argv, sys.stdin, sys.stdout, __name__)
But the following code only works with this query:
index=_internal | head 1 | table host | streamingcsc
#!/usr/bin/env python
import os, sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "lib"))
from splunklib.searchcommands import dispatch, StreamingCommand, Configuration, Option, validators

@Configuration()
class StreamingCSC(StreamingCommand):
    def stream(self, records):
        for record in records:
            yield {"event": str(type(records))}

dispatch(StreamingCSC, sys.argv, sys.stdin, sys.stdout, __name__)
Upon further investigation, it seems the _time field needs to be present for Splunk to show the results. Code like this works:
def stream(self, events):
    yield {"myfield": "fff", "_time": "1748073052.114"}
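If that observation holds, one way to yield brand-new records while still keeping `_time` is to copy the timestamp over from each input record. This is only a sketch that runs outside Splunk: `make_record` and `stream_records` are hypothetical names, the fallback to the current time is my own assumption, and the `_time` requirement itself is what I saw in my tests rather than something I found documented.

```python
import time


def make_record(source, **fields):
    """Hypothetical helper: build a new output record that keeps `_time`."""
    # Reuse the input record's timestamp when present.
    # Assumption: fall back to the current epoch time if it is missing.
    out = {"_time": source.get("_time", str(time.time()))}
    out.update(fields)
    return out


def stream_records(records):
    """Hypothetical stand-in for the body of StreamingCommand.stream."""
    for record in records:
        yield make_record(record, myfield="fff")


# Simulated input record (values are made up for the example).
sample = [{"_time": "1748073052.114", "host": "example-host", "_raw": "..."}]
results = list(stream_records(sample))
print(results)
```

This keeps the output limited to the fields I actually want, instead of passing the whole original record through.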
I will try your version when I get back to work.