Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
39ffe90
[Automated] Update the toml files
gayaldassanayake Feb 6, 2026
5258a66
[Automated] Update the toml files
gayaldassanayake Feb 6, 2026
bfc3a82
[Automated] Update the toml files
gayaldassanayake Feb 6, 2026
1788226
[Automated] Update the toml files
gayaldassanayake Feb 24, 2026
009a737
Add new types
gayaldassanayake Feb 11, 2026
79bba0d
[Automated] Update the toml files
gayaldassanayake Feb 12, 2026
53adf21
Add all types
gayaldassanayake Feb 12, 2026
392e78e
[Automated] Update the toml files
gayaldassanayake Feb 12, 2026
44f3962
Update the types and utils
gayaldassanayake Feb 12, 2026
e1f1e85
Update the comments
gayaldassanayake Feb 12, 2026
6e33973
[Automated] Update the toml files
gayaldassanayake Feb 13, 2026
6af8af7
[Automated] Update the toml files
gayaldassanayake Feb 13, 2026
544f669
[Automated] Update the toml files
gayaldassanayake Feb 13, 2026
c3bd05c
[Automated] Update the toml files
gayaldassanayake Feb 13, 2026
f1e738e
[Automated] Update the toml files
gayaldassanayake Feb 13, 2026
a6fa32a
Add db specific tests
gayaldassanayake Feb 13, 2026
5b41ab3
[Automated] Update the toml files
gayaldassanayake Feb 15, 2026
974a813
Temp commit
gayaldassanayake Feb 16, 2026
9d76edc
Fix issues
gayaldassanayake Feb 17, 2026
c3a8717
[Automated] Update the toml files
gayaldassanayake Feb 19, 2026
3f89b50
Refactor init params
gayaldassanayake Feb 19, 2026
3334699
Fix tests
gayaldassanayake Feb 19, 2026
254b55d
Resolve todos
gayaldassanayake Feb 20, 2026
3fae340
Group retry and wait configs
gayaldassanayake Feb 20, 2026
53409f3
Group configs
gayaldassanayake Feb 20, 2026
7c3c1cb
Allow primitive types for additional properties
gayaldassanayake Feb 23, 2026
2db693a
Refactor the tests
gayaldassanayake Feb 23, 2026
b771031
[Automated] Update the toml files
gayaldassanayake Feb 24, 2026
e0484ce
Remove offset logs at the end of the test suit
gayaldassanayake Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ examples/**/Config.toml
*.env

examples/*/tmp
ballerina/tmp
8 changes: 4 additions & 4 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "cdc"
version = "1.2.1"
version = "1.2.2"
distribution = "2201.12.0"
authors = ["Ballerina"]
repository = "https://github.com/ballerina-platform/module-ballerinax-cdc"
Expand All @@ -15,9 +15,9 @@ graalvmCompatible=true

[[platform.java21.dependency]]
groupId = "io.ballerina.lib.cdc"
artifactId = "cdc-native-1.2.1"
version = "1.2.1"
path = "../native/build/libs/cdc-native-1.2.1.jar"
artifactId = "cdc-native-1.2.2-SNAPSHOT"
version = "1.2.2-SNAPSHOT"
path = "../native/build/libs/cdc-native-1.2.2-SNAPSHOT.jar"

[[platform.java21.dependency]]
groupId = "io.debezium"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "cdc-compiler-plugin"
class = "io.ballerina.lib.cdc.compiler.CdcCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/cdc-compiler-plugin-1.2.1.jar"
path = "../compiler-plugin/build/libs/cdc-compiler-plugin-1.2.2-SNAPSHOT.jar"
34 changes: 30 additions & 4 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ modules = [
[[package]]
org = "ballerina"
name = "data.jsondata"
version = "1.1.0"
version = "1.1.3"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.object"}
Expand All @@ -39,6 +39,21 @@ modules = [
{org = "ballerina", packageName = "data.jsondata", moduleName = "data.jsondata"}
]

[[package]]
org = "ballerina"
name = "file"
version = "1.12.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "os"},
{org = "ballerina", name = "time"}
]
modules = [
{org = "ballerina", packageName = "file", moduleName = "file"}
]

[[package]]
org = "ballerina"
name = "io"
Expand Down Expand Up @@ -158,11 +173,21 @@ modules = [
[[package]]
org = "ballerina"
name = "observe"
version = "1.5.0"
version = "1.5.1"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "os"
version = "1.10.1"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "random"
Expand Down Expand Up @@ -236,10 +261,11 @@ modules = [
[[package]]
org = "ballerinax"
name = "cdc"
version = "1.2.1"
version = "1.2.2"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "data.jsondata"},
{org = "ballerina", name = "file"},
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.runtime"},
Expand Down Expand Up @@ -277,7 +303,7 @@ dependencies = [
[[package]]
org = "ballerinax"
name = "java.jdbc"
version = "1.14.0"
version = "1.14.1"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "io"},
Expand Down
10 changes: 8 additions & 2 deletions ballerina/extern_functions.bal
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ public isolated function externDetach(Listener cdcListener, Service cdcService)
# Attach point to call the native CDC listener start method.
#
# + cdcListener - the cdc listener object
# + config - the configuration map containing debezium properties
# + debeziumConfigs - the configuration map containing debezium properties
# + listenerConfigs - the configuration map containing listener-specific properties
# + return - an error if the listener cannot be started, or `()` if successful
public isolated function externStart(Listener cdcListener, map<anydata> config) returns Error? = @java:Method {
public isolated function externStart(Listener cdcListener, map<anydata> debeziumConfigs, map<anydata> listenerConfigs) returns Error? = @java:Method {
name: "start",
'class: "io.ballerina.lib.cdc.Listener"
} external;
Expand Down Expand Up @@ -73,3 +74,8 @@ public isolated function externImmediateStop(Listener cdcListener) returns Error
public isolated function isLive(Listener cdc) returns boolean|Error = @java:Method {
'class: "io.ballerina.lib.cdc.Listener"
} external;

public isolated function externGetAdditionalConfigKeys(Options options, typedesc<Options> optionsSubType) returns string[] = @java:Method {
name: "getAdditionalConfigKeys",
'class: "io.ballerina.lib.cdc.utils.Utils"
} external;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com).
// Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com).
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
Expand All @@ -16,6 +16,7 @@

import ballerina/crypto;
import ballerina/test;

import ballerinax/kafka;

@test:Config {}
Expand All @@ -37,6 +38,10 @@ function testGetDebeziumProperties() {
"schema.history.internal.kafka.topic": "bal_cdc_internal_schema_history",
"schema.history.internal.producer.security.protocol": "PLAINTEXT",
"schema.history.internal.consumer.security.protocol": "PLAINTEXT",
"schema.history.internal.kafka.recovery.poll.interval.ms": "100",
"schema.history.internal.kafka.recovery.attempts": "100",
"schema.history.internal.kafka.query.timeout.ms": "3",
"schema.history.internal.kafka.create.timeout.ms": "30",
"offset.storage": "org.apache.kafka.connect.storage.KafkaOffsetBackingStore",
"offset.flush.interval.ms": "60000",
"offset.flush.timeout.ms": "5000",
Expand All @@ -49,7 +54,7 @@ function testGetDebeziumProperties() {
"database.query.timeout.ms": "60000"
};

ListenerConfiguration config = {
SampleDBListenerConfiguration config = {
offsetStorage: {
bootstrapServers: ""
},
Expand All @@ -60,7 +65,7 @@ function testGetDebeziumProperties() {

map<string> actualProperties = {};
// Call the function to test
populateDebeziumProperties(config, actualProperties);
populateSampleDBDebeziumProperties(config, actualProperties);

// Validate the returned properties
test:assertEquals(actualProperties, expectedProperties, msg = "Debezium properties do not match the expected values.");
Expand Down Expand Up @@ -116,14 +121,7 @@ function testKafkaOffsetStorageWithSslAuth() {
// Expected properties map with SSL authentication for offset storage
map<string> expectedProperties = {
"name": "ballerina-cdc-connector",
"max.queue.size": "8192",
"max.batch.size": "2048",
"event.processing.failure.handling.mode": "warn",
"snapshot.mode": "initial",
"skipped.operations": "t",
"skip.messages.without.change": "false",
"tombstones.on.delete": "false",
"decimal.handling.mode": "double",
"schema.history.internal": "io.debezium.storage.file.history.FileSchemaHistory",
"topic.prefix": "bal_cdc_schema_history",
"schema.history.internal.file.filename": "tmp/dbhistory.dat",
Expand All @@ -139,8 +137,7 @@ function testKafkaOffsetStorageWithSslAuth() {
"ssl.keystore.password": "keystore-password",
"ssl.truststore.location": "/path/to/truststore.jks",
"ssl.truststore.password": "truststore-password",
"include.schema.changes": "false",
"database.query.timeout.ms": "60000"
"include.schema.changes": "false"
};

ListenerConfiguration config = {
Expand All @@ -154,7 +151,7 @@ function testKafkaOffsetStorageWithSslAuth() {
}
}
},
internalSchemaStorage: {}
internalSchemaStorage: <FileInternalSchemaStorage>{}
};

map<string> actualProperties = {};
Expand All @@ -167,14 +164,7 @@ function testKafkaOffsetStorageWithSaslAuth() {
// Expected properties map with SASL authentication for offset storage
map<string> expectedProperties = {
"name": "ballerina-cdc-connector",
"max.queue.size": "8192",
"max.batch.size": "2048",
"event.processing.failure.handling.mode": "warn",
"snapshot.mode": "initial",
"skipped.operations": "t",
"skip.messages.without.change": "false",
"tombstones.on.delete": "false",
"decimal.handling.mode": "double",
"schema.history.internal": "io.debezium.storage.file.history.FileSchemaHistory",
"topic.prefix": "bal_cdc_schema_history",
"schema.history.internal.file.filename": "tmp/dbhistory.dat",
Expand All @@ -190,8 +180,7 @@ function testKafkaOffsetStorageWithSaslAuth() {
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"pass\";",
"ssl.truststore.location": "/path/to/truststore.jks",
"ssl.truststore.password": "truststore-password",
"include.schema.changes": "false",
"database.query.timeout.ms": "60000"
"include.schema.changes": "false"
};

ListenerConfiguration config = {
Expand All @@ -206,8 +195,7 @@ function testKafkaOffsetStorageWithSaslAuth() {
secureSocket: {
cert: <crypto:TrustStore>{path: "/path/to/truststore.jks", password: "truststore-password"}
}
},
internalSchemaStorage: {}
}
};

map<string> actualProperties = {};
Expand All @@ -220,18 +208,15 @@ function testKafkaSchemaHistoryWithAuth() {
// Expected properties map with SASL authentication for schema history
map<string> expectedProperties = {
"name": "ballerina-cdc-connector",
"max.queue.size": "8192",
"max.batch.size": "2048",
"event.processing.failure.handling.mode": "warn",
"snapshot.mode": "initial",
"skipped.operations": "t",
"skip.messages.without.change": "false",
"tombstones.on.delete": "false",
"decimal.handling.mode": "double",
"schema.history.internal": "io.debezium.storage.kafka.history.KafkaSchemaHistory",
"topic.prefix": "bal_cdc_schema_history",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9093",
"schema.history.internal.kafka.topic": "bal_cdc_internal_schema_history",
"schema.history.internal.kafka.recovery.poll.interval.ms": "100",
"schema.history.internal.kafka.recovery.attempts": "100",
"schema.history.internal.kafka.query.timeout.ms": "3",
"schema.history.internal.kafka.create.timeout.ms": "30",
"schema.history.internal.producer.security.protocol": "SASL_SSL",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pass\";",
Expand All @@ -246,12 +231,10 @@ function testKafkaSchemaHistoryWithAuth() {
"offset.flush.interval.ms": "60000",
"offset.flush.timeout.ms": "5000",
"offset.storage.file.filename": "tmp/debezium-offsets.dat",
"include.schema.changes": "false",
"database.query.timeout.ms": "60000"
"include.schema.changes": "false"
};

ListenerConfiguration config = {
offsetStorage: {},
internalSchemaStorage: {
bootstrapServers: "localhost:9093",
securityProtocol: kafka:PROTOCOL_SASL_SSL,
Expand All @@ -276,18 +259,15 @@ function testBothKafkaStoragesWithAuth() {
// Test that both offset storage and schema history can have independent auth configurations
map<string> expectedProperties = {
"name": "ballerina-cdc-connector",
"max.queue.size": "8192",
"max.batch.size": "2048",
"event.processing.failure.handling.mode": "warn",
"snapshot.mode": "initial",
"skipped.operations": "t",
"skip.messages.without.change": "false",
"tombstones.on.delete": "false",
"decimal.handling.mode": "double",
"schema.history.internal": "io.debezium.storage.kafka.history.KafkaSchemaHistory",
"topic.prefix": "bal_cdc_schema_history",
"schema.history.internal.kafka.bootstrap.servers": "kafka1:9093",
"schema.history.internal.kafka.topic": "bal_cdc_internal_schema_history",
"schema.history.internal.kafka.recovery.poll.interval.ms": "100",
"schema.history.internal.kafka.recovery.attempts": "100",
"schema.history.internal.kafka.query.timeout.ms": "3",
"schema.history.internal.kafka.create.timeout.ms": "30",
"schema.history.internal.producer.security.protocol": "SSL",
"schema.history.internal.producer.ssl.keystore.location": "/path/to/keystore1.jks",
"schema.history.internal.producer.ssl.keystore.password": "pass1",
Expand All @@ -308,8 +288,7 @@ function testBothKafkaStoragesWithAuth() {
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"offsetuser\" password=\"offsetpass\";",
"include.schema.changes": "false",
"database.query.timeout.ms": "60000"
"include.schema.changes": "false"
};

ListenerConfiguration config = {
Expand Down Expand Up @@ -344,18 +323,15 @@ function testBothKafkaStoragesWithCertKey() {
// Test that both offset storage and schema history can use CertKey independently
map<string> expectedProperties = {
"name": "ballerina-cdc-connector",
"max.queue.size": "8192",
"max.batch.size": "2048",
"event.processing.failure.handling.mode": "warn",
"snapshot.mode": "initial",
"skipped.operations": "t",
"skip.messages.without.change": "false",
"tombstones.on.delete": "false",
"decimal.handling.mode": "double",
"schema.history.internal": "io.debezium.storage.kafka.history.KafkaSchemaHistory",
"topic.prefix": "bal_cdc_schema_history",
"schema.history.internal.kafka.bootstrap.servers": "kafka1:9093",
"schema.history.internal.kafka.topic": "bal_cdc_internal_schema_history",
"schema.history.internal.kafka.recovery.poll.interval.ms": "100",
"schema.history.internal.kafka.recovery.attempts": "100",
"schema.history.internal.kafka.query.timeout.ms": "3",
"schema.history.internal.kafka.create.timeout.ms": "30",
"schema.history.internal.producer.security.protocol": "SSL",
"schema.history.internal.producer.ssl.keystore.type": "PEM",
"schema.history.internal.producer.ssl.keystore.certificate.chain": "-----BEGIN CERTIFICATE-----\ntest-cert-content\n-----END CERTIFICATE-----",
Expand All @@ -382,8 +358,7 @@ function testBothKafkaStoragesWithCertKey() {
"ssl.key.password": "offset-key-password",
"ssl.truststore.location": "truststore2.pem",
"ssl.truststore.type": "PEM",
"include.schema.changes": "false",
"database.query.timeout.ms": "60000"
"include.schema.changes": "false"
};

ListenerConfiguration config = {
Expand Down
Loading