Splunk Dev

Splunk Python SDK: job.results is limited to 50k results. Trying to set an offset to pull multiple 50k chunks, but I can't get it to work

cdhippen
Path Finder

I have a job whose job['resultCount'] is 367k, but no matter what I do, I can't seem to pull more than the first chunk of 50,000 results.

I picked up this snippet from an answer here by someone with a similar end goal and setup: https://answers.splunk.com/answers/114045/python-sdk-results-resultsreader-extremely-slow.html

rs = job.results(count=maxRecords, offset=self._offset)
results.ResultsReader(io.BufferedReader(ResponseReaderWrapper(rs)))
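For reference, ResponseReaderWrapper comes from that linked answer; as I understand it, it just adapts the SDK's streaming response so io.BufferedReader can buffer it for faster reads. Something roughly like this (my reconstruction of the linked code, so treat the details as an assumption):

import io

class ResponseReaderWrapper(io.RawIOBase):
    # Adapts the SDK's streaming response reader so it can be wrapped
    # in io.BufferedReader for buffered, faster reads.
    def __init__(self, response_reader):
        self.response_reader = response_reader

    def readable(self):
        return True

    def close(self):
        self.response_reader.close()

    def readinto(self, b):
        # Read up to len(b) bytes from the underlying response into buffer b
        data = self.response_reader.read(len(b))
        b[:len(data)] = data
        return len(data)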

I wrote the code below around that snippet and have fiddled with it a bit, but I can't get offset=self._offset to do anything, and I have no idea what it's supposed to be doing.

class SplunkConnector(object):
    def __init__(self, username, password, customerGuid):
        self.username = username
        self.password = password
        self.customerGuid = customerGuid
        flag = True
        while flag:
            try:
                self.service = client.connect(host=********, port=8089, username=self.username, password=self.password, scheme='https')
                flag = False
            except binding.HTTPError as e:
                json_log.debug(str(e))

    def search(self, query_dict):
        query = query_dict['search']
        label = query_dict['label']
        search_headers = query_dict['headers']
        customer = query_dict['customer']
        customerGuid = query_dict['customerGuid']
        try:
            earliest_time = query_dict['earliest_time']
            latest_time = query_dict['latest_time']
        except KeyError:
            earliest_time = '-1d@d'
            latest_time = '@d'
        json_log.debug('Starting %s customerGuid=%s' % (label, self.customerGuid))
        kwargs_normalsearch = {'exec_mode': 'normal', 'earliest_time': earliest_time, 'latest_time': latest_time, 'output_mode': 'csv'}
        job = self.service.jobs.create(query + ' | fillnull value="---"', **kwargs_normalsearch)
        while True:
            try:
                while not job.is_ready():
                    pass
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100,
                         "scanCount": int(job["scanCount"]),
                         "eventCount": int(job["eventCount"]),
                         "resultCount": int(job["resultCount"])}

                json_log.debug(stats)

                if stats["isDone"] == "1":
                    json_log.debug("\n\nDone!\n\n")
                    break
                sleep(2)
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}

                json_log.debug(stats)

                if stats["isDone"] == "1":
                    json_log.debug('Search %s finished for customerGuid=%s'
                                   % (label, customerGuid))
                    break
                sleep(2)

            except binding.HTTPError as e:
                json_log.debug(str(e))
                pass
            except AttributeError:
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}

                json_log.debug(stats)

                if stats["isDone"] == "1":
                    json_log.debug('Search %s finished for customerGuid=%s'
                                   % (label, customerGuid))
                    break
                sleep(2)

        # Get the results and display them
        result_count = job['resultCount']
        rs = job.results(count=0)
        rr = results.ResultsReader(io.BufferedReader(rs))
        results_list = []
        for result in rr:
            if isinstance(result, results.Message):
                # Diagnostic messages may be returned in the results
                json_log.debug('%s: %s label=%s customerGuid=%s'
                               % (result.type, result.message, label, customerGuid))
            elif isinstance(result, dict):
                # Normal events are returned as dicts
                keys, values = [], []

                for header in search_headers:
                    if header not in result.keys():
                        print(header)
                        result[header] = ''

                for key, value in result.items():
                    if key in search_headers:
                        keys.append(str(key))
                        values.append(str(value))
                if not results_list == []:
                    results_list.append(values)
                else:
                    results_list.append(keys)
                    results_list.append(values)

        output = io.BytesIO()
        writer = csv.writer(output, delimiter=',')
        writer.writerows(results_list)
        output_string = output.getvalue()
        assert rr.is_preview is False

        job.cancel()
        return [label, output_string.replace('\r\n', '\n').replace('---', '')]

    def searches(self, query_list):
        print(query_list)
        if type(query_list) == dict:
            query_list = [value for value in query_list.values()]
        with closing(ThreadPool(processes=len(query_list))) as pool:
            results = pool.map(self.search, query_list)
            pool.terminate()

        print(results)
        search_results = {item[0]: item[1] for item in results}
        print(search_results)
        return search_results
1 Solution

cdhippen
Path Finder

I was able to get this working. The code below shows how.

import io
import csv
import logging
from time import sleep
import splunklib.results as results
import splunklib.client as client
import splunklib.binding as binding
from multiprocessing.pool import ThreadPool
from contextlib import closing

# json_log isn't defined in the original post; assuming it's a
# preconfigured logger, something like this makes the code runnable:
json_log = logging.getLogger('json_log')



class SplunkConnector(object):
    def __init__(self, username, password, customerGuid):
        self.username = username
        self.password = password
        self.customerGuid = customerGuid
        flag = True
        while flag:
            try:
                self.service = client.connect(host=*****, port=8089, username=self.username, password=self.password, scheme='https')
                flag = False
            except binding.HTTPError as e:
                json_log.debug(str(e))

    def search(self, query_dict):
        query = query_dict['search']
        label = query_dict['label']
        search_headers = query_dict['headers']
        customer = query_dict['customer']
        customerGuid = query_dict['customerGuid']
        try:
            earliest_time = query_dict['earliest_time']
            latest_time = query_dict['latest_time']
        except KeyError:
            earliest_time = '-1d@d'
            latest_time = '@d'
        kwargs_normalsearch = {'exec_mode': 'normal', 'earliest_time': earliest_time, 'latest_time': latest_time, 'output_mode': 'csv'}
        flag = True
        while flag:
            try:
                job = self.service.jobs.create(query + ' | fillnull value="---"', **kwargs_normalsearch)
                flag = False
            except binding.HTTPError as e:
                json_log.debug(str(e))
        while True:
            try:
                while not job.is_ready():
                    pass
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100,
                         "scanCount": int(job["scanCount"]),
                         "eventCount": int(job["eventCount"]),
                         "resultCount": int(job["resultCount"])}

                if stats["isDone"] == "1":
                    break
                sleep(2)
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}

                if stats["isDone"] == "1":
                    break
                sleep(2)

            except binding.HTTPError as e:
                json_log.debug(str(e))
            except AttributeError:

                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}

                if stats["isDone"] == "1":
                    break
                sleep(2)

        result_count = job['resultCount']
        offset = 0
        count = 50000  # page size: results are pulled in chunks of 50k

        # First call includes the CSV header row (first=True)
        results_list = self.results_getter(job, label, customerGuid, search_headers, True, count, offset, result_count)

        # Page through the remaining results; the +1 accounts for the header row
        while len(results_list) < int(result_count) + 1:
            offset += count
            placeholder = self.results_getter(job, label, customerGuid, search_headers, False, count, offset, result_count)
            results_list.extend(placeholder)

        output = io.BytesIO()
        writer = csv.writer(output, delimiter=',')
        writer.writerows(results_list)
        output_string = output.getvalue()
        job.cancel()
        return [label, output_string.replace('\r\n', '\n').replace('---', '')]

    def results_getter(self, job, label, customerGuid, search_headers, first, count, offset, result_count):
        # Get the results and display them
        kwargs_paginate = {"count": count,
                           "offset": offset}
        blocksearch_results = job.results(**kwargs_paginate)
        results_list = []

        reader = results.ResultsReader(blocksearch_results)

        for result in reader:
            if isinstance(result, results.Message):
                # Diagnostic messages may be returned in the results
                json_log.debug('%s: %s label=%s customerGuid=%s'
                               % (result.type, result.message, label, customerGuid))
            elif isinstance(result, dict):
                # Normal events are returned as dicts
                keys, values = [], []

                for header in search_headers:
                    if header not in result.keys():
                        result[header] = ''

                for key, value in result.items():
                    if key in search_headers:
                        keys.append(str(key))
                        values.append(str(value))
                if results_list:
                    results_list.append(values)
                elif first:
                    results_list.append(keys)
                    results_list.append(values)
                else:
                    results_list.append(values)

        assert not reader.is_preview
        return results_list

    def searches(self, query_list):
        if isinstance(query_list, dict):
            query_list = [value for value in query_list.values()]
        with closing(ThreadPool(processes=len(query_list))) as pool:
            # Use a name that doesn't shadow the splunklib.results module
            search_output = pool.map(self.search, query_list)
            pool.terminate()

        search_results = {item[0]: item[1] for item in search_output}
        return search_results
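
For illustration, calling it looks something like this (the credentials, query, and header values here are placeholders, not our real ones):

connector = SplunkConnector('admin', 'changeme', 'customer-guid-123')
queries = [{'search': 'search index=main | stats count by status',
            'label': 'status_counts',
            'headers': ['status', 'count'],
            'customer': 'Acme',
            'customerGuid': 'customer-guid-123'}]
csv_by_label = connector.searches(queries)
print(csv_by_label['status_counts'])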


cdhippen
Path Finder

Also, with the current code, if I change rs = job.results(count=0) to rs = job.results(count=0, offset=anything), I get no search results returned.
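
What eventually worked for me (see my answer above) was keeping an explicit count on every request and stepping offset forward by the page size, rather than combining count=0 with an offset. Roughly:

page_size = 50000  # matches the 50k cap per request I was hitting
offset = 0
total = int(job['resultCount'])
results_list = []

while offset < total:
    page = job.results(count=page_size, offset=offset)
    for result in results.ResultsReader(page):
        if isinstance(result, dict):
            results_list.append(result)  # handle one result row here
    offset += page_size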
