If you're using Ingest Processor and SPL2, you can split the flowTuples into individual events. Here's the pipeline config to do so. I have re-used the field names referenced in the other answers to ...
See more...
If you're using Ingest Processor and SPL2, you can split the flowTuples into individual events. Here's the pipeline config to do so. I have re-used the field names referenced in the other answers to make the migration easier. Steps: (1) Onboard the data as before with the MSCS add-on. (2) Create the following pipeline with partitioning set to sourcetype == mscs:nsg:flow2, to avoid conflicting with the INDEXED_EXTRACTIONS in the TA you may already have installed. When you create a pipeline that matches a sourcetype, Ingest Processor pulls the event out before it is indexed, transforms it, and sends it on to Splunk or your destination of choice: /*
A valid SPL2 statement for a pipeline must start with "$pipeline", and include "from $source"
and "into $destination".
*/
$pipeline = | from $source
// Purpose: turn each incoming event into one event per flow tuple, with the
// tuple's comma-separated values extracted into named fields and a rebuilt
// JSON _raw.
//
// Promote the keys of the payload in _raw to fields, emit one event per element
// of "records", and promote each record's (and its "properties") keys as well.
| flatten _raw
| expand records
| flatten records
| fields - records
| flatten properties
// "flows" is unpacked twice: each level is renamed (f1, then f2) before it is
// expanded and flattened, which frees the name "flows" for the next level.
| rename flows AS f1
| expand f1
| flatten f1
| rename flows AS f2
| expand f2
| flatten f2
// From here on there is one event per entry of flowTuples.
| expand flowTuples
// Split the tuple once and pick values by position instead of re-splitting the
// same string for every field. tuple_parts is a scratch field and is removed
// again before _raw is rebuilt.
| eval tuple_parts=split(flowTuples, ",")
| eval flow_time=mvindex(tuple_parts,0)
| eval src_ip=mvindex(tuple_parts,1)
| eval dest_ip=mvindex(tuple_parts,2)
| eval src_port=mvindex(tuple_parts,3)
| eval dest_port=mvindex(tuple_parts,4)
| eval transport=mvindex(tuple_parts,5)
| eval traffic_flow=mvindex(tuple_parts,6)
| eval traffic_result=mvindex(tuple_parts,7)
| eval flow_state=mvindex(tuple_parts,8)
// split() yields strings, so the counters are converted with toint() to make
// the sums below integer additions. This is the same conversion the
// mscs:vnet:flow pipeline in this post applies to its counters.
// NOTE(review): a counter that is missing or not a number is expected to come
// out of toint() as null and then count as 0 in the sums — confirm with a
// sample event.
| eval packets_in=toint(mvindex(tuple_parts,9))
| eval bytes_in=toint(mvindex(tuple_parts,10))
| eval packets_out=toint(mvindex(tuple_parts,11))
| eval bytes_out=toint(mvindex(tuple_parts,12))
// Normalization, which could also be done at search-time
// case() has no default branch here, so any value other than the ones listed
// leaves the target field null.
| eval action=case(traffic_result == "A", "allowed", traffic_result == "D", "blocked")
// "ip" when src_ip looks like a dotted-quad address, "unknown" otherwise.
| eval protocol=if(match(src_ip, /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/), "ip", "unknown")
| eval direction=case(traffic_flow == "I", "inbound", traffic_flow == "O", "outbound")
| eval transport=case(transport == "T", "tcp", transport == "U", "udp")
// Totals across both directions; a null counter contributes 0.
| eval bytes=coalesce(bytes_in,0) + coalesce(bytes_out,0)
| eval packets=coalesce(packets_in,0) + coalesce(packets_out,0)
| fields - flowTuples, tuple_parts
// Rebuild _raw as a flat JSON object holding the record-level fields plus the
// per-tuple and normalized fields. flow_state is not part of the output.
| eval _raw=json_object("resourceId", resourceId, "category", category, "macAddress", macAddress, "Version", Version, "systemId", systemId, "operationName", operationName, "mac", mac, "rule", rule, "flow_time", flow_time, "src_ip", src_ip, "dest_ip", dest_ip, "src_port", src_port, "dest_port", dest_port, "traffic_flow", traffic_flow, "traffic_result", traffic_result, "bytes_in", bytes_in, "bytes_out", bytes_out, "bytes", bytes, "packets_in", packets_in, "packets_out", packets_out, "packets", packets, "transport", transport, "protocol", protocol, "direction", direction, "action", action)
// The event timestamp is taken from the tuple, not from the enclosing record.
// NOTE(review): flow_time is used as-is, so it is presumably already epoch
// seconds — confirm.
| eval _time=flow_time
// Everything needed is now inside _raw; drop the working fields so they are
// not sent along as separate fields.
| fields - flow_state, f1, time, f2, properties, resourceId, category, macAddress, Version, systemId, operationName, mac, rule, flow_time, src_ip, dest_ip, src_port, dest_port, traffic_flow, traffic_result, bytes_in, bytes_out, bytes, packets_in, packets_out, packets, transport, protocol, direction, action
| into $destination; On a side note, Microsoft will be deprecating NSG Flow Logs and replacing them with Virtual Network Flow Logs which has a similar format. Here's the config for Virtual Network Flow Logs with sourcetype mscs:vnet:flow : /*
A valid SPL2 statement for a pipeline must start with "$pipeline", and include "from $source"
and "into $destination".
*/
$pipeline = | from $source
// Purpose: turn each incoming event into one event per flow tuple, with the
// tuple's comma-separated values extracted into named fields and a rebuilt
// JSON _raw.
//
// Promote the keys of the payload in _raw to fields, then emit one event per
// element of "records" and promote each record's keys as well.
| flatten _raw
| expand records
| flatten records
| fields - records
// Unpack the nesting level by level: flowRecords (as f1), its "flows" (as f2),
// then flowGroups, whose flattening exposes flowTuples.
| rename flowRecords AS f1
| expand f1
| flatten f1
| rename flows AS f2
| expand f2
| flatten f2
| expand flowGroups
| flatten flowGroups
// From here on there is one event per entry of flowTuples.
| expand flowTuples
// Split the tuple once and pick values by position instead of re-splitting the
// same string for every field. tuple_parts is a scratch field and is removed
// in the final "fields -" below.
| eval tuple_parts=split(flowTuples, ",")
| eval flow_time=mvindex(tuple_parts,0)
| eval src_ip=mvindex(tuple_parts,1)
| eval dest_ip=mvindex(tuple_parts,2)
| eval src_port=mvindex(tuple_parts,3)
| eval dest_port=mvindex(tuple_parts,4)
// NOTE(review): transport is passed through exactly as it appears in the
// tuple; no tcp/udp mapping is applied in this pipeline — confirm whether
// downstream searches expect normalized names.
| eval transport=mvindex(tuple_parts,5)
| eval traffic_flow=mvindex(tuple_parts,6)
| eval flow_state=mvindex(tuple_parts,7)
| eval flow_encryption=mvindex(tuple_parts,8)
// Counters are converted to integers so the totals below are numeric sums.
| eval packets_in=toint(mvindex(tuple_parts,9))
| eval bytes_in=toint(mvindex(tuple_parts,10))
| eval packets_out=toint(mvindex(tuple_parts,11))
| eval bytes_out=toint(mvindex(tuple_parts,12))
// Normalization, which could also be done at search-time
// case() has no default branch here, so any flow_state / traffic_flow value
// other than the ones listed leaves the target field null.
| eval action=case(flow_state == "B", "allowed", flow_state == "D", "blocked", flow_state == "E", "teardown", flow_state == "C", "flow")
// "ip" when src_ip looks like a dotted-quad address, "unknown" otherwise.
| eval protocol=if(match(src_ip, /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/), "ip", "unknown")
| eval direction=case(traffic_flow == "I", "inbound", traffic_flow == "O", "outbound")
// Totals across both directions; a null counter contributes 0. The counters
// are already integers at this point, so no further conversion is needed.
| eval bytes=coalesce(bytes_in,0) + coalesce(bytes_out,0)
| eval packets=coalesce(packets_in,0) + coalesce(packets_out,0)
| fields - flowGroups
// Rebuild _raw as a flat JSON object. "rule" is included so that it is not
// lost when the working fields are dropped below. flow_state is represented
// only through the derived "action" field.
| eval _raw=json_object("record_time", time, "flowLogGUID", flowLogGUID, "flowLogResourceID", flowLogResourceID, "targetResourceId", targetResourceID, "category", category, "macAddress", macAddress, "flowLogVersion", flowLogVersion, "operationName", operationName, "aclID", aclID, "rule", rule, "flow_encryption", flow_encryption, "src_ip", src_ip, "dest_ip", dest_ip, "src_port", src_port, "dest_port", dest_port, "traffic_flow", traffic_flow, "bytes_in", bytes_in, "bytes_out", bytes_out, "bytes", bytes, "packets_in", packets_in, "packets_out", packets_out, "packets", packets, "transport", transport, "protocol", protocol, "direction", direction, "action", action)
// The event timestamp is taken from the tuple, not from the enclosing record.
// NOTE(review): the division by 1000 suggests flow_time is epoch milliseconds
// — confirm against a sample event.
| eval _time = flow_time / 1000
// Everything needed is now inside _raw; drop the working fields so they are
// not sent along as separate fields.
| fields - packets_out, bytes_in, rule, f1, f2, packets, src_ip, targetResourceID, protocol, action, dest_port, aclID, flow_encryption, packets_in, operationName, transport, src_port, flow_state, macAddress, bytes_out, bytes, dest_ip, flowLogVersion, flowLogGUID, category, flowLogResourceID, flowTuples, tuple_parts, traffic_flow, direction, time, flow_time
| into $destination;