 
					
				
		
I have seen some promotional material lauding how the new SCPv2 enables custom search commands to process millions of events with lower memory overhead now that they can operate in a true streaming/chunked fashion. However, I cannot seem to get any CSC's with the v2 protocol to handle more than a few hundred thousand events (even using the default implementation that simply yields the records passed).
For example, consider the following example StreamingCommand:
from splunklib.searchcommands import dispatch, StreamingCommand
@Configuration()
class simpleStreamingCmd(StreamingCommand):
def stream(self, records):
        for record in records:
            yield record
if __name__ == "__main__":
    dispatch(simpleStreamingCmd, sys.argv, sys.stdin, sys.stdout, __name__)
Commands.conf configuration:
[simplestreamingcmd]
filename = simplestreamingcmd.py
chunked = true
Using a search that inputs a CSV of 1,000,000 events and feeds those events to the simple streaming command (which simply yields them right back), the following error is thrown (found in search.log):
09-18-2018 11:00:31.750 ERROR ChunkedExternProcessor - Failure writing result chunk, buffer full. External process possibly failed to read its stdin.
09-18-2018 11:00:31.750 ERROR ChunkedExternProcessor - Error in 'simplestreamingcmd' command: Failed to send message to external search command, see search.log.
I started with a much more complex CSC to accomplish a specific task and eventually reduced it down to the simple example you see here, trying to figure out where the problem lies. I have tried writing StreamingCommands, EventingCommands, and ReportingCommands on multiple different search heads, and even tried multiple versions of Splunk (6.5.3 and 7.0.2) and updated to the latest Python SDK. Regardless of those, this seems to happen every time more than 300,000 events are passed to any chunked SCPv2 CSC.
Any thoughts on what might be going on here? I would really like to use SCPv2, but unless I am doing something wrong here, this seems like a rather fundamental issue with it.
I have seen a couple other users reporting what appears to be the same issue here: https://github.com/splunk/splunk-sdk-python/issues/150
 
					
				
		
UPDATE:
With the help of a few hints from a friendly at Splunk, I believe that I have managed to get this working.  I have tested on numerous configurations (single server vs. 3 SHC with 6 indexer cluster, generating vs. eventing base searches, with and w/o previews, localop and parallel on indexers) and all seem to work.  Sometimes you must tune a timing parameter ( throttleusec) that helps the custom command child process throttle the results passing back to the Splunk parent daemon, but I have gotten this version to work with hundreds of millions of events very reliably.
The solution is embodied in the new echo custom command implemented in this change...
https://github.com/TiVo/splunk-sdk-python/commit/5188f7d709cadd80e786692b371a64c4ae0991d2
Also, Splunk reports on my service ticket that this underlying timing bug will be resolved in a future release.
ORIGINAL ANSWER:
I have spent a couple of days attempting to better understand and work around this problem. At the end of my efforts, I have concluded that there is a bug in the Splunk daemon itself that behaves somewhat differently (timing-wise) from version to version and machine to machine. Along the way, I found multiple opportunities to enhance/improve the python SDK, but my fixes did not ultimately prevent the underlying problem from recurring. Details below.
We initially observed this problem in production as a scheduled job began consistently failing after working fine for months. The problem signature was (as stated in the question here):
<timestamp> ERROR ChunkedExternProcessor - Failure writing result chunk, buffer full. External process possibly failed to read its stdin.
<timestamp> ERROR ChunkedExternProcessor - Error in '<our_custom_cmd>' command: Failed to send message to external search command, see search.log.
Once this error appeared, it occurred consistently.  The exact timing of when it occurred to the search relative to the search launch time varied somewhat.  We were quickly able to reproduce this problem on local, much simpler Splunk workstation installs (single machine) using | makeresults or even index=_* | head 1000000 | table _time host source sourcetype style base searches connected to our custom command.  As also already stated here, we also quickly determined that reducing the custom command to the simplest possible configuration (that simply yielded back its input) still produced the problem.  During these rounds of testing, we found that the error was not 100% consistent and varying the number of events sent to the custom command and the size of those events seemed to change the frequency of the reported error.  Additionally, debugging, esp. logging, added to the custom command impacting the likelihood of hitting the error.
Given the text of the error, we began to suspect that somehow our custom command was allowing a pipe to fill causing this issue.  Especially suspicious was the custom command's stdin which we knew to be connected to the Splunk daemon that was reporting the error.  Reviewing the implementation strategy of the command, including the python SDK base classes, presented a few potential optimizations.
First, the python SDK will currently simply read the entirety of the data input into RAM before processing (due to the implementation of _read_chunk() in SearchCommand).  This seemed problematic for multiple reasons (memory usage of the custom command, lack of true streaming implementation for large data sets).  We first attempted to repair this by building a "chunk-aware" input and processing the events as we read them from stdin.  This timing change (reading the input records more slowly and producing output records while doing so) seemed to much more quickly trigger the buffer full failure, so, while we think this is actually the best implementation, we abandoned it.
class ChunkedInput(object):
    def __init__(self, infile, limit):
        self._file = infile
        self._limit = limit
    def __iter__(self):
        while True:
            if self._limit <= 0:
                return
            line = self._file.readline()
            yield line
            self._limit -= len(line)
Instead, we repaired the "read everything into RAM" problem by implementing a new class StreamingSearchCommand, derived from SearchCommand.  In this implementation, we reworked _read_chunk() and _records_protocol_v2() to download the incoming events into a gzip'd file in the dispatch directory and then reopen and stream them back from there.  This greatly reduced memory footprint required by our simple test custom SPL command, but the buffer full errors continued.
Next, we imagined that perhaps, since our command was not continuing to monitor and read from stdin once it had collected all of the incoming events that the pipe attached to our stdin might be filling to the point where the Splunk daemon was going to block attempting to write into it.  We imagined that this condition could be underlying the error reported here.  We repaired this oversight by improving our custom SmartStreamingCommand class to also occasionally poll ( select actually) the stdin file descriptor and read out a chunk if data was present.  Testing of this implementation confirmed that the Splunk daemon did, in fact, continue to occasionally write things to us through this pipe, even after we had collected all input records.  Still, this improvement did not completely prevent the dreaded buffer full error.
Finally, reviewing the python SDK implementation further we were concerned that it might not be flushing records in a streaming fashion, but instead waiting until the generator chain ( self._record_writer.write_records(process(self._records(ifile))) ) drained.  So, we added an occasional flush() call to our SmartStreamingCommand implementation.  Unfortunately, we continued to hit buffer full errors.
At this point, we decided to bring out the big guns (strace) and we started by monitoring our process.  We could easily see that we were regularly monitoring stdin and reading it quickly if data was present.  Everything on the python SDK/custom command process side seemed okay, so we switched to straceing the Splunk daemon itself.  We found that attaching strace had the (Heisenberg) effect of eliminating the problem altogether.  Excellent.
After numerous tries, changing the number and list of syscalls that we were intercepting, we finally managed to catch the failure in action one time. We expected review of this precious, captured output to show a system call returning EWOULDBLOCK or similar, allowing us to work backwards to understand the condition that caused the Splunk daemon to become upset and produce this error. Unfortunately, after quite a bit of time tracing file descriptors, futexes and signals across threads in the Splunk daemon, all of the system calls looked fine and no clear culprit was illuminated.
Additional inspection of the many search.log examples that were generated during the testing and evaluation of this issue did seem to show a pattern.  Specifically, the Splunk daemon would fairly consistently issue this error approximately 80ms after the custom command had flushed a batch of input to it.  We believe this suggests that the code associated with reading event records back from the custom command is thus likely implicated in this issue.  We attempted, to some, but not complete success, to leverage this observation by adding a slight sleep() before flushing each batch of records.
After numerous attempts to work around this issue, including building lower footprint, more efficient python SDK replacements, we remain stuck with this issue, unable to build custom commands that process more than a million or so events without causing issues. This is a significant weakness in our current "big data" infrastructure and is blocking us on a few fronts. We would welcome advice or collaboration intended to work towards a solution to this issue. I will file a Splunk support ticket referencing this item on Splunk Answers...
 
		
		
		
		
		
	
			
		
		
			
					
		This should be fixed in the Splunk Python SDK version 1.6.15 or later. Upgrading the SDK in your app should be enough for all supported Splunk versions. (No need to upgrade splunkd.)
The problem was indeed related to the flush() method. It appears the SDK support for the chunked protocol was written assuming the protocol would support "partial chunks", allowing the response to one input message to be split into multiple output messages, with a partial: true flag used to indicate that the response would be continued on the next message. The code in the SDK to mark partial chunks had been commented out, but the code still sent partial chunks when a response produced maxresults rows (50,000 by default) -- just with no indication that it was to be interpreted partial response.
This was a problem even for commands that simply returned the same number of rows, because it happened when the limit was reached, even if it was never exceeded. As a result, every time the script produced 50,000 records, the expected response was followed by an additional chunk, which -- per the protocol -- was the response to the next chunk. (The protocol expects each request to have one repsonse.)
Since the script would produce responses before reading the request, as the script got more and more out of sync with the protocol, more unread requests would end up buffered in the stdin pipe with more responses buffered in the stdout pipe until both buffers were full and writes started to block/fail.
I considered adding a workaround to splunkd so that apps wouldn't need to update the SDK they use, but there was no a reliable way to determine which commands needed it, or which commands would be broken by it.
Anyway, if you're curious, the full fix (and a tiny bit of related clean up) is in https://github.com/splunk/splunk-sdk-python/pull/301/files
Kudos to @kulick and @cpride_splunk for their early analysis of this bug!
When using smartstreamingcommand from the package in your updated answer we avoid the error at the subsearch limit but with a new problem:
Have you seen something like this?
|mongoreadbeta testdata |table * |echo |table *
Can easily process 2 million records however sometimes we see:
It appears to always break on the 100K boundary.
Ex: 1 1100000 records - chunked generating command
Duration (seconds)      Component   Invocations Input count Output count
    0.00     command.addinfo    13  600,000 600,000
    47.69    command.echo   13  1,100,000   600,000
Ex: 2 1,041,865 records after search filter
Duration (seconds)      Component   Invocations Input count Output count
    0.00     command.addinfo    12  641,865 641,865
    56.21    command.echo   12  1,041,865   641,865
    7.62     command.mongoreadbeta  12  -   1,100,000
When i run it with a non-chunked generating command it works:
Ex3 1100000 rows - v1 generating command
|mongoread testdata |echo |table *
Duration (seconds)      Component   Invocations Input count Output count
    114.51   command.echo   23  1,100,000   1,100,000
    26.14    command.mongoread  1   -   1,100,000
    0.91     command.table  1   1,100,000   2,200,000
Version: 7.2.7
Build: f817a93effc2
Using new echo custom command implemented in this change...
https://github.com/TiVo/splunk-sdk-python/commit/5188f7d709cadd80e786692b371a64c4ae0991d2
 
					
				
		
I think that my latest update to my previous "answer" now actually is an answer to your original question and the problem that we were both experiencing. I'd love to know if it helps you...
 
					
				
		
UPDATE:
With the help of a few hints from a friendly at Splunk, I believe that I have managed to get this working.  I have tested on numerous configurations (single server vs. 3 SHC with 6 indexer cluster, generating vs. eventing base searches, with and w/o previews, localop and parallel on indexers) and all seem to work.  Sometimes you must tune a timing parameter ( throttleusec) that helps the custom command child process throttle the results passing back to the Splunk parent daemon, but I have gotten this version to work with hundreds of millions of events very reliably.
The solution is embodied in the new echo custom command implemented in this change...
https://github.com/TiVo/splunk-sdk-python/commit/5188f7d709cadd80e786692b371a64c4ae0991d2
Also, Splunk reports on my service ticket that this underlying timing bug will be resolved in a future release.
ORIGINAL ANSWER:
I have spent a couple of days attempting to better understand and work around this problem. At the end of my efforts, I have concluded that there is a bug in the Splunk daemon itself that behaves somewhat differently (timing-wise) from version to version and machine to machine. Along the way, I found multiple opportunities to enhance/improve the python SDK, but my fixes did not ultimately prevent the underlying problem from recurring. Details below.
We initially observed this problem in production as a scheduled job began consistently failing after working fine for months. The problem signature was (as stated in the question here):
<timestamp> ERROR ChunkedExternProcessor - Failure writing result chunk, buffer full. External process possibly failed to read its stdin.
<timestamp> ERROR ChunkedExternProcessor - Error in '<our_custom_cmd>' command: Failed to send message to external search command, see search.log.
Once this error appeared, it occurred consistently.  The exact timing of when it occurred to the search relative to the search launch time varied somewhat.  We were quickly able to reproduce this problem on local, much simpler Splunk workstation installs (single machine) using | makeresults or even index=_* | head 1000000 | table _time host source sourcetype style base searches connected to our custom command.  As also already stated here, we also quickly determined that reducing the custom command to the simplest possible configuration (that simply yielded back its input) still produced the problem.  During these rounds of testing, we found that the error was not 100% consistent and varying the number of events sent to the custom command and the size of those events seemed to change the frequency of the reported error.  Additionally, debugging, esp. logging, added to the custom command impacting the likelihood of hitting the error.
Given the text of the error, we began to suspect that somehow our custom command was allowing a pipe to fill causing this issue.  Especially suspicious was the custom command's stdin which we knew to be connected to the Splunk daemon that was reporting the error.  Reviewing the implementation strategy of the command, including the python SDK base classes, presented a few potential optimizations.
First, the python SDK will currently simply read the entirety of the data input into RAM before processing (due to the implementation of _read_chunk() in SearchCommand).  This seemed problematic for multiple reasons (memory usage of the custom command, lack of true streaming implementation for large data sets).  We first attempted to repair this by building a "chunk-aware" input and processing the events as we read them from stdin.  This timing change (reading the input records more slowly and producing output records while doing so) seemed to much more quickly trigger the buffer full failure, so, while we think this is actually the best implementation, we abandoned it.
class ChunkedInput(object):
    def __init__(self, infile, limit):
        self._file = infile
        self._limit = limit
    def __iter__(self):
        while True:
            if self._limit <= 0:
                return
            line = self._file.readline()
            yield line
            self._limit -= len(line)
Instead, we repaired the "read everything into RAM" problem by implementing a new class StreamingSearchCommand, derived from SearchCommand.  In this implementation, we reworked _read_chunk() and _records_protocol_v2() to download the incoming events into a gzip'd file in the dispatch directory and then reopen and stream them back from there.  This greatly reduced memory footprint required by our simple test custom SPL command, but the buffer full errors continued.
Next, we imagined that perhaps, since our command was not continuing to monitor and read from stdin once it had collected all of the incoming events that the pipe attached to our stdin might be filling to the point where the Splunk daemon was going to block attempting to write into it.  We imagined that this condition could be underlying the error reported here.  We repaired this oversight by improving our custom SmartStreamingCommand class to also occasionally poll ( select actually) the stdin file descriptor and read out a chunk if data was present.  Testing of this implementation confirmed that the Splunk daemon did, in fact, continue to occasionally write things to us through this pipe, even after we had collected all input records.  Still, this improvement did not completely prevent the dreaded buffer full error.
Finally, reviewing the python SDK implementation further we were concerned that it might not be flushing records in a streaming fashion, but instead waiting until the generator chain ( self._record_writer.write_records(process(self._records(ifile))) ) drained.  So, we added an occasional flush() call to our SmartStreamingCommand implementation.  Unfortunately, we continued to hit buffer full errors.
At this point, we decided to bring out the big guns (strace) and we started by monitoring our process.  We could easily see that we were regularly monitoring stdin and reading it quickly if data was present.  Everything on the python SDK/custom command process side seemed okay, so we switched to straceing the Splunk daemon itself.  We found that attaching strace had the (Heisenberg) effect of eliminating the problem altogether.  Excellent.
After numerous tries, changing the number and list of syscalls that we were intercepting, we finally managed to catch the failure in action one time. We expected review of this precious, captured output to show a system call returning EWOULDBLOCK or similar, allowing us to work backwards to understand the condition that caused the Splunk daemon to become upset and produce this error. Unfortunately, after quite a bit of time tracing file descriptors, futexes and signals across threads in the Splunk daemon, all of the system calls looked fine and no clear culprit was illuminated.
Additional inspection of the many search.log examples that were generated during the testing and evaluation of this issue did seem to show a pattern.  Specifically, the Splunk daemon would fairly consistently issue this error approximately 80ms after the custom command had flushed a batch of input to it.  We believe this suggests that the code associated with reading event records back from the custom command is thus likely implicated in this issue.  We attempted, to some, but not complete success, to leverage this observation by adding a slight sleep() before flushing each batch of records.
After numerous attempts to work around this issue, including building lower footprint, more efficient python SDK replacements, we remain stuck with this issue, unable to build custom commands that process more than a million or so events without causing issues. This is a significant weakness in our current "big data" infrastructure and is blocking us on a few fronts. We would welcome advice or collaboration intended to work towards a solution to this issue. I will file a Splunk support ticket referencing this item on Splunk Answers...
Please see: https://answers.splunk.com/answers/785161/chunkedtrue-smartstreamingcommand-to-support-large.html?mi...
I had to remove the partial flush when hitting maxresultrows to consistently use smartstreamingcommand
 
					
				
		
Howdy. 😃
I actually had some additional synchronization change deltas on that original hacking that I never uploaded to github. I have pushed them now. Perhaps they handle the situation you were hitting? The changes were related to cases that hit maxresultrows and the behavior of the base class...
 
					
				
		
Oh, latest upload here: https://github.com/splunk/splunk-sdk-python/compare/master...TiVo:large-scale-custom-cmds
Well done on this triage! I have not looked into this issue in the last year or so, but I do see there is now a pull request on the issue in github, which I have not yet had a look at. https://github.com/splunk/splunk-sdk-python/issues/150 https://github.com/splunk/splunk-sdk-python/pull/251
Please see my post below.  SmartStreamingCommand appears to never receive 40-50% of the records in some situations.  Other times it runs fine.
"When using smartstreamingcommand from the package in your updated answer we avoid the error at the subsearch limit but with a new problem:"
 
					
				
		
Thanks for these links. I put a pointer to my github changes in that issue.
 
		
		
		
		
		
	
			
		
		
			
					
		This is likely due to an internal implementation detail of how events are fed to a command. Normally when you search for a large dataset it arrives in chunks from the lower layers, and we can pass those chunks through to the search command piecemeal. Using inputcsv is going to do that as one huge chunk which the protocol chokes on. The issue is likely that there is some buffer somewhere that is getting overfull while waiting for a complete object.
Hi,
Have you tried invoking self.flush()? Something like:
def stream(self, records):
    for record in records:
        yield record
        self.flush()
Cheers,
Tim.
 
					
				
		
This is interesting -- that code actually triggers the error immediately, regardless of the size of the event set. Any thoughts on what this means the problem may be? Could it have something to do with the SearchCommand class's flush() method, which invokes self._record_writer.flush()?
 
					
				
		
See my answer above, but I believe adding a flush() each time through the loop triggers the problem faster because each time the custom command process (child) writes a batch (or even a single) event back to the Splunk daemon (parent), the parent responds by sending an "empty" chunk back.  Since the default python SDK never reads stdin after collecting the initial batch of events, this stdin pipe can rapidly fill up and the parent will eventually either block or get an EWOULDBLOCK errno on write calls to the other end of the pipe.
Sadly, repairing this issue by teaching the child to monitor and continually drain stdin was not sufficient to prevent this error from occurring, though it does reduce the frequency somewhat.
 
					
				
		
Hi There,
Have you had a look at the options here : 
http://docs.splunk.com/Documentation/Splunk/latest/Admin/Commandsconf
The following options can help :
chunked = [true|false]
* If true, this command supports the new "chunked" custom
  search command protocol.
* If true, the only other commands.conf attributes supported are
  is_risky, maxwait, maxchunksize, filename, and command.arg.<N>.
* If false, this command uses the legacy custom search command
  protocol supported by Intersplunk.py.
* Default is false
maxwait = <integer>
* Only available if chunked = true.
* Not supported in Windows.
* The value of maxwait is the maximum number of seconds the custom
  search command can pause before producing output.
* If set to 0, the command can pause forever.
* Default is 0
maxchunksize = <integer>
* Only available if chunked = true.
* The value of maxchunksize is maximum size chunk (size of metadata
  plus size of body) the external command may produce. If the command
  tries to produce a larger chunk, the command is terminated.
* If set to 0, the command may send any size chunk.
* Default is 0
Cheers,
David
 
					
				
		
Sorry it took me a while to get back to you. I have played around with those options, but to no avail. And as the documentation states, they default to the most permissive settings anyway.
As my response to timbits' question mentions, the problem may have something to do with the search_command.py flush() method, which invokes self._record_writer.flush(). Still no clue what the solution is though.
