Splunk Enterprise Security

Time filter in REST APIs

palemmahesh
Engager

I want to fetch the results of triggered alerts between time T1 and T2.

I tried passing the earliest_time or earliest query params, but it didn't work. Can someone please let me know how to pass time filter params to the following REST APIs?

https://splunk1:8089/servicesNS/nobody/-/alerts/fired_alerts/-?output_mode=json

 

https://splunk1:8089//servicesNS/nobody/-/search/jobs/scheduler__admin__SplunkEnterpriseSecuritySuit...


kamlesh_vaghela
SplunkTrust

@palemmahesh 

These APIs don't accept a time range filter for fetching the results of triggered alerts. However, I have a sample script that does the same thing by searching the scheduler log. Check the Python script below and adjust it to your requirements.

# The script uses f-strings, so it needs Python 3.6+; the Python 2
# compatibility imports are not required.
import json
import re
import time
import urllib.parse
from xml.dom import minidom

import httplib2

baseurl = 'https://localhost:8089'
userName = 'admin'
password = 'admin123'
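alert_actions = "test_alert"
# Time window for the search (T1 to T2); any Splunk time modifier can be used here.
earliest = "@h"
latest = "now()"
# Look up the sids (at most 3) of scheduler runs with this alert action inside the window.
searchQuery = f'index="_internal" earliest="{earliest}" latest={latest} sourcetype="scheduler" alert_actions="{alert_actions}" | head 3 | table sid'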

# Authenticate with server.
# Disable SSL cert validation. Splunk certs are self-signed.
serverContent = httplib2.Http(disable_ssl_certificate_validation=True).request(
    baseurl + '/services/auth/login', 'POST', headers={}, body=urllib.parse.urlencode({'username': userName, 'password': password}))[1]

sessionKey = minidom.parseString(serverContent).getElementsByTagName(
    'sessionKey')[0].childNodes[0].nodeValue

# Remove leading and trailing whitespace from the search
searchQuery = searchQuery.strip()

# If the query doesn't already start with the 'search' operator or another
# generating command (e.g. "| inputcsv"), then prepend "search " to it.
if not (searchQuery.startswith('search') or searchQuery.startswith("|")):
    searchQuery = 'search ' + searchQuery

print(searchQuery)

# Run the search.
# Again, disable SSL cert validation.
res = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + '/services/search/jobs', 'POST',
                                                                     headers={'Authorization': 'Splunk %s' % sessionKey}, body=urllib.parse.urlencode({'search': searchQuery}))[1]
res = res.decode("utf-8")

# Raw string avoids the invalid "\/" escape; the non-greedy group stops at the first </sid>.
m = re.search(r"<sid>(.*?)</sid>", res)
sid = None
if m:
    sid = m.group(1)
    print(sid)
isDone = False
if sid:
    while not isDone:
        print("Seeing if search is done")

        # Ask for the job status as JSON; output_mode is sent in the query
        # string because this is a GET request.
        sid_res = httplib2.Http(disable_ssl_certificate_validation=True).request(
            baseurl + f'/services/search/jobs/{sid}?output_mode=json', method="GET", headers={'Authorization': 'Splunk %s' % sessionKey})[1]

        sid_res = sid_res.decode("utf-8")
        isDoneYet = json.loads(sid_res)["entry"][0]["content"]["isDone"]
        print("IS DONE RESULT: ", isDoneYet)
        if int(isDoneYet) == 1:
            isDone = True
        else:
            print("Search is not finished, waiting 3 seconds")
            time.sleep(3)

# Fetch the sids returned by the scheduler search (output_mode in the query string).
sid_res = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + f'/services/search/jobs/{sid}/results?output_mode=json', 'GET',
                                                                         headers={'Authorization': 'Splunk %s' % sessionKey})[1]
sid_res = json.loads(sid_res.decode("utf-8"))

print(sid_res)

for i in sid_res["results"]:
    sid_t = i["sid"]
    print(f"Result from {sid_t}")
    # Fetch the results of each triggered search as CSV (output_mode in the query string).
    sid_res_t = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + f'/services/search/jobs/{sid_t}/results?output_mode=csv', 'GET',
                                                                               headers={'Authorization': 'Splunk %s' % sessionKey})

    # info_sid_res_t = sid_res_t[0]
    # print(info_sid_res_t)
    csv_sid_res_t = sid_res_t[1].decode("utf-8")
    print(csv_sid_res_t)
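
If you need a fixed T1 to T2 window instead of a relative one, one possible variation is to send the window along with the job request. This is only a sketch and has not been run against a live instance: as far as I know, POST /services/search/jobs accepts earliest_time and latest_time form parameters, and epoch seconds are a valid time string there. The helper names to_epoch and build_job_body are made up for illustration, and treating naive datetimes as UTC is an assumption.

```python
from datetime import datetime, timezone
from urllib.parse import urlencode


def to_epoch(dt):
    """Return whole epoch seconds; naive datetimes are treated as UTC (assumption)."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


def build_job_body(search_query, t1, t2):
    """Build the form body for POST /services/search/jobs with a T1..T2 window."""
    start, end = to_epoch(t1), to_epoch(t2)
    if end <= start:
        raise ValueError("T2 must be later than T1")
    return urlencode({
        'search': search_query,
        'earliest_time': start,  # assumed job parameter: start of the window
        'latest_time': end,      # assumed job parameter: end of the window
    })


# Example window: the whole of 1 January 2024 (UTC).
t1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t2 = datetime(2024, 1, 2, tzinfo=timezone.utc)
body = build_job_body('search index="_internal" sourcetype="scheduler" | table sid', t1, t2)
print(body)
```

The returned string would replace the urlencode(...) body in the "Run the search" request. Inline earliest= and latest= modifiers in the search string take precedence over the job's time range, so remove them from searchQuery if you go this way.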

 
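
Another option, since the fired_alerts endpoint itself has no time filter, is to fetch the list as JSON and filter it on the client side. The sketch below assumes each entry's content carries a trigger_time field in epoch seconds; please verify that against the output of your own instance. The function name filter_fired_alerts and the sample payload are made up for illustration.

```python
def filter_fired_alerts(payload, t1_epoch, t2_epoch):
    """Keep entries whose trigger_time falls in [t1_epoch, t2_epoch)."""
    kept = []
    for entry in payload.get('entry', []):
        # 'trigger_time' is an assumed field name; entries without it are skipped.
        trigger_time = entry.get('content', {}).get('trigger_time')
        if trigger_time is None:
            continue
        if t1_epoch <= int(trigger_time) < t2_epoch:
            kept.append(entry)
    return kept


# Made-up payload shaped like a parsed output_mode=json response.
sample = {
    'entry': [
        {'name': 'a', 'content': {'sid': 'sid_a', 'trigger_time': 1704067300}},
        {'name': 'b', 'content': {'sid': 'sid_b', 'trigger_time': 1704240000}},
        {'name': 'c', 'content': {'sid': 'sid_c'}},
    ]
}

for entry in filter_fired_alerts(sample, 1704067200, 1704153600):
    print(entry['content']['sid'])
```

Each sid that survives the filter can then be passed to /services/search/jobs/{sid}/results the same way the script above does.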

Thanks
KV
▄︻̷̿┻̿═━一

If any of my replies helps you solve the problem or gain knowledge, an upvote would be appreciated.
