All Apps and Add-ons

Kafka Messaging Modular Input: Messages are consumed, but why do they never show up in the index?

  1. The modular input is installed.
  2. It is connected properly; I can see in both the Splunk and Kafka logs that the connection happens.
  3. There are no error messages (except for the Failed to load class "org.slf4j.impl.StaticLoggerBinder". message, which I read should be ignored).
  4. The messages are consumed (I run the Kafka consumer offset checker and it shows that the Splunk group id is always at the latest offset).
  5. Nothing shows up in the index.
  6. I tried changing the index, to no avail.
  7. There are no errors in splunkd.log.
  8. I created my own handler (copied the default handler) and added it.
  9. I put System.out.println calls in the handleMessage and setParams methods, but I do not see them in the logs.
  10. Thinking that maybe this was not the best way to log, I added code to write the log output to external files instead, but the files are never created.

Any ideas? I am at the end of my attempts here. Is there a specific format for the message? What am I missing here?

New Member

We are facing the same problem. I'd appreciate it if anyone can help with resolving this issue.

0 Karma

Explorer

Just in case you are still having this issue: we added slf4j-simple-1.7.21.jar to the .../kafka_ta/bin/lib folder and we stopped getting the org.slf4j.impl.StaticLoggerBinder warning.

0 Karma

Ultra Champion

Props to Mark Dixon. I realised that the version of the Add-On on Splunkbase didn't have the correct STDOUTTransport code compiled in. I blame myself for the oversight, because I have been too focussed on the HECTransport, which performs far better.
New version 0.9.2 is now released on Splunkbase.
Thanks for your patience, folks.

Explorer

Never mind, I got it:

Lines 80 onwards of STDDOUTTransport.scala should be:

@Override
public void transport(String message, String time) {
    transport(message);
}

Well, guess what: I wrote my own modular input in half the time it took me to try to get this one working, and mine works like a charm.
Sadly, the level of debugging and tracing this input provides is lacking, and the documentation assumes everything is straightforward, which is not the case at all.
I am going to close this topic; I got 50 email notifications overnight and not a single one of them was in response to my original question.

0 Karma

New Member

We face the same problem as mentioned. If you could post the changes you made to GitHub, that would be really helpful.

0 Karma

Path Finder

Good job. Any chance of popping this on GitHub so we could all benefit?

0 Karma

Ultra Champion

"Put System.out.println on the handleMessage and setParams methods but I do not see them in the logs"

Well, that won't work. You need to write to stderr, as dictated by the core modular inputs implementation in Splunk.
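As a minimal illustration (a generic sketch with hypothetical class and method names, not the add-on's actual code): splunkd's ExecProcessor captures a modular input script's stderr and writes it to splunkd.log / index=_internal, while stdout is reserved for the event data itself, so diagnostics printed with System.out.println never reach the logs:

```java
// Hypothetical sketch: a modular input must send diagnostics to System.err,
// because splunkd captures stderr (via ExecProcessor) for logging, while
// stdout carries the event stream to be indexed.
public class ModInputLogging {

    // Build the "LEVEL message" line that splunkd will pick up from stderr.
    static String format(String level, String message) {
        return level + " " + message;
    }

    static void log(String level, String message) {
        System.err.println(format(level, message)); // lands in splunkd.log
    }

    public static void main(String[] args) {
        log("ERROR", "handleMessage invoked");
        // System.out.println("...") here would be treated as event data, not a log line
    }
}
```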

0 Karma

Ultra Champion

What does your inputs.conf stanza look like?

0 Karma

Explorer

I've already typed this once, but for some reason it's disappeared.

The problem seems to stem from line 377 of ModularInput.StateCheckerThread, which makes a REST call to Splunk to determine whether the input is enabled. As it's a local instance with a default SSL cert, the connection fails SSL verification and throws an exception, causing the input to assume it's disabled (why?) and throw away all the messages (why again?).

This does not (AFAICT) appear in any logs, nor does there appear to be any way of overriding it.

How can we get this to work?
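For local testing against a default self-signed splunkd certificate, the standard (and deliberately insecure) Java workaround is to install a trust-all TrustManager before any HTTPS call is made. This is a generic JSSE sketch, assuming the state checker goes through HttpsURLConnection; it is not a supported option of this add-on:

```java
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

public class TrustAllForLocalSplunkd {

    // Build an SSLContext that accepts any certificate chain.
    // INSECURE: only reasonable for a local splunkd with its default self-signed cert.
    static SSLContext trustAllContext() throws Exception {
        TrustManager[] trustAll = new TrustManager[] {
            new X509TrustManager() {
                public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
                public void checkClientTrusted(X509Certificate[] certs, String authType) { }
                public void checkServerTrusted(X509Certificate[] certs, String authType) { }
            }
        };
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, trustAll, new SecureRandom());
        return ctx;
    }

    public static void main(String[] args) throws Exception {
        // Must be applied before the management-port REST call is attempted.
        HttpsURLConnection.setDefaultSSLSocketFactory(trustAllContext().getSocketFactory());
        System.out.println("trust-all SSL context installed");
    }
}
```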

0 Karma

Ultra Champion

That is core functionality in the underlying Java framework used by all my Java-based Modular Inputs: it calls back to the splunkd management port to check whether the stanza is enabled or disabled in Splunk. It is not specific to Kafka in any way.

If this logic is triggering and the Kafka Mod Input process is self-terminating (by design), you will definitely see error messages in the logs: index=_internal ExecProcessor kafka.py

What is the error message?

0 Karma

Explorer

There is no error message, but this might be due to the Kafka input not shipping any slf4j implementation and defaulting to the no-op logger.

0 Karma

Ultra Champion

You are going off on tangents here.

Those SLF4J messages can be ignored; they come from the underlying Kafka client library and have absolutely nothing to do with the Modular Input's runtime logging to Splunk.

So, back on track: if you see no error messages, then the Mod Input state checker is working just fine and this is not the source of your issue.

0 Karma

Explorer

The only log line of interest is:

11-03-2015 14:53:18.627 +0000 INFO ExecProcessor - New scheduled exec process: python /opt/splunk/etc/apps/kafka_ta/bin/kafka.py

0 Karma

Ultra Champion

How many kafka.py processes are running? Check that there is only one.

0 Karma

Explorer

There were two; I don't know why, but that hasn't always been the case. I killed everything and restarted. The logs are:

11-03-2015 15:15:17.088 +0000 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/kafka_ta/bin/kafka.py" SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

11-03-2015 15:15:17.088 +0000 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/kafka_ta/bin/kafka.py" SLF4J: Defaulting to no-operation (NOP) logger implementation

11-03-2015 15:15:17.088 +0000 ERROR ExecProcessor - message from "python /opt/splunk/etc/apps/kafka_ta/bin/kafka.py" SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

11-03-2015 15:15:10.155 +0000 INFO ExecProcessor - New scheduled exec process: python /opt/splunk/etc/apps/kafka_ta/bin/kafka.py

(source = /opt/splunk/var/log/splunk/splunkd.log, sourcetype = splunkd)
0 Karma

Path Finder

Incidentally, the UI saves my changes out to apps/launcher/local/inputs.conf. Is this expected?

0 Karma

Ultra Champion

Yes, when you set up your stanza while in the context of the "launcher" app.

0 Karma

Path Finder

I can't vouch for OP, but I'm the 'me too' person...

[kafka://Kafka]
group_id = splunk
hec_batchmode = 0
hec_https = 0
output_type = stdout
sourcetype = _json
topic_name = test
zookeeper_connect_host = localhost
additional_consumer_properties = key.deserializer=org.apache.kafka.common.serialization.StringDeserializer,value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
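For what it's worth, the additional consumer properties setting above is a comma-separated key=value list, which the add-on presumably splits into the java.util.Properties object a Kafka consumer expects. A hypothetical sketch of that parsing (illustrative only, not the add-on's actual code):

```java
import java.util.Properties;

// Hypothetical sketch: turn a "key=value,key=value" string (as used in the
// stanza above) into a Properties object for a Kafka consumer.
public class ConsumerPropsParser {

    static Properties parse(String csv) {
        Properties props = new Properties();
        for (String pair : csv.split(",")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                props.setProperty(pair.substring(0, eq).trim(),
                                  pair.substring(eq + 1).trim());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = parse("key.deserializer=org.apache.kafka.common.serialization.StringDeserializer,"
                + "value.deserializer=org.apache.kafka.common.serialization.StringDeserializer");
        System.out.println(p.getProperty("key.deserializer"));
    }
}
```

If a malformed pair (no '=') sneaks into the list, this sketch silently skips it; the add-on may handle that differently.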

0 Karma

Ultra Champion

What happens if you try a different sourcetype, i.e. a custom one called "mykafkadata"?

0 Karma