Hello,
I have 4 python scripts in HF. My plan is to run those python scripts automatically through my HF. How I would do that? Thank you and any help will be highly appreciated.
Hello,
Yes, I agree .....if we can create a TA...that would be the easiest way ... But, what is meant by "contains those as inputs"?
Hello.
Thank you so much, it's an excellent resource. But, just wanted make sure we are on the same page, the main objective of those scripts are to parse the source files which are manually placed on the Linux server where HF has been installed, not more than that at this stage. Here is the Cron jobs (please see below) I consider to use for running my 4 python scripts. Now question is, how I would place these Cron tasks in a file (or in a place) that it would run those python scripts automatically to parse those files. Thank you again
10 23 * * * /opt/splunk/etc/apps/python-scripts/script1.py
12 23 * * * /opt/splunk/etc/apps/python-scripts/script2.py
14 23 * * * /opt/splunk/etc/apps/python-scripts/script3.py
16 23 * * * /opt/splunk/etc/apps/python-scripts/script4.py
Hi
You can run those by splunk even you are not getting data into splunk via those. Just see these docs:
Then it's totally another story is Splunk the correct or best tool to do this or should you use something else to manage your batch jobs. But it's out of scope of this discussion.
r. Ismo
Hello,
Thank you so much for your support. But, it might be a little complicated to implement that approach in my context. If possible please let me know how I would proceed to run those Python scripts as Cron jobs. Thank you again.
You either ignore the splunk part completely and just run them as you'd run any other software from cron or make a custom splunk commands from them and schedule searches running those commands.
Hello,
Thank you so much for your response, really appreciate it. Question is: How I would make a custom splunk commands from them and schedule searches running those commands?
https://dev.splunk.com/enterprise/docs/devtools/customsearchcommands/
For scheduling you just create a report which runs - for example - once a day but don't care about the results 🙂
Hello,
Thank you so much for your reply, appreciated. Create, running, and schedule report shouldn't be an issue. But, how I include my python scripts in that report or pointing that python script from that report to run/execute. My complete python scripts is given below and using to parse files and it is located in HF. Thank you again.
----------------Starts------
import os
import re
from datetime import date
from striprtf.striprtf import rtf_to_text
def getSubfolders(directory):
subfolders = []
for (dirpath, dirnames, filenames) in os.walk(directory):
subfolders.extend(dirnames)
break
return subfolders
def getFiles(directory):
files = []
for (dirpath, dirnames, filenames) in os.walk(directory):
for file in filenames:
files.append(os.path.join(directory, file))
break
return files
def make_clean_input_file(file, src):
lines = []
with open(file, 'r') as f:
rtf = f.read()
text = rtf_to_text(rtf)
lines.append(text)
out = open(os.path.join(src, file.split(".")[0] + "--CLEAN.TXT"), "w")
for line in lines:
out.write(line)
out.close()
return os.path.join(src, file.split(".")[0] + "--CLEAN.TXT")
def get_lines(input_file):
lines = []
skip_next = False
with open(input_file) as file:
file.readline()
file.readline()
while line := file.readline():
if "Report Generation Date" in line:
skip_next = True
pass
else:
if not skip_next:
lines.append(line.strip())
else:
skip_next = False
out = open(input_file.split('--')[0] + "--SANITIZED.txt", 'w')
for line in lines:
out.write(line + '\n')
out.close()
return lines
def param_text(p_name, value, quotes=True):
text = ""
if not quotes:
text = p_name + "=" + value
else:
text = p_name + "=\"" + value + "\""
return text
def format_param(param):
text = ''
words = param.split(" ")
for w in words:
if text == '':
text = w.upper()
else:
text = text + "_" + w.upper()
return text
def get_preamble(lines):
preamble = []
for i in range(len(lines)):
if "Policy Checker" in lines[i]:
preamble.append(param_text("SOFTWARE_VERSION", lines[i].split("-")[0].strip()))
preamble.append(param_text("REPORT_NAME", lines[i].split("-")[1].strip()))
elif "Compliance Status Legend" in lines[i]: # 13 is the end of the preamble that is used
break
elif "Unique Failed Checks" in lines[i] or "Total Failed Checks" in lines[i] or "Software Version" in lines[i]:
pass
else:
params = lines[i].split("\t")
if len(params) == 1:
preamble.append(param_text(format_param(params[0]), ""))
else:
preamble.append(param_text(format_param(params[0]), params[1]))
if len(params) > 2:
print("PARAM MORE THAN 2!!!")
return preamble
def get_compliance_status_descriptions(lines):
cs = []
start = -1
end = -1
for i in range(len(lines)):
if "BLUE\t" in lines[i]:
start = i
elif "RED\t" in lines[i]:
end = i+1
for i in range(start, end):
param = "COMP_STATUS_" + lines[i].split('\t')[0].upper() + "_DESC"
cs.append(param_text(param, lines[i].split("\t")[1].strip(' ')))
# POLICY LINES
for i in range(len(lines)):
if "Check Failure Details" in lines[i]:
brick2 = lines[i+2].split('\t')
cs.append(param_text('POLICY', brick2[0]))
cs.append(param_text('POLICY_NAME', brick2[1]))
cs.append(param_text('POLICY_DATE', brick2[2]))
return cs
def get_test_results(lines):
cs = []
brick = lines[24].split("\t")
cs.append(param_text("COMPLIANCE_STATUS", brick[1]))
cs.append(param_text("UNIQUE_FAILS", brick[2]))
cs.append(param_text("TOTAL_FAILS", brick[3]))
cs.append(param_text("UNIQUE_ERRORS", brick[4]))
cs.append(param_text("TOTAL_ERRORS", brick[5]))
cs.append(param_text("SCORE", brick[6]))
prefix = "CHK_STATUS_"
for i in range(27, 33):
p_name = lines[i].split('\t')[0].replace('/', '')
val = lines[i].split('\t')[1]
cs.append(param_text(format_param(prefix + p_name), val))
return cs
def get_bounds(lines, delimiter="Check Failure Details"):
starts = []
ends = []
for i in range(33, len(lines)):
if delimiter in lines[i]:
starts.append(i)
elif '____' in lines[i]:
ends.append(i)
if i < len(lines) - 1:
starts.append(i)
elif "RBD Tracking ID\t" in lines[i]:
ends.append(i)
# starts.append(i)
break
if len(starts) > len(ends):
ends.append(len(lines))
return list(zip(starts, ends))
def get_description(lines, start, delimiter):
true_end = -1
desc = ''
for i in range(start, len(lines)):
if delimiter in lines[i]:
true_end = i
break
if true_end == -1:
true_end = len(lines)
for i in range(start + 1, true_end):
desc += lines[i] + " "
return desc
def get_policy(lines, bounds, prefix_num):
Summary = (param_text(str(prefix_num) + "_summary", ""))
nistPolicy = (param_text(str(prefix_num) + "_nistPolicy", ""))
Description = (param_text(str(prefix_num) + "_desc", ""))
reqSetting = (param_text(str(prefix_num) + "_reqSetting", ""))
extRef = (param_text(str(prefix_num) + "_extRef", ""))
policyID = (param_text(str(prefix_num) + "_policyID", ""))
appliesTo = (param_text(str(prefix_num) + "_appliesTo", ""))
risk = (param_text(str(prefix_num) + "_risk", ""))
category = (param_text(str(prefix_num) + "_category", ""))
checkID = (param_text(str(prefix_num) + "_checkID", ""))
irsPolicy = (param_text(str(prefix_num) + "_irsPolicy", ""))
policy_lines = []
for i in range(bounds[0], bounds[1]):
if "Summary\t" in lines[i]:
Summary = (param_text(str(prefix_num) + "_summary", lines[i].split('\t', 1)[1]))
elif "800-53A\t" in lines[i]:
nistPolicy = (param_text(str(prefix_num) + "_nistPolicy", lines[i].split('\t', 1)[1]))
elif "Description" in lines[i]:
d = get_description(lines, i, "Required Setting")
Description = (param_text(str(prefix_num) + "_desc", d))
elif "Required Setting" in lines[i]:
r = get_description(lines, i, "Actual Setting")
reqSetting = (param_text(str(prefix_num) + "_reqSetting", r))
elif "External References" in lines[i]:
extRef = param_text(str(prefix_num) + "_extRef", lines[i + 1])
elif "Policy ID\t" in lines[i]:
policyID = (param_text(str(prefix_num) + "_policyID", lines[i].split('\t', 1)[1]))
elif "Applies" in lines[i]:
appliesTo = (param_text(str(prefix_num) + "_appliesTo", lines[i].split('\t', 1)[1]))
elif "Risk\t" in lines[i]:
risk = (param_text(str(prefix_num) + "_risk", lines[i].split('\t', 1)[1]))
elif "Category\t" in lines[i]:
category = (param_text(str(prefix_num) + "_category", lines[i].split('\t', 1)[1]))
elif "Check ID\t" in lines[i]:
checkID = (param_text(str(prefix_num) + "_checkID", lines[i].split('\t', 1)[1]))
elif "IRS Policy\t" in lines[i]:
irsPolicy = (param_text(str(prefix_num) + "_irsPolicy", lines[i].split('\t', 1)[1]))
policy_lines.append(Summary)
policy_lines.append(nistPolicy)
policy_lines.append(reqSetting)
policy_lines.append(policyID)
policy_lines.append(extRef)
policy_lines.append(appliesTo)
policy_lines.append(risk)
policy_lines.append(irsPolicy)
policy_lines.append(category)
policy_lines.append(checkID)
policy_lines.append(Description)
return policy_lines
def get_settings(lines, bounds, prefix_num):
counter = 1
start = -1
breaks = []
actualSetting = ''
moreInfo = ''
flag = False
settings = []
output = []
for i in range(bounds[0], bounds[1]):
if "Actual Setting" in lines[i]:
start = i + 1
break
if start == -1:
return []
for i in range(start, bounds[1]):
breaks.append(lines[i].partition('\t'))
for b in breaks:
if b[1] == '\t' and not flag:
actualSetting += b[0]
moreInfo += b[2]
elif b[1] == '':
moreInfo += b[0]
flag = True
elif b[1] == '\t' and flag:
settings.append([actualSetting, moreInfo])
actualSetting = b[0]
moreInfo = b[2]
flag = False
settings.append([actualSetting, moreInfo])
for s in settings:
output.append(param_text(str(prefix_num) + "_" + str(counter) + "_actualSetting", s[0].strip()))
output.append(param_text(str(prefix_num) + "_" + str(counter) + "_moreInfo", s[1].strip()))
counter += 1
return output
def parse_policy(lines, bounds, prefix_num):
policy_lines = get_policy(lines, bounds, prefix_num) # GET the policy info lines
actual_Settings = get_settings(lines, bounds, prefix_num) # GET Actual Settings for the policy
output = []
for p in policy_lines:
output.append(p)
for a in actual_Settings:
output.append(a)
return output
def get_policies(lines):
bounds = get_bounds(lines)
counter = 1
output = []
for i in range(len(bounds)):
policy = parse_policy(lines, bounds[i], counter)
for p in policy:
output.append(p)
counter += 1
return output
def get_rbd(lines):
start = -1
end = len(lines) - 1
comment = ''
rbd = []
for i in range(len(lines)):
if "RBD Tracking ID" in lines[i]:
start = i + 1
break
if start == -1:
return []
for i in range(start, len(lines)):
if i == start:
rbd.append(param_text("RBD_TrackingID", lines[i].split('\t')[0]))
rbd.append(param_text("RBD_Approver", lines[i].split('\t')[1]))
rbd.append(param_text("RBD_POAM_Number", ''))
comment = lines[i].split('\t')[2]
rbd.append(param_text("RBD_ApprovalDate", lines[i].split('\t')[3]))
rbd.append(param_text("RBD_ExpirationDate", "none"))
else:
if "________________________________________________________________________________________________" not in \
lines[i]:
comment += lines[i].strip()
rbd.append(param_text("RBD_Comment", comment))
return rbd
def nu_TESTER(lists):
for i in range(len(lists)):
for j in range(len(lists[i])):
print(lists[i][j])
def parse_input(input_file):
lines = get_lines(input_file)
preamble = get_preamble(lines) # lines start to 13 used;
comp_status = get_compliance_status_descriptions(lines) # lines 16 thru 19
compliance_summary = get_test_results(lines) # lines 24 to 32
policies = get_policies(lines)
rbd = get_rbd(lines)
parts = [preamble, compliance_summary, comp_status, policies, rbd]
output = []
for p in parts:
for line in p:
output.append(line)
return output
def main():
today = date.today()
date_prefix = today.strftime("%Y-%m-%d_")
all_files = []
subfolders = []
# source_folder = input("Please enter source folder for processing (root):")
# Linux
source_folder = r"/opt/splunk/etc/apps/SourceFiles/"
subfolders.extend(getSubfolders(source_folder))
for folder in subfolders:
temp = getFiles(os.path.join(source_folder, folder))
# print(temp)
for t in temp:
if t[-4:] == ".rtf" and "Computer_Summary" in t and t[0:2] != "~$":
all_files.append(t)
for file in all_files:
print("current file=", file)
input_file = make_clean_input_file(file, source_folder)
output = parse_input(input_file)
outfile = input_file.split('--')[0] + '.parsed'
print('Saving output to:', outfile)
# add date prefix
# outfile = outfile.rsplit('\\', 1)[0] + "\\" + date_prefix + outfile.rsplit('\\', 1)[1].split('--')[0]
# Linux pathing:
outfile = outfile.rsplit('/', 1)[0] + "/" + date_prefix + outfile.rsplit('/', 1)[1].split('--')[0]
# Save output
out = open(outfile, 'w')
for o in output:
out.write(o + '\n')
out.close()
# remove clean input file
os.remove(input_file)
os.remove(input_file.split('--')[0] + '--SANITIZED.txt')
return 0
if __name__ == "__main__":
main()
-------------End-------
Look at the docs I pointed you to. I haven't done it myself but it involves creating some config files so that splunk understands how to interface with your script.