Idempotent receiver - is there any equivalent to FilteringMessageListenerAdapter/RecordFilterStrategy for...












1















I need to implement an idempotent listener using spring cloud stream kafka binder.



How could we wrap the StreamListener so that necessary filtering/de-duplication advice could be applied before messages are delegated to corresponding handlers?



Thanks.










share|improve this question























  • Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask

    – Sean Pianka
    Nov 26 '18 at 18:41
















1















I need to implement an idempotent listener using spring cloud stream kafka binder.



How could we wrap the StreamListener so that necessary filtering/de-duplication advice could be applied before messages are delegated to corresponding handlers?



Thanks.










share|improve this question























  • Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask

    – Sean Pianka
    Nov 26 '18 at 18:41














1












1








1








I need to implement an idempotent listener using spring cloud stream kafka binder.



How could we wrap the StreamListener so that necessary filtering/de-duplication advice could be applied before messages are delegated to corresponding handlers?



Thanks.










share|improve this question














I need to implement an idempotent listener using spring cloud stream kafka binder.



How could we wrap the StreamListener so that necessary filtering/de-duplication advice could be applied before messages are delegated to corresponding handlers?



Thanks.







spring-cloud-stream






share|improve this question













share|improve this question











share|improve this question




share|improve this question










asked Nov 26 '18 at 17:24









Vikas Chhabra Vikas Chhabra

63




63













  • Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask

    – Sean Pianka
    Nov 26 '18 at 18:41



















  • Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask

    – Sean Pianka
    Nov 26 '18 at 18:41

















Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask

– Sean Pianka
Nov 26 '18 at 18:41





Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask

– Sean Pianka
Nov 26 '18 at 18:41












1 Answer
1






active

oldest

votes


















0














You can use a condition on the @StreamListener; the only problem is the payload is not converted until the condition returns true. So you either need information in headers, or you would need to use native Kafka deserialization.



@SpringBootApplication
@EnableBinding(Sink.class)
public class So53486162Application {

public static void main(String args) {
SpringApplication.run(So53486162Application.class, args);
}

@StreamListener(target = Sink.INPUT, condition="@filter.shouldProcess(#root)")
public void listen(String in) {
System.out.println(in);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<byte, byte> template) {
return args -> {
template.send("so53486162", "dontProcessThisOne".getBytes());
template.send("so53486162", "processThisOne".getBytes());
};
}

}

@Component
class Filter {

public boolean shouldProcess(Message<byte> in) {
String string = new String(in.getPayload());
System.out.println("Filtering: " + string);
return !string.equals("dontProcessThisOne");
}

}


and



Filtering: dontProcessThisOne
2018-11-26 13:06:56.729 WARN 25874 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler :
Cannot find a @StreamListener matching for message with id: null
Filtering: processThisOne
processThisOne





share|improve this answer























    Your Answer






    StackExchange.ifUsing("editor", function () {
    StackExchange.using("externalEditor", function () {
    StackExchange.using("snippets", function () {
    StackExchange.snippets.init();
    });
    });
    }, "code-snippets");

    StackExchange.ready(function() {
    var channelOptions = {
    tags: "".split(" "),
    id: "1"
    };
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function() {
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled) {
    StackExchange.using("snippets", function() {
    createEditor();
    });
    }
    else {
    createEditor();
    }
    });

    function createEditor() {
    StackExchange.prepareEditor({
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader: {
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    },
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    });


    }
    });














    draft saved

    draft discarded


















    StackExchange.ready(
    function () {
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53486162%2fidempotent-receiver-is-there-any-equivalent-to-filteringmessagelisteneradapter%23new-answer', 'question_page');
    }
    );

    Post as a guest















    Required, but never shown

























    1 Answer
    1






    active

    oldest

    votes








    1 Answer
    1






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    0














    You can use a condition on the @StreamListener; the only problem is the payload is not converted until the condition returns true. So you either need information in headers, or you would need to use native Kafka deserialization.



    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So53486162Application {

    public static void main(String args) {
    SpringApplication.run(So53486162Application.class, args);
    }

    @StreamListener(target = Sink.INPUT, condition="@filter.shouldProcess(#root)")
    public void listen(String in) {
    System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte, byte> template) {
    return args -> {
    template.send("so53486162", "dontProcessThisOne".getBytes());
    template.send("so53486162", "processThisOne".getBytes());
    };
    }

    }

    @Component
    class Filter {

    public boolean shouldProcess(Message<byte> in) {
    String string = new String(in.getPayload());
    System.out.println("Filtering: " + string);
    return !string.equals("dontProcessThisOne");
    }

    }


    and



    Filtering: dontProcessThisOne
    2018-11-26 13:06:56.729 WARN 25874 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler :
    Cannot find a @StreamListener matching for message with id: null
    Filtering: processThisOne
    processThisOne





    share|improve this answer




























      0














      You can use a condition on the @StreamListener; the only problem is the payload is not converted until the condition returns true. So you either need information in headers, or you would need to use native Kafka deserialization.



      @SpringBootApplication
      @EnableBinding(Sink.class)
      public class So53486162Application {

      public static void main(String args) {
      SpringApplication.run(So53486162Application.class, args);
      }

      @StreamListener(target = Sink.INPUT, condition="@filter.shouldProcess(#root)")
      public void listen(String in) {
      System.out.println(in);
      }

      @Bean
      public ApplicationRunner runner(KafkaTemplate<byte, byte> template) {
      return args -> {
      template.send("so53486162", "dontProcessThisOne".getBytes());
      template.send("so53486162", "processThisOne".getBytes());
      };
      }

      }

      @Component
      class Filter {

      public boolean shouldProcess(Message<byte> in) {
      String string = new String(in.getPayload());
      System.out.println("Filtering: " + string);
      return !string.equals("dontProcessThisOne");
      }

      }


      and



      Filtering: dontProcessThisOne
      2018-11-26 13:06:56.729 WARN 25874 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler :
      Cannot find a @StreamListener matching for message with id: null
      Filtering: processThisOne
      processThisOne





      share|improve this answer


























        0












        0








        0







        You can use a condition on the @StreamListener; the only problem is the payload is not converted until the condition returns true. So you either need information in headers, or you would need to use native Kafka deserialization.



        @SpringBootApplication
        @EnableBinding(Sink.class)
        public class So53486162Application {

        public static void main(String args) {
        SpringApplication.run(So53486162Application.class, args);
        }

        @StreamListener(target = Sink.INPUT, condition="@filter.shouldProcess(#root)")
        public void listen(String in) {
        System.out.println(in);
        }

        @Bean
        public ApplicationRunner runner(KafkaTemplate<byte, byte> template) {
        return args -> {
        template.send("so53486162", "dontProcessThisOne".getBytes());
        template.send("so53486162", "processThisOne".getBytes());
        };
        }

        }

        @Component
        class Filter {

        public boolean shouldProcess(Message<byte> in) {
        String string = new String(in.getPayload());
        System.out.println("Filtering: " + string);
        return !string.equals("dontProcessThisOne");
        }

        }


        and



        Filtering: dontProcessThisOne
        2018-11-26 13:06:56.729 WARN 25874 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler :
        Cannot find a @StreamListener matching for message with id: null
        Filtering: processThisOne
        processThisOne





        share|improve this answer













        You can use a condition on the @StreamListener; the only problem is the payload is not converted until the condition returns true. So you either need information in headers, or you would need to use native Kafka deserialization.



        @SpringBootApplication
        @EnableBinding(Sink.class)
        public class So53486162Application {

        public static void main(String args) {
        SpringApplication.run(So53486162Application.class, args);
        }

        @StreamListener(target = Sink.INPUT, condition="@filter.shouldProcess(#root)")
        public void listen(String in) {
        System.out.println(in);
        }

        @Bean
        public ApplicationRunner runner(KafkaTemplate<byte, byte> template) {
        return args -> {
        template.send("so53486162", "dontProcessThisOne".getBytes());
        template.send("so53486162", "processThisOne".getBytes());
        };
        }

        }

        @Component
        class Filter {

        public boolean shouldProcess(Message<byte> in) {
        String string = new String(in.getPayload());
        System.out.println("Filtering: " + string);
        return !string.equals("dontProcessThisOne");
        }

        }


        and



        Filtering: dontProcessThisOne
        2018-11-26 13:06:56.729 WARN 25874 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler :
        Cannot find a @StreamListener matching for message with id: null
        Filtering: processThisOne
        processThisOne






        share|improve this answer












        share|improve this answer



        share|improve this answer










        answered Nov 26 '18 at 18:09









        Gary RussellGary Russell

        81.9k74773




        81.9k74773
































            draft saved

            draft discarded




















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid



            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.


            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53486162%2fidempotent-receiver-is-there-any-equivalent-to-filteringmessagelisteneradapter%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            Popular posts from this blog

            A CLEAN and SIMPLE way to add appendices to Table of Contents and bookmarks

            Calculate evaluation metrics using cross_val_predict sklearn

            Insert data from modal to MySQL (multiple modal on website)