I'm writing a search using the example from the SDK below. My search matches around 220,000 results and the search finishes in about 15 seconds, but it takes almost 5 minutes to loop over the results (tried with count as 10, 100, 1000 - doesn't seem to make a difference).
Could there be anything causing the result processing to take so much time? I presume once the search finishes the server has all the data, so all it is doing is streaming the results - which I can't imagine is taxing for it?
import splunklib.results as results

# A blocking search
job = jobs.create("search my_search_string_here", **{"exec_mode": "blocking"})
print "...done!\n"

# Page through results by looping through sets of 10 at a time
print "Search results:\n"
resultCount = job["resultCount"]  # Number of results this job returned
offset = 0                        # Start at result 0
count = 10                        # Get sets of 10 results at a time

while offset < int(resultCount):
    kwargs_paginate = {"count": count,
                       "offset": offset}

    # Get the search results and display them
    blocksearch_results = job.results(**kwargs_paginate)

    for result in results.ResultsReader(blocksearch_results):
        print result

    # Increase the offset to get the next set of results
    offset += count
Not buffering is definitely the problem here.
I created the following class:
import io

class ResponseReaderWrapper(io.RawIOBase):
    # Adapts the SDK's ResponseReader to the raw-stream interface
    # that io.BufferedReader expects.

    def __init__(self, responseReader):
        self.responseReader = responseReader

    def readable(self):
        return True

    def close(self):
        self.responseReader.close()

    def read(self, n):
        return self.responseReader.read(n)

    def readinto(self, b):
        # Fill the caller's buffer and report how many bytes were copied
        sz = len(b)
        data = self.responseReader.read(sz)
        for idx, ch in enumerate(data):
            b[idx] = ch
        return len(data)
And then this allows me to utilize the io.BufferedReader as follows:
rs = job.results(count=maxRecords, offset=self._offset)
results.ResultsReader(io.BufferedReader(ResponseReaderWrapper(rs)))
This means my query and pulling the results now runs in ~3 seconds rather than 90+ seconds as before.
It would be nice if ResponseReader implemented the readable and readinto methods so it were more streamlike, then this ResponseReaderWrapper class wouldn't be necessary - happy to provide a pull-request for this if you agree.
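For anyone wondering why the buffering matters so much, here is a minimal, self-contained sketch (Python 3, standard library only, no Splunk involved). CountingRaw and read_one_byte_at_a_time are hypothetical names made up for this illustration; the raw stream simply counts how often it is asked for data when a consumer reads one byte per call, with and without io.BufferedReader in between.

```python
import io


class CountingRaw(io.RawIOBase):
    """Unbuffered in-memory stream that counts how often it is read."""

    def __init__(self, payload):
        self._payload = payload
        self._pos = 0
        self.calls = 0

    def readable(self):
        return True

    def readinto(self, b):
        # Every call here stands in for one trip to the underlying socket.
        self.calls += 1
        chunk = self._payload[self._pos:self._pos + len(b)]
        b[:len(chunk)] = chunk
        self._pos += len(chunk)
        return len(chunk)


def read_one_byte_at_a_time(stream):
    """Mimic a parser that asks for a single byte per call."""
    total = 0
    while stream.read(1):
        total += 1
    return total


payload = b"x" * 100000

# Unbuffered: each read(1) goes straight to the raw stream.
raw = CountingRaw(payload)
read_one_byte_at_a_time(raw)
unbuffered_calls = raw.calls

# Buffered: BufferedReader fetches large chunks and serves read(1) from memory.
raw = CountingRaw(payload)
read_one_byte_at_a_time(io.BufferedReader(raw))
buffered_calls = raw.calls

print(unbuffered_calls, buffered_calls)
```

The buffered run should hit the raw stream only a handful of times, because BufferedReader pulls in a whole buffer per underlying read; the exact count depends on the default buffer size of your Python build.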
Phenomenal. This was a great help! This improved export time 5x for me. Thank you.
I believe the issue is "caused" by the REST API rather than the SDK. The specific
reason is roughly this: when search results are stored on disk as .csv.gz files
(essentially, compressed CSVs), they are not seekable.
So when you ask for offset 100K, for example, we will unpack the file until we find
that offset, and then return 10/100/1000 results (however many you specified in count).
When you then try to get offset 100010, we will expand it again, seek to that
offset, and so forth. So as you get into larger offsets, each request takes longer
and longer.
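To make the cost concrete, here is a rough model of that behavior (Python 3, standard library only). It is not Splunk's actual implementation: make_results and fetch_page are hypothetical helpers, and the gzip-compressed CSV is built in memory purely for illustration. Each page request re-reads the compressed file from the start and counts how many rows it had to decode.

```python
import csv
import gzip
import io


def make_results(n_rows):
    """Build an in-memory gzip-compressed CSV standing in for a results file."""
    buf = io.BytesIO()
    with gzip.open(buf, "wt", newline="") as f:
        writer = csv.writer(f)
        for i in range(n_rows):
            writer.writerow([i, "event %d" % i])
    return buf.getvalue()


def fetch_page(blob, offset, count):
    """Return (page, rows_decoded) for one paged request.

    A gzip stream cannot jump to row `offset`, so every row before it
    has to be decompressed and thrown away first.
    """
    decoded = 0
    page = []
    with gzip.open(io.BytesIO(blob), "rt", newline="") as f:
        for index, row in enumerate(csv.reader(f)):
            decoded += 1
            if index >= offset:
                page.append(row)
            if len(page) == count:
                break
    return page, decoded


total_rows = 2000
count = 100
blob = make_results(total_rows)

paged_cost = 0
rows = []
for offset in range(0, total_rows, count):
    page, decoded = fetch_page(blob, offset, count)
    rows.extend(page)
    paged_cost += decoded

# A single streaming pass would decode each row exactly once.
streamed_cost = len(rows)
print(paged_cost, streamed_cost)
```

In this model the paged cost grows roughly with the square of the result count divided by the page size, which matches the observation that the loop gets slower the further you go.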
To put it concisely: this specific API is not a good fit for exporting the entire
result set. To do that, the best way is to use the /export API endpoint, for which
there is an equivalent export function in the Python SDK. This will stream the
results to you as they become available, rather than you having to iterate over
them through disk.
We're working on an example for dev.splunk.com to show how to use export, though it should be pretty similar to what you have above, just with a single ResultsReader.
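Until an official example is available, here is a hedged sketch of the shape such a loop might take (Python 3). The helper stream_export is a hypothetical name, not part of the SDK. It assumes that jobs.export(query, **kwargs) returns a file-like stream and that the reader passed in (for example results.ResultsReader) yields dict-like objects for result rows and other objects for diagnostic messages; please check both assumptions against your SDK version.

```python
def stream_export(jobs, query, make_reader, **kwargs):
    """Yield result rows from one export stream, skipping non-row items.

    Assumptions (not verified here): `jobs.export(query, **kwargs)` returns
    a file-like stream, and `make_reader(stream)` iterates over it, yielding
    dicts for result rows and other objects for messages.
    """
    stream = jobs.export(query, **kwargs)
    for item in make_reader(stream):
        if isinstance(item, dict):
            yield item


# Hypothetical usage against a connected service (not run here):
#
#   import splunklib.results as results
#   for row in stream_export(service.jobs,
#                            "search my_search_string_here",
#                            results.ResultsReader):
#       print(row)
```

The point is that there is one request and one reader for the whole result set, so nothing is re-read from disk for each page.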
@ineeman, we're getting XML parse errors from jobs.export, where jobs.oneshot completes the same query. Is it possible to export either the csv.gz, json or python OrderedDict representation?
@ineeman, this was 6 years ago. Is there an updated link for the export API?
your solution sounds good..
do you have the example on dev.splunk.com already?
I second @mathu's request. Are there any examples of using export? The "buffered" solution in the accepted answer above only gives me extremely slow read speeds (the read rate in rows/sec drops the longer the query runs, as expected based on the above explanation).
I am experiencing the same thing. I ran my app with the -m cProfile flag, and after some munging in excel:
ncalls  tottime  percall  cumtime  percall  filename:lineno(function)
------------------------------------------------------------------------------------------
   410     0.01        0   94.422     0.23  results.py:204(next)
   410    0.757    0.002   94.412     0.23  results.py:207(_parse_results)
 29481    0.185        0   93.039    0.003  <string>:80(next)
    33    0.001        0   92.819    2.813  results.py:93(read)
    32    9.158    0.286   92.818    2.901  results.py:124(read)
518047   13.097        0   83.542        0  binding.py:1142(read)
518053   11.294        0   68.321        0  httplib.py:532(read)
518199   24.065        0    54.89        0  socket.py:336(read)
518764    9.899        0   19.695        0  ssl.py:235(recv)
518764    5.646        0    9.796        0  ssl.py:154(read)
518764     4.15        0     4.15        0  {built-in method read}
518520    2.431        0    2.431        0  {max}
518846    2.415        0    2.415        0  {method 'seek' of 'cStringIO.StringO' objects}
518466    2.356        0    2.356        0  {cStringIO.StringIO}
I'm reading this as results.py making half a million calls out to binding.py's read method, ONE character at a time. I'm guessing that it is not using any form of buffered I/O?
def read(self, n=None):
    """Read at most *n* characters from this stream.
    If *n* is ``None``, return all available characters.
    """
    response = ""
    while n is None or n > 0:
        c = self.stream.read(1)
        if c == "":
            break
        elif c == "<":
            c += self.stream.read(1)
            if c == "<?":
                while True:
                    q = self.stream.read(1)
                    if q == ">":
                        break
            else:
                response += c
                if n is not None:
                    n -= len(c)
        else:
            response += c
            if n is not None:
                n -= 1
    return response
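If you want to produce this kind of profile from inside a script rather than with the -m cProfile flag, something along these lines should work (Python 3, standard library only). The workload read_char_by_char is a hypothetical stand-in for a one-character-at-a-time loop, not the SDK code.

```python
import cProfile
import io
import pstats


def read_char_by_char(stream):
    """Stand-in workload: pull one character per call."""
    out = []
    while True:
        c = stream.read(1)
        if c == "":
            break
        out.append(c)
    return "".join(out)


profiler = cProfile.Profile()
profiler.enable()
read_char_by_char(io.StringIO("x" * 50000))
profiler.disable()

# Sort by cumulative time and keep the top few entries.
report = io.StringIO()
stats = pstats.Stats(profiler, stream=report)
stats.sort_stats("cumulative").print_stats(5)
print(report.getvalue())
```

A very large ncalls value on a low-level read method, as in the table above, is the telltale sign of unbuffered single-character reads.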
Richard,
Thanks for investigating this issue and offering to provide a pull request. If you submit it, I will review the change.
Best, David Noble
My previous comment here wasn't quite accurate, so I removed it (since I can't edit it).
I should say, we're using the 1.1.0 python lib here