From 64c0bdfbdc843f5c9bb96b892b87f1b044ee6ebb Mon Sep 17 00:00:00 2001 From: Niveathika Date: Mon, 26 Jan 2026 10:35:51 +0530 Subject: [PATCH 1/4] [Automated] Update the native jar versions --- ballerina/Dependencies.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 3a6dbd88d..71e7cb3f9 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -10,7 +10,7 @@ distribution-version = "2201.12.0" [[package]] org = "ballerina" name = "crypto" -version = "2.10.0" +version = "2.10.1" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "time"} @@ -215,7 +215,7 @@ modules = [ [[package]] org = "ballerina" name = "observe" -version = "1.5.0" +version = "1.5.1" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -223,7 +223,7 @@ dependencies = [ [[package]] org = "ballerina" name = "os" -version = "1.10.0" +version = "1.10.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, @@ -233,7 +233,7 @@ dependencies = [ [[package]] org = "ballerina" name = "task" -version = "2.11.0" +version = "2.11.1" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "time"}, From d73200c5865919df9368581bd0d3cd1bd4b1b8ee Mon Sep 17 00:00:00 2001 From: Niveathika Date: Mon, 26 Jan 2026 10:40:38 +0530 Subject: [PATCH 2/4] Use task listener to schedule polls --- ballerina/listener_endpoint.bal | 39 +++++++++++++++++---------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/ballerina/listener_endpoint.bal b/ballerina/listener_endpoint.bal index 2ae02d8cb..224d23663 100644 --- a/ballerina/listener_endpoint.bal +++ b/ballerina/listener_endpoint.bal @@ -23,7 +23,7 @@ public isolated class Listener { private handle EMPTY_JAVA_STRING = java:fromString(""); private ListenerConfiguration config = {}; - private task:JobId? jobId = (); + private task:Listener taskListener; # Gets invoked during object initialization. # @@ -32,6 +32,13 @@ public isolated class Listener { public isolated function init(*ListenerConfiguration listenerConfig) returns Error? { self.config = listenerConfig.clone(); lock { + task:Listener|error taskListener = new ({ + trigger: {interval: listenerConfig.pollingInterval} + }); + if taskListener is error { + return error Error("Failed to create internal task listener: " + taskListener.message()); + } + self.taskListener = taskListener; return initListener(self, self.config); } } @@ -94,16 +101,14 @@ public isolated class Listener { isolated function internalStart() returns error? { lock { - self.jobId = check task:scheduleJobRecurByFrequency(new Job(self), self.config.pollingInterval); + check self.taskListener.attach(getPollingService(self)); + check self.taskListener.'start(); } } isolated function stop() returns error? { lock { - var id = self.jobId; - if id is task:JobId { - check task:unscheduleJob(id); - } + check self.taskListener.gracefulStop(); return cleanup(self); } } @@ -131,21 +136,17 @@ public isolated class Listener { } } -class Job { +isolated function getPollingService(Listener initializedListener) returns task:Service { + return service object { + private final Listener ftpListener = initializedListener; - *task:Job; - private Listener ftpListener; - - public isolated function execute() { - var result = self.ftpListener.poll(); - if result is error { - log:printError("Error while executing poll function", 'error = result); + isolated function execute() { + var result = self.ftpListener.poll(); + if result is error { + log:printError("Error while executing poll function", 'error = result); + } } - } - - public isolated function init(Listener initializedListener) { - self.ftpListener = initializedListener; - } + }; } # Configuration for the FTP listener. From 901fb03d2c846aae4de50bb47263e3e7396c5a8b Mon Sep 17 00:00:00 2001 From: Niveathika Date: Mon, 26 Jan 2026 11:48:19 +0530 Subject: [PATCH 3/4] Add coordination support --- ballerina/listener_endpoint.bal | 48 ++++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/ballerina/listener_endpoint.bal b/ballerina/listener_endpoint.bal index 224d23663..5582084bb 100644 --- a/ballerina/listener_endpoint.bal +++ b/ballerina/listener_endpoint.bal @@ -31,10 +31,29 @@ public isolated class Listener { # + return - `()` or else an `ftp:Error` upon failure to initialize the listener public isolated function init(*ListenerConfiguration listenerConfig) returns Error? { self.config = listenerConfig.clone(); + lock { - task:Listener|error taskListener = new ({ - trigger: {interval: listenerConfig.pollingInterval} - }); + decimal pollingInterval = self.config.pollingInterval; + CoordinationConfiguration? coordination = self.config.coordination; + + task:Listener|error taskListener; + if coordination is CoordinationConfiguration { + taskListener = new ({ + trigger: {interval: pollingInterval}, + warmBackupConfig: { + databaseConfig: coordination.databaseConfig, + livenessCheckInterval: coordination.livenessCheckInterval, + taskId: coordination.nodeId, + groupId: coordination.clusterName, + heartbeatFrequency: coordination.heartbeatFrequency + } + }); + } else { + taskListener = new ({ + trigger: {interval: pollingInterval} + }); + } + if taskListener is error { return error Error("Failed to create internal task listener: " + taskListener.message()); } @@ -173,8 +192,10 @@ isolated function getPollingService(Listener initializedListener) returns task:S # + sftpCompression - Compression algorithms (SFTP only) # + sftpSshKnownHosts - Path to SSH known_hosts file (SFTP only) # + proxy - Proxy configuration for SFTP connections (SFTP only) -# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode, +# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode, # malformed CSV records are skipped and written to a separate file in the current directory +# + coordination - Configuration for distributed task coordination using warm backup approach. +# When configured, only one node in the cluster will actively poll while others act as standby. public type ListenerConfiguration record {| Protocol protocol = FTP; string host = "127.0.0.1"; @@ -194,6 +215,7 @@ public type ListenerConfiguration record {| TransferCompression[] sftpCompression = [NO]; string sftpSshKnownHosts?; FailSafeOptions csvFailSafe?; + CoordinationConfiguration coordination?; |}; # Fail-safe options for CSV content processing. @@ -213,3 +235,21 @@ public enum ErrorLogContentType { # FTP service for handling file system change events. public type Service distinct service object { }; + +# Represents the configuration required for distributed task coordination. +# When configured, multiple FTP listener instances coordinate so that only one actively polls +# while others act as warm standby nodes. +# +# + databaseConfig - The database configuration for task coordination +# + livenessCheckInterval - The interval (in seconds) to check the liveness of the active node. Default is 30 seconds. +# + nodeId - Unique identifier for the current node. Must be distinct for each node in the distributed system. +# + clusterName - The name of the cluster of FTP listeners that coordinate together. +# It is recommended to use a unique name for each cluster. +# + heartbeatFrequency - The interval (in seconds) for the node to update its heartbeat status. Default is 1 second. +public type CoordinationConfiguration record {| + task:DatabaseConfig databaseConfig; + int livenessCheckInterval = 30; + string nodeId; + string clusterName; + int heartbeatFrequency = 1; +|}; From 97813257e1c9651a03a23d23a994ef71065e3eea Mon Sep 17 00:00:00 2001 From: Niveathika Date: Mon, 26 Jan 2026 11:53:15 +0530 Subject: [PATCH 4/4] Add test cases --- .../tests/listener_coordination_test.bal | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 ballerina/tests/listener_coordination_test.bal diff --git a/ballerina/tests/listener_coordination_test.bal b/ballerina/tests/listener_coordination_test.bal new file mode 100644 index 000000000..4a314a3db --- /dev/null +++ b/ballerina/tests/listener_coordination_test.bal @@ -0,0 +1,92 @@ +// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/task; +import ballerina/test; + +// Test that ListenerConfiguration works without coordination (backward compatibility) +@test:Config {} +public function testListenerConfigurationWithoutCoordination() { + ListenerConfiguration config = { + protocol: FTP, + host: "ftp.example.com", + port: 21, + auth: { + credentials: { + username: "user", + password: "pass" + } + }, + path: "/data", + pollingInterval: 30 + }; + + test:assertTrue(config.coordination is ()); +} + +// Test listener initialization without coordination still works +@test:Config {} +public function testListenerInitWithoutCoordination() returns error? { + Listener ftpListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: { + credentials: { + username: "wso2", + password: "wso2123" + } + }, + path: "/home/in", + pollingInterval: 2 + }); + + // If we reach here, initialization succeeded without coordination + check ftpListener.gracefulStop(); +} + +// Test listener initialization with coordination config +// Note: This test expects an error since no coordination database is available +@test:Config {} +public function testListenerInitWithCoordinationFailsWithoutDb() { + Listener|Error result = new ({ + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: { + credentials: { + username: "wso2", + password: "wso2123" + } + }, + path: "/home/in", + pollingInterval: 2, + coordination: { + databaseConfig: { + host: "nonexistent-db.local", + user: "user", + password: "pass", + database: "coordination" + }, + nodeId: "test-node", + clusterName: "test-cluster" + } + }); + + // We expect this to fail because the coordination database is not available + test:assertTrue(result is Error, + msg = "Expected error when coordination database is unavailable"); +}