Hi @DavidHourani, PFB the sample code below:
from functools import partial
from pathos import multiprocessing as multiprocessing
from splunklib.binding import AuthenticationError
from splunklib.binding import HTTPError as HttpError
from DataIngester import *
import csv
import logging
import os
import configparser
import splunklib.client as client
import splunklib.results as results
import threading
import datetime
class APICall:
    def __init__(self):
        self.code_dir_path = os.path.abspath(os.path.dirname(__file__))
        self.config = configparser.RawConfigParser()
        self.code_dir_path = self.code_dir_path.replace("src", "")
        self.config.read(self.code_dir_path + 'resources\\config.properties')
        logging.basicConfig(filename=self.code_dir_path + 'log\\Api-Call.log', filemode='w', level=logging.DEBUG,
                            format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
        self.host = self.config.get('splunk_search_section', 'host_name')
        self.port = self.config.get('splunk_search_section', 'host_port')
        self.username = self.config.get('splunk_search_section', 'user_name')
        self.password = self.config.get('splunk_search_section', 'password')
        print("Host Name : " + self.config.get('splunk_search_section', 'host_name'))
        print("Port : " + self.config.get('splunk_search_section', 'host_port'))
        print("User Name : " + self.config.get('splunk_search_section', 'user_name'))
        print("Password : " + self.config.get('splunk_search_section', 'password'))
        self.logging = logging
        # print(DataIngester.callSample(self))
        # Create a Service instance and log in
        # self.tech_list = ["All", "Route/Switch", "Data Center Switch", "Data Center Storage"]
        self.kwargs_oneshot = {"count": 0}
    # This method performs the actual saved search run and, once it completes, the Elasticsearch push.
    def run_saved_search_per_company(self, saved_search_list, each_cpy, index_value):
        self.logging.info(f"Starting the saved search processing for company {each_cpy}")
        each_ss = saved_search_list[index_value]
        cpy_ss = each_ss.replace("$cpyKey$", each_cpy)
        ss_name = each_ss.split(" ")[0]
        self.logging.info(
            f"Starting the saved search processing for company {each_cpy} and saved search name {ss_name}")
        self.run_ss(cpy_ss, each_cpy, ss_name)
    def process(self):
        company_list = self.fetch_onboarded_companies_from_customer_csv()
        saved_search_list_file = os.path.join(self.code_dir_path, "resources\\saved_search_template.txt")
        try:
            service = client.connect(
                host=self.host,
                port=self.port,
                username=self.username,
                password=self.password)
            print("Splunk connected successfully")
        except TimeoutError as te:
            self.logging.error(
                f"Error connecting to Splunk search head, as the host is unreachable. Reason being, {te}")
            raise TimeoutError("Error connecting to Splunk search head, as the host is unreachable.")
        except AuthenticationError as ae:
            self.logging.error(
                f"Authentication error occurred while connecting to Splunk search head. Reason being, {ae}")
            raise AuthenticationError(
                "Authentication error occurred while connecting to Splunk search head. Login Failed.")
        except HttpError as he:
            self.logging.error(f"Login failed while connecting to Splunk search head. Reason being, {he}")
            raise IOError("Login failed while connecting to Splunk search head, due to HTTP Error.")
        except Exception:
            import traceback
            print(traceback.format_exc())
            raise
        try:
            with open(saved_search_list_file, "r") as ss_file_pointer:
                saved_search_list = ss_file_pointer.readlines()
        except IOError as ie:
            self.logging.error(f"Error occurred while accessing the Saved Search file, reason being: {ie}")
            raise IOError("IO Error occurred while accessing the Saved Search file.")
        if service is not None:
            self.splunk_service = service
            # Creating a process pool for each company key, to increase the throughput.
            # Optimistically assuming 4 processors to be the maximum available.
            p = multiprocessing.Pool(processes=4)
            # for each_cpy in company_list:
            list_length_company = len(company_list)
            array_of_numbers = [x for x in range(0, list_length_company)]
            ssl = saved_search_list
            cl = company_list
            func = partial(self.processing_saved_search_per_company, ssl, cl)
            p.map(func, array_of_numbers)
            p.close()
            p.join()
            self.logging.info(f"No. of companies processed: {list_length_company}")
    # This method spawns a thread per saved search so the saved searches run in parallel for maximum throughput.
    def processing_saved_search_per_company(self, saved_search_list, company_list, each_cpy_index):
        company_key = company_list[each_cpy_index]
        print("Company Key : " + company_key)
        self.logging.info(f"Processing the saved search for company {company_key}")
        each_cpy = company_list[each_cpy_index]
        array_of_numbers = [x for x in range(0, len(saved_search_list))]
        # Creating a thread pool of 5 threads to increase the throughput of saved search processing.
        """from ThreadPool import ThreadPool
        thread_pool = ThreadPool(5)
        from functools import partial
        function1 = partial(self.run_saved_search_per_company, saved_search_list, each_cpy)
        thread_pool.map(function1, array_of_numbers)
        thread_pool.wait_completion() """
        self.run_saved_search_per_company(saved_search_list, each_cpy, 0)
    def run_ss(self, ss_query, each_cpy, ss_name):
        # print each_cpy
        # print ss_query
        self.logging.info(f"The company key for which the query is being run is {each_cpy}")
        present_time = datetime.datetime.now()
        filename_ts = "%s%s%s%s%s%s" % (
            present_time.year, present_time.strftime("%m"), present_time.strftime("%d"), present_time.strftime("%H"),
            present_time.strftime("%M"), present_time.strftime("%S"))
        ss_final_query = '| savedsearch ' + ss_query
        print("Saved Search Query : " + ss_final_query)
        print("Service is " + str(self.splunk_service.jobs.oneshot))
        print("logging is " + str(self.logging))
        print(str(self.kwargs_oneshot))
        # self.logging.debug(f"The Query being run for company key {each_cpy} is {ss_final_query}")
        print("Before")
        job_creator = self.splunk_service.jobs
        oneshotsearch_results = job_creator.oneshot(ss_final_query, **self.kwargs_oneshot)
        print("After")
        reader = results.ResultsReader(oneshotsearch_results)
        print(str(reader))
        out_data = []
        self.logging.info(f"Triggering a thread for company {each_cpy} and saved search name {ss_name}")
        data_processing_thread = threading.Thread(target=self.prepare_push_data,
                                                  args=(reader, each_cpy, out_data, ss_name))
        data_processing_thread.start()
        data_processing_thread.join()
        # APICall.prepare_push_data(self, reader, each_cpy, out_data, file_name, ss_name)
    def prepare_push_data(self, reader, cpy_key, out_data, ss_name):
        print("Company Key : " + cpy_key)
        print("Reader : " + str(reader))
        print("out_data : " + str(out_data))
        print("SS Name : " + ss_name)
        index_name = cpy_key + "-" + ss_name
        print(index_name)
        self.logging.debug(f"Setting presets on the result of the saved search with index {index_name}")
        for item in reader:
            # data to be appended to each dict item
            preset_data = {"_index": index_name,
                           "_type": "_doc"}
            source_content = dict(_source=dict(item))
            preset_data.update(source_content)
            out_data.append(preset_data)
        self.logging.debug(f"Setting presets completed on the result of the saved search with index {index_name}")
        # print("out_data : " + str(out_data))
        final_data = dict(data=out_data)
        print(final_data['data'][0])
        file_path = os.path.join(self.code_dir_path, cpy_key)
        self.logging.debug(f"Pushing the result of the saved search with index {index_name} to Elasticsearch")
        from DataIngester import DataIngester
        # making an Elasticsearch push call
        DataIngester().pushDetails(final_data, cpy_key, ss_name)
    def fetch_onboarded_companies_from_customer_csv(self):
        company_list = []
        try:
            self.logging.debug("Fetching the company list for processing")
            customer_csv_filepath = os.path.join(self.code_dir_path, "resources\\kpi_customer.csv")
            if os.path.exists(customer_csv_filepath):
                self.logging.debug("CSV file path exists")
                with open(customer_csv_filepath) as csv_file:
                    company_entries = csv.DictReader(csv_file)
                    for cpy_entry in company_entries:
                        if cpy_entry['roles'].lower() == "admin":
                            cpy_key = str(cpy_entry['cpyKey'])
                            if cpy_key not in company_list:
                                # print cpy_key
                                company_list.append(cpy_key)
            self.logging.debug(f"The companies are {str(company_list)}")
        except IOError as ie:
            self.logging.error(f"Error occurred while accessing the KPI customer file, reason being: {ie}")
        return company_list
# Main entry point to run the entire script.
if __name__ == '__main__':
    APICall().process()
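For context, here is roughly what the resource files referenced above would need to look like. The section and key names come straight from the code; all the values, the sample saved search line, and the sample CSV row are placeholders I made up, not taken from my environment:

# resources\config.properties (values are placeholders)
[splunk_search_section]
host_name = splunk-searchhead.example.com
host_port = 8089
user_name = admin
password = changeme

# resources\saved_search_template.txt - one saved search per line; the first token is treated as the
# saved search name and $cpyKey$ is replaced per company, e.g. a hypothetical entry:
# MySavedSearch cpyKey="$cpyKey$"

# resources\kpi_customer.csv - needs at least the 'roles' and 'cpyKey' columns, e.g.:
# roles,cpyKey
# admin,ACME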