Add kafka auth for offset, schema mgt#22
Add kafka auth for offset, schema mgt#22gayaldassanayake merged 1 commit intoballerina-platform:mainfrom
Conversation
c5fb64e to
dabfd32
Compare
Codecov Report❌ Patch coverage is
❌ Your project status has failed because the head coverage (77.07%) is below the target coverage (80.00%). You can increase the head coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #22 +/- ##
============================================
+ Coverage 76.68% 77.07% +0.39%
Complexity 214 214
============================================
Files 29 29
Lines 892 1025 +133
Branches 129 159 +30
============================================
+ Hits 684 790 +106
- Misses 136 163 +27
Partials 72 72 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
The updated type changes # Represents the Kafka-based schema history configuration.
#
# + className - The class name of the Kafka schema history implementation to use
# + topicName - The name of the Kafka topic to store schema history
# + bootstrapServers - The list of Kafka bootstrap servers
# + securityProtocol - Kafka security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL). Defaults to PLAINTEXT
# + auth - SASL authentication credentials (mechanism, username, password). Required for SASL_PLAINTEXT and SASL_SSL
# + secureSocket - SSL/TLS configuration with truststore and optional keystore for mutual TLS. Supports both JKS/PKCS12 truststores and PEM certificates
public type KafkaInternalSchemaStorage record {|
*SchemaHistoryInternal;
string className = "io.debezium.storage.kafka.history.KafkaSchemaHistory";
string topicName = "bal_cdc_internal_schema_history";
string|string[] bootstrapServers;
kafka:SecurityProtocol securityProtocol = kafka:PROTOCOL_PLAINTEXT; // NEW
kafka:AuthenticationConfiguration auth?; // NEW
kafka:SecureSocket secureSocket?; // NEW
|};
# Represents the Kafka-based offset storage configuration.
#
# + className - The class name of the Kafka offset storage implementation to use
# + bootstrapServers - The list of Kafka bootstrap servers
# + topicName - The name of the Kafka topic to store offsets
# + partitions - The number of partitions for the Kafka topic
# + replicationFactor - The replication factor for the Kafka topic
# + securityProtocol - Kafka security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL). Defaults to PLAINTEXT
# + auth - SASL authentication credentials (mechanism, username, password). Required for SASL_PLAINTEXT and SASL_SSL
# + secureSocket - SSL/TLS configuration with truststore and optional keystore for mutual TLS
public type KafkaOffsetStorage record {|
*OffsetStorage;
string className = "org.apache.kafka.connect.storage.KafkaOffsetBackingStore";
string|string[] bootstrapServers;
string topicName = "bal_cdc_offsets";
int partitions = 1;
int replicationFactor = 2;
kafka:SecurityProtocol securityProtocol = kafka:PROTOCOL_PLAINTEXT; // NEW
kafka:AuthenticationConfiguration auth?; // NEW
kafka:SecureSocket secureSocket?; // NEW
|}; |
There was a problem hiding this comment.
Pull request overview
This PR adds Kafka authentication support for both offset storage and schema history management in the CDC connector. It introduces security configurations (SSL/TLS, SASL) to enable secure communication with Kafka brokers for CDC internal storage needs, addressing issue #8572.
Changes:
- Added Kafka authentication and SSL/TLS configuration support for offset storage and schema history management
- Enhanced offset commit handling by explicitly marking records as processed and marking batch completion
- Updated multiple dependencies including Kafka clients (3.9.0→3.9.1), stdlib versions, and added new Kafka, Avro, and Confluent libraries
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| native/src/main/java/io/ballerina/lib/cdc/BalChangeConsumer.java | Added explicit offset commit calls (markProcessed/markBatchFinished) and InterruptedException declaration |
| gradle.properties | Updated dependency versions and added new Kafka, Avro, UUID, and Confluent library versions |
| build.gradle | Added dependencies for Kafka, Avro, UUID, and Confluent libraries |
| ballerina/utils.bal | Added authentication configuration logic for Kafka offset storage and schema history including SASL, SSL/TLS support |
| ballerina/types.bal | Extended KafkaOffsetStorage and KafkaInternalSchemaStorage types with security protocol, auth, and secureSocket fields |
| ballerina/tests/utils_test.bal | Added comprehensive tests for SSL and SASL authentication configurations |
| ballerina/Dependencies.toml | Updated dependency versions and added Kafka, Avro, Confluent dependencies |
| ballerina/CompilerPlugin.toml | Updated version reference to 1.1.0-SNAPSHOT |
| ballerina/Ballerina.toml | Updated version to 1.1.0 and Kafka client versions to 3.9.1 |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
31ee230 to
9352517
Compare
9352517 to
d0b287e
Compare
6fa62e3
into
ballerina-platform:main
Purpose
A part of ballerina-platform/ballerina-library#8572
Examples
Checklist