Splunk Dev

Duplicate detection logic not getting applied while ingesting data using Splunk Python SDK

gurunagasimha
Loves-to-Learn Everything

 

I’m working on a custom Splunk app (VendorThreatIntel) that ingests alert data from an external API using the Splunk Python SDK.

Before inserting events into a custom index, I perform a duplicate check based on a unique id field. For each incoming record, I run a one-shot search against the index to check whether the record already exists. If found, I skip insertion; otherwise, I insert the event.

Below is the logic I’m using:

if 'data' in response and isinstance(response['data'], Sequence):
    inserted_count = 0
    skipped_count = 0

    logger.info("[Vendor Events] Received %s records for processing", len(response['data']))
    server = client.connect(token=session_key, app="VendorThreatIntel")

    for row in response['data']:
        row['service_name'] = service['displayName']
        row_id = row.get("id")

        search_query = f'search index="vendor_alerts" id="{row_id}"'
        logger.info(f"[Vendor Events] Checking for existing record: {search_query}")

        results_found = False
        try:
            rr = results.JSONResultsReader(
                server.jobs.oneshot(search_query, output_mode="json")
            )

            for result in rr:
                if isinstance(result, dict):
                    logger.info(
                        "[Vendor Events] Duplicate record found with id=%s, skipping insertion.",
                        row_id
                    )
                    results_found = True
                    skipped_count += 1

                if not results_found:
                    index = server.indexes["vendor_alerts"]
                    index.submit(json.dumps(row))
                    inserted_count += 1

        except Exception as e:
            logger.error("[Vendor Events] Error inserting data: %s", str(e))

    logger.info(
        "[Vendor Events] Summary | Total: %s | Inserted: %s | Skipped: %s",
        len(response['data']), inserted_count, skipped_count
    )


Even when records with the same id already exist in the index, the duplicate detection logic is not being applied, and duplicate events are still getting indexed.

May I know why this duplication logic is not being applied?
Is this related to one-shot search behavior, indexing latency, or event availability timing during ingestion?

Additionally, I would like guidance considering Splunk’s distributed architecture (search head and indexers).
Is there any other recommended or reliable way to handle duplicate detection in Splunk, especially in a distributed environment?

Thanks in advance.

0 Karma

livehybrid
SplunkTrust
SplunkTrust

Hi @gurunagasimha 

It doesn't look like you are specifying an earliest_time/latest_time for the search, so you might not be receiving all the events you expect from the search, which would mean you're not matching the duplicates.
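As a rough sketch (the index name comes from your code, but the 7-day lookback is just an assumption; it has to cover however far back a duplicate could already have been indexed), the check could be bounded explicitly:

import splunklib.results as results

def is_duplicate(server, row_id, lookback="-7d"):
    # Bound the duplicate check with an explicit time window instead of
    # relying on whatever default the oneshot search falls back to.
    search_query = f'search index="vendor_alerts" id="{row_id}"'
    reader = results.JSONResultsReader(
        server.jobs.oneshot(
            search_query,
            earliest_time=lookback,
            latest_time="now",
            output_mode="json",
        )
    )
    # JSONResultsReader also yields Message objects (e.g. INFO messages);
    # only dict items are actual result rows.
    return any(isinstance(r, dict) for r in reader)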

How many events are you ingesting using this method? As the index grows and time goes by, you will need to search further and further back to check for records already ingested. It might be more efficient to use a KV store with a _key field based on the unique id of your event; you could then either upsert or check for a specific ID in the KV store.
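For illustration only, a KV store based check could look roughly like this with the same SDK session; the collection name "vendor_alert_ids" is hypothetical and would need a matching collections.conf stanza in your app:

import json
from splunklib.binding import HTTPError

collection = server.kvstore["vendor_alert_ids"].data  # hypothetical collection

def already_seen(row_id):
    # _key lookups are direct reads, so there is no search-time window to tune.
    try:
        collection.query_by_id(row_id)
        return True
    except HTTPError:          # 404 means the id has not been recorded yet
        return False

for row in response['data']:
    row_id = str(row.get("id"))
    if already_seen(row_id):
        continue
    server.indexes["vendor_alerts"].submit(json.dumps(row))
    collection.insert(json.dumps({"_key": row_id}))  # remember the id for next run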

🌟 Did this answer help you? If so, please consider:

  • Adding karma to show it was useful
  • Marking it as the solution if it resolved your issue
  • Commenting if you need any clarification

Your feedback encourages the volunteers in this community to continue contributing

0 Karma

gurunagasimha
Loves-to-Learn Everything

@livehybrid 

Thanks for the suggestions.

At this point, I would like to store the data only in the index and avoid using KV Store or any external persistence.

Given this constraint, is there anything else I can add or improve in the index-based approach to make the duplicate check more reliable?
For example, explicitly setting earliest_time and latest_time, using indexed fields, or any other best practices that help with duplicate detection while relying only on indexed data.

Any guidance on what else can be added or tuned for this scenario would be helpful.

Thanks.

0 Karma


livehybrid
SplunkTrust
SplunkTrust

Hi @gurunagasimha 

As @PickleRick and I mentioned, the best approach would be not to use a search at all. Is there any particular reason you want to avoid using a KV store to hold details of the events processed? It would be good to understand a little more about the setup, number of events, etc.

Regardless of whether it is a scripted input or a modular input, there isn't a whole lot of difference when it comes to this, as it's still a Python script being run by Splunk to ingest data.

How many events are we talking?

Regarding the issue you're seeing with the search not returning events: it's probably because no earliest/latest is defined. At this point the difference between indexed_earliest/earliest etc. wouldn't make much difference, and you probably shouldn't make the unique ID an indexed field given how high its cardinality is.

🌟 Did this answer help you? If so, please consider:

  • Adding karma to show it was useful
  • Marking it as the solution if it resolved your issue
  • Commenting if you need any clarification

Your feedback encourages the volunteers in this community to continue contributing

 

0 Karma

PickleRick
SplunkTrust
SplunkTrust

Here's where I disagree a bit 😉 The KV store is also best avoided. I prefer file-based checkpoints (they are easier to back up, replicate, and use to set up a "kinda-HA" environment), and they don't require firing up the KV store on the forwarder.
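A minimal sketch of what I mean, assuming the script keeps its own checkpoint file (the path below is purely illustrative; a real input would resolve it under the app's or the input's checkpoint directory):

import json
import os

CHECKPOINT_FILE = "/opt/splunk/etc/apps/VendorThreatIntel/local/seen_ids.json"  # illustrative path

def load_seen_ids():
    # The checkpoint is just the set of ids that have already been indexed.
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return set(json.load(f))
    return set()

def save_seen_ids(seen):
    with open(CHECKPOINT_FILE, "w") as f:
        json.dump(sorted(seen), f)

seen = load_seen_ids()
for row in response['data']:
    row_id = str(row.get("id"))
    if row_id in seen:
        continue
    server.indexes["vendor_alerts"].submit(json.dumps(row))
    seen.add(row_id)
save_seen_ids(seen)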

0 Karma

PickleRick
SplunkTrust
SplunkTrust

Please try to reformat your code - insert it as a code block or at least in a preformatted paragraph style.

In its current form, it's unreadable.

0 Karma

gurunagasimha
Loves-to-Learn Everything

@PickleRick 
Please find the updated code above

0 Karma

PickleRick
SplunkTrust
SplunkTrust

Wait. You're trying to search the index from within a modular input? Kill it with fire. Never do stuff like this.

I've only seen one add-on which historically used to have this logic (and even then, when I reviewed it, it had already been disabled and migrated to checkpoint-based persistence).

Input is input, search is search. Don't mix those. Remember that - as a general rule - your input might be running on an instance which has no access to search.

This is a very very bad idea.

0 Karma

gurunagasimha
Loves-to-Learn Everything

Thank you for the insight, @PickleRick — I understand and agree with the general caution around mixing input and search. That said, to clarify the context: this implementation is a custom scripted input within the VendorThreatIntel Splunk app, not a modular input. It uses the Splunk Python SDK to pull alert data from an external API and ingest it into a custom index.
The duplicate events appear to originate on the API side. Unfortunately, a traditional checkpoint-based approach isn’t reliable here, as the API does not provide consistent timestamps or strictly sequential markers. The data can arrive out of order, and relying solely on checkpoints risks either missing events or re-ingesting them.
To mitigate this, we attempted a one-shot search–based duplicate check before indexing. However, this does not seem to be fully effective — events with the same IDs are still being indexed.
I’d appreciate your thoughts on more robust approaches for duplicate handling in a scripted input scenario like this, especially in distributed setups.
Thanks in advance for your guidance.

0 Karma

PickleRick
SplunkTrust
SplunkTrust

Unfortunately, that's the typical issue with a bad input. Splunk is meant for ingesting "sequential" data. It's supposed to be time-based and consist of unique events. It's the source side's responsibility to provide reasonable data.

BTW, if you're working with threat intel, it's often better to "ingest" it as "state" stored in a lookup (at typical TI sizes, it's usually a KVstore-based lookup) instead of "incremental" events. That's how many TI solutions work with Splunk (and AFAIR that's the way TI works in ES).
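For illustration, assuming a KV store collection named vendor_threat_intel defined in the app's collections.conf (and exposed as a lookup via transforms.conf), the "state" approach could look roughly like this with the SDK already in use:

# Upsert the indicators into a KV store-backed lookup keyed by their id,
# instead of indexing each API pull as new events.
docs = []
for row in response['data']:
    doc = dict(row)
    doc["_key"] = str(row.get("id"))  # the same id always maps to the same record
    docs.append(doc)

# batch_save upserts per _key, so re-pulling the same alerts from the API
# overwrites the existing records rather than duplicating them.
# (Very large batches may need to be chunked.)
server.kvstore["vendor_threat_intel"].data.batch_save(*docs)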

0 Karma

gurunagasimha
Loves-to-Learn Everything

logger.info("Received %s records for processing", len(response['data']))
server = client.connect(token=session_key, app="VendorThreatIntel")

for row in response['data']:
        row['service_name'] = service['displayName']
        row_id = row.get("id")

       search_query = 'search index="vendor_alerts" id="' + str(row_id) + '"'
       logger.info("Checking for existing record: %s", search_query)

       results_found = False
       try:
           rr = results.JSONResultsReader(server.jobs.oneshot(search_query, output_mode="json"))

           for result in rr:
                  if isinstance(result, dict):
                        logger.info("Duplicate record found with id=%s, skipping insertion",row_id)
                        results_found = True
                        skipped_count += 1

                  if not results_found:
                        index = server.indexes["vendor_alerts"]
                        index.submit(json.dumps(row))
                        inserted_count += 1

       except Exception as e:
            logger.error("Error inserting data: %s", str(e))

logger.info("Summary | Total: %s | Inserted: %s | Skipped: %s",len(response['data']), inserted_count,skipped_count)

0 Karma