I’m working on a custom Splunk app (VendorThreatIntel) that ingests alert data from an external API using the Splunk Python SDK.
Before inserting events into a custom index, I perform a duplicate check based on a unique id field. For each incoming record, I run a one-shot search against the index to check whether the record already exists. If found, I skip insertion; otherwise, I insert the event.
Below is the logic I’m using:
if 'data' in response and isinstance(response['data'], Sequence):
    inserted_count = 0
    skipped_count = 0
    logger.info("[Vendor Events] Received %s records for processing", len(response['data']))
    server = client.connect(token=session_key, app="VendorThreatIntel")
    for row in response['data']:
        row['service_name'] = service['displayName']
        row_id = row.get("id")
        search_query = f'search index="vendor_alerts" id="{row_id}"'
        logger.info(f"[Vendor Events] Checking for existing record: {search_query}")
        results_found = False
        try:
            rr = results.JSONResultsReader(
                server.jobs.oneshot(search_query, output_mode="json")
            )
            for result in rr:
                if isinstance(result, dict):
                    logger.info(
                        "[Vendor Events] Duplicate record found with id=%s, skipping insertion.",
                        row_id
                    )
                    results_found = True
                    skipped_count += 1
            if not results_found:
                index = server.indexes["vendor_alerts"]
                index.submit(json.dumps(row))
                inserted_count += 1
        except Exception as e:
            logger.error("[Vendor Events] Error inserting data: %s", str(e))
    logger.info(
        "[Vendor Events] Summary | Total: %s | Inserted: %s | Skipped: %s",
        len(response['data']), inserted_count, skipped_count
    )
Even when records with the same id already exist in the index, the one-shot search does not find them, so the duplicate check never triggers and duplicate events are still getting indexed.
Why is the duplicate detection not working as expected?
Is this related to one-shot search behavior, indexing latency, or event availability timing during ingestion?
Additionally, I would like guidance that takes Splunk's distributed architecture (search heads and indexers) into account.
Is there any other recommended or reliable way to handle duplicate detection in Splunk, especially in a distributed environment?
Thanks in advance.
It doesn't look like you are specifying an earliest_time/latest_time for the search, so you may not be getting back all the events you expect, which would mean you're not matching the duplicates.
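For example, something like this (an untested sketch; the 90-day window is just an illustration, set it to however far back duplicates could realistically exist in your index):

# Bound the duplicate-check search explicitly so older events are included
kwargs = {
    "output_mode": "json",
    "earliest_time": "-90d@d",  # illustrative window; adjust to your data
    "latest_time": "now",
}
rr = results.JSONResultsReader(server.jobs.oneshot(search_query, **kwargs))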
How many events are you ingesting using this method? As the index grows and time goes by, you will need to search further and further back to check for records that have already been ingested. It might be more efficient to use a KV Store collection whose _key field is based on the unique id of your event; you could then either upsert or check for a specific id in the KV Store, as in the sketch below.
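If you do go the KV Store route, the pattern is roughly the following (just a sketch: it assumes a collection named vendor_alerts_seen is already defined in collections.conf for your app; the helper names are mine):

import json

# KV Store collection keyed by the event's unique id
# (assumes "vendor_alerts_seen" exists in the app's collections.conf)
collection = server.kvstore["vendor_alerts_seen"]

def already_seen(row_id):
    # An empty result for this _key means the id has not been ingested yet
    return bool(collection.data.query(query=json.dumps({"_key": str(row_id)})))

def mark_seen(row_id):
    # batch_save upserts on _key, so marking the same id twice is harmless
    collection.data.batch_save({"_key": str(row_id), "ingested": True})

In the ingestion loop you would call already_seen(row_id) before index.submit(...) and mark_seen(row_id) after a successful submit.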
Thanks for the suggestions.
At this point, I would like to store the data only in the index and avoid using KV Store or any external persistence.
Given this constraint, is there anything else I can add or improve in the index-based approach to make the duplicate check more reliable?
For example, explicitly setting earliest_time and latest_time, using indexed fields, or any other best practices that help with duplicate detection while relying only on indexed data.
Any guidance on what else can be added or tuned for this scenario would be helpful.
Thanks.
Please try to reformat your code - insert it as a code block or at least in a preformatted paragraph style.
In its current form it's unreadable.
@PickleRick
Please find the updated code above
Wait. You're trying to search the index from within a modular input? Kill it with fire. Never do stuff like this.
I've only seen one add-on which historically used this logic (and even then, when I reviewed it, that logic had already been disabled and migrated to checkpoint-based persistence).
Input is input, search is search. Don't mix those. Remember that - as a general rule - your input might be running on an instance which has no access to search.
This is a very very bad idea.
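For reference, the checkpoint-based persistence mentioned above is the usual pattern for modular inputs: keep the set of already-ingested ids on disk in the input's checkpoint directory instead of searching the index. A rough sketch, assuming a classic modular input where Splunk passes a checkpoint_dir in the input definition (the file name and helper names here are just illustrative):

import json
import os

CHECKPOINT_FILE = "vendor_alerts_seen_ids.json"  # illustrative name

def load_seen_ids(checkpoint_dir):
    # Read the set of ids already ingested by previous runs of the input
    path = os.path.join(checkpoint_dir, CHECKPOINT_FILE)
    if os.path.exists(path):
        with open(path) as f:
            return set(json.load(f))
    return set()

def save_seen_ids(checkpoint_dir, seen_ids):
    # Persist the updated set once the run completes
    path = os.path.join(checkpoint_dir, CHECKPOINT_FILE)
    with open(path, "w") as f:
        json.dump(sorted(seen_ids), f)

In the ingestion loop you would skip any row whose id is already in the set, add new ids as you submit them, and save the set at the end; the input then never needs search access.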
logger.info("Received %s records for processing", len(response['data']))
server = client.connect(token=session_key, app="VendorThreatIntel")
for row in response['data']:
row['service_name'] = service['displayName']
row_id = row.get("id")
search_query = 'search index="vendor_alerts" id="' + str(row_id) + '"'
logger.info("Checking for existing record: %s", search_query)
results_found = False
try:
rr = results.JSONResultsReader(server.jobs.oneshot(search_query, output_mode="json"))
for result in rr:
if isinstance(result, dict):
logger.info("Duplicate record found with id=%s, skipping insertion",row_id)
results_found = True
skipped_count += 1
if not results_found:
index = server.indexes["vendor_alerts"]
index.submit(json.dumps(row))
inserted_count += 1
except Exception as e:
logger.error("Error inserting data: %s", str(e))
logger.info("Summary | Total: %s | Inserted: %s | Skipped: %s",len(response['data']), inserted_count,skipped_count)