
Why is the Splunk Kafka sink connector not able to fetch logs from an Event Hubs Kafka topic?

tariq1992
Engager

Hello Team,

 

We have configured a standalone Splunk Kafka Connect server against an Azure Event Hubs Kafka topic, but we are not able to get logs from the Kafka topic into Splunk. Our configuration details are below.

 

standalone.properties

==================================================================================

bootstrap.servers=<hostname>:9093
plugin.path=/opt/app/kafka/plugins,/opt/app/kafka/kafka_2.13-3.4.0/jre1.8.0_211

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=<group>

 

value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.flush.interval.ms=60000
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=OAUTHBEARER
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER


consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Kafka deserializer class for Kafka record values; we have set our message body to be String
consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer


consumer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<token>" clientSecret="<token>" scope="https://<bootstrap_server>/.default";
consumer.sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/<values>/oauth2/v2.0/token
consumer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
offset.storage.file.filename=/opt/app/kafka/run/offset_connectorservice_1
#consumer.auto.offset.reset=latest
auto.offset.reset=latest
consumer.group.id=<topic_group>
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/<values>/oauth2/v2.0/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<token>" clientSecret="<token>" scope="https://<bootstrap_server>/.default";

access.control.allow.origin=*
access.control.allow.methods=GET,OPTIONS,HEAD,POST,PUT,DELETE

=================================================================================
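As a first sanity check (a sketch only, assuming a client.properties file containing the same security.protocol, sasl.mechanism, sasl.jaas.config, sasl.oauthbearer.token.endpoint.url, and sasl.login.callback.handler.class values as above), the Event Hubs Kafka endpoint and OAuth credentials can be verified outside of Connect by consuming directly with the console consumer shipped with Kafka:

# Consume a few records straight from the Event Hubs Kafka endpoint to rule
# out connectivity and authentication problems before involving Connect.
/opt/app/kafka/kafka_2.13-3.4.0/bin/kafka-console-consumer.sh \
  --bootstrap-server <hostname>:9093 \
  --topic <topic> \
  --consumer.config client.properties \
  --from-beginning \
  --max-messages 5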

Startup logs from the standalone Kafka Connect worker

================================================================================

WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be ignored.
Jul 27, 2023 4:37:51 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will be ignored.
Jul 27, 2023 4:37:51 AM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be ignored.
Jul 27, 2023 4:37:51 AM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2023-07-27 04:37:51,213] INFO Started o.e.j.s.ServletContextHandler@252dc8c4{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:921)
[2023-07-27 04:37:51,213] DEBUG STARTED @14030ms o.e.j.s.ServletContextHandler@252dc8c4{/,null,AVAILABLE} (org.eclipse.jetty.util.component.AbstractLifeCycle:191)
[2023-07-27 04:37:51,213] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:309)
[2023-07-27 04:37:51,213] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)

=================================================================================

 

splunksinkconnector.sh

=================================================================================

#!/bin/bash

curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d '{
  "name": "splunk-asla-dev",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "1",
    "topics": "<topic>",
    "splunk.hec.uri": "https://<splunk_indexer>:8088",
    "splunk.hec.token": "<token>",
    "splunk.hec.ack.enabled": "true",
    "splunk.hec.raw": "true",
    "splunk.hec.track.data": "true",
    "splunk.hec.ssl.validate.certs": "false",
    "splunk.indexes": "index",
    "splunk.sourcetypes": "sourcetype",
    "splunk.hec.raw.line.breaker": "\n"
  }
}'

===================================================================================
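Once the POST succeeds, the connector's actual state can be checked through the Connect REST API, and the HEC endpoint can be probed directly (a sketch; the connector name matches the config above):

# Confirm the connector and its task are RUNNING; a FAILED task reports the
# root-cause stack trace in its "trace" field.
curl -s localhost:8083/connectors/splunk-asla-dev/status

# Confirm the Splunk HEC endpoint is reachable and healthy (-k because cert
# validation is disabled in the connector config above).
curl -sk https://<splunk_indexer>:8088/services/collector/health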

 

Below are the error messages after running the sink connector:

 

================================================================================

[2023-07-27 04:40:50,906] TRACE [sink_connector|task-0] [Consumer clientId=connector-consumer-sink_connector-0, groupId=stage-group] sslCiphers: closed 1 metric(s). (org.apache.kafka.common.network.Selector:269)
[2023-07-27 04:40:50,906] TRACE [sink_connector|task-0] [Consumer clientId=connector-consumer-sink_connector-0, groupId=stage-group] clients: entering performPendingMetricsOperations (org.apache.kafka.common.network.Selector:213)
[2023-07-27 04:40:50,906] TRACE [sink_connector|task-0] [Consumer clientId=connector-consumer-sink_connector-0, groupId=stage-group] clients: leaving performPendingMetricsOperations (org.apache.kafka.common.network.Selector:229)
[2023-07-27 04:40:50,906] TRACE [sink_connector|task-0] [Consumer clientId=connector-consumer-sink_connector-0, groupId=stage-group] clients: closed 0 metric(s). (org.apache.kafka.common.network.Selector:269)
[2023-07-27 04:40:50,906] INFO [sink_connector|task-0] [Principal=:74fafaf6-a0c9-4b8b-bd8f-397ccb3c1212]: Expiring credential re-login thread has been interrupted and will exit. (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin:95)
[2023-07-27 04:40:50,907] TRACE [sink_connector|task-0] LoginManager(serviceName=kafka, publicCredentials=[SaslExtensions[extensionsMap={}]], refCount=0) released (org.apache.kafka.common.security.authenticator.LoginManager:157)
[2023-07-27 04:40:50,907] TRACE [sink_connector|task-0] Removed metric named MetricName [name=version, group=app-info, description=Metric indicating version, tags={client-id=connector-consumer-sink_connector-0}] (org.apache.kafka.common.metrics.Metrics:568)
[2023-07-27 04:40:50,907] TRACE [sink_connector|task-0] Removed metric named MetricName [name=commit-id, group=app-info, description=Metric indicating commit-id, tags={client-id=connector-consumer-sink_connector-0}] (org.apache.kafka.common.metrics.Metrics:568)
[2023-07-27 04:40:50,907] TRACE [sink_connector|task-0] Removed metric named MetricName [name=start-time-ms, group=app-info, description=Metric indicating start-time-ms, tags={client-id=connector-consumer-sink_connector-0}] (org.apache.kafka.common.metrics.Metrics:568)
[2023-07-27 04:40:50,907] INFO [sink_connector|task-0] App info kafka.consumer for connector-consumer-sink_connector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2023-07-27 04:40:50,907] DEBUG [sink_connector|task-0] [Consumer clientId=connector-consumer-sink_connector-0, groupId=stage-group] Kafka consumer has been closed (org.apache.kafka.clients.consumer.KafkaConsumer:2425)

 

===============================================================================
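Since the TRACE lines above only show the consumer being torn down, the underlying exception is easier to find in the task status (the "trace" field of a FAILED task) or by turning up logging for the connector itself (a sketch, using the Connect admin loggers endpoint available in Kafka 2.4+):

# Raise the Splunk sink connector's own log level at runtime.
curl -s -X PUT -H "Content-Type: application/json" \
  -d '{"level": "DEBUG"}' \
  localhost:8083/admin/loggers/com.splunk.kafka.connect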

 

Please let us know what we are missing in this configuration.

 

1 Solution

tariq1992
Engager

I was able to resolve this issue, so let me answer my own question here. I removed the following deserializer settings from the connector properties:

consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Kafka deserializer class for Kafka record values; we had set our message body to be String
consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

I also published messages to the Kafka topic on the Event Hubs side, but the deserializer overrides seem to have been the main cause of the issue. It is working well now.
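
As far as I understand it, this matches how Kafka Connect is designed: sink tasks consume raw bytes and hand them to the configured converters, so overriding the consumer deserializers to StringDeserializer feeds String objects to a converter that expects byte arrays. The worker properties only need the converter settings (the relevant lines, unchanged from the original config):

# Connect manages consumer deserialization itself (raw bytes); records are
# then translated by the converters, so no consumer.*.deserializer overrides.
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false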

