Issues with using Splunk Connect for Kafka
Hello Members,
I am trying to use Splunk Connect for Kafka to insert JSON messages from a few topics into Splunk (http://docs.splunk.com/Documentation/KafkaConnect/1.0.0/User/About).
I configured connect-distributed.properties and set up Splunk to consume the messages as described in the documentation. So far I get the correct output from requests such as
http://localhost:8083/connector-plugins and http://localhost:8083/connectors (GET request to list all connector names), and I can also create the connector with a POST request as below:
{
"name": "splunk-kafka-poc-11",
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"topics":"TIMELOGS",
"splunk.hec.uri": "https://localhost:8088",
"splunk.hec.token": "Kafka-Token",
"splunk.hec.ack.enabled": "TRUE",
"splunk.hec.ack.poll.interval" : "20",
"splunk.hec.ack.poll.threads" : "2",
"splunk.hec.event.timeout" : "120",
"splunk.hec.raw" : "false",
"splunk.hec.json.event.enrichment" : "org=fin,bu=south-east-us",
"splunk.hec.ssl.validate.certs": "TRUE"
}
}
For the above POST request I get the following response back:
{
"name": "splunk-kafka-poc-11",
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"topics": "TIMELOGS",
"splunk.hec.uri": "https://localhost:8088",
"splunk.hec.token": "Kafka-Token",
"splunk.hec.ack.enabled": "TRUE",
"splunk.hec.ack.poll.interval": "20",
"splunk.hec.ack.poll.threads": "2",
"splunk.hec.event.timeout": "120",
"splunk.hec.raw": "false",
"splunk.hec.json.event.enrichment": "org=fin,bu=south-east-us",
"splunk.hec.ssl.validate.certs": "TRUE",
"name": "splunk-kafka-poc-11"
},
"tasks": []
}
But I am getting errors in the Kafka console for this task, which suggests there are some issues.
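One way to see why a task is failing, besides the console log, is the Kafka Connect REST endpoint `GET /connectors/<name>/status`, which reports the connector state, each task's state, and a stack trace for failed tasks. An empty `"tasks": []` in the creation response does not by itself indicate a failure, since tasks are typically started asynchronously. Below is a minimal sketch; the helper names (`status_url`, `summarize_status`, `fetch_status`) are made up for illustration, and the worker address is the one used in this post.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Connect worker REST address from the post


def status_url(connector_name, base_url=CONNECT_URL):
    """Build the status endpoint URL for one connector."""
    return f"{base_url}/connectors/{connector_name}/status"


def summarize_status(status):
    """Return (connector_state, [(task_id, task_state, first_trace_line), ...])."""
    connector_state = status.get("connector", {}).get("state", "UNKNOWN")
    tasks = []
    for task in status.get("tasks", []):
        trace = task.get("trace", "")
        # Only the first line of the trace is kept, to give a short summary.
        first_line = trace.splitlines()[0] if trace else ""
        tasks.append((task.get("id"), task.get("state"), first_line))
    return connector_state, tasks


def fetch_status(connector_name):
    """Fetch and decode the status document (needs a running Connect worker)."""
    with urllib.request.urlopen(status_url(connector_name)) as resp:
        return json.load(resp)


# Usage against a live worker (not executed here):
#   print(summarize_status(fetch_status("splunk-kafka-poc-11")))
```

A task in state `FAILED` has to be restarted (or the connector re-created) after the underlying problem is fixed; it is not retried automatically.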
Question
My Kafka topics contain messages in JSON. So in connect-distributed.properties, should I use JsonConverter as below:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
OR
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Since I was not sure, I tried both sets of values, but I get errors in both scenarios.
As far as I can see, the documentation says to use StringConverter only. Could you please confirm which converter should be used?
When I use StringConverter I get the following error:
[2018-05-25 16:34:27,360] ERROR [pool-1-thread-3] failed to send batch (com.splunk.kafka.connect.SplunkSinkTask)
com.splunk.hecclient.HecException: All channels have back pressure
at com.splunk.hecclient.LoadBalancer.send(LoadBalancer.java:62)
at com.splunk.hecclient.Hec.send(Hec.java:94)
at com.splunk.kafka.connect.SplunkSinkTask.send(SplunkSinkTask.java:189)
at com.splunk.kafka.connect.SplunkSinkTask.handleFailedBatches(SplunkSinkTask.java:124)
at com.splunk.kafka.connect.SplunkSinkTask.put(SplunkSinkTask.java:59)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
[2018-05-25 16:34:27,360] INFO [pool-1-thread-3] handled 2 failed batches with 702 events (com.splunk.kafka.connect.SplunkSinkTask)
When I use JsonConverter I get the following error:
[2018-05-31 15:45:40,514] ERROR [pool-1-thread-24] Task splunk-kafka-poc-11-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'B': was expecting ('true', 'false' or 'null')
at [Source: [B@79fc4672; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'B': was expecting ('true', 'false' or 'null')
at [Source: [B@79fc4672; line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeArray(JsonNodeDeserializer.java:269)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:71)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Could you please advise how to get the JSON messages into Splunk?
Please let me know if you need any other information.
I've used this config before:
connector.class=com.splunk.kafka.connect.SplunkSinkConnector
key.converter.schemas.enable=false
topics=payments
name=SplunkTest2
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
splunk.hec.token=92933e00-0adf-424d-ace4-96e07f47bba9
splunk.hec.uri=http://somehost.compute.amazonaws.com:8088
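Whether these converter settings fit depends on what the record values actually look like. As a rough sketch (the function name and the sample payloads are made up for illustration), a raw record value can be checked like this: JsonConverter with `schemas.enable=true` expects an envelope with exactly the two fields `schema` and `payload`, JsonConverter with `schemas.enable=false` accepts any valid JSON, and values that are not valid JSON make JsonConverter throw a `DataException`, while StringConverter passes them through as text.

```python
import json


def classify_payload(raw):
    """Classify a raw Kafka record value (bytes) for converter selection."""
    try:
        doc = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # JsonConverter would fail on this value; StringConverter would not.
        return "not-json"
    if isinstance(doc, dict) and set(doc.keys()) == {"schema", "payload"}:
        # Shape expected by JsonConverter with schemas.enable=true.
        return "json-with-envelope"
    # Valid JSON without envelope: use schemas.enable=false (or StringConverter).
    return "plain-json"


# Hypothetical sample values, not taken from the TIMELOGS topic:
print(classify_payload(b'{"user": "a", "hours": 8}'))
print(classify_payload(b'{"schema": null, "payload": {"hours": 8}}'))
print(classify_payload(b'TIMELOG user=a hours=8'))
```

Running this against a few values dumped from the topic (for example with the console consumer) should show quickly whether the producer is really writing JSON.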
I highly recommend using Lenses.io to manage Kafka and Kafka Connect. See this short example video tutorial: https://www.youtube.com/watch?v=cnKHhE8ApPA&

Hi,
looks like you found a solution:
https://github.com/splunk/kafka-connect-splunk/issues/151
@Ken Chen said:
Do you have a valid CA-signed certificate on your Splunk localhost? If not, disable certificate validation (set splunk.hec.ssl.validate.certs to false) in the config and retry the StringConverter scenario.
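To check the HEC endpoint independently of Kafka Connect, a small probe along these lines might help. It is only a sketch: the function names are made up, the event text is arbitrary, and the URI and token are the placeholders from the original post. It posts one event to `/services/collector/event` with the `Authorization: Splunk <token>` header and can switch certificate validation off, which mirrors what the connector setting does. If indexer acknowledgment is enabled on the token, HEC also expects an `X-Splunk-Request-Channel` header carrying a GUID, so the sketch accepts an optional channel.

```python
import json
import ssl
import urllib.request
import uuid


def build_hec_request(hec_uri, token, event, channel=None):
    """Build (but do not send) a POST request for the HEC event endpoint."""
    url = hec_uri.rstrip("/") + "/services/collector/event"
    headers = {
        "Authorization": f"Splunk {token}",
        "Content-Type": "application/json",
    }
    if channel is not None:
        # Needed when indexer acknowledgment is enabled on the token.
        headers["X-Splunk-Request-Channel"] = channel
    body = json.dumps({"event": event}).encode("utf-8")
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def make_ssl_context(validate_certs):
    """Default TLS context, optionally with certificate checks switched off."""
    ctx = ssl.create_default_context()
    if not validate_certs:
        ctx.check_hostname = False  # must be disabled before verify_mode
        ctx.verify_mode = ssl.CERT_NONE
    return ctx


def send_test_event(hec_uri, token, validate_certs=True, use_ack=False):
    """Send one test event; returns (HTTP status, response body)."""
    channel = str(uuid.uuid4()) if use_ack else None
    req = build_hec_request(hec_uri, token, "hec connectivity test", channel)
    ctx = make_ssl_context(validate_certs)
    with urllib.request.urlopen(req, context=ctx, timeout=10) as resp:
        return resp.status, resp.read().decode("utf-8")


# Usage against a live Splunk instance (not executed here):
#   send_test_event("https://localhost:8088", "Kafka-Token",
#                   validate_certs=False, use_ack=True)
```

If the call only succeeds with `validate_certs=False`, the certificate is the likely cause of the back pressure errors in the StringConverter scenario. Disabling validation is reasonable for a local proof of concept, but not for production.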
HTH,
Holger
Any update on this?
