Skip to content

Resume Not working #411

@earthspiligrim

Description

@earthspiligrim

Expected Behavior

The pausing the topic consumption works fine but the receiver is not resumed. I am expecting the receiver to resume listening to the events from the topic work upon calling called consumer.resume(consumer.assignment());

Actual Behavior

It is throwing an error You must call one of receive*() methods before using doOnConsumer"

Steps to Reproduce

public <T> void applyCircuitBreaker(CircuitBreaker circuitBreaker, KafkaReceiver<T, T> kafkaReceiver) {
        circuitBreaker.getEventPublisher().onStateTransition((event) -> {
            if (event.getStateTransition() == StateTransition.CLOSED_TO_OPEN) {
                this.pauseReceiver(kafkaReceiver);
            } else if (event.getStateTransition() == StateTransition.OPEN_TO_HALF_OPEN || event.getStateTransition() == StateTransition.HALF_OPEN_TO_CLOSED) {
                this.resumeReceiver(kafkaReceiver);
            }

        });
    }

    private <T> void pauseReceiver(KafkaReceiver<T, T> kafkaReceiver) {
        kafkaReceiver.doOnConsumer((consumer) -> {
            consumer.pause(consumer.assignment());
            return consumer;
        }).doOnSuccess((success) -> {
            log.info("Successful pause {}", success.paused());
        }).doOnError((e) -> {
            log.error("Error in pausing", e);
        }).subscribe();
    }

    private <T> void resumeReceiver(KafkaReceiver<T, T> kafkaReceiver) {
        kafkaReceiver.doOnConsumer((consumer) -> {
            consumer.resume(consumer.assignment());
            return consumer;
        }).doOnSuccess((success) -> {
            log.info("Successfully resumed kafka receiver {}", success.assignment());
        }).doOnError((e) -> {
            log.error("Error in resuming", e);
        }).subscribe();
        log.info("resumeReceiver: Kafka receiver resumed.");
    }

Possible Solution

Your Environment

  • Reactor version(s) used:
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version):
  • OS and version (eg uname -a):

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions