Kafka Connect not working with Subject Strategies













Context



I coded a couple of small Kafka Connect connectors: one that generates random data each second, and another that logs it to the console. They're integrated with a Schema Registry, so the data is serialized with Avro.

I deployed them into a local Kafka environment using the fast-data-dev Docker image provided by Landoop.

The basic setup works and produces a message each second, which is then logged.



However, I want to change the subject name strategy. The default one generates two subjects:




  • ${topic}-key

  • ${topic}-value


As per my use case, I'll need to generate events with different schemas that will end up on the same topic. Therefore, the subject names I need are:




  • ${topic}-${keyRecordName}

  • ${topic}-${valueRecordName}


As per the docs, my needs fit the TopicRecordNameStrategy.
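
To make the difference between the two naming schemes concrete, here is a minimal Kotlin sketch. The helper functions, the topic name and the record names are hypothetical and only mimic the naming rules described above; they are not the Schema Registry strategy classes themselves.

```kotlin
// Hypothetical helpers that imitate the subject names each strategy produces.

// Default behaviour: one subject per topic for keys and one for values.
fun topicNameSubject(topic: String, isKey: Boolean): String =
    if (isKey) "$topic-key" else "$topic-value"

// TopicRecordNameStrategy-style behaviour: one subject per topic and record name.
fun topicRecordNameSubject(topic: String, recordName: String): String =
    "$topic-$recordName"

fun main() {
    val topic = "events" // assumed topic name, for illustration only

    // Every value schema on the topic shares a single subject.
    println(topicNameSubject(topic, isKey = false))

    // Two different record types on the same topic get separate subjects.
    println(topicRecordNameSubject(topic, "com.example.UserCreated"))
    println(topicRecordNameSubject(topic, "com.example.UserDeleted"))
}
```

With the second scheme, each record type evolves under its own subject, which is what allows several schemas to share one topic.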



What I have tried



I create the avroData object for sending values to Connect:



class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

    [...]
}


and use it afterwards to create the SourceRecord response objects.



The documentation states that in order to use the Schema Registry in Kafka Connect I have to set some properties in the connector config. Therefore, when I create it I add them:



name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy


Problem



The connector seems to ignore those properties and keeps using the old ${topic}-key and ${topic}-value subjects.



Question



Kafka Connect is supposed to support different subject strategies. I managed to work around the issue by writing my own version of the AvroConverter and hardcoding the subject strategy I need. However, this doesn't look like a good approach, and it also caused issues when trying to consume the data with the sink connector. I duplicated the subject so that there's a version with the old name (${topic}-key), and it works.

What is the proper setup for specifying the subject strategy to Kafka Connect?










      apache-kafka apache-kafka-connect confluent-schema-registry






      asked Nov 28 '18 at 14:25
      Pelocho

          1 Answer

          You're missing the key.converter and value.converter prefixes, which are needed for the config to be passed through to the converter. So instead of:



          key.subject.name.strategy
          value.subject.name.strategy


          you want:



          key.converter.key.subject.name.strategy
          value.converter.value.subject.name.strategy
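
Applied to the connector configuration from the question, the result would look roughly like the fragment below. This is a sketch built only from the properties already shown in the question, with the two strategy keys renamed; connector.class is left elided as in the original.

```properties
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
```

Note that schema.registry.url already follows the same pattern: it reaches the converter only because it carries the key.converter. or value.converter. prefix.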


          Source (https://docs.confluent.io/current/connect/managing/configuring.html):




          To pass configuration parameters to key and value converters, prefix them with key.converter. or value.converter. as you would in the worker configuration when defining default converters. Note that these are only used when the corresponding converter configuration is specified in the key.converter or value.converter properties.
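
The effect of the prefix rule can be illustrated with a small Kotlin sketch. The converterConfig function below is a hypothetical stand-in for the prefix handling described in the quote, not Kafka Connect's actual code; it assumes that only keys starting with the converter prefix are handed to the converter, with the prefix removed.

```kotlin
// Hypothetical stand-in for how prefixed settings are selected for a converter:
// keep only the keys that start with the prefix, then strip the prefix.
fun converterConfig(connectorProps: Map<String, String>, prefix: String): Map<String, String> =
    connectorProps
        .filterKeys { it.startsWith(prefix) }
        .mapKeys { (key, _) -> key.removePrefix(prefix) }

fun main() {
    val strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
    val props = mapOf(
        "value.converter" to "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url" to "http://localhost:8081",
        // Unprefixed key from the question: filtered out, so the converter never sees it.
        "value.subject.name.strategy" to strategy,
        // Prefixed key from the answer: arrives at the converter as value.subject.name.strategy.
        "value.converter.value.subject.name.strategy" to strategy
    )

    // Print the settings the value converter would receive under this assumption.
    converterConfig(props, "value.converter.").forEach { (key, value) -> println("$key=$value") }
}
```

Under that assumption, the unprefixed value.subject.name.strategy entry is silently dropped, which matches the behaviour reported in the question.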








          • You're right. I didn't find anything related to that in the documentation (maybe I didn't dig enough). Do you have a link to some documentation stating that? That would complete the answer.

            – Pelocho
            Nov 28 '18 at 15:54











          • Edited question to include reference

            – Robin Moffatt
            Nov 28 '18 at 16:43











          answered Nov 28 '18 at 15:40, edited Nov 28 '18 at 16:44
          Robin Moffatt
