
Duplicate logs via modular input

VijaySrrie
Builder

Hi Team,

We are using a modular input to ingest logs into Splunk. We have a checkpoint file, but we still see duplicate logs being ingested into Splunk. How can we eliminate the duplicates?

Application from which the logs are ingested: Tyk Analytics
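
For context, a checkpoint-based modular input records how far it has read and resumes from there on the next run; duplicates usually mean events were forwarded before the checkpoint covering them was persisted. Below is a minimal sketch of that pattern (the file path, timestamp format, and defaults are illustrative assumptions, not the actual production values; the real script is posted further down):

# Minimal checkpoint pattern sketch; CHECKPOINT_FILE and the timestamp
# format are illustrative assumptions only.
import os
from datetime import datetime

CHECKPOINT_FILE = "/opt/splunk/var/lib/splunk/modinputs/example/checkpoint"  # hypothetical path

def load_checkpoint():
    """Return the timestamp of the last forwarded event, or a distant default."""
    try:
        with open(CHECKPOINT_FILE) as f:
            return datetime.strptime(f.read().strip(), "%Y-%m-%d %H:%M:%S.%f")
    except (IOError, OSError, ValueError):
        return datetime(2000, 1, 1)

def save_checkpoint(ts):
    """Write a temp file, then rename it into place so the update is atomic."""
    tmp = CHECKPOINT_FILE + ".new"
    with open(tmp, "w") as f:
        f.write(ts.strftime("%Y-%m-%d %H:%M:%S.%f"))
    os.rename(tmp, CHECKPOINT_FILE)

# Duplicates appear whenever events reach Splunk *before* the checkpoint
# covering them is saved: if the script dies inside that window, the next
# run re-reads everything after the stale checkpoint.

The script posted below follows exactly this pattern, which narrows the question to when and how often it saves its checkpoint.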


VijaySrrie
Builder

#!/usr/bin/env python
# coding=utf-8

from __future__ import print_function
import sys, os
import xml.dom.minidom, xml.sax.saxutils
from pymongo import MongoClient
from datetime import datetime
import base64
import json
import re

import logging
from io import open
import six
logging.root.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(levelname)s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logging.root.addHandler(handler)

SCHEME = """<scheme>
<title>API Gateway Analytics</title>
<description>Ingest data from API Gateway mongodb tyk_analytics database</description>
<streaming_mode>xml</streaming_mode>
<endpoint>
<args>
<arg name="mongodb_uri">
<title>MongoDB URI</title>
<description>mongodb://USER:PASS@SERVER1:27017,SERVER2:27017/tyk_analytics?replicaSet=mongo-replica</description>
</arg>
</args>
</endpoint>
</scheme>
"""

def do_scheme():
    print(SCHEME)

# Empty validation routine. This routine is optional.
def validate_arguments():
    pass

def get_config():
    config = {}

    try:
        # read everything from stdin
        config_str = sys.stdin.read()

        # parse the config XML
        doc = xml.dom.minidom.parseString(config_str)
        root = doc.documentElement
        conf_node = root.getElementsByTagName("configuration")[0]
        if conf_node:
            stanza = conf_node.getElementsByTagName("stanza")[0]
            if stanza:
                stanza_name = stanza.getAttribute("name")
                if stanza_name:
                    logging.debug("XML: found stanza " + stanza_name)
                    config["name"] = stanza_name
                    params = stanza.getElementsByTagName("param")
                    for param in params:
                        param_name = param.getAttribute("name")
                        logging.debug("XML: found param '%s'" % param_name)
                        if param_name and param.firstChild and param.firstChild.nodeType == param.firstChild.TEXT_NODE:
                            data = param.firstChild.data
                            config[param_name] = data
                            logging.debug("XML: '%s' -> '%s'" % (param_name, data))

        checkpnt_node = root.getElementsByTagName("checkpoint_dir")[0]
        if checkpnt_node and checkpnt_node.firstChild and checkpnt_node.firstChild.nodeType == checkpnt_node.firstChild.TEXT_NODE:
            config["checkpoint_dir"] = checkpnt_node.firstChild.data

        if not config:
            raise Exception("Invalid configuration received from Splunk.")

    except Exception as e:
        raise Exception("Error getting Splunk configuration via STDIN: %s" % str(e))

    return config

def save_checkpoint(config, checkpoint):
    checkpoint_file = "checkpoint-" + config["name"]
    checkpoint_file = re.sub(r'[\\/\s]', '', checkpoint_file)
    checkpoint_file_new = checkpoint_file + ".new"
    chk_file = os.path.join(config["checkpoint_dir"], checkpoint_file)
    chk_file_new = os.path.join(config["checkpoint_dir"], checkpoint_file_new)

    checkpoint_rfc3339 = checkpoint.strftime('%Y-%m-%d %H:%M:%S.%f')
    logging.debug("Saving checkpoint=%s (checkpoint_rfc3339=%s) to file=%s", checkpoint, checkpoint_rfc3339, chk_file)

    # Write to a temp file first, then rename over the old checkpoint so the
    # update is atomic and a crash cannot leave a half-written file behind.
    f = open(chk_file_new, "w")
    f.write("%s" % checkpoint_rfc3339)
    f.close()
    os.rename(chk_file_new, chk_file)

def load_checkpoint(config):
    checkpoint_file = "checkpoint-" + config["name"]
    checkpoint_file = re.sub(r'[\\/\s]', '', checkpoint_file)
    chk_file = os.path.join(config["checkpoint_dir"], checkpoint_file)
    # try to open this file
    try:
        f = open(chk_file, "r")
        checkpoint_rfc3339 = f.readline().split("\n")[0]
        logging.info("Read checkpoint_rfc3339=%s from file=%s", checkpoint_rfc3339, chk_file)
        checkpoint = datetime.strptime(checkpoint_rfc3339, '%Y-%m-%d %H:%M:%S.%f')
        f.close()
    except (IOError, OSError, ValueError):
        # assume this means the checkpoint is not there yet; default to 2000-01-01
        checkpoint_rfc3339 = '2000-01-01 00:00:00.000000'
        checkpoint = datetime.strptime(checkpoint_rfc3339, '%Y-%m-%d %H:%M:%S.%f')
        logging.error("Failed to read checkpoint from file=%s, using checkpoint_rfc3339=%s", chk_file, checkpoint_rfc3339)

    logging.debug("Checkpoint value is: checkpoint=%s", checkpoint)

    return checkpoint

# Routine to index data
def run():
    config = get_config()
    mongodb_uri = config["mongodb_uri"]

    checkpoint = load_checkpoint(config)

    client = MongoClient(mongodb_uri)
    database = client["tyk_analytics"]
    collection = database["tyk_analytics"]

    # Only fetch documents newer than the saved checkpoint.
    cursor = collection.find({'timestamp': {'$gt': checkpoint}})

    sys.stdout.write("<stream>")
    for document in cursor:
        new_checkpoint = document['timestamp']
        # document['timestamp'] is in GMT, so we can do a straight epoch
        # conversion and not be concerned with timezones.
        epoc_timestamp = (new_checkpoint - datetime(1970, 1, 1, 0, 0, 0, 0)).total_seconds()

        outputdata = {}
        outputdata['timestamp'] = six.text_type(document['timestamp'])
        outputdata['apiname'] = document['apiname']
        outputdata['ipaddress'] = document['ipaddress']
        outputdata['id'] = six.text_type(document['_id'])
        outputdata['requesttime'] = str(document['requesttime'])
        outputdata['responsecode'] = str(document['responsecode'])
        outputdata['method'] = document['method']
        outputdata['path'] = document['path']

        # b64decode returns bytes; decode once up front, since everything
        # downstream expects text.
        try:
            request = base64.b64decode(document['rawrequest']).decode('utf-8')
        except UnicodeError:
            request = "(SPLUNK SCRIPTED INPUT ERROR) input is not UTF-8 unicode"

        m = re.search(r'(?s)^(.+)\r\n\r\n(.*)', request)
        if m:
            # Strip any Authorization header
            outputdata['requestheader'] = re.sub(r'\nAuthorization: [^\n]+', '', m.group(1))
            outputdata['requestbody'] = m.group(2)
        else:
            outputdata['request'] = request

        try:
            response = base64.b64decode(document['rawresponse']).decode('utf-8')
        except UnicodeError:
            response = "(SPLUNK SCRIPTED INPUT ERROR) input is not UTF-8 unicode"

        if response != "":
            m = re.search(r'(?s)^(.+)\r\n\r\n(.*)', response)
            if m:
                outputdata['responseheader'] = m.group(1)
                outputdata['responsebody'] = m.group(2)
            else:
                outputdata['response'] = response

        sys.stdout.write("<event>")
        sys.stdout.write("<time>")
        sys.stdout.write(str(epoc_timestamp))
        sys.stdout.write("</time>")
        sys.stdout.write("<data>")
        sys.stdout.write(xml.sax.saxutils.escape(json.dumps(outputdata)))
        sys.stdout.write("</data><done/></event>\n")

        # Advance the checkpoint; persist it mid-run only when this document's
        # timestamp is more than 60 seconds ahead of the previous one.
        if new_checkpoint > checkpoint:
            checkpoint_delta = (new_checkpoint - checkpoint).total_seconds()
            checkpoint = new_checkpoint
            if checkpoint_delta > 60:
                save_checkpoint(config, checkpoint)

    save_checkpoint(config, checkpoint)
    sys.stdout.write("</stream>")
    sys.stdout.flush()

# Script must implement these args: scheme, validate-arguments
if __name__ == '__main__':
    if len(sys.argv) > 1:
        if sys.argv[1] == "--scheme":
            do_scheme()
        elif sys.argv[1] == "--validate-arguments":
            validate_arguments()
        else:
            pass
    else:
        run()

    sys.exit(0)
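
A note on where duplicates can come from in the script above: the checkpoint is persisted mid-run only when a document's timestamp jumps more than 60 seconds past the previous one, and otherwise only once at the end of a clean run. If the script is interrupted between saves (Splunk restart, crash, MongoDB timeout), the next invocation re-queries with the stale checkpoint and re-ingests everything already emitted since then. Another common source is the same input stanza enabled on more than one forwarder. Below is a minimal hardening sketch under those assumptions; emit_event is a hypothetical stand-in for the <event> writer above, and save_checkpoint is the atomic-rename helper from the script:

import sys

def run_once(config, collection, checkpoint):
    # Sort explicitly: find() alone guarantees no particular order, and with
    # per-event checkpointing the checkpoint must never move ahead of
    # documents that have not yet been written to stdout.
    cursor = collection.find({'timestamp': {'$gt': checkpoint}}).sort('timestamp', 1)

    sys.stdout.write("<stream>")
    for document in cursor:
        emit_event(document)   # hypothetical: the <event>...</event> block above
        sys.stdout.flush()     # make sure Splunk has the event before checkpointing it
        if document['timestamp'] > checkpoint:
            checkpoint = document['timestamp']
            save_checkpoint(config, checkpoint)  # per-event: crash window is one event
    sys.stdout.write("</stream>")
    sys.stdout.flush()

Per-event saves shrink the replay window to a single event, but delivery is still at-least-once, so any residual duplicates can be suppressed at search time, for example with dedup on the id field the script already extracts from _id.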


VijaySrrie
Builder

@inventsekar - This is the Python code we use to pull the logs.


inventsekar
SplunkTrust

Hi @VijaySrrie ... you have given us very few details.

Please provide the modular input script and the config files.

Was the modular input working fine previously, and did the duplicates start only recently?

Thanks and best regards,
Sekar

PS - If this or any post helped you in any way, please consider upvoting. Thanks for reading!