Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4b32bb0
[Automated] Update the toml files
ayeshLK Feb 10, 2026
3f78a32
[Automated] Update the toml files
ayeshLK Feb 10, 2026
d07b39e
Add basic implementation for CDC liveness check
ayeshLK Feb 10, 2026
1af8ca8
Resolve merge conflitct
ayeshLK Feb 10, 2026
82dc401
Add newline to the EOF
ayeshLK Feb 10, 2026
172d5d8
Add default liveness-interval to the cdc listener
ayeshLK Feb 10, 2026
60860e9
Fix review comments
ayeshLK Feb 10, 2026
706ca06
[Automated] Update the toml files
ayeshLK Feb 10, 2026
c9d119d
Fix event consumer time-stamp update logic for thread-safety
ayeshLK Feb 10, 2026
cdda861
[Automated] Update the toml files
ayeshLK Feb 13, 2026
de7b854
[Automated] Update the toml files
ayeshLK Feb 13, 2026
4c94c04
[Automated] Update the toml files
ayeshLK Feb 13, 2026
2846906
[Automated] Update the toml files
ayeshLK Feb 13, 2026
a9b5d3c
[Automated] Update the toml files
ayeshLK Feb 13, 2026
0adc17b
[Automated] Update the toml files
ayeshLK Feb 13, 2026
10e43bd
Add test cases for liveness check
ayeshLK Feb 13, 2026
e1c1539
Fix implementation issues in the liveness check
ayeshLK Feb 13, 2026
b9d007a
[Automated] Update the toml files
ayeshLK Feb 13, 2026
35697e0
Update the package spec
ayeshLK Feb 13, 2026
b83ff70
Update changelog
ayeshLK Feb 13, 2026
be8da3e
Update dependencies toml
ayeshLK Feb 13, 2026
9e9110b
[Automated] Update the toml files
ayeshLK Feb 13, 2026
25efdc2
Update dependencies
ayeshLK Feb 13, 2026
41a6a26
Update sample code in the spec
ayeshLK Feb 13, 2026
839f938
[Automated] Update the toml files
ayeshLK Feb 13, 2026
083532f
[Automated] Update the toml files
ayeshLK Feb 13, 2026
1194337
[Automated] Update the toml files
ayeshLK Feb 13, 2026
ba705ee
Revert "[Automated] Update the toml files"
ayeshLK Feb 13, 2026
a36fcc0
Skip adding custom configurations into the debezium configs
ayeshLK Feb 13, 2026
c91a6a7
Update dependencies
ayeshLK Feb 13, 2026
32eac11
Update package version
ayeshLK Feb 13, 2026
c91b148
[Automated] Update the toml files
ayeshLK Feb 13, 2026
23ca6d3
Update package version
ayeshLK Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "cdc"
version = "1.1.0"
version = "1.1.1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to bump this to 1.2.0?

distribution = "2201.12.0"
authors = ["Ballerina"]
repository = "https://github.com/ballerina-platform/module-ballerinax-cdc"
Expand All @@ -15,9 +15,9 @@ graalvmCompatible=true

[[platform.java21.dependency]]
groupId = "io.ballerina.lib.cdc"
artifactId = "cdc-native-1.1.0"
version = "1.1.0"
path = "../native/build/libs/cdc-native-1.1.0.jar"
artifactId = "cdc-native-1.1.1-SNAPSHOT"
version = "1.1.1-SNAPSHOT"
path = "../native/build/libs/cdc-native-1.1.1-SNAPSHOT.jar"

[[platform.java21.dependency]]
groupId = "io.debezium"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "cdc-compiler-plugin"
class = "io.ballerina.lib.cdc.compiler.CdcCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/cdc-compiler-plugin-1.1.0.jar"
path = "../compiler-plugin/build/libs/cdc-compiler-plugin-1.1.1-SNAPSHOT.jar"
12 changes: 11 additions & 1 deletion ballerina/extern_functions.bal
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public isolated function externDetach(Listener cdcListener, Service cdcService)
# + cdcListener - the cdc listener object
# + config - the configuration map containing debezium properties
# + return - an error if the listener cannot be started, or `()` if successful
public isolated function externStart(Listener cdcListener, map<string> config) returns Error? = @java:Method {
public isolated function externStart(Listener cdcListener, map<anydata> config) returns Error? = @java:Method {
name: "start",
'class: "io.ballerina.lib.cdc.Listener"
} external;
Expand All @@ -63,3 +63,13 @@ public isolated function externImmediateStop(Listener cdcListener) returns Error
name: "immediateStop",
'class: "io.ballerina.lib.cdc.Listener"
} external;


# Checks whether the given CDC listener is live.
#
# + cdc - The CDC listener instance to be checked
# + return - Returns `true` if the listener is considered live, `false` otherwise,
# or an error if the liveness check fail
public function isLive(Listener cdc) returns boolean|Error = @java:Method {
'class: "io.ballerina.lib.cdc.Listener"
} external;
85 changes: 85 additions & 0 deletions ballerina/tests/dynamic_attachment.bal
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// under the License.

import ballerina/test;
import ballerina/lang.runtime;

Service testService = service object {
remote function onCreate(record {} after, string tableName = "") returns error? {
Expand Down Expand Up @@ -131,3 +132,87 @@ function testDetachAfterStart() returns error? {
"Cannot detach a CDC service from the listener once it is running.");
check mysqlListener.gracefulStop();
}

@test:Config {
groups: ["liveness"]
}
function testLivenessBeforeListenerStart() returns error? {
MockListener mysqlListener = new ({
database: {
username,
password,
port
},
options: {
snapshotMode: NO_DATA
}
});
check mysqlListener.attach(testService);
boolean liveness = check isLive(mysqlListener);
test:assertFalse(liveness, "Liveness check passes even before listener starts");
}

@test:Config {
groups: ["liveness"]
}
function testLivenessWithStartedListener() returns error? {
MockListener mysqlListener = new ({
database: {
username,
password,
port
},
options: {
snapshotMode: NO_DATA
}
});
check mysqlListener.attach(testService);
check mysqlListener.'start();
boolean liveness = check isLive(mysqlListener);
test:assertTrue(liveness, "Liveness fails for a started listener");
check mysqlListener.gracefulStop();
}

@test:Config {
groups: ["liveness"]
}
function testLivenessAfterListenerStop() returns error? {
MockListener mysqlListener = new ({
database: {
username,
password,
port
},
options: {
snapshotMode: NO_DATA
}
});
check mysqlListener.attach(testService);
check mysqlListener.'start();
check mysqlListener.gracefulStop();
boolean liveness = check isLive(mysqlListener);
test:assertFalse(liveness, "Liveness check passes after the listener has stopped");
}

@test:Config {
groups: ["liveness"]
}
function testLivenessWithoutReceivingEvents() returns error? {
MockListener mysqlListener = new ({
database: {
username,
password,
port
},
options: {
snapshotMode: NO_DATA
},
livenessInterval: 5.0
});
check mysqlListener.attach(testService);
check mysqlListener.'start();
runtime:sleep(10);
boolean liveness = check isLive(mysqlListener);
test:assertFalse(liveness, "Liveness check passes even after not receiving events within the liveness interval");
check mysqlListener.gracefulStop();
}
16 changes: 10 additions & 6 deletions ballerina/tests/mock_listener.bal
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ import ballerina/random;
public isolated class MockListener {
*Listener;

private final map<string> & readonly config;
private final map<anydata> & readonly config;
private boolean isStarted = false;
private boolean hasAttachedService = false;

# Initializes the MySQL listener with the given configuration.
#
# + config - The configuration for the MySQL connector
public isolated function init(*MySqlListenerConfiguration config) {
map<string> configMap = {};
map<string> debeziumConfigs = {};
populateDebeziumProperties({
engineName: config.engineName,
offsetStorage: config.offsetStorage,
internalSchemaStorage: config.internalSchemaStorage,
options: config.options
}, configMap);
}, debeziumConfigs);
populateDatabaseConfigurations({
connectorClass: config.database.connectorClass,
hostname: config.database.hostname,
Expand All @@ -47,9 +47,13 @@ public isolated class MockListener {
excludedTables: config.database.excludedTables,
includedColumns: config.database.includedColumns,
excludedColumns: config.database.excludedColumns
}, configMap);
configMap["database.server.id"] = "100000";
self.config = configMap.cloneReadOnly();
}, debeziumConfigs);
debeziumConfigs["database.server.id"] = "100000";
map<anydata> listenerConfigs = {
...debeziumConfigs
};
listenerConfigs["livenessInterval"] = config.livenessInterval;
self.config = listenerConfigs.cloneReadOnly();
}

# Attaches a CDC service to the MySQL listener.
Expand Down
2 changes: 2 additions & 0 deletions ballerina/types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,11 @@ public type Options record {|
# + internalSchemaStorage - The internal schema history configuration
# + offsetStorage - The offset storage configuration
# + options - The additional options for the CDC engine
# + livenessInterval - Time interval (in seconds) used to evaluate the liveness of the CDC listener
public type ListenerConfiguration record {|
string engineName = "ballerina-cdc-connector";
FileInternalSchemaStorage|KafkaInternalSchemaStorage internalSchemaStorage = {};
FileOffsetStorage|KafkaOffsetStorage offsetStorage = {};
Options options = {};
decimal livenessInterval = 60.0;
|};
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ This file contains all the notable changes done to the Ballerina `cdc` package t
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [Unreleased]

### Added
- [Introduce a mechanism to detect whether the CDC engine is running](https://github.com/ballerina-platform/ballerina-library/issues/8589)

## [1.1.0]

### Added
Expand Down
34 changes: 34 additions & 0 deletions docs/spec/spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The conforming implementation of the specification is released and included in t
- [2.2.4.5 `onTruncate`](#2245-ontruncate)
- [2.2.4.6 `onError`](#2246-onerror)
- [2.2.5 Service Configuration](#225-service-configuration)
- [2.2 Using health checks](#23-using-health-checks)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix the section numbering in the table of contents.

The link text says "2.2 Using health checks" but it should be "2.3" to match the actual section number.

📝 Proposed fix
-    - [2.2 Using health checks](`#23-using-health-checks`) 
+    - [2.3 Using health checks](`#23-using-health-checks`) 
🤖 Prompt for AI Agents
In `@docs/spec/spec.md` at line 43, Update the table-of-contents entry that
currently reads "- [2.2 Using health checks](`#23-using-health-checks`)" so the
visible link text matches the actual section number; change "2.2" to "2.3" in
the TOC line (the string "- [2.2 Using health
checks](`#23-using-health-checks`)"), leaving the anchor unchanged.

- [3. Errors](#3-errors)
- [3.1 Service Error Handling](#31-service-error-handling)
- [Example: `onCreate` Throwing an Error](#example-oncreate-throwing-an-error)
Expand Down Expand Up @@ -289,6 +290,39 @@ remote function onError(cdc:Error e) {

The `cdc:ServiceConfig` annotation can be used to provide additional configurations to the CDC service. These configurations are described in the [Service Configuration](#41-service-config) section.

### 2.3 Using health checks

In a distributed deployment, health checks are essential for monitoring the state of the CDC listener and enabling automated recovery mechanisms. If the listener becomes unhealthy, the orchestrator (e.g., Kubernetes) can restart the service based on the reported health status.

To support this, the `isLive` function is provided to determine the liveness of a given CDC listener instance.

```ballerina
# Checks whether the given CDC listener is live.
#
# + cdc - The CDC listener instance to be checked
# + return - Returns `true` if the listener is considered live, `false` otherwise,
# or an error if the liveness check fails
public function isLive(cdc: Listener cdc) returns boolean|Error;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Syntax error in function signature.

The parameter type has an incorrect space: cdc: Listener cdc should be cdc:Listener cdc (no space after the colon in the module-qualified type).

📝 Proposed fix
-public function isLive(cdc: Listener cdc) returns boolean|Error;
+public function isLive(cdc:Listener cdc) returns boolean|Error;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public function isLive(cdc: Listener cdc) returns boolean|Error;
public function isLive(cdc:Listener cdc) returns boolean|Error;
🤖 Prompt for AI Agents
In `@docs/spec/spec.md` at line 305, The function signature for isLive has a
syntax error in the parameter type declaration; update the parameter from "cdc:
Listener cdc" to remove the space after the colon so it reads "cdc:Listener cdc"
in the isLive declaration to match the module-qualified type syntax.

```

The following example demonstrates how the `isLive` function can be integrated with an HTTP-based liveness probe:

```ballerina
listener cdc:Listener mssqlCdc = ...;

// HTTP-based liveness probe
service on new http:Listener(...) {

resource function get liveness() returns http:Ok|http:ServiceUnavailable|error {
boolean isLive = check cdc:isLive(mssqlCdc);
if isLive {
return http:OK;
}
return http:SERVICE_UNAVAILABLE;
}
}
```

## 3. Errors

### 3.1 Service Error Handling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static io.ballerina.lib.cdc.utils.Constants.ALLOW_DATA_PROJECTION;
import static io.ballerina.lib.cdc.utils.Constants.BallerinaErrors.EVENT_PROCESSING_ERROR;
Expand All @@ -68,6 +71,7 @@ public class BalChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEv
private final boolean isSingleServiceAttached;
private final Service singleService;
private final Runtime runtime;
private final AtomicReference<Instant> lastEventReceivedTime = new AtomicReference<>();

public BalChangeConsumer(Map<String, Service> serviceMap, Runtime runtime) {
this.serviceMap = new HashMap<>(serviceMap);
Expand All @@ -85,6 +89,9 @@ public BalChangeConsumer(Map<String, Service> serviceMap, Runtime runtime) {
public void handleBatch(List<ChangeEvent<String, String>> records,
DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer)
throws InterruptedException {
if (!records.isEmpty()) {
updateLastEventReceivedTime();
}
for (ChangeEvent<String, String> record : records) {
Service selectedService = null;
try {
Expand Down Expand Up @@ -221,4 +228,12 @@ private BError createMethodNotFoundError(Payload payload, String methodName) {
return createError(EVENT_PROCESSING_ERROR, "Function '" + methodName + "' is not available.",
null, ValueCreator.createRecordValue(getModule(), EVENT_PROCESSING_ERROR_DETAIL, detail));
}

private void updateLastEventReceivedTime() {
lastEventReceivedTime.set(Instant.now());
}

public Optional<Instant> getLastEventReceivedTime() {
return Optional.ofNullable(lastEventReceivedTime.get());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2026, WSO2 LLC. (http://www.wso2.org).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.lib.cdc;

import io.debezium.engine.DebeziumEngine;

import java.io.PrintStream;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* This class contains the logic to be executed when the CDC engine shuts-down gracefully.
*/
public class CdcCompletionCallback implements DebeziumEngine.CompletionCallback {
private static final PrintStream ERR_OUT = System.err;

private final AtomicBoolean invoked = new AtomicBoolean(false);

@Override
public void handle(boolean success, String message, Throwable error) {
invoked.set(true);
if (success) {
return;
}

String errorMsg = "Debezium engine terminated unexpectedly: " + message;
if (Objects.nonNull(error)) {
errorMsg = errorMsg + ": " + error.getMessage();
}
ERR_OUT.println(errorMsg);
}

public boolean isInvoked() {
return invoked.get();
}
}
Loading