Splunk Dev

Getting errors while executing Splunk SDK script: "'NoneType' object has no attribute 'startswith'".

shahid285
Path Finder

I am trying to run a list of saved searches via multithreading in Python, and I am getting the below error while executing the script.

'NoneType' object has no attribute 'startswith'

Basically,
I am running a function through multiprocessing, where a single process spawns a pool of threads (say 5 threads).
Each thread runs a Splunk saved search query, using a Splunk service connection passed down from the main class.

Due to this setup, I am getting the error below:

 'NoneType' object has no attribute 'startswith'

When I create a new service connection per thread, it works, but the number of open connections increases as a result.
Is there an alternative approach to overcome this?
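For reference, the per-thread workaround that does work looks roughly like this (a simplified sketch with illustrative placeholder names, not the actual script):

import splunklib.client as client

# Placeholders standing in for the values read from config.properties.
HOST, PORT, USERNAME, PASSWORD = "splunk.example.com", 8089, "admin", "changeme"

# Each worker thread opens its own connection, which avoids the error,
# but the number of open sessions grows with the thread count.
def run_one_saved_search(saved_search_name):
    service = client.connect(host=HOST, port=PORT,
                             username=USERNAME, password=PASSWORD)
    return service.jobs.oneshot('| savedsearch ' + saved_search_name, count=0)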

Thanks,
Shahid


DavidHourani
Super Champion

Hi @shahid285, could you please paste your code here so we can test and troubleshoot?


shahid285
Path Finder

Hi @DavidHourani, please find the sample code below:
from functools import partial
from pathos import multiprocessing as multiprocessing
from splunklib.binding import AuthenticationError
from splunklib.binding import HTTPError as HttpError
from DataIngester import *
import csv
import logging
import os
import configparser
import splunklib.client as client
import splunklib.results as results
import threading
import datetime
class APICall():

    def __init__(self):
        self.code_dir_path = os.path.abspath(os.path.dirname(__file__))
        self.config = configparser.RawConfigParser()
        self.code_dir_path = self.code_dir_path.replace("src", "")
        self.config.read(self.code_dir_path + 'resources\\config.properties')
        logging.basicConfig(filename=self.code_dir_path + 'log\\Api-Call.log', filemode='w', level=logging.DEBUG,
                            format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
        self.host = self.config.get('splunk_search_section', 'host_name')
        self.port = self.config.get('splunk_search_section', 'host_port')
        self.username = self.config.get('splunk_search_section', 'user_name')
        self.password = self.config.get('splunk_search_section', 'password')
        print("Host Name : " + self.config.get('splunk_search_section', 'host_name'))
        print("Port      : " + self.config.get('splunk_search_section', 'host_port'))
        print("User Name : " + self.config.get('splunk_search_section', 'user_name'))
        print("port      : " + self.config.get('splunk_search_section', 'password'))
        self.logging = logging
        # print(DataIngester.callSample(self))
        # Create a Service instance and log in

        # self.tech_list = ["All", "Route/Switch", "Data Center Switch", "Data Center Storage"]
        self.kwargs_oneshot = {"count": 0}



    # This method runs a single saved search for a company; the Elasticsearch push happens after the saved search completes.
    def run_saved_search_per_company(self, saved_search_list, each_cpy, index_value):
        self.logging.info(f"Starting the saved search processing for company {each_cpy}")
        each_ss = saved_search_list[index_value]
        cpy_ss = each_ss.replace("$cpyKey$", each_cpy)
        ss_name = each_ss.split(" ")[0]
        self.logging.info(
            f"Starting  the saved search processing for company {each_cpy} and saved search name {ss_name}")
        self.run_ss(cpy_ss, each_cpy, ss_name)

    def process(self):

        company_list = self.fetch_onboarded_companies_from_customer_csv()
        saved_search_list_file = os.path.join(self.code_dir_path, "resources\\saved_search_template.txt")
        try:
            service = client.connect(
                host=self.host,
                port=self.port,
                username=self.username,
                password=self.password)

            print("splunk connected  successfully: ")
        except TimeoutError as te:

            self.logging.error(
                f"Error connecting to Splunk search head, as the host is unreachable. Reason being, {te}")
            raise TimeoutError("Error connecting to Splunk search head, as the host is unreachable.")
        except HttpError as he:
            self.logging.error(f"Login failed while connecting to Splunk search head. Reason being, {he}")
            raise IOError("Login failed while connecting to Splunk search head, due to HTTP Error.")
        except AuthenticationError as ae:
            self.logging.error(
                f"Authentication error occurred while connecting to Splunk search head. Reason being, {ae}")
            raise AuthenticationError(
                "Authentication error occurred while connecting to Splunk search head. Login Failed.")
        except Exception as ex:
            import traceback
            print(traceback.format_exc())
            # Re-raise so the script does not continue with an undefined service.
            raise
        try:
            with open(saved_search_list_file, "r") as ss_file_pointer:
                saved_search_list = ss_file_pointer.readlines()
        except IOError as ie:
            self.logging.error(f"Error occurred while accessing the Saved Search file, reason being {ie}")
            raise IOError("IO Error occurred while accessing the Saved Search file.")
        # The with-statement closes the file, so no explicit close is needed here.
        if service is not None:
            self.splunk_service = service
        # Creating a process pool for each company key, to increase the throughput.
        # Assuming 4 processors to be max optimistically.
        p = multiprocessing.Pool(processes=4)
        # for each_cpy in company_list:
        list_length_company = len(company_list)
        array_of_numbers = [x for x in range(0, list_length_company)]
        ssl = saved_search_list
        cl = company_list
        from functools import partial
        func = partial(self.processing_saved_search_per_company, ssl, cl)
        p.map(func, array_of_numbers)
        p.close()
        p.join()

        self.logging.info(f"No. of. Companies processed are :->{list_length_company}")

    # This method spawns threads for each saved search so the saved searches run in parallel for maximum throughput.
    def processing_saved_search_per_company(self, saved_search_list, company_list, each_cpy_index):
        company_key = company_list[each_cpy_index]
        print("Company Key : " + company_key)
        self.logging.info(f"processing the saved search for company {company_key}")
        each_cpy = company_list[each_cpy_index]
        array_of_numbers = [x for x in range(0, len(saved_search_list))]
        # Creating a Thread pool of 5 threads to optimistically increase the throughput of saved search processing.
        """from ThreadPool import ThreadPool
        thread_pool = ThreadPool(5)
        from functools import partial
        function1 = partial(self.run_saved_search_per_company, saved_search_list, each_cpy)
        thread_pool.map(function1, array_of_numbers)
        thread_pool.wait_completion() """
        self.run_saved_search_per_company(saved_search_list, each_cpy,0)

    def run_ss(self, ss_query, each_cpy, ss_name):
        # print each_cpy
        # print ss_query

        self.logging.info(f"The company key for which the query is being run is {each_cpy}")
        import datetime
        present_time = datetime.datetime.now()
        filename_ts = "%s%s%s%s%s%s" % (
            present_time.year, present_time.strftime("%m"), present_time.strftime("%d"), present_time.strftime("%H"),
            present_time.strftime("%M"), present_time.strftime("%S"))

        ss_final_query = '| savedsearch ' + ss_query
        print("Saved Search Query : "+ss_final_query)
        print("Service is "+str(self.splunk_service.jobs.oneshot))
        print("logging  is "+str(self.logging))
        print(str(self.kwargs_oneshot))
        #self.logging.debug(f"The Query being run for company key {each_cpy} is {ss_final_query}")
        print("Before")
        job_creator = self.splunk_service.jobs

        oneshotsearch_results = job_creator.oneshot(ss_final_query, **self.kwargs_oneshot)
        print("After")
        reader = results.ResultsReader(oneshotsearch_results)
        print(str(reader))
        out_data = []
        self.logging.info(f"Triggering a  thread for company {each_cpy} and saved search name {ss_name}")
        data_processing_thread = threading.Thread(target=self.prepare_push_data,
                                                  args=(reader, each_cpy, out_data, ss_name))
        data_processing_thread.start()
        data_processing_thread.join()
        # APICall.prepare_push_data(self,reader,each_cpy,out_data,file_name,ss_name)

    def prepare_push_data(self, reader, cpy_key, out_data, ss_name):
        print("Company Key : " + cpy_key)
        print("Reader : " + str(reader))
        print("out_data : " + str(out_data))
        print("SS Name : "+ss_name)
        index_name = cpy_key + "-" + ss_name
        print(index_name)
        self.logging.debug(f"setting presets to the result of saved search with index as {index_name}")
        for item in reader:
            # data to be appended to each dict item
            preset_data = {"_index": index_name,
                           "_type": "_doc"}
            source_content = dict(_source=dict(item))
            preset_data.update(source_content)
            out_data.append(preset_data)

        self.logging.debug(f"setting presets completed to the result of saved search with index as {index_name}")
        # print("out_data : "+ str(out_data))
        final_data = dict(data=out_data)
        print(final_data['data'][0])
        file_path = os.path.join(self.code_dir_path, cpy_key)
        self.logging.debug(f"Pushing the result of saved search with index as {index_name} to Elastic Search")
        from DataIngester import DataIngester
        # making an Elastic Search Push Call
        DataIngester().pushDetails(final_data, cpy_key, ss_name)

    def fetch_onboarded_companies_from_customer_csv(self):
        company_list = []
        try:
            self.logging.debug("Fetching the Company list for processing")
            customer_csv_filepath = os.path.join(self.code_dir_path, "resources\\kpi_customer.csv")

            if os.path.exists(customer_csv_filepath):
                self.logging.debug("csv File Path Exists")
                company_entries = csv.DictReader(open(customer_csv_filepath))
                for cpy_entry in company_entries:
                    if cpy_entry['roles'].lower() == "admin":
                        cpy_key = str(cpy_entry['cpyKey'])
                        if cpy_key not in company_list:
                            # print cpy_key
                            company_list.append(cpy_key)
                self.logging.debug(f"The Companies are {str(company_list)}")
        except IOError as ie:
            self.logging.error(f"Error Occurred while accessing the KPI Customer file reason being, {ie} ")

        return company_list



#Main method to run the entire script.
if __name__ == '__main__':
    APICall().process()

DavidHourani
Super Champion

Which line is the error on in your message?


shahid285
Path Finder

@DavidHourani, I have further split the code into multiple classes, and I am now able to get a stack trace from splunklib.
Please find the stack trace below:

multiprocess.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "C:\Users\mohamnaw\AppData\Roaming\Python\Python37\site-packages\multiprocess\pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\mohamnaw\AppData\Roaming\Python\Python37\site-packages\multiprocess\pool.py", line 44, in mapstar
    return list(map(*args))
  File "C:/Users/mohamnaw/Documents/BCI-ACI/ElasticSearch/src/APICall.py", line 123, in processing_saved_search_per_company
    massage_data.run_saved_search_per_company(saved_search_list, each_cpy, 0)
  File "C:\Users\mohamnaw\Documents\BCI-ACI\ElasticSearch\src\MassageData.py", line 28, in run_saved_search_per_company
    self.run_ss(cpy_ss, each_cpy, ss_name)
  File "C:\Users\mohamnaw\Documents\BCI-ACI\ElasticSearch\src\MassageData.py", line 49, in run_ss
    oneshotsearch_results = job_creator(ss_final_query, path="", **self.kwargs_oneshot)
  File "C:\Program Files\Splunk\splunk-sdk-python-1.6.6\splunklib\client.py", line 3047, in oneshot
    **params).body
  File "C:\Program Files\Splunk\splunk-sdk-python-1.6.6\splunklib\client.py", line 814, in post
    return self.service.post(path, owner=owner, app=app, sharing=sharing, **query)
  File "C:\Program Files\Splunk\splunk-sdk-python-1.6.6\splunklib\binding.py", line 289, in wrapper
    return request_fun(self, *args, **kwargs)
  File "C:\Program Files\Splunk\splunk-sdk-python-1.6.6\splunklib\binding.py", line 71, in new_f
    val = f(*args, **kwargs)
  File "C:\Program Files\Splunk\splunk-sdk-python-1.6.6\splunklib\binding.py", line 752, in post
    response = self.http.post(path, all_headers, **query)
  File "C:\Program Files\Splunk\splunk-sdk-python-1.6.6\splunklib\binding.py", line 1224, in post
    return self.request(url, message)
  File "C:\Program Files\Splunk\splunk-sdk-python-1.6.6\splunklib\binding.py", line 1241, in request
    response = self.handler(url, message, **kwargs)
  File "C:\Program Files\Splunk\splunk-sdk-python-1.6.6\splunklib\binding.py", line 1366, in request
    scheme, host, port, path = _spliturl(url)
  File "C:\Program Files\Splunk\splunk-sdk-python-1.6.6\splunklib\binding.py", line 1076, in _spliturl
    if host.startswith('[') and host.endswith(']'): host = host[1:-1]
AttributeError: 'NoneType' object has no attribute 'startswith'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:/Users/mohamnaw/Documents/BCI-ACI/ElasticSearch/src/APICall.py", line 216, in <module>
    APICall().process()
  File "C:/Users/mohamnaw/Documents/BCI-ACI/ElasticSearch/src/APICall.py", line 100, in process
    p.map(func, array_of_numbers)
  File "C:\Users\mohamnaw\AppData\Roaming\Python\Python37\site-packages\multiprocess\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\mohamnaw\AppData\Roaming\Python\Python37\site-packages\multiprocess\pool.py", line 657, in get
    raise self._value
AttributeError: 'NoneType' object has no attribute 'startswith'

Thanks
Shahid


DavidHourani
Super Champion
The failing line is in _spliturl:

    if host.startswith('[') and host.endswith(']'): host = host[1:-1]

AttributeError: 'NoneType' object has no attribute 'startswith' means your host field is empty (None) for some reason...
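A quick way to confirm that is a serialization round trip (a diagnostic sketch, not part of the original script; plain pickle is only an approximation of what the pathos pool does with dill):

import pickle

def inspect_service(service):
    # What the parent process sees before handing the service to the pool.
    print("parent sees host/port:", service.host, service.port)
    try:
        # Round-trip the Service roughly the way a process pool would.
        restored = pickle.loads(pickle.dumps(service))
        # If state is dropped in transit, host can come back as None here,
        # which would explain the _spliturl failure above.
        print("after round trip:", getattr(restored, "host", None),
              getattr(restored, "port", None))
    except Exception as exc:
        print("Service did not survive pickling:", exc)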


shahid285
Path Finder

@DavidHourani: When I run as a single process it seems to work fine. I only see this issue when I pass the service down as a parameter to the ThreadPool.

Is there a way to set the host once more before calling the oneshot function?

Also, could you let me know the format of the host value that usually gets passed?

Thanks
Shahid


DavidHourani
Super Champion

Try checking what's in self.host = self.config.get('splunk_search_section', 'host_name') for both scripts.


shahid285
Path Finder

@DavidHourani: When I print values out of the service object after the connection is created, I can see the host and port values are intact, even down in the method running in the thread, where the service is passed as a parameter to the ThreadPool.

From this I can only assume that the host and port values are intact and available in the service object.

Thanks
Shahid


shahid285
Path Finder

@DavidHourani: Waiting for further input from your side.

Please do run the shared script to understand the issue better; the service object does have the host and port details, as explained in my previous comment.

Thanks
Shahid


shahid285
Path Finder

Hi @DavidHourani, any luck with the issue?
I see that when I run as a single thread there is no issue, but when I create a thread pool the Service object is not getting pickled/serialized properly when passed as a parameter to a function.

This was my latest observation.

Is there a way to make this service object static, so we can reuse it?
Sample code would be helpful here.

Thanks
Shahid


DavidHourani
Super Champion

It should be possible, it's described here, but I'm not sure how to do it 😞
https://docs.splunk.com/DocumentationStatic/PythonSDK/1.1/client.html
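
One pattern worth trying, based on that page (an untested sketch; it assumes splunklib's connect() accepts an existing session token, and the names here are illustrative): log in once in the parent, then have each worker rebuild a lightweight Service from plain host/port/token values instead of shipping the Service object across the process boundary.

import splunklib.client as client

# Parent process: log in once and capture only plain, picklable values.
parent_service = client.connect(host="splunk.example.com", port=8089,
                                username="admin", password="changeme")
conn_info = {"host": parent_service.host,
             "port": parent_service.port,
             "token": parent_service.token}  # reuse the existing session key

# Worker process/thread: rebuild a Service from the shared token, so no new
# login happens per worker and nothing stateful needs to be serialized.
def run_saved_search(conn_info, saved_search_query):
    service = client.connect(**conn_info)  # token-based, no username/password
    return service.jobs.oneshot('| savedsearch ' + saved_search_query, count=0)

Since conn_info holds only simple values, it pickles cleanly to the pool workers, and every worker reuses the parent's session instead of opening a fresh login per thread.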
