Hi All, I am trying to create a modular input in Splunk Cloud that gets Splunk Observability metadata. The input has the fields realm, token and object_type. I can see the input form under "Settings -> Data Inputs" with all these fields, and it works fine. I have created another script that masks the token. But when I upgrade the app with this script and restmap.conf, the modular input disappears from "Settings -> Data Inputs". splunk_observability.py ---> this creates the modular input schema. observability_object_helper.py --> this is the helper script that makes the API call. observability_admin_TA_rh_account.py --> this creates the REST model and encrypts the token input field. Directory structure: App Folder Name --> Splunk_Observability_Metadata metadata --> default.meta bin --> import_declare_test.py, splunk_observability.py, observability_object_helper.py, observability_admin_TA_rh_account.py README --> inputs.conf.spec local --> inputs.conf, app.conf lib --> required splunklib splunk_observability.py import import_declare_test
import sys,os
import json
from splunklib import modularinput as smi
sys.path.insert(0, os.path.dirname(__file__))
from observability_object_helper import stream_events, validate_input
class SPLUNK_OBSERVABILITY(smi.Script):
    """Modular input script for Splunk Observability metadata.

    The class declares the input scheme (realm, name, token, object_type)
    and hands validation and event streaming over to the module-level
    helpers imported from ``observability_object_helper``.
    """

    def __init__(self):
        super(SPLUNK_OBSERVABILITY, self).__init__()

    def get_scheme(self):
        """Build and return the scheme describing this modular input."""
        scheme = smi.Scheme("Splunk Observability Metadata Input")
        scheme.description = "Modular Input"
        scheme.use_external_validation = False
        scheme.use_single_instance = False
        scheme.streaming_mode_xml = True

        # (argument name, extra keyword arguments) in registration order.
        argument_specs = (
            ('realm', {}),
            ('name', {"description": "Name should not contain whitespaces"}),
            (
                'token',
                {
                    "description": "Add API Key required to connect to Splunk Observability Cloud",
                },
            ),
            ('object_type', {}),
        )
        for argument_name, extra in argument_specs:
            argument = smi.Argument(
                argument_name,
                required_on_create=True,
                **extra,
            )
            scheme.add_argument(argument)
        return scheme

    def validate_input(self, definition: smi.ValidationDefinition):
        """Delegate validation to the module-level ``validate_input`` helper."""
        # Inside a method the bare name resolves to the imported helper,
        # not to this method.
        return validate_input(definition)

    def stream_events(self, inputs: smi.InputDefinition, ew: smi.EventWriter):
        """Delegate event collection to the module-level ``stream_events`` helper."""
        return stream_events(inputs, ew)
if __name__ == "__main__":
sys.exit(SPLUNK_OBSERVABILITY().run(sys.argv)) observability_object_helper.py import json
import logging
import time
import requests
# import import_declare_test
from solnlib import conf_manager, log, credentials
from splunklib import modularinput as smi
ADDON_NAME = "splunk_observability"
def get_key_name(input_name: str) -> str:
    """Return the text after the last "/" of a modular input stanza name.

    A stanza name looks like ``example://<input_name>``; a name that contains
    no slash at all is returned unchanged.
    """
    _, _, key_name = input_name.rpartition("/")
    return key_name
def logger_for_input(input_name: str) -> logging.Logger:
    """Return the logger named ``<lower-cased ADDON_NAME>_<input_name>``."""
    addon_prefix = ADDON_NAME.lower()
    logger_name = f"{addon_prefix}_{input_name}"
    return log.Logs().get_logger(logger_name)
def splunk_observability_get_endpoint(type, realm):
    """Return the API URL for an object type in the given realm.

    The object type is matched case-insensitively. An unknown type yields an
    empty string.
    """
    base_url = f"https://api.{realm}.signalfx.com"
    # Every key is lower-case, so a single lower-cased lookup is enough.
    paths_by_type = {
        "chart": "/v2/chart",
        "dashboard": "/v2/dashboard",
        "detector": "/v2/detector",
        "heartbeat": "/v2/detector",
        "synthetic": "/v2/synthetics/tests",
    }
    path = paths_by_type.get(type.lower())
    if path is None:
        return ""
    return f"{base_url}{path}"
def splunk_observability_get_sourcetype(type):
    """Return the sourcetype for an object type, or ``None`` if unknown.

    The object type is matched case-insensitively.
    """
    sourcetype_by_type = {
        "chart": "observability:chart_api:json",
        "dashboard": "observability:dashboard_api:json",
        "detector": "observability:detector_api:json",
        "synthetic": "observability:synthetic_api:json",
        "token": "observability:token_api:json",
    }
    wanted = type.lower()
    if wanted in sourcetype_by_type:
        return sourcetype_by_type[wanted]
    return None
def splunk_observability_get_objects(type, realm, token, logger, timeout=60):
    """Fetch all objects of one type, following limit/offset pagination.

    Args:
        type: Object type name, resolved to a URL by
            ``splunk_observability_get_endpoint``.
        realm: Realm used to build the API host name.
        token: Value sent in the ``X-SF-TOKEN`` header.
        logger: Logger that receives progress and error messages.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        The list of collected objects. An empty list is returned when any
        request fails or a response body is not valid JSON.
    """
    endpoint_url = splunk_observability_get_endpoint(type, realm)
    limit = 200
    max_objects = 10000
    offset = 0
    headers = {"Content-Type": "application/json", "X-SF-TOKEN": token}
    process_start = time.time()
    objects = []
    while True:
        params = {"limit": limit, "offset": offset}
        try:
            # Without a timeout a stalled connection would block the input
            # indefinitely.
            response = requests.get(
                endpoint_url, headers=headers, params=params, timeout=timeout
            )
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            log.log_exception(logger, e, "RequestError", msg_before="Error fetching data:")
            return []
        if isinstance(data, list):
            results = data
        elif isinstance(data, dict):
            # "results" may be absent or null; both mean "no objects".
            results = data.get("results") or []
        else:
            logger.error("Unexpected response format")
            # Previously `results` was left unassigned here, which raised
            # NameError on the first page or re-appended the previous page.
            results = []
        objects.extend(results)
        logger.info(f"pagenating {type} result 'length': {len(results)} , offset: {offset}, limit {limit}")
        if len(results) < limit:
            break
        # too many objects to query, splunk will max out at 10,000
        if offset >= max_objects - limit:
            logger.warning("Cannot ingest more than 10,000 objects")
            break
        offset += limit
    time_taken_ms = str(round((time.time() - process_start) * 1000))
    log.log_event(
        logger,
        {
            "message": f"{type} ingest finished",
            "time_taken": f"{time_taken_ms}ms",
            "ingested": len(objects),
        },
    )
    return objects
def splunk_observability_get_objects_synthetics(type, realm, token, logger, timeout=60):
    """Fetch synthetic tests page by page, optionally with per-test details.

    Args:
        type: Object type; when it equals ``synthetic_detailed``
            (case-insensitively) each test's detail document is fetched and
            returned instead of the list entry.
        realm: Realm used to build the API host name.
        token: Value sent in the ``X-SF-TOKEN`` header.
        logger: Logger that receives progress and error messages.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        The list of collected synthetic objects. An empty list is returned
        when a list request fails or its body is not valid JSON. A failed
        detail request is logged and that test is skipped.
    """
    process_start = time.time()
    base_url = f"https://api.{realm}.signalfx.com"
    endpoint_url = f"{base_url}/v2/synthetics/tests"
    headers = {"Content-Type": "application/json", "X-SF-TOKEN": token}
    detailed = type.lower() == "synthetic_detailed"
    synthetics_objects = []
    page = 1
    while True:
        params = {"perPage": 100, "page": page}
        try:
            # Without a timeout a stalled connection would block the input
            # indefinitely.
            response = requests.get(
                endpoint_url, headers=headers, params=params, timeout=timeout
            )
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            log.log_exception(logger, e, "RequestError", msg_before="Error fetching synthetic data:")
            return []
        if not isinstance(data, dict):
            logger.error("Unexpected response format")
            break
        # "tests" may be absent or null; both mean "nothing on this page".
        for test in data.get("tests") or []:
            if not detailed:
                synthetics_objects.append(test)
                continue
            synthetic_id = test.get("id")
            synthetic_type = test.get("type")
            if synthetic_id is None or synthetic_type is None:
                logger.error("Synthetic test is missing 'id' or 'type'; skipping detail lookup")
                continue
            detail_url = f"{base_url}/v2/synthetics/tests/{synthetic_type}/{synthetic_id}"
            try:
                detail_response = requests.get(detail_url, headers=headers, timeout=timeout)
                detail_response.raise_for_status()
                synthetics_objects.append(detail_response.json())
            except (requests.exceptions.RequestException, ValueError) as e:
                log.log_exception(logger, e, "RequestError", msg_before=f"Error fetching synthetic details for ID: {synthetic_id}")
        if data.get("nextPageLink") is None:
            break
        page += 1
    time_taken_ms = str(round((time.time() - process_start) * 1000))
    log.log_event(
        logger,
        {
            "message": "synthetic ingest finished",
            "time_taken": f"{time_taken_ms}ms",
            "ingested": len(synthetics_objects),
        },
    )
    return synthetics_objects
def validate_input(definition: smi.ValidationDefinition):
    """Validation hook for the modular input; performs no checks.

    The definition is not inspected and ``False`` is returned unconditionally.
    """
    return False
def stream_events(inputs: smi.InputDefinition, event_writer: smi.EventWriter):
    """Collect objects for every configured input and write them as events.

    For each input stanza the object type, token and realm are read, the
    matching fetcher is called, and each returned object is written as one
    JSON event using the stanza's ``index`` and ``sourcetype``. Any exception
    raised while processing one input is logged and does not stop the
    remaining inputs.

    Args:
        inputs: Input definition whose ``inputs`` mapping holds one entry per
            configured stanza.
        event_writer: Writer that receives the generated events.
    """
    for input_name, input_item in inputs.inputs.items():
        normalized_input_name = input_name.split("/")[-1]
        logger = logger_for_input(normalized_input_name)
        try:
            observability_type = input_item.get("object_type")
            # NOTE(review): if the token is stored encrypted by a REST
            # handler, the value read here may be a masked placeholder and
            # not the real token - confirm, and read it from the credential
            # store if so.
            observability_token = input_item.get("token")
            observability_realm = input_item.get("realm")
            index = input_item.get("index")
            sourcetype = input_item.get("sourcetype")
            log.modular_input_start(logger, normalized_input_name)
            # "synthetic_detailed" is handled by the synthetics fetcher; it
            # has no entry in the generic endpoint table.
            if observability_type.lower() in ("synthetic", "synthetic_detailed"):
                objects = splunk_observability_get_objects_synthetics(observability_type, observability_realm, observability_token, logger)
            else:
                objects = splunk_observability_get_objects(observability_type, observability_realm, observability_token, logger)
            debug_enabled = logger.isEnabledFor(logging.DEBUG)
            for obj in objects:
                # Serialise once; default=str keeps non-JSON values from
                # raising in either the debug line or the event.
                payload = json.dumps(obj, ensure_ascii=False, default=str)
                if debug_enabled:
                    logger.debug(f"DEBUG EVENT {observability_type} :{payload}")
                event_writer.write_event(
                    smi.Event(
                        data=payload,
                        index=index,
                        sourcetype=sourcetype,
                    )
                )
            # `sourcetype` used to be an undefined name here, so every run
            # ended in a NameError reported as an IngestionError.
            log.events_ingested(logger, input_name, sourcetype, len(objects), index)
            log.modular_input_end(logger, normalized_input_name)
        except Exception as e:
            log.log_exception(logger, e, "IngestionError", msg_before="Error processing input:")
observability_admin_TA_rh_account.py from splunktaucclib.rest_handler.endpoint import (
field,
validator,
RestModel,
DataInputModel,
)
from splunktaucclib.rest_handler import admin_external, util
import logging
util.remove_http_proxy_env_vars()
# Fields the REST handler accepts for a splunk_observability input.
# `token` is the only encrypted field.
fields = [
    field.RestField('name', required=True, encrypted=False),
    field.RestField('realm', required=True, encrypted=False),
    field.RestField('token', required=True, encrypted=True),
    # The modular input scheme marks object_type as required on create, so
    # the REST model has to accept it as well.
    field.RestField('object_type', required=True, encrypted=False),
    field.RestField('interval', required=True, encrypted=False, default="300"),
]
model = RestModel(fields, name='splunk_observability')
endpoint = DataInputModel(model, input_type='splunk_observability')
if __name__ == '__main__':
logging.getLogger().addHandler(logging.NullHandler())
admin_external.handle(endpoint) restmap.conf [endpoint:admin/input/splunk_observability]
match = splunk_observability
python.version = python3
handlerfile = observability_admin_TA_rh_account.py Please help me to resolve this issue. Thanks, PNV
... View more