Idempotent receiver - is there any equivalent to FilteringMessageListenerAdapter/RecordFilterStrategy for...
I need to implement an idempotent listener using spring cloud stream kafka binder.
How could we wrap the StreamListener so that necessary filtering/de-duplication advice could be applied before messages are delegated to corresponding handlers?
Thanks.
spring-cloud-stream
add a comment |
I need to implement an idempotent listener using spring cloud stream kafka binder.
How could we wrap the StreamListener so that necessary filtering/de-duplication advice could be applied before messages are delegated to corresponding handlers?
Thanks.
spring-cloud-stream
Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask
– Sean Pianka
Nov 26 '18 at 18:41
add a comment |
I need to implement an idempotent listener using spring cloud stream kafka binder.
How could we wrap the StreamListener so that necessary filtering/de-duplication advice could be applied before messages are delegated to corresponding handlers?
Thanks.
spring-cloud-stream
I need to implement an idempotent listener using spring cloud stream kafka binder.
How could we wrap the StreamListener so that necessary filtering/de-duplication advice could be applied before messages are delegated to corresponding handlers?
Thanks.
spring-cloud-stream
spring-cloud-stream
asked Nov 26 '18 at 17:24
Vikas Chhabra Vikas Chhabra
63
63
Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask
– Sean Pianka
Nov 26 '18 at 18:41
add a comment |
Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask
– Sean Pianka
Nov 26 '18 at 18:41
Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask
– Sean Pianka
Nov 26 '18 at 18:41
Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask
– Sean Pianka
Nov 26 '18 at 18:41
add a comment |
1 Answer
1
active
oldest
votes
You can use a condition
on the @StreamListener
; the only problem is the payload is not converted until the condition returns true. So you either need information in headers, or you would need to use native Kafka deserialization.
@SpringBootApplication
@EnableBinding(Sink.class)
public class So53486162Application {
public static void main(String args) {
SpringApplication.run(So53486162Application.class, args);
}
@StreamListener(target = Sink.INPUT, condition="@filter.shouldProcess(#root)")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte, byte> template) {
return args -> {
template.send("so53486162", "dontProcessThisOne".getBytes());
template.send("so53486162", "processThisOne".getBytes());
};
}
}
@Component
class Filter {
public boolean shouldProcess(Message<byte> in) {
String string = new String(in.getPayload());
System.out.println("Filtering: " + string);
return !string.equals("dontProcessThisOne");
}
}
and
Filtering: dontProcessThisOne
2018-11-26 13:06:56.729 WARN 25874 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler :
Cannot find a @StreamListener matching for message with id: null
Filtering: processThisOne
processThisOne
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53486162%2fidempotent-receiver-is-there-any-equivalent-to-filteringmessagelisteneradapter%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
You can use a condition
on the @StreamListener
; the only problem is the payload is not converted until the condition returns true. So you either need information in headers, or you would need to use native Kafka deserialization.
@SpringBootApplication
@EnableBinding(Sink.class)
public class So53486162Application {
public static void main(String args) {
SpringApplication.run(So53486162Application.class, args);
}
@StreamListener(target = Sink.INPUT, condition="@filter.shouldProcess(#root)")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte, byte> template) {
return args -> {
template.send("so53486162", "dontProcessThisOne".getBytes());
template.send("so53486162", "processThisOne".getBytes());
};
}
}
@Component
class Filter {
public boolean shouldProcess(Message<byte> in) {
String string = new String(in.getPayload());
System.out.println("Filtering: " + string);
return !string.equals("dontProcessThisOne");
}
}
and
Filtering: dontProcessThisOne
2018-11-26 13:06:56.729 WARN 25874 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler :
Cannot find a @StreamListener matching for message with id: null
Filtering: processThisOne
processThisOne
add a comment |
You can use a condition
on the @StreamListener
; the only problem is the payload is not converted until the condition returns true. So you either need information in headers, or you would need to use native Kafka deserialization.
@SpringBootApplication
@EnableBinding(Sink.class)
public class So53486162Application {
public static void main(String args) {
SpringApplication.run(So53486162Application.class, args);
}
@StreamListener(target = Sink.INPUT, condition="@filter.shouldProcess(#root)")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte, byte> template) {
return args -> {
template.send("so53486162", "dontProcessThisOne".getBytes());
template.send("so53486162", "processThisOne".getBytes());
};
}
}
@Component
class Filter {
public boolean shouldProcess(Message<byte> in) {
String string = new String(in.getPayload());
System.out.println("Filtering: " + string);
return !string.equals("dontProcessThisOne");
}
}
and
Filtering: dontProcessThisOne
2018-11-26 13:06:56.729 WARN 25874 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler :
Cannot find a @StreamListener matching for message with id: null
Filtering: processThisOne
processThisOne
add a comment |
You can use a condition
on the @StreamListener
; the only problem is the payload is not converted until the condition returns true. So you either need information in headers, or you would need to use native Kafka deserialization.
@SpringBootApplication
@EnableBinding(Sink.class)
public class So53486162Application {
public static void main(String args) {
SpringApplication.run(So53486162Application.class, args);
}
@StreamListener(target = Sink.INPUT, condition="@filter.shouldProcess(#root)")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte, byte> template) {
return args -> {
template.send("so53486162", "dontProcessThisOne".getBytes());
template.send("so53486162", "processThisOne".getBytes());
};
}
}
@Component
class Filter {
public boolean shouldProcess(Message<byte> in) {
String string = new String(in.getPayload());
System.out.println("Filtering: " + string);
return !string.equals("dontProcessThisOne");
}
}
and
Filtering: dontProcessThisOne
2018-11-26 13:06:56.729 WARN 25874 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler :
Cannot find a @StreamListener matching for message with id: null
Filtering: processThisOne
processThisOne
You can use a condition
on the @StreamListener
; the only problem is the payload is not converted until the condition returns true. So you either need information in headers, or you would need to use native Kafka deserialization.
@SpringBootApplication
@EnableBinding(Sink.class)
public class So53486162Application {
public static void main(String args) {
SpringApplication.run(So53486162Application.class, args);
}
@StreamListener(target = Sink.INPUT, condition="@filter.shouldProcess(#root)")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte, byte> template) {
return args -> {
template.send("so53486162", "dontProcessThisOne".getBytes());
template.send("so53486162", "processThisOne".getBytes());
};
}
}
@Component
class Filter {
public boolean shouldProcess(Message<byte> in) {
String string = new String(in.getPayload());
System.out.println("Filtering: " + string);
return !string.equals("dontProcessThisOne");
}
}
and
Filtering: dontProcessThisOne
2018-11-26 13:06:56.729 WARN 25874 --- [container-0-C-1] .DispatchingStreamListenerMessageHandler :
Cannot find a @StreamListener matching for message with id: null
Filtering: processThisOne
processThisOne
answered Nov 26 '18 at 18:09
Gary RussellGary Russell
81.9k74773
81.9k74773
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53486162%2fidempotent-receiver-is-there-any-equivalent-to-filteringmessagelisteneradapter%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Sharing your research helps everyone. Tell us what you've tried and why it didn’t meet your needs. This demonstrates that you’ve taken the time to try to help yourself, it saves us from reiterating obvious answers, and most of all it helps you get a more specific and relevant answer! See also: How to Ask
– Sean Pianka
Nov 26 '18 at 18:41