I receive some logs in JSON format, but one of the nodes varies in type: sometimes it is an array, sometimes it is a single object. Take, for example, the two possible logs below:
Single record:
{
"root": {
"metadata": {
"name": "Jay Doe",
"email": "jay.doe@example.com"
},
"record": {
"row": {
"source_ip": "8.8.8.8",
"count": "1"
},
"identifiers": {
"to": "companyfoo.com",
"from": "example.com",
"header_from": "example.com"
}
}
}
}
Multiple records:
{
"root": {
"metadata": {
"name": "Bob Doe",
"email": "bob.doe@example.com"
},
"record": [
{
"row": {
"source_ip": "8.8.8.8",
"count": "1"
},
"identifiers": {
"to": "companyfoo.com",
"from": "example.com",
"header_from": "example.com"
}
},
{
"row": {
"source_ip": "8.8.4.4",
"count": "5"
},
"identifiers": {
"to": "companybar.com",
"from": "example.com",
"header_from": "example.com"
}
}
]
}
}
The only part that varies is root.record. I want to be able to parse both formats and get a table like this:
name | email | source_ip | count | to | from | header_from |
Jay Doe | jay.doe@example.com | 8.8.8.8 | 1 | companyfoo.com | example.com | example.com |
Bob Doe | bob.doe@example.com | 8.8.8.8 | 1 | companyfoo.com | example.com | example.com |
Bob Doe | bob.doe@example.com | 8.8.4.4 | 5 | companybar.com | example.com | example.com |
Is it possible without using heavy and/or complex queries?
If record is the last field in root, you can use a sed-mode rex to rewrite the event so that a single record object is wrapped in an array. If record is not the last field, the sed expression will need to be adjusted accordingly.
| makeresults
| eval event=split("{
\"root\": {
\"metadata\": {
\"name\": \"Jay Doe\",
\"email\": \"jay.doe@example.com\"
},
\"record\": {
\"row\": {
\"source_ip\": \"8.8.8.8\",
\"count\": \"1\"
},
\"identifiers\": {
\"to\": \"companyfoo.com\",
\"from\": \"example.com\",
\"header_from\": \"example.com\"
}
}
}
}|{
\"root\": {
\"metadata\": {
\"name\": \"Bob Doe\",
\"email\": \"bob.doe@example.com\"
},
\"record\": [
{
\"row\": {
\"source_ip\": \"8.8.8.8\",
\"count\": \"1\"
},
\"identifiers\": {
\"to\": \"companyfoo.com\",
\"from\": \"example.com\",
\"header_from\": \"example.com\"
}
},
{
\"row\": {
\"source_ip\": \"8.8.4.4\",
\"count\": \"5\"
},
\"identifiers\": {
\"to\": \"companybar.com\",
\"from\": \"example.com\",
\"header_from\": \"example.com\"
}
}
]
}
}","|")
| mvexpand event
| rex field=event mode=sed "s/(?s)(\"record\":\s)\{(.*)(\})(\s+\}$)/\1[{\2]\3\4/g"
| spath input=event path=root.record{} output=record
| spath input=event path=root.metadata output=metadata
| spath input=metadata
| mvexpand record
| spath input=record path=row output=row
| spath input=row
| spath input=record path=identifiers output=identifiers
| spath input=identifiers
| table name email source_ip count to from header_from
Everything up to and including the first mvexpand event just sets up some test data with your two sample events; the actual solution starts at the rex command.
Thanks! This worked, but I ended up changing the source so that ALL root.record values are sent as a list, even if there is only one record. This makes it easier and more consistent.
Changing the source is the better approach. 😀
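For anyone who also controls the sender, here is a minimal sketch of that source-side normalization. It assumes the events are built as Python dictionaries before being serialized, which the thread does not state; the function name normalize_record is hypothetical.

```python
import json


def normalize_record(event: dict) -> dict:
    """Return a copy of the event in which root.record is always a list."""
    root = dict(event["root"])        # shallow copy so the input is left untouched
    record = root.get("record", [])   # assumption: a missing record becomes an empty list
    if isinstance(record, dict):      # single record: wrap it in a one-element list
        record = [record]
    root["record"] = record
    return {**event, "root": root}


# Single-record sample event from the question
single = {
    "root": {
        "metadata": {"name": "Jay Doe", "email": "jay.doe@example.com"},
        "record": {
            "row": {"source_ip": "8.8.8.8", "count": "1"},
            "identifiers": {
                "to": "companyfoo.com",
                "from": "example.com",
                "header_from": "example.com",
            },
        },
    }
}

# Serialize the normalized event before sending it on
print(json.dumps(normalize_record(single), indent=2))
```

Once every event is shaped this way, the root.record{} path used in the searches in this thread should match single-record events as well, so no sed rewrite is needed at search time.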
Hi @JChris_
Can you try this,
<your_search_goes_here>
| spath input=_raw path=root.record{} output=array_items
| spath input=_raw path=root.metadata.name output=name
| spath input=_raw path=root.metadata.email output=email
| mvexpand array_items
| spath input=array_items path=row output=row
| spath input=array_items path=identifiers output=id
| spath input=row
| spath input=id
| table name email count source_ip to from header_from
Output for the second sample provided: (screenshot not preserved)
This only works when root.record is an array. The query should work for both cases.