Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class KafkaConsumerGroupClientImpl implements KafkaConsumerGroupClient {

private final Supplier<Admin> adminClient;
private static final Logger log = LogManager.getLogger(KafkaConsumerGroupClientImpl.class);

public KafkaConsumerGroupClientImpl(final Supplier<Admin> adminClient) {
this.adminClient = adminClient;
Expand Down Expand Up @@ -83,6 +87,11 @@ public ConsumerGroupSummary describeConsumerGroup(final String group) {
return new ConsumerGroupSummary(results);
} catch (final GroupAuthorizationException e) {
throw new KsqlGroupAuthorizationException(AclOperation.DESCRIBE, group);
} catch (final GroupIdNotFoundException e) {
// Per KIP-1043, Kafka now throws GroupIdNotFoundException if a group doesn't exist.
// Treat this expected scenario as a group with zero consumers by returning an empty summary.
log.debug("Consumer group '{}' not found, treating as empty. Reason: {}", group, e);
return new ConsumerGroupSummary(Collections.emptySet());
} catch (final Exception e) {
throw new KafkaResponseGetFailedException(
"Failed to describe Kafka consumer groups: " + group, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ public void shouldListConsumerGroupsWhenTheyExist() {
verifyListsGroups(group1, ImmutableList.of(group0, group1));
}

// @Test
// public void shouldDescribeConsumerGroup() {
// givenTopicExistsWithData();
// try (KafkaConsumer<String, byte[]> c1 = createConsumer(group0)) {
// verifyDescribeConsumerGroup(1, group0, ImmutableList.of(c1));
// try (KafkaConsumer<String, byte[]> c2 = createConsumer(group0)) {
// verifyDescribeConsumerGroup(2, group0, ImmutableList.of(c1, c2));
// }
// }
// }
@Test
public void shouldDescribeConsumerGroup() {
givenTopicExistsWithData();
try (KafkaConsumer<String, byte[]> c1 = createConsumer(group0)) {
verifyDescribeConsumerGroup(1, group0, ImmutableList.of(c1));
try (KafkaConsumer<String, byte[]> c2 = createConsumer(group0)) {
verifyDescribeConsumerGroup(2, group0, ImmutableList.of(c1, c2));
}
}
}

@Test
public void shouldListConsumerGroupOffsetsWhenTheyExist() {
Expand Down