diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml
index ea731f3..26594f2 100644
--- a/ballerina/Dependencies.toml
+++ b/ballerina/Dependencies.toml
@@ -5,7 +5,7 @@
[ballerina]
dependencies-toml-version = "2"
-distribution-version = "2201.12.0"
+distribution-version = "2201.12.3"
[[package]]
org = "ballerina"
diff --git a/build-config/spotbugs-exclude.xml b/build-config/spotbugs-exclude.xml
index 6a9614e..f5c029f 100644
--- a/build-config/spotbugs-exclude.xml
+++ b/build-config/spotbugs-exclude.xml
@@ -24,4 +24,8 @@
+
+
+
+
diff --git a/native/src/main/java/io/ballerina/lib/cdc/Listener.java b/native/src/main/java/io/ballerina/lib/cdc/Listener.java
index 6e27e96..8acd1db 100644
--- a/native/src/main/java/io/ballerina/lib/cdc/Listener.java
+++ b/native/src/main/java/io/ballerina/lib/cdc/Listener.java
@@ -36,8 +36,8 @@
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -172,33 +172,43 @@ public static Object start(Environment environment, BObject listener, BMap serviceMap = (ConcurrentHashMap) listener
.getNativeData(TABLE_TO_SERVICE_MAP_KEY);
- try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
- EngineResult result = new EngineResult();
+ try {
+ CompletableFuture comFuture = new CompletableFuture<>();
+ ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
DebeziumEngine> engine = create(Json.class)
.using(engineProperties)
.notifying(new BalChangeConsumer(serviceMap, environment.getRuntime()))
+ .using(new DebeziumEngine.ConnectorCallback() {
+ @Override
+ public void taskStarted() {
+ EngineResult result = new EngineResult();
+ result.success = true;
+ comFuture.complete(result);
+ }
+ })
.using((success, message, error) -> {
+ EngineResult result = new EngineResult();
result.success = success;
result.message = message;
result.error = error;
- result.latch.countDown();
+ comFuture.complete(result);
})
.build();
- executor.execute(engine);
+ executor.submit(engine);
- result.latch.await();
- if (result.success) {
+ EngineResult engineResult = comFuture.get();
+ if (engineResult.success) {
listener.addNativeData(DEBEZIUM_ENGINE_KEY, engine);
listener.addNativeData(EXECUTOR_SERVICE_KEY, executor);
} else {
- String errorMessage = result.message != null ? result.message
- : (result.error != null ? result.error.getMessage() : "Unknown error");
+ String errorMessage = engineResult.message != null ? engineResult.message
+ : (engineResult.error != null ? engineResult.error.getMessage() : "Unknown error");
return createCdcError("Failed to start the Debezium engine: " + errorMessage);
}
- } catch (InterruptedException e) {
- return createCdcError("Failed to start the Debezium engine: " + e.getMessage());
+ return null;
+ } catch (Throwable t) {
+ return createCdcError("Failed to start the Debezium engine: " + t.getMessage());
}
- return null;
}
private static Properties populateEngineProperties(BMap config) {
@@ -237,9 +247,8 @@ public static Object immediateStop(BObject listener) {
// Helper class to store result
static class EngineResult {
- CountDownLatch latch = new CountDownLatch(1);
- boolean success = false;
- String message = null;
- Throwable error = null;
+ volatile boolean success = true;
+ volatile String message = null;
+ volatile Throwable error = null;
}
}