Splunk HEC: receiving data from the Logstash http output

jbanAtSplunk
Communicator

Hi,

Does anyone have a good example of sending data from Logstash to Splunk HEC?
So far I have only got "services/collector/raw" working with Logstash, but I would prefer to use /collector or /event so that we can easily change the sourcetype.

I see that for /collector or /event the message must be constructed in a special way, so a good Logstash example would help.

We also use multiple indexes, so we would like to handle logs dynamically: parse each message with the right sourcetype stanza and deliver it to a different index depending on the log type (e.g. different OS, network equipment, etc.).


PickleRick
SplunkTrust

I don't use Logstash, but I have an intermediate layer of rsyslog processing events and posting them to HEC, so it's similar 😉

The document you're interested in is https://docs.splunk.com/Documentation/Splunk/8.2.2/Data/FormateventsforHTTPEventCollector

In general, you need to send an HTTP request with an authorization header (I assume you already know how to do that) whose body is a JSON structure containing an "event" field with your raw event. That's the absolute minimum. If you want, you can send additional fields such as source, host and so on. Pay special attention to the time field: you _must_ specify it as a Unix timestamp _with a millisecond part_ (epoch seconds, with the milliseconds after the decimal point).
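
A minimal Python sketch of the request described above, using only the standard library. The host `splunk.example.com`, the all-zero token and the helper name `build_hec_request` are placeholders invented for illustration, not values from this thread. The request is only built here, not sent.

```python
import json
import time
import urllib.request


def build_hec_request(url, token, event, **metadata):
    """Build (but do not send) a POST request for the HEC event endpoint."""
    payload = {"event": event}   # "event" is the only key that is always needed
    payload.update(metadata)     # optional: time, host, source, sourcetype, index
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f"Splunk {token}"},
        method="POST",
    )


# Hypothetical host and token, for illustration only.
request = build_hec_request(
    "https://splunk.example.com:8088/services/collector/event",
    "00000000-0000-0000-0000-000000000000",
    "This is the event data",
    time=round(time.time(), 3),  # epoch seconds with a millisecond part
    host="myhost",
    source="myHECpusher",
)
# urllib.request.urlopen(request) would send it; TLS settings depend on your setup.
```

Keeping the payload construction separate from the actual send makes it easy to inspect `request.data` before anything reaches Splunk.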


jbanAtSplunk
Communicator

This feels like one step forward, one step back.

[rule::logstash]
sourcetype = logstash
MORE_THAN_[0_80] = .*?

This does not work: it does not change the sourcetype or anything else.

This does work:
[source::http_logstash_to_splunk]
TRANSFORMS-changeSourceType = set_sourcetype_logstash_linux

but the stanza for the resulting sourcetype, [logstash], then does not apply any of the field extractions I wrote against the _raw log.




PickleRick
SplunkTrust

Why do you want to change the sourcetype on the Splunk side? I'd just set the proper sourcetype in Logstash before sending the event to HEC. But that's me.

Anyway, you can rewrite the sourcetype for a given source, but then things get confusing and the effective configuration might not always be what you wanted.


jbanAtSplunk
Communicator

Yes, I decided not to change the sourcetype and went just for extracting fields, but that is not working either: basically it ignores the sourcetype that I set on the HEC input.
Example:

[logstash]
EXTRACT-test = .*(?<name>Disconnected) from user (?<user>.*) (?<src_ip>\d+.\d+.\d+.\d+) port (?<port>\d+)
BREAK_ONLY_BEFORE_DATE =
DATETIME_CONFIG =
KV_MODE = json
LINE_BREAKER = ([\r\n]+)
NO_BINARY_CHECK = true
SHOULD_LINEMERGE = false
category = Custom
disabled = false

 


PickleRick
SplunkTrust

Do you have an all-in-one environment or a split one (search head(s), indexer(s), maybe HFs)?

I ask because some of the settings you showed are index-time and some are search-time. If you try to apply, for example, search-time settings at index-time, it simply won't work.
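
To make the index-time versus search-time split concrete, here is a small Python sketch that groups a few of the props.conf keys from the [logstash] stanza by the phase in which Splunk applies them. The grouping is illustrative and deliberately incomplete, and the function name `phase_of` is made up for this example.

```python
# Illustrative grouping only: it covers a handful of props.conf settings
# from this thread, not the full list.
INDEX_TIME = {"LINE_BREAKER", "SHOULD_LINEMERGE", "BREAK_ONLY_BEFORE_DATE", "DATETIME_CONFIG"}
SEARCH_TIME = {"KV_MODE"}
INDEX_TIME_PREFIXES = ("TRANSFORMS-",)
SEARCH_TIME_PREFIXES = ("EXTRACT-", "REPORT-")


def phase_of(setting):
    """Return "index-time", "search-time" or "unknown" for a props.conf key."""
    if setting in INDEX_TIME or setting.startswith(INDEX_TIME_PREFIXES):
        return "index-time"
    if setting in SEARCH_TIME or setting.startswith(SEARCH_TIME_PREFIXES):
        return "search-time"
    return "unknown"


for key in ("EXTRACT-test", "KV_MODE", "LINE_BREAKER", "SHOULD_LINEMERGE"):
    print(f"{key}: {phase_of(key)}")
```

In a distributed deployment the index-time group belongs on the first instance that parses the data (a heavy forwarder or an indexer), while the search-time group belongs on the search head.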


jbanAtSplunk
Communicator

I have a distributed environment.

I am currently testing with a Splunk Docker container (all-in-one) and a Logstash Docker container. Once it works there, it will be easy to work out which parts need to go on the HF and which on the SH.


PickleRick
SplunkTrust

Blah. Hate containerized Splunk. But then again I hate the whole container idea 🤣

But seriously, from what I'm seeing there might be a slight misunderstanding. You set KV_MODE to json, but the event data itself does not seem to be in JSON format.

For KV_MODE = json to work, the content of the event field that you supply to HEC has to be a fully compliant JSON structure. Posting the event to HEC inside a JSON envelope is not the same thing.

OK, let me make it clearer, because it might be slightly confusing 🙂

You _always_ post an event to a HEC /event endpoint as a json structure. Your event might look like this:

{
"source": "myHECpusher",
"index": "someindex",
"event": "This is the event data"
}

In this case you don't want KV_MODE = json, because the event itself is not structured in any way and, in particular, is not JSON. You'd want KV_MODE = json (although Splunk usually detects proper JSON on its own) for data that you push to HEC as, for example:

{
"source": "myHECpusher",
"index": "someindex",
"event": "{\"somefield\":\"somevalue\",\"anotherfield\":\"anothervalue\"}"
}
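
The difference between the two payloads can be checked with a short Python sketch. Both envelopes are JSON, but only in the second one does the "event" string itself parse as JSON. The helper name `event_is_json` is made up for this illustration.

```python
import json

# Envelope 1: the event is plain text.
plain = {
    "source": "myHECpusher",
    "index": "someindex",
    "event": "This is the event data",
}

# Envelope 2: the event is itself a JSON document, serialized into a string.
structured = {
    "source": "myHECpusher",
    "index": "someindex",
    "event": json.dumps({"somefield": "somevalue", "anotherfield": "anothervalue"}),
}


def event_is_json(envelope):
    """Return True if the "event" string itself parses as a JSON object."""
    try:
        return isinstance(json.loads(envelope["event"]), dict)
    except json.JSONDecodeError:
        return False


print(event_is_json(plain))        # False
print(event_is_json(structured))   # True
print(json.dumps(structured))      # the inner quotes come out escaped as \"
```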

I'm not sure though that your extraction is what you want it to be.

Firstly, you don't need (and usually don't want, because it adds work to the parser) a ".*" (any string) at the beginning of the regex. The extraction works for the match anywhere within the string (unless anchored with ^ of course), so there's no point in putting it there.

Secondly, are you sure you want to capture static text "Disconnected" as a field called "name"?

Thirdly, it's usually more robust to use \s instead of a literal space. And have you checked that the regex actually works on your data (for example on https://regex101.com)?

Fourthly, your events _are_ of sourcetype logstash, right?

Fifthly, did you search in verbose mode?
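
The first and third points can be tried out locally with Python's `re` module. This is only a sketch: the sample log line is hypothetical, Python spells named groups `(?P<name>...)` where Splunk accepts `(?<name>...)`, and two things go beyond the advice above as assumptions of this example: the dots in the IP address are escaped and the user is matched with `\S+` instead of `.*`.

```python
import re

# No leading ".*": search() already matches anywhere in the string.
# \s+ replaces the literal spaces; \. matches a literal dot in the IP address.
PATTERN = re.compile(
    r"Disconnected\s+from\s+user\s+(?P<user>\S+)\s+"
    r"(?P<src_ip>\d+\.\d+\.\d+\.\d+)\s+port\s+(?P<port>\d+)"
)

# Hypothetical sample line, modelled on the regex in the question.
sample = "Oct 26 19:17:55 web01 sshd[1234]: Disconnected from user alice 10.0.0.5 port 51234"

match = PATTERN.search(sample)
fields = match.groupdict() if match else {}
print(fields)  # {'user': 'alice', 'src_ip': '10.0.0.5', 'port': '51234'}
```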


jbanAtSplunk
Communicator

Hey,

I destroyed the Splunk Docker container and started a clean one, and now the props are working.

Now, my idea is to send something like:

{
"source": "myHECpusher",
"index": "someindex",
"event": "This is the event data"
}

where "event" will be as close as possible to a Linux syslog message (without the timestamp, which I will take from the metadata), so that I can use the existing transforms.conf from Splunk_TA_nix* for extracting fields.

 


PickleRick
SplunkTrust

Hah! Gotcha!

A typical mistake (I made it myself long ago): if you use the /event endpoint, Splunk bypasses line breaking and date parsing. Line breaking, because you're obviously providing whole events on input, so there's no point in breaking lines, checking whether lines should be merged and so on. Date parsing, because you're supposed to supply the time field with a proper value; otherwise, if I remember correctly, the event gets the time of its arrival at the HEC input.

That used to be a big disadvantage of the HEC /event endpoint. Used to, because since 8.0 you can add "?auto_extract_timestamp=true" to the URL to parse the timestamp from the event.

https://docs.splunk.com/Documentation/Splunk/8.2.2/RESTREF/RESTinput#services.2Fcollector.2Fevent
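
As a small illustration of where the query parameter goes, this Python sketch builds the endpoint URL with and without it. The host name `splunk.example.com`, port 8088 as the default and the function name `hec_event_url` are assumptions of this example.

```python
from urllib.parse import urlencode, urlunsplit


def hec_event_url(host, port=8088, auto_extract_timestamp=False):
    """Build the /event endpoint URL, optionally asking Splunk to parse the timestamp."""
    query = urlencode({"auto_extract_timestamp": "true"}) if auto_extract_timestamp else ""
    return urlunsplit(("https", f"{host}:{port}", "/services/collector/event", query, ""))


print(hec_event_url("splunk.example.com"))
print(hec_event_url("splunk.example.com", auto_extract_timestamp=True))
```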

EDIT: Ahhh, I misread. I thought you wanted to parse the date from the event, when in fact you want to do the complete opposite: provide the timestamp in the "time" field. That'll work properly. Just remember about the milliseconds.


jbanAtSplunk
Communicator

Yes, I read about it.

I think this is what you mean:

ruby { code => "event.set('time', event.get('@timestamp').to_f)" }

This gives me the time field as an epoch value.
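
For comparison, the same conversion in Python shows why the float form matters: converting to an integer drops the millisecond part. The timestamp below is a hypothetical stand-in for Logstash's @timestamp; `round(..., 3)` and `int(...)` are only rough analogues of Ruby's `to_f` and `to_i`.

```python
from datetime import datetime

# Hypothetical event timestamp with an explicit UTC offset.
stamp = datetime.fromisoformat("2021-10-26T19:17:55.472+00:00")

epoch_float = round(stamp.timestamp(), 3)  # keeps the millisecond part
epoch_int = int(stamp.timestamp())         # whole seconds only

print(epoch_float)  # 1635275875.472
print(epoch_int)    # 1635275875
```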


jbanAtSplunk
Communicator

There is also a challenge: I now have "two" hosts in Splunk.

The host field looks multi-valued, because Logstash sends "host" in the metadata, and I want only one value.

I would like to keep the host from Logstash (to see who the sender is, as host in the metadata).

I am still figuring out the best way to approach that. I am thinking of putting it in the event and extracting it from there with a transform.

Or is there some Splunk trick?


PickleRick
SplunkTrust

The question is whether you are doing a search-time extraction for the host value or an index-time one.

I'm not sure at the moment, but there might have been some issue with search-time extractions (not) overwriting an indexed field value.


jbanAtSplunk
Communicator

The transform overwrites it, but now I have the same value twice 😄


And Logstash cannot create "event" properly, as it has a bug with \" escaping 😄 In the end I get "event":"\\\"message\\\".\\\"value\\\"" 😄

So, continuing with raw 😄


jbanAtSplunk
Communicator

Hey,

I managed to get /collector working, not just raw.

Now I am wondering why automatic sourcetype assignment from the Splunk_TA_nix app is not working.

Can Splunk HEC assign sourcetypes automatically? From a SUF it works as it is supposed to.

I have the correct source (/var/log/secure etc.), but the linux_secure sourcetype is not assigned automatically; the sourcetype is httpevent.


PickleRick
SplunkTrust

It's easiest to specify your metadata directly in the HEC request. Then you don't have to worry about overwriting it later.


jbanAtSplunk
Communicator

Hey, yes. I think I am almost done (events are indexed and parsed as if a SUF were sending the log 🙂).
A bit of the facility handling is still missing; I need to reverse-engineer a little more of what goes where. 🙂

Maybe someone will find this useful, so...

rsyslog config

 

template(name="logstash_to_splunk" type="string"
         string="<%PRI%>%TIMESTAMP:::date-rfc3339% %HOSTNAME% %syslogtag:1:32%%msg:::sp-if-no-1st-sp%%TIMESTAMP% %HOSTNAME% %syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%"
        )
authpriv.* @@127.0.0.1:5555; logstash_to_splunk

 

 

logstash config example

input {
 syslog {
    id => "syslog input"
    port => 5555
 }
}

filter {
  # Add the event time as an epoch value for Splunk HEC
  ruby { code => "event.set('time', event.get('@timestamp').to_i)" }
  # Map the syslog facility to source and sourcetype metadata
  if [facility] == 10 {
    mutate {
      add_field => {
        "source" => "/var/log/secure"
        "sourcetype" => "linux_secure"
      }
    }
  }
}

output {
  http {
    http_method => "post"
    id => "splunk output"
    format => "json"
    http_compression => "true"
    url => "https://fqdn:8088/services/collector"
    headers => ["Authorization", "Splunk your_key"]
    mapping => {
      "time" => "%{time}"
      "source" => "%{source}"
      "sourcetype" => "%{sourcetype}"
      "host" => "%{logsource}"
      "event" => "%{message}"
      "fields" => {
        "facility" => "%{facility}"
        "facility_label" => "%{facility_label}"
        "severity" => "%{severity}"
        "severity_label" => "%{severity_label}"
      }
    }
  }
}
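
For readers who want to see the resulting HEC payload without running Logstash, the mapping above can be approximated in Python. This is a sketch under assumptions: the record is a hypothetical dict shaped like the fields the syslog input produces, the names `FACILITY_METADATA` and `to_hec_payload` are made up, and, unlike the config above, source and sourcetype are simply left out when no facility mapping exists.

```python
import json

# Facility number -> (source, sourcetype); only the one mapping from the config above.
FACILITY_METADATA = {10: ("/var/log/secure", "linux_secure")}


def to_hec_payload(record):
    """Turn a parsed syslog record (a dict) into a HEC event envelope."""
    payload = {
        "time": record["time"],
        "host": record["logsource"],
        "event": record["message"],
        "fields": {
            "facility": str(record["facility"]),
            "severity": str(record["severity"]),
        },
    }
    metadata = FACILITY_METADATA.get(record["facility"])
    if metadata:  # leave source/sourcetype unset for unmapped facilities
        payload["source"], payload["sourcetype"] = metadata
    return payload


# Hypothetical record, for illustration only.
record = {
    "time": 1635275875,
    "logsource": "web01",
    "facility": 10,
    "severity": 6,
    "message": "sshd[1234]: Disconnected from user alice 10.0.0.5 port 51234",
}
print(json.dumps(to_hec_payload(record), indent=2))
```

Adding further facilities is then a matter of extending the lookup table rather than adding more conditionals.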

PickleRick
SplunkTrust

OK. But you didn't mention rsyslog before. If you're already using it, why put Logstash in between?

You can use omhttp to send directly to HEC.
