Hello Members,
I am trying to use Splunk Connect for Kafka to get JSON messages from a few topics inserted into Splunk (http://docs.splunk.com/Documentation/KafkaConnect/1.0.0/User/About).
I configured connect-distributed.properties and also configured Splunk as per the documentation to consume the messages. So far I see the correct output from http://localhost:8083/connector-plugins and http://localhost:8083/connectors (GET request to list all the connector names), and I am able to submit the connector using a POST request as below:
{
"name": "splunk-kafka-poc-11",
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"topics":"TIMELOGS",
"splunk.hec.uri": "https://localhost:8088",
"splunk.hec.token": "Kafka-Token",
"splunk.hec.ack.enabled": "TRUE",
"splunk.hec.ack.poll.interval" : "20",
"splunk.hec.ack.poll.threads" : "2",
"splunk.hec.event.timeout" : "120",
"splunk.hec.raw" : "false",
"splunk.hec.json.event.enrichment" : "org=fin,bu=south-east-us",
"splunk.hec.ssl.validate.certs": "TRUE"
}
}
For the above POST request I get the following response back:
{
"name": "splunk-kafka-poc-11",
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"topics": "TIMELOGS",
"splunk.hec.uri": "https://localhost:8088",
"splunk.hec.token": "Kafka-Token",
"splunk.hec.ack.enabled": "TRUE",
"splunk.hec.ack.poll.interval": "20",
"splunk.hec.ack.poll.threads": "2",
"splunk.hec.event.timeout": "120",
"splunk.hec.raw": "false",
"splunk.hec.json.event.enrichment": "org=fin,bu=south-east-us",
"splunk.hec.ssl.validate.certs": "TRUE",
"name": "splunk-kafka-poc-11"
},
"tasks": []
}
But I am getting errors in the Kafka console about this task, which suggests there are some issues.
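Since the response above shows an empty "tasks" list, the task state can also be read from the Kafka Connect REST status endpoint (GET /connectors/<name>/status). Below is a minimal sketch of that check, assuming the worker is still on http://localhost:8083; the helper names status_url, failed_tasks and fetch_status are my own and not part of any library.

```python
import json
from urllib.request import urlopen

CONNECT_URL = "http://localhost:8083"   # Connect worker address used in this post
CONNECTOR = "splunk-kafka-poc-11"       # connector name from the POST request


def status_url(base, name):
    """Build the Kafka Connect REST URL for a connector's status."""
    return f"{base.rstrip('/')}/connectors/{name}/status"


def failed_tasks(status):
    """Return (task id, trace) for every task the worker reports as FAILED."""
    return [
        (task["id"], task.get("trace", ""))
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


def fetch_status(base=CONNECT_URL, name=CONNECTOR):
    """Fetch and decode the status document (requires a running worker)."""
    with urlopen(status_url(base, name)) as resp:
        return json.load(resp)

# Usage against a live worker:
#   for task_id, trace in failed_tasks(fetch_status()):
#       print(task_id, trace.splitlines()[0] if trace else "")
```

A FAILED task normally carries the stack trace in its "trace" field, so this should show the same exception as the console log without having to search through it.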
Question
My Kafka topics have messages in JSON. So in connect-distributed.properties, should I use JsonConverter as below:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
OR
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Since I was not sure, I tried both sets of values, but I get errors in both scenarios. However, I saw that the documentation uses StringConverter only. Could you please confirm which converter should be used?
When I use StringConverter I get the following error:
[2018-05-25 16:34:27,360] ERROR [pool-1-thread-3] failed to send batch (com.splunk.kafka.connect.SplunkSinkTask)
com.splunk.hecclient.HecException: All channels have back pressure
at com.splunk.hecclient.LoadBalancer.send(LoadBalancer.java:62)
at com.splunk.hecclient.Hec.send(Hec.java:94)
at com.splunk.kafka.connect.SplunkSinkTask.send(SplunkSinkTask.java:189)
at com.splunk.kafka.connect.SplunkSinkTask.handleFailedBatches(SplunkSinkTask.java:124)
at com.splunk.kafka.connect.SplunkSinkTask.put(SplunkSinkTask.java:59)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
[2018-05-25 16:34:27,360] INFO [pool-1-thread-3] handled 2 failed batches with 702 events (com.splunk.kafka.connect.SplunkSinkTask)
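I am not sure what causes "All channels have back pressure", so one check I can run independently of the connector is to post a single test event straight to HEC with the same URI and token. Below is a minimal sketch that only builds the request. It assumes the standard /services/collector/event endpoint and the "Authorization: Splunk <token>" header. The X-Splunk-Request-Channel header is included because, as far as I understand, HEC expects a channel id when indexer acknowledgment is enabled for the token. The function name build_hec_request is my own.

```python
import json
import uuid
from urllib.request import Request


def build_hec_request(hec_uri, token, event, channel=None):
    """Build (but do not send) a POST request carrying one event to Splunk HEC."""
    body = json.dumps({"event": event}).encode("utf-8")
    req = Request(
        f"{hec_uri.rstrip('/')}/services/collector/event",
        data=body,
        method="POST",
    )
    req.add_header("Authorization", f"Splunk {token}")
    if channel is not None:
        # Assumption: needed when acknowledgment is enabled for this token.
        req.add_header("X-Splunk-Request-Channel", channel)
    return req


# Values taken from the connector config in this post.
request = build_hec_request(
    "https://localhost:8088",
    "Kafka-Token",
    {"message": "hec connectivity test"},
    channel=str(uuid.uuid4()),
)

# To actually send it (needs a reachable HEC and a suitable SSL context):
#   from urllib.request import urlopen
#   print(urlopen(request).read())
```

If this request is rejected as well, the problem is probably on the HEC side (token, SSL or acknowledgment settings) rather than in the converter configuration.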
When I use JsonConverter I get the following error:
[2018-05-31 15:45:40,514] ERROR [pool-1-thread-24] Task splunk-kafka-poc-11-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'B': was expecting ('true', 'false' or 'null')
at [Source: [B@79fc4672; line: 1, column: 4]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'B': was expecting ('true', 'false' or 'null')
at [Source: [B@79fc4672; line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeArray(JsonNodeDeserializer.java:269)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:71)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
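The JsonParseException above suggests that at least one record value in the topic cannot be parsed as JSON at all. Separately, JsonConverter with schemas.enable=true expects each message to be wrapped in an envelope with "schema" and "payload" fields. Below is a minimal sketch for classifying a raw record value copied from the topic; the function name describe_value and the sample inputs are my own, not taken from my actual data.

```python
import json


def describe_value(raw: bytes) -> str:
    """Classify a raw Kafka record value by how a JSON converter would see it."""
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return "not UTF-8 text"
    try:
        doc = json.loads(text)
    except json.JSONDecodeError as exc:
        return f"not valid JSON (column {exc.colno}): {exc.msg}"
    # schemas.enable=true expects exactly this two-field envelope.
    if isinstance(doc, dict) and set(doc) == {"schema", "payload"}:
        return "JSON with schema/payload envelope"
    return "plain JSON without envelope"


# Hypothetical sample values for illustration.
for sample in (b'{"user": "a", "hours": 8}',
               b'{"schema": null, "payload": {"user": "a"}}',
               b'[B@79fc4672'):
    print(sample, "->", describe_value(sample))
```

If the values turn out to be plain JSON without the envelope, that would at least explain why schemas.enable=true does not fit my topics.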
Could you please advise how to get the JSON messages into Splunk? Please let me know if you need any other information.