Installation

How To Run my Python Script Automatically Through HF

SplunkDash
Motivator

Hello,

I have 4 python scripts in HF. My plan is to run those python scripts automatically through my HF. How I would do that?  Thank you and any help will be highly appreciated.

Labels (1)
Tags (1)
0 Karma

isoutamo
SplunkTrust
SplunkTrust
Probably the easiest way is to create TA which contains those as inputs. Then schedule those with required frequency.

SplunkDash
Motivator

Hello,

Yes, I agree .....if we can create a TA...that would be the easiest way ... But, what is  meant by "contains those as inputs"?

0 Karma

isoutamo
SplunkTrust
SplunkTrust
You could create a scripted input there. https://docs.splunk.com/Documentation/SplunkCloud/latest/AdvancedDev/ScriptSetup
It could return the status of execution as output. No need to send anything special.
R. Ismo

SplunkDash
Motivator

Hello.

Thank you so much, it's an excellent resource. But, just wanted make sure we are on the same page, the main objective of those scripts are to parse the source files which are manually placed on the Linux server where HF has been installed, not more than that at this stage. Here is the Cron jobs (please see below) I consider to use for running my 4 python scripts. Now question is, how I would place these Cron tasks in a file (or in a place) that it would run those python scripts automatically to parse those files. Thank you again

 

10 23 * * * /opt/splunk/etc/apps/python-scripts/script1.py

12 23 * * * /opt/splunk/etc/apps/python-scripts/script2.py

14 23 * * * /opt/splunk/etc/apps/python-scripts/script3.py

16 23 * * * /opt/splunk/etc/apps/python-scripts/script4.py

 

0 Karma

isoutamo
SplunkTrust
SplunkTrust

Hi

You can run those by splunk even you are not getting data into splunk via those. Just see these docs:

Then it's totally another story is Splunk the correct or best tool to do this or should you use something else to manage your batch jobs. But it's out of scope of this discussion.

r. Ismo

0 Karma

SplunkDash
Motivator

Hello,

Thank you so much for your support. But, it might be a little complicated to implement that approach in my context. If possible please let me know how  I would proceed to run those Python scripts as Cron jobs. Thank you again.

0 Karma

isoutamo
SplunkTrust
SplunkTrust
https://docs.splunk.com/Documentation/Splunk/8.2.3/Admin/Inputsconf You should look script and there interval. That will helps you to do scheduling part without adding it HFs cron. It just use splunkd to do it.
0 Karma

PickleRick
SplunkTrust
SplunkTrust

You either ignore the splunk part completely and just run them as you'd run any other software from cron or make a custom splunk commands from them and schedule searches running those commands.

SplunkDash
Motivator

Hello,

Thank you so much for your response, really appreciate it. Question is: How I would make a custom splunk commands from them and schedule searches running those commands?

 

0 Karma

PickleRick
SplunkTrust
SplunkTrust

https://dev.splunk.com/enterprise/docs/devtools/customsearchcommands/

For scheduling you just create a report which runs - for example - once a day but don't care about the results 🙂

SplunkDash
Motivator

Hello,

Thank you so much for your reply, appreciated. Create, running, and schedule report shouldn't be an issue. But, how I include my python scripts in that report or pointing that python script from that report to run/execute. My complete python scripts is given below and using to parse files and it is located in HF. Thank you again.

----------------Starts------

import os
import re
from datetime import date
from striprtf.striprtf import rtf_to_text


def getSubfolders(directory):
subfolders = []
for (dirpath, dirnames, filenames) in os.walk(directory):
subfolders.extend(dirnames)
break
return subfolders


def getFiles(directory):
files = []

for (dirpath, dirnames, filenames) in os.walk(directory):
for file in filenames:
files.append(os.path.join(directory, file))
break

return files


def make_clean_input_file(file, src):
lines = []
with open(file, 'r') as f:
rtf = f.read()
text = rtf_to_text(rtf)
lines.append(text)

out = open(os.path.join(src, file.split(".")[0] + "--CLEAN.TXT"), "w")
for line in lines:
out.write(line)
out.close()
return os.path.join(src, file.split(".")[0] + "--CLEAN.TXT")


def get_lines(input_file):
lines = []
skip_next = False
with open(input_file) as file:
file.readline()
file.readline()
while line := file.readline():
if "Report Generation Date" in line:
skip_next = True
pass
else:
if not skip_next:
lines.append(line.strip())
else:
skip_next = False

out = open(input_file.split('--')[0] + "--SANITIZED.txt", 'w')
for line in lines:
out.write(line + '\n')
out.close()
return lines


def param_text(p_name, value, quotes=True):
text = ""
if not quotes:
text = p_name + "=" + value
else:
text = p_name + "=\"" + value + "\""
return text


def format_param(param):
text = ''
words = param.split(" ")
for w in words:
if text == '':
text = w.upper()
else:
text = text + "_" + w.upper()
return text


def get_preamble(lines):
preamble = []

for i in range(len(lines)):
if "Policy Checker" in lines[i]:
preamble.append(param_text("SOFTWARE_VERSION", lines[i].split("-")[0].strip()))
preamble.append(param_text("REPORT_NAME", lines[i].split("-")[1].strip()))
elif "Compliance Status Legend" in lines[i]: # 13 is the end of the preamble that is used
break
elif "Unique Failed Checks" in lines[i] or "Total Failed Checks" in lines[i] or "Software Version" in lines[i]:
pass
else:
params = lines[i].split("\t")
if len(params) == 1:
preamble.append(param_text(format_param(params[0]), ""))
else:
preamble.append(param_text(format_param(params[0]), params[1]))
if len(params) > 2:
print("PARAM MORE THAN 2!!!")
return preamble


def get_compliance_status_descriptions(lines):
cs = []
start = -1
end = -1
for i in range(len(lines)):
if "BLUE\t" in lines[i]:
start = i
elif "RED\t" in lines[i]:
end = i+1

for i in range(start, end):
param = "COMP_STATUS_" + lines[i].split('\t')[0].upper() + "_DESC"
cs.append(param_text(param, lines[i].split("\t")[1].strip(' ')))

# POLICY LINES
for i in range(len(lines)):
if "Check Failure Details" in lines[i]:
brick2 = lines[i+2].split('\t')
cs.append(param_text('POLICY', brick2[0]))
cs.append(param_text('POLICY_NAME', brick2[1]))
cs.append(param_text('POLICY_DATE', brick2[2]))
return cs


def get_test_results(lines):
cs = []
brick = lines[24].split("\t")

cs.append(param_text("COMPLIANCE_STATUS", brick[1]))
cs.append(param_text("UNIQUE_FAILS", brick[2]))
cs.append(param_text("TOTAL_FAILS", brick[3]))
cs.append(param_text("UNIQUE_ERRORS", brick[4]))
cs.append(param_text("TOTAL_ERRORS", brick[5]))
cs.append(param_text("SCORE", brick[6]))

prefix = "CHK_STATUS_"
for i in range(27, 33):
p_name = lines[i].split('\t')[0].replace('/', '')
val = lines[i].split('\t')[1]
cs.append(param_text(format_param(prefix + p_name), val))
return cs


def get_bounds(lines, delimiter="Check Failure Details"):
starts = []
ends = []

for i in range(33, len(lines)):
if delimiter in lines[i]:
starts.append(i)
elif '____' in lines[i]:
ends.append(i)
if i < len(lines) - 1:
starts.append(i)
elif "RBD Tracking ID\t" in lines[i]:
ends.append(i)
# starts.append(i)
break
if len(starts) > len(ends):
ends.append(len(lines))

return list(zip(starts, ends))


def get_description(lines, start, delimiter):
true_end = -1
desc = ''
for i in range(start, len(lines)):
if delimiter in lines[i]:
true_end = i
break
if true_end == -1:
true_end = len(lines)
for i in range(start + 1, true_end):
desc += lines[i] + " "
return desc


def get_policy(lines, bounds, prefix_num):
Summary = (param_text(str(prefix_num) + "_summary", ""))
nistPolicy = (param_text(str(prefix_num) + "_nistPolicy", ""))
Description = (param_text(str(prefix_num) + "_desc", ""))
reqSetting = (param_text(str(prefix_num) + "_reqSetting", ""))
extRef = (param_text(str(prefix_num) + "_extRef", ""))
policyID = (param_text(str(prefix_num) + "_policyID", ""))
appliesTo = (param_text(str(prefix_num) + "_appliesTo", ""))
risk = (param_text(str(prefix_num) + "_risk", ""))
category = (param_text(str(prefix_num) + "_category", ""))
checkID = (param_text(str(prefix_num) + "_checkID", ""))
irsPolicy = (param_text(str(prefix_num) + "_irsPolicy", ""))
policy_lines = []

for i in range(bounds[0], bounds[1]):
if "Summary\t" in lines[i]:
Summary = (param_text(str(prefix_num) + "_summary", lines[i].split('\t', 1)[1]))
elif "800-53A\t" in lines[i]:
nistPolicy = (param_text(str(prefix_num) + "_nistPolicy", lines[i].split('\t', 1)[1]))
elif "Description" in lines[i]:
d = get_description(lines, i, "Required Setting")
Description = (param_text(str(prefix_num) + "_desc", d))
elif "Required Setting" in lines[i]:
r = get_description(lines, i, "Actual Setting")
reqSetting = (param_text(str(prefix_num) + "_reqSetting", r))
elif "External References" in lines[i]:
extRef = param_text(str(prefix_num) + "_extRef", lines[i + 1])
elif "Policy ID\t" in lines[i]:
policyID = (param_text(str(prefix_num) + "_policyID", lines[i].split('\t', 1)[1]))
elif "Applies" in lines[i]:
appliesTo = (param_text(str(prefix_num) + "_appliesTo", lines[i].split('\t', 1)[1]))
elif "Risk\t" in lines[i]:
risk = (param_text(str(prefix_num) + "_risk", lines[i].split('\t', 1)[1]))
elif "Category\t" in lines[i]:
category = (param_text(str(prefix_num) + "_category", lines[i].split('\t', 1)[1]))
elif "Check ID\t" in lines[i]:
checkID = (param_text(str(prefix_num) + "_checkID", lines[i].split('\t', 1)[1]))
elif "IRS Policy\t" in lines[i]:
irsPolicy = (param_text(str(prefix_num) + "_irsPolicy", lines[i].split('\t', 1)[1]))

policy_lines.append(Summary)
policy_lines.append(nistPolicy)
policy_lines.append(reqSetting)
policy_lines.append(policyID)
policy_lines.append(extRef)
policy_lines.append(appliesTo)
policy_lines.append(risk)
policy_lines.append(irsPolicy)
policy_lines.append(category)
policy_lines.append(checkID)
policy_lines.append(Description)

return policy_lines


def get_settings(lines, bounds, prefix_num):
counter = 1
start = -1
breaks = []
actualSetting = ''
moreInfo = ''
flag = False
settings = []
output = []

for i in range(bounds[0], bounds[1]):
if "Actual Setting" in lines[i]:
start = i + 1
break
if start == -1:
return []
for i in range(start, bounds[1]):
breaks.append(lines[i].partition('\t'))

for b in breaks:
if b[1] == '\t' and not flag:
actualSetting += b[0]
moreInfo += b[2]
elif b[1] == '':
moreInfo += b[0]
flag = True
elif b[1] == '\t' and flag:
settings.append([actualSetting, moreInfo])
actualSetting = b[0]
moreInfo = b[2]
flag = False
settings.append([actualSetting, moreInfo])

for s in settings:
output.append(param_text(str(prefix_num) + "_" + str(counter) + "_actualSetting", s[0].strip()))
output.append(param_text(str(prefix_num) + "_" + str(counter) + "_moreInfo", s[1].strip()))
counter += 1

return output


def parse_policy(lines, bounds, prefix_num):
policy_lines = get_policy(lines, bounds, prefix_num) # GET the policy info lines
actual_Settings = get_settings(lines, bounds, prefix_num) # GET Actual Settings for the policy
output = []

for p in policy_lines:
output.append(p)
for a in actual_Settings:
output.append(a)

return output


def get_policies(lines):
bounds = get_bounds(lines)
counter = 1
output = []

for i in range(len(bounds)):
policy = parse_policy(lines, bounds[i], counter)
for p in policy:
output.append(p)
counter += 1

return output


def get_rbd(lines):
start = -1
end = len(lines) - 1
comment = ''
rbd = []

for i in range(len(lines)):
if "RBD Tracking ID" in lines[i]:
start = i + 1
break
if start == -1:
return []
for i in range(start, len(lines)):
if i == start:

rbd.append(param_text("RBD_TrackingID", lines[i].split('\t')[0]))
rbd.append(param_text("RBD_Approver", lines[i].split('\t')[1]))
rbd.append(param_text("RBD_POAM_Number", ''))
comment = lines[i].split('\t')[2]
rbd.append(param_text("RBD_ApprovalDate", lines[i].split('\t')[3]))
rbd.append(param_text("RBD_ExpirationDate", "none"))
else:
if "________________________________________________________________________________________________" not in \
lines[i]:
comment += lines[i].strip()
rbd.append(param_text("RBD_Comment", comment))

return rbd

def nu_TESTER(lists):
for i in range(len(lists)):
for j in range(len(lists[i])):
print(lists[i][j])


def parse_input(input_file):
lines = get_lines(input_file)
preamble = get_preamble(lines) # lines start to 13 used;
comp_status = get_compliance_status_descriptions(lines) # lines 16 thru 19
compliance_summary = get_test_results(lines) # lines 24 to 32
policies = get_policies(lines)
rbd = get_rbd(lines)
parts = [preamble, compliance_summary, comp_status, policies, rbd]
output = []
for p in parts:
for line in p:
output.append(line)

return output


def main():
today = date.today()
date_prefix = today.strftime("%Y-%m-%d_")
all_files = []
subfolders = []
# source_folder = input("Please enter source folder for processing (root):")
# Linux
source_folder = r"/opt/splunk/etc/apps/SourceFiles/"
subfolders.extend(getSubfolders(source_folder))
for folder in subfolders:
temp = getFiles(os.path.join(source_folder, folder))
# print(temp)
for t in temp:
if t[-4:] == ".rtf" and "Computer_Summary" in t and t[0:2] != "~$":
all_files.append(t)


for file in all_files:
print("current file=", file)
input_file = make_clean_input_file(file, source_folder)
output = parse_input(input_file)
outfile = input_file.split('--')[0] + '.parsed'
print('Saving output to:', outfile)

# add date prefix
# outfile = outfile.rsplit('\\', 1)[0] + "\\" + date_prefix + outfile.rsplit('\\', 1)[1].split('--')[0]
# Linux pathing:
outfile = outfile.rsplit('/', 1)[0] + "/" + date_prefix + outfile.rsplit('/', 1)[1].split('--')[0]

# Save output
out = open(outfile, 'w')
for o in output:
out.write(o + '\n')
out.close()

# remove clean input file
os.remove(input_file)
os.remove(input_file.split('--')[0] + '--SANITIZED.txt')
return 0


if __name__ == "__main__":
main()

 

-------------End-------

 

 

0 Karma

PickleRick
SplunkTrust
SplunkTrust

Look at the docs I pointed you to. I haven't done it myself but it involves creating some config files so that splunk understands how to interface with your script.

Get Updates on the Splunk Community!

Extending Observability Content to Splunk Cloud

Register to join us !   In this Extending Observability Content to Splunk Cloud Tech Talk, you'll see how to ...

What's new in Splunk Cloud Platform 9.1.2312?

Hi Splunky people! We are excited to share the newest updates in Splunk Cloud Platform 9.1.2312! Analysts can ...

What’s New in Splunk Security Essentials 3.8.0?

Splunk Security Essentials (SSE) is an app that can amplify the power of your existing Splunk Cloud Platform, ...