diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index ea731f3..26594f2 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -5,7 +5,7 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.12.0" +distribution-version = "2201.12.3" [[package]] org = "ballerina" diff --git a/build-config/spotbugs-exclude.xml b/build-config/spotbugs-exclude.xml index 6a9614e..f5c029f 100644 --- a/build-config/spotbugs-exclude.xml +++ b/build-config/spotbugs-exclude.xml @@ -24,4 +24,8 @@ + + + + diff --git a/native/src/main/java/io/ballerina/lib/cdc/Listener.java b/native/src/main/java/io/ballerina/lib/cdc/Listener.java index 6e27e96..8acd1db 100644 --- a/native/src/main/java/io/ballerina/lib/cdc/Listener.java +++ b/native/src/main/java/io/ballerina/lib/cdc/Listener.java @@ -36,8 +36,8 @@ import java.io.IOException; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -172,33 +172,43 @@ public static Object start(Environment environment, BObject listener, BMap serviceMap = (ConcurrentHashMap) listener .getNativeData(TABLE_TO_SERVICE_MAP_KEY); - try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { - EngineResult result = new EngineResult(); + try { + CompletableFuture comFuture = new CompletableFuture<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); DebeziumEngine> engine = create(Json.class) .using(engineProperties) .notifying(new BalChangeConsumer(serviceMap, environment.getRuntime())) + .using(new DebeziumEngine.ConnectorCallback() { + @Override + public void taskStarted() { + EngineResult result = new EngineResult(); + result.success = true; + comFuture.complete(result); + } + }) .using((success, message, error) -> { + EngineResult result = new EngineResult(); result.success = success; result.message = message; result.error = error; - result.latch.countDown(); + comFuture.complete(result); }) .build(); - executor.execute(engine); + executor.submit(engine); - result.latch.await(); - if (result.success) { + EngineResult engineResult = comFuture.get(); + if (engineResult.success) { listener.addNativeData(DEBEZIUM_ENGINE_KEY, engine); listener.addNativeData(EXECUTOR_SERVICE_KEY, executor); } else { - String errorMessage = result.message != null ? result.message - : (result.error != null ? result.error.getMessage() : "Unknown error"); + String errorMessage = engineResult.message != null ? engineResult.message + : (engineResult.error != null ? engineResult.error.getMessage() : "Unknown error"); return createCdcError("Failed to start the Debezium engine: " + errorMessage); } - } catch (InterruptedException e) { - return createCdcError("Failed to start the Debezium engine: " + e.getMessage()); + return null; + } catch (Throwable t) { + return createCdcError("Failed to start the Debezium engine: " + t.getMessage()); } - return null; } private static Properties populateEngineProperties(BMap config) { @@ -237,9 +247,8 @@ public static Object immediateStop(BObject listener) { // Helper class to store result static class EngineResult { - CountDownLatch latch = new CountDownLatch(1); - boolean success = false; - String message = null; - Throwable error = null; + volatile boolean success = true; + volatile String message = null; + volatile Throwable error = null; } }