If you're using Ingest Processor and SPL2, you can split the flowTuples into individual events. Here's the pipeline config to do so. I have re-used the field names referenced in the other answers to make the migration easier.

Steps:

1. Onboard data as before with the MSCS add-on.
2. Create the following pipeline with partitioning set to sourcetype == mscs:nsg:flow2 to avoid conflicting with the INDEXED_EXTRACTIONS in the TA you may already have installed. When a pipeline matches a sourcetype, Ingest Processor pulls the event out before it's indexed, transforms it, and sends it back into Splunk or your destination of choice:

/*
A valid SPL2 statement for a pipeline must start with "$pipeline", and include "from $source"
and "into $destination".
*/
$pipeline = | from $source
| flatten _raw
| expand records
| flatten records
| fields - records
| flatten properties
| rename flows AS f1
| expand f1
| flatten f1
| rename flows AS f2
| expand f2
| flatten f2
| expand flowTuples
| eval flow_time=mvindex(split(flowTuples,","),0)
| eval src_ip=mvindex(split(flowTuples,","),1)
| eval dest_ip=mvindex(split(flowTuples,","),2)
| eval src_port=mvindex(split(flowTuples,","),3)
| eval dest_port=mvindex(split(flowTuples,","),4)
| eval transport=mvindex(split(flowTuples,","),5)
| eval traffic_flow=mvindex(split(flowTuples,","),6)
| eval traffic_result=mvindex(split(flowTuples,","),7)
| eval flow_state=mvindex(split(flowTuples,","),8)
| eval packets_in=toint(mvindex(split(flowTuples,","),9))
| eval bytes_in=toint(mvindex(split(flowTuples,","),10))
| eval packets_out=toint(mvindex(split(flowTuples,","),11))
| eval bytes_out=toint(mvindex(split(flowTuples,","),12))
// Normalization, which could also be done at search-time
| eval action=case(traffic_result == "A", "allowed", traffic_result == "D", "blocked")
| eval protocol=if(match(src_ip, /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/), "ip", "unknown")
| eval direction=case(traffic_flow == "I", "inbound", traffic_flow == "O", "outbound")
| eval transport=case(transport == "T", "tcp", transport == "U", "udp")
| eval bytes=(coalesce(bytes_in,0)) + (coalesce(bytes_out,0))
| eval packets=(coalesce(packets_in,0)) + (coalesce(packets_out,0))
| fields - flowTuples
| eval _raw=json_object("resourceId", resourceId, "category", category, "macAddress", macAddress, "Version", Version, "systemId", systemId, "operationName", operationName, "mac", mac, "rule", rule, "flow_time", flow_time, "src_ip", src_ip, "dest_ip", dest_ip, "src_port", src_port, "dest_port", dest_port, "traffic_flow", traffic_flow, "traffic_result", traffic_result, "bytes_in", bytes_in, "bytes_out", bytes_out, "bytes", bytes, "packets_in", packets_in, "packets_out", packets_out, "packets", packets, "transport", transport, "protocol", protocol, "direction", direction, "action", action)
| eval _time=flow_time
| fields - flow_state, f1, time, f2, properties, resourceId, category, macAddress, Version, systemId, operationName, mac, rule, flow_time, src_ip, dest_ip, src_port, dest_port, traffic_flow, traffic_result, bytes_in, bytes_out, bytes, packets_in, packets_out, packets, transport, protocol, direction, action
| into $destination;

On a side note, Microsoft will be deprecating NSG Flow Logs and replacing them with Virtual Network Flow Logs, which have a similar format. Here's the config for Virtual Network Flow Logs with sourcetype mscs:vnet:flow:

/*
A valid SPL2 statement for a pipeline must start with "$pipeline", and include "from $source"
and "into $destination".
*/
$pipeline = | from $source
| flatten _raw
| expand records
| flatten records
| fields - records
| rename flowRecords AS f1
| expand f1
| flatten f1
| rename flows AS f2
| expand f2
| flatten f2
| expand flowGroups
| flatten flowGroups
| expand flowTuples
| eval flow_time=mvindex(split(flowTuples,","),0)
| eval src_ip=mvindex(split(flowTuples,","),1)
| eval dest_ip=mvindex(split(flowTuples,","),2)
| eval src_port=mvindex(split(flowTuples,","),3)
| eval dest_port=mvindex(split(flowTuples,","),4)
| eval transport=mvindex(split(flowTuples,","),5)
| eval traffic_flow=mvindex(split(flowTuples,","),6)
| eval flow_state=mvindex(split(flowTuples,","),7)
| eval flow_encryption=mvindex(split(flowTuples,","),8)
| eval packets_in=toint(mvindex(split(flowTuples,","),9))
| eval bytes_in=toint(mvindex(split(flowTuples,","),10))
| eval packets_out=toint(mvindex(split(flowTuples,","),11))
| eval bytes_out=toint(mvindex(split(flowTuples,","),12))
// Normalization, which could also be done at search-time
| eval action=case(flow_state == "B", "allowed", flow_state == "D", "blocked", flow_state == "E", "teardown", flow_state == "C", "flow")
| eval protocol=if(match(src_ip, /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/), "ip", "unknown")
| eval direction=case(traffic_flow == "I", "inbound", traffic_flow == "O", "outbound")
| eval bytes=(toint(coalesce(bytes_in,0))) + (toint(coalesce(bytes_out,0)))
| eval packets=(toint(coalesce(packets_in,0))) + (toint(coalesce(packets_out,0)))
| fields - flowGroups
| eval _raw=json_object("record_time", time, "flowLogGUID", flowLogGUID, "flowLogResourceID", flowLogResourceID, "targetResourceId", targetResourceID, "category", category, "macAddress", macAddress, "flowLogVersion", flowLogVersion, "operationName", operationName, "aclID", aclID, "flow_encryption", flow_encryption, "src_ip", src_ip, "dest_ip", dest_ip, "src_port", src_port, "dest_port", dest_port, "traffic_flow", traffic_flow, "bytes_in", bytes_in, "bytes_out", bytes_out, "bytes", bytes, "packets_in", packets_in, "packets_out", packets_out, "packets", packets, "transport", transport, "protocol", protocol, "direction", direction, "action", action)
| eval _time = flow_time / 1000
| fields - packets_out, bytes_in, rule, f1, f2, packets, src_ip, targetResourceID, protocol, action, dest_port, aclID, flow_encryption, packets_in, operationName, transport, src_port, flow_state, macAddress, bytes_out, bytes, dest_ip, flowLogVersion, flowLogGUID, category, flowLogResourceID, flowTuples, traffic_flow, direction, time, flow_time
| into $destination;
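For reference, each flowTuple is just a comma-separated string, and the chains of eval/mvindex/split above index into it positionally. Here's a minimal Python sketch of the same NSG extraction, purely to illustrate the field layout; the sample tuple and the `parse_nsg_tuple` helper are made up for illustration, though the field names and positions match the pipeline above:

```python
# Position layout matches the mvindex() calls in the NSG pipeline:
# 0=flow_time, 1=src_ip, 2=dest_ip, 3=src_port, 4=dest_port,
# 5=transport, 6=traffic_flow, 7=traffic_result, 8=flow_state,
# 9=packets_in, 10=bytes_in, 11=packets_out, 12=bytes_out
NSG_FIELDS = [
    "flow_time", "src_ip", "dest_ip", "src_port", "dest_port",
    "transport", "traffic_flow", "traffic_result", "flow_state",
    "packets_in", "bytes_in", "packets_out", "bytes_out",
]

def parse_nsg_tuple(tuple_str: str) -> dict:
    """Split one flowTuple into a dict, like the chain of evals does."""
    parts = tuple_str.split(",")
    # Shorter tuples (e.g. flows without byte/packet counts yet) leave
    # the trailing fields as None, mirroring mvindex() returning null.
    return {name: (parts[i] if i < len(parts) else None)
            for i, name in enumerate(NSG_FIELDS)}

# Made-up sample tuple for illustration:
sample = "1542110377,10.0.0.4,13.67.143.118,44931,443,T,O,A,B,10,1200,8,900"
event = parse_nsg_tuple(sample)
print(event["src_ip"], event["traffic_result"])  # prints: 10.0.0.4 A
```

The VNet Flow Log tuples work the same way; only positions 7 onwards differ (flow_state and flow_encryption replace traffic_result, and the timestamp is in milliseconds, hence the `/ 1000` in the second pipeline).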