Description
Make the underlying KafkaConsumer accessible when using receiverOptions.addAssignListener.
Motivation
In spring-cloud-stream/issues/2727, a similar feature was requested to access the endOffsets. This was then implemented in reactor-kafka/issues/344 by @garyrussell.
In my case, instead of needing the endOffsets, I need to check which partitions have commits. The use case is to call seekToTimestamp only for partitions that have no commits.
Maybe we should expose the consumer instance as originally suggested, like in KafkaBindingRebalanceListener, to avoid ending up with a facade for every possible custom need.
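To make the use case concrete, here is a minimal, self-contained sketch of the rule I want to apply once the consumer is available. It assumes the committed offsets have already been fetched into a map where a null value means "no commit", which is how KafkaConsumer.committed(Set) reports partitions without a committed offset. The class NoCommitFilter and the record Partition are hypothetical stand-ins used only for illustration (Partition plays the role of TopicPartition); they are not part of reactor-kafka.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class NoCommitFilter {

    // Simplified stand-in for org.apache.kafka.common.TopicPartition.
    record Partition(String topic, int partition) {}

    // Returns the partitions whose committed offset is null, i.e. the ones
    // that should be sought to a timestamp. Partitions with a commit are skipped.
    static List<Partition> withoutCommits(Map<Partition, Long> committed) {
        return committed.entrySet().stream()
                .filter(e -> e.getValue() == null) // null value means no commit yet
                .map(Map.Entry::getKey)
                // Sort only to make the result order deterministic.
                .sorted(Comparator.comparing(Partition::topic)
                        .thenComparingInt(Partition::partition))
                .toList();
    }
}
```

With a consumer passed into the assign listener, the map would come straight from consumer.committed(...), and only the returned partitions would get a seekToTimestamp call.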
Desired solution
A consumer instance is received in an overload of addAssignListener.
@Bean
public ReceiverOptionsCustomizer customizer() {
    return (bindingName, receiverOptions) -> receiverOptions.addAssignListener((partitions, consumer) -> {
        // ...
    });
}
Additional context
spring-cloud/spring-cloud-stream#2727
#344
https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/rebalance_listener.html
Workaround
Right now, I am forced to go with a very bad workaround using the consumerListener, which is clearly not good enough for production.
@Component
public class ReplayStartReceiverOptionsCustomizer {

    private final ConsumerListener consumerListener;

    public ReplayStartReceiverOptionsCustomizer() {
        this.consumerListener = new ConsumerListener();
    }

    @Bean
    public ReceiverOptionsCustomizer<String, String> customizer() {
        return (bindingName, options) -> options
            .consumerListener(this.consumerListener)
            .addAssignListener(partitions -> {
                System.out.println(">>>>> Assign listener");
                Map<String, ReceiverPartition> partitionsByKey = partitions
                    .stream()
                    .collect(Collectors.toMap(
                        Object::toString,
                        Function.identity())
                    );
                List<TopicPartition> topicPartitions = partitions.stream().map(ReceiverPartition::topicPartition).toList();
                getPartitionsWithoutCommits(this.consumerListener.consumer, topicPartitions)
                    .forEach(partition -> {
                        System.out.printf(
                            "Partition '%s' has no offset. Seeking to start time: '%d'.", partition,
                            1704720729);
                        partitionsByKey.get(partition.toString()).seekToTimestamp(1704720729);
                    });
            });
    }

    private static Stream<TopicPartition> getPartitionsWithoutCommits(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        return consumer.committed(new HashSet<>(partitions))
            .entrySet()
            .stream()
            .filter(e -> e.getValue() == null) // no commits (first time or new partition)
            .map(Map.Entry::getKey);
    }

    static class ConsumerListener implements ReceiverOptions.ConsumerListener {

        public Consumer<?, ?> consumer;

        @Override
        public void consumerAdded(String id, Consumer<?, ?> consumer) {
            System.out.println(">>>>> Consumer added");
            System.out.println(id);
            System.out.println(consumer);
            this.consumer = consumer;
        }

        @Override
        public void consumerRemoved(String id, Consumer<?, ?> consumer) {
            this.consumer = null;
        }
    }
}
Another alternative would be to add a seekToTimestampIfNoCommits method or a skipIfHasCommits flag. However, if we keep adding options like these, we are basically replicating the full consumer API.