Hi Team,
We are using a modular input to ingest logs into Splunk. We maintain a checkpoint file, but we still see duplicate logs being ingested into Splunk. How can we eliminate the duplicates?
The application whose logs are ingested is Tyk analytics.
#!/usr/bin/env python
# coding=utf-8
from __future__ import print_function
import sys, os
import xml.dom.minidom, xml.sax.saxutils
from pymongo import MongoClient
from datetime import datetime
import base64
import json
import re
import logging
from io import open
import six
# Root-logger setup: Splunk captures a modular input's stderr into
# splunkd.log, so send everything there at DEBUG with a simple format.
# (The original had a bare "logging.root" expression here — a no-op.)
logging.root.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(levelname)s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logging.root.addHandler(handler)
# Introspection scheme returned to splunkd on "--scheme": declares the
# input's title/description, XML streaming mode, and the single
# mongodb_uri argument the stanza must provide.
SCHEME = """<scheme>
<title>API Gateway Analytics</title>
<description>Ingest data from API Gateway mongodb tyk_analytics database</description>
<streaming_mode>xml</streaming_mode>
<endpoint>
<args>
<arg name="mongodb_uri">
<title>MongoDB URI</title>
<description>mongodb://USER:PASS@SERVER1:27017,SERVER2:27017/tyk_analytics?replicaSet=mongo-replica</description>
</arg>
</args>
</endpoint>
</scheme>
"""
def do_scheme():
    """Emit the introspection scheme XML on stdout for splunkd."""
    sys.stdout.write(SCHEME + "\n")
def validate_arguments():
    """Validate stanza arguments for "--validate-arguments".

    Intentionally a no-op: this input performs no extra validation,
    and the routine is optional for modular inputs.
    """
    return None
def get_config():
    """Read the runtime configuration XML that splunkd writes to stdin.

    Returns a dict containing the stanza name under "name", each <param>
    as a key/value pair, and the checkpoint directory under
    "checkpoint_dir". Raises Exception when the XML cannot be parsed or
    yields no settings at all.
    """
    settings = {}
    try:
        # splunkd hands the full configuration document over stdin.
        raw_xml = sys.stdin.read()
        document = xml.dom.minidom.parseString(raw_xml)
        top = document.documentElement
        configuration = top.getElementsByTagName("configuration")[0]
        if configuration:
            stanza = configuration.getElementsByTagName("stanza")[0]
            if stanza:
                name = stanza.getAttribute("name")
                if name:
                    logging.debug("XML: found stanza " + name)
                    settings["name"] = name
                    for node in stanza.getElementsByTagName("param"):
                        key = node.getAttribute("name")
                        logging.debug("XML: found param '%s'" % key)
                        text = node.firstChild
                        if key and text and text.nodeType == text.TEXT_NODE:
                            settings[key] = text.data
                            logging.debug("XML: '%s' -> '%s'" % (key, text.data))
        chk_node = top.getElementsByTagName("checkpoint_dir")[0]
        chk_text = chk_node.firstChild
        if chk_node and chk_text and chk_text.nodeType == chk_text.TEXT_NODE:
            settings["checkpoint_dir"] = chk_text.data
        if not settings:
            raise Exception("Invalid configuration received from Splunk.")
    except Exception as e:
        raise Exception("Error getting Splunk configuration via STDIN: %s" % str(e))
    return settings
def save_checkpoint(config, checkpoint):
    """Atomically persist *checkpoint* (a datetime) for this input stanza.

    Writes the timestamp to "<checkpoint_dir>/checkpoint-<name>.new" first,
    then renames it over the final file, so a crash mid-write can never
    leave a corrupt checkpoint behind.
    """
    # Stanza names look like "scheme://name"; strip slashes, backslashes
    # and whitespace so the result is a safe flat file name.
    checkpoint_file = re.sub(r'[\\/\s]', '', "checkpoint-" + config["name"])
    chk_file = os.path.join(config["checkpoint_dir"], checkpoint_file)
    chk_file_new = chk_file + ".new"
    checkpoint_rfc3339 = checkpoint.strftime('%Y-%m-%d %H:%M:%S.%f')
    logging.debug("Saving checkpoint=%s (checkpoint_rfc3339=%s) to file=%s",
                  checkpoint, checkpoint_rfc3339, chk_file)
    # Context manager guarantees the handle is flushed and closed (even on
    # error) before the rename below — the original leaked the handle on
    # a failed write.
    with open(chk_file_new, "w") as f:
        f.write("%s" % checkpoint_rfc3339)
    os.rename(chk_file_new, chk_file)
def load_checkpoint(config):
    """Load the last saved checkpoint datetime for this input stanza.

    Returns the persisted timestamp, or 2000-01-01 00:00:00 when no
    checkpoint file exists yet (first run) or the file cannot be parsed.
    """
    # Must mirror the name mangling in save_checkpoint().
    checkpoint_file = re.sub(r'[\\/\s]', '', "checkpoint-" + config["name"])
    chk_file = os.path.join(config["checkpoint_dir"], checkpoint_file)
    try:
        # "with" closes the handle even if parsing below raises — the
        # original leaked the open file on a strptime failure.
        with open(chk_file, "r") as f:
            checkpoint_rfc3339 = f.readline().split("\n")[0]
        logging.info("Read checkpoint_rfc3339=%s from file=%s",
                     checkpoint_rfc3339, chk_file)
        checkpoint = datetime.strptime(checkpoint_rfc3339, '%Y-%m-%d %H:%M:%S.%f')
    except (IOError, OSError, ValueError):
        # Narrowed from a bare "except:" (which also swallowed
        # SystemExit/KeyboardInterrupt). Missing or unparsable file:
        # start from a date safely before any real data.
        checkpoint_rfc3339 = '2000-01-01 00:00:00.000000'
        checkpoint = datetime.strptime(checkpoint_rfc3339, '%Y-%m-%d %H:%M:%S.%f')
        logging.error("Failed to read checkpoint from file=%s using checkpoint_rfc3339=%s",
                      chk_file, checkpoint_rfc3339)
    logging.debug("Checkpoint value is: checkpoint=%s", checkpoint)
    return checkpoint
def _decode_payload(raw):
    """Decode raw request/response bytes as UTF-8 text.

    Returns a placeholder error string for non-UTF-8 payloads instead of
    crashing. The original code assigned this placeholder *str* and then
    called .decode('utf-8') on it, which raises AttributeError on Python 3.
    """
    try:
        return raw.decode('utf-8')
    except UnicodeError:
        return "(SPLUNK SCRIPTED INPUT ERROR) input is not UTF-8 unicode"


# Routine to index data
def run():
    """Stream new tyk_analytics documents to Splunk as XML events on stdout.

    Reads the input configuration from stdin, resumes from the saved
    checkpoint, and emits one <event> per MongoDB document with a
    timestamp newer than the checkpoint. The checkpoint is advanced as
    documents are emitted; anything emitted after the last save before a
    crash is re-read on restart (at-least-once delivery).
    """
    config = get_config()
    checkpoint = load_checkpoint(config)
    client = MongoClient(config["mongodb_uri"])
    collection = client["tyk_analytics"]["tyk_analytics"]
    # Sort ascending on timestamp: a cursor's natural order is not
    # guaranteed to be time order, so without this sort the checkpoint
    # could jump ahead of still-unread documents — producing duplicates
    # (re-read on the next run) or skipped events.
    cursor = collection.find({'timestamp': {'$gt': checkpoint}}).sort('timestamp', 1)
    sys.stdout.write("<stream>")
    for document in cursor:
        new_checkpoint = document['timestamp']
        # document['timestamp'] is in GMT, so we can do a straight epoch
        # conversion and not be concerned with timezones.
        epoc_timestamp = (new_checkpoint - datetime(1970, 1, 1, 0, 0, 0, 0)).total_seconds()
        outputdata = {
            'timestamp': six.text_type(document['timestamp']),
            'apiname': document['apiname'],
            'ipaddress': document['ipaddress'],
            'id': six.text_type(document['_id']),
            'requesttime': str(document['requesttime']),
            'responsecode': str(document['responsecode']),
            'method': document['method'],
            'path': document['path'],
        }
        request = _decode_payload(base64.b64decode(document['rawrequest']))
        m = re.search('(?s)^(.+)\r\n\r\n(.*)', request)
        if m:
            # Strip any Authorization header so credentials are not indexed.
            outputdata['requestheader'] = re.sub('\nAuthorization: [^\n]+', '', m.group(1))
            outputdata['requestbody'] = m.group(2)
        else:
            outputdata['request'] = request
        response = _decode_payload(base64.b64decode(document['rawresponse']))
        if response != "":
            m = re.search('(?s)^(.+)\r\n\r\n(.*)', response)
            if m:
                outputdata['responseheader'] = m.group(1)
                outputdata['responsebody'] = m.group(2)
            else:
                outputdata['response'] = response
        sys.stdout.write("<event>")
        sys.stdout.write("<time>")
        sys.stdout.write(str(epoc_timestamp))
        sys.stdout.write("</time>")
        sys.stdout.write("<data>")
        sys.stdout.write(xml.sax.saxutils.escape(json.dumps(outputdata)))
        sys.stdout.write("</data><done/></event>\n")
        if new_checkpoint > checkpoint:
            checkpoint_delta = (new_checkpoint - checkpoint).total_seconds()
            checkpoint = new_checkpoint
            # Persist at most every 60 seconds of data progress to limit
            # disk churn; the final save below covers the tail.
            if checkpoint_delta > 60:
                save_checkpoint(config, checkpoint)
    save_checkpoint(config, checkpoint)
    sys.stdout.write("</stream>")
    sys.stdout.flush()
# Splunk invokes the script with --scheme / --validate-arguments, or with
# no arguments at all for a normal indexing run.
if __name__ == '__main__':
    args = sys.argv[1:]
    if not args:
        run()
    elif args[0] == "--scheme":
        do_scheme()
    elif args[0] == "--validate-arguments":
        validate_arguments()
    sys.exit(0)
@inventsekar - This is the Python code we use to pull the logs.
 
		
		
		
		
		
	
			
		
		
			
					
		Hi @VijaySrrie ... you have given very few details.
Please provide the modular input script and the config files.
Was the modular input working fine previously, and did the duplicates start only recently?
