Getting Data In

Priority (precedence) in props.conf

gcusello
SplunkTrust
SplunkTrust

Hi at all,

I have a data flow in json format from one host that I ingest with HEC, so I have one host, one source and one sourcetype for all events.

I would override the host, source and sourcetype values based on regexes and I'm able to do this.

The issue is that the data flow is an elaboration of an external systel (logstash) that takes raw logs (e.g. from linux systems) and saves them in a fields of the json format ("message") adding many other fields.

So, after host, source and sourcetype overriding (that is fine working) I would remove all the extra contents in the events and maintain only the content of the message field (the raw logs).

I'm able to do this, but the issue is that I'm not able to do both the transformations: in other words I'm able to override values but the extra contents removing doesn't work or I can remove extra contents but the overriding doesn't work.

I have in my props. conf the following configurations:

[logstash]
# set host
TRANSFORMS-sethost = set_hostname_logstash
# set sourcetype Linux
TRANSFORMS-setsourcetype_linux_audit = set_sourcetype_logstash_linux_audit
# set source
TRANSFORMS-setsource = set_source_logstash_linux

# restoring original raw log
[linux_audit]
SEDCMD-raw_data_linux_audit = s/.*\"message\":\"([^\"]+).*/\1/g

as you can see in the first stanza I override sourcetype from logstash to linux_audit and in the second I try to remove the extra contents using the linux audit sourcetype.

If I use the logstash sourcetype also in the second stanza, the extra contents are removed, but the fields overriding (that runs using the extra contents) doesn't work.

I also tried to setup a priority using the props.conf "priority" option with no luck.

I also tried to use source for the first stanza because source usually has an higher priority than sourcetype, but with the same result.

Can anyone give me an hint how to solve this issue?

Thank you in advance.

Ciao.

Giuseppe

Labels (1)
0 Karma
1 Solution

PickleRick
SplunkTrust
SplunkTrust

OK. I did some more tries and the final "solution" would be like this (notice that as I need to rewrite the queue in the destination sourcetype based on the event's contents - need to match only some of the events - I have to make sure the queue-manipulating transforms are invoked before the event-cutting ones).

props.conf:

[test_sourcetype_to_recast]

#Order of transform classes i crucial! You can do the same casting multiple transforms from
#one class
TRANSFORMS-0_extract_host = picklerick_extract_host
TRANSFORMS-1_extract_source = picklerick_extract_source
TRANSFORMS-2_recast_sourcetype = picklerick_recast_sourcetype
TRANSFORMS-3_drop_dead = picklerick_drop_dead

[destination_sourcetype]
TRANSFORMS-0_drop_all_except_proper_ones = drop_dead_all,keep_matching_events
TRANSFORMS-conditional_host_overwrite = conditional_host_overwrite
TRANSFORMS-cut_most_of_the_event = cut_most_of_the_event

transforms.conf:

[picklerick_drop_dead]
REGEX = sourcetype:destination_sourcetype
DEST_KEY = queue
FORMAT = nullQueue

[picklerick_recast_sourcetype]
REGEX = (.)
CLONE_SOURCETYPE = destination_sourcetype

[picklerick_extract_source]
REGEX = source:(\w*)
FORMAT = source::$1
DEST_KEY = MetaData:Source
WRITE_META = true

[picklerick_extract_host]
REGEX = host:(\w*)
FORMAT = host::$1
DEST_KEY = MetaData:Host
WRITE_META = true

[conditional_host_overwrite]
REGEX = desthost=(\w*)
FORMAT = host::$1
DEST_KEY = MetaData:Host
WRITE_META = true

[cut_most_of_the_event]
REGEX = .*:([^:]*)$
FORMAT = $1
DEST_KEY = _raw
WRITE_META = true

[drop_dead_all]
REGEX = .
DEST_KEY = queue
FORMAT = nullQueue

[keep_matching_events]
REGEX = sourcetype:destination_sourcetype
DEST_KEY = queue
FORMAT = indexQueue

 This way if I include a string "sourcetype:destination_sourcetype" in my event contents, the event will be recast to the destination_sourcetype and processed accordingly.

The downsides to this are two:

1) You can't specify the sourcetype you'll be cloning your event to dynamically so if you have many of them in one event stream... that's going to get complicated.

2) You have to account for all non-matching events in the destination sourcetype which can be tricky in case of an already existing sourcetype (like your linux_audit). I'm wondering if you could do something with it by firstly cloning that to an intermediate sourcetype, filtering your data there (the queue field should be retained over cloning) and cloning them again to the destination sourcetype. But that's getting ridiculously complicated.

View solution in original post

PickleRick
SplunkTrust
SplunkTrust

If I understand you correctly - you want to correct your sourcetype first and then fire transforms for the new sourcetype, right? It won't work that way. The set of transforms to execute is set at the beginning of the pipeline based on the event's sourcetype/source/host and is not changed later even of you overwrite those metadata fields.

The only way I see to do it would be to CLONE_SOURCETYPE to make a copy of your event with a new event's sourcetype set properly (this one will be processed from the beginning using the new sourcetype's props and transforms) and drop the original event bu sending it to  nullQueue. Yes, it does create some processing overhead but I don't see another way if you can't make your source send reasonably formatted data.

gcusello
SplunkTrust
SplunkTrust

Hi @PickleRick ,

thank you for your answer, sorry but I don't understand:
if I clone sourcetype, can I pass also host and source values (that I extracted from the json fields) to the new one?

if I clone sourcetype, do you think that I can apply transformations to the new sourcetype?

the send to NullQueue, can run after the host and source overriding and the cloning?

Let me understand, you hint a props.conf like the following:

[logstash]
# set host
TRANSFORMS-sethost = set_hostname_logstash
# set sourcetype Linux
TRANSFORMS-setsourcetype_linux_audit = set_sourcetype_logstash_linux_audit
# set source
TRANSFORMS-setsource = set_source_logstash_linux
# send to NullQueue
TRANSFORMS-send_to_NullQueue = send_to_NullQueue

# restoring original raw log
[linux_audit]
SEDCMD-raw_data_linux_audit = s/.*\"message\":\"([^\"]+).*/\1/g

Is it correct?

Ciao.

Giuseppe

0 Karma

PickleRick
SplunkTrust
SplunkTrust

CLONE_SOURCETYPE makes a clone of the event you have, sets a sourcetype that you provide for it and pushes it back into the front of the processing pipeline.

I'm not 100% sure (you'd have to test it) but I'd assume if you overwrote source and host before arriving at the transform cloning the event, you'd have your new host and source applied.

* The duplicated events receive index-time transformations & sed
  commands for all transforms that match its new host, source, or source type.
* This means that props.conf matching on host or source will incorrectly be
applied a second time.

So yep, something like your props.conf but.

1. The set-sourcetype transform would have to use CLONE_SOURCETYPE to recast the sourcetype to your linux_audit

2. You'd have to make sure that your transforms are aplied in proper order (firstly adjust the metadata, then clone sourcetype, finally drop to nullqueue)

0 Karma

gcusello
SplunkTrust
SplunkTrust

Hi @PickleRick ,

the problem is that if I clone the event assigning the new sourcetype, I'm again in the previous ampasse: if I remove the extra contents I cannot assign the correct host and source, I'll try!

Thank you.

ciao.

Giuseppe

0 Karma

PickleRick
SplunkTrust
SplunkTrust

OK. Look at this:

props.conf:

[test_sourcetype_to_recast]

#Order of transform classes i crucial! You can do the same casting multiple transforms from
#one class
TRANSFORMS-0_extract_host = picklerick_extract_host
TRANSFORMS-1_extract_source = picklerick_extract_source
TRANSFORMS-2_recast_sourcetype = picklerick_recast_sourcetype
TRANSFORMS-3_drop_dead = picklerick_drop_dead

[destination_sourcetype]
TRANSFORMS-conditional_host_overwrite = conditional_host_overwrite
TRANSFORMS-cut_most_of_the_event = cut_most_of_the_event

transforms.conf:

[picklerick_drop_dead]
REGEX = sourcetype:destination_sourcetype
DEST_KEY = queue
FORMAT = nullQueue

[picklerick_recast_sourcetype]
REGEX = sourcetype:destination_sourcetype
CLONE_SOURCETYPE = destination_sourcetype

[picklerick_extract_source]
REGEX = source:(\w*)
FORMAT = source::$1
DEST_KEY = MetaData:Source
WRITE_META = true

[picklerick_extract_host]
REGEX = host:(\w*)
FORMAT = host::$1
DEST_KEY = MetaData:Host
WRITE_META = true

[conditional_host_overwrite]
REGEX = desthost=(\w*)
FORMAT = host::$1
DEST_KEY = MetaData:Host
WRITE_META = true

[cut_most_of_the_event]
REGEX = .*:([^:]*)$
FORMAT = $1
DEST_KEY = _raw
WRITE_META = true

 

Now if I do this:

curl -H "Authorization: Splunk my_token" http://my_host:8088/services/collector/event -d '{"index":"test1","host":"original_host","source":"original_host","sourcetype":"test_sourcetype_to_recast","event":"sourcetype:destination_sourcetype,source:destination_source,host:host1,source:source1:event:desthost=destination_host:This should be left at the end"}'

 I will get this in my index

PickleRick_0-1700243379869.png

As you can see, the original values of source and host fields posted with the event to HEC were completely discarded and were rewritten from the contents of the event. Then the event was cloned as the "destination_sourcetype" sourcetype when the host field was again rewritten, this time with the value provided in the "desthost=" part of the event.

And finally the original event was discarded and the cloned event was cut short to leave only the part from the last semicolon till the end.

Kinda ugly, kinda complicated. I'm not sure if you couldn't do similar thing with ingest actions by the way. But I'm no expert with those.

One thing though - there is something not entirely right with the cloning transform because it seems to be cloning all events, not just those matching regex. I suppose that transform is lacking something.

EDIT: Argh. I can point myself to my own post: https://community.splunk.com/t5/Getting-Data-In/Use-CLONE-SOURCETYPE-only-for-matching-events/m-p/66...

CLONE_SOURCETYPE is always applied to all events.

So you'd have to further filter cloned/non-cloned events to only leave matching ones in the proper pipeline. Getting more and more complicated.

Maybe it's worth fighting to get the data in a reasonable format? 😉

gcusello
SplunkTrust
SplunkTrust

Hi @PickleRick ,

thank you very much for you answer, I will try it on Monday  and I'm confident that it will run.

Only two explanations, to better understand you solution: 

  1. you hint to extract host and source before cloning sourcetype, but how these two fields are passed to the cloned sourcetype?
  2. what's the purpose of the "TRANSFORMS-conditional_host_overwrite" command in the cloned sourcetype? must I add also a similar command for source field?

Ciao and thank you very much.

Giuseppe

0 Karma

PickleRick
SplunkTrust
SplunkTrust

1. If I remembered correctly you said that you were extracting host from the whole "packed" event - before casting it to the destination sourcetype. It was just to demonstrate that it works and cloned event inherits the indexed fields which had already been extracted before the event was cloned. That's all. If you're gonna extract the host from the "embedded" event after casting it to the destination sourcetype, no problem, you can remove that transform from the source sourcetype. 🙂

The transforms extracting source and host were just inserted to demonstrate the inheritance and "overwriting" fields in case of event cloning. They are not essential for the cloning as such.

2. The conditional_host_overwrite transform was to show you that even though the host field gets overwritten in the source sourcetype the transform will get executed in the destination sourcetype so your cloning does indeed do what you wanted - fire transforms from the destination sourcetype to which the events get cloned.

The main takeouts from this exercise are that:

1. Indexed fields get cloned with the CLONE_SOURCETYPE functionality

2. _ALL_ events to which the given props get applied (so all matching given sourcetype, source or host - depending on how you assign your transforms to events) get cloned so you have to selectively filter them in the source sourcetype and destination sourcetype.

3. Props/transforms from the destination sourcetype get applied after the event gets cloned

4. Order of transforms is important.

gcusello
SplunkTrust
SplunkTrust

Hi @PickleRick ,

there's still something wrong:

I used this props.conf:

[logstash]
TRANSFORMS-00_sethost = set_hostname_logstash
TRANSFORMS-01_setsource = set_source_logstash_linux
TRANSFORMS-05_setsourcetype_linux_audit = set_sourcetype_logstash_linux_audit
TRANSFORMS-06_setsourcetype_linux_secure = set_sourcetype_logstash_linux_secure
TRANSFORMS-50_drop_dead_linux_audit = drop_dead_linux_audit
TRANSFORMS-51_drop_dead_linux_secure = drop_dead_linux_secure

[linux_audit]
TRANSFORMS-0_drop_all_except_proper_ones_linux_audit = drop_dead_all,keep_matching_events_linux_audit
#TRANSFORMS-conditional_host_overwrite = conditional_host_overwrite
TRANSFORMS-cut_most_of_the_event_linux_audit = cut_most_of_the_event_linux_audit

[linux_secure]
TRANSFORMS-0_drop_all_except_proper_ones_linux_secure = drop_dead_all,keep_matching_events_linux_secure
#TRANSFORMS-conditional_host_overwrite = conditional_host_overwrite
TRANSFORMS-cut_most_of_the_event_linux_secure = cut_most_of_the_event_linux_secure

and this transforms.conf

# set host=host.name
[set_hostname_logstash]
REGEX = \"host\":\{\"name\":\"([^\"]+)
FORMAT = host::$1
DEST_KEY = MetaData:Host

# set source=log.file.path per linux_audit e linux_secure
[set_source_logstash_linux]
REGEX = \"log\":\{\"file\":\{\"path\":\"([^\"]+)
FORMAT = source::$1
DEST_KEY = MetaData:Source

[set_sourcetype_logstash_linux_audit]
REGEX = \"log\":\{\"file\":\{\"path\":\"(/var/log/audit/audit.log)\"
CLONE_SOURCETYPE = linux_audit

[set_sourcetype_logstash_linux_secure]
REGEX = \"log\":\{\"file\":\{\"path\":\"(/var/log/secure)\"
CLONE_SOURCETYPE = linux_secure

[drop_dead_linux_audit]
REGEX = sourcetype:linux_audit
DEST_KEY = queue
FORMAT = nullQueue

[drop_dead_linux_secure]
REGEX = sourcetype:linux_secure
DEST_KEY = queue
FORMAT = nullQueue

[drop_dead_all]
REGEX = .
DEST_KEY = queue
FORMAT = nullQueue

[keep_matching_events_linux_audit]
REGEX = sourcetype:linux_audit
DEST_KEY = queue
FORMAT = indexQueue

[keep_matching_events_linux_secure]
REGEX = sourcetype:linux_secure
DEST_KEY = queue
FORMAT = indexQueue

[cut_most_of_the_event_linux_audit]
REGEX = (?ms).*\"message\":\"([^\"]+).*
FORMAT = $1
DEST_KEY = _raw
WRITE_META = true

[cut_most_of_the_event_linux_secure]
REGEX = (?ms).*\"message\":\"([^\"]+).*
FORMAT = $1
DEST_KEY = _raw
WRITE_META = true

but with this configuration I lost linux_audit tranformation and the remove of extra contents does,'t run.

Your approach is correct, but probably there's something wrong i my application.

Now I try to understand where's the issue.

Thank you again for your help 

Ciao.

Giuseppe

0 Karma

gcusello
SplunkTrust
SplunkTrust

Hi @PickleRick ,

I changed something in the transforms.conf:

# set host=host.name
[set_hostname_logstash]
REGEX = \"host\":\{\"name\":\"([^\"]+)
FORMAT = host::$1
DEST_KEY = MetaData:Host

# set source=log.file.path per linux_audit e linux_secure
[set_source_logstash_linux]
REGEX = \"log\":\{\"file\":\{\"path\":\"([^\"]+)
FORMAT = source::$1
DEST_KEY = MetaData:Source

[set_sourcetype_logstash_linux_audit]
REGEX = .
#REGEX = \"log\":\{\"file\":\{\"path\":\"(/var/log/audit/audit.log)\"
CLONE_SOURCETYPE = linux_audit

[set_sourcetype_logstash_linux_secure]
#REGEX = \"log\":\{\"file\":\{\"path\":\"(/var/log/secure)\"
REGEX = .
CLONE_SOURCETYPE = linux_secure

[drop_dead_linux_audit]
REGEX = .
#REGEX = sourcetype:linux_audit
DEST_KEY = queue
FORMAT = nullQueue

[drop_dead_linux_secure]
#REGEX = sourcetype:linux_secure
REGEX = .
DEST_KEY = queue
FORMAT = nullQueue

[drop_dead_all]
REGEX = .
DEST_KEY = queue
FORMAT = nullQueue

[keep_matching_events_linux_audit]
#REGEX = sourcetype:linux_audit
REGEX = \"log\":\{\"file\":\{\"path\":\"(/var/log/audit/audit.log)\"
DEST_KEY = queue
FORMAT = indexQueue

[keep_matching_events_linux_secure]
REGEX = \"log\":\{\"file\":\{\"path\":\"(/var/log/secure)\"
#REGEX = sourcetype:linux_secure
DEST_KEY = queue
FORMAT = indexQueue

[cut_most_of_the_event_linux_audit]
REGEX = (?ms).*\"message\":\"([^\"]+).*
FORMAT = $1
DEST_KEY = _raw
WRITE_META = true

[cut_most_of_the_event_linux_secure]
REGEX = (?ms).*\"message\":\"([^\"]+).*
FORMAT = $1
DEST_KEY = _raw
WRITE_META = true

and now linux_audit is rcorrectly running, instead linux_secure sometimes loses the extra contents removal.

I'm working to understand why.

One additional question: when I'll have other data flows, they will be removed or they will remain with the original sourcetype(logstash)?

Ciao and thank you very much for your help.

Giuseppe

0 Karma

PickleRick
SplunkTrust
SplunkTrust

OK. I did some more tries and the final "solution" would be like this (notice that as I need to rewrite the queue in the destination sourcetype based on the event's contents - need to match only some of the events - I have to make sure the queue-manipulating transforms are invoked before the event-cutting ones).

props.conf:

[test_sourcetype_to_recast]

#Order of transform classes i crucial! You can do the same casting multiple transforms from
#one class
TRANSFORMS-0_extract_host = picklerick_extract_host
TRANSFORMS-1_extract_source = picklerick_extract_source
TRANSFORMS-2_recast_sourcetype = picklerick_recast_sourcetype
TRANSFORMS-3_drop_dead = picklerick_drop_dead

[destination_sourcetype]
TRANSFORMS-0_drop_all_except_proper_ones = drop_dead_all,keep_matching_events
TRANSFORMS-conditional_host_overwrite = conditional_host_overwrite
TRANSFORMS-cut_most_of_the_event = cut_most_of_the_event

transforms.conf:

[picklerick_drop_dead]
REGEX = sourcetype:destination_sourcetype
DEST_KEY = queue
FORMAT = nullQueue

[picklerick_recast_sourcetype]
REGEX = (.)
CLONE_SOURCETYPE = destination_sourcetype

[picklerick_extract_source]
REGEX = source:(\w*)
FORMAT = source::$1
DEST_KEY = MetaData:Source
WRITE_META = true

[picklerick_extract_host]
REGEX = host:(\w*)
FORMAT = host::$1
DEST_KEY = MetaData:Host
WRITE_META = true

[conditional_host_overwrite]
REGEX = desthost=(\w*)
FORMAT = host::$1
DEST_KEY = MetaData:Host
WRITE_META = true

[cut_most_of_the_event]
REGEX = .*:([^:]*)$
FORMAT = $1
DEST_KEY = _raw
WRITE_META = true

[drop_dead_all]
REGEX = .
DEST_KEY = queue
FORMAT = nullQueue

[keep_matching_events]
REGEX = sourcetype:destination_sourcetype
DEST_KEY = queue
FORMAT = indexQueue

 This way if I include a string "sourcetype:destination_sourcetype" in my event contents, the event will be recast to the destination_sourcetype and processed accordingly.

The downsides to this are two:

1) You can't specify the sourcetype you'll be cloning your event to dynamically so if you have many of them in one event stream... that's going to get complicated.

2) You have to account for all non-matching events in the destination sourcetype which can be tricky in case of an already existing sourcetype (like your linux_audit). I'm wondering if you could do something with it by firstly cloning that to an intermediate sourcetype, filtering your data there (the queue field should be retained over cloning) and cloning them again to the destination sourcetype. But that's getting ridiculously complicated.

gcusello
SplunkTrust
SplunkTrust

Hi @PickleRick ,

your solution was correct!

I only changed a very little detail: in the "drop_dead_all stanza" because I have to remove only the cloned events not all, otherwise also eventual new flows are deleted without cloning, but you solution is great!

Thank You very much for your help, I hope to have the opportunity to return the favor i the future because you solved a very important issue for my job.
On this occasion I take advantage of your knowledge, if you have expertise on WinLogBeat, would you please take a look at my question: https://community.splunk.com/t5/Getting-Data-In/Connect-winlogbeat-log-format-to-Splunk-TA-Windows/m... ?

Thank you again.

Ciao.

Giuseppe

0 Karma

gcusello
SplunkTrust
SplunkTrust

Hi at all,

new failed test: I also tried to use  transforms.conf instead of SEDCMD in props.conf (as I usually do) with no luck:

[set_sourcetype_linux_audit_remove]
REGEX = (?ms).*\"message\":\"([^\"]+).*
FORMAT = $2
DEST_KEY = _raw

and I tried

[set_sourcetype_linux_audit_remove]
REGEX = .*\"message\":\"([^\"]+).*
FORMAT = $1
DEST_KEY = _raw

with the same result.

Ciao.

Giuseppe

0 Karma

isoutamo
SplunkTrust
SplunkTrust

Which HEC endpoint you are using? It depends on that what you can do for event. Here is their instruction about it https://www.aplura.com/assets/pdf/hec_pipelines.pdf

r. Ismo

gcusello
SplunkTrust
SplunkTrust

Hi @isoutamo ,

probably it isn't so clear for me how HEC works but how can the endpoint be relevant?

events are processed and parsed as usual: Parsing, Merging, Typing and Indexing.

The issue I suppose that's in the precedence on events in the activities listed in the props.conf.

Ciao.

Giuseppe

 

0 Karma

isoutamo
SplunkTrust
SplunkTrust

In general case endpoint define what actions you can do for your event. But when I read again your issue, this is not your problem. I’m quite sure that @PickleRick ‘s note is valid in your case. You need to try CLONE_SOURCETYPE and then manage those transforms etc. for each individual sourcetype separately.

0 Karma
Get Updates on the Splunk Community!

Now Available: Cisco Talos Threat Intelligence Integrations for Splunk Security Cloud ...

At .conf24, we shared that we were in the process of integrating Cisco Talos threat intelligence into Splunk ...

Preparing your Splunk Environment for OpenSSL3

The Splunk platform will transition to OpenSSL version 3 in a future release. Actions are required to prepare ...

Easily Improve Agent Saturation with the Splunk Add-on for OpenTelemetry Collector

Agent Saturation What and Whys In application performance monitoring, saturation is defined as the total load ...