Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4b32bb0
[Automated] Update the toml files
ayeshLK Feb 10, 2026
3f78a32
[Automated] Update the toml files
ayeshLK Feb 10, 2026
d07b39e
Add basic implementation for CDC liveness check
ayeshLK Feb 10, 2026
1af8ca8
Resolve merge conflitct
ayeshLK Feb 10, 2026
82dc401
Add newline to the EOF
ayeshLK Feb 10, 2026
172d5d8
Add default liveness-interval to the cdc listener
ayeshLK Feb 10, 2026
60860e9
Fix review comments
ayeshLK Feb 10, 2026
706ca06
[Automated] Update the toml files
ayeshLK Feb 10, 2026
c9d119d
Fix event consumer time-stamp update logic for thread-safety
ayeshLK Feb 10, 2026
cdda861
[Automated] Update the toml files
ayeshLK Feb 13, 2026
de7b854
[Automated] Update the toml files
ayeshLK Feb 13, 2026
4c94c04
[Automated] Update the toml files
ayeshLK Feb 13, 2026
2846906
[Automated] Update the toml files
ayeshLK Feb 13, 2026
a9b5d3c
[Automated] Update the toml files
ayeshLK Feb 13, 2026
0adc17b
[Automated] Update the toml files
ayeshLK Feb 13, 2026
10e43bd
Add test cases for liveness check
ayeshLK Feb 13, 2026
e1c1539
Fix implementation issues in the liveness check
ayeshLK Feb 13, 2026
b9d007a
[Automated] Update the toml files
ayeshLK Feb 13, 2026
35697e0
Update the package spec
ayeshLK Feb 13, 2026
b83ff70
Update changelog
ayeshLK Feb 13, 2026
be8da3e
Update dependencies toml
ayeshLK Feb 13, 2026
9e9110b
[Automated] Update the toml files
ayeshLK Feb 13, 2026
25efdc2
Update dependencies
ayeshLK Feb 13, 2026
41a6a26
Update sample code in the spec
ayeshLK Feb 13, 2026
839f938
[Automated] Update the toml files
ayeshLK Feb 13, 2026
083532f
[Automated] Update the toml files
ayeshLK Feb 13, 2026
1194337
[Automated] Update the toml files
ayeshLK Feb 13, 2026
ba705ee
Revert "[Automated] Update the toml files"
ayeshLK Feb 13, 2026
a36fcc0
Skip adding custom configurations into the debezium configs
ayeshLK Feb 13, 2026
c91a6a7
Update dependencies
ayeshLK Feb 13, 2026
32eac11
Update package version
ayeshLK Feb 13, 2026
c91b148
[Automated] Update the toml files
ayeshLK Feb 13, 2026
23ca6d3
Update package version
ayeshLK Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ballerina/extern_functions.bal
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,13 @@ public isolated function externImmediateStop(Listener cdcListener) returns Error
name: "immediateStop",
'class: "io.ballerina.lib.cdc.Listener"
} external;


# Checks whether the given CDC listener is live.
#
# + cdc - The CDC listener instance to be checked
# + return - Returns `true` if the listener is considered live, `false` otherwise,
# or an error if the liveness check fail
public function isLive(Listener cdc) returns boolean|Error = @java:Method {
'class: "io.ballerina.lib.cdc.Listener"
} external;
2 changes: 2 additions & 0 deletions ballerina/types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,11 @@ public type Options record {|
# + internalSchemaStorage - The internal schema history configuration
# + offsetStorage - The offset storage configuration
# + options - The additional options for the CDC engine
# + livenessInterval - Time interval (in seconds) used to evaluate the liveness of the CDC listener
public type ListenerConfiguration record {|
string engineName = "ballerina-cdc-connector";
FileInternalSchemaStorage|KafkaInternalSchemaStorage internalSchemaStorage = {};
FileOffsetStorage|KafkaOffsetStorage offsetStorage = {};
Options options = {};
decimal livenessInterval = 60.0;
|};
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.ballerina.lib.cdc.utils.Constants.ALLOW_DATA_PROJECTION;
import static io.ballerina.lib.cdc.utils.Constants.BallerinaErrors.EVENT_PROCESSING_ERROR;
Expand All @@ -68,6 +70,7 @@ public class BalChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEv
private final boolean isSingleServiceAttached;
private final Service singleService;
private final Runtime runtime;
private Instant lastEventReceivedTime;

public BalChangeConsumer(Map<String, Service> serviceMap, Runtime runtime) {
this.serviceMap = new HashMap<>(serviceMap);
Expand All @@ -85,6 +88,9 @@ public BalChangeConsumer(Map<String, Service> serviceMap, Runtime runtime) {
public void handleBatch(List<ChangeEvent<String, String>> records,
DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer)
throws InterruptedException {
if (!records.isEmpty()) {
updateLastEventReceivedTime();
}
for (ChangeEvent<String, String> record : records) {
Service selectedService = null;
try {
Expand Down Expand Up @@ -221,4 +227,12 @@ private BError createMethodNotFoundError(Payload payload, String methodName) {
return createError(EVENT_PROCESSING_ERROR, "Function '" + methodName + "' is not available.",
null, ValueCreator.createRecordValue(getModule(), EVENT_PROCESSING_ERROR_DETAIL, detail));
}

private void updateLastEventReceivedTime() {
lastEventReceivedTime = Instant.now();
}

public Optional<Instant> getLastEventReceivedTime() {
return Optional.ofNullable(lastEventReceivedTime);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2026, WSO2 LLC. (http://www.wso2.org).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.ballerina.lib.cdc;

import io.debezium.engine.DebeziumEngine;

import java.io.PrintStream;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* This class contains the logic to be executed when the CDC engine shuts-down gracefully.
*/
public class CdcCompletionCallback implements DebeziumEngine.CompletionCallback {
private static final PrintStream ERR_OUT = System.err;

private final AtomicBoolean invoked = new AtomicBoolean(false);

@Override
public void handle(boolean success, String message, Throwable error) {
invoked.set(true);
if (success) {
return;
}

String errorMsg = "Debezium engine terminated unexpectedly: " + message;
if (Objects.nonNull(error)) {
errorMsg = errorMsg + ": " + error.getMessage();
}
ERR_OUT.println(errorMsg);
}

public boolean isInvoked() {
return invoked.get();
}
}
71 changes: 58 additions & 13 deletions native/src/main/java/io/ballerina/lib/cdc/Listener.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.utils.TypeUtils;
import io.ballerina.runtime.api.values.BArray;
import io.ballerina.runtime.api.values.BDecimal;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BObject;
import io.ballerina.runtime.api.values.BString;
Expand All @@ -34,7 +35,11 @@
import io.debezium.engine.format.Json;

import java.io.IOException;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -60,6 +65,10 @@ public class Listener {
public static final String TABLE_TO_SERVICE_MAP_KEY = "TABLE_TO_SERVICE_MAP";
public static final String DEBEZIUM_ENGINE_KEY = "DEB_ENGINE";
public static final String EXECUTOR_SERVICE_KEY = "ExecutorService";
public static final String CHANGE_CONSUMER_KEY = "ChangeConsumer";
public static final String COMP_CALLBACK_KEY = "CompletionCallback";
public static final String LIVENESS_INTERVAL_KEY = "LivenessInterval";
public static final BString LIVENSS_INTERVAL_CONFIG_KEY = StringUtils.fromString("livenessInterval");
public static final String IS_STARTED_KEY = "isStarted";
public static final String HAS_ATTACHED_SERVICE_KEY = "hasAttachedService";
public static final String LISTENER_ID = "Id";
Expand Down Expand Up @@ -152,15 +161,21 @@ public static Object start(Environment environment, BObject listener, BMap<BStri
}

Properties engineProperties = populateEngineProperties(config);
Long livenessInterval = ((BDecimal) config.get(LIVENSS_INTERVAL_CONFIG_KEY))
.decimalValue()
.multiply(BigDecimal.valueOf(1000))
.longValue();
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, Service> serviceMap = (ConcurrentHashMap<String, Service>) listener
.getNativeData(TABLE_TO_SERVICE_MAP_KEY);

CompletableFuture<EngineResult> comFuture = new CompletableFuture<>();
CdcCompletionCallback completionCallback = new CdcCompletionCallback();
BalChangeConsumer changeConsumer = new BalChangeConsumer(serviceMap, environment.getRuntime());
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
DebeziumEngine<ChangeEvent<String, String>> engine = create(Json.class)
.using(engineProperties)
.notifying(new BalChangeConsumer(serviceMap, environment.getRuntime()))
.notifying(changeConsumer)
.using(new DebeziumEngine.ConnectorCallback() {
@Override
public void taskStarted() {
Expand All @@ -169,24 +184,19 @@ public void taskStarted() {
comFuture.complete(result);
}
})
.using((success, message, error) -> {
EngineResult result = new EngineResult();
result.success = success;
result.message = message;
result.error = error;
comFuture.complete(result);
})
.using(completionCallback)
.build();
executor.submit(engine);

EngineResult engineResult = comFuture.get();
if (engineResult.success) {
listener.addNativeData(DEBEZIUM_ENGINE_KEY, engine);
listener.addNativeData(EXECUTOR_SERVICE_KEY, executor);
listener.addNativeData(CHANGE_CONSUMER_KEY, changeConsumer);
listener.addNativeData(COMP_CALLBACK_KEY, completionCallback);
listener.addNativeData(LIVENESS_INTERVAL_KEY, livenessInterval);
} else {
String errorMessage = engineResult.message != null ? engineResult.message
: (engineResult.error != null ? engineResult.error.getMessage() : "Unknown error");
return createCdcError("Failed to start the Debezium engine: " + errorMessage);
return createCdcError("Failed to start the Debezium engine due to unknown error");
}
listener.addNativeData(IS_STARTED_KEY, true);
return null;
Expand Down Expand Up @@ -247,6 +257,43 @@ public static Object immediateStop(BObject listener) {
}
}

public static Object isLive(BObject listener) {
String id = getListenerId(listener);
ReentrantLock lock = lockMap.computeIfAbsent(id, k -> new ReentrantLock());

lock.lock();
try {
Object completionCallback = listener.getNativeData(COMP_CALLBACK_KEY);
if (completionCallback != null) {
boolean invoked = ((CdcCompletionCallback) completionCallback).isInvoked();
if (invoked) {
return false;
}
}

Object changeConsumer = listener.getNativeData(CHANGE_CONSUMER_KEY);
if (changeConsumer != null) {
Optional<Instant> lastEventReceivedTime = ((BalChangeConsumer) changeConsumer)
.getLastEventReceivedTime();
if (lastEventReceivedTime.isPresent()) {
Instant current = Instant.now();
Instant lastEventReceived = lastEventReceivedTime.get();
long diff = ChronoUnit.MILLIS.between(current, lastEventReceived);
Object livenessInterval = listener.getNativeData(LIVENESS_INTERVAL_KEY);
if (diff > ((Long) livenessInterval)) {
return false;
}
}
}

return true;
} catch (Exception e) {
return createCdcError("Failed to invoke the liveliness check for the Debezium engine: " + e.getMessage());
} finally {
lock.unlock();
}
}

private static Object getServiceConfigAnnotation(BObject service) {
return ((ObjectType) TypeUtils.getReferredType(TypeUtils.getType(service))).getAnnotation(
StringUtils.fromString(getPackageIdentifier() + COLON + ANN_NAME_EVENTS_FROM));
Expand Down Expand Up @@ -343,7 +390,5 @@ private static String getListenerId(BObject listener) {
// Helper class to store result
static class EngineResult {
volatile boolean success = true;
volatile String message = null;
volatile Throwable error = null;
}
}
Loading