Getting Data In

How to manipulate JSON before indexing?

oliverja
Path Finder

(Single/standalone instance of Splunk)

I have been in a fight with these events for over a week now. I was hoping eventually my failures would add up to a glorious success, but it turns out that I am finding EVEN MORE FAILURES. So many more.

I am getting data from a source that provides single-line JSON events. I have a few problems here: the JSON has a consistent field located at ["event"]["original"], BUT the contents of .original often contain more nested data, which breaks my regexes. I keep writing a new one for each new "shape" I find, but that feels tedious when the JSON already contains it all nice and neat for me.
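To illustrate the core problem: a non-greedy pattern like the one in [extractMessage] below stops at the first closing brace it can find, so any nesting inside .original truncates the capture. A minimal sketch with a made-up, heavily simplified event:

import re

# Hypothetical, simplified event: .original contains a nested object.
s = '{"event":{"original":{"alert":{"severity":1}},"module":"suricata"}}'

# Non-greedy capture in the spirit of the transforms below.
m = re.search(r'"original":([\s\S]*?})', s)
print(m.group(1))  # prints {"alert":{"severity":1}  -- truncated, invalid JSON

Each new nesting depth needs a different number of closing braces, which is why a fixed regex keeps breaking.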
Props:

[source::http:kafka_iap-suricata-log]
LINE_BREAKER = (`~!\^<)
SHOULD_LINEMERGE = false
TRANSFORMS-also = extractSuriStats, extract_suri_protocol_msg, extractMessage

Transforms:

[extractMessage]
REGEX = "original":([\s\S]*?})},"
LOOKAHEAD = 100000
DEST_KEY = _raw
FORMAT = $1
WRITE_META = true

[extractSuriStats]
REGEX = "event_type":"stats"[\s\S]+({"event_type":"stats".+})}}
LOOKAHEAD = 100000
DEST_KEY = _raw
FORMAT = $1
WRITE_META = true

[extract_suri_protocol_msg]
REGEX = "original":([\s\S]*})},"
LOOKAHEAD = 100000
DEST_KEY = _raw
FORMAT = $1
WRITE_META = true

[sourcetyper]
LOOKAHEAD = 100000

This is fragile, and keeps breaking when a new "nested" shape comes through. 

Now, let's assume the above works, but then BAM, an event comes through with a payload of 47,000 characters of "\\0" embedded in the JSON.

My extractions above continue to work, but the events themselves no longer parse (at search time?). I have pretty JSON, but no key/value pairs that I can act on.

Ok, I think! What if I just replace the payload with --deleted--!

Well, SEDCMD seems to not apply all that reliably, and I wonder whether it has the same character limitation, but I don't see a limit to configure for it.

My seds:

[source::http:kafka_iap-suricata-log]
LINE_BREAKER = (`~!\^<)
SHOULD_LINEMERGE = false
SEDCMD-payload = s/payload_printable":([\s\S]*)",/ ---payload string has been truncated by splunk admins at index time--- /g
SEDCMD-response = s/http_response_body_printable":([\s\S]*)"}/ ---payload string has been truncated by splunk admins at index time--- /g
SEDCMD-fluff = s/(?:\\\\0){20,}/ ---html string has been truncated by splunk admins at index time--- /g
TRANSFORMS-also = extractSuriStats, extract_suri_protocol_msg, extractMessage
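For what it's worth, the greedy ([\s\S]*) in the first two SEDCMDs runs all the way to the last occurrence of the trailing delimiter and can eat big chunks of the event, leaving invalid JSON behind. A pattern that matches exactly one JSON string value (including escaped quotes) should keep the event parseable. A sketch only, assuming payload_printable is always a quoted string (and similarly for http_response_body_printable):

SEDCMD-payload = s/"payload_printable":"(?:\\.|[^"\\])*"/"payload_printable":"---truncated by splunk admins at index time---"/g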
What I would much prefer to do is, again, just work with the JSON directly. But I don't think that is possible.
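(Depending on the Splunk version, it might be: transforms.conf can run eval expressions at ingest time via INGEST_EVAL, and I believe json_extract is among the supported functions on 8.1+. A hedged sketch, untested against this feed, with a made-up stanza name:

[extract_original_json]
INGEST_EVAL = _raw:=json_extract(_raw, "event.original")

wired up with TRANSFORMS-extract = extract_original_json under [source::http:kafka_iap-suricata-log].)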

My frustration continues, so I think: what if I intercept the JSON and throw Python things at it!

I see a few references to using unarchive_cmd, and get an idea...

#!/usr/bin/python
import json
import sys

def ReadEvent(jsonSingleLine):
    return json.loads(jsonSingleLine)

def FindOriginalEvent(data):
    # Return the nested original event, or None if the path is missing,
    # so a malformed event cannot raise UnboundLocalError.
    if 'event' in data and 'original' in data['event']:
        return data['event']['original']
    return None

while True:
    fromSplunk = sys.stdin.readline()
    if not fromSplunk:
        break
    originalEvent = FindOriginalEvent(ReadEvent(fromSplunk))
    if originalEvent is not None:
        # One event per line, so there is still something to line-break on.
        sys.stdout.write(json.dumps(originalEvent) + "\n")

sys.stdout.flush()
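Assuming the script is saved at the path used below and marked executable, it can be sanity-checked outside Splunk first:

echo '{"event":{"original":{"alert":{"severity":1}}}}' | /opt/splunk/etc/apps/stamus_for_splunk/bin/parse_suricata.py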
Props:

[source::http:kafka_iap-suricata-log]
LINE_BREAKER = (`~!\^<)
SHOULD_LINEMERGE = false
unarchive_cmd = /opt/splunk/etc/apps/stamus_for_splunk/bin/parse_suricata.py

[(?::){0}suricata:*]
invalid_cause = archive
unarchive_cmd = /opt/splunk/etc/apps/stamus_for_splunk/bin/parse_suricata.py

[suricata]
invalid_cause = archive
unarchive_cmd = /opt/splunk/etc/apps/stamus_for_splunk/bin/parse_suricata.py

(I put it everywhere, to make sure it would work.)

The code is ugly and useless. **bleep**. Art imitates life today....

So I am left with either:

A bunch of regexes and SEDCMDs that break when the event is too long, or

A custom script that I am apparently getting wrong.

Which direction should I focus my attention on? Any suggestions would be a huge help.

Sample event:

{"destination": {"ip": "xxx","port": 443,"address": "xxx"},"ecs": {"version": "1.12.0"},"host": {"name": "ptm-nsm"},"fileset": {"name": "eve"},"input": {"type": "log"},"suricata": {"eve": {"http": {"http_method": "CONNECT","hostname": "xxx","status": 200,"length": 0,"http_port": 443,"url": "xxx","protocol": "HTTP/1.0","http_user_agent": "Mozilla/4.0 (compatible;)"},"payload_printable": "xxxxx","alert": {"metadata": {"updated_at": ["2021_11_24"],"created_at": ["2011_12_08"]},"category": "A Network Trojan was detected","gid": 1,"signature": "ET TROJAN Fake Variation of Mozilla 4.0 - Likely Trojan","action": "allowed","signature_id": 2014002,"rev": 10,"severity": 1,"rule": "alert http $HOME_NET any -> $EXTERNAL_NET any (msg:\"ET TROJAN Fake Variation of Mozilla 4.0 - Likely Trojan\"; flow:established,to_server; content:\"Mozilla/4.0|20 28|compatible|3b 29|\"; http_user_agent; fast_pattern; isdataat:!1,relative; content:!\".bluecoat.com\"; http_host; http_header_names; content:!\"BlueCoat\"; nocase; threshold:type limit, track by_src, count 1, seconds 60; classtype:trojan-activity; sid:2014002; rev:10; metadata:created_at 2011_12_08, updated_at 2021_11_24;)"},"packet": "RQA==","stream": 1,"flow_id": "769386515195888","app_proto": "http","flow": {"start": "2022-05-10T10:43:58.911344+0000","pkts_toclient": 3,"pkts_toserver": 4,"bytes_toserver": 1102,"bytes_toclient": 245},"event_type": "alert","tx_id": 0,"packet_info": {"linktype": 12}}},"service": {"type": "suricata"},"source": {"ip": "xxx","port": 64391,"address": "xxx"},"log": {"offset": 1062706606,"file": {"path": "/opt/suricata/eve.json"}},"network.direction": "external","@timestamp": "2022-05-10T10:43:59.106Z","agent": {"hostname": "xxx","ephemeral_id": "xxx","type": "filebeat","version": "7.16.2","id": "xxx","name": "ptm-nsm"},"tags": ["iap","suricata"],"@version": "1","event": {"created": "2022-05-10T10:43:59.340Z","module": "suricata","dataset": "suricata.eve","original": {"http": {"http_method": "CONNECT","hostname": "xxx","status": 200,"url": "xxx:443","http_port": 443,"length": 0,"protocol": "HTTP/1.0","http_user_agent": "Mozilla/4.0 (compatible;)"},"dest_port": 443,"payload_printable": "CONNECT xxx:443 HTTP/1.0\r\nUser-Agent: Mozilla/4.0 (compatible;)\r\nHost: xxx\r\n\r\n","alert": {"metadata": {"updated_at": ["2021_11_24"],"created_at": ["2011_12_08"]},"category": "A Network Trojan was detected","gid": 1,"action": "allowed","signature": "ET TROJAN Fake Variation of Mozilla 4.0 - Likely Trojan","signature_id": 2014002,"rev": 10,"severity": 1,"rule": "alert http $HOME_NET any -> $EXTERNAL_NET any (msg:\"ET TROJAN Fake Variation of Mozilla 4.0 - Likely Trojan\"; flow:established,to_server; content:\"Mozilla/4.0|20 28|compatible|3b 29|\"; http_user_agent; fast_pattern; isdataat:!1,relative; content:!\".bluecoat.com\"; http_host; http_header_names; content:!\"BlueCoat\"; nocase; threshold:type limit, track by_src, count 1, seconds 60; classtype:trojan-activity; sid:2014002; rev:10; metadata:created_at 2011_12_08, updated_at 2021_11_24;)"},"packet": "RQAAKAA9ZMAAA==","stream": 1,"flow_id": 769386515195888,"proto": "TCP","app_proto": "http","src_port": 64391,"dest_ip": "xxx","event_type": "alert","flow": {"start": "2022-05-10T10:43:58.911344+0000","pkts_toserver": 4,"pkts_toclient": 3,"bytes_toserver": 1102,"bytes_toclient": 245},"timestamp": "2022-05-10T10:43:59.106396+0000","tx_id": 0,"src_ip": "xxx","packet_info": {"linktype": 12}}},"network": {"transport": "TCP","community_id": "Ns="}}


PickleRick
SplunkTrust

Well, you're trying to force Splunk to do something it's not meant for 😉

But seriously - at the ingest phase, Splunk has no awareness of any JSON structure or anything like that (except for indexed extractions; you could try those). So if you want to manipulate your data with an external script, the option I'd consider would be a scripted/modular input, instead of dirty tricks around a normal monitor input.
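For reference, the indexed-extractions route mentioned above is just a props.conf setting. A sketch only - the sourcetype name is assumed, and whether it behaves on your HEC path is worth testing:

[suricata]
INDEXED_EXTRACTIONS = json
KV_MODE = none
AUTO_KV_JSON = false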


oliverja
Path Finder

My input is:

Kafka -> Kafka Connect For Splunk  -> (HEC) Indexer

Scripted inputs would take the place of the HEC, and that is more than I want to tackle.

I hoped to process the data after Splunk had "collected" it, but that may just not be possible.

Do we have a config for extending how far SED will go into a string?


PickleRick
SplunkTrust

Bah, that indeed makes it a bit complicated. SED will happily execute the s/// command according to the PCRE you specify, which means you can control it to some extent with greediness modifiers. But it's still a simple text-based operation; it has nothing to do with any JSON structure contained within your event, so things like matching balanced parentheses aren't easy (although you could try; I'm not sure if it would work; theoretically, as it's a PCRE, it should).

See https://regex101.com/r/eBtSTM/1 (the example isn't mine; it's from https://stackoverflow.com/questions/546433/regular-expression-to-match-balanced-parentheses).
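Adapted from that idea to braces, a recursive PCRE could capture the whole .original object regardless of nesting depth. A sketch only - it assumes Splunk's PCRE engine accepts recursion in transforms, the stanza name is made up, and it would still need wiring up via TRANSFORMS- in props:

[extract_original_balanced]
REGEX = "original":\s*(\{(?:[^{}"]++|"(?:\\.|[^"\\])*+"|(?1))*+\})
FORMAT = $1
DEST_KEY = _raw
LOOKAHEAD = 100000

The (?1) recurses into capture group 1 for each nested object, and the quoted-string alternative keeps braces inside string values from being miscounted.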
