Getting Data In

Filtering data block in Heavy Forwarder

jrballesteros05
Communicator

Hello, I have a problem that I don't know how to solve. We are receiving logs in xml via universal forwarders. The logs are OK but we want to filter some of them. We couldn't filter in the original source so I am looking if I can filter by Heavy forwarder or not.

For example, I receive an XML log (I attached)

AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203630_2019-07-12T02:54:30" NAME="Query" CONNECTION_ID="111111" STATUS="0" STATUS_CODE="0" USER="myuser[mydb] @  [1.1.1.1]" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="set_option" SQLTEXT="SET NAMES latin1"
AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203631_2019-07-12T02:54:30" NAME="Query" CONNECTION_ID="111111" STATUS="0" STATUS_CODE="0" USER="myuser[mydb] @  [1.1.1.1]" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="set_option" SQLTEXT="SET character_set_results = NULL"
AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203632_2019-07-12T02:54:30" NAME="Connect" CONNECTION_ID="111111" STATUS="0" STATUS_CODE="0" USER="myuser" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="connect" PRIV_USER="myuser" PROXY_USER="" DB="MY_INSTANCE"

AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203630_2019-07-12T02:54:30" NAME="Query" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser[mydb] @  [1.1.1.1]" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="set_option" SQLTEXT="SET NAMES latin1"
AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203631_2019-07-12T02:54:30" NAME="Query" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser[mydb] @  [1.1.1.1]" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="set_option" SQLTEXT="SET character_set_results = NULL"
AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203632_2019-07-12T02:54:30" NAME="Connect" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="connect" PRIV_USER="myuser" PROXY_USER="" DB="**INSTANCE_TO_FILTER**"

I want to filter all events from "INSTANCE_TO_FILTER" which in this case tooks the CONNECTION_ID=222222 but I want to keep the rest. Is there a way to do that in Heavy forwarder?

0 Karma

gcusello
SplunkTrust
SplunkTrust

Hi jrballesteros05,
You can filter logs on the Indexers, before indexing or on Heavy Forwarders.
The second option isn't so good as can appear because HFs are a bottlenck for you data; if you haven't bandwidth problems the best choice is to filter them on Indexers.
To do this see at https://docs.splunk.com/Documentation/Splunk/7.3.1/Forwarding/Routeandfilterdatad
Anyway, the steps are:

  • identify the way to filter, two choices: discard something and take all the other logs, take something and discard the other;
  • identify the regex to filter your logs;
  • modify props.conf and transforms.conf on Indexers as below;
  • restart Indexers;
  • if you have Heavy Forwarders the files to modify are on HFs.

Discard specific events and keep the rest:

props.conf:

[your_sourcetype]
TRANSFORMS-null= setnull

transforms.conf:

[setnull]
REGEX = <your_regex>
DEST_KEY = queue
FORMAT = nullQueue

Keep specific events and discard the rest:
props.conf:

[your_sourcetype]
TRANSFORMS-set= setnull,setparsing

transforms.conf:

[setnull]
REGEX = .
DEST_KEY = queue
FORMAT = nullQueue
[setparsing]
REGEX = >your_regex>
DEST_KEY = queue
FORMAT = indexQueue

Bye.
Giuseppe

0 Karma

jrballesteros05
Communicator

Hello @gcusello.

It is like the comments. I filtered logs in the past, in the HF and the indexer. But I mean in the specific case I post in the question. I cannot.

0 Karma

gcusello
SplunkTrust
SplunkTrust

Sorry I misunderstood!

Anyway, the way to proceed is the same on indexers and on Heavy Forwarders, there isn't any difference: you have to craete a props.conf and a transforms.conf.

About the filter, let me understand: do you want to filter the event where there's the "INSTANCE_TO_FILTER" string, or the events from this string to what?
In first case it's easy because you can send to nullqueue events with this regex.
In the second case is more difficoult because there isn't any infortmation to filter the following events.

bye.
Giuseppe

0 Karma

jrballesteros05
Communicator

Hi. I have been reading about that and I think I cannot filter in that way.

I have three different events:

  AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203630_2019-07-12T02:54:30" NAME="Query" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser[mydb] @  [1.1.1.1]" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="set_option" SQLTEXT="SET NAMES latin1"
  AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203631_2019-07-12T02:54:30" NAME="Query" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser[mydb] @  [1.1.1.1]" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="set_option" SQLTEXT="SET character_set_results = NULL"
  AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203632_2019-07-12T02:54:30" NAME="Connect" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="connect" PRIV_USER="myuser" PROXY_USER="" DB="**INSTANCE_TO_FILTER**"

I want to filter all the events with DB="INSTANCE_TO_FILTER", which in this case has an CONNECTION_ID="222222", but it's difficult because are different events.

0 Karma

gcusello
SplunkTrust
SplunkTrust

Let me understand: you want to filter only the last event (the one with the string DB="INSTANCE_TO_FILTER") but not the others, is it correct?
If this is your request try to put on your Heavy Forwarders:
props.conf:

[your_sourcetype]
TRANSFORMS-null= setnull

transforms.conf:

[setnull]
REGEX = DB\=\"INSTANCE_TO_FILTER\"
DEST_KEY = queue
FORMAT = nullQueue

Bye.
Giuseppe

0 Karma

jrballesteros05
Communicator

No, I want to filter DB="INSTANCE_TO_FILTER" which in this case had a CONNECTION_ID="222222", and all the events which has CONNECTION_ID="222222".

Then if there is another event DB="INSTANCE_TO_FILTER" with CONNECTION_ID="333333" I want to filter all the events with CONNECTION_ID="333333"

That is why your filter does not work for me because it is going to filter only the event with DB="INSTANCE_TO_FILTER" but it won't filter the other ones.

0 Karma

gcusello
SplunkTrust
SplunkTrust

Ok I understood!
I don't think that is possible in Splunk (I hope to be wrong but I don't think!).
The only way is pre-parse events with an external script before ingestion in UFs.

Bye.
Giuseppe

0 Karma

harsmarvania57
Ultra Champion

Hi,

Yes you can discard events which has word INSTANCE_TO_FILTER on Heavy forwarders with below configurations.

props.conf

[yoursourcetype]
TRANSFORMS-xml_null = setnull

transforms.conf

[setnull]
REGEX = INSTANCE_TO_FILTER
DEST_KEY = queue
FORMAT = nullQueue
0 Karma

jrballesteros05
Communicator

Hello, thanks for your answer.

I think I did not explain properly.

This configuration will filter the event:

 AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203632_2019-07-12T02:54:30" NAME="Connect" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="connect" PRIV_USER="myuser" PROXY_USER="" DB="**INSTANCE_TO_FILTER**"

But it won't filter these one:

AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203630_2019-07-12T02:54:30" NAME="Query" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser[mydb] @  [1.1.1.1]" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="set_option" SQLTEXT="SET NAMES latin1"
 AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203631_2019-07-12T02:54:30" NAME="Query" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser[mydb] @  [1.1.1.1]" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="set_option" SQLTEXT="SET character_set_results = NULL"

I want to filter the block, in this case "INSTANCE_TO_FILTER" has the CONNECTION_ID=222222 and I want to filter all the events that have CONNECTION_ID=222222.

0 Karma

harsmarvania57
Ultra Champion

Can you please confirm whether below block is single event or those are 3 different events ?

 AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203630_2019-07-12T02:54:30" NAME="Query" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser[mydb] @  [1.1.1.1]" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="set_option" SQLTEXT="SET NAMES latin1"
 AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203631_2019-07-12T02:54:30" NAME="Query" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser[mydb] @  [1.1.1.1]" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="set_option" SQLTEXT="SET character_set_results = NULL"
 AUDIT_RECORD TIMESTAMP="2019-07-12T02:54:30 UTC" RECORD_ID="91203632_2019-07-12T02:54:30" NAME="Connect" CONNECTION_ID="**222222**" STATUS="0" STATUS_CODE="0" USER="myuser" OS_LOGIN="" HOST="" IP="1.1.1.1" COMMAND_CLASS="connect" PRIV_USER="myuser" PROXY_USER="" DB="**INSTANCE_TO_FILTER**"
0 Karma

jrballesteros05
Communicator

Yes, there are different events. But in this case they have in common the CONNECTION_ID field.

0 Karma

harsmarvania57
Ultra Champion

If those are different events then in splunk there are no such feature to discard previous event or new event based on REGEX matching for current event. I am converting my answer to comment so that if anyone else has any idea, they can share.

0 Karma
Get Updates on the Splunk Community!

Now Available: Cisco Talos Threat Intelligence Integrations for Splunk Security Cloud ...

At .conf24, we shared that we were in the process of integrating Cisco Talos threat intelligence into Splunk ...

Preparing your Splunk Environment for OpenSSL3

The Splunk platform will transition to OpenSSL version 3 in a future release. Actions are required to prepare ...

Easily Improve Agent Saturation with the Splunk Add-on for OpenTelemetry Collector

Agent Saturation What and Whys In application performance monitoring, saturation is defined as the total load ...