Dashboards & Visualizations

Error with Splunk Modular Input App creation

Poojitha
Communicator
Hi All,

 

I am trying to create a modular input in splunk cloud that gets splunk observability metadata. Input has fields : realm, token and object_type. I can see the input form under "Settings -> Data Inputs" with all these fields and is fine.

 

I have created another script that masks token. But when I upgrade the app with this script and restmap.conf, the modular input is getting disappered from "Settings" --> "Data Inputs" .

 

splunk_observability.py ---> this is to create modular input schema.
observability_object_helper.py --> This is helper script that makes ARI call.
observability_admin_TA_rh_account.py --> This creates restmodel and encrypts token input field.

 

Directory Structure : 

App Folder Name --> Splunk_Observability_Metadata

metadata --> default.meta

bin --> import_declare_test.py, splunk_observability.py,observability_object_helper.py,observability_admin_TA_rh_account.py

README --> inputs.conf.spec

local --> inputs.conf, app.conf

lib --> required splunklib

splunk_observability.py

import import_declare_test
import sys,os
import json

from splunklib import modularinput as smi
sys.path.insert(0, os.path.dirname(__file__))
from observability_object_helper import stream_events, validate_input


class SPLUNK_OBSERVABILITY(smi.Script):
    def __init__(self):
        super(SPLUNK_OBSERVABILITY, self).__init__()

    def get_scheme(self):
        scheme = smi.Scheme("Splunk Observability Metadata Input")
        scheme.use_external_validation = False
        scheme.use_single_instance = False
        scheme.description = "Modular Input"
        scheme.streaming_mode_xml = True

        scheme.add_argument(
            smi.Argument(
                'realm',
                required_on_create=True
            )
        )

        scheme.add_argument(
            smi.Argument(
                'name',
                required_on_create=True,
                description="Name should not contain whitespaces",          
            )
        )

        scheme.add_argument(
            smi.Argument(
                'token',
                required_on_create=True,
                description="Add API Key required to connect to Splunk Observability Cloud",
            )
        )

        scheme.add_argument(
            smi.Argument(
                'object_type',
                required_on_create=True
            )
        )
        return scheme

    def validate_input(self, definition: smi.ValidationDefinition):
        return validate_input(definition)

    def stream_events(self, inputs: smi.InputDefinition, ew: smi.EventWriter):
        return stream_events(inputs, ew)


if __name__ == "__main__":
    sys.exit(SPLUNK_OBSERVABILITY().run(sys.argv))


observability_object_helper.py


import json
import logging
import time
import requests
# import import_declare_test
from solnlib import conf_manager, log, credentials
from splunklib import modularinput as smi


ADDON_NAME = "splunk_observability"

def get_key_name(input_name: str) -> str:
    # `input_name` is a string like "example://<input_name>".
    return input_name.split("/")[-1]


def logger_for_input(input_name: str) -> logging.Logger:
    return log.Logs().get_logger(f"{ADDON_NAME.lower()}_{input_name}")

def splunk_observability_get_endpoint(type, realm):
    BASE_URL = f"https://api.{realm}.signalfx.com"
    ENDPOINT = ""
    types = {
        "chart": f"{BASE_URL}/v2/chart",
        "dashboard": f"{BASE_URL}/v2/dashboard",
        "detector": f"{BASE_URL}/v2/detector",
        "heartbeat": f"{BASE_URL}/v2/detector",
        "synthetic": f"{BASE_URL}/v2/synthetics/tests",
    }
    for type_key in types:
        if type.lower() == type_key.lower():
            ENDPOINT = types.get(type_key)
    return ENDPOINT

def splunk_observability_get_sourcetype(type):
    sourcetypes = {
        "chart": "observability:chart_api:json",
        "dashboard": "observability:dashboard_api:json",
        "detector": "observability:detector_api:json",
        "synthetic": "observability:synthetic_api:json",
        "token": "observability:token_api:json",
    }
    for type_key in sourcetypes:
        if type.lower() == type_key.lower():
            return sourcetypes.get(type_key)

def splunk_observability_get_objects(type, realm, token, logger):
    TOKEN = token
    ENDPOINT_URL = splunk_observability_get_endpoint(type, realm)
    limit = 200
    offset = 0
    pagenation = True
    headers = {"Content-Type": "application/json", "X-SF-TOKEN": TOKEN}
    processStart = time.time()
    objects = []

    while pagenation:
        params = {"limit": limit, "offset": offset}
        try:
            response = requests.get(ENDPOINT_URL, headers=headers, params=params)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            log.log_exception(logger, e, "RequestError", msg_before="Error fetching data:")
            return []

        data = response.json()

        if isinstance(data, list):
            results = data
        elif isinstance(data, dict):
            results = data.get("results", [])
        else:
            logger.error("Unexpected response format")

        objects.extend(results)

        logger.info(f"pagenating {type} result 'length': {len(results)} , offset: {offset}, limit {limit}")
        if len(results) < limit:
            pagenation = False
            
        # too many objects to query, splunk will max out at 10,000
        elif (offset >= 10000-limit):
            pagenation = False
            logger.warn("Cannot ingest more than 10,000 objects")
        else:
            offset += limit

    count = offset+len(results)
    timeTakenProcess = str(round((time.time() - processStart) * 1000))
    log.log_event(logger, {"message": f"{type} ingest finished", "time_taken": f"{timeTakenProcess}ms", "ingested": count})
    return objects

def splunk_observability_get_objects_synthetics(type, realm, token, logger):
    processStart = time.time()
    BASE_URL = f"https://api.{realm}.signalfx.com"
    ENDPOINT_URL = f"{BASE_URL}/v2/synthetics/tests"
    page = 1
    pagenating = True
    headers = {"Content-Type": "application/json", "X-SF-TOKEN": token}
    synthetics_objects = []

    while pagenating:
        params = {"perPage": 100, "page": page}
        try:
            response = requests.get(ENDPOINT_URL, headers=headers, params=params)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            log.log_exception(logger, e, "RequestError", msg_before="Error fetching synthetic data:")
            return []

        data = response.json()
        tests = data["tests"]
        for test in tests:
            synthetic = {"id": test["id"], "type": test["type"]}
            SYNTHETIC_TYPE = synthetic["type"]
            SYNTHETIC_ID = synthetic["id"]
            detail_url = f"{BASE_URL}/v2/synthetics/tests/{SYNTHETIC_TYPE}/{SYNTHETIC_ID}"
            if type=="synthetic_detailed":
                try:
                    detail_response = requests.get(detail_url, headers=headers)
                    detail_response.raise_for_status()
                    synthetics_objects.append(detail_response.json())
                except requests.exceptions.RequestException as e:
                    log.log_exception(logger, e, "RequestError", msg_before=f"Error fetching synthetic details for ID: {SYNTHETIC_ID}")
            else:
                synthetics_objects.append(test)
        pagenating = data.get("nextPageLink") is not None
        page += 1

    timeTakenProcess = str(round((time.time() - processStart) * 1000))
    log.log_event(logger, {"message": "synthetic ingest finished", "time_taken": f"{timeTakenProcess}ms", "ingested": len(synthetics_objects)})

    return synthetics_objects

def validate_input(definition: smi.ValidationDefinition):
    return False

def stream_events(inputs: smi.InputDefinition, event_writer: smi.EventWriter):
    for input_name, input_item in inputs.inputs.items():
        normalized_input_name = input_name.split("/")[-1]
        logger = logger_for_input(normalized_input_name)
        try:
            observability_type = input_item.get("object_type")
            observability_token = input_item.get("token")
            observability_realm = input_item.get("realm")
            
            log.modular_input_start(logger, normalized_input_name)

            if observability_type.lower() == "synthetic":
                objects = splunk_observability_get_objects_synthetics(observability_type, observability_realm, observability_token, logger)
            else:
                objects = splunk_observability_get_objects(observability_type, observability_realm, observability_token, logger)
            
            # source_type = splunk_observability_get_sourcetype(observability_type)
            for obj in objects:
                logger.debug(f"DEBUG EVENT {observability_type} :{json.dumps(obj)}")
                event_writer.write_event(
                    smi.Event(
                        data=json.dumps(obj, ensure_ascii=False, default=str),
                        index=input_item.get("index"),
                        sourcetype=input_item.get("sourcetype"),
                    )
                )
            log.events_ingested(logger, input_name, sourcetype, len(objects), input_item.get("index"))
            log.modular_input_end(logger, normalized_input_name)
        except Exception as e:
            log.log_exception(logger, e, "IngestionError", msg_before="Error processing input:")
​

observability_admin_TA_rh_account.py

from splunktaucclib.rest_handler.endpoint import (
    field,
    validator,
    RestModel,
    DataInputModel,
)
from splunktaucclib.rest_handler import admin_external, util
import logging

util.remove_http_proxy_env_vars()

fields = [
    field.RestField('name', required=True, encrypted=False),
    field.RestField('realm', required=True, encrypted=False),
    field.RestField('token', required=True, encrypted=True),
    field.RestField('interval', required=True, encrypted=False, default="300"),
]

model = RestModel(fields, name='splunk_observability')

endpoint = DataInputModel(model, input_type='splunk_observability')

if __name__ == '__main__':
    logging.getLogger().addHandler(logging.NullHandler())
    admin_external.handle(endpoint)


restmap.conf

[endpoint:admin/input/splunk_observability]
match = splunk_observability
python.version = python3
handlerfile = observability_admin_TA_rh_account.py

Please help me to resolve this issue.

Thanks,
PNV




Labels (1)
Tags (1)
0 Karma

Poojitha
Communicator

@livehybrid  : Sorry, I mentioned it wrong here. It is a splunk standalone server. Yes , I am testing locally.

When I remove  observability_admin_TA_rh_account.py and restmap.conf file the app is working fine.I can see it under datainputs. So I am guessing is it something wrong with these two files. 

Regards,
PNV

0 Karma

livehybrid
SplunkTrust
SplunkTrust

Hi @Poojitha 

Do you see anything in your $SPLUNK_HOME/var/log/splunk/splunkd.log relating to this? Any python errors, stack traces etc?

 

0 Karma

livehybrid
SplunkTrust
SplunkTrust

Hi @Poojitha 

I assume you are using UCC Framework for this app? 

Are you able to see the inputs in https://yourSplunkEnvironment.com/en-US/app/yourApp/inputs ?

Have you been able to test the app locally? I would highly recommend doing some local verification before packaging the app for uploading to Splunk Cloud.

🌟 Did this answer help you? If so, please consider:

  • Adding karma to show it was useful
  • Marking it as the solution if it resolved your issue
  • Commenting if you need any clarification

Your feedback encourages the volunteers in this community to continue contributing

0 Karma
Career Survey
First 500 qualified respondents will receive a $20 gift card! Tell us about your professional Splunk journey.
Get Updates on the Splunk Community!

Tech Talk Recap | Mastering Threat Hunting

Mastering Threat HuntingDive into the world of threat hunting, exploring the key differences between ...

Observability for AI Applications: Troubleshooting Latency

If you’re working with proprietary company data, you’re probably going to have a locally hosted LLM or many ...

Splunk AI Assistant for SPL vs. ChatGPT: Which One is Better?

In the age of AI, every tool promises to make our lives easier. From summarizing content to writing code, ...