Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 33 additions & 39 deletions ballerina/listener_endpoint.bal
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,42 @@ import ballerina/task;
public isolated class Listener {

private handle EMPTY_JAVA_STRING = java:fromString("");
private ListenerConfiguration config = {};
private task:Listener taskListener;
private final readonly & ListenerConfiguration config;
private final task:Listener taskListener;

# Gets invoked during object initialization.
#
# + listenerConfig - Configurations for FTP listener
# + return - `()` or else an `ftp:Error` upon failure to initialize the listener
public isolated function init(*ListenerConfiguration listenerConfig) returns Error? {
self.config = listenerConfig.clone();

lock {
decimal pollingInterval = self.config.pollingInterval;
CoordinationConfig? coordination = self.config.coordination;

task:Listener|error taskListener;
if coordination is CoordinationConfig {
taskListener = new ({
trigger: {interval: pollingInterval},
warmBackupConfig: {
databaseConfig: coordination.databaseConfig,
livenessCheckInterval: coordination.livenessCheckInterval,
taskId: coordination.memberId,
groupId: coordination.coordinationGroup,
heartbeatFrequency: coordination.heartbeatFrequency
}
});
} else {
taskListener = new ({
trigger: {interval: pollingInterval}
});
}
self.config = listenerConfig.cloneReadOnly();

decimal pollingInterval = self.config.pollingInterval;
CoordinationConfig? coordination = self.config.coordination;

task:Listener|error taskListener;
if coordination is CoordinationConfig {
taskListener = new ({
trigger: {interval: pollingInterval},
warmBackupConfig: {
databaseConfig: coordination.databaseConfig,
livenessCheckInterval: coordination.livenessCheckInterval,
taskId: coordination.memberId,
groupId: coordination.coordinationGroup,
heartbeatFrequency: coordination.heartbeatFrequency
}
});
} else {
taskListener = new ({
trigger: {interval: pollingInterval}
});
}

if taskListener is error {
return error Error("Failed to create internal task listener: " + taskListener.message());
}
self.taskListener = taskListener;
return initListener(self, self.config);
if taskListener is error {
return error Error("Failed to create internal task listener: " + taskListener.message());
}
self.taskListener = taskListener;
return initListener(self, self.config);
}

# Starts the FTP listener and begins monitoring for file changes.
Expand Down Expand Up @@ -119,17 +117,13 @@ public isolated class Listener {
}

isolated function internalStart() returns error? {
lock {
check self.taskListener.attach(getPollingService(self));
check self.taskListener.'start();
}
check self.taskListener.attach(getPollingService(self));
check self.taskListener.'start();
}

isolated function stop() returns error? {
lock {
check self.taskListener.gracefulStop();
return cleanup(self);
}
check self.taskListener.gracefulStop();
return cleanup(self);
}

# Polls the FTP server for new or deleted files.
Expand Down Expand Up @@ -247,7 +241,7 @@ public type Service distinct service object {
# It is recommended to use a unique name for each group.
# + heartbeatFrequency - The interval (in seconds) for the node to update its heartbeat status. Default is 1 second.
public type CoordinationConfig record {|
task:DatabaseConfig databaseConfig;
task:DatabaseConfig databaseConfig = <task:MysqlConfig>{};
int livenessCheckInterval = 30;
string memberId;
string coordinationGroup;
Expand Down