Deployment Architecture

Duplicate logs via modular input

VijaySrrie
Builder

Hi Team,

We are using modular input to ingest the logs into splunk, we have checkpoint file, but we see duplicate logs are ingested into splunk. How to eliminate duplicates?
application from which the logs are ingested - Tyk analytics

Labels (1)
Tags (1)
0 Karma

VijaySrrie
Builder

#!/usr/bin/env python
# coding=utf-8

from __future__ import print_function
import sys, os
import xml.dom.minidom, xml.sax.saxutils
from pymongo import MongoClient
from datetime import datetime
import base64
import json
import re

import logging
from io import open
import six
logging.root
logging.root.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(levelname)s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logging.root.addHandler(handler)

SCHEME = """<scheme>
<title>API Gateway Analytics</title>
<description>Ingest data from API Gateway mongodb tyk_analytics database</description>
<streaming_mode>xml</streaming_mode>
<endpoint>
<args>
<arg name="mongodb_uri">
<title>MongoDB URI</title>
<description>mongodb://USER:PASS@SERVER1:27017,SERVER2:27017/tyk_analytics?replicaSet=mongo-replica</description>
</arg>
</args>
</endpoint>
</scheme>
"""

def do_scheme():
print(SCHEME)

# Empty validation routine. This routine is optional.
def validate_arguments():
pass

def get_config():
config = {}

try:
# read everything from stdin
config_str = sys.stdin.read()

# parse the config XML
doc = xml.dom.minidom.parseString(config_str)
root = doc.documentElement
conf_node = root.getElementsByTagName("configuration")[0]
if conf_node:
stanza = conf_node.getElementsByTagName("stanza")[0]
if stanza:
stanza_name = stanza.getAttribute("name")
if stanza_name:
logging.debug("XML: found stanza " + stanza_name)
config["name"] = stanza_name
params = stanza.getElementsByTagName("param")
for param in params:
param_name = param.getAttribute("name")
logging.debug("XML: found param '%s'" % param_name)
if param_name and param.firstChild and param.firstChild.nodeType == param.firstChild.TEXT_NODE:
data = param.firstChild.data
config[param_name] = data
logging.debug("XML: '%s' -> '%s'" % (param_name, data))

checkpnt_node = root.getElementsByTagName("checkpoint_dir")[0]
if checkpnt_node and checkpnt_node.firstChild and checkpnt_node.firstChild.nodeType == checkpnt_node.firstChild.TEXT_NODE:
config["checkpoint_dir"] = checkpnt_node.firstChild.data

if not config:
raise Exception("Invalid configuration received from Splunk.")

except Exception as e:
raise Exception("Error getting Splunk configuration via STDIN: %s" % str(e))

return config

def save_checkpoint(config, checkpoint):
checkpoint_file = "checkpoint-" + config["name"]
checkpoint_file = re.sub('[\\/\s]', '', checkpoint_file)
checkpoint_file_new = checkpoint_file + ".new"
chk_file = os.path.join(config["checkpoint_dir"], checkpoint_file)
chk_file_new = os.path.join(config["checkpoint_dir"], checkpoint_file_new)

checkpoint_rfc3339=checkpoint.strftime('%Y-%m-%d %H:%M:%S.%f')
logging.debug("Saving checkpoint=%s (checkpoint_rfc3339=%s) to file=%s", checkpoint, checkpoint_rfc3339, chk_file)

f = open(chk_file_new, "w")
f.write("%s" % checkpoint_rfc3339)
f.close()
os.rename(chk_file_new, chk_file)

def load_checkpoint(config):
checkpoint_file = "checkpoint-" + config["name"]
checkpoint_file = re.sub('[\\/\s]', '', checkpoint_file)
chk_file = os.path.join(config["checkpoint_dir"], checkpoint_file)
#chk_file = os.path.join(config["checkpoint_dir"], "checkpoint")
# try to open this file
try:
f = open(chk_file, "r")
checkpoint_rfc3339=f.readline().split("\n")[0]
logging.info("Read checkpoint_rfc3339=%s from file=%s", checkpoint_rfc3339, chk_file)
checkpoint = datetime.strptime(checkpoint_rfc3339, '%Y-%m-%d %H:%M:%S.%f')
f.close()
except:
# assume that this means the checkpoint is not there (Use 2000/1/1)
checkpoint_rfc3339='2000-01-01 00:00:00.000000'
checkpoint = datetime.strptime(checkpoint_rfc3339, '%Y-%m-%d %H:%M:%S.%f')
logging.error("Failed to read checkpoint from file=%s using checkpoint_rfc3339=%s", chk_file, checkpoint_rfc3339)

logging.debug("Checkpoint value is: checkpoint=%s", checkpoint)

return checkpoint

# Routine to index data
def run():
config = get_config()
mongodb_uri = config["mongodb_uri"]

checkpoint = load_checkpoint(config)

client = MongoClient(mongodb_uri)
database = client["tyk_analytics"]
collection = database["tyk_analytics"]

cursor = collection.find({'timestamp': {'$gt': checkpoint} })

sys.stdout.write("<stream>")
#logging.info("H: Before Document")
for document in cursor:
#logging.info("After1 Before Document")
new_checkpoint=document['timestamp']
#logging.info("After2 Before Document")
#document['timestamp'] is in GMT, so we can do a straight epoc conversion, and not be concerned with timezones
epoc_timestamp = (new_checkpoint - datetime(1970,1,1,0,0,0,0)).total_seconds()
#logging.debug("Calculated epoc_timestamp=%s, from ['timestamp']=%s", str(epoc_timestamp), checkpoint)

outputdata={}
outputdata['timestamp'] = six.text_type(document['timestamp'])
outputdata['apiname'] = document['apiname']
outputdata['ipaddress'] = document['ipaddress']
outputdata['id'] = six.text_type(document['_id'])
outputdata['requesttime'] = str(document['requesttime'])
outputdata['responsecode'] = str(document['responsecode'])
outputdata['method'] = document['method']
outputdata['path'] = document['path']

request=base64.b64decode(document['rawrequest'])
try:
request.decode('utf-8')
#print "string is UTF-8, length %d bytes" % len(request)
except UnicodeError:
request = "(SPLUNK SCRIPTED INPUT ERROR) input is not UTF-8 unicode"

m = re.search('(?s)^(.+)\r\n\r\n(.*)', request.decode('utf-8'))
if m:
#Strip any Authorization header
outputdata['requestheader']=re.sub('\nAuthorization: [^\n]+','',m.group(1))
outputdata['requestbody']=m.group(2)
else:
outputdata['request'] = request

response=base64.b64decode(document['rawresponse'])
try:
response.decode('utf-8')
#print "string is UTF-8, length %d bytes" % len(response)
except UnicodeError:
response = "(SPLUNK SCRIPTED INPUT ERROR) input is not UTF-8 unicode"

if response != "":
m = re.search('(?s)^(.+)\r\n\r\n(.*)', response.decode('utf-8'))
if m:
outputdata['responseheader']=m.group(1)
outputdata['responsebody']=m.group(2)
else:
outputdata['response'] = response.decode('utf-8')

sys.stdout.write("<event>")
sys.stdout.write("<time>")
sys.stdout.write(str(epoc_timestamp))
sys.stdout.write("</time>")
sys.stdout.write("<data>")
logging.info("Before Json dumps")
sys.stdout.write(xml.sax.saxutils.escape(json.dumps(outputdata)))
logging.info("After1 Json dumps")
sys.stdout.write("</data><done/></event>\n")

if new_checkpoint > checkpoint:
checkpoint_delta = (new_checkpoint - checkpoint).total_seconds()
checkpoint=new_checkpoint
if checkpoint_delta > 60:
save_checkpoint(config, checkpoint)
#End for block

save_checkpoint(config, checkpoint)
sys.stdout.write("</stream>")
sys.stdout.flush()

# Script must implement these args: scheme, validate-arguments
if __name__ == '__main__':
if len(sys.argv) > 1:
if sys.argv[1] == "--scheme":
do_scheme()
elif sys.argv[1] == "--validate-arguments":
validate_arguments()
else:
pass
else:
run()

sys.exit(0)

0 Karma

VijaySrrie
Builder

@inventsekar  - This is the python code we use to pull the loigs

0 Karma

inventsekar
SplunkTrust
SplunkTrust

Hi @VijaySrrie ... you have given only very least details. 

pls provide us the modular input script.. the config files.. 

is the modular input working fine previously and just recently it started the duplicates?

0 Karma
Got questions? Get answers!

Join the Splunk Community Slack to learn, troubleshoot, and make connections with fellow Splunk practitioners in real time!

Meet up IRL or virtually!

Join Splunk User Groups to connect and learn in-person by region or remotely by topic or industry.

Get Updates on the Splunk Community!

Announcing Modern Navigation: A New Era of Splunk User Experience

We are excited to introduce the Modern Navigation feature in the Splunk Platform, available to both cloud and ...

Modernize your Splunk Apps – Introducing Python 3.13 in Splunk

We are excited to announce that the upcoming releases of Splunk Enterprise 10.2.x and Splunk Cloud Platform ...

Step into “Hunt the Insider: An Splunk ES Premier Mystery” to catch a cybercriminal ...

After a whole week of being on call, you fell asleep on your keyboard, and you hit a sequence of buttons that ...