
Error with Splunk Modular Input App creation

Poojitha
Communicator
Hi All,

 

I am trying to create a modular input in Splunk Cloud that collects Splunk Observability metadata. The input has three fields: realm, token and object_type. The input form appears under "Settings -> Data Inputs" with all of these fields and works fine.

I have created another script that masks the token. But when I upgrade the app with this script and restmap.conf, the modular input disappears from "Settings -> Data Inputs".

splunk_observability.py --> defines the modular input scheme.
observability_object_helper.py --> helper script that makes the API calls.
observability_admin_TA_rh_account.py --> creates the REST model and encrypts the token input field.

Directory Structure : 

App Folder Name --> Splunk_Observability_Metadata

metadata --> default.meta

bin --> import_declare_test.py, splunk_observability.py,observability_object_helper.py,observability_admin_TA_rh_account.py

README --> inputs.conf.spec

local --> inputs.conf, app.conf

lib --> required splunklib

splunk_observability.py

import import_declare_test
import sys,os
import json

from splunklib import modularinput as smi
sys.path.insert(0, os.path.dirname(__file__))
from observability_object_helper import stream_events, validate_input


class SPLUNK_OBSERVABILITY(smi.Script):
    def __init__(self):
        super(SPLUNK_OBSERVABILITY, self).__init__()

    def get_scheme(self):
        scheme = smi.Scheme("Splunk Observability Metadata Input")
        scheme.use_external_validation = False
        scheme.use_single_instance = False
        scheme.description = "Modular Input"
        scheme.streaming_mode_xml = True

        scheme.add_argument(
            smi.Argument(
                'realm',
                required_on_create=True
            )
        )

        scheme.add_argument(
            smi.Argument(
                'name',
                required_on_create=True,
                description="Name should not contain whitespaces",          
            )
        )

        scheme.add_argument(
            smi.Argument(
                'token',
                required_on_create=True,
                description="Add API Key required to connect to Splunk Observability Cloud",
            )
        )

        scheme.add_argument(
            smi.Argument(
                'object_type',
                required_on_create=True
            )
        )
        return scheme

    def validate_input(self, definition: smi.ValidationDefinition):
        return validate_input(definition)

    def stream_events(self, inputs: smi.InputDefinition, ew: smi.EventWriter):
        return stream_events(inputs, ew)


if __name__ == "__main__":
    sys.exit(SPLUNK_OBSERVABILITY().run(sys.argv))


observability_object_helper.py


import json
import logging
import time
import requests
# import import_declare_test
from solnlib import conf_manager, log, credentials
from splunklib import modularinput as smi


ADDON_NAME = "splunk_observability"

def get_key_name(input_name: str) -> str:
    # `input_name` is a string like "example://<input_name>".
    return input_name.split("/")[-1]


def logger_for_input(input_name: str) -> logging.Logger:
    return log.Logs().get_logger(f"{ADDON_NAME.lower()}_{input_name}")

def splunk_observability_get_endpoint(type, realm):
    BASE_URL = f"https://api.{realm}.signalfx.com"
    ENDPOINT = ""
    types = {
        "chart": f"{BASE_URL}/v2/chart",
        "dashboard": f"{BASE_URL}/v2/dashboard",
        "detector": f"{BASE_URL}/v2/detector",
        "heartbeat": f"{BASE_URL}/v2/detector",
        "synthetic": f"{BASE_URL}/v2/synthetics/tests",
    }
    for type_key in types:
        if type.lower() == type_key.lower():
            ENDPOINT = types.get(type_key)
    return ENDPOINT

def splunk_observability_get_sourcetype(type):
    sourcetypes = {
        "chart": "observability:chart_api:json",
        "dashboard": "observability:dashboard_api:json",
        "detector": "observability:detector_api:json",
        "synthetic": "observability:synthetic_api:json",
        "token": "observability:token_api:json",
    }
    for type_key in sourcetypes:
        if type.lower() == type_key.lower():
            return sourcetypes.get(type_key)

def splunk_observability_get_objects(type, realm, token, logger):
    TOKEN = token
    ENDPOINT_URL = splunk_observability_get_endpoint(type, realm)
    limit = 200
    offset = 0
    pagenation = True
    headers = {"Content-Type": "application/json", "X-SF-TOKEN": TOKEN}
    processStart = time.time()
    objects = []

    while pagenation:
        params = {"limit": limit, "offset": offset}
        try:
            response = requests.get(ENDPOINT_URL, headers=headers, params=params)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            log.log_exception(logger, e, "RequestError", msg_before="Error fetching data:")
            return []

        data = response.json()

        if isinstance(data, list):
            results = data
        elif isinstance(data, dict):
            results = data.get("results", [])
        else:
            # Fall back to an empty page so the code below does not hit a NameError.
            logger.error("Unexpected response format")
            results = []

        objects.extend(results)

        logger.info(f"pagenating {type} result 'length': {len(results)} , offset: {offset}, limit {limit}")
        if len(results) < limit:
            pagenation = False
            
        # too many objects to query, splunk will max out at 10,000
        elif (offset >= 10000-limit):
            pagenation = False
            logger.warning("Cannot ingest more than 10,000 objects")
        else:
            offset += limit

    count = offset+len(results)
    timeTakenProcess = str(round((time.time() - processStart) * 1000))
    log.log_event(logger, {"message": f"{type} ingest finished", "time_taken": f"{timeTakenProcess}ms", "ingested": count})
    return objects

def splunk_observability_get_objects_synthetics(type, realm, token, logger):
    processStart = time.time()
    BASE_URL = f"https://api.{realm}.signalfx.com"
    ENDPOINT_URL = f"{BASE_URL}/v2/synthetics/tests"
    page = 1
    pagenating = True
    headers = {"Content-Type": "application/json", "X-SF-TOKEN": token}
    synthetics_objects = []

    while pagenating:
        params = {"perPage": 100, "page": page}
        try:
            response = requests.get(ENDPOINT_URL, headers=headers, params=params)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            log.log_exception(logger, e, "RequestError", msg_before="Error fetching synthetic data:")
            return []

        data = response.json()
        tests = data["tests"]
        for test in tests:
            synthetic = {"id": test["id"], "type": test["type"]}
            SYNTHETIC_TYPE = synthetic["type"]
            SYNTHETIC_ID = synthetic["id"]
            detail_url = f"{BASE_URL}/v2/synthetics/tests/{SYNTHETIC_TYPE}/{SYNTHETIC_ID}"
            if type=="synthetic_detailed":
                try:
                    detail_response = requests.get(detail_url, headers=headers)
                    detail_response.raise_for_status()
                    synthetics_objects.append(detail_response.json())
                except requests.exceptions.RequestException as e:
                    log.log_exception(logger, e, "RequestError", msg_before=f"Error fetching synthetic details for ID: {SYNTHETIC_ID}")
            else:
                synthetics_objects.append(test)
        pagenating = data.get("nextPageLink") is not None
        page += 1

    timeTakenProcess = str(round((time.time() - processStart) * 1000))
    log.log_event(logger, {"message": "synthetic ingest finished", "time_taken": f"{timeTakenProcess}ms", "ingested": len(synthetics_objects)})

    return synthetics_objects

def validate_input(definition: smi.ValidationDefinition):
    return False

def stream_events(inputs: smi.InputDefinition, event_writer: smi.EventWriter):
    for input_name, input_item in inputs.inputs.items():
        normalized_input_name = input_name.split("/")[-1]
        logger = logger_for_input(normalized_input_name)
        try:
            observability_type = input_item.get("object_type")
            observability_token = input_item.get("token")
            observability_realm = input_item.get("realm")
            
            log.modular_input_start(logger, normalized_input_name)

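            # Route both synthetic variants here; the helper itself checks for "synthetic_detailed".
            if observability_type.lower() in ("synthetic", "synthetic_detailed"):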
                objects = splunk_observability_get_objects_synthetics(observability_type, observability_realm, observability_token, logger)
            else:
                objects = splunk_observability_get_objects(observability_type, observability_realm, observability_token, logger)
            
            # source_type = splunk_observability_get_sourcetype(observability_type)
            for obj in objects:
                logger.debug(f"DEBUG EVENT {observability_type} :{json.dumps(obj)}")
                event_writer.write_event(
                    smi.Event(
                        data=json.dumps(obj, ensure_ascii=False, default=str),
                        index=input_item.get("index"),
                        sourcetype=input_item.get("sourcetype"),
                    )
                )
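            # `sourcetype` was never defined as a local variable; read it from the input stanza instead.
            log.events_ingested(logger, input_name, input_item.get("sourcetype"), len(objects), input_item.get("index"))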
            log.modular_input_end(logger, normalized_input_name)
        except Exception as e:
            log.log_exception(logger, e, "IngestionError", msg_before="Error processing input:")
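
The offset-based pagination in splunk_observability_get_objects can be tried without network access by injecting the page fetcher. The following is only a sketch under that assumption: `paginate`, `fetch_page` and `fake_fetch` are hypothetical names that are not part of the add-on, and `fake_fetch` stands in for the `requests.get` call.

```python
from typing import Callable, Dict, List


def paginate(fetch_page: Callable[[int, int], List[Dict]],
             limit: int = 200,
             max_objects: int = 10000) -> List[Dict]:
    """Collect pages until a short page arrives or the object cap is reached."""
    objects: List[Dict] = []
    offset = 0
    while True:
        results = fetch_page(offset, limit)
        objects.extend(results)
        if len(results) < limit:
            break  # a short page means there is nothing left to fetch
        if offset >= max_objects - limit:
            break  # same 10,000-object cap as in the helper above
        offset += limit
    return objects


def fake_fetch(offset: int, limit: int) -> List[Dict]:
    # Hypothetical stand-in for the HTTP call: serves 450 fake objects.
    data = [{"id": i} for i in range(450)]
    return data[offset:offset + limit]


all_objects = paginate(fake_fetch)
print(len(all_objects))
```

Keeping the loop separate from the HTTP call makes the stop conditions easy to test in isolation.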

observability_admin_TA_rh_account.py

from splunktaucclib.rest_handler.endpoint import (
    field,
    validator,
    RestModel,
    DataInputModel,
)
from splunktaucclib.rest_handler import admin_external, util
import logging

util.remove_http_proxy_env_vars()

fields = [
    field.RestField('name', required=True, encrypted=False),
    field.RestField('realm', required=True, encrypted=False),
    field.RestField('token', required=True, encrypted=True),
    field.RestField('interval', required=True, encrypted=False, default="300"),
]

model = RestModel(fields, name='splunk_observability')

endpoint = DataInputModel(model, input_type='splunk_observability')

if __name__ == '__main__':
    logging.getLogger().addHandler(logging.NullHandler())
    admin_external.handle(endpoint)


restmap.conf

[endpoint:admin/input/splunk_observability]
match = splunk_observability
python.version = python3
handlerfile = observability_admin_TA_rh_account.py

Please help me to resolve this issue.

Thanks,
PNV





Poojitha
Communicator

@livehybrid: Sorry, I stated that incorrectly. It is a standalone Splunk server, and yes, I am testing locally.

When I remove observability_admin_TA_rh_account.py and restmap.conf, the app works fine and I can see the input under Data Inputs. So I am guessing something is wrong with these two files.
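
One quick sanity check for restmap.conf is to list any stanzas whose type prefix is not a recognized one. The sketch below assumes the prefix list in `KNOWN_PREFIXES`, which is recalled from restmap.conf.spec and should be verified against the spec file shipped with your Splunk version. The function name `unknown_stanzas` is hypothetical. For comparison, UCC-generated add-ons typically use `[admin:...]` and `[admin_external:...]` stanzas. This is a way to narrow things down, not a confirmed diagnosis.

```python
import configparser
from typing import List, Sequence

# Assumption: stanza type prefixes recalled from restmap.conf.spec.
# Verify this list against the spec file in your own Splunk installation.
KNOWN_PREFIXES = (
    "global", "script", "admin", "admin_external", "validation",
    "eai", "input", "peerupload", "proxy", "restreplayshc",
)


def unknown_stanzas(conf_text: str,
                    known: Sequence[str] = KNOWN_PREFIXES) -> List[str]:
    """Return stanza names whose prefix (text before ':') is not in `known`."""
    parser = configparser.ConfigParser(interpolation=None, strict=False)
    parser.read_string(conf_text)
    return [name for name in parser.sections()
            if name.split(":", 1)[0] not in known]


# The restmap.conf content from the post above.
sample = """
[endpoint:admin/input/splunk_observability]
match = splunk_observability
python.version = python3
handlerfile = observability_admin_TA_rh_account.py
"""

flagged = unknown_stanzas(sample)
print(flagged)
```

If a stanza is flagged, comparing it with the restmap.conf of a working UCC-built add-on may be a reasonable next step.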

Regards,
PNV


livehybrid
Super Champion

Hi @Poojitha 

Do you see anything in your $SPLUNK_HOME/var/log/splunk/splunkd.log relating to this? Any python errors, stack traces etc?
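
If the log is large, a small filter can help. This is a minimal sketch: the function names and keyword list are illustrative assumptions, and the sample lines are made-up placeholders, not real splunkd output. Only the log path comes from the question above.

```python
import os
from typing import Iterable, List


def find_relevant_lines(lines: Iterable[str], keywords: Iterable[str]) -> List[str]:
    """Return lines that mention any keyword (case-insensitive)."""
    wanted = [k.lower() for k in keywords]
    return [line.rstrip("\n") for line in lines
            if any(k in line.lower() for k in wanted)]


def scan_splunkd_log(splunk_home: str, keywords: Iterable[str]) -> List[str]:
    """Filter $SPLUNK_HOME/var/log/splunk/splunkd.log for the given keywords."""
    path = os.path.join(splunk_home, "var", "log", "splunk", "splunkd.log")
    with open(path, encoding="utf-8", errors="replace") as handle:
        return find_relevant_lines(handle, list(keywords))


# Keywords taken from the file and input names in this thread.
KEYWORDS = ["splunk_observability", "observability_admin_TA_rh_account", "Traceback"]

# Made-up placeholder lines, used only to show the filter working.
sample_lines = [
    "placeholder line about something unrelated\n",
    "placeholder ERROR line mentioning splunk_observability\n",
    "placeholder line with a python Traceback marker\n",
]

hits = find_relevant_lines(sample_lines, KEYWORDS)
print(hits)
```

On a real server, `scan_splunkd_log(os.environ["SPLUNK_HOME"], KEYWORDS)` would apply the same filter to the actual log file.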

 


livehybrid
Super Champion

Hi @Poojitha 

I assume you are using UCC Framework for this app? 

Are you able to see the inputs in https://yourSplunkEnvironment.com/en-US/app/yourApp/inputs ?

Have you been able to test the app locally? I would highly recommend doing some local verification before packaging the app for uploading to Splunk Cloud.

