Splunk Enterprise Security

time filter in rest apis

palemmahesh
Engager

I want to fetch the results from triggered alerts  from time T1 to T2.

Tried passing the earliest_time or earliest query params but it didn't work. Can someone please let me how to pass the time filter params to the following rest apis

https://splunk1:8089/servicesNS/nobody/-/alerts/fired_alerts/-?output_mode=json

 

https://splunk1:8089//servicesNS/nobody/-/search/jobs/scheduler__admin__SplunkEnterpriseSecuritySuit...

Labels (2)
0 Karma

kamlesh_vaghela
SplunkTrust
SplunkTrust

@palemmahesh 

These APIs doesn't allow to pass time range  filter to get results of triggered alert. But I have a sample script which is doing same. Check below python script and do appropriate changes as per your requirement.

from __future__ import print_function
import urllib.error
import urllib.parse
import urllib.request
from xml.dom import minidom
import httplib2
from future import standard_library
import re
import time
import json
standard_library.install_aliases()

baseurl = 'https://localhost:8089'
userName = 'admin'
password = 'admin123'
alert_actions = "test_alert"
earliest = "@h"
latest = "now()"
searchQuery = f'index="_internal" earliest="{earliest}" latest={latest}  sourcetype="scheduler" alert_actions="{alert_actions}"  | head 3| table sid'

# Authenticate with server.
# Disable SSL cert validation. Splunk certs are self-signed.
serverContent = httplib2.Http(disable_ssl_certificate_validation=True).request(
    baseurl + '/services/auth/login', 'POST', headers={}, body=urllib.parse.urlencode({'username': userName, 'password': password}))[1]

sessionKey = minidom.parseString(serverContent).getElementsByTagName(
    'sessionKey')[0].childNodes[0].nodeValue

# Remove leading and trailing whitespace from the search
searchQuery = searchQuery.strip()

# If the query doesn't already start with the 'search' operator or another
# generating command (e.g. "| inputcsv"), then prepend "search " to it.
if not (searchQuery.startswith('search') or searchQuery.startswith("|")):
    searchQuery = 'search ' + searchQuery

print(searchQuery)

# Run the search.
# Again, disable SSL cert validation.
res = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + '/services/search/jobs', 'POST',
                                                                     headers={'Authorization': 'Splunk %s' % sessionKey}, body=urllib.parse.urlencode({'search': searchQuery}))[1]
res = res.decode("utf-8")

m = re.search("<sid>(.*)<\/sid>", res)
sid = None
if m:
    sid = m.group(1)
    print(sid)
isDone = 0
if sid:
    while isDone == 0:
        print("Search is not finished, waiting 3 seconds")
        time.sleep(3)
        print("Seeing if search is done")

        #parsing response#
        sid_res = httplib2.Http(disable_ssl_certificate_validation=True).request(
            baseurl + f'/services/search/jobs/{sid}', method="GET", headers={'Authorization': 'Splunk %s' % sessionKey}, body=urllib.parse.urlencode({'output_mode': 'json'}))[1]

        # isDone = 1
        sid_res = sid_res.decode("utf-8")
        isDoneYet = json.loads(sid_res)["entry"][0]["content"]["isDone"]
        print(isDoneYet)
        print("IS DONE RESULT: ", isDoneYet)
        if int(isDoneYet) == True:
            isDone += 1

sid_res = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + f'/services/search/jobs/{sid}/results', 'GET',
                                                                         headers={'Authorization': 'Splunk %s' % sessionKey}, body=urllib.parse.urlencode({'output_mode': 'json'}))[1]
sid_res = json.loads(sid_res.decode("utf-8"))

print(sid_res)

for i in sid_res["results"]:
    sid_t = i["sid"]
    print(f"Result from {sid_t}")
    sid_res_t = httplib2.Http(disable_ssl_certificate_validation=True).request(baseurl + f'/services/search/jobs/{sid_t}/results', 'GET',
                                                                               headers={'Authorization': 'Splunk %s' % sessionKey}, body=urllib.parse.urlencode({'output_mode': 'csv'}))

    # info_sid_res_t = sid_res_t[0]
    # print(info_sid_res_t)
    csv_sid_res_t = sid_res_t[1].decode("utf-8")
    print(csv_sid_res_t)

 

Thanks
KV
▄︻̷̿┻̿═━一

If any of my reply helps you to solve the problem Or gain knowledge, an upvote would be appreciated.

0 Karma
Get Updates on the Splunk Community!

Announcing Scheduled Export GA for Dashboard Studio

We're excited to announce the general availability of Scheduled Export for Dashboard Studio. Starting in ...

Extending Observability Content to Splunk Cloud

Watch Now!   In this Extending Observability Content to Splunk Cloud Tech Talk, you'll see how to leverage ...

More Control Over Your Monitoring Costs with Archived Metrics GA in US-AWS!

What if there was a way you could keep all the metrics data you need while saving on storage costs?This is now ...