Splunk Enterprise Security

Time filter in REST APIs

palemmahesh
Engager

I want to fetch the results of triggered alerts from time T1 to T2.

I tried passing the earliest_time or earliest query parameters, but it didn't work. Can someone please let me know how to pass time filter parameters to the following REST APIs?

https://splunk1:8089/servicesNS/nobody/-/alerts/fired_alerts/-?output_mode=json

 

https://splunk1:8089//servicesNS/nobody/-/search/jobs/scheduler__admin__SplunkEnterpriseSecuritySuit...


kamlesh_vaghela
SplunkTrust

@palemmahesh 

These APIs don't accept a time range filter for fetching the results of triggered alerts. However, I have a sample script that does the same thing by searching the scheduler logs. Check the Python script below and adapt it to your requirements.

# Python 3.6+ only (the script uses f-strings), so no compatibility shims are needed.
import json
import re
import time
import urllib.parse
from xml.dom import minidom

import httplib2

baseurl = 'https://localhost:8089'
userName = 'admin'
password = 'admin123'
alert_actions = "test_alert"
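# Time window for the scheduler search: from the start of the current hour until now.
# Replace these with the time modifiers for your own T1 and T2.
earliest = "@h"
latest = "now"
searchQuery = f'index="_internal" earliest="{earliest}" latest="{latest}" sourcetype="scheduler" alert_actions="{alert_actions}" | head 3 | table sid'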

# Authenticate with server.
# Disable SSL cert validation. Splunk certs are self-signed.
serverContent = httplib2.Http(disable_ssl_certificate_validation=True).request(
    baseurl + '/services/auth/login', 'POST', headers={}, body=urllib.parse.urlencode({'username': userName, 'password': password}))[1]

sessionKey = minidom.parseString(serverContent).getElementsByTagName(
    'sessionKey')[0].childNodes[0].nodeValue

# Remove leading and trailing whitespace from the search
searchQuery = searchQuery.strip()

# If the query doesn't already start with the 'search' operator or another
# generating command (e.g. "| inputcsv"), then prepend "search " to it.
if not (searchQuery.startswith('search') or searchQuery.startswith("|")):
    searchQuery = 'search ' + searchQuery

print(searchQuery)

# Run the search.
# Again, disable SSL cert validation.
res = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + '/services/search/jobs', 'POST',
                                                                     headers={'Authorization': 'Splunk %s' % sessionKey}, body=urllib.parse.urlencode({'search': searchQuery}))[1]
res = res.decode("utf-8")

# Extract the search ID from the XML response (raw string, non-greedy match).
m = re.search(r"<sid>(.*?)</sid>", res)
sid = None
if m:
    sid = m.group(1)
    print(sid)
isDone = False
if sid:
    while not isDone:
        print("Search is not finished, waiting 3 seconds")
        time.sleep(3)
        print("Checking whether the search is done")

        # Fetch the job status. output_mode is passed in the query string
        # rather than in a GET request body.
        sid_res = httplib2.Http(disable_ssl_certificate_validation=True).request(
            baseurl + f'/services/search/jobs/{sid}?output_mode=json', method="GET", headers={'Authorization': 'Splunk %s' % sessionKey})[1]

        sid_res = sid_res.decode("utf-8")
        # isDone is a JSON boolean in the job's content.
        isDone = bool(json.loads(sid_res)["entry"][0]["content"]["isDone"])
        print("IS DONE RESULT: ", isDone)

# Fetch the list of alert SIDs found by the scheduler search.
sid_res = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + f'/services/search/jobs/{sid}/results?output_mode=json', 'GET',
                                                                         headers={'Authorization': 'Splunk %s' % sessionKey})[1]
sid_res = json.loads(sid_res.decode("utf-8"))

print(sid_res)

for i in sid_res["results"]:
    sid_t = i["sid"]
    print(f"Result from {sid_t}")
    # Fetch the results of each triggered alert's search job as CSV.
    sid_res_t = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + f'/services/search/jobs/{sid_t}/results?output_mode=csv', 'GET',
                                                                               headers={'Authorization': 'Splunk %s' % sessionKey})

    # info_sid_res_t = sid_res_t[0]
    # print(info_sid_res_t)
    csv_sid_res_t = sid_res_t[1].decode("utf-8")
    print(csv_sid_res_t)
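
If you need a fixed window from T1 to T2 instead of relative modifiers, one option is to leave the time out of the search string and send the bounds as the earliest_time and latest_time parameters when creating the job with POST /services/search/jobs. The sketch below only builds the form-encoded request body and makes no network calls. The helper names to_epoch and build_job_body are hypothetical, and it assumes timestamps without an offset are UTC and that your Splunk version accepts epoch seconds for these parameters.

```python
from datetime import datetime, timezone
import urllib.parse


def to_epoch(ts: str) -> int:
    """Convert an ISO 8601 timestamp to epoch seconds (UTC assumed if no offset)."""
    dt = datetime.fromisoformat(ts)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


def build_job_body(t1: str, t2: str, alert_actions: str, limit: int = 3) -> str:
    """Build the form-encoded POST body for creating the scheduler search job."""
    # The time range is NOT part of the search string here; it is sent
    # separately as earliest_time / latest_time.
    search = (
        f'search index="_internal" sourcetype="scheduler" '
        f'alert_actions="{alert_actions}" | head {limit} | table sid'
    )
    return urllib.parse.urlencode({
        "search": search,
        "earliest_time": to_epoch(t1),
        "latest_time": to_epoch(t2),
    })


# Example window: six hours on 1 July 2024 (UTC), for the alert action "test_alert".
body = build_job_body("2024-07-01T00:00:00", "2024-07-01T06:00:00", "test_alert")
print(body)
```

The returned string can be used as the body of the POST request that runs the search in the script above, in place of urllib.parse.urlencode({'search': searchQuery}).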

 

Thanks
KV
▄︻̷̿┻̿═━一

If any of my replies helps you solve the problem or gain knowledge, an upvote would be appreciated.
