diff --git a/ballerina/listener_endpoint.bal b/ballerina/listener_endpoint.bal index 872352c77..23eb3bf14 100644 --- a/ballerina/listener_endpoint.bal +++ b/ballerina/listener_endpoint.bal @@ -22,44 +22,42 @@ import ballerina/task; public isolated class Listener { private handle EMPTY_JAVA_STRING = java:fromString(""); - private ListenerConfiguration config = {}; - private task:Listener taskListener; + private final readonly & ListenerConfiguration config; + private final task:Listener taskListener; # Gets invoked during object initialization. # # + listenerConfig - Configurations for FTP listener # + return - `()` or else an `ftp:Error` upon failure to initialize the listener public isolated function init(*ListenerConfiguration listenerConfig) returns Error? { - self.config = listenerConfig.clone(); - - lock { - decimal pollingInterval = self.config.pollingInterval; - CoordinationConfig? coordination = self.config.coordination; - - task:Listener|error taskListener; - if coordination is CoordinationConfig { - taskListener = new ({ - trigger: {interval: pollingInterval}, - warmBackupConfig: { - databaseConfig: coordination.databaseConfig, - livenessCheckInterval: coordination.livenessCheckInterval, - taskId: coordination.memberId, - groupId: coordination.coordinationGroup, - heartbeatFrequency: coordination.heartbeatFrequency - } - }); - } else { - taskListener = new ({ - trigger: {interval: pollingInterval} - }); - } + self.config = listenerConfig.cloneReadOnly(); + + decimal pollingInterval = self.config.pollingInterval; + CoordinationConfig? coordination = self.config.coordination; + + task:Listener|error taskListener; + if coordination is CoordinationConfig { + taskListener = new ({ + trigger: {interval: pollingInterval}, + warmBackupConfig: { + databaseConfig: coordination.databaseConfig, + livenessCheckInterval: coordination.livenessCheckInterval, + taskId: coordination.memberId, + groupId: coordination.coordinationGroup, + heartbeatFrequency: coordination.heartbeatFrequency + } + }); + } else { + taskListener = new ({ + trigger: {interval: pollingInterval} + }); + } - if taskListener is error { - return error Error("Failed to create internal task listener: " + taskListener.message()); - } - self.taskListener = taskListener; - return initListener(self, self.config); + if taskListener is error { + return error Error("Failed to create internal task listener: " + taskListener.message()); } + self.taskListener = taskListener; + return initListener(self, self.config); } # Starts the FTP listener and begins monitoring for file changes. @@ -119,17 +117,13 @@ public isolated class Listener { } isolated function internalStart() returns error? { - lock { - check self.taskListener.attach(getPollingService(self)); - check self.taskListener.'start(); - } + check self.taskListener.attach(getPollingService(self)); + check self.taskListener.'start(); } isolated function stop() returns error? { - lock { - check self.taskListener.gracefulStop(); - return cleanup(self); - } + check self.taskListener.gracefulStop(); + return cleanup(self); } # Polls the FTP server for new or deleted files. @@ -247,7 +241,7 @@ public type Service distinct service object { # It is recommended to use a unique name for each group. # + heartbeatFrequency - The interval (in seconds) for the node to update its heartbeat status. Default is 1 second. public type CoordinationConfig record {| - task:DatabaseConfig databaseConfig; + task:DatabaseConfig databaseConfig = {}; int livenessCheckInterval = 30; string memberId; string coordinationGroup;