Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public boolean matches(String str) {
}
});
assertThat(props).containsKey("kafka.connect.commit-id");
assertThat(props).containsKey("kafka.connect.task-id");
assertThat(props).containsKey("kafka.connect.coordinator-id");
}

protected List<DataFile> dataFiles(TableIdentifier tableIdentifier, String branch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
Expand All @@ -48,6 +47,7 @@ public class CommitterImpl implements Committer {
private KafkaClientFactory clientFactory;
private Collection<MemberDescription> membersWhenWorkerIsCoordinator;
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
private String identifier;
Copy link

Copilot AI Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field 'identifier' should be marked as 'final' since it's only assigned once during initialization and never modified afterward. This makes the immutability explicit and prevents accidental reassignment.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although it is assigned only once, a final field must be assigned at its declaration, in an initializer block, or in every constructor. Here it is assigned in initialize(), which runs after construction, so the field cannot be final.


private void initialize(
Catalog icebergCatalog,
Expand All @@ -58,6 +58,7 @@ private void initialize(
this.config = icebergSinkConfig;
this.context = sinkTaskContext;
this.clientFactory = new KafkaClientFactory(config.kafkaProps());
this.identifier = config.connectorName() + "-" + config.taskId();
}
}

Expand Down Expand Up @@ -92,16 +93,49 @@ boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions)
@VisibleForTesting
boolean containsFirstPartition(
    Collection<MemberDescription> members, Collection<TopicPartition> partitions) {
  // Leader election rule: the task that owns the globally-first partition (per
  // TopicPartitionComparator) across all group members is elected coordinator/leader.
  TopicPartition firstTopicPartition = findFirstTopicPartition(members);

  if (firstTopicPartition == null) {
    // No member has any partition assigned yet, so leadership cannot be decided;
    // treat this task as a non-leader rather than failing.
    LOG.warn(
        "Committer {} found no partitions assigned across all members, cannot determine leader",
        identifier);
    return false;
  }

  boolean containsFirst = partitions.contains(firstTopicPartition);
  if (containsFirst) {
    LOG.info(
        "Committer {} contains the first partition {}, this task is the leader",
        identifier,
        firstTopicPartition);
  } else {
    LOG.debug(
        "Committer {} does not contain the first partition {}, not the leader",
        identifier,
        firstTopicPartition);
  }

  return containsFirst;
}

/**
 * Returns the minimum ("first") topic partition assigned across the given consumer group
 * members, or {@code null} when no member has any partition assigned.
 *
 * <p>Ordering is defined by {@link TopicPartitionComparator}: topic name first, then ascending
 * partition number for equal topics.
 *
 * @param members the consumer group members to inspect
 * @return the smallest assigned {@link TopicPartition}, or null if none are assigned
 */
@VisibleForTesting
TopicPartition findFirstTopicPartition(Collection<MemberDescription> members) {
  TopicPartitionComparator order = new TopicPartitionComparator();
  TopicPartition first = null;
  for (MemberDescription member : members) {
    for (TopicPartition candidate : member.assignment().topicPartitions()) {
      // Keep the first-seen minimum, matching Stream.min semantics.
      if (first == null || order.compare(candidate, first) < 0) {
        first = candidate;
      }
    }
  }
  return first;
}

@Override
Expand All @@ -122,7 +156,7 @@ public void open(
Collection<TopicPartition> addedPartitions) {
initialize(icebergCatalog, icebergSinkConfig, sinkTaskContext);
if (hasLeaderPartition(addedPartitions)) {
LOG.info("Committer received leader partition. Starting Coordinator.");
LOG.info("Committer {} received leader partition. Starting Coordinator.", identifier);
startCoordinator();
}
}
Expand All @@ -141,31 +175,26 @@ public void close(Collection<TopicPartition> closedPartitions) {

// Defensive: close called without prior initialization (should not happen).
if (!isInitialized.get()) {
LOG.warn("Close unexpectedly called without partition assignment");
LOG.warn(
"Close unexpectedly called on committer {} without partition assignment", identifier);
return;
}

// Empty partitions → task was stopped explicitly. Stop coordinator if running.
if (closedPartitions.isEmpty()) {
LOG.info("Task stopped. Closing coordinator.");
LOG.info("Committer {} stopped. Closing coordinator.", identifier);
stopCoordinator();
return;
}

// Normal close: if leader partition is lost, stop coordinator.
if (hasLeaderPartition(closedPartitions)) {
LOG.info(
"Committer {}-{} lost leader partition. Stopping coordinator.",
config.connectorName(),
config.taskId());
LOG.info("Committer {} lost leader partition. Stopping coordinator.", identifier);
stopCoordinator();
}

// Reset offsets to last committed to avoid data loss.
LOG.info(
"Seeking to last committed offsets for worker {}-{}.",
config.connectorName(),
config.taskId());
LOG.info("Seeking to last committed offsets for worker {}.", identifier);
KafkaUtils.seekToLastCommittedOffsets(context);
}

Expand All @@ -181,9 +210,7 @@ public void save(Collection<SinkRecord> sinkRecords) {
private void processControlEvents() {
if (coordinatorThread != null && coordinatorThread.isTerminated()) {
throw new NotRunningException(
String.format(
"Coordinator unexpectedly terminated on committer %s-%s",
config.connectorName(), config.taskId()));
String.format("Coordinator unexpectedly terminated on committer %s", identifier));
}
if (worker != null) {
worker.process();
Expand All @@ -192,7 +219,7 @@ private void processControlEvents() {

private void startWorker() {
if (null == this.worker) {
LOG.info("Starting commit worker {}-{}", config.connectorName(), config.taskId());
LOG.info("Starting commit worker {}", identifier);
SinkWriter sinkWriter = new SinkWriter(catalog, config);
worker = new Worker(config, clientFactory, sinkWriter, context);
worker.start();
Expand All @@ -201,10 +228,7 @@ private void startWorker() {

private void startCoordinator() {
if (null == this.coordinatorThread) {
LOG.info(
"Task {}-{} elected leader, starting commit coordinator",
config.connectorName(),
config.taskId());
LOG.info("Task {} elected leader, starting commit coordinator", identifier);
Coordinator coordinator =
new Coordinator(catalog, config, membersWhenWorkerIsCoordinator, clientFactory, context);
coordinatorThread = new CoordinatorThread(coordinator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Coordinator extends Channel {
private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id";
private static final String TASK_ID_SNAPSHOT_PROP = "kafka.connect.task-id";
private static final String COORDINATOR_ID_SNAPSHOT_PROP = "kafka.connect.coordinator-id";
private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts";
private static final Duration POLL_DURATION = Duration.ofSeconds(1);

Expand All @@ -81,6 +81,7 @@ class Coordinator extends Channel {
private final ExecutorService exec;
private final CommitState commitState;
private volatile boolean terminated;
private final String coordinatorId;

Coordinator(
Catalog catalog,
Expand Down Expand Up @@ -110,6 +111,7 @@ class Coordinator extends Channel {
.setNameFormat("iceberg-committer" + "-%d")
.build());
this.commitState = new CommitState(config);
this.coordinatorId = config.connectorName() + "-" + config.taskId();
}

void process() {
Expand All @@ -119,7 +121,7 @@ void process() {
Event event =
new Event(config.connectGroupId(), new StartCommit(commitState.currentCommitId()));
send(event);
LOG.info("Commit {} initiated", commitState.currentCommitId());
LOG.info("Coordinator {} initiated commit {}", coordinatorId, commitState.currentCommitId());
}

consumeAvailable(POLL_DURATION);
Expand Down Expand Up @@ -149,7 +151,11 @@ private void commit(boolean partialCommit) {
try {
doCommit(partialCommit);
} catch (Exception e) {
LOG.warn("Commit failed, will try again next cycle", e);
LOG.warn(
"Coordinator {} failed to commit for commit {}, will try again next cycle",
coordinatorId,
commitState.currentCommitId(),
e);
} finally {
commitState.endCurrentCommit();
}
Expand Down Expand Up @@ -179,7 +185,8 @@ private void doCommit(boolean partialCommit) {
send(event);

LOG.info(
"Commit {} complete, committed to {} table(s), valid-through {}",
"Coordinator {} completed commit {}, committed to {} table(s), valid-through {}",
coordinatorId,
commitState.currentCommitId(),
commitMap.size(),
validThroughTs);
Expand Down Expand Up @@ -246,13 +253,13 @@ private void commitToTable(
.collect(Collectors.toList());

if (terminated) {
throw new ConnectException("Coordinator is terminated, commit aborted");
throw new ConnectException(
String.format("Coordinator %s is terminated, commit aborted", coordinatorId));
}

if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
} else {
String taskId = String.format("%s-%s", config.connectorName(), config.taskId());
if (deleteFiles.isEmpty()) {
AppendFiles appendOp =
table.newAppend().validateWith(offsetValidator(tableIdentifier, committedOffsets));
Expand All @@ -261,7 +268,7 @@ private void commitToTable(
}
appendOp.set(snapshotOffsetsProp, offsetsJson);
appendOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
appendOp.set(TASK_ID_SNAPSHOT_PROP, taskId);
appendOp.set(COORDINATOR_ID_SNAPSHOT_PROP, coordinatorId);
if (validThroughTs != null) {
appendOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
}
Expand All @@ -275,7 +282,7 @@ private void commitToTable(
}
deltaOp.set(snapshotOffsetsProp, offsetsJson);
deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
deltaOp.set(TASK_ID_SNAPSHOT_PROP, taskId);
deltaOp.set(COORDINATOR_ID_SNAPSHOT_PROP, coordinatorId);
if (validThroughTs != null) {
deltaOp.set(VALID_THROUGH_TS_SNAPSHOT_PROP, validThroughTs.toString());
}
Expand All @@ -293,7 +300,8 @@ private void commitToTable(
send(event);

LOG.info(
"Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}",
"Coordinator {} completed commit to table {}, snapshot {}, commit ID {}, valid-through {}",
coordinatorId,
tableIdentifier,
snapshotId,
commitState.currentCommitId(),
Expand Down