Hello Team,
We have configured a standalone Splunk Kafka Connect server against an Event Hubs Kafka topic, but we are not able to fetch logs from the Kafka topic into Splunk. Our configuration details are below.
standalone.properties
==================================================================================
bootstrap.servers=<hostname>:9093
plugin.path=/opt/app/kafka/plugins,/opt/app/kafka/kafka_2.13-3.4.0/jre1.8.0_211
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=<group>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.flush.interval.ms=60000
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=OAUTHBEARER
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
//Kafka Serializer class for Kafka record values, we have set our message body to be String
consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
consumer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<token>" clientSecret="<token>" scope="https://<bootstrap_server>/.default";
consumer.sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/<values>/oauth2/v2.0/token
consumer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
offset.storage.file.filename=/opt/app/kafka/run/offset_connectorservice_1
#consumer.auto.offset.reset=latest
auto.offset.reset=latest
consumer.group.id=<topic_group>
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/<values>/oauth2/v2.0/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<token>" clientSecret="<token>" scope="https://<bootstrap_server>/.default";
access.control.allow.origin=*
access.control.allow.methods=GET,OPTIONS,HEAD,POST,PUT,DELETE
=================================================================================
Logs from the standalone Kafka Connect worker
================================================================================
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be ignored.
Jul 27, 2023 4:37:51 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will be ignored.
Jul 27, 2023 4:37:51 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be ignored.
Jul 27, 2023 4:37:51 AM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2023-07-27 04:37:51,213] INFO Started o.e.j.s.ServletContextHandler@252dc8c4{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:921)
[2023-07-27 04:37:51,213] DEBUG STARTED @14030ms o.e.j.s.ServletContextHandler@252dc8c4{/,null,AVAILABLE} (org.eclipse.jetty.util.component.AbstractLifeCycle:191)
[2023-07-27 04:37:51,213] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:309)
[2023-07-27 04:37:51,213] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)
=================================================================================
splunksinkconnector.sh
=================================================================================
#!/bin/bash
curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d '{
"name": "splunk-asla-dev",
"config": {
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"tasks.max": "1",
"topics": "<topic>",
"splunk.hec.uri": "https://<splunk_indexer>:8088",
"splunk.hec.token": "<token>",
"splunk.hec.ack.enabled": "true",
"splunk.hec.raw": "true",
"splunk.hec.track.data": "true",
"splunk.hec.ssl.validate.certs": "false",
"splunk.indexes": "index",
"splunk.sourcetypes": "sourcetype",
"splunk.hec.raw.line.breaker": "\n"
}
}'
===================================================================================
Below are the error messages after running the sink connector:
================================================================================
[2023-07-27 04:40:50,906] TRACE [sink_connector|task-0] [Consumer clientId=connector-consumer-sink_connector-0, groupId=stage-group] sslCiphers: closed 1 metric(s). (org.apache.kafka.common.network.Selector:269)
[2023-07-27 04:40:50,906] TRACE [sink_connector|task-0] [Consumer clientId=connector-consumer-sink_connector-0, groupId=stage-group] clients: entering performPendingMetricsOperations (org.apache.kafka.common.network.Selector:213)
[2023-07-27 04:40:50,906] TRACE [sink_connector|task-0] [Consumer clientId=connector-consumer-sink_connector-0, groupId=stage-group] clients: leaving performPendingMetricsOperations (org.apache.kafka.common.network.Selector:229)
[2023-07-27 04:40:50,906] TRACE [sink_connector|task-0] [Consumer clientId=connector-consumer-sink_connector-0, groupId=stage-group] clients: closed 0 metric(s). (org.apache.kafka.common.network.Selector:269)
[2023-07-27 04:40:50,906] INFO [sink_connector|task-0] [Principal=:74fafaf6-a0c9-4b8b-bd8f-397ccb3c1212]: Expiring credential re-login thread has been interrupted and will exit. (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin:95)
[2023-07-27 04:40:50,907] TRACE [sink_connector|task-0] LoginManager(serviceName=kafka, publicCredentials=[SaslExtensions[extensionsMap={}]], refCount=0) released (org.apache.kafka.common.security.authenticator.LoginManager:157)
[2023-07-27 04:40:50,907] TRACE [sink_connector|task-0] Removed metric named MetricName [name=version, group=app-info, description=Metric indicating version, tags={client-id=connector-consumer-sink_connector-0}] (org.apache.kafka.common.metrics.Metrics:568)
[2023-07-27 04:40:50,907] TRACE [sink_connector|task-0] Removed metric named MetricName [name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=connector-consumer-sink_connector-0}] (org.apache.kafka.common.metrics.Metrics:568)
[2023-07-27 04:40:50,907] TRACE [sink_connector|task-0] Removed metric named MetricName [name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=connector-consumer-sink_connector-0}] (org.apache.kafka.common.metrics.Metrics:568)
[2023-07-27 04:40:50,907] INFO [sink_connector|task-0] App info kafka.consumer for connector-consumer-sink_connector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2023-07-27 04:40:50,907] DEBUG [sink_connector|task-0] [Consumer clientId=connector-consumer-sink_connector-0, groupId=stage-group] Kafka consumer has been closed (org.apache.kafka.clients.consumer.KafkaConsumer:2425)
===============================================================================
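The TRACE lines above show the task's consumer being closed, but not why. One way to surface the cause is to ask the Connect REST API for the connector status, which reports each task's state and, for a failed task, a stack trace. The sketch below assumes the REST API listens on localhost:8083, as in splunksinkconnector.sh; the function names are hypothetical helpers, not part of Kafka or the Splunk connector.

```shell
#!/bin/bash
# Build the status URL for a connector on a Connect worker.
# Assumption: the REST API is on localhost:8083 unless a base URL is passed.
connector_status_url() {
  local name="$1"
  local base="${2:-http://localhost:8083}"
  printf '%s/connectors/%s/status' "$base" "$name"
}

# Fetch connector and task state. A task in FAILED state carries a
# "trace" field with the exception that stopped it.
connector_status() {
  curl -s "$(connector_status_url "$1" "$2")"
}
```

For example, `connector_status splunk-asla-dev` queries the connector created by the script above.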
Please let us know what we are missing in this configuration.
I was able to resolve this issue, so let me answer my own question here. I removed the following deserializer settings from standalone.properties:
consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
//Kafka Serializer class for Kafka record values, we have set our message body to be String
consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
I also published the Kafka topic to the Event Hubs platform, but the deserializer properties seem to have been the main cause of the issue. It is working well now.
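A likely explanation, not confirmed in this thread: a Kafka Connect sink task expects raw bytes from its consumer and passes them to the configured key.converter and value.converter, so a consumer.* deserializer override that yields strings can break the task. For anyone applying the same fix, here is a minimal sketch that backs up a worker properties file and drops the two override lines. The function name is a hypothetical helper, and the file path is whatever your standalone.properties actually is.

```shell
#!/bin/bash
# Remove consumer.key.deserializer / consumer.value.deserializer overrides
# from a Connect worker properties file, keeping a .bak copy of the original.
strip_deserializers() {
  local props="$1"
  cp "$props" "$props.bak"
  # Delete only lines that set the two consumer deserializer overrides;
  # every other line (including comments) is written back unchanged.
  sed -E '/^[[:space:]]*consumer\.(key|value)\.deserializer=/d' "$props.bak" > "$props"
}
```

Run it as `strip_deserializers standalone.properties`, then restart the standalone worker so it picks up the change.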