Skip to content
Closed

Cluster #1518

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ distribution-version = "2201.12.0"
[[package]]
org = "ballerina"
name = "crypto"
version = "2.10.0"
version = "2.10.1"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "time"}
Expand Down Expand Up @@ -215,15 +215,15 @@ modules = [
[[package]]
org = "ballerina"
name = "observe"
version = "1.5.0"
version = "1.5.1"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "os"
version = "1.10.0"
version = "1.10.1"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "io"},
Expand All @@ -233,7 +233,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "task"
version = "2.11.0"
version = "2.11.1"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "time"},
Expand Down
81 changes: 61 additions & 20 deletions ballerina/listener_endpoint.bal
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,41 @@ public isolated class Listener {

private handle EMPTY_JAVA_STRING = java:fromString("");
private ListenerConfiguration config = {};
private task:JobId? jobId = ();
private task:Listener taskListener;

# Gets invoked during object initialization.
#
# + listenerConfig - Configurations for FTP listener
# + return - `()` or else an `ftp:Error` upon failure to initialize the listener
public isolated function init(*ListenerConfiguration listenerConfig) returns Error? {
self.config = listenerConfig.clone();

lock {
decimal pollingInterval = self.config.pollingInterval;
CoordinationConfiguration? coordination = self.config.coordination;

task:Listener|error taskListener;
if coordination is CoordinationConfiguration {
taskListener = new ({
trigger: {interval: pollingInterval},
warmBackupConfig: {
databaseConfig: coordination.databaseConfig,
livenessCheckInterval: coordination.livenessCheckInterval,
taskId: coordination.nodeId,
groupId: coordination.clusterName,
heartbeatFrequency: coordination.heartbeatFrequency
}
});
} else {
taskListener = new ({
trigger: {interval: pollingInterval}
});
}

if taskListener is error {
return error Error("Failed to create internal task listener: " + taskListener.message());
}
self.taskListener = taskListener;
return initListener(self, self.config);
}
}
Expand Down Expand Up @@ -94,16 +120,14 @@ public isolated class Listener {

isolated function internalStart() returns error? {
lock {
self.jobId = check task:scheduleJobRecurByFrequency(new Job(self), self.config.pollingInterval);
check self.taskListener.attach(getPollingService(self));
check self.taskListener.'start();
}
}

isolated function stop() returns error? {
lock {
var id = self.jobId;
if id is task:JobId {
check task:unscheduleJob(id);
}
check self.taskListener.gracefulStop();
return cleanup(self);
}
}
Expand Down Expand Up @@ -131,21 +155,17 @@ public isolated class Listener {
}
}

class Job {

*task:Job;
private Listener ftpListener;
isolated function getPollingService(Listener initializedListener) returns task:Service {
return service object {
private final Listener ftpListener = initializedListener;

public isolated function execute() {
var result = self.ftpListener.poll();
if result is error {
log:printError("Error while executing poll function", 'error = result);
isolated function execute() {
var result = self.ftpListener.poll();
if result is error {
log:printError("Error while executing poll function", 'error = result);
}
}
}

public isolated function init(Listener initializedListener) {
self.ftpListener = initializedListener;
}
};
}

# Configuration for the FTP listener.
Expand All @@ -172,8 +192,10 @@ class Job {
# + sftpCompression - Compression algorithms (SFTP only)
# + sftpSshKnownHosts - Path to SSH known_hosts file (SFTP only)
# + proxy - Proxy configuration for SFTP connections (SFTP only)
# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode,
# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode,
# malformed CSV records are skipped and written to a separate file in the current directory
# + coordination - Configuration for distributed task coordination using warm backup approach.
# When configured, only one node in the cluster will actively poll while others act as standby.
public type ListenerConfiguration record {|
Protocol protocol = FTP;
string host = "127.0.0.1";
Expand All @@ -193,6 +215,7 @@ public type ListenerConfiguration record {|
TransferCompression[] sftpCompression = [NO];
string sftpSshKnownHosts?;
FailSafeOptions csvFailSafe?;
CoordinationConfiguration coordination?;
|};

# Fail-safe options for CSV content processing.
Expand All @@ -212,3 +235,21 @@ public enum ErrorLogContentType {
# FTP service for handling file system change events.
public type Service distinct service object {
};

# Represents the configuration required for distributed task coordination.
# When configured, multiple FTP listener instances coordinate so that only one actively polls
# while others act as warm standby nodes.
#
# + databaseConfig - The database configuration for task coordination
# + livenessCheckInterval - The interval (in seconds) to check the liveness of the active node. Default is 30 seconds.
# + nodeId - Unique identifier for the current node. Must be distinct for each node in the distributed system.
# + clusterName - The name of the cluster of FTP listeners that coordinate together.
# It is recommended to use a unique name for each cluster.
# + heartbeatFrequency - The interval (in seconds) for the node to update its heartbeat status. Default is 1 second.
public type CoordinationConfiguration record {|
task:DatabaseConfig databaseConfig;
int livenessCheckInterval = 30;
string nodeId;
string clusterName;
int heartbeatFrequency = 1;
|};
92 changes: 92 additions & 0 deletions ballerina/tests/listener_coordination_test.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/task;
import ballerina/test;

// Test that ListenerConfiguration works without coordination (backward compatibility)
@test:Config {}
public function testListenerConfigurationWithoutCoordination() {
ListenerConfiguration config = {
protocol: FTP,
host: "ftp.example.com",
port: 21,
auth: {
credentials: {
username: "user",
password: "pass"
}
},
path: "/data",
pollingInterval: 30
};

test:assertTrue(config.coordination is ());
}

// Test listener initialization without coordination still works
@test:Config {}
public function testListenerInitWithoutCoordination() returns error? {
Listener ftpListener = check new ({
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {
credentials: {
username: "wso2",
password: "wso2123"
}
},
path: "/home/in",
pollingInterval: 2
});

// If we reach here, initialization succeeded without coordination
check ftpListener.gracefulStop();
}

// Test listener initialization with coordination config
// Note: This test expects an error since no coordination database is available
@test:Config {}
public function testListenerInitWithCoordinationFailsWithoutDb() {
Listener|Error result = new ({
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {
credentials: {
username: "wso2",
password: "wso2123"
}
},
path: "/home/in",
pollingInterval: 2,
coordination: {
databaseConfig: <task:MysqlConfig>{
host: "nonexistent-db.local",
user: "user",
password: "pass",
database: "coordination"
},
nodeId: "test-node",
clusterName: "test-cluster"
}
});

// We expect this to fail because the coordination database is not available
test:assertTrue(result is Error,
msg = "Expected error when coordination database is unavailable");
}
Loading