I’m working on a custom Splunk app (VendorThreatIntel) that ingests alert data from an external API using the Splunk Python SDK.
Before inserting events into a custom index, I perform a duplicate check based on a unique id field. For each incoming record, I run a one-shot search against the index to check whether the record already exists. If found, I skip insertion; otherwise, I insert the event.
Below is the logic I’m using:
if 'data' in response and isinstance(response['data'], Sequence):
    inserted_count = 0
    skipped_count = 0
    logger.info("[Vendor Events] Received %s records for processing", len(response['data']))

    server = client.connect(token=session_key, app="VendorThreatIntel")

    for row in response['data']:
        row['service_name'] = service['displayName']
        row_id = row.get("id")

        search_query = f'search index="vendor_alerts" id="{row_id}"'
        logger.info(f"[Vendor Events] Checking for existing record: {search_query}")

        results_found = False
        try:
            rr = results.JSONResultsReader(
                server.jobs.oneshot(search_query, output_mode="json")
            )
            for result in rr:
                if isinstance(result, dict):
                    logger.info(
                        "[Vendor Events] Duplicate record found with id=%s, skipping insertion.",
                        row_id
                    )
                    results_found = True
                    skipped_count += 1

            if not results_found:
                index = server.indexes["vendor_alerts"]
                index.submit(json.dumps(row))
                inserted_count += 1
        except Exception as e:
            logger.error("[Vendor Events] Error inserting data: %s", str(e))

    logger.info(
        "[Vendor Events] Summary | Total: %s | Inserted: %s | Skipped: %s",
        len(response['data']), inserted_count, skipped_count
    )
Even when records with the same id already exist in the index, the duplicate detection logic is not being applied, and duplicate events are still getting indexed.
May I know why this duplication logic is not being applied?
Is this related to one-shot search behavior, indexing latency, or event availability timing during ingestion?
Additionally, I would like guidance considering Splunk’s distributed architecture (search head and indexers).
Is there any other recommended or reliable way to handle duplicate detection in Splunk, especially in a distributed environment?
Thanks in advance.
It doesn't look like you are specifying an earliest_time/latest_time for the search, so the one-shot search may not cover the full time range of previously ingested events, which would mean the duplicates are never matched.
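For example, something like this (just a sketch; the 30-day lookback is an assumption you would size to your own retention and ingest history) passes an explicit time range to the one-shot job:

    # Sketch only: the 30-day lookback window is an assumed value, widen or narrow it as needed.
    rr = results.JSONResultsReader(
        server.jobs.oneshot(
            search_query,
            earliest_time="-30d@d",   # how far back to look for an existing id
            latest_time="now",
            output_mode="json",
        )
    )
    results_found = any(isinstance(result, dict) for result in rr)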
How many events are you ingesting with this method? As the index grows and time goes by, you will need to search further and further back to check for records that have already been ingested. It might be more efficient to use a KV store collection with a _key field based on the unique id of your event; you could then either upsert the record or check whether a specific id already exists in the KV store.
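A rough sketch of what that could look like with the Python SDK, assuming a collection (here called vendor_alert_ids, purely illustrative) has been defined in collections.conf for your app:

    # Sketch only: "vendor_alert_ids" is an assumed collection name defined in collections.conf.
    from splunklib.binding import HTTPError

    seen_collection = server.kvstore["vendor_alert_ids"].data

    def already_seen(key):
        try:
            seen_collection.query_by_id(key)   # lookup by _key; raises HTTPError 404 if absent
            return True
        except HTTPError as e:
            if e.status == 404:
                return False
            raise

    if not already_seen(str(row_id)):
        seen_collection.insert(json.dumps({"_key": str(row_id)}))
        server.indexes["vendor_alerts"].submit(json.dumps(row))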
Thanks for the suggestions.
At this point, I would like to store the data only in the index and avoid using KV Store or any external persistence.
Given this constraint, is there anything else I can add or improve in the index-based approach to make the duplicate check more reliable?
For example, explicitly setting earliest_time and latest_time, using indexed fields, or any other best practices that help with duplicate detection while relying only on indexed data.
Any guidance on what else can be added or tuned for this scenario would be helpful.
Thanks.
As both @PickleRick and I mentioned, the best approach is not to use a search at all. Is there a particular reason you want to avoid using a KV store to hold details of the events already processed? It would help to understand a bit more about the setup, the number of events, etc.
Whether it is a scripted input or a modular input makes little difference here: either way it is a Python script run by Splunk to ingest data.
How many events are we talking?
Regarding the issue you're seeing with the search not returning events: it's probably because no earliest/latest is defined. At this point the difference between _index_earliest and earliest wouldn't matter much, and you probably shouldn't make the unique id an indexed field given how high its cardinality is.
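If you do stick with the search-based check for now, the time bounds can also go directly in the SPL string, e.g. (the 7-day window is just an assumed value):

    # Sketch: 7 days is an assumed lookback window.
    search_query = f'search index="vendor_alerts" id="{row_id}" earliest=-7d latest=now'
    # or, to bound by index time rather than event time:
    # search_query = f'search index="vendor_alerts" id="{row_id}" _index_earliest=-7d _index_latest=now'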
Here's where I disagree a bit 😉 The KV store is also best avoided. I prefer file-based checkpoints (they are easier to back up, replicate, and use to set up a "kinda-HA" environment), and they don't require firing up the KV store on the forwarder.
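Just to illustrate the idea (the filename is hypothetical, and checkpoint_dir, response and server would come from your own input script), a file-based checkpoint can be as simple as a set of already-seen ids persisted to disk:

    # Sketch of a file-based checkpoint; the filename and checkpoint_dir are assumptions.
    import json
    import os

    checkpoint_file = os.path.join(checkpoint_dir, "vendor_seen_ids.json")

    def load_seen_ids():
        try:
            with open(checkpoint_file) as f:
                return set(json.load(f))
        except (OSError, ValueError):
            return set()

    def save_seen_ids(seen):
        with open(checkpoint_file, "w") as f:
            json.dump(sorted(seen), f)

    seen = load_seen_ids()
    for row in response["data"]:
        row_id = str(row.get("id"))
        if row_id in seen:
            continue  # already ingested on a previous run, skip
        server.indexes["vendor_alerts"].submit(json.dumps(row))
        seen.add(row_id)
    save_seen_ids(seen)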
Please try to reformat your code: insert it as a code block, or at least in a preformatted paragraph style.
In its current form it's unreadable.
@PickleRick
Please find the updated code below.
Wait. You're trying to search the index from within a modular input? Kill it with fire. Never do stuff like this.
I've only seen one add-on that historically had this logic, and even then, by the time I reviewed it, the logic had already been disabled and migrated to checkpoint-based persistence.
Input is input, search is search. Don't mix those. Remember that - as a general rule - your input might be running on an instance which has no access to search.
This is a very very bad idea.
Thank you for the insight, @PickleRick — I understand and agree with the general caution around mixing input and search. That said, to clarify the context: this implementation is a custom scripted input within the VendorThreatIntel Splunk app, not a modular input. It uses the Splunk Python SDK to pull alert data from an external API and ingest it into a custom index.
The duplicate events appear to originate on the API side. Unfortunately, a traditional checkpoint-based approach isn’t reliable here, as the API does not provide consistent timestamps or strictly sequential markers. The data can arrive out of order, and relying solely on checkpoints risks either missing events or re-ingesting them.
To mitigate this, we attempted a one-shot search–based duplicate check before indexing. However, this does not seem to be fully effective — events with the same IDs are still being indexed.
I’d appreciate your thoughts on more robust approaches for duplicate handling in a scripted input scenario like this, especially in distributed setups.
Thanks in advance for your guidance.
Unfortunately, that's the typical issue with a bad input. Splunk is meant for ingesting "sequential" data: it's supposed to be time-based and consist of unique events. It's the source side's responsibility to provide reasonable data.
BTW, if you're working with threat intel, it's often better to "ingest" it as "state" stored in a lookup (at typical TI sizes, usually a KV store-based lookup) instead of as "incremental" events. That's how many TI solutions work with Splunk (and AFAIR that's how TI works in ES).
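A rough sketch of that pattern with the SDK (the collection name is illustrative): batch_save inserts or replaces documents by _key, so a re-delivered indicator simply overwrites its previous copy instead of creating a duplicate.

    # Sketch: "vendor_threat_intel" is an assumed lookup-backed KV store collection.
    collection = server.kvstore["vendor_threat_intel"].data
    documents = [dict(row, _key=str(row["id"])) for row in response["data"]]
    collection.batch_save(*documents)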
logger.info("Received %s records for processing", len(response['data']))
server = client.connect(token=session_key, app="VendorThreatIntel")
for row in response['data']:
row['service_name'] = service['displayName']
row_id = row.get("id")
search_query = 'search index="vendor_alerts" id="' + str(row_id) + '"'
logger.info("Checking for existing record: %s", search_query)
results_found = False
try:
rr = results.JSONResultsReader(server.jobs.oneshot(search_query, output_mode="json"))
for result in rr:
if isinstance(result, dict):
logger.info("Duplicate record found with id=%s, skipping insertion",row_id)
results_found = True
skipped_count += 1
if not results_found:
index = server.indexes["vendor_alerts"]
index.submit(json.dumps(row))
inserted_count += 1
except Exception as e:
logger.error("Error inserting data: %s", str(e))
logger.info("Summary | Total: %s | Inserted: %s | Skipped: %s",len(response['data']), inserted_count,skipped_count)