-
Notifications
You must be signed in to change notification settings - Fork 238
Open
Description
Expected Behavior
The pausing the topic consumption works fine but the receiver is not resumed. I am expecting the receiver to resume listening to the events from the topic work upon calling called consumer.resume(consumer.assignment());
Actual Behavior
It is throwing an error You must call one of receive*() methods before using doOnConsumer"
Steps to Reproduce
public <T> void applyCircuitBreaker(CircuitBreaker circuitBreaker, KafkaReceiver<T, T> kafkaReceiver) {
circuitBreaker.getEventPublisher().onStateTransition((event) -> {
if (event.getStateTransition() == StateTransition.CLOSED_TO_OPEN) {
this.pauseReceiver(kafkaReceiver);
} else if (event.getStateTransition() == StateTransition.OPEN_TO_HALF_OPEN || event.getStateTransition() == StateTransition.HALF_OPEN_TO_CLOSED) {
this.resumeReceiver(kafkaReceiver);
}
});
}
private <T> void pauseReceiver(KafkaReceiver<T, T> kafkaReceiver) {
kafkaReceiver.doOnConsumer((consumer) -> {
consumer.pause(consumer.assignment());
return consumer;
}).doOnSuccess((success) -> {
log.info("Successful pause {}", success.paused());
}).doOnError((e) -> {
log.error("Error in pausing", e);
}).subscribe();
}
private <T> void resumeReceiver(KafkaReceiver<T, T> kafkaReceiver) {
kafkaReceiver.doOnConsumer((consumer) -> {
consumer.resume(consumer.assignment());
return consumer;
}).doOnSuccess((success) -> {
log.info("Successfully resumed kafka receiver {}", success.assignment());
}).doOnError((e) -> {
log.error("Error in resuming", e);
}).subscribe();
log.info("resumeReceiver: Kafka receiver resumed.");
}Possible Solution
Your Environment
- Reactor version(s) used:
- Other relevant libraries versions (eg.
netty, ...): - JVM version (
java -version): - OS and version (eg
uname -a):
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels