Splunk Search

Why am I getting inconsistent search results using export with the Splunk Python SDK?

Engager

I've written a search that creates a stats table with a medium-sized result: about 5 columns and 100k+ rows. When I run the search in Splunk Web, it gives me consistent results. If I create a search job, wait for it to finish, then fetch all results (paging through multiple times with the correct offsets), I get the same consistent results as in Splunk Web.

However, if I try to run the same search using export and stream the results, I get inconsistent results. Interestingly enough, I always seem to end up with more results than with a search job. Digging deeper, I find repeated entries, but even after removing the duplicates I still end up with more results in total. Weirder still, I get different results each time I run it.

Here's my working search:

# Imports assumed by these snippets; splunklib is the Splunk Python SDK,
# and HOST/PORT/USERNAME/PASSWORD are defined elsewhere.
from math import ceil
from time import sleep
import datetime

import pandas as pd
import splunklib.client as splunk
import splunklib.results as results
from IPython.display import clear_output, display

def wait_for_search_job(job):
    while True:
        while not job.is_ready():
            pass
        stats = {"isDone": job["isDone"],
                 "doneProgress": float(job["doneProgress"])*100,
                 "scanCount": int(job["scanCount"]),
                 "eventCount": int(job["eventCount"]),
                 "resultCount": int(job["resultCount"])}

        status = ("%(doneProgress)03.1f%%   %(scanCount)d scanned   "
                  "%(eventCount)d matched   %(resultCount)d results") % stats

        clear_output()
        display(status)
        if stats["isDone"] == "1":
            display("Done!")
            break
        sleep(5)
    return

def fetch_all_results(job):
    result_count = int(job["resultCount"])
    num_results = 50000
    iterations = int(ceil(1. * result_count / num_results))

    rows = []

    for i in range(iterations):
        offset = i * num_results

        for result in results.ResultsReader(job.results(count=num_results, offset=offset)):
            if isinstance(result, dict):
                rows.append(result)
            elif isinstance(result, results.Message):
                # Diagnostic messages may be returned in the results
                print("Message: %s" % result)

    return rows

def get_splunk_hits_search():
    # Create a Service instance and log in 
    service = splunk.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD
    )

    kwargs_export = {
        "earliest_time": datetime.datetime(2015, 6, 29).isoformat(),
        "latest_time": datetime.datetime(2016, 4, 11).isoformat(),
        "search_mode": "normal"
    }

    api_hits = r"""
     search <search_string>
     | rex field=req.url ".*branch_id=(?<branch_id>[a-zA-Z0-9._-]+)&?.*" 
     | rex field=req.originalUrl ".*api/(v1/)?(?<endpoint>\w+)/?.*" 
     | fillnull value=NULL endpoint branch_id 
     | bin _time span=7d as week 
     | eval week=strftime(week, "%Y-%m-%d") 
     | eval day=strftime(_time, "%Y-%m-%d") 
     | stats count dc(day) as days_visited by req.user.account week endpoint req.headers.user-agent branch_id 
     | rename req.headers.user-agent as user_agent, req.user.account as accountnumber, count as hits
     """

    job = service.jobs.create(api_hits, **kwargs_export)
    wait_for_search_job(job)
    res = fetch_all_results(job)  
    df = pd.DataFrame.from_dict(res)
    return job, df

Here's my kinda-working but unreliable export:

def get_splunk_hits_export():
    # Create a Service instance and log in 
    service = splunk.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD)

    kwargs_export = {
        "earliest_time": datetime.datetime(2015, 6, 29).isoformat(),
        "latest_time": datetime.datetime(2016, 4, 11).isoformat(),
        "search_mode": "normal"
    }

    api_hits = r"""
     search <search_string>
     | rex field=req.url ".*branch_id=(?<branch_id>[a-zA-Z0-9._-]+)&?.*" 
     | rex field=req.originalUrl ".*api/(v1/)?(?<endpoint>\w+)/?.*" 
     | fillnull value=NULL endpoint branch_id 
     | bin _time span=7d as week 
     | eval week=strftime(week, "%Y-%m-%d") 
     | eval day=strftime(_time, "%Y-%m-%d") 
     | stats count dc(day) as days_visited by req.user.account week endpoint req.headers.user-agent branch_id 
     | rename req.headers.user-agent as user_agent, req.user.account as accountnumber, count as hits
     """

    exportsearch_results = service.jobs.export(api_hits, **kwargs_export)

    rows = []

    for result in results.ResultsReader(exportsearch_results):
        if isinstance(result, dict):
            rows.append(result)
        elif isinstance(result, results.Message):
            # Diagnostic messages may be returned in the results
            print("Message: %s" % result)

    df = pd.DataFrame.from_dict(rows)
    return df
1 Solution

Explorer

You are likely getting dupes in the second case because previews are enabled. Try disabling them; the docs' export example verifies this with assert rr.is_preview == False.

ref: http://docs.splunk.com/Documentation/Splunk/6.2.5/Search/Exportsearchresults#Python_SDK
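To see why previews inflate the count: the export endpoint streams a sequence of result sets, where the earlier ones are preview snapshots and only the last one is final. A naive reader that keeps every row will count the preview rows too. Here's a self-contained sketch with mock XML (not real SDK output or API, just an illustration of the shape):

```python
import xml.etree.ElementTree as ET

# Mock export stream: two preview snapshots followed by the final set.
# Real export output is likewise a sequence of result sets marked preview=1/0.
MOCK_STREAM = """<export>
  <results preview='1'><row>a</row><row>b</row></results>
  <results preview='1'><row>a</row><row>b</row><row>c</row></results>
  <results preview='0'><row>a</row><row>b</row><row>c</row><row>d</row></results>
</export>"""

def read_all(xml_text):
    """Naive reader: keeps every row, including preview snapshots."""
    root = ET.fromstring(xml_text)
    return [row.text for rs in root for row in rs]

def read_final_only(xml_text):
    """Keeps rows only from the non-preview (final) result set."""
    root = ET.fromstring(xml_text)
    return [row.text
            for rs in root if rs.get("preview") == "0"
            for row in rs]

print(len(read_all(MOCK_STREAM)))         # 9 rows: inflated by previews
print(len(read_final_only(MOCK_STREAM)))  # 4 rows: the consistent final count
```

Because each preview snapshot is a partially aggregated view of a stats search, its rows aren't exact duplicates of the final rows, which is why de-duplicating alone didn't reconcile the counts in the question.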

Engager

That did it! I set the following and now I get consistent results that match my two-step search!

     kwargs_export = {
         "earliest_time": datetime.datetime(2015, 6, 29).isoformat(),
         "latest_time": datetime.datetime(2016, 4, 11).isoformat(),
         "search_mode": "normal",
         "preview": False
     }
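As a sanity check, you can verify that the export path now matches the search-job path by comparing the two DataFrames after dropping any stray duplicates. A minimal sketch with toy stand-in frames (the real ones would come from get_splunk_hits_search and get_splunk_hits_export):

```python
import pandas as pd

# Toy stand-ins for the job-based and export-based results.
df_job = pd.DataFrame({"accountnumber": [1, 2, 3], "hits": [10, 20, 30]})
df_export = pd.DataFrame({"accountnumber": [1, 2, 3, 2], "hits": [10, 20, 30, 20]})

# Export output with previews enabled can contain repeated rows;
# drop them before comparing against the search-job results.
df_export_dedup = df_export.drop_duplicates().reset_index(drop=True)

# Sort both frames so row order doesn't affect the comparison.
key = ["accountnumber", "hits"]
same = df_job.sort_values(key).reset_index(drop=True).equals(
    df_export_dedup.sort_values(key).reset_index(drop=True))
print(same)  # True
```

Note this only catches exact duplicate rows; with a stats search, preview snapshots carry partial aggregates, so disabling previews (as above) is the real fix.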

Explorer

Using "preview": False in kwargs_export solved this issue for me. Thanks @aiguofer.
