Hi all,
I have IP flow based information being ingested into Splunk, which consists of source_ip, source_port, destination_ip, destination_port.
Occasionally, due to environmental factors, we get a duplicate log of the same flow in the reverse direction.
E.g.
source_ip source_port destination_ip destination_port
1.1.1.1 42000 2.2.2.2 80 <- Keep this
2.2.2.2 80 1.1.1.1 42000 <- I would like to discard this
1.1.1.5 42300 2.2.2.2 80
3.3.3.3 134 5.5.5.5 80
My goal is to identify and ultimately filter out the duplicated entries.
What I am having trouble with is writing a query that flags events which have a duplicate entry in the reverse direction.
I could then filter out the "flagged" duplicates where, say, source_port < destination_port.
I am trying to avoid computationally heavy commands such as nested searches, as the data set is quite large.
Would greatly appreciate some ideas or assistance on how this can be tackled.
If this isn't sufficient:
| where source_port>destination_port
try this:
| makeresults
| eval _raw="source_ip source_port destination_ip destination_port
1.1.1.1 42000 2.2.2.2 80
2.2.2.2 80 1.1.1.1 42000
1.1.1.5 42300 2.2.2.2 80
3.3.3.3 134 5.5.5.5 80"
| multikv forceheader=1
| table source_ip source_port destination_ip destination_port
| eval highport=if(source_port>destination_port,source_ip.":".source_port.",".destination_ip.":".destination_port,destination_ip.":".destination_port.",".source_ip.":".source_port)
| eventstats count by highport
| where count=1 OR highport=source_ip.":".source_port.",".destination_ip.":".destination_port
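The makeresults / multikv / table lines above only build the sample rows. Against real data they would be replaced by your base search. A rough sketch, assuming the four fields are already extracted at search time; <your_index> and <your_sourcetype> are placeholders, and reverse_duplicate is a field name made up here so the duplicates are flagged before being filtered:

```
index=<your_index> sourcetype=<your_sourcetype>
| eval highport=if(source_port>destination_port,source_ip.":".source_port.",".destination_ip.":".destination_port,destination_ip.":".destination_port.",".source_ip.":".source_port)
| eventstats count by highport
| eval reverse_duplicate=if(count>1 AND source_port<destination_port,1,0)
| where reverse_duplicate=0
```

Dropping the final where keeps every event with the flag attached, which may be handy for checking the results before discarding anything. Flows where both ports are equal are not treated specially in this sketch.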
That's fantastic, thank you.
I really like the way you have solved it.