Getting Data In

How to manipulate JSON before indexing?

oliverja
Path Finder

(Single/standalone instance of Splunk)

I have been in a fight with these events for over a week now. I was hoping eventually my failures would add up to a glorious success, but it turns out that I am finding EVEN MORE FAILURES. So many more.

I am getting data from a source that provides single-line JSON events. I have a few problems here: my JSON data has a consistent field located at ["event"]["original"], BUT the contents of .original often contain more nested data, which is breaking my regexes. I keep making new ones for each new "shape" I find, but it just seems tedious when the JSON contains it all nice and neat for me.
Props:

[source::http:kafka_iap-suricata-log]
LINE_BREAKER = (`~!\^<)
SHOULD_LINEMERGE = false
TRANSFORMS-also = extractSuriStats, extract_suri_protocol_msg, extractMessage

Transforms:

[extractMessage]
REGEX = "original":([\s\S]*?})},"
LOOKAHEAD = 100000
DEST_KEY = _raw
FORMAT = $1
WRITE_META = true

[extractSuriStats]
REGEX = "event_type":"stats"[\s\S]+({"event_type":"stats".+})}}
LOOKAHEAD = 100000
DEST_KEY = _raw
FORMAT = $1
WRITE_META = true

[extract_suri_protocol_msg]
REGEX = "original":([\s\S]*})},"
LOOKAHEAD = 100000
DEST_KEY = _raw
FORMAT = $1
WRITE_META = true

[sourcetyper]
LOOKAHEAD = 100000

This is fragile and keeps breaking when a new "nested" shape comes through.
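
To see concretely why the lazy match misfires, here is a small standalone repro; the mini-events are made up just to show the shape of the failure:

import re

# the transform's pattern: lazily capture up to the first  }},"  sequence
pattern = re.compile(r'"original":([\s\S]*?})},"')

flat   = '"original":{"a":1}},"network":{"transport":"TCP"}}'
nested = '"original":{"a":{"deep":1}},"tag":"x"}},"network":{"transport":"TCP"}}'

print(pattern.search(flat).group(1))    # {"a":1}           - fine
print(pattern.search(nested).group(1))  # {"a":{"deep":1}   - cut off mid-object

Every new nesting level moves the first }}," sequence somewhere new, hence a new regex per shape.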

Now, let's assume the above works, but then BAM, an event comes through with a payload of 47,000 characters of "\\0" contained in the JSON.

My above extractions continue to work, but the events themselves no longer parse (at search time?). I have pretty JSON, but no key/value pairs that I can act on.
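
From what I can tell, search-time JSON extraction has its own size limits, which would explain pretty-but-unparsed events. My understanding (worth verifying against the docs for your version) is that these are the relevant limits.conf knobs on the search side:

[kv]
# automatic KV/JSON field extraction only considers the first maxchars characters of the event
maxchars = 10240

[spath]
# automatic spath-style extraction only applies to the first extraction_cutoff bytes
extraction_cutoff = 5000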

OK, I think! What if I just replace the payload with --deleted--!

Well, SEDCMD doesn't seem to apply very often, and I wonder if it has the same character limitation, but I don't see a limit to configure for it.

My SEDs:

[source::http:kafka_iap-suricata-log]
LINE_BREAKER = (`~!\^<)
SHOULD_LINEMERGE = false
SEDCMD-payload = s/payload_printable":([\s\S]*)",/ ---payload string has been truncated by splunk admins at index time--- /g
SEDCMD-response = s/http_response_body_printable":([\s\S]*)"}/ ---payload string has been truncated by splunk admins at index time--- /g
SEDCMD-fluff = s/(?:\\\\0){20,}/ ---html string has been truncated by splunk admins at index time--- /g
TRANSFORMS-also = extractSuriStats, extract_suri_protocol_msg, extractMessage
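
One thing I might try next (untested) is anchoring the substitution on the quoted value itself instead of a greedy [\s\S]* that can run past the closing quote and eat the rest of the event:

# untested sketch: "(?:[^"\\]|\\.)*" matches one JSON string value, escapes included,
# so the substitution stops at the value's real closing quote and keeps the JSON valid
SEDCMD-payload = s/"payload_printable":"(?:[^"\\]|\\.)*"/"payload_printable":"---truncated by splunk admins---"/g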

What I would much prefer to do is, again, just work with the JSON directly. But I don't think that is possible.

My frustration continues, so I think: what if I intercept the JSON and throw Python things at it!

I see a few references to using unarchive_cmd, and get an idea...

#!/usr/bin/python
import json
import sys

def ReadEvent(jsonSingleLine):
    # Parse one line of JSON into a dict
    return json.loads(jsonSingleLine)

def FindOriginalEvent(data):
    # Return event.original if present; fall back to the whole event
    # instead of raising UnboundLocalError when the keys are missing
    if 'event' in data and 'original' in data['event']:
        return data['event']['original']
    return data

while True:
    fromSplunk = sys.stdin.readline()
    if not fromSplunk:
        break
    if not fromSplunk.strip():
        continue  # skip blank lines rather than crashing json.loads
    eventString = json.dumps(FindOriginalEvent(ReadEvent(fromSplunk)))
    sys.stdout.write(eventString + '\n')  # newline-terminate each emitted event

sys.stdout.flush()
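
For what it's worth, the script can at least be sanity-checked outside Splunk by piping a sample event through it, e.g. python parse_suricata.py < sample_event.json (one event per line), which separates "my Python is wrong" from "Splunk never runs it".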

Props:

[source::http:kafka_iap-suricata-log]
LINE_BREAKER = (`~!\^<)
SHOULD_LINEMERGE = false
unarchive_cmd = /opt/splunk/etc/apps/stamus_for_splunk/bin/parse_suricata.py

[(?::){0}suricata:*]
invalid_cause = archive
unarchive_cmd = /opt/splunk/etc/apps/stamus_for_splunk/bin/parse_suricata.py

[suricata]
invalid_cause = archive
unarchive_cmd = /opt/splunk/etc/apps/stamus_for_splunk/bin/parse_suricata.py

(I put it everywhere, to make sure it would work.)

The code is ugly and useless. **bleep**. Art imitates life today....

So I am left with either:

A bunch of regexes and SEDCMDs that break when the event is too long, or

A custom script that I am apparently wrong about.

Where should I focus my attention? Any suggestions would be a huge help.

Sample event:

{"destination": {"ip": "xxx","port": 443,"address": "xxx"},"ecs": {"version": "1.12.0"},"host": {"name": "ptm-nsm"},"fileset": {"name": "eve"},"input": {"type": "log"},"suricata": {"eve": {"http": {"http_method": "CONNECT","hostname": "xxx","status": 200,"length": 0,"http_port": 443,"url": "xxx","protocol": "HTTP/1.0","http_user_agent": "Mozilla/4.0 (compatible;)"},"payload_printable": "xxxxx","alert": {"metadata": {"updated_at": ["2021_11_24"],"created_at": ["2011_12_08"]},"category": "A Network Trojan was detected","gid": 1,"signature": "ET TROJAN Fake Variation of Mozilla 4.0 - Likely Trojan","action": "allowed","signature_id": 2014002,"rev": 10,"severity": 1,"rule": "alert http $HOME_NET any -> $EXTERNAL_NET any (msg:\"ET TROJAN Fake Variation of Mozilla 4.0 - Likely Trojan\"; flow:established,to_server; content:\"Mozilla/4.0|20 28|compatible|3b 29|\"; http_user_agent; fast_pattern; isdataat:!1,relative; content:!\".bluecoat.com\"; http_host; http_header_names; content:!\"BlueCoat\"; nocase; threshold:type limit, track by_src, count 1, seconds 60; classtype:trojan-activity; sid:2014002; rev:10; metadata:created_at 2011_12_08, updated_at 2021_11_24;)"},"packet": "RQA==","stream": 1,"flow_id": "769386515195888","app_proto": "http","flow": {"start": "2022-05-10T10:43:58.911344+0000","pkts_toclient": 3,"pkts_toserver": 4,"bytes_toserver": 1102,"bytes_toclient": 245},"event_type": "alert","tx_id": 0,"packet_info": {"linktype": 12}}},"service": {"type": "suricata"},"source": {"ip": "xxx","port": 64391,"address": "xxx"},"log": {"offset": 1062706606,"file": {"path": "/opt/suricata/eve.json"}},"network.direction": "external","@timestamp": "2022-05-10T10:43:59.106Z","agent": {"hostname": "xxx","ephemeral_id": "xxx","type": "filebeat","version": "7.16.2","id": "xxx","name": "ptm-nsm"},"tags": ["iap","suricata"],"@version": "1","event": {"created": "2022-05-10T10:43:59.340Z","module": "suricata","dataset": "suricata.eve","original": {"http": {"http_method": "CONNECT","hostname": "xxx","status": 200,"url": "xxx:443","http_port": 443,"length": 0,"protocol": "HTTP/1.0","http_user_agent": "Mozilla/4.0 (compatible;)"},"dest_port": 443,"payload_printable": "CONNECT xxx:443 HTTP/1.0\r\nUser-Agent: Mozilla/4.0 (compatible;)\r\nHost: xxx\r\n\r\n","alert": {"metadata": {"updated_at": ["2021_11_24"],"created_at": ["2011_12_08"]},"category": "A Network Trojan was detected","gid": 1,"action": "allowed","signature": "ET TROJAN Fake Variation of Mozilla 4.0 - Likely Trojan","signature_id": 2014002,"rev": 10,"severity": 1,"rule": "alert http $HOME_NET any -> $EXTERNAL_NET any (msg:\"ET TROJAN Fake Variation of Mozilla 4.0 - Likely Trojan\"; flow:established,to_server; content:\"Mozilla/4.0|20 28|compatible|3b 29|\"; http_user_agent; fast_pattern; isdataat:!1,relative; content:!\".bluecoat.com\"; http_host; http_header_names; content:!\"BlueCoat\"; nocase; threshold:type limit, track by_src, count 1, seconds 60; classtype:trojan-activity; sid:2014002; rev:10; metadata:created_at 2011_12_08, updated_at 2021_11_24;)"},"packet": "RQAAKAA9ZMAAA==","stream": 1,"flow_id": 769386515195888,"proto": "TCP","app_proto": "http","src_port": 64391,"dest_ip": "xxx","event_type": "alert","flow": {"start": "2022-05-10T10:43:58.911344+0000","pkts_toserver": 4,"pkts_toclient": 3,"bytes_toserver": 1102,"bytes_toclient": 245},"timestamp": "2022-05-10T10:43:59.106396+0000","tx_id": 0,"src_ip": "xxx","packet_info": {"linktype": 12}}},"network": {"transport": "TCP","community_id": "Ns="}}

PickleRick
SplunkTrust

Well, you're trying to force Splunk to do something it's not meant for 😉

But seriously - at the ingest phase, Splunk has no awareness of any JSON structure or anything like that (except for indexed extractions; you could try those). So if you want to manipulate your data with an external script, the option I'd consider would be a scripted/modular input instead of dirty tricks with a normal monitor input.
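
For completeness, indexed extractions are switched on in props.conf roughly like below; whether that setting takes effect for data arriving over HEC, as in your setup, is something you'd have to verify:

# a minimal sketch, reusing your source stanza; INDEXED_EXTRACTIONS is applied where
# the data is parsed, so its behavior with HEC input needs checking
[source::http:kafka_iap-suricata-log]
INDEXED_EXTRACTIONS = json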

oliverja
Path Finder

My input is:

Kafka -> Kafka Connect For Splunk -> (HEC) Indexer

Scripted inputs would take the place of the HEC, and that is more than I want to tackle.

I hoped to process the data after Splunk had "collected" it, but that may just not be possible.

Do we have a config for extending how far SED will go into a string?

PickleRick
SplunkTrust

Bah, that indeed makes it a bit complicated. SED will just happily execute the s/// command according to the PCRE specified, which means you can control it to some extent using the greediness modifiers. But it's still a simple text-based operation; it has nothing to do with any JSON structure contained within your event, so it isn't that easy to match - for example, matching balanced parentheses (although you could try; I'm not sure if it would work; theoretically, as it's PCRE, it should).

See https://regex101.com/r/eBtSTM/1 (the example isn't mine; it's from https://stackoverflow.com/questions/546433/regular-expression-to-match-balanced-parentheses ).
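
Applied to JSON instead of parentheses, the same recursive idea would look something like the pattern below (untested in SEDCMD, and it depends on the engine supporting PCRE recursion):

\{(?:[^{}]|(?R))*\}

That matches one balanced {...} block, nested braces included.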
