Splunk Enterprise Security

Time filter in REST APIs

palemmahesh

I want to fetch the results of triggered alerts from time T1 to T2.

I tried passing the earliest_time and earliest query parameters, but neither worked. Can someone please let me know how to pass time filter parameters to the following REST APIs?

https://splunk1:8089/servicesNS/nobody/-/alerts/fired_alerts/-?output_mode=json

 

https://splunk1:8089/servicesNS/nobody/-/search/jobs/scheduler__admin__SplunkEnterpriseSecuritySuit...


kamlesh_vaghela
SplunkTrust

@palemmahesh 

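These APIs don't accept a time range filter for fetching the results of triggered alerts, but I have a sample script that does the same thing. It searches the scheduler logs in the _internal index for the alert's search IDs (sid) within the time range, then fetches the results of each sid. Check the Python script below and adapt it to your requirements.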

# Requires Python 3.6+ (the script uses f-strings), so the __future__/future
# compatibility imports are not needed.
import json
import re
import time
import urllib.parse
from xml.dom import minidom

import httplib2

baseurl = 'https://localhost:8089'
userName = 'admin'
password = 'admin123'
alert_actions = "test_alert"
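# Time range of interest (T1 to T2), written as Splunk time modifiers.
earliest = "@h"
latest = "now"
# "| head 3" limits the output to three scheduler runs; raise or remove it to fetch more.
searchQuery = f'index="_internal" earliest="{earliest}" latest="{latest}" sourcetype="scheduler" alert_actions="{alert_actions}" | head 3 | table sid'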

# Authenticate with server.
# Disable SSL cert validation. Splunk certs are self-signed.
serverContent = httplib2.Http(disable_ssl_certificate_validation=True).request(
    baseurl + '/services/auth/login', 'POST', headers={}, body=urllib.parse.urlencode({'username': userName, 'password': password}))[1]

sessionKey = minidom.parseString(serverContent).getElementsByTagName(
    'sessionKey')[0].childNodes[0].nodeValue

# Remove leading and trailing whitespace from the search
searchQuery = searchQuery.strip()

# If the query doesn't already start with the 'search' operator or another
# generating command (e.g. "| inputcsv"), then prepend "search " to it.
if not (searchQuery.startswith('search') or searchQuery.startswith("|")):
    searchQuery = 'search ' + searchQuery

print(searchQuery)

# Run the search.
# Again, disable SSL cert validation.
res = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + '/services/search/jobs', 'POST',
                                                                     headers={'Authorization': 'Splunk %s' % sessionKey}, body=urllib.parse.urlencode({'search': searchQuery}))[1]
res = res.decode("utf-8")

# Extract the search ID (sid) of the job that was just created.
m = re.search(r"<sid>(.*?)</sid>", res)
sid = None
if m:
    sid = m.group(1)
    print(sid)
isDone = 0
if sid:
    while isDone == 0:
        print("Seeing if search is done")

        # Fetch the job status. output_mode goes in the query string,
        # since a GET request should not carry a form body.
        sid_res = httplib2.Http(disable_ssl_certificate_validation=True).request(
            baseurl + f'/services/search/jobs/{sid}?output_mode=json', method="GET", headers={'Authorization': 'Splunk %s' % sessionKey})[1]

        sid_res = sid_res.decode("utf-8")
        isDoneYet = json.loads(sid_res)["entry"][0]["content"]["isDone"]
        print("IS DONE RESULT: ", isDoneYet)
        if int(isDoneYet) == 1:
            isDone += 1
        else:
            print("Search is not finished, waiting 3 seconds")
            time.sleep(3)

# Fetch the list of alert sids. output_mode goes in the query string,
# since a GET request should not carry a form body.
sid_res = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + f'/services/search/jobs/{sid}/results?output_mode=json', 'GET',
                                                                         headers={'Authorization': 'Splunk %s' % sessionKey})[1]
sid_res = json.loads(sid_res.decode("utf-8"))

print(sid_res)

for i in sid_res["results"]:
    sid_t = i["sid"]
    print(f"Result from {sid_t}")
    # Fetch the results of each triggered alert as CSV.
    sid_res_t = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + f'/services/search/jobs/{sid_t}/results?output_mode=csv', 'GET',
                                                                               headers={'Authorization': 'Splunk %s' % sessionKey})

    # info_sid_res_t = sid_res_t[0]
    # print(info_sid_res_t)
    csv_sid_res_t = sid_res_t[1].decode("utf-8")
    print(csv_sid_res_t)
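
If you need a fixed window from T1 to T2 rather than a relative one, one option is to convert both timestamps to epoch seconds and use them as the earliest/latest modifiers in the search string. The following is only a minimal sketch, assuming ISO-8601 input that is treated as UTC when it has no offset. The helper names to_epoch and build_search_query are hypothetical and not part of any Splunk library.

```python
from datetime import datetime, timezone


def to_epoch(ts):
    """Convert an ISO-8601 timestamp to epoch seconds.

    Assumption: a timestamp without a UTC offset is interpreted as UTC.
    """
    dt = datetime.fromisoformat(ts)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


def build_search_query(alert_actions, t1, t2, limit=3):
    """Build the scheduler-log search for the window from t1 to t2.

    The epoch values are used as the earliest/latest time modifiers,
    in the same position as in the searchQuery of the script above.
    """
    return (
        f'index="_internal" earliest={to_epoch(t1)} latest={to_epoch(t2)} '
        f'sourcetype="scheduler" alert_actions="{alert_actions}" '
        f'| head {limit} | table sid'
    )


# Example: alerts triggered between 00:00 and 06:00 UTC on 2024-01-01.
searchQuery = build_search_query(
    "test_alert", "2024-01-01T00:00:00", "2024-01-01T06:00:00")
print(searchQuery)
```

The resulting string can be assigned to searchQuery in place of the hard-coded one; the rest of the script stays the same.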

 

Thanks
KV
▄︻̷̿┻̿═━一

