Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public static class Cluster {
String schemaRegistry;
SchemaRegistryAuth schemaRegistryAuth;
KeystoreConfig schemaRegistrySsl;
String schemaRegistryTopicSubjectSuffix = "-value";

String ksqldbServer;
KsqldbServerAuth ksqldbServerAuth;
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/io/kafbat/ui/mapper/KafkaSrMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLev
.schema(s.getSchema())
.schemaType(SchemaTypeDTO.fromValue(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO).getValue()))
.references(toDto(s.getReferences()))
.topic(s.getTopic())
.compatibilityLevel(s.getCompatibility().toString());
}

Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/io/kafbat/ui/model/KafkaCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class KafkaCluster {
private final PollingSettings pollingSettings;
private final MetricsScraper metricsScrapping;
private final ReactiveFailover<KafkaSrClientApi> schemaRegistryClient;
private final String schemaRegistryTopicSubjectSuffix;
private final Map<String, ReactiveFailover<KafkaConnectClientApi>> connectsClients;
private final ReactiveFailover<KsqlApiClient> ksqlClient;
private final ReactiveFailover<PrometheusClientApi> prometheusStorageClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public KafkaCluster create(ClustersProperties properties,

if (schemaRegistryConfigured(clusterProperties)) {
builder.schemaRegistryClient(schemaRegistryClient(clusterProperties));
builder.schemaRegistryTopicSubjectSuffix(clusterProperties.getSchemaRegistryTopicSubjectSuffix());
}
if (connectClientsConfigured(clusterProperties)) {
builder.connectsClients(connectClients(clusterProperties));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kafbat.ui.exception.SchemaNotFoundException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
import io.kafbat.ui.sr.api.KafkaSrClientApi;
import io.kafbat.ui.sr.model.Compatibility;
import io.kafbat.ui.sr.model.CompatibilityCheckResponse;
Expand All @@ -15,6 +16,7 @@
import io.kafbat.ui.sr.model.SchemaSubject;
import io.kafbat.ui.util.ReactiveFailover;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand All @@ -31,7 +33,6 @@
@Slf4j
@RequiredArgsConstructor
public class SchemaRegistryService {

private static final String LATEST = "latest";

@AllArgsConstructor
Expand All @@ -40,8 +41,12 @@ public static class SubjectWithCompatibilityLevel {
SchemaSubject subject;
@Getter
Compatibility compatibility;
@Getter
String topic;
}

private final StatisticsCache statisticsCache;

// Returns the failover-wrapped Schema Registry client configured for the given cluster.
private ReactiveFailover<KafkaSrClientApi> api(KafkaCluster cluster) {
return cluster.getSchemaRegistryClient();
}
Expand Down Expand Up @@ -88,12 +93,22 @@ public Mono<SubjectWithCompatibilityLevel> getLatestSchemaVersionBySubject(Kafka
return getSchemaSubject(cluster, schemaName, LATEST);
}

/**
 * Resolves the Kafka topic that owns the given schema subject, or {@code null}
 * if no matching topic is present in the statistics cache.
 *
 * <p>The topic name is derived by stripping the cluster's configured subject
 * suffix (default {@code "-value"}) from the END of the subject name only.
 * Using {@link String#replace} here would remove every occurrence of the
 * suffix, corrupting subjects such as {@code "foo-value-bar-value"}.
 */
private String topicName(KafkaCluster cluster, String schemaName) {
  String suffix = cluster.getSchemaRegistryTopicSubjectSuffix();
  // Strip the suffix only when it actually terminates the subject name;
  // otherwise fall back to looking up the subject name as-is.
  String candidateTopic =
      (suffix != null && !suffix.isEmpty() && schemaName.endsWith(suffix))
          ? schemaName.substring(0, schemaName.length() - suffix.length())
          : schemaName;
  return Optional.ofNullable(
          statisticsCache.get(cluster)
              .getClusterState()
              .getTopicStates()
              .get(candidateTopic))
      .map(ScrapedClusterState.TopicState::name)
      .orElse(null);
}

/**
 * Fetches the given version of a subject from the Schema Registry, pairing it
 * with its effective compatibility level and (when resolvable from the
 * statistics cache) the owning topic name.
 *
 * <p>A registry 404 is translated into the application's
 * {@link SchemaNotFoundException} so callers get a uniform error type.
 */
private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
                                                             String version) {
  return api(cluster)
      .mono(c -> c.getSubjectVersion(schemaName, version, false))
      .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
      // topicName(...) may return null; SubjectWithCompatibilityLevel tolerates that.
      .map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2(), topicName(cluster, schemaName)))
      .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));
}

Expand Down
38 changes: 38 additions & 0 deletions api/src/test/java/io/kafbat/ui/SchemaRegistryServiceTests.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
package io.kafbat.ui;

import static org.assertj.core.api.Assertions.assertThat;

import io.kafbat.ui.model.CompatibilityLevelDTO;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.NewSchemaSubjectDTO;
import io.kafbat.ui.model.SchemaReferenceDTO;
import io.kafbat.ui.model.SchemaSubjectDTO;
import io.kafbat.ui.model.SchemaSubjectsResponseDTO;
import io.kafbat.ui.model.SchemaTypeDTO;
import io.kafbat.ui.service.ClustersStorage;
import io.kafbat.ui.service.StatisticsService;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.kafka.clients.admin.NewTopic;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.openapitools.jackson.nullable.JsonNullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
Expand All @@ -31,6 +39,12 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest {
WebTestClient webTestClient;
String subject;

@Autowired
private StatisticsService statisticsService;

@Autowired
private ClustersStorage clustersStorage;

@BeforeEach
void setUpBefore() {
this.subject = UUID.randomUUID().toString();
Expand Down Expand Up @@ -342,6 +356,30 @@ void shouldCreateNewSchemaWhenSubjectIncludesNonAsciiCharacters() {
.expectStatus().isOk();
}

@Test
void shouldReturnSubjectWithTopicIfTopicExists() {
  // Create a topic plus a subject named "<topic>-value" (the default subject
  // suffix), then verify the schema endpoint resolves the owning topic.
  String topicName = "test-topic" + UUID.randomUUID();
  String subjectName = topicName + "-value";
  createTopic(new NewTopic(topicName, 1, (short) 1));
  createNewSubjectAndAssert(subjectName);
  // Fail loudly with context instead of an unchecked Optional.get().
  KafkaCluster kafkaCluster = clustersStorage.getClusterByName(LOCAL)
      .orElseThrow(() -> new IllegalStateException("Cluster not configured: " + LOCAL));
  // Topic resolution reads from the statistics cache, so wait until the new
  // topic has been scraped; querying earlier would yield a null topic.
  Awaitility.await().until(() ->
      statisticsService.updateCache(kafkaCluster)
          .map(s ->
              s.getClusterState().getTopicStates().containsKey(topicName)
          ).block()
  );
  webTestClient
      .get()
      // Fixed typo in the template variable name ("subjetcName"); binding is
      // positional, but the name should still read correctly.
      .uri("/api/clusters/{clusterName}/schemas/{subjectName}/latest", LOCAL, subjectName)
      .exchange()
      .expectStatus()
      .isOk()
      .expectBodyList(SchemaSubjectDTO.class)
      .value(schemas -> assertThat(schemas).allMatch(s -> s.getTopic().equals(JsonNullable.of(topicName))));
}

private void createNewSubjectAndAssert(String subject) {
webTestClient
.post()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.openapitools.jackson.nullable.JsonNullable;
import reactor.core.publisher.Mono;

class SchemaRegistryPaginationTest {
Expand All @@ -41,7 +42,8 @@ private void init(List<String> subjects) {
initWithData(subjects.stream().map(s ->
new SubjectWithCompatibilityLevel(
new SchemaSubject().subject(s),
Compatibility.FULL
Compatibility.FULL,
s.contains("-value") ? s.replace("-value", "") : null
)
).toList());
}
Expand Down Expand Up @@ -114,6 +116,20 @@ void shouldListSchemasContaining_1() {
assertThat(schemasSearch7.getBody().getSchemas()).hasSize(20);
}

@Test
void shouldReturnTopic() {
  // A subject named "topic-1-value" should resolve to the topic "topic-1".
  init(List.of("topic-1-value"));

  var response = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
      null, null, "1", null, null, null, null).block();

  assertThat(response).isNotNull();
  var body = response.getBody();
  assertThat(body).isNotNull();
  assertThat(body.getPageCount()).isEqualTo(1);
  var schemas = body.getSchemas();
  assertThat(schemas).hasSize(1);
  assertThat(schemas.getFirst().getTopic()).isEqualTo(JsonNullable.of("topic-1"));
}

@Test
void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
init(
Expand Down Expand Up @@ -169,7 +185,7 @@ void shouldOrderByAndPaginate() {
.subject("subject" + num)
.schemaType(SchemaType.AVRO)
.id(num),
Compatibility.FULL
Compatibility.FULL, null
)
).toList();

Expand Down
1 change: 1 addition & 0 deletions contract-typespec/api/schemas.tsp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ model SchemaSubject {
compatibilityLevel: string;
schemaType: SchemaType;
references?: SchemaReference[];
topic?: string | null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does string | null make any sense given the field is already marked as nullable (topic?)?

}

model NewSchemaSubject {
Expand Down
Loading