Splunk Search

How do I import Azure NSG LOGs?

mdorobek
Path Finder

Hello there,

I'm trying to import Azure NSG flow events. To get the data into Splunk I use the Splunk Add-on for Microsoft Cloud Services (https://splunkbase.splunk.com/app/3110/). Here's an anonymized example of a delivered JSON object.

{"time":"2018-06-06T09:00:09.2874215Z","systemId":"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx","category":"NetworkSecurityGroupFlowEvent","resourceId":"/SUBSCRIPTIONS/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/RESOURCEGROUPS/NETZWERK-NETZWERK-DUT-RG/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/HUBTRUSTED-NETZWERK-DUT-NSG","operationName":"NetworkSecurityGroupFlowEvents","properties":{"Version":1,"flows":[{"rule":"UserRule_AllowAllInBound","flows":[{"mac":"000D3A2DEF83","flowTuples":["1528275561,123.123.123.123,234.234.234.234,40531,2252,T,I,A","1528275571,12.12.12.12,23.23.23.23,39052,2095,T,I,A"]}]}]}},

For more information about the Azure NSG Logs: https://docs.microsoft.com/en-GB/azure/network-watcher/network-watcher-nsg-flow-logging-overview
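
For orientation: each Version 1 flow tuple is a comma-separated record with the field order epoch timestamp, source IP, destination IP, source port, destination port, protocol (T = TCP, U = UDP), direction (I = inbound, O = outbound), decision (A = allowed, D = denied). So the first tuple above describes an allowed inbound TCP flow from 123.123.123.123:40531 to 234.234.234.234:2252.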

Here's the props.conf I wrote:

[mscs:nsg:flow]
DATETIME_CONFIG =
KV_MODE = json
LINE_BREAKER = }}(,)
NO_BINARY_CHECK = true
SEDCMD-remove_header = s/{\"records\":\[//g
TIME_PREFIX = time\":\"
TRUNCATE = 0
category = Network & Security
disabled = false
pulldown_type = true
SEDCMD-add_closing_bracket = s/\s$/}/g
SEDCMD-correctly-close = s/]}(?!\S)//g
SHOULD_LINEMERGE = true
REPORT-tuples = extract_tuple
SEDCMD-correctly-begin = s/^"time"/{"time"/g

As you can see in the following screenshot, this works very well for me. (This shows a different log than the raw sample above.)

[screenshot of parsed events omitted]

To extract the fields I wrote the following transforms.conf:

[extract_tuple]
REGEX = (?<timestamp>[0-9]{10}),(?<src_ip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}),(?<dest_ip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}),(?<src_port>[0-9]{1,5}),(?<dest_port>[0-9]{1,5}),(?<protocol>(T|U)),(?<traffic_flow>(I|O)),(?<traffic>(A|D))
MV_ADD = true

Here's an example of two fields extracted with this configuration:

[screenshot of extracted multivalue fields omitted]

And here's the problem I need help with. As far as I can see, the fields are extracted correctly as multivalue fields. But if I filter for a value like "dest_ip=234.234.234.234", I also get the other destination IPs of matching events, such as "23.23.23.23". I guess Splunk returns every event with at least one matching value, together with all the other values of that event.

Another issue is that searches over three or four multivalue fields take a long time or can't be executed at all.

Is there a better way to extract the fields?

Best regards,
mdorobek

1 Solution

ejwade
Contributor

I had a similar problem, and I outline how I solved it in my post: https://answers.splunk.com/answers/714696/process-json-azure-nsg-flow-log-tuples.html

Basically, I set the LINE_BREAKER right before the epoch time of each tuple:

LINE_BREAKER = (\")\d{10}

This starts every event at a tuple. Then I use SEDCMD to remove everything from a quote through the end of the line, provided the character after the quote isn't a digit (this spares the tuples, which begin with a digit because they start with an epoch timestamp):

SEDCMD-remove_not_epoch = s/\"\D.*$//g
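
Applied to the sample record from the question, these two settings leave one event per tuple, for example:

1528275561,123.123.123.123,234.234.234.234,40531,2252,T,I,A
1528275571,12.12.12.12,23.23.23.23,39052,2095,T,I,A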

Now all my logs are CSV tuples! I lose some information from the JSON object, like the rule name, but I can grab the NSG name from "source" because it's part of the source directory path. This is the best solution I've come across for our environment - please let me know if anyone sees any caveats.


mbjerkeland_spl
Splunk Employee

If you're using Ingest Processor and SPL2, you can split the flowTuples into individual events. Here's the pipeline config to do so. I have reused the field names referenced in the other answers to make migration easier.

Steps:

  1. Onboard data like before with the MSCS add-on
  2. Create the following pipeline with partitioning set to sourcetype == mscs:nsg:flow2, to avoid conflicting with the INDEXED_EXTRACTIONS in the TA you may already have installed. When a pipeline matches a sourcetype, Ingest Processor pulls the event out before it's indexed, transforms it, and sends it back into Splunk or your destination of choice:

/*
A valid SPL2 statement for a pipeline must start with "$pipeline", and include "from $source"
and "into $destination".
*/
$pipeline = | from $source 
| flatten _raw
| expand records
| flatten records
| fields - records
| flatten properties
| rename flows AS f1
| expand f1
| flatten f1
| rename flows AS f2
| expand f2
| flatten f2
| expand flowTuples

| eval flow_time=mvindex(split(flowTuples,","),0)
| eval src_ip=mvindex(split(flowTuples,","),1)
| eval dest_ip=mvindex(split(flowTuples,","),2)
| eval src_port=mvindex(split(flowTuples,","),3)
| eval dest_port=mvindex(split(flowTuples,","),4)
| eval transport=mvindex(split(flowTuples,","),5)
| eval traffic_flow=mvindex(split(flowTuples,","),6)
| eval traffic_result=mvindex(split(flowTuples,","),7)
| eval flow_state=mvindex(split(flowTuples,","),8)
| eval packets_in=mvindex(split(flowTuples,","),9)
| eval bytes_in=mvindex(split(flowTuples,","),10)
| eval packets_out=mvindex(split(flowTuples,","),11)
| eval bytes_out=mvindex(split(flowTuples,","),12)

// Normalization, which could also be done at search-time
| eval action=case(traffic_result == "A", "allowed", traffic_result == "D", "blocked")
| eval protocol=if(match(src_ip, /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/), "ip", "unknown")
| eval direction=case(traffic_flow == "I", "inbound", traffic_flow == "O", "outbound")
| eval transport=case(transport == "T", "tcp", transport == "U", "udp")
| eval bytes=(coalesce(bytes_in,0)) + (coalesce(bytes_out,0))
| eval packets=(coalesce(packets_in,0)) + (coalesce(packets_out,0))
| fields - flowTuples
| eval _raw=json_object("resourceId", resourceId, "category", category, "macAddress", macAddress, "Version", Version, "systemId", systemId, "operationName", operationName, "mac", mac, "rule", rule, "flow_time", flow_time, "src_ip", src_ip, "dest_ip", dest_ip, "src_port", src_port, "dest_port", dest_port, "traffic_flow", traffic_flow, "traffic_result", traffic_result, "bytes_in", bytes_in, "bytes_out", bytes_out, "bytes", bytes, "packets_in", packets_in, "packets_out", packets_out, "packets", packets, "transport", transport, "protocol", protocol, "direction", direction, "action", action)
| eval _time=flow_time
| fields - flow_state, f1, time, f2, properties, resourceId, category, macAddress, Version, systemId, operationName, mac, rule, flow_time, src_ip, dest_ip, src_port, dest_port, traffic_flow, traffic_result, bytes_in, bytes_out, bytes, packets_in, packets_out, packets, transport, protocol, direction, action
| into $destination;
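
For the sample record in the question, each event emitted by this pipeline would look roughly like the following (a sketch - the exact keys follow the json_object call above, with the remaining fields elided):

{"resourceId":"/SUBSCRIPTIONS/.../NETWORKSECURITYGROUPS/HUBTRUSTED-NETZWERK-DUT-NSG","category":"NetworkSecurityGroupFlowEvent","mac":"000D3A2DEF83","rule":"UserRule_AllowAllInBound","flow_time":"1528275561","src_ip":"123.123.123.123","dest_ip":"234.234.234.234","src_port":"40531","dest_port":"2252","transport":"tcp","direction":"inbound","action":"allowed", ...}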

On a side note, Microsoft will be deprecating NSG flow logs and replacing them with virtual network flow logs, which have a similar format. Here's the config for virtual network flow logs with sourcetype mscs:vnet:flow:

/*
A valid SPL2 statement for a pipeline must start with "$pipeline", and include "from $source"
and "into $destination".
*/
$pipeline = | from $source 
| flatten _raw
| expand records
| flatten records
| fields - records
| rename flowRecords AS f1
| expand f1
| flatten f1
| rename flows AS f2
| expand f2
| flatten f2
| expand flowGroups
| flatten flowGroups
| expand flowTuples

| eval flow_time=mvindex(split(flowTuples,","),0)
| eval src_ip=mvindex(split(flowTuples,","),1)
| eval dest_ip=mvindex(split(flowTuples,","),2)
| eval src_port=mvindex(split(flowTuples,","),3)
| eval dest_port=mvindex(split(flowTuples,","),4)
| eval transport=mvindex(split(flowTuples,","),5)
| eval traffic_flow=mvindex(split(flowTuples,","),6)
| eval flow_state=mvindex(split(flowTuples,","),7)
| eval flow_encryption=mvindex(split(flowTuples,","),8)
| eval packets_in=toint(mvindex(split(flowTuples,","),9))
| eval bytes_in=toint(mvindex(split(flowTuples,","),10))
| eval packets_out=toint(mvindex(split(flowTuples,","),11))
| eval bytes_out=toint(mvindex(split(flowTuples,","),12))

// Normalization, which could also be done at search-time
| eval action=case(flow_state == "B", "allowed", flow_state == "D", "blocked", flow_state == "E", "teardown", flow_state == "C", "flow")
| eval protocol=if(match(src_ip, /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/), "ip", "unknown")
| eval direction=case(traffic_flow == "I", "inbound", traffic_flow == "O", "outbound")
| eval bytes=(toint(coalesce(bytes_in,0))) + (toint(coalesce(bytes_out,0)))
| eval packets=(toint(coalesce(packets_in,0))) + (toint(coalesce(packets_out,0)))
| fields - flowGroups
| eval _raw=json_object("record_time", time, "flowLogGUID", flowLogGUID, "flowLogResourceID", flowLogResourceID, "targetResourceId", targetResourceID, "category", category, "macAddress", macAddress, "flowLogVersion", flowLogVersion, "operationName", operationName, "aclID", aclID, "flow_encryption", flow_encryption, "src_ip", src_ip, "dest_ip", dest_ip, "src_port", src_port, "dest_port", dest_port, "traffic_flow", traffic_flow, "bytes_in", bytes_in, "bytes_out", bytes_out, "bytes", bytes, "packets_in", packets_in, "packets_out", packets_out, "packets", packets, "transport", transport, "protocol", protocol, "direction", direction, "action", action)

| eval _time = flow_time / 1000
| fields - packets_out, bytes_in, rule, f1, f2, packets, src_ip, targetResourceID, protocol, action, dest_port, aclID, flow_encryption, packets_in, operationName, transport, src_port, flow_state, macAddress, bytes_out, bytes, dest_ip, flowLogVersion, flowLogGUID, category, flowLogResourceID, flowTuples, traffic_flow, direction, time, flow_time
| into $destination;
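
Once events are flowing, a quick sanity check against the normalized fields could look like this (an illustrative search; it assumes the JSON fields written into _raw are extracted at search time):

sourcetype=mscs:vnet:flow action=blocked
| stats sum(bytes) AS bytes sum(packets) AS packets by src_ip dest_ip dest_port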

bhsakarchourasi
Path Finder

Hi,

Thanks for the solution - it works for me. We have another ask from a client, though: extracting the rule name. Have you ever tried, or managed, to extract the rule name from the event?

We have tried putting the transforms before the SEDCMD (it sounds silly 😄, but we hoped it would work), but it's not working.

Looking forward to your reply.

Thanks,

Bhaskar  

mdorobek
Path Finder

I also thought about this approach. The issue I have with it is that I lose the resourceId and systemId, don't I?
Could you share your complete props.conf? Then I can try your solution.

As far as I'm concerned, I'm fine with using mvexpand on the multivalue field in my search, but I have nothing against better solutions.
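
For reference, the search-time workaround I mean looks something like this (a sketch against the multivalue fields from REPORT-tuples; mvzip keeps the values of each tuple aligned while mvexpand splits them into separate result rows):

sourcetype=mscs:nsg:flow
| eval pair=mvzip(mvzip(src_ip, dest_ip), dest_port)
| mvexpand pair
| eval parts=split(pair, ","), src_ip=mvindex(parts, 0), dest_ip=mvindex(parts, 1), dest_port=mvindex(parts, 2)
| search dest_ip="234.234.234.234"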

ejwade
Contributor

The value of resourceID can be obtained from the source field. You could create a field alias for source AS resourceID.

systemID is a unique identifier for the resourceID as well, so it doesn't provide much value if you are already referencing source for the resourceID (unless you need the systemID string itself, in which case you could map it in a lookup table if absolutely necessary).

The problem with multivalue fields is how they map to CIM data models. I needed to break everything apart because I run alerts and reports against the Network Traffic CIM data model, and that doesn't work if there are multivalue fields. I hope this helps!

### props.conf (indexer)
[mscs:nsg:flow]

### Break apart tuples as single events, and discard the rest of the JSON blob
LINE_BREAKER = (\")\d{10}
SHOULD_LINEMERGE = false
SEDCMD-remove_not_epoch = s/\"\D.*$//g

### props.conf (search head)
[mscs:nsg:flow]
FIELDALIAS-mscs_nsg_flow = dest_ip AS dest src_ip AS src host AS dvc
EVAL-action = case(traffic_result == "A", "allowed", traffic_result == "D", "blocked")
EVAL-protocol = if(match(src_ip, "^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"), "ip", "unknown")
EVAL-transport = case(transport == "T", "tcp", transport == "U", "udp")
REPORT-tuples = extract_tuple

### transforms.conf (search head)
[extract_tuple]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result
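
With these settings every event is a single tuple, so value-level filters behave as expected - for example (an illustrative search using the aliases and evals defined above):

sourcetype=mscs:nsg:flow action=blocked dest=234.234.234.234
| stats count by src dest dest_port transport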

edoardo_vicendo
Builder

Thank you all - here are updated configurations adapted from the previous ones. NSG logs are written to an Azure Storage Account; a Splunk heavy forwarder (HF) then reads the logs from the storage account with the "Splunk Add-on for Microsoft Cloud Services" and sends them on to the indexers.

Configuration applied on the Splunk heavy forwarder (can be applied on an indexer if you don't have an HF)

hf_in_azure_nsg_app/default

inputs.conf
#Inputs are defined directly on the Splunk HF via the web UI with the "Splunk Add-on for Microsoft Cloud Services" and can be found in /opt/splunk/etc/apps/Splunk_TA_microsoft-cloudservices/local/inputs.conf


props.conf

#NOTE: The following setup extracts only the flowTuples from the payload and sets _time from the flowTuple epoch
#First LINE_BREAKER applies, then SEDCMD-remove_not_epoch keeps only the flowTuples, then TRANSFORMS with INGEST_EVAL overwrites _time
#flowTuples parsing is done at search time on the search head with a separate app
#The "source" field already contains the resourceId information (subscriptionID, resourceGroupName, nsgName, macAddress), which can be extracted on the search head at search time
#NOTE 2: LINE_BREAKER has been enhanced to avoid breaking on MAC addresses whose first 10 characters are numeric digits
#TO BE DONE: Understand whether SEDCMD- has limits on very large payloads
#TO BE DONE 2: In the INGEST_EVAL, use a conditional to set _time to now() when the extracted prefix is shorter than 10 digits

#References:
#https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview
#https://community.splunk.com/t5/Splunk-Search/How-do-I-import-Azure-NSG-LOGs/td-p/396018
#https://community.splunk.com/t5/Getting-Data-In/How-to-extract-an-event-timestamp-where-seconds-and-milliseconds/m-p/428837

[mscs:nsg:flow]
MAX_TIMESTAMP_LOOKAHEAD = 10
LINE_BREAKER = (\")\d{10}\,|(\"\,\")\d{10}\,
SHOULD_LINEMERGE = false
SEDCMD-remove_not_epoch = s/\"\D.*$|\{|\}|\[|\]//g
TRUNCATE = 50000000
TRANSFORMS-evalingest = nsg_eval_substr_time


transforms.conf

[nsg_eval_substr_time]
INGEST_EVAL = _time=substr(_raw,0,10)
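
For the second TO BE DONE above, a guarded variant of this stanza could fall back to now() whenever the event doesn't start with a 10-digit epoch (an untested sketch):

[nsg_eval_substr_time]
INGEST_EVAL = _time=if(match(_raw, "^\d{10},"), substr(_raw, 0, 10), now())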

Configuration applied on the Splunk Search Head

sh_azure_nsg_app/default

props.conf

#References:
#https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview
#https://community.splunk.com/t5/Splunk-Search/How-do-I-import-Azure-NSG-LOGs/td-p/396018
#https://community.splunk.com/t5/Getting-Data-In/How-to-extract-an-event-timestamp-where-seconds-and-milliseconds/m-p/428837

[mscs:nsg:flow]
REPORT-tuples = extract_tuple_v1, extract_tuple_v2
REPORT-nsg = sub_res_nsg
FIELDALIAS-mscs_nsg_flow = dest_ip AS dest src_ip AS src host AS dvc

EVAL-action = case(traffic_result == "A", "allowed", traffic_result == "D", "blocked")
EVAL-protocol = if(match(src_ip, "^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"), "ip", "unknown")
EVAL-direction = case(traffic_flow == "I", "inbound", traffic_flow == "O", "outbound")
EVAL-transport = case(transport == "T", "tcp", transport == "U", "udp")
EVAL-bytes = (coalesce(bytes_in,0)) + (coalesce(bytes_out,0))
EVAL-packets = (coalesce(packets_in,0)) + (coalesce(packets_out,0))
EVAL-flow_state_desc = case(flow_state == "B", "begin", flow_state == "C", "continuing", flow_state == "E", "end")


transforms.conf

[extract_tuple_v1]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result

[extract_tuple_v2]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result,flow_state,packets_in,bytes_in,packets_out,bytes_out

[sub_res_nsg]
SOURCE_KEY = source
REGEX = SUBSCRIPTIONS\/(\S+)\/RESOURCEGROUPS\/(\S+)\/PROVIDERS\/MICROSOFT.NETWORK\/NETWORKSECURITYGROUPS\/(\S+)\/y=\d+\/m=\d+\/d=\d+\/h=\d+\/m=\d+\/macAddress=(\S+)\/
FORMAT = subscriptionID::$1 resourceGroupName::$2 nsgName::$3 macAddress::$4
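
For reference, the blob path parsed by [sub_res_nsg] has this shape (an illustrative path built from the anonymized sample in the question):

.../SUBSCRIPTIONS/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/RESOURCEGROUPS/NETZWERK-NETZWERK-DUT-RG/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/HUBTRUSTED-NETZWERK-DUT-NSG/y=2018/m=06/d=06/h=09/m=00/macAddress=000D3A2DEF83/PT1H.json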


eventtypes.conf

[mscs_nsg_flow]
search = sourcetype=mscs:nsg:flow src_ip=*

[mscs_nsg_flow_start]
search = sourcetype=mscs:nsg:flow flow_state=B

[mscs_nsg_flow_end]
search = sourcetype=mscs:nsg:flow flow_state=E


tags.conf

[eventtype=mscs_nsg_flow]
network = enabled
communicate = enabled

[eventtype=mscs_nsg_flow_start]
network = enabled
session = enabled
start = enabled

[eventtype=mscs_nsg_flow_end]
network = enabled
session = enabled
end = enabled
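
With the eventtypes and tags above, the events can also be picked up by CIM-based content - for example (an illustrative tstats search against the Network Traffic data model, assuming the fields are CIM-compliant via the aliases above):

| tstats count from datamodel=Network_Traffic.All_Traffic where All_Traffic.action="blocked" by All_Traffic.src All_Traffic.dest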

Best Regards,

Edoardo


phanim565
Engager

Hi, is there any way to fetch the rule name from the JSON before it gets broken into CSV tuples?

ChrisBell04
Communicator

Here are my mods to @ejwade's code above. It's still a shame this isn't handled in Splunk's official add-ons after 2+ years 😞

## props
[mscs:nsg:flow]
# https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview
MAX_TIMESTAMP_LOOKAHEAD = 12
LINE_BREAKER = (\")\d{10}|(\"\,\")\d{10}
SHOULD_LINEMERGE = false
SEDCMD-remove_not_epoch = s/\"\D.*$//g
TRUNCATE = 35000
NO_BINARY_CHECK = false
pulldown_type = true

REPORT-tuples = extract_tuple_v1, extract_tuple_v2
REPORT-nsg = sub_res_nsg
FIELDALIAS-mscs_nsg_flow = dest_ip AS dest src_ip AS src host AS dvc

EVAL-action = case(traffic_result == "A", "allowed", traffic_result == "D", "blocked")
EVAL-protocol = if(match(src_ip, "^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"), "ip", "unknown")
EVAL-direction = case(traffic_flow == "I", "inbound", traffic_flow == "O", "outbound")
EVAL-transport = case(transport == "T", "tcp", transport == "U", "udp")
EVAL-bytes = (coalesce(bytes_in,0)) + (coalesce(bytes_out,0))
EVAL-packets = (coalesce(packets_in,0)) + (coalesce(packets_out,0))

## transforms
#https://www.splunk.com/blog/2017/02/20/splunking-microsoft-azure-network-watcher-data.html
#https://answers.splunk.com/answers/666758/import-azure-nsg-logs.html
[extract_tuple_v1]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result

[extract_tuple_v2]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result,flow_state,packets_in,bytes_in,packets_out,bytes_out

[sub_res_nsg]
SOURCE_KEY = source
REGEX = SUBSCRIPTIONS\/(\S+)\/RESOURCEGROUPS\/(\S+)\/PROVIDERS\/MICROSOFT.NETWORK\/NETWORKSECURITYGROUPS\/(\S+)\/y
FORMAT = subscriptionID::$1 resourceGroupName::$2 nsgName::$3

## eventtypes
[mscs_nsg_flow]
search = sourcetype=mscs:nsg:flow src_ip=*
#tags = network communicate

[mscs_nsg_flow_start]
search = sourcetype=mscs:nsg:flow flow_state=B
#tags = network session start

[mscs_nsg_flow_end]
search = sourcetype=mscs:nsg:flow flow_state=E
#tags = network session end

## tags
[eventtype=mscs_nsg_flow]
network = enabled
communicate = enabled

[eventtype=mscs_nsg_flow_start]
network = enabled
session = enabled
start = enabled

[eventtype=mscs_nsg_flow_end]
network = enabled
session = enabled
end = enabled

rkantamaneni_sp
Splunk Employee

Hi All,

Splunk Idea APPSID-I-511 (https://ideas.splunk.com/ideas/APPSID-I-511) has been created to request native support for this in the add-on. Go vote on it and add comments. 🙂

rijom
Explorer

I have slightly modified my props to read the epoch timestamp and weed out the empty events. Hope this helps someone!

[mscs:nsg:flow]
SHOULD_LINEMERGE = false
TRUNCATE = 100000
TIME_FORMAT = %s
TIME_PREFIX = ^
LINE_BREAKER = (\")\d{10}\,
SEDCMD-remove_not_epoch = s/\{?\"\D.*$//g
MAX_TIMESTAMP_LOOKAHEAD = 12

jconger
Splunk Employee

IP addresses are actually indexed as multiple segments. From the documentation about segmentation:

...the IP address 192.0.2.223 is a major segment. But this major segment can be broken down into minor segments, such as "192", as well as groups of minor segments like "192.0.2".

To search specifically for the IP address "234.234.234.234", use TERM().

dest_ip=TERM(234.234.234.234)

mdorobek
Path Finder

Even when using TERM() I still have exactly the same issue as before, so I don't think this is the right answer.
Do you need any further information?
