All Apps and Add-ons

Issues with using Splunk Connect for Kafka

rahulSplunk123
New Member

Hello Members,

I am trying to use Splunk Connect for Kafka to get JSON messages from a few topics inserted into Splunk (http://docs.splunk.com/Documentation/KafkaConnect/1.0.0/User/About).

I configured connect-distributed.properties and I also configured Splunk as per the documentation to consume the messages. So far I can see the correct output of GET requests such as http://localhost:8083/connector-plugins and http://localhost:8083/connectors (which lists all connector names), and I can also create the task with a POST request as below:

{
    "name": "splunk-kafka-poc-11",
    "config": {
        "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
        "tasks.max": "1",
        "topics": "TIMELOGS",
        "splunk.hec.uri": "https://localhost:8088",
        "splunk.hec.token": "Kafka-Token",
        "splunk.hec.ack.enabled": "TRUE",
        "splunk.hec.ack.poll.interval": "20",
        "splunk.hec.ack.poll.threads": "2",
        "splunk.hec.event.timeout": "120",
        "splunk.hec.raw": "false",
        "splunk.hec.json.event.enrichment": "org=fin,bu=south-east-us",
        "splunk.hec.ssl.validate.certs": "TRUE"
    }
}
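For reference, a config like this can be posted to the Connect REST API with curl. This is a sketch, not the exact command used: the worker URL is the default localhost:8083 from the post, the HEC token is the placeholder from the config above, and the JSON is validated locally before posting.

```shell
# Worker URL assumed from the post above (default Connect REST port).
CONNECT_URL="http://localhost:8083"

# Write the connector config to a file so it can be validated before posting.
# Token and settings are placeholders copied from the post above.
cat > splunk-sink.json <<'EOF'
{
    "name": "splunk-kafka-poc-11",
    "config": {
        "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
        "tasks.max": "1",
        "topics": "TIMELOGS",
        "splunk.hec.uri": "https://localhost:8088",
        "splunk.hec.token": "Kafka-Token",
        "splunk.hec.ack.enabled": "true",
        "splunk.hec.raw": "false",
        "splunk.hec.ssl.validate.certs": "true"
    }
}
EOF

# Validate the JSON syntax locally.
python3 -m json.tool splunk-sink.json > /dev/null && echo "config JSON is valid"

# Create the connector (requires a running Connect worker):
# curl -s -X POST -H "Content-Type: application/json" \
#      --data @splunk-sink.json "$CONNECT_URL/connectors"
```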

For the above POST request I get the following response back:
{
    "name": "splunk-kafka-poc-11",
    "config": {
        "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
        "tasks.max": "1",
        "topics": "TIMELOGS",
        "splunk.hec.uri": "https://localhost:8088",
        "splunk.hec.token": "Kafka-Token",
        "splunk.hec.ack.enabled": "TRUE",
        "splunk.hec.ack.poll.interval": "20",
        "splunk.hec.ack.poll.threads": "2",
        "splunk.hec.event.timeout": "120",
        "splunk.hec.raw": "false",
        "splunk.hec.json.event.enrichment": "org=fin,bu=south-east-us",
        "splunk.hec.ssl.validate.certs": "TRUE",
        "name": "splunk-kafka-poc-11"
    },
    "tasks": []
}

But I am getting errors in the Kafka console about this task, which suggests there are some issues.

Question

  1. My Kafka topics have messages in JSON, so in connect-distributed.properties should I use the JsonConverter as below:

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true

    OR

    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    Since I was not sure, I tried both sets of values, but I am getting errors in both scenarios.
    However, I saw in the documentation that only the StringConverter should be used. Could you please confirm which converter to use?

When I use the StringConverter I get the following error:

[2018-05-25 16:34:27,360] ERROR [pool-1-thread-3] failed to send batch (com.splunk.kafka.connect.SplunkSinkTask)
com.splunk.hecclient.HecException: All channels have back pressure
at com.splunk.hecclient.LoadBalancer.send(LoadBalancer.java:62)
at com.splunk.hecclient.Hec.send(Hec.java:94)
at com.splunk.kafka.connect.SplunkSinkTask.send(SplunkSinkTask.java:189)
at com.splunk.kafka.connect.SplunkSinkTask.handleFailedBatches(SplunkSinkTask.java:124)
at com.splunk.kafka.connect.SplunkSinkTask.put(SplunkSinkTask.java:59)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
[2018-05-25 16:34:27,360] INFO [pool-1-thread-3] handled 2 failed batches with 702 events (com.splunk.kafka.connect.SplunkSinkTask)
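For reference, the "All channels have back pressure" error generally means the connector cannot get events accepted by HEC, so HEC itself is worth checking independently of the connector. A sketch, with the host/port taken from splunk.hec.uri above; the actual commands are commented because they need a running Splunk:

```shell
# HEC endpoint taken from splunk.hec.uri in the connector config above.
HEC_URL="https://localhost:8088"

# HEC health check; -k skips certificate validation so reachability can be
# tested independently of TLS. A healthy HEC typically answers
# {"text":"HEC is healthy","code":17}.
# curl -k "$HEC_URL/services/collector/health"

# Inspect the certificate HEC actually presents; Splunk's default
# self-signed cert will fail validation while splunk.hec.ssl.validate.certs
# is "true".
# openssl s_client -connect localhost:8088 </dev/null 2>/dev/null \
#   | openssl x509 -noout -subject -issuer

echo "checking HEC at $HEC_URL"
```

Also worth checking: with splunk.hec.ack.enabled set to true, the HEC token itself must have indexer acknowledgment enabled on the Splunk side, or events will not be acknowledged.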

When I use the JsonConverter I get the following error:

[2018-05-31 15:45:40,514] ERROR [pool-1-thread-24] Task splunk-kafka-poc-11-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'B': was expecting ('true', 'false' or 'null')
at [Source: [B@79fc4672; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'B': was expecting ('true', 'false' or 'null')
at [Source: [B@79fc4672; line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeArray(JsonNodeDeserializer.java:269)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:71)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

Could you please advise how to get the JSON messages into Splunk?
Please let me know if you need any other information.


fman82
Explorer

I've used this config before:

connector.class=com.splunk.kafka.connect.SplunkSinkConnector
key.converter.schemas.enable=false
topics=payments
name=SplunkTest2
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
splunk.hec.token=92933e00-0adf-424d-ace4-96e07f47bba9
splunk.hec.uri=http://somehost.compute.amazonaws.com:8088
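A note on why this converter combination works for plain JSON values: with value.converter.schemas.enable=false the JsonConverter accepts bare JSON objects, while with it set to true each record must be wrapped in a Connect schema/payload envelope. Roughly like this (the field names here are purely illustrative):

```json
{
    "schema": {
        "type": "struct",
        "fields": [ { "field": "msg", "type": "string" } ],
        "optional": false
    },
    "payload": { "msg": "hello" }
}
```

Plain JSON events without that envelope will fail to deserialize when schemas.enable is true, which matches the JsonParseException in the original post.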

I highly recommend using Lenses.io to manage Kafka and Kafka Connect. See this short video tutorial: https://www.youtube.com/watch?v=cnKHhE8ApPA&


hsesterhenn_spl
Splunk Employee

Hi,
Looks like you found a solution:

https://github.com/splunk/kafka-connect-splunk/issues/151

@Ken Chen said:
Did you have a valid CA-signed certificate on your Splunk localhost? If not, disable the cert validation (hec.ssl.validate.cert to false) in the config and retry the string converter scenario.

HTH,

Holger


inawaz123
Loves-to-Learn

Any update on this?
