
Issues with using Splunk Connect for Kafka

New Member

Hello Members,

I am trying to use Splunk Connect for Kafka (http://docs.splunk.com/Documentation/KafkaConnect/1.0.0/User/About) to get JSON messages from a few topics inserted into Splunk.

I configured connect-distributed.properties and also configured Splunk as per the documentation to consume the messages. So far I can see the correct output from endpoints such as
http://localhost:8083/connector-plugins and http://localhost:8083/connectors (a GET request that lists all connector names), and I am also able to submit the task with a POST request like this:

{
  "name": "splunk-kafka-poc-11",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "1",
    "topics": "TIMELOGS",
    "splunk.hec.uri": "https://localhost:8088",
    "splunk.hec.token": "Kafka-Token",
    "splunk.hec.ack.enabled": "TRUE",
    "splunk.hec.ack.poll.interval": "20",
    "splunk.hec.ack.poll.threads": "2",
    "splunk.hec.event.timeout": "120",
    "splunk.hec.raw": "false",
    "splunk.hec.json.event.enrichment": "org=fin,bu=south-east-us",
    "splunk.hec.ssl.validate.certs": "TRUE"
  }
}

For the above POST request I get the following response back:
{
  "name": "splunk-kafka-poc-11",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "1",
    "topics": "TIMELOGS",
    "splunk.hec.uri": "https://localhost:8088",
    "splunk.hec.token": "Kafka-Token",
    "splunk.hec.ack.enabled": "TRUE",
    "splunk.hec.ack.poll.interval": "20",
    "splunk.hec.ack.poll.threads": "2",
    "splunk.hec.event.timeout": "120",
    "splunk.hec.raw": "false",
    "splunk.hec.json.event.enrichment": "org=fin,bu=south-east-us",
    "splunk.hec.ssl.validate.certs": "TRUE",
    "name": "splunk-kafka-poc-11"
  },
  "tasks": []
}

But I am getting errors in the Kafka console about this task, which suggests there are some issues.

Question

  1. My Kafka topics have messages in JSON, so in connect-distributed.properties should I use JsonConverter as below:

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true

    OR

    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    Since I was not sure, I tried both sets of values, but I get errors in both scenarios.
    However, I saw in the documentation to use StringConverter only. Could you please confirm which converter should be used?
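One detail worth knowing when choosing: with schemas.enable=true, JsonConverter does not accept plain JSON records — it expects every message wrapped in a schema/payload envelope. A small self-contained sketch of the two shapes (field names in the example are invented):

```python
import json

# A plain JSON record, as most producers write it. This is what
# schemas.enable=false (or StringConverter) handles directly.
plain = json.dumps({"user": "alice", "duration_ms": 42})

# What JsonConverter expects when schemas.enable=true: every message
# wrapped in a schema/payload envelope.
enveloped = json.dumps({
    "schema": {"type": "struct", "fields": [
        {"field": "user", "type": "string"},
        {"field": "duration_ms", "type": "int64"},
    ]},
    "payload": {"user": "alice", "duration_ms": 42},
})

print("plain keys:", sorted(json.loads(plain).keys()))
print("envelope keys:", sorted(json.loads(enveloped).keys()))
```

So if the producers write plain JSON, schemas.enable=true is a mismatch regardless of which converter class is picked.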

When I use StringConverter I get the following error:

[2018-05-25 16:34:27,360] ERROR [pool-1-thread-3] failed to send batch (com.splunk.kafka.connect.SplunkSinkTask)
com.splunk.hecclient.HecException: All channels have back pressure
at com.splunk.hecclient.LoadBalancer.send(LoadBalancer.java:62)
at com.splunk.hecclient.Hec.send(Hec.java:94)
at com.splunk.kafka.connect.SplunkSinkTask.send(SplunkSinkTask.java:189)
at com.splunk.kafka.connect.SplunkSinkTask.handleFailedBatches(SplunkSinkTask.java:124)
at com.splunk.kafka.connect.SplunkSinkTask.put(SplunkSinkTask.java:59)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
[2018-05-25 16:34:27,360] INFO [pool-1-thread-3] handled 2 failed batches with 702 events (com.splunk.kafka.connect.SplunkSinkTask)

When I use JsonConverter I get the following error:

[2018-05-31 15:45:40,514] ERROR [pool-1-thread-24] Task splunk-kafka-poc-11-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'B': was expecting ('true', 'false' or 'null')
at [Source: [B@79fc4672; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'B': was expecting ('true', 'false' or 'null')
at [Source: [B@79fc4672; line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeArray(JsonNodeDeserializer.java:269)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:71)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
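One observation on this trace: the [Source: [B@79fc4672 ...] part is Java's default toString for a byte array, and "Unrecognized token" at line 1, column 4 means the raw message bytes do not parse as JSON at all — for example because the producer serialized them in some other format. A quick way to sanity-check a sample message, sketched in plain Python rather than against a live consumer (in practice you would pull the bytes from the topic with a console consumer or client library):

```python
import json

def looks_like_json(raw: bytes) -> bool:
    """Return True if raw Kafka message bytes decode as UTF-8 JSON.

    This exercises the same parse step JsonConverter performs; it does
    not talk to a broker.
    """
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except (UnicodeDecodeError, ValueError):
        return False

# A valid JSON message parses...
print(looks_like_json(b'{"user": "alice"}'))          # True
# ...while non-JSON bytes (e.g. some other serialization) do not,
# which is exactly when JsonConverter throws JsonParseException.
print(looks_like_json(b'\xac\xed\x00\x05not-json'))   # False
```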

Could you please advise how to get the JSON messages into Splunk?
Please let me know if you need any other information.


Explorer

I've used this config before:

connector.class=com.splunk.kafka.connect.SplunkSinkConnector
key.converter.schemas.enable=false
topics=payments
name=SplunkTest2
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
splunk.hec.token=92933e00-0adf-424d-ace4-96e07f47bba9
splunk.hec.uri=http://somehost.compute.amazonaws.com:8088

I highly recommend using Lenses.io to manage Kafka and Kafka Connect. See this short example video tutorial: https://www.youtube.com/watch?v=cnKHhE8ApPA&


Splunk Employee

Hi,
it looks like you found a solution:

https://github.com/splunk/kafka-connect-splunk/issues/151

@Ken Chen said:
Did you have valid CA signed certificate on your Splunk localhost ? If not, disable the cert validation ( hec.ssl.validate.cert to false) in the config and retry for the string converter scenario.
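In the connector config submitted above, the corresponding key (as spelled in the 1.0 docs) is splunk.hec.ssl.validate.certs, so the retry would change just this line — a sketch, worth double-checking against your connector version:

```
"splunk.hec.ssl.validate.certs": "false"
```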

HTH,

Holger


New Member

Any update on this?
