Splunk Search

How to reassemble bidirectional flows with transaction

emf1123
New Member

I need to assemble transactions where, depending on the direction of the traffic, the "source" might actually be the "destination", or vice-versa.

Here's a particular example, with only the important fields shown:

"_time",action,src,"s_port",dst,service,xlatesrc,xlatesport,proto
"2014-05-27T08:47:32.000-0400",accept,"10.9.0.32",52643,"72.21.81.253",80,"192.0.2.1",51051,tcp
"2014-05-27T08:48:01.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:49:25.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:51:18.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:53:18.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp

you'll notice that "xlatesrc" in the first line becomes the "dst" on the subsequent drop events. The only real clue I have here is that the (xlatesrc,xlatesport) [or (src,s_port)] tuple equals the (dst,service) tuple, and vice-versa.

how do you reassemble these streams?

Tags (1)
0 Karma

emf1123
New Member

If it makes it more clear, the problem is that "transaction a,b" is an "AND" match, and I need an "OR" match.

0 Karma

jhupka
Path Finder

If your xlatesrc is always empty in your subsequent events, can you do something like:

... | eval tuple = if(isnull(xlatesrc), src.s_port.dst.service, dst.service.xlatesrc.xlatesport) | stats <whatever values you want> by tuple 

Essentially conditionally build your tuple of the proper IP+port+IP+port based on the xlatesrc being there or not for the fields that have your unique identifier.

0 Karma

emf1123
New Member

jhupka: I can't rely on sessions always being started with xlatesrc, and I absolutely need to match subsequent events that can happen in either direction.

I tried this, but it's still giving two "match"es:

| eval tuplesrc = if(isnull(xlatesrc), src.s_port, xlatesrc.xlatesport)
| eval tupledst = dst.service
| eval matchsrc = case(tuplesrc == xlatesrc.xlatesport, tuplesrc, tuplesrc == src.s_port, tuplesrc, tuplesrc == dst.service, tuplesrc)
| eval matchdst = case(tupledst == dst.service, tupledst, tupledst == src.s_port, tupledst)
| eval match = matchsrc.matchdst

0 Karma
Career Survey
First 500 qualified respondents will receive a $20 gift card! Tell us about your professional Splunk journey.

Can’t make it to .conf25? Join us online!

Get Updates on the Splunk Community!

Take Action Automatically on Splunk Alerts with Red Hat Ansible Automation Platform

 Are you ready to revolutionize your IT operations? As digital transformation accelerates, the demand for ...

Calling All Security Pros: Ready to Race Through Boston?

Hey Splunkers, .conf25 is heading to Boston and we’re kicking things off with something bold, competitive, and ...

Beyond Detection: How Splunk and Cisco Integrated Security Platforms Transform ...

Financial services organizations face an impossible equation: maintain 99.9% uptime for mission-critical ...