Hi everyone, I need your help.
I have JSON data in the following format:
"alert_data": {"domain": "abc.com", "csv": {"id": 12345, "name": "credentials.csv", "mimetype": "text/csv", "is_safe": true, "content": [{"username": "test@abc.com", "password":"1qaz@WSX#EDC"}
Because the password is sensitive information, I mask its first six characters before indexing.
In addition, I need to check whether the password meets the complexity requirements. For example, the password should be at least 8 characters long and must include at least three of the following: numbers, uppercase letters, lowercase letters, and special characters.
So the indexed data should be:
"alert_data": {"domain": "abc.com", "csv": {"id": 12345, "name": "credentials.csv", "mimetype": "text/csv", "is_safe": true, "content": [{"username": "test@abc.com", "password":"******SX#EDC","is_password_meet_complexity":"Yes"}
I already mask the password with SEDCMD like this:
[json_sourcetype]
SEDCMD-password = s/\"password\"\:\s+\"\S{6}([^ ]*)/"password":"******\1/g
But I have no idea how to derive the complexity result from the password field before indexing (that is, how to add the "is_password_meet_complexity" field to the event). Should I use an ingest-time eval?
Your support in this is highly appreciated.
Hi @lynn140428,
We should note the example isn't strictly JSON:
"alert_data": {"domain": "abc.com", "csv": {"id": 12345, "name": "credentials.csv", "mimetype": "text/csv", "is_safe": true, "content": [{"username": "test@abc.com", "password":"1qaz@WSX#EDC"}
The string should start with a left brace ({), and the objects and array should be properly closed:
{"alert_data": {"domain": "abc.com", "csv": {"id": 12345, "name": "credentials.csv", "mimetype": "text/csv", "is_safe": true, "content": [{"username": "test@abc.com", "password":"1qaz@WSX#EDC"}]}}}
We can validate with the json eval function. Note that quotation marks are escaped with a backslash within the string:
| makeresults
| eval _raw=json("\"alert_data\": {\"domain\": \"abc.com\", \"csv\": {\"id\": 12345, \"name\": \"credentials.csv\", \"mimetype\": \"text/csv\", \"is_safe\": true, \"content\": [{\"username\": \"test@abc.com\", \"password\":\"1qaz@WSX#EDC\"}")
No results are returned.
Let's correct the JSON and try again:
| makeresults
| eval _raw=json("{\"alert_data\": {\"domain\": \"abc.com\", \"csv\": {\"id\": 12345, \"name\": \"credentials.csv\", \"mimetype\": \"text/csv\", \"is_safe\": true, \"content\": [{\"username\": \"test@abc.com\", \"password\":\"1qaz@WSX#EDC\"}]}}}")
We now have a valid JSON object in the _raw field, and we can use this object to test eval expressions that we'll apply later in a transform. You should correct the source data before proceeding further. Hint: You can correct the data at ingest using a simple eval expression and a transform similar to what I'm describing here.
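If you'd like to check a sample outside of Splunk first, here is a minimal Python sketch of the same validation. The is_valid_json helper is a hypothetical name introduced only for this illustration, and the fragment is the sample from the question:

```python
import json

# The sample as posted: no opening brace, and the array and objects are not closed.
fragment = '"alert_data": {"domain": "abc.com", "csv": {"id": 12345, "name": "credentials.csv", "mimetype": "text/csv", "is_safe": true, "content": [{"username": "test@abc.com", "password":"1qaz@WSX#EDC"}'

# The corrected sample: add the opening brace and close the array and the three objects.
corrected = "{" + fragment + "]}}}"


def is_valid_json(text):
    """Return True if text parses as a complete JSON document (hypothetical helper)."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


print(is_valid_json(fragment))   # False
print(is_valid_json(corrected))  # True
```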
Testing the length of the password is straightforward:
| eval length=len(json_extract(_raw, "alert_data.csv.content{}.password"))
| eval is_password_meet_complexity=if(length >= 8, "Yes", "No")
You haven't provided a list of special characters, but we'll assume they're drawn from the printable ASCII characters. Using the POSIX character classes supported by PCRE, we have:
Numbers or digits: [[:digit:]]
Uppercase letters: [[:upper:]]
Lowercase letters: [[:lower:]]
Punctuation characters: [[:punct:]]
We can test the password against these to determine whether it contains a character matching the class:
| makeresults
| eval _raw=json("{\"alert_data\": {\"domain\": \"abc.com\", \"csv\": {\"id\": 12345, \"name\": \"credentials.csv\", \"mimetype\": \"text/csv\", \"is_safe\": true, \"content\": [{\"username\": \"test@abc.com\", \"password\":\"1qaz@WSX#EDC\"}]}}}")
| eval length=len(json_extract(_raw, "alert_data.csv.content{}.password"))
| eval digit=if(match(json_extract(_raw, "alert_data.csv.content{}.password"), "[[:digit:]]"), 1, 0)
| eval upper=if(match(json_extract(_raw, "alert_data.csv.content{}.password"), "[[:upper:]]"), 1, 0)
| eval lower=if(match(json_extract(_raw, "alert_data.csv.content{}.password"), "[[:lower:]]"), 1, 0)
| eval punct=if(match(json_extract(_raw, "alert_data.csv.content{}.password"), "[[:punct:]]"), 1, 0)
| eval is_password_meet_complexity=if(length >= 8 AND (digit + upper + lower + punct) >= 3, "Yes", "No")
We can now combine the tests into a single expression:
| makeresults
| eval _raw=json("{\"alert_data\": {\"domain\": \"abc.com\", \"csv\": {\"id\": 12345, \"name\": \"credentials.csv\", \"mimetype\": \"text/csv\", \"is_safe\": true, \"content\": [{\"username\": \"test@abc.com\", \"password\":\"1qaz@WSX#EDC\"}]}}}")
| eval is_password_meet_complexity=if(len(json_extract(_raw, "alert_data.csv.content{}.password")) >= 8 AND (if(match(json_extract(_raw, "alert_data.csv.content{}.password"), "[[:digit:]]"), 1, 0) + if(match(json_extract(_raw, "alert_data.csv.content{}.password"), "[[:upper:]]"), 1, 0) + if(match(json_extract(_raw, "alert_data.csv.content{}.password"), "[[:lower:]]"), 1, 0) + if(match(json_extract(_raw, "alert_data.csv.content{}.password"), "[[:punct:]]"), 1, 0)) >= 3, "Yes", "No")
and use that expression to add the is_password_meet_complexity key to the object:
| makeresults
| eval _raw="{\"alert_data\": {\"domain\": \"abc.com\", \"csv\": {\"id\": 12345, \"name\": \"credentials.csv\", \"mimetype\": \"text/csv\", \"is_safe\": true, \"content\": [{\"username\": \"test@abc.com\", \"password\":\"1qaz@WSX#EDC\"}]}}}"
| eval _raw=json_set(_raw, "alert_data.csv.content{0}.is_password_meet_complexity", if(len(json_extract(json(_raw), "alert_data.csv.content{}.password")) >= 8 AND (if(match(json_extract(json(_raw), "alert_data.csv.content{}.password"), "[[:digit:]]"), 1, 0) + if(match(json_extract(json(_raw), "alert_data.csv.content{}.password"), "[[:upper:]]"), 1, 0) + if(match(json_extract(json(_raw), "alert_data.csv.content{}.password"), "[[:lower:]]"), 1, 0) + if(match(json_extract(json(_raw), "alert_data.csv.content{}.password"), "[[:punct:]]"), 1, 0)) >= 3, "Yes", "No"))
{"alert_data":{"domain":"abc.com","csv":{"id":12345,"name":"credentials.csv","mimetype":"text/csv","is_safe":true,"content":[{"username":"test@abc.com","password":"1qaz@WSX#EDC","is_password_meet_complexity":"Yes"}]}}}
With "password":"NotComplex":
{"alert_data":{"domain":"abc.com","csv":{"id":12345,"name":"credentials.csv","mimetype":"text/csv","is_safe":true,"content":[{"username":"test@abc.com","password":"NotComplex","is_password_meet_complexity":"No"}]}}}
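To cross-check the expected Yes/No values offline, the same rule can be sketched in Python. This assumes "special characters" means ASCII punctuation; Python's string.punctuation covers the same 32 ASCII characters as [[:punct:]]. The meets_complexity function and its parameters are hypothetical names used only for this sketch:

```python
import string


def meets_complexity(password, min_length=8, min_classes=3):
    """Return True if the password is long enough and uses enough character classes."""
    classes = [
        any(c in string.digits for c in password),           # like [[:digit:]]
        any(c in string.ascii_uppercase for c in password),  # like [[:upper:]]
        any(c in string.ascii_lowercase for c in password),  # like [[:lower:]]
        any(c in string.punctuation for c in password),      # like [[:punct:]]
    ]
    return len(password) >= min_length and sum(classes) >= min_classes


for candidate in ("1qaz@WSX#EDC", "NotComplex"):
    print(candidate, "Yes" if meets_complexity(candidate) else "No")
```

The two sample passwords print Yes and No respectively, matching the events shown above.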
Finally, we can use the eval expression in the INGEST_EVAL setting of a transform:
# props.conf
[json_sourcetype]
TRANSFORMS-password_complexity = password_complexity
SEDCMD-password = s/\"password\"\:\s+\"\S{6}([^ ]*)/"password":"******\1/g
# transforms.conf
[password_complexity]
INGEST_EVAL = _raw=json_set(_raw, "alert_data.csv.content{0}.is_password_meet_complexity", if(len(json_extract(json(_raw), "alert_data.csv.content{}.password")) >= 8 AND (if(match(json_extract(json(_raw), "alert_data.csv.content{}.password"), "[[:digit:]]"), 1, 0) + if(match(json_extract(json(_raw), "alert_data.csv.content{}.password"), "[[:upper:]]"), 1, 0) + if(match(json_extract(json(_raw), "alert_data.csv.content{}.password"), "[[:lower:]]"), 1, 0) + if(match(json_extract(json(_raw), "alert_data.csv.content{}.password"), "[[:punct:]]"), 1, 0)) >= 3, "Yes", "No"))
Note that your sample includes a single object in the content array. If you have multiple objects in your array, we'll need to refactor the solution to accommodate them.
Also note that we've used json(_raw) and json_extract(json(_raw), "alert_data.csv.content{}.password") repeatedly. There may be a way to optimize the expression and reduce the number of times the json and json_extract functions are called per event.
Finally note that we don't have to use json functions to analyze the password. If "password":"value" only appears once and is well-formed, we can match against _raw directly; however, escaped quotes as in "password":"val\"ue" pose a challenge.
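As a rough illustration of the escaped-quote problem, here is one way to write such a pattern, shown in Python rather than SPL so it's easy to experiment with. It's only a sketch: it assumes the first "password" key in the event is the one you want, and extract_password is a hypothetical helper name:

```python
import json
import re

# "password", optional whitespace, a colon, then a JSON string whose body is
# any run of ordinary characters or backslash escapes (so \" does not end it).
PASSWORD_RE = re.compile(r'"password"\s*:\s*"((?:[^"\\]|\\.)*)"')


def extract_password(raw):
    """Return the decoded value of the first "password" key, or None if absent."""
    match = PASSWORD_RE.search(raw)
    if match is None:
        return None
    # Re-wrap the captured body in quotes and let the JSON parser decode escapes.
    return json.loads('"' + match.group(1) + '"')


print(extract_password('{"username": "a@abc.com", "password":"val\\"ue"}'))  # val"ue
```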
I'll leave all of the above to you as an exercise. 🙂
Thanks!! That does work for the first object!
But I have multiple objects in the array, and the number of objects is not fixed. How can I refactor the solution to accommodate them? Sorry, I have done some research but have no idea how to do that.
My first instinct is to use the mvmap eval function to iterate over the alert_data.csv.content array and join the results into a new JSON array:
| makeresults
| eval _raw="{\"alert_data\": {\"domain\": \"abc.com\", \"csv\": {\"id\": 12345, \"name\": \"credentials.csv\", \"mimetype\": \"text/csv\", \"is_safe\": true, \"content\": [{\"username\": \"test1@abc.com\", \"password\":\"1qaz@WSX#EDC\"}, {\"username\": \"test2@abc.com\", \"password\":\"NotComplex\"}]}}}"
| eval _raw=json_set(json(_raw), "alert_data.csv.content", json("[".mvjoin(mvmap(json_array_to_mv(json_extract(json(_raw), "alert_data.csv.content{}")), json_set(_raw, "is_password_meet_complexity", if(len(json_extract(_raw, "password")) >= 8 AND (if(match(json_extract(_raw, "password"), "[[:digit:]]"), 1, 0) + if(match(json_extract(_raw, "password"), "[[:upper:]]"), 1, 0) + if(match(json_extract(_raw, "password"), "[[:lower:]]"), 1, 0) + if(match(json_extract(_raw, "password"), "[[:punct:]]"), 1, 0)) >= 3, "Yes", "No"))), ",")."]"))
However, mvmap is not supported by INGEST_EVAL: "The following search-time eval functions are not currently supported at index-time with INGEST_EVAL: mvfilter, mvmap, searchmatch, now, and commands." See https://docs.splunk.com/Documentation/Splunk/latest/Data/IngestEval.
To work around the missing functionality, we must analyze the input stream using an external process. We have several options available, but my preference lately for file (monitor) inputs is the props.conf unarchive_cmd setting. unarchive_cmd streams data to an external command over stdin and sends the command's stdout stream to the Splunk ingest pipeline.
If we assume your file input is newline delimited JSON, unarchive_cmd allows us to read each object from stdin, process each content array item individually, and write the resulting object to stdout.
Given alert_data.ndjson:
{"alert_data": {"domain": "abc.com", "csv": {"id": 12345, "name": "credentials1.csv", "mimetype": "text/csv", "is_safe": true, "content": [{"username": "test1@abc.com", "password":"1qaz@WSX#EDC"}, {"username": "test2@abc.com", "password":"NotComplex"}]}}}
{"alert_data": {"domain": "abc.com", "csv": {"id": 67890, "name": "credentials2.csv", "mimetype": "text/csv", "is_safe": true, "content": [{"username": "test3@abc.com", "password":"passw0rd"}, {"username": "test4@abc.com", "password":"j#4kS.0e"}]}}}
let's introduce an alert_data source type and construct inputs.conf and props.conf:
# inputs.conf
[monitor:///tmp/alert_data.ndjson]
sourcetype = alert_data
# props.conf
[source::...alert_data.ndjson]
unarchive_cmd = python $SPLUNK_HOME/bin/scripts/preprocess_alert_data.py
unarchive_cmd_start_mode = direct
sourcetype = preprocess_alert_data
NO_BINARY_CHECK = true
[preprocess_alert_data]
invalid_cause = archive
is_valid = False
LEARN_MODEL = false
[alert_data]
DATETIME_CONFIG = CURRENT
SHOULD_LINEMERGE = false
LINE_BREAKER = ([\r\n]+)
EVENT_BREAKER_ENABLE = true
EVENT_BREAKER = ([\r\n]+)
Now let's write $SPLUNK_HOME/bin/scripts/preprocess_alert_data.py to read, process, and write JSON objects:
import json
import re
import sys

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    try:
        json_object = json.loads(line)
        for item in json_object["alert_data"]["csv"]["content"]:
            password = item["password"]
            meets_length_requirement = len(password) >= 8
            # Score one point for each character class found in the password.
            digit_score = 1 if re.search(r"\d", password) else 0
            upper_score = 1 if re.search(r"[A-Z]", password) else 0
            lower_score = 1 if re.search(r"[a-z]", password) else 0
            punct_score = 1 if re.search(r"[^a-zA-Z0-9\s]", password) else 0
            meets_complexity_requirement = (digit_score + upper_score + lower_score + punct_score) >= 3
            if meets_length_requirement and meets_complexity_requirement:
                item["is_password_meet_complexity"] = "Yes"
            else:
                item["is_password_meet_complexity"] = "No"
        print(json.dumps(json_object))
    except Exception as err:
        # Report the error on stderr and pass the original line through unchanged.
        print(err, file=sys.stderr)
        print(line)
On a full instance of Splunk Enterprise, e.g. a heavy forwarder, Splunk will use its local copy of Python. On a universal forwarder, we'll need to install Python 3.x and make sure the executable is in the path.
At scale, this solution is better implemented as a modular input, but that's a separate topic for a larger discussion.
Thanks for the quick reply, it's much appreciated.
But my JSON log input is defined as a network input in an app, not as a file input. The app defines the sourcetype, for example, sourcetype=app_alert_data.
In this case, can I use unarchive_cmd to preprocess app_alert_data?
Thanks again.
Hi,
Yes. Use sourcetype = app_alert_data in the input stanza combined with a props configuration similar to what I shared. The props stanza uses a different sourcetype setting to specifically set the following values:
invalid_cause = archive
is_valid = False
The combination of the two sourcetype stanzas gives you both preprocessing by the archive processor and parsing of the processed data by the app_alert_data stanza.
In some of my earlier posts, if you see them, I incorrectly stated that unarchive_cmd would give us checkpoint tracking etc. It does not. When the file is modified, it is re-read from the beginning. This is still useful for dropping complete files into a monitored directory.
Also: Don't forget to either keep your SEDCMD setting intact or add code to the Python script to mask the password. We'll assume all of this is being done for research and offline analysis.
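If you go the Python route, a minimal sketch of the masking step might look like the following. The mask_password helper and its masked_chars parameter are hypothetical names, and unlike the SEDCMD pattern (which requires at least six characters), this version masks passwords shorter than six characters entirely. Whichever approach you choose, compute the complexity result before masking, since the masked value no longer reflects the original character classes:

```python
def mask_password(password, masked_chars=6):
    """Replace the first masked_chars characters with asterisks (hypothetical helper)."""
    visible = password[masked_chars:]
    return "*" * min(len(password), masked_chars) + visible


# Example item shaped like one entry of alert_data.csv.content.
item = {"username": "test@abc.com", "password": "1qaz@WSX#EDC"}

# Evaluate complexity on the clear-text password first, then mask it.
item["password"] = mask_password(item["password"])
print(item["password"])  # ******SX#EDC
```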
For anyone reading along, this is an interesting exercise, but please do not store or log passwords in plain text. It's probably not even a good idea to log whether a particular user's password meets complexity requirements; that should be handled while the user is creating a password.