Duplicate detection logic not getting applied while ingesting data using Splunk Python SDK

gurunagasimha
Loves-to-Learn Lots

 

I’m working on a custom Splunk app (VendorThreatIntel) that ingests alert data from an external API using the Splunk Python SDK.

Before inserting events into a custom index, I perform a duplicate check based on a unique id field. For each incoming record, I run a one-shot search against the index to check whether the record already exists. If found, I skip insertion; otherwise, I insert the event.

Below is the logic I’m using:

# Assumes "response", "session_key", "service", and "logger" are defined
# earlier in the input script.
import json
from collections.abc import Sequence

import splunklib.client as client
import splunklib.results as results

if 'data' in response and isinstance(response['data'], Sequence):
    inserted_count = 0
    skipped_count = 0

    logger.info("[Vendor Events] Received %s records for processing", len(response['data']))
    server = client.connect(token=session_key, app="VendorThreatIntel")

    for row in response['data']:
        row['service_name'] = service['displayName']
        row_id = row.get("id")

        search_query = f'search index="vendor_alerts" id="{row_id}"'
        logger.info(f"[Vendor Events] Checking for existing record: {search_query}")

        results_found = False
        try:
            # One-shot search to check whether this id is already indexed
            rr = results.JSONResultsReader(
                server.jobs.oneshot(search_query, output_mode="json")
            )

            for result in rr:
                if isinstance(result, dict):
                    logger.info(
                        "[Vendor Events] Duplicate record found with id=%s, skipping insertion.",
                        row_id
                    )
                    results_found = True
                    skipped_count += 1

            if not results_found:
                index = server.indexes["vendor_alerts"]
                index.submit(json.dumps(row))
                inserted_count += 1

        except Exception as e:
            logger.error("[Vendor Events] Error inserting data: %s", str(e))

    logger.info(
        "[Vendor Events] Summary | Total: %s | Inserted: %s | Skipped: %s",
        len(response['data']), inserted_count, skipped_count
    )


Even when records with the same id already exist in the index, the duplicate detection logic is not being applied, and duplicate events are still getting indexed.

May I know why this duplicate detection logic is not being applied?
Is this related to one-shot search behavior, indexing latency, or event availability timing during ingestion?

Additionally, I would like guidance that takes Splunk’s distributed architecture (search heads and indexers) into account.
Is there any other recommended or reliable way to handle duplicate detection in Splunk, especially in a distributed environment?

Thanks in advance.

0 Karma

livehybrid
SplunkTrust

Hi @gurunagasimha 

It doesn't look like you are specifying an earliest_time/latest_time for the search, so you might not be receiving all the events you expect back from it, which would mean you're not matching the duplicates.
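For illustration, a minimal sketch of what a time-bounded check could look like with the SDK's one-shot search, reusing the variables from the original snippet (the 30-day window and the "head 1" clause are just examples, not part of the original code):

    # Bound the duplicate check to an explicit lookback window so the search
    # actually covers the events being compared against. "-30d@d" is an
    # arbitrary example window; size it to your ingestion history.
    rr = results.JSONResultsReader(
        server.jobs.oneshot(
            f'search index="vendor_alerts" id="{row_id}" | head 1',
            earliest_time="-30d@d",
            latest_time="now",
            output_mode="json",
        )
    )
    results_found = any(isinstance(r, dict) for r in rr)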

How many events are you ingesting using this method? As the index grows and time goes by you will need to search further and further back to check for records already ingested. It might be more efficient to use a KV store with a _key field based on the unique id of your event; you could then either upsert or check for a specific ID in the KV store.
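If the KV store route ever becomes an option, a rough sketch of that check with the SDK might look like the following (the collection name vendor_alert_ids is made up and assumed to already be defined in the app's collections.conf; "server" is the splunklib.client.Service from the existing code):

    import json
    from splunklib.binding import HTTPError

    # Handle to the KV store collection's data endpoint
    seen_ids = server.kvstore["vendor_alert_ids"].data

    def already_seen(row_id):
        # Using the unique id as _key makes the lookup a direct key fetch.
        try:
            seen_ids.query_by_id(str(row_id))
            return True
        except HTTPError as e:
            if e.status == 404:
                return False
            raise

    def mark_seen(row_id):
        # Inserting the same _key twice raises an error, so duplicates fail fast.
        seen_ids.insert(json.dumps({"_key": str(row_id)}))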

🌟 Did this answer help you? If so, please consider:

  • Adding karma to show it was useful
  • Marking it as the solution if it resolved your issue
  • Commenting if you need any clarification

Your feedback encourages the volunteers in this community to continue contributing

0 Karma

gurunagasimha
Loves-to-Learn Lots

@livehybrid 

Thanks for the suggestions.

At this point, I would like to store the data only in the index and avoid using KV Store or any external persistence.

Given this constraint, is there anything else I can add or improve in the index-based approach to make the duplicate check more reliable?
For example, explicitly setting earliest_time and latest_time, using indexed fields, or any other best practices that help with duplicate detection while relying only on indexed data.

Any guidance on what else can be added or tuned for this scenario would be helpful.

Thanks.

0 Karma

PickleRick
SplunkTrust
SplunkTrust

Please try to reformat your code - insert it as a code block or at least in a preformatted paragraph style.

In its current form it's unreadable.

0 Karma

gurunagasimha
Loves-to-Learn Lots

@PickleRick 
Please find the updated code above.

0 Karma

PickleRick
SplunkTrust

Wait. You're trying to search the index from within a modular input? Kill it with fire. Never do stuff like this.

I've only seen one add-on that historically used to have this logic (and even then, by the time I reviewed it, it had already been disabled and migrated to checkpoint-based persistence).

Input is input, search is search. Don't mix those. Remember that - as a general rule - your input might be running on an instance which has no access to search.

This is a very very bad idea.
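For completeness, a rough sketch of the checkpoint-style alternative being described here, assuming a modular input built with splunklib.modularinput (the file name is illustrative; the checkpoint directory itself comes from the input's metadata):

    import json
    import os

    def load_seen_ids(checkpoint_dir):
        # Read the ids recorded by previous runs of the input.
        path = os.path.join(checkpoint_dir, "vendor_alert_ids.json")
        if os.path.exists(path):
            with open(path) as f:
                return set(json.load(f))
        return set()

    def save_seen_ids(checkpoint_dir, seen_ids):
        # Persist the updated id set at the end of the run.
        path = os.path.join(checkpoint_dir, "vendor_alert_ids.json")
        with open(path, "w") as f:
            json.dump(list(seen_ids), f)

    # Inside stream_events(): checkpoint_dir = inputs.metadata["checkpoint_dir"];
    # skip rows whose id is already in the set, add new ids as events are
    # written, then call save_seen_ids() before returning.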

0 Karma

gurunagasimha
Loves-to-Learn Lots

logger.info("Received %s records for processing", len(response['data']))
server = client.connect(token=session_key, app="VendorThreatIntel")

for row in response['data']:
        row['service_name'] = service['displayName']
        row_id = row.get("id")

       search_query = 'search index="vendor_alerts" id="' + str(row_id) + '"'
       logger.info("Checking for existing record: %s", search_query)

       results_found = False
       try:
           rr = results.JSONResultsReader(server.jobs.oneshot(search_query, output_mode="json"))

           for result in rr:
                  if isinstance(result, dict):
                        logger.info("Duplicate record found with id=%s, skipping insertion",row_id)
                        results_found = True
                        skipped_count += 1

                  if not results_found:
                        index = server.indexes["vendor_alerts"]
                        index.submit(json.dumps(row))
                        inserted_count += 1

       except Exception as e:
            logger.error("Error inserting data: %s", str(e))

logger.info("Summary | Total: %s | Inserted: %s | Skipped: %s",len(response['data']), inserted_count,skipped_count)

0 Karma