Splunk Search

How do I import Azure NSG LOGs?

mdorobek
Path Finder

Hello there,

I am trying to import Azure NSG flow events. To get the data into Splunk I use the Splunk Add-on for Microsoft Cloud Services (https://splunkbase.splunk.com/app/3110/). Here's an anonymized example of a delivered JSON object.

{"time":"2018-06-06T09:00:09.2874215Z","systemId":"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx","category":"NetworkSecurityGroupFlowEvent","resourceId":"/SUBSCRIPTIONS/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/RESOURCEGROUPS/NETZWERK-NETZWERK-DUT-RG/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/HUBTRUSTED-NETZWERK-DUT-NSG","operationName":"NetworkSecurityGroupFlowEvents","properties":{"Version":1,"flows":[{"rule":"UserRule_AllowAllInBound","flows":[{"mac":"000D3A2DEF83","flowTuples":["1528275561,123.123.123.123,234.234.234.234,40531,2252,T,I,A","1528275571,12.12.12.12,23.23.23.23,39052,2095,T,I,A"]}]}]}},

For more information about the Azure NSG Logs: https://docs.microsoft.com/en-GB/azure/network-watcher/network-watcher-nsg-flow-logging-overview

Here's the props.conf I wrote:

[mscs:nsg:flow]
DATETIME_CONFIG =
KV_MODE = json
LINE_BREAKER = }}(,)
NO_BINARY_CHECK = true
SEDCMD-remove_header = s/{\"records\":\[//g
TIME_PREFIX = time\":\"
TRUNCATE = 0
category = Network & Security
disabled = false
pulldown_type = true
SEDCMD-add_closing_bracket = s/\s$/ }//g
SEDCMD-correctly-close = s/]}(?!\S)//g
SHOULD_LINEMERGE = true
REPORT-tuples = extract_tuple
SEDCMD-correctly-begin = s/^"time"/{"time"/g

As you can see in the following image, this works very well for me. (The image shows a different log than the raw example above.)

[Image: screenshot of the parsed events]

To extract the fields I wrote the following transforms.conf:

[extract_tuple]
REGEX = (?<timestamp>[0-9]{10}),(?<src_ip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}),(?<dest_ip>[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}),(?<src_port>[0-9]{1,5}),(?<dest_port>[0-9]{1,5}),(?<protocol>(T|U)),(?<traffic_flow>(I|O)),(?<traffic>(A|D))
MV_ADD = true

Here's an example of two extracted fields with this configuration:

[Image: screenshot of two extracted multivalue fields]

And here's the problem I need help with. As far as I can see, the fields are extracted correctly as multivalue fields. But if I filter for a value such as "dest_ip=234.234.234.234", I also get the four destination IPs "23.23.23.23". I guess Splunk returns every event with at least one match, together with all the other values in that event.

Another issue is that searches over 3 or 4 multivalue fields take a long time or cannot be executed at all.

Is there a better way to extract the fields?

Best regards,
mdorobek
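
The filtering behaviour described in this question can be reproduced outside Splunk. The following is a minimal Python sketch, not Splunk configuration: it parses an abbreviated copy of the sample event, shows that an event-level match on a multivalue field keeps every value in the event, and shows that flattening to one record per tuple (while keeping the rule name and MAC) gives exact matches. The helper name `flatten` is hypothetical; the field names follow the transforms.conf above.

```python
import json

# Abbreviated copy of the sample event from the post (trailing comma removed).
RAW = (
    '{"time":"2018-06-06T09:00:09.2874215Z",'
    '"category":"NetworkSecurityGroupFlowEvent",'
    '"properties":{"Version":1,"flows":[{"rule":"UserRule_AllowAllInBound",'
    '"flows":[{"mac":"000D3A2DEF83","flowTuples":['
    '"1528275561,123.123.123.123,234.234.234.234,40531,2252,T,I,A",'
    '"1528275571,12.12.12.12,23.23.23.23,39052,2095,T,I,A"]}]}]}}'
)

# Field names taken from the REGEX in the question's transforms.conf.
FIELDS = ("timestamp", "src_ip", "dest_ip", "src_port", "dest_port",
          "protocol", "traffic_flow", "traffic")


def flatten(event):
    """Return one dict per flow tuple, carrying the rule name and MAC along."""
    rows = []
    for rule_group in event["properties"]["flows"]:
        for flow in rule_group["flows"]:
            for flow_tuple in flow["flowTuples"]:
                row = dict(zip(FIELDS, flow_tuple.split(",")))
                row["rule"] = rule_group["rule"]
                row["mac"] = flow["mac"]
                rows.append(row)
    return rows


rows = flatten(json.loads(RAW))

# Event-level view: dest_ip is multivalued, so a match on one value
# still returns the event with all of its values.
mv_dest_ip = [row["dest_ip"] for row in rows]
event_matches = "234.234.234.234" in mv_dest_ip

# Tuple-level view: each record has exactly one dest_ip.
exact = [row for row in rows if row["dest_ip"] == "234.234.234.234"]

print(event_matches, mv_dest_ip)
print(exact)
```

The tuple-level view is the idea behind the answers in this thread: once each tuple is its own event, a filter on dest_ip returns only the matching tuple.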

1 Solution

ejwade
Contributor

So I have a similar problem, and I outline how I solved it in my post: https://answers.splunk.com/answers/714696/process-json-azure-nsg-flow-log-tuples.html

Basically, I set the LINE_BREAKER right before the epoch time of each tuple:

LINE_BREAKER = (\")\d{10}

This makes every event start with a tuple. Then I use SEDCMD to remove everything from a quote to the end of the line, as long as the character after the quote is not a digit (tuples are left alone because they begin with the epoch time, which is a digit):

SEDCMD-remove_not_epoch = s/\"\D.*$//g

Now all my logs are CSV tuples! I lose some information from the JSON object like rule name, but I can grab the NSG name from "source" because it's in the source directory. This is the best solution I've come across for our environment - please let me know if anyone sees any caveats.
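
To see what these two settings do to the sample event from the question, here is a rough Python approximation. It is a sketch under assumptions: Splunk's parsing pipeline is not reproduced exactly, the helper names `break_events` and `sedcmd` are hypothetical, and the raw string is an abbreviated copy of the sample. The event boundary is placed at the first capture group of LINE_BREAKER, and the captured text is discarded.

```python
import re

# Abbreviated raw payload from the question, including the trailing comma.
RAW = (
    '{"time":"2018-06-06T09:00:09.2874215Z","properties":{"Version":1,'
    '"flows":[{"rule":"UserRule_AllowAllInBound","flows":[{"mac":"000D3A2DEF83",'
    '"flowTuples":["1528275561,123.123.123.123,234.234.234.234,40531,2252,T,I,A",'
    '"1528275571,12.12.12.12,23.23.23.23,39052,2095,T,I,A"]}]}]}},'
)

LINE_BREAKER = re.compile(r'(")\d{10}')


def break_events(raw):
    """Split raw text where the first capture group matches, dropping that group."""
    events, start = [], 0
    for match in LINE_BREAKER.finditer(raw):
        events.append(raw[start:match.start(1)])
        start = match.end(1)
    events.append(raw[start:])
    return events


def sedcmd(event):
    """Equivalent of s/\\"\\D.*$//g: cut from a quote followed by a non-digit."""
    return re.sub(r'"\D.*$', "", event)


cleaned = [sedcmd(event) for event in break_events(RAW)]
for event in cleaned:
    print(repr(event))
```

In this approximation the JSON header before the first tuple is reduced to a lone `{`, and the two remaining events are plain CSV tuples.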

bhsakarchourasi
Path Finder

Hi,

Thanks for the solution; it works for me. However, our client has also asked us to extract the rule name. Have you ever managed to extract the rule name from the event?

We tried putting a transform before the SEDCMD (it looks silly 😄, but we really hoped it would work), but it's not working.

Looking forward to your reply.

Thanks,

Bhaskar  

mdorobek
Path Finder

I also thought about this approach. The issue I have with it is that I lose the resourceId and systemId, don't I?
Is it possible to share your complete props.conf? Then I can try your solution.

As far as I'm concerned, I'm fine with using mvexpand on the multivalue field I use in my search, but I have nothing against better solutions.

ejwade
Contributor

The values of resourceID can be obtained from the source field. You could create a field alias for source AS resourceID.

systemID is a unique identifier for the resourceID as well, so it doesn't provide much value if you are referencing source for the resourceID (unless you need the systemID string, but you could always reference it in a lookup table if absolutely necessary).

The problem with multi value fields is how they map to CIM data models. I needed to break everything apart because I run alerts and reports against the Network Traffic CIM data model, and it doesn't work if there are multi valued fields. I hope this helps!

### props.conf (indexer)
[mscs:nsg:flow]

### Break apart tuples as single events, and discard the rest of the JSON blob
LINE_BREAKER = (\")\d{10}
SHOULD_LINEMERGE = false
SEDCMD-remove_not_epoch = s/\"\D.*$//g

### props.conf (search head)
[mscs:nsg:flow]
FIELDALIAS-mscs_nsg_flow = dest_ip AS dest src_ip AS src host AS dvc
EVAL-action = case(traffic_result == "A", "allowed", traffic_result == "D", "blocked")
EVAL-protocol = if(match(src_ip, "^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"), "ip", "unknown")
EVAL-transport = case(transport == "T", "tcp", transport == "U", "udp")
REPORT-tuples = extract_tuple

### transforms.conf (search head)
[extract_tuple]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result
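
As a rough illustration of what the search-time configuration above produces for one tuple event, here is a small Python sketch. It is not Splunk code: the function name `parse_tuple` is hypothetical, and it only mimics the DELIMS/FIELDS split, the field aliases, and the action and transport EVALs.

```python
# Field order from the FIELDS setting in [extract_tuple].
FIELDS = ["time", "src_ip", "dest_ip", "src_port", "dest_port",
          "transport", "traffic_flow", "traffic_result"]


def parse_tuple(raw):
    """Split a CSV tuple on commas and derive CIM-style fields."""
    fields = dict(zip(FIELDS, raw.split(",")))
    # EVAL-action: A -> allowed, D -> blocked (None for anything else)
    fields["action"] = {"A": "allowed", "D": "blocked"}.get(fields["traffic_result"])
    # EVAL-transport: T -> tcp, U -> udp
    fields["transport"] = {"T": "tcp", "U": "udp"}.get(fields["transport"])
    # FIELDALIAS: dest_ip AS dest, src_ip AS src
    fields["dest"] = fields["dest_ip"]
    fields["src"] = fields["src_ip"]
    return fields


parsed = parse_tuple("1528275561,123.123.123.123,234.234.234.234,40531,2252,T,I,A")
print(parsed)
```

Because each event now holds a single tuple, every field is single-valued, which is what the Network Traffic data model expects.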

edoardo_vicendo
Contributor

Thank you all. Here are updated configurations adapted from the previous ones. NSG logs are written to an Azure Storage Account. A Splunk HF then reads the logs from the Azure Storage Account with the "Splunk Add-on for Microsoft Cloud Services" and sends them on to the Indexers.

 

Configuration applied on the Splunk Heavy Forwarder (can be applied on an Indexer if you don't have an HF)

hf_in_azure_nsg_app/default

inputs.conf
#Inputs are defined directly on the Splunk HF via the web UI with the "Splunk Add-on for Microsoft Cloud Services" and can be found in /opt/splunk/etc/apps/Splunk_TA_microsoft-cloudservices/local/inputs.conf


props.conf

#NOTE: The following setup extracts only the flowTuples from the payload and sets _time from the flowTuples epoch
#LINE_BREAKER is applied first, then SEDCMD-remove_not_epoch (which keeps only the flowTuples), then TRANSFORMS with INGEST_EVAL, which overwrites _time
#flowTuples parsing is done at search time on the Search Head, in a separate app
#The "source" field already contains the resourceId information (subscriptionID, resourceGroupName, nsgName, macAddress), which can be extracted on the Search Head at search time
#NOTE 2: LINE_BREAKER has been enhanced to avoid breaking events on a macAddress whose first 10 characters are numeric digits
#TO BE DONE: Understand whether SEDCMD- has a size limit on very large payloads
#TO BE DONE 2: In the INGEST_EVAL, use a case statement to set _time to now() if the value is shorter than 10 digits

#References:
#https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview
#https://community.splunk.com/t5/Splunk-Search/How-do-I-import-Azure-NSG-LOGs/td-p/396018
#https://community.splunk.com/t5/Getting-Data-In/How-to-extract-an-event-timestamp-where-seconds-and-milliseconds/m-p/428837

[mscs:nsg:flow]
MAX_TIMESTAMP_LOOKAHEAD = 10
LINE_BREAKER = (\")\d{10}\,|(\"\,\")\d{10}\,
SHOULD_LINEMERGE = false
SEDCMD-remove_not_epoch = s/\"\D.*$|\{|\}|\[|\]//g
TRUNCATE = 50000000
TRANSFORMS-evalingest = nsg_eval_substr_time


transforms.conf

[nsg_eval_substr_time]
INGEST_EVAL = _time=substr(_raw,0,10)

 

Configuration applied on the Splunk Search Head

sh_azure_nsg_app/default

props.conf

#References:
#https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview
#https://community.splunk.com/t5/Splunk-Search/How-do-I-import-Azure-NSG-LOGs/td-p/396018
#https://community.splunk.com/t5/Getting-Data-In/How-to-extract-an-event-timestamp-where-seconds-and-milliseconds/m-p/428837

[mscs:nsg:flow]
REPORT-tuples = extract_tuple_v1, extract_tuple_v2
REPORT-nsg = sub_res_nsg
FIELDALIAS-mscs_nsg_flow = dest_ip AS dest src_ip AS src host AS dvc

EVAL-action = case(traffic_result == "A", "allowed", traffic_result == "D", "blocked")
EVAL-protocol = if(match(src_ip, "^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"), "ip", "unknown")
EVAL-direction = case(traffic_flow == "I", "inbound", traffic_flow == "O", "outbound")
EVAL-transport = case(transport == "T", "tcp", transport == "U", "udp")
EVAL-bytes = (coalesce(bytes_in,0)) + (coalesce(bytes_out,0))
EVAL-packets = (coalesce(packets_in,0)) + (coalesce(packets_out,0))
EVAL-flow_state_desc = case(flow_state == "B", "begin", flow_state == "C", "continuing", flow_state == "E", "end")


transforms.conf

[extract_tuple_v1]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result

[extract_tuple_v2]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result,flow_state,packets_in,bytes_in,packets_out,bytes_out

[sub_res_nsg]
SOURCE_KEY = source
REGEX = SUBSCRIPTIONS\/(\S+)\/RESOURCEGROUPS\/(\S+)\/PROVIDERS\/MICROSOFT.NETWORK\/NETWORKSECURITYGROUPS\/(\S+)\/y=\d+\/m=\d+\/d=\d+\/h=\d+\/m=\d+\/macAddress=(\S+)\/
FORMAT = subscriptionID::$1 resourceGroupName::$2 nsgName::$3 macAddress::$4


eventtypes.conf

[mscs_nsg_flow]
search = sourcetype=mscs:nsg:flow src_ip=*

[mscs_nsg_flow_start]
search = sourcetype=mscs:nsg:flow flow_state=B

[mscs_nsg_flow_end]
search = sourcetype=mscs:nsg:flow flow_state=E


tags.conf

[eventtype=mscs_nsg_flow]
network = enabled
communicate = enabled

[eventtype=mscs_nsg_flow_start]
network = enabled
session = enabled
start = enabled

[eventtype=mscs_nsg_flow_end]
network = enabled
session = enabled
end = enabled

 

Best Regards,

Edoardo
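
For anyone who wants to check the sub_res_nsg regex before deploying it, here is a small Python sketch that applies the same pattern to a source value. The source path is a hypothetical example built to match the layout the regex expects; the subscription ID, resource group, and NSG name are placeholders, not values from this thread.

```python
import re

# Same pattern as the REGEX in [sub_res_nsg].
SUB_RES_NSG = re.compile(
    r"SUBSCRIPTIONS\/(\S+)\/RESOURCEGROUPS\/(\S+)\/PROVIDERS\/MICROSOFT.NETWORK"
    r"\/NETWORKSECURITYGROUPS\/(\S+)\/y=\d+\/m=\d+\/d=\d+\/h=\d+\/m=\d+"
    r"\/macAddress=(\S+)\/"
)

# Hypothetical source value (placeholder IDs and names).
source = (
    "resourceId=/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000"
    "/RESOURCEGROUPS/EXAMPLE-RG/PROVIDERS/MICROSOFT.NETWORK"
    "/NETWORKSECURITYGROUPS/EXAMPLE-NSG"
    "/y=2018/m=06/d=06/h=09/m=00/macAddress=000D3A2DEF83/PT1H.json"
)

match = SUB_RES_NSG.search(source)
# Mirrors FORMAT = subscriptionID::$1 resourceGroupName::$2 nsgName::$3 macAddress::$4
extracted = dict(zip(
    ("subscriptionID", "resourceGroupName", "nsgName", "macAddress"),
    match.groups(),
))
print(extracted)
```

If your source values use a different layout, the pattern will not match and the four fields will simply be absent at search time.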

 

phanim565
Engager

Hi, is there any way to fetch the rule name from the JSON before it is broken into CSV tuples?

ChrisBell04
Communicator

Here are my mods to @ejwade's code above. It's still a shame this isn't handled in Splunk's official add-ons after 2+ years 😞

## props
[mscs:nsg:flow]
# https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-nsg-flow-logging-overview
MAX_TIMESTAMP_LOOKAHEAD = 12
LINE_BREAKER = (\")\d{10}|(\"\,\")\d{10}
SHOULD_LINEMERGE = false
SEDCMD-remove_not_epoch = s/\"\D.*$//g
TRUNCATE = 35000
NO_BINARY_CHECK = false
pulldown_type = true

REPORT-tuples = extract_tuple_v1, extract_tuple_v2
REPORT-nsg = sub_res_nsg
FIELDALIAS-mscs_nsg_flow = dest_ip AS dest src_ip AS src host AS dvc

EVAL-action = case(traffic_result == "A", "allowed", traffic_result == "D", "blocked")
EVAL-protocol = if(match(src_ip, "^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"), "ip", "unknown")
EVAL-direction = case(traffic_flow == "I", "inbound", traffic_flow == "O", "outbound")
EVAL-transport = case(transport == "T", "tcp", transport == "U", "udp")
EVAL-bytes = (coalesce(bytes_in,0)) + (coalesce(bytes_out,0))
EVAL-packets = (coalesce(packets_in,0)) + (coalesce(packets_out,0))

## transforms
#https://www.splunk.com/blog/2017/02/20/splunking-microsoft-azure-network-watcher-data.html
#https://answers.splunk.com/answers/666758/import-azure-nsg-logs.html
[extract_tuple_v1]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result

[extract_tuple_v2]
DELIMS = ","
FIELDS = time,src_ip,dest_ip,src_port,dest_port,transport,traffic_flow,traffic_result,flow_state,packets_in,bytes_in,packets_out,bytes_out

[sub_res_nsg]
SOURCE_KEY = source
REGEX = SUBSCRIPTIONS\/(\S+)\/RESOURCEGROUPS\/(\S+)\/PROVIDERS\/MICROSOFT.NETWORK\/NETWORKSECURITYGROUPS\/(\S+)\/y
FORMAT = subscriptionID::$1 resourceGroupName::$2 nsgName::$3

## eventtypes
[mscs_nsg_flow]
search = sourcetype=mscs:nsg:flow src_ip=*
#tags = network communicate

[mscs_nsg_flow_start]
search = sourcetype=mscs:nsg:flow flow_state=B
#tags = network session start

[mscs_nsg_flow_end]
search = sourcetype=mscs:nsg:flow flow_state=E
#tags = network session end

## tags
[eventtype=mscs_nsg_flow]
network = enabled
communicate = enabled

[eventtype=mscs_nsg_flow_start]
network = enabled
session = enabled
start = enabled

[eventtype=mscs_nsg_flow_end]
network = enabled
session = enabled
end = enabled

rkantamaneni_sp
Splunk Employee

Hi All,

Splunk Idea APPSID-I-511 (https://ideas.splunk.com/ideas/APPSID-I-511) has been created for this to get native support in the Add-On. Go vote on it and add comments. 🙂

rijom
Explorer

I have slightly modified my props to read the epoch timestamp and weed out the empty events. Hope this helps someone!

SHOULD_LINEMERGE = false
TRUNCATE=100000
TIME_FORMAT = %s
TIME_PREFIX=^
LINE_BREAKER = (\")\d{10}\,
SEDCMD-remove_not_epoch = s/\{?\"\D.*$//g
MAX_TIMESTAMP_LOOKAHEAD = 12
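
A quick way to see the effect of the two changes above is to try them in Python. This is only a sketch: the header fragment is an abbreviated piece of the sample event from the question, and Python's `re` and `datetime` stand in for Splunk's SEDCMD and `%s` timestamp parsing.

```python
import re
from datetime import datetime, timezone

# Abbreviated JSON header that precedes the first tuple in the sample event.
header = '{"time":"2018-06-06T09:00:09.2874215Z","properties":{"Version":1,"flows":['

# Earlier SEDCMD in this thread: the opening brace survives.
with_original = re.sub(r'"\D.*$', "", header)

# SEDCMD with the optional leading brace: nothing is left of the header.
with_optional_brace = re.sub(r'\{?"\D.*$', "", header)

# TIME_PREFIX=^ with TIME_FORMAT=%s reads the epoch at the start of the event.
tuple_event = "1528275561,123.123.123.123,234.234.234.234,40531,2252,T,I,A"
event_time = datetime.fromtimestamp(int(tuple_event[:10]), tz=timezone.utc)

print(repr(with_original), repr(with_optional_brace), event_time.isoformat())
```

The header fragment ends up empty with the modified SEDCMD, and the first tuple's timestamp resolves to 2018-06-06 08:59:21 UTC.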

jconger
Splunk Employee

IP addresses are actually multiple segments. From the documentation about segmentation:

...the IP address 192.0.2.223 is a major segment. But this major segment can be broken down into minor segments, such as "192", as well as groups of minor segments like "192.0.2".

To search specifically for the IP address "234.234.234.234", use TERM().

dest_ip=TERM(234.234.234.234)

mdorobek
Path Finder

Even with using TERM() I still have the exact same issue as before. I don't think this is the right answer.
Do you need any further information?
