Why am I getting inconsistent search results using export with the Splunk Python SDK?

aiguofer
Engager

I've written a search that produces a medium-sized stats table, around 5 columns and 100k+ rows. When I run the search in Splunk Web, it gives me consistent results. If I create a search job, wait for it to finish, and then fetch all the results (in multiple requests with the correct offsets), I get the same consistent results as in Splunk Web.

However, if I run the same search using export and stream the results, I get inconsistent results. Interestingly, I always seem to end up with more results than with a search job. When I dig deeper, I find repeated entries, but I still end up with more results in total after removing the duplicates. Even weirder, I get different results each time I run it.
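
One way to characterise the extra rows is to compare exact-row duplicates with repeated group keys. The sketch below assumes the rows are the dicts collected from the stream and that the group-by fields are the renamed ones from the search in this post; the helper name summarize_duplicates is hypothetical. If a key shows up with several different hits values, the stream probably contains intermediate aggregates rather than true duplicates, which would explain why removing exact duplicates still leaves too many rows.

```python
from collections import defaultdict

# Group-by fields taken from the stats/rename clauses of the search in this post.
KEY_FIELDS = ("accountnumber", "week", "endpoint", "user_agent", "branch_id")


def summarize_duplicates(rows, key_fields=KEY_FIELDS):
    """Compare exact-row duplicates with repeated group keys.

    rows: iterable of dicts, e.g. the list built from the export stream.
    Returns counts of total rows, distinct exact rows, distinct group keys,
    and group keys that appear with more than one set of values.
    """
    exact_rows = set()
    variants_per_key = defaultdict(set)
    total = 0
    for row in rows:
        total += 1
        # str() keeps multivalue fields (lists) hashable.
        exact = tuple(sorted((k, str(v)) for k, v in row.items()))
        exact_rows.add(exact)
        key = tuple(row.get(field) for field in key_fields)
        variants_per_key[key].add(exact)
    conflicting = sum(1 for v in variants_per_key.values() if len(v) > 1)
    return {
        "total_rows": total,
        "distinct_rows": len(exact_rows),
        "distinct_keys": len(variants_per_key),
        "keys_with_conflicting_values": conflicting,
    }
```

A non-zero keys_with_conflicting_values means the same group was emitted more than once with different aggregates.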

Here's my working search:

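# Imports inferred from the names used below; the aliases are assumptions.
import datetime
from math import ceil
from time import sleep

import pandas as pd
import splunklib.client as splunk
import splunklib.results as results
from IPython.display import clear_output, display

def wait_for_search_job(job):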
    while True:
        while not job.is_ready():
            pass
        stats = {"isDone": job["isDone"],
                 "doneProgress": float(job["doneProgress"])*100,
                 "scanCount": int(job["scanCount"]),
                 "eventCount": int(job["eventCount"]),
                 "resultCount": int(job["resultCount"])}

        status = ("%(doneProgress)03.1f%%   %(scanCount)d scanned   "
                  "%(eventCount)d matched   %(resultCount)d results") % stats

        clear_output()
        display(status)
        if stats["isDone"] == "1":
            display("Done!")
            break
        sleep(5)
    return

def fetch_all_results(job):
    result_count = int(job["resultCount"])
    num_results = 50000
    iterations = int(ceil(1. * result_count / num_results))

    rows = []

    for i in range(iterations):
        offset = i * num_results

        for result in results.ResultsReader(job.results(count=num_results, offset=offset)):
            if isinstance(result, dict):
                rows.append(result)
            elif isinstance(result, results.Message):
                # Diagnostic messages may be returned in the results
                print("Message: %s" % result)

    return rows

def get_splunk_hits_search():
    # Create a Service instance and log in 
    service = splunk.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD
    )

    kwargs_export = {
        "earliest_time": datetime.datetime(2015, 6, 29).isoformat(),
        "latest_time": datetime.datetime(2016, 4, 11).isoformat(),
        "search_mode": "normal"
    }

    api_hits = """
     search <search_string>
     | rex field=req.url ".*branch_id=(?<branch_id>[a-zA-Z0-9._-]+)&?.*" 
     | rex field=req.originalUrl ".*api/(v1/)?(?<endpoint>\w+)/?.*" 
     | fillnull value=NULL endpoint branch_id 
     | bin _time span=7d as week 
     | eval week=strftime(week, "%Y-%m-%d") 
     | eval day=strftime(_time, "%Y-%m-%d") 
     | stats count dc(day) as days_visited by req.user.account week endpoint req.headers.user-agent branch_id 
     | rename req.headers.user-agent as user_agent, req.user.account as accountnumber, count as hits
     """

    job = service.jobs.create(api_hits, **kwargs_export)
    wait_for_search_job(job)
    res = fetch_all_results(job)  
    df = pd.DataFrame.from_dict(res)
    return job, df

Here's my kinda-working but unreliable export:

def get_splunk_hits_export():
    # Create a Service instance and log in 
    service = splunk.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD)

    kwargs_export = {
        "earliest_time": datetime.datetime(2015, 6, 29).isoformat(),
        "latest_time": datetime.datetime(2016, 4, 11).isoformat(),
        "search_mode": "normal"
    }

    api_hits = """
     search <search_string>
     | rex field=req.url ".*branch_id=(?<branch_id>[a-zA-Z0-9._-]+)&?.*" 
     | rex field=req.originalUrl ".*api/(v1/)?(?<endpoint>\w+)/?.*" 
     | fillnull value=NULL endpoint branch_id 
     | bin _time span=7d as week 
     | eval week=strftime(week, "%Y-%m-%d") 
     | eval day=strftime(_time, "%Y-%m-%d") 
     | stats count dc(day) as days_visited by req.user.account week endpoint req.headers.user-agent branch_id 
     | rename req.headers.user-agent as user_agent, req.user.account as accountnumber, count as hits
     """

    exportsearch_results = service.jobs.export(api_hits, **kwargs_export)

    rows = []

    for result in results.ResultsReader(exportsearch_results):
        if isinstance(result, dict):
            rows.append(result)
        elif isinstance(result, results.Message):
            # Diagnostic messages may be returned in the results
            print("Message: %s" % result)

    df = pd.DataFrame.from_dict(rows)
    return df
1 Solution

tthrockm
Explorer

You are likely getting duplicates in the second case because previews are enabled. Try disabling them; you can then confirm that no preview results are being read with assert rr.is_preview == False

ref: http://docs.splunk.com/Documentation/Splunk/6.2.5/Search/Exportsearchresults#Python_SDK
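
If previews cannot be turned off on the request, a client-side filter is another option. The following is a minimal sketch, assuming the reader behaves like splunklib.results.ResultsReader: iterating yields dicts for rows and Message objects for diagnostics, and its is_preview attribute reflects the result set currently being read. The helper name final_rows is hypothetical.

```python
def final_rows(reader):
    """Collect rows from an export stream, skipping preview result sets.

    `reader` is assumed to expose `is_preview`, updated as each result set
    in the stream is parsed (as splunklib.results.ResultsReader does).
    """
    rows = []
    for item in reader:
        if not isinstance(item, dict):
            continue  # diagnostic messages are not result rows
        if getattr(reader, "is_preview", False):
            continue  # partial results from a preview set
        rows.append(item)
    return rows


# Usage with the SDK (assumes a connected `service`, a query string
# `api_hits`, and the `kwargs_export` dict from the question):
#   import splunklib.results as results
#   reader = results.ResultsReader(service.jobs.export(api_hits, **kwargs_export))
#   rows = final_rows(reader)
```

Filtering this way still transfers every preview over the wire, so disabling previews on the request is cheaper when it is available.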

aiguofer
Engager

That did it! I set the following, and now I get consistent results that match my two-step search:

     kwargs_export = {
         "earliest_time": datetime.datetime(2015, 6, 29).isoformat(),
         "latest_time": datetime.datetime(2016, 4, 11).isoformat(),
         "search_mode": "normal",
         "preview": False
     }
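
Putting the fix together, a sketch of the export path with previews disabled could look like the code below. The time window and search_mode are copied from the post; the helper names build_export_kwargs and export_rows, and the reader_cls parameter (there so the function can be exercised without a Splunk server), are hypothetical.

```python
import datetime

# Time window copied from the post.
EARLIEST = datetime.datetime(2015, 6, 29)
LATEST = datetime.datetime(2016, 4, 11)


def build_export_kwargs(earliest=EARLIEST, latest=LATEST, preview=False):
    """Build keyword arguments for service.jobs.export().

    preview=False asks the server not to stream preview result sets.
    """
    return {
        "earliest_time": earliest.isoformat(),
        "latest_time": latest.isoformat(),
        "search_mode": "normal",
        "preview": preview,
    }


def export_rows(service, query, reader_cls=None):
    """Stream an export search and return only the result rows (dicts)."""
    if reader_cls is None:
        # Imported lazily so build_export_kwargs works without the SDK installed.
        import splunklib.results as results
        reader_cls = results.ResultsReader
    stream = service.jobs.export(query, **build_export_kwargs())
    return [item for item in reader_cls(stream) if isinstance(item, dict)]
```

The returned list can be passed to pd.DataFrame.from_dict as in the original function.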

rithvikmundra
Explorer

Using "preview": False in kwargs_export solved this issue for me. Thanks @aiguofer.

samprog1816
Explorer

@aiguofer, can you share the complete script and all the libraries required to run it? Any help is greatly appreciated.
