Kafka Connect not working with Subject Strategies
Context
I wrote two small Kafka Connect connectors: one generates random data every second, and the other logs it to the console. Both are integrated with a Schema Registry, so the data is serialized with Avro.
I deployed them into a local Kafka environment using the fast-data-dev Docker image provided by Landoop.
The basic setup works and produces a message each second, which is then logged.
However, I want to change the subject name strategy. The default one generates two subjects:
${topic}-key
${topic}-value
As per my use case, I'll need to generate events with different schemas that will end up on the same topic. Therefore, the subject names I need are:
${topic}-${keyRecordName}
${topic}-${valueRecordName}
As per the docs, my needs fit the TopicRecordNameStrategy.
What I have tried
I create the avroData object for sending values to Connect:
class SampleSourceConnectorTask : SourceTask() {
    private lateinit var avroData: AvroData

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }
    [...]
}
and use it afterwards to create the SourceRecord response objects.
The documentation states that in order to use the Schema Registry in Kafka Connect I have to set some properties in the connector config. Therefore, when I create it I add them:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Problem
The connector seems to ignore those properties and keeps using the old ${topic}-key and ${topic}-value subjects.
Question
Kafka Connect is supposed to support different subject strategies. I managed to work around the issue by writing my own version of the AvroConverter and hardcoding the subject strategy I need. However, this doesn't look like a good approach, and it also caused issues when trying to consume the data with the Sink Kafka Connector. I duplicated the subject so there's a version with the old name (${topic}-key) and it works.
What is the proper setup for specifying the subject strategy in Kafka Connect?
apache-kafka apache-kafka-connect confluent-schema-registry
asked Nov 28 '18 at 14:25
Pelocho
1 Answer
You're missing the key.converter. and value.converter. prefixes, which are needed for the config to be passed through to the converter. So instead of:
key.subject.name.strategy
value.subject.name.strategy
you want:
key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy
Source: https://docs.confluent.io/current/connect/managing/configuring.html
To pass configuration parameters to key and value converters, prefix them with key.converter. or value.converter. as you would in the worker configuration when defining default converters. Note that these are only used when the corresponding converter configuration is specified in the key.converter or value.converter properties.
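Applied to the connector config from the question, the change might look like the sketch below. It assumes the same converter classes and local Schema Registry URL as in the question; connector.class stays elided as in the original, and the comment lines are only annotations.

```properties
name=SampleSourceConnector
connector.class=[...]
tasks.max=1

# Key converter and the settings passed through to it (key.converter. prefix)
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

# Value converter and the settings passed through to it (value.converter. prefix)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
```

This follows the same pattern the question already uses for schema.registry.url: anything meant for the converter carries the converter prefix, so the unprefixed key.subject.name.strategy and value.subject.name.strategy lines are dropped.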
You're right. I didn't find anything related to that in the documentation (maybe I didn't dig enough). Do you have a link to some documentation stating that? That would complete the answer.
– Pelocho
Nov 28 '18 at 15:54
Edited question to include reference
– Robin Moffatt
Nov 28 '18 at 16:43
edited Nov 28 '18 at 16:44
answered Nov 28 '18 at 15:40
Robin Moffatt