Expose KafkaConsumer in ReceiverOptionsCustomizer.addAssignListener() - parity with KafkaBindingRebalanceListener #379

@mvazquezrius

I would like to be able to access the KafkaConsumer when using receiverOptions.addAssignListener.

Motivation

In spring-cloud-stream/issues/2727, a similar feature was requested to access the endOffsets. This was then implemented in reactor-kafka/issues/344 by @garyrussell.

In my case, instead of needing the endOffsets, I need to check which partitions have commits. The use case is to call seekToTimestamp only for partitions that have no commits.
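As a minimal sketch of that selection step, using only plain Java collections: the map below stands in for the result of Consumer.committed(...), which maps each partition to its committed offset, or to null when nothing has been committed. The class and method names (NoCommitFilter, withoutCommits) and the string partition keys are made up for illustration and are not part of any library.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: models "which partitions have no commits" without a real Kafka consumer.
class NoCommitFilter {

    // Returns the keys whose committed value is null, sorted for a stable result.
    static <K extends Comparable<K>, V> List<K> withoutCommits(Map<K, V> committed) {
        return committed.entrySet().stream()
                .filter(e -> e.getValue() == null) // null means no commit for this partition
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-in for the committed-offsets map; HashMap is used because it allows null values.
        Map<String, Long> committed = new HashMap<>();
        committed.put("orders-0", 42L);  // has a committed offset
        committed.put("orders-1", null); // never committed
        committed.put("orders-2", null); // never committed

        // Only these partitions would be candidates for seekToTimestamp.
        System.out.println(withoutCommits(committed)); // prints [orders-1, orders-2]
    }
}
```

With a real consumer, the same filter would run over the map returned by committed(...), which is why access to the consumer inside the assign listener is needed.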

Maybe we should expose the consumer instance, as originally suggested and as KafkaBindingRebalanceListener already does, to avoid ending up with a facade for every possible custom need.

Desired solution

A consumer instance is received in an overload of addAssignListener.

@Bean
public ReceiverOptionsCustomizer customizer() {
   // Proposed overload: the listener receives the consumer alongside the assigned partitions.
   return (bindingName, receiverOptions) -> receiverOptions.addAssignListener((partitions, consumer) -> {
     // ...
   });
}

Additional context

spring-cloud/spring-cloud-stream#2727
#344
https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/rebalance_listener.html

Workaround

Right now, I am forced to use a very bad workaround based on the consumerListener, which is clearly not good enough for production.

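import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.cloud.stream.binder.reactorkafka.ReceiverOptionsCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;

@Component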
public class ReplayStartReceiverOptionsCustomizer {
    private final ConsumerListener consumerListener;
    public ReplayStartReceiverOptionsCustomizer(){
        this.consumerListener = new ConsumerListener();
    }
    @Bean
    public ReceiverOptionsCustomizer<String, String> customizer(){
        return (bindingName, options) -> options
                .consumerListener(this.consumerListener)
                .addAssignListener(partitions -> {
                    System.out.println(">>>>> Assign listener");

                    // Index the assigned partitions by TopicPartition so they can be looked up after filtering.
                    Map<TopicPartition, ReceiverPartition> partitionsByKey = partitions
                            .stream()
                            .collect(Collectors.toMap(
                                    ReceiverPartition::topicPartition,
                                    Function.identity())
                            );

                    List<TopicPartition> topicPartitions = partitions.stream().map(ReceiverPartition::topicPartition).toList();
                    // Relies on the consumer captured by the ConsumerListener below.
                    getPartitionsWithoutCommits(this.consumerListener.consumer, topicPartitions)
                            .forEach(partition -> {
                                // Hard-coded start time for this demo (note: Kafka record timestamps are epoch milliseconds).
                                System.out.printf(
                                        "Partition '%s' has no offset. Seeking to start time: '%d'.%n", partition,
                                        1704720729);
                                partitionsByKey.get(partition).seekToTimestamp(1704720729);
                            });
                });
    }

    private static Stream<TopicPartition> getPartitionsWithoutCommits(Consumer<?, ?> consumer, Collection<TopicPartition> partitions){
        return consumer.committed(new HashSet<>(partitions))
                .entrySet()
                .stream()
                .filter(e -> e.getValue() == null)  // no commits (first time or new partition)
                .map(Map.Entry::getKey);
    }

    static class ConsumerListener implements ReceiverOptions.ConsumerListener {
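        // Set in consumerAdded and read from the assign listener; volatile in case they run on different threads.
        public volatile Consumer<?, ?> consumer;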
        @Override
        public void consumerAdded(String id, Consumer<?, ?> consumer) {
            System.out.println(">>>>> Consumer added");
            System.out.println(id);
            System.out.println(consumer);
            this.consumer = consumer;
        }
        @Override
        public void consumerRemoved(String id, Consumer<?, ?> consumer)
        {
            this.consumer = null;
        }
    }
}

Another alternative would be to add a seekToTimestampIfNoCommits method or a skipIfHasCommits flag. However, if we keep adding options like these, we are basically replicating the full consumer.
