I'm trying to optimize execution of a custom command by caching information it processes, but just for the duration of the currently execution SPL.
My custom command does something like this for each input record
Parse a field from an application event in Splunk into 1 or more tokens for further processing
For each token, do some expensive proccessing on the token and cache the results. This cache is really only valid for the scope of the SPL query since the results of the processing can differ each time. E.g., a restful API query for information that can be updated over time
The SDK doc for StreamingCommand of the PythonSDK (http://docs.splunk.com/Documentation/PythonSDK) says (bold highlighting mine),
Streaming commands typically filter, augment, or update, search result records. Splunk will send them in batches of up to 50,000 records. Hence, _a search command must be prepared to be invoked many times during the course of pipeline processing. _ Each invocation should produce a set of results independently usable by downstream processors.
My question here is:
How can I maintain my cache of expensive processing results for the full scope of the SPL query and only for the query duration? That is, maintain the cached information over multiple command invocations for a given SPL query.
I do see multiple invocations causing certain tokens to be needlessly processed multiply. My current cache is simply a Python dict() but I'm not picky. I do, however, need to know SPL start/end somehow so I can init and delete the chache. Such as a pre/post query callback. Or, for that matter, some way of hooking up my stateful command data to the query so that I can fetch it again via, say, the self.service.token or some such.
Here's an edited down version of my code. (I'm also a Python newbie, so apologies for any ugliness there.)
import sys
from mytokeninfo import Info
sys.path.append("splunk_sdk-1.6.5-py2.7.egg")
from splunklib.searchcommands import dispatch, StreamingCommand, Configuration
# A global cache of already processed "token" results to avoid
# doing more than absolutely necessary
knownTokens = None
@Configuration(local=True) # Per doc on "stateful" streaming commands
class ExStatefulCommand(StreamingCommand):
def stream(self, records):
for record in records:
token = self.parseRecordForToken(record)
if token not in knownTokens:
self.processAndCache(token) # Call one or more restful APIs for this token and save results
info = knownTokens[token]
record['newField'] = info.field # Application specifics simplified here for clarity (hopefully)
yield record
Also, in the working code, I've put in some logging and definitely see the knownTokens size go back to zero in the search log. So, to restate question, how to I keep my cache populated, but just until the end of the calling SPL?
... View more