#!/usr/bin/env python # coding=utf-8 from __future__ import print_function import sys, os import xml.dom.minidom, xml.sax.saxutils from pymongo import MongoClient from datetime import datetime import base64 import json import re import logging from io import open import six logging.root logging.root.setLevel(logging.DEBUG) formatter = logging.Formatter('%(levelname)s %(message)s') handler = logging.StreamHandler() handler.setFormatter(formatter) logging.root.addHandler(handler) SCHEME = """<scheme> <title>API Gateway Analytics</title> <description>Ingest data from API Gateway mongodb tyk_analytics database</description> <streaming_mode>xml</streaming_mode> <endpoint> <args> <arg name="mongodb_uri"> <title>MongoDB URI</title> <description>mongodb://USER:PASS@SERVER1:27017,SERVER2:27017/tyk_analytics?replicaSet=mongo-replica</description> </arg> </args> </endpoint> </scheme> """ def do_scheme(): print(SCHEME) # Empty validation routine. This routine is optional. def validate_arguments(): pass def get_config(): config = {} try: # read everything from stdin config_str = sys.stdin.read() # parse the config XML doc = xml.dom.minidom.parseString(config_str) root = doc.documentElement conf_node = root.getElementsByTagName("configuration")[0] if conf_node: stanza = conf_node.getElementsByTagName("stanza")[0] if stanza: stanza_name = stanza.getAttribute("name") if stanza_name: logging.debug("XML: found stanza " + stanza_name) config["name"] = stanza_name params = stanza.getElementsByTagName("param") for param in params: param_name = param.getAttribute("name") logging.debug("XML: found param '%s'" % param_name) if param_name and param.firstChild and param.firstChild.nodeType == param.firstChild.TEXT_NODE: data = param.firstChild.data config[param_name] = data logging.debug("XML: '%s' -> '%s'" % (param_name, data)) checkpnt_node = root.getElementsByTagName("checkpoint_dir")[0] if checkpnt_node and checkpnt_node.firstChild and checkpnt_node.firstChild.nodeType == 
checkpnt_node.firstChild.TEXT_NODE: config["checkpoint_dir"] = checkpnt_node.firstChild.data if not config: raise Exception("Invalid configuration received from Splunk.") except Exception as e: raise Exception("Error getting Splunk configuration via STDIN: %s" % str(e)) return config def save_checkpoint(config, checkpoint): checkpoint_file = "checkpoint-" + config["name"] checkpoint_file = re.sub('[\\/\s]', '', checkpoint_file) checkpoint_file_new = checkpoint_file + ".new" chk_file = os.path.join(config["checkpoint_dir"], checkpoint_file) chk_file_new = os.path.join(config["checkpoint_dir"], checkpoint_file_new) checkpoint_rfc3339=checkpoint.strftime('%Y-%m-%d %H:%M:%S.%f') logging.debug("Saving checkpoint=%s (checkpoint_rfc3339=%s) to file=%s", checkpoint, checkpoint_rfc3339, chk_file) f = open(chk_file_new, "w") f.write("%s" % checkpoint_rfc3339) f.close() os.rename(chk_file_new, chk_file) def load_checkpoint(config): checkpoint_file = "checkpoint-" + config["name"] checkpoint_file = re.sub('[\\/\s]', '', checkpoint_file) chk_file = os.path.join(config["checkpoint_dir"], checkpoint_file) #chk_file = os.path.join(config["checkpoint_dir"], "checkpoint") # try to open this file try: f = open(chk_file, "r") checkpoint_rfc3339=f.readline().split("\n")[0] logging.info("Read checkpoint_rfc3339=%s from file=%s", checkpoint_rfc3339, chk_file) checkpoint = datetime.strptime(checkpoint_rfc3339, '%Y-%m-%d %H:%M:%S.%f') f.close() except: # assume that this means the checkpoint is not there (Use 2000/1/1) checkpoint_rfc3339='2000-01-01 00:00:00.000000' checkpoint = datetime.strptime(checkpoint_rfc3339, '%Y-%m-%d %H:%M:%S.%f') logging.error("Failed to read checkpoint from file=%s using checkpoint_rfc3339=%s", chk_file, checkpoint_rfc3339) logging.debug("Checkpoint value is: checkpoint=%s", checkpoint) return checkpoint # Routine to index data def run(): config = get_config() mongodb_uri = config["mongodb_uri"] checkpoint = load_checkpoint(config) client = 
MongoClient(mongodb_uri) database = client["tyk_analytics"] collection = database["tyk_analytics"] cursor = collection.find({'timestamp': {'$gt': checkpoint} }) sys.stdout.write("<stream>") #logging.info("H: Before Document") for document in cursor: #logging.info("After1 Before Document") new_checkpoint=document['timestamp'] #logging.info("After2 Before Document") #document['timestamp'] is in GMT, so we can do a straight epoc conversion, and not be concerned with timezones epoc_timestamp = (new_checkpoint - datetime(1970,1,1,0,0,0,0)).total_seconds() #logging.debug("Calculated epoc_timestamp=%s, from ['timestamp']=%s", str(epoc_timestamp), checkpoint) outputdata={} outputdata['timestamp'] = six.text_type(document['timestamp']) outputdata['apiname'] = document['apiname'] outputdata['ipaddress'] = document['ipaddress'] outputdata['id'] = six.text_type(document['_id']) outputdata['requesttime'] = str(document['requesttime']) outputdata['responsecode'] = str(document['responsecode']) outputdata['method'] = document['method'] outputdata['path'] = document['path'] request=base64.b64decode(document['rawrequest']) try: request.decode('utf-8') #print "string is UTF-8, length %d bytes" % len(request) except UnicodeError: request = "(SPLUNK SCRIPTED INPUT ERROR) input is not UTF-8 unicode" m = re.search('(?s)^(.+)\r\n\r\n(.*)', request.decode('utf-8')) if m: #Strip any Authorization header outputdata['requestheader']=re.sub('\nAuthorization: [^\n]+','',m.group(1)) outputdata['requestbody']=m.group(2) else: outputdata['request'] = request response=base64.b64decode(document['rawresponse']) try: response.decode('utf-8') #print "string is UTF-8, length %d bytes" % len(response) except UnicodeError: response = "(SPLUNK SCRIPTED INPUT ERROR) input is not UTF-8 unicode" if response != "": m = re.search('(?s)^(.+)\r\n\r\n(.*)', response.decode('utf-8')) if m: outputdata['responseheader']=m.group(1) outputdata['responsebody']=m.group(2) else: outputdata['response'] = 
response.decode('utf-8') sys.stdout.write("<event>") sys.stdout.write("<time>") sys.stdout.write(str(epoc_timestamp)) sys.stdout.write("</time>") sys.stdout.write("<data>") logging.info("Before Json dumps") sys.stdout.write(xml.sax.saxutils.escape(json.dumps(outputdata))) logging.info("After1 Json dumps") sys.stdout.write("</data><done/></event>\n") if new_checkpoint > checkpoint: checkpoint_delta = (new_checkpoint - checkpoint).total_seconds() checkpoint=new_checkpoint if checkpoint_delta > 60: save_checkpoint(config, checkpoint) #End for block save_checkpoint(config, checkpoint) sys.stdout.write("</stream>") sys.stdout.flush() # Script must implement these args: scheme, validate-arguments if __name__ == '__main__': if len(sys.argv) > 1: if sys.argv[1] == "--scheme": do_scheme() elif sys.argv[1] == "--validate-arguments": validate_arguments() else: pass else: run() sys.exit(0)
# (Trailing "... View more" web-page residue removed; it was not part of the script.)