Do not fail if no partitions assigned#15208
Do not fail if no partitions assigned#15208kumarpritam863 wants to merge 12 commits intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses an issue where the consumer group rebalance could cause invalid calls when no partitions are assigned. Instead of throwing an exception when no partitions are found, the code now returns false and logs a warning, allowing the system to wait for the final rebalance to complete.
Changes:
- Renamed
TASK_ID_SNAPSHOT_PROPtoCOORDINATOR_ID_SNAPSHOT_PROPfor more accurate terminology - Enhanced logging throughout to include the coordinator/committer identifier for better traceability
- Modified partition assignment check to return false instead of throwing exception when no partitions are assigned
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| Coordinator.java | Renamed task ID constant to coordinator ID, added coordinatorId field, and updated all log statements to include the coordinator identifier |
| CommitterImpl.java | Extracted partition finding logic into separate method that returns null instead of throwing exception, added identifier field, and updated all log statements to use the identifier |
| IntegrationTestBase.java | Updated test assertion to check for the renamed coordinator-id property instead of task-id |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| private KafkaClientFactory clientFactory; | ||
| private Collection<MemberDescription> membersWhenWorkerIsCoordinator; | ||
| private final AtomicBoolean isInitialized = new AtomicBoolean(false); | ||
| private String identifier; |
There was a problem hiding this comment.
The field 'identifier' should be marked as 'final' since it's only assigned once during initialization and never modified afterward. This makes the immutability explicit and prevents accidental reassignment.
There was a problem hiding this comment.
Although set once, a final variable needs to be set using constructor which is not the case here.
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
Outdated
Show resolved
Hide resolved
…onnect/channel/CommitterImpl.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This #14395 removed the client side stability check of the consumer group (which is rightly done). But since we have removed the check there will be more instances of invalid calls (class when rebalance is going on or happening frequently) and we should wait for the final rebalance to happen and simply return false.