Hi @gcusello, If you're receiving Elastic Common Schema (ECS) events in JSON format, i.e. Logstash tcp output plugin to Splunk raw tcp input , then a combination of search-time automatic field extra...
See more...
Hi @gcusello, If you're receiving Elastic Common Schema (ECS) events in JSON format, i.e. Logstash tcp output plugin to Splunk raw tcp input , then a combination of search-time automatic field extractions, field aliases, etc. may be preferred if your site distinguishes between the Splunk administrator and Splunk knowledge manager functions; otherwise, I would transform the events using TRANSFORMS or RULESET. For example, given a source ECS event in _raw: {
"@timestamp": "2023-11-26T14:40:57.209Z",
"@metadata": {
"beat": "winlogbeat",
"type": "_doc",
"version": "8.11.1"
},
"event": {
"action": "Sensitive Privilege Use",
"created": "2023-11-26T16:52:21.698Z",
"code": "4673",
"kind": "event",
"provider": "Microsoft-Windows-Security-Auditing",
"outcome": "failure"
},
"log": {
"level": "information"
},
"message": "A privileged service was called.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-21-**********-**********-**********-1234\n\tAccount Name:\t\tuser\n\tAccount Domain:\t\tCONTOSO\n\tLogon ID:\t\t0x1960F1B\n\nService:\n\tServer:\tSecurity\n\tService Name:\t-\n\nProcess:\n\tProcess ID:\t0x37d0\n\tProcess Name:\tC:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe\n\nService Request Information:\n\tPrivileges:\t\tSeProfileSingleProcessPrivilege",
"host": {
"os": {
"kernel": "10.0.22621.2715 (WinBuild.160101.0800)",
"build": "22621.2715",
"type": "windows",
"platform": "windows",
"version": "10.0",
"family": "windows",
"name": "Windows 11 Pro"
},
"id": "6d403ce9-3f79-4551-b651-4d3eb6c53bc9",
"ip": [
"192.0.2.1"
],
"mac": [
"ff-ff-ff-ff-ff-ff",
],
"name": "my-pc",
"hostname": "my-pc",
"architecture": "x86_64"
},
"ecs": {
"version": "8.0.0"
},
"agent": {
"version": "8.11.1",
"ephemeral_id": "080792d1-f86c-4a5d-9c46-265212f944f7",
"id": "287205c6-b0d0-46f5-875a-1bcc6e013cf2",
"name": "my-pc",
"type": "winlogbeat"
},
"winlog": {
"provider_guid": "{54849625-5478-4994-a5ba-3e3b0328c30d}",
"channel": "Security",
"opcode": "Info",
"process": {
"pid": 4,
"thread": {
"id": 940
}
},
"provider_name": "Microsoft-Windows-Security-Auditing",
"event_data": {
"Service": "-",
"ProcessId": "0x37d0",
"SubjectUserSid": "S-1-5-21-**********-**********-**********-1234",
"SubjectDomainName": "MY-PC",
"ObjectServer": "Security",
"PrivilegeList": "SeProfileSingleProcessPrivilege",
"ProcessName": "C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe",
"SubjectUserName": "user",
"SubjectLogonId": "0x1960f1b"
},
"keywords": [
"Audit Failure"
],
"computer_name": "my-pc",
"record_id": 383388,
"api": "wineventlog",
"event_id": "4673",
"task": "Sensitive Privilege Use"
}
} we can use INGEST_EVAL to reformat _raw as WinEventLog: # transforms.conf [ecs-to-wineventlog] INGEST_EVAL = _raw:=strftime(strptime(json_extract(_raw, "event.created"), "%Y-%m-%dT%H:%M:%S.%N%Z"), "%m/%d/%Y %I:%M:%S %p").urldecode("%0a")."LogName=".json_extract(_raw, "winlog.channel").urldecode("%0a")."EventCode=".json_extract(_raw, "winlog.event_id").urldecode("%0a")."ComputerName=".json_extract(_raw, "winlog.computer_name").urldecode("%0a")."RecordNumber=".json_extract(_raw, "winlog.record_id").urldecode("%0a")."Keywords=".json_extract(_raw, "winlog.keywords{}").urldecode("%0a")."TaskCategory=".json_extract(_raw, "winlog.task").urldecode("%0a")."OpCode=".json_extract(_raw, "winlog.opcode").urldecode("%0a")."Message=".json_extract(_raw, "message") We can use the same INGEST_EVAL or a separate one (before _raw is transformed) to extract _time from event.created, set sourcetype, etc. Note that my sample event does not have direct translations of EventType, SourceName, and Type. The output would be similar to a WinEventLog input with the related suppress_* settings set to true. ECS does have event.outcome and log.level fields; however, they've been normalized for Elastic. The ECS event.provider field is the internal Windows event log provider/source name, e.g. Microsoft-Windows-Security-Auditing, not the rendered name, e.g. Microsoft Windows security auditing. Translating to XmlWinEvnetLog is similar, but without access to the mvmap() function in INGEST_EVAL, construction of the <EventData><Data Name="Foo">Bar</Data></EventData> array wouldn't be dynamic. A search-time translation of EventData might look like this: | eval _raw="<EventData>".mvjoin(mvmap(split(replace(json_extract(_raw, "winlog.event_data"), "\",\"", "\"}".urldecode("%08")."{\""), urldecode("%08")), "<Data Name=\"".mvjoin(json_array_to_mv(json_keys(_raw)), "")."\">".replace(replace(replace(replace(replace(json_extract(_raw, mvjoin(json_array_to_mv(json_keys(_raw)), "")), "&", "&"), "\"", """), "'", "'"), "<", "<"), ">", ">")."</Data>"), "")."</EventData>" --but it isn't very maintainable. It also doesn't help with your problem; it's just fun. (Edit: The community editor may have stripped some of the characters from my examples. Apologies if they don't work as shown! This edit will also remove syntax highlighting--old community bug!) Some XML elements are not present in every event, so we would need to wrap segments in coalesce() to make them optional during extraction, e.g. coalesce(json_extract(_raw, "winlog.some_missing_field"), ""), or just inject an empty element, e.g. <Security />. The Windows Event schema is documented at <https://learn.microsoft.com/en-us/windows/win32/wes/eventschema-schema>.