Splunk Search
Highlighted

Import Azure NSG LOGs

Path Finder

Hello there,

I try to import Azure NSG flow Events. To get the data into Splunk I use the Splunk Add-on for Microsoft Cloud (https://splunkbase.splunk.com/app/3110/). Heres a anonymized example of a delivered JSON Object.

{"time":"2018-06-06T09:00:09.2874215Z","systemId":"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx","category":"NetworkSecurityGroupFlowEvent","resourceId":"/SUBSCRIPTIONS/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/RESOURCEGROUPS/NETZWERK-NETZWERK-DUT-RG/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/HUBTRUSTED-NETZWERK-DUT-NSG","operationName":"NetworkSecurityGroupFlowEvents","properties":{"Version":1,"flows":[{"rule":"UserRule_AllowAllInBound","flows":[{"mac":"000D3A2DEF83","flowTuples":["1528275561,123.123.123.123,234.234.234.234,40531,2252,T,I,A","1528275571,12.12.12.12,23.23.23.23,39052,2095,T,I,A"]}]}]}},

For more information about the Azure NSG Logs: https://docs.microsoft.com/en-GB/azure/network-watcher/network-watcher-nsg-flow-logging-overview

Heres the props.conf I wrote:

[mscs:nsg:flow]
DATETIME_CONFIG =
KV_MODE = json
LINE_BREAKER = }}(,)
NO_BINARY_CHECK = true
SEDCMD-remove_header = s/{\"records\":\[//g
TIME_PREFIX = time\":\"
TRUNCATE = 0
category = Network & Security
disabled = false
pulldown_type = true
SEDCMD-add_closing_bracket = s/\s$/ }//g
SEDCMD-correctly-close = s/]}(?!\S)//g
SHOULD_LINEMERGE = true
REPORT-tuples = extract_tuple
SEDCMD-correctly-begin = s/^"time"/{"time"/g

As you can see in the following image this works very well for me. (This is another log then the raw log)

alt text

To extract the fields I wrote the following transforms.conf:

[extract_tuple]
REGEX = (?<timestamp>[0-9]{10}),(?<src_ip>[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}),(?<dest_ip>[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}),(?<src_port>[0-9]{1,5}),(?<dest_port>[0-9]{1,5}),(?<protocol>(T|U)),(?<traffic_flow>(I|O)),(?<traffic>(A|D))
MV_ADD = true

Heres a example for two extracted fields with this configurration:

alt text

And heres the problem I need some help. As far as I can see the fields get extracted correctly as multi value fields. But if I try to filter for a value like "dest_ip=234.234.234.234" I will get the four destination IP's "23.23.23.23", too. I guess Splunk shows every event with the minimum of one match. (and all other values of the event, too)

Another issue I have got is that searches over 3 or 4 multivalue fields need a lot of time or cant even executed.

Is there a better way to extract the fields?

Best regards,
mdorobek

Highlighted

Re: Import Azure NSG LOGs

Motivator

IP addresses are actually multiple segments. From the documentation about segmentation:

...the IP address 192.0.2.223 is a major segment. But this major segment can be broken down into minor segments, such as "192", as well as groups of minor segments like "192.0.2".

To search specifically for the IP address "234.234.234.234", use TERM().

dest_ip=TERM(234.234.234.234)
0 Karma
Highlighted

Re: Import Azure NSG LOGs

Path Finder

Even with using TERM() I still have the exact same issue as before. I don't think this is the right answer.
Do you need any further information?

0 Karma
Highlighted

Re: Import Azure NSG LOGs

Communicator

So I have a similar problem, and I outline how I solved it in my post: https://answers.splunk.com/answers/714696/process-json-azure-nsg-flow-log-tuples.html

Basically, I set the LINE_BREAKER right before the epoch time of each tuple:

LINE_BREAKER = (\")\d{10}

This will start every event with the tuple. Then, I use SEDCMD to remove strings that begin with quotes through the end of the line, as long as the character after the quote wasn't a digit (to exclude tuples, because they begin with a digit, because it's epoch time):

SEDCMD-removenotepoch = s/\"\D.*$//g

Now all my logs are CSV tuples! I lose some information from the JSON object like rule name, but I can grab the NSG name from "source" because it's in the source directory. This is the best solution I've come across for our environment - please let me know if anyone sees any caveats.

View solution in original post

Highlighted

Re: Import Azure NSG LOGs

Path Finder

I also thought about this approach. The issues I have with this is that I loose the ressourceID and SystemID, don't I?
Is it possible to share youre complete props.conf? Then I can try your solution.

As far as I'm concered im fine with useing the mvexpand on the multi value field I use in my search, but I have nothing agianst better solutions.

0 Karma
Highlighted

Re: Import Azure NSG LOGs

Communicator

The values of resourceID can be obtained from the source field. You could create a field alias for source AS resourceID.

systemID is a unique identifier for the resourceID as well, so it doesn't provide much value if you are referencing source for the resourceID (unless you need the systemID string, but you could always reference in a lookup table absolutely necessary).

The problem with multi value fields is how they map to CIM data models. I needed to break everything apart because I run alerts and reports against the Network Traffic CIM data model, and it doesn't work if there are multi valued fields. I hope this helps!

### props.conf (indexer)
[mscs:nsg:flow]

### Break apart tuples as single events, and discard the rest of the JSON blob
LINE_BREAKER = (\")\d{10}
SHOULD_LINEMERGE = false
SEDCMD-remove_not_epoch = s/\"\D.*$//g

## props.conf (search head)
[mscs:nsg:flow]
FIELDALIAS-mscs_nsg_flow = dest_ip AS dest src_ip AS src host AS dvc
EVAL-action = case(traffic_result == "A", "allowed", traffic_result == "D", "blocked")
EVAL-protocol = if(match(src_ip, "^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"), "ip", "unknown")
EVAL-transport = case(transport == "T", "tcp", transport == "U", "udp")
REPORT-tuples = extract_tuple

### transforms.conf (search head)
[extract_tuple]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result
0 Karma
Highlighted

Re: Import Azure NSG LOGs

Communicator

Here are my mods to @ejwade code above. Still a shame this isnt handled in Splunk's official addons after 2+ years 😞

## props
[mscs:nsg:flow]
# https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview
MAX_TIMESTAMP_LOOKAHEAD = 12
LINE_BREAKER = (\")\d{10}|(\"\,\")\d{10}
SHOULD_LINEMERGE = false
SEDCMD-remove_not_epoch = s/\"\D.*$//g
TRUNCATE = 35000
NO_BINARY_CHECK = false
pulldown_type = true

REPORT-tuples = extract_tuple_v1, extract_tuple_v2
REPORT-nsg = sub_res_nsg
FIELDALIAS-mscs_nsg_flow = dest_ip AS dest src_ip AS src host AS dvc

EVAL-action = case(traffic_result == "A", "allowed", traffic_result == "D", "blocked")
EVAL-protocol = if(match(src_ip, "^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"), "ip", "unknown")
EVAL-direction = case(traffic_flow == "I", "inbound", traffic_flow == "O", "outbound")
EVAL-transport = case(transport == "T", "tcp", transport == "U", "udp")
EVAL-bytes = (coalesce(bytes_in,0)) + (coalesce(bytes_out,0))
EVAL-packets = (coalesce(packets_in,0)) + (coalesce(packets_out,0))

## transforms
#https://www.splunk.com/blog/2017/02/20/splunking-microsoft-azure-network-watcher-data.html
#https://answers.splunk.com/answers/666758/import-azure-nsg-logs.html
[extract_tuple_v1]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result

[extract_tuple_v2]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result,flow_state,packets_in,bytes_in,packets_out,bytes_out

[sub_res_nsg]
SOURCE_KEY = source
REGEX = SUBSCRIPTIONS\/(\S+)\/RESOURCEGROUPS\/(\S+)\/PROVIDERS\/MICROSOFT.NETWORK\/NETWORKSECURITYGROUPS\/(\S+)\/y
FORMAT = subscriptionID::$1 resourceGroupName::$2 nsgName::$3

## eventtypes
[mscs_nsg_flow]
search = sourcetype=mscs:nsg:flow src_ip=*
#tags = network communicate

[mscs_nsg_flow_start]
search = sourcetype=mscs:nsg:flow flow_state=B
#tags = network session start

[mscs_nsg_flow_end]
search = sourcetype=mscs:nsg:flow flow_state=E
#tags = network session end

## tags
[eventtype=mscs_nsg_flow]
network = enabled
communicate = enabled

[eventtype=mscs_nsg_flow_start]
network = enabled
session = enabled
start = enabled

[eventtype=mscs_nsg_flow_end]
network = enabled
session = enabled
end = enabled
0 Karma
Highlighted

Re: Import Azure NSG LOGs

New Member

I have slightly modified my props to read the epoch timestamp and weed out the empty events. Hope this helps someone!

SHOULD_LINEMERGE = false
TRUNCATE=100000
TIME_FORMAT = %s
TIME_PREFIX=^
LINE_BREAKER = (\")\d{10}\,
SEDCMD-remove_not_epoch = s/\{?\"\D.*$//g
MAX_TIMESTAMP_LOOKAHEAD = 12
0 Karma
Speak Up for Splunk Careers!

We want to better understand the impact Splunk experience and expertise has has on individuals' careers, and help highlight the growing demand for Splunk skills.