Hello @livehybrid - It's hard to tell, because initially there were proxy issues between my organization's network and Splunk Cloud, but I believe we fixed those and I was therefore able to access /services/apps/local. For other endpoints, such as /services/search/jobs and /services/server/info, I see traces in Splunk Cloud's internal access logs, as if the requests are reaching the Splunk server. However, I am not sure whether the Splunk server is failing to return the response in time, or whether, on the Splunk side, the response stream is getting stuck between its web server and some other layer in front of it. Otherwise, why would the access log show a 200 response for the API call while I get a connection timeout? As for the Python code that invokes Splunk, here it is:
import time
import os
import logging
import splunklib.client as client
import splunklib.results as results
from splunklib.binding import HTTPError
from dotenv import load_dotenv
from datetime import datetime
# Logging setup: configure the root logger at DEBUG with timestamped,
# name-tagged records.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Raise the "splunklib" logger to DEBUG as well so its debug records are
# emitted alongside this script's own.
splunklib_logger = logging.getLogger("splunklib")
splunklib_logger.setLevel(logging.DEBUG)

# Logger used by the code in this module.
logger = logging.getLogger(__name__)
class SplunkSearchClient:
    """
    Small wrapper around ``splunklib.client`` that runs a search end to end:
    create the job, poll until it is done, read the results, cancel the job.

    The connection is opened in the constructor, so instantiating this class
    performs network I/O.
    """

    def __init__(self, host, port, username, password, retries=3, retry_delay=2):
        """
        Initializes the Splunk client and connects immediately.
        :param host: Splunk Cloud host
        :param port: Splunk management port (default 8089)
        :param username: Splunk username
        :param password: Splunk password
        :param retries: Number of retries for API failures
        :param retry_delay: Delay between retries
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        # NOTE(review): retries / retry_delay are stored but no method in this
        # class reads them yet; no retry is performed anywhere below.
        self.retries = retries
        self.retry_delay = retry_delay
        self.service = self._connect_to_splunk()

    @staticmethod
    def _convert_to_iso8601(time_str):
        """
        Converts a time string from 'yyyy-MM-dd HH:MM:SS' format to ISO8601 ('yyyy-MM-ddTHH:MM:SS').
        :param time_str: Time string in 'yyyy-MM-dd HH:MM:SS' format.
        :return: Time string in ISO8601 format.
        :raises ValueError: If time_str does not match the expected format.
        """
        dt = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
        return dt.isoformat()

    @classmethod
    def _normalize_time(cls, time_str):
        """
        Prepares a time bound for submission with a search job.

        Absolute timestamps in 'yyyy-MM-dd HH:MM:SS' format are converted to
        ISO8601. Any other string (for example the "-15m" / "now" defaults of
        run_search) is returned unchanged so that it reaches Splunk as given
        instead of failing local parsing.
        :param time_str: Absolute timestamp or any other time expression.
        :return: ISO8601 string for absolute timestamps, else time_str as-is.
        """
        try:
            return cls._convert_to_iso8601(time_str)
        except ValueError:
            # Not an absolute 'yyyy-MM-dd HH:MM:SS' timestamp: pass it through.
            return time_str

    def _connect_to_splunk(self):
        """
        Establishes a connection to Splunk without retry logic.
        :return: The object returned by ``client.connect``.
        :raises HTTPError: Logged and re-raised. Other exception types
            propagate without being logged here.
        """
        try:
            service = client.connect(
                host=self.host,
                port=self.port,
                username=self.username,
                password=self.password,
                scheme="https",
                basic=True
            )
            return service
        except HTTPError as e:
            logger.error(f"❌ Connection failed: {e}")
            raise

    def trigger_search(self, query, start_time, end_time):
        """
        Submits a search job to Splunk.
        :param query: SPL search query.
        :param start_time: Start time, either in 'yyyy-MM-dd HH:MM:SS' format
            or any other time expression (passed through unchanged).
        :param end_time: End time, same accepted forms as start_time.
        :return: Splunk job object.
        :raises HTTPError: Logged and re-raised when job creation fails.
        """
        # Absolute timestamps become ISO8601; anything else is sent as-is.
        earliest = self._normalize_time(start_time)
        latest = self._normalize_time(end_time)
        try:
            # NOTE(review): `timeout` is forwarded to jobs.create together with
            # the time bounds, i.e. as a search-job argument. It presumably
            # does not bound how long this HTTP call may take - confirm
            # against the SDK / REST documentation.
            job = self.service.jobs.create(
                query,
                earliest_time=earliest,
                latest_time=latest,
                timeout=60,
            )
            logger.info(f"🔍 Search job triggered successfully (Job ID: {job.sid})")
            return job
        except HTTPError as e:
            logger.error(f"❌ Failed to create search job: {e}")
            raise

    def wait_for_completion(self, job):
        """
        Waits for a Splunk search job to complete.

        Polls every 2 seconds with no upper bound on the total wait.
        :param job: Splunk search job object
        """
        logger.info("⏳ Waiting for job completion...")
        while not job.is_done():
            time.sleep(2)
            job.refresh()
        logger.info("✅ Search job completed!")

    def fetch_results(self, job):
        """
        Fetches results from a completed Splunk search job.
        :param job: Splunk search job object
        :return: List of result dictionaries
        :raises HTTPError: Logged and re-raised when the results call fails.
        """
        try:
            reader = results.ResultsReader(job.results())
            # Keep only dict items from the reader; anything else it yields
            # is dropped.
            output = [dict(result) for result in reader if isinstance(result, dict)]
            logger.info(f"📊 Retrieved {len(output)} results")
            return output
        except HTTPError as e:
            logger.error(f"❌ Error fetching results: {e}")
            raise

    def run_search(self, query, earliest_time="-15m", latest_time="now"):
        """
        Runs a full search workflow: triggers job, waits for completion, fetches results.
        :param query: SPL search query
        :param earliest_time: Time range start
        :param latest_time: Time range end
        :return: List of results
        """
        job = self.trigger_search(query, earliest_time, latest_time)
        try:
            self.wait_for_completion(job)
            return self.fetch_results(job)
        finally:
            # Clean up the job even when waiting or fetching fails, so it is
            # not left behind on the server.
            job.cancel()
# Example Usage
if __name__ == "__main__":
    # Load environment variables (python-dotenv) before reading them below.
    load_dotenv()
    splunk_client = SplunkSearchClient(
        host=os.getenv('SPLUNK_CLOUD_HOST'),
        port=int(os.getenv('SPLUNK_CLOUD_PORT', '8089')),
        username=os.getenv('SPLUNK_USERNAME'),
        password=os.getenv('SPLUNK_PASSWORD')
    )
    query = "search index=_internal | stats count by sourcetype"
    start_time = "2025-04-02 09:30:00"
    end_time = "2025-04-04 12:30:00"
    # Bound to `rows`, not `results`: assigning to `results` at module level
    # would rebind the `splunklib.results` import that fetch_results relies on.
    rows = splunk_client.run_search(query, earliest_time=start_time, latest_time=end_time)
    for row in rows:
        logger.info(row)
# Note from the post's author: it fails in the trigger_search method call,
# while calling the create method of the Jobs object.
... View more