From 93c87f11d2bec728029c060f67f232e96414e327 Mon Sep 17 00:00:00 2001 From: Niveathika Date: Sun, 1 Feb 2026 21:50:16 +0530 Subject: [PATCH 01/15] Add circuit breaker support --- ballerina/client_endpoint.bal | 3 + ballerina/commons.bal | 45 +++ ballerina/error.bal | 6 + .../stdlib/ftp/client/FtpClient.java | 172 ++++++++++- .../ftp/client/circuitbreaker/Bucket.java | 77 +++++ .../client/circuitbreaker/CircuitBreaker.java | 266 ++++++++++++++++++ .../circuitbreaker/CircuitBreakerConfig.java | 154 ++++++++++ .../client/circuitbreaker/CircuitHealth.java | 191 +++++++++++++ .../client/circuitbreaker/CircuitState.java | 51 ++++ .../circuitbreaker/FailureCategorizer.java | 220 +++++++++++++++ .../circuitbreaker/FailureCategory.java | 65 +++++ .../stdlib/ftp/util/FtpConstants.java | 5 + .../io/ballerina/stdlib/ftp/util/FtpUtil.java | 3 +- 13 files changed, 1245 insertions(+), 13 deletions(-) create mode 100644 native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java create mode 100644 native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java create mode 100644 native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java create mode 100644 native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java create mode 100644 native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitState.java create mode 100644 native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java create mode 100644 native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategory.java diff --git a/ballerina/client_endpoint.bal b/ballerina/client_endpoint.bal index 8513cbbf3..5a0aad6ce 100644 --- a/ballerina/client_endpoint.bal +++ b/ballerina/client_endpoint.bal @@ -454,6 +454,8 @@ public enum Compression { # malformed CSV records are skipped and written to a separate file in the current directory # + retryConfig - Configuration for retry behavior on transient failures for non-streaming read operations # (getBytes, getText, getJson, getXml, getCsv). If not specified, no retry is attempted. +# + circuitBreaker - Circuit breaker configuration for handling server failures gracefully. +# When enabled, the client will fail fast if the server is experiencing issues. public type ClientConfiguration record {| Protocol protocol = FTP; string host = "127.0.0.1"; @@ -469,6 +471,7 @@ public type ClientConfiguration record {| string sftpSshKnownHosts?; FailSafeOptions csvFailSafe?; RetryConfig retryConfig?; + CircuitBreakerConfig circuitBreaker?; |}; isolated function getInputContent(string path, stream|string|xml|json content, diff --git a/ballerina/commons.bal b/ballerina/commons.bal index e05b9430c..e231717f4 100644 --- a/ballerina/commons.bal +++ b/ballerina/commons.bal @@ -255,3 +255,48 @@ public type FileDependencyCondition record {| DependencyMatchingMode matchingMode = ALL; int requiredFileCount = 1; |}; + +# Categories of errors that can trip the circuit breaker. +# Used to configure which types of failures should count towards the circuit breaker threshold. +public enum FailureCategory { + # Connection-level failures (timeout, refused, reset, DNS resolution). + # Maps to ConnectionError type. + CONNECTION_ERROR, + # Authentication failures (invalid credentials, key rejected). + # Detected via FTP 530 response code. + AUTHENTICATION_ERROR, + # Transient server errors that may succeed on retry. + # Maps to ServiceUnavailableError type (FTP codes 421, 425, 426, 450, 451, 452). + TRANSIENT_ERROR, + # All errors regardless of type + ALL_ERRORS +} + +# Configuration for the sliding time window used in failure calculation. +# The rolling window divides time into discrete buckets for efficient tracking of request success/failure rates. +# +# + requestVolumeThreshold - Minimum number of requests in the window before evaluating failure threshold. +# Circuit breaker will not trip until this many requests have been made. +# + timeWindow - Time period in seconds for the sliding window +# + bucketSize - Granularity of time buckets in seconds. Must be less than timeWindow. +public type RollingWindow record {| + int requestVolumeThreshold = 10; + decimal timeWindow = 60; + decimal bucketSize = 10; +|}; + +# Configuration for circuit breaker behavior. +# The circuit breaker prevents cascade failures by temporarily blocking requests when the server is experiencing issues. +# +# + rollingWindow - Time window configuration for failure tracking +# + failureThreshold - Failure ratio threshold (0.0 to 1.0) that trips the circuit. +# For example, 0.5 means the circuit will open when 50% of requests fail. +# + resetTime - Seconds to wait in OPEN state before transitioning to HALF_OPEN to test recovery +# + failureCategories - Error categories that count as failures for the circuit breaker. +# Only errors matching these categories will contribute to the failure ratio. +public type CircuitBreakerConfig record {| + RollingWindow rollingWindow = {}; + float failureThreshold = 0.5; + decimal resetTime = 30; + FailureCategory[] failureCategories = [CONNECTION_ERROR, TRANSIENT_ERROR]; +|}; diff --git a/ballerina/error.bal b/ballerina/error.bal index a2a1c976f..5f3dd2209 100644 --- a/ballerina/error.bal +++ b/ballerina/error.bal @@ -40,3 +40,9 @@ public type ServiceUnavailableError distinct Error; # Represents an error that occurs when all retry attempts have been exhausted. # This error wraps the last failure encountered during retry attempts. public type AllRetryAttemptsFailedError distinct Error; + +# Error returned when the circuit breaker is in OPEN state. +# This indicates the FTP server is unavailable and requests are being blocked +# to prevent cascade failures. The client should implement fallback logic +# or wait for the circuit to transition to HALF_OPEN state. +public type CircuitBreakerOpenError distinct ServiceUnavailableError; diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java index 895b2c25e..160f40cee 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java @@ -33,6 +33,8 @@ import io.ballerina.runtime.api.values.BString; import io.ballerina.runtime.api.values.BTypedesc; import io.ballerina.runtime.api.values.BXml; +import io.ballerina.stdlib.ftp.client.circuitbreaker.CircuitBreaker; +import io.ballerina.stdlib.ftp.client.circuitbreaker.CircuitBreakerConfig; import io.ballerina.stdlib.ftp.exception.BallerinaFtpException; import io.ballerina.stdlib.ftp.exception.FtpInvalidConfigException; import io.ballerina.stdlib.ftp.exception.RemoteFileSystemConnectorException; @@ -98,6 +100,71 @@ private FtpClient() { // private constructor } + /** + * Gets the circuit breaker for a client, if configured. + * + * @param clientConnector The FTP client connector + * @return The circuit breaker or null if not configured + */ + private static CircuitBreaker getCircuitBreaker(BObject clientConnector) { + return (CircuitBreaker) clientConnector.getNativeData(FtpConstants.CIRCUIT_BREAKER_NATIVE_DATA); + } + + /** + * Records that a request is starting. Call this BEFORE executing an operation. + * + * @param clientConnector The FTP client connector + */ + private static void recordCircuitBreakerRequestStart(BObject clientConnector) { + CircuitBreaker cb = getCircuitBreaker(clientConnector); + if (cb != null) { + cb.recordRequestStart(); + } + } + + /** + * Records the outcome of an operation. Call this AFTER the operation completes. + * + * @param clientConnector The FTP client connector + * @param result The result from the operation + * @return The result unchanged (circuit breaker state updated as side effect) + */ + private static Object recordCircuitBreakerOutcome(BObject clientConnector, Object result) { + CircuitBreaker cb = getCircuitBreaker(clientConnector); + if (cb == null) { + return result; + } + + if (result instanceof BError bError) { + // Pass the original cause to preserve exception type for categorization + Throwable cause = bError.getCause(); + if (cause != null) { + cb.recordOutcome(cause); + } else { + // Fallback to wrapping message if no cause available + cb.recordOutcome(new RuntimeException(bError.getMessage())); + } + } else { + cb.recordOutcome(null); + } + return result; + } + + /** + * Checks if circuit is open before starting an operation. + * Returns the circuit breaker error if open, null otherwise. + * + * @param clientConnector The FTP client connector + * @return BError if circuit is open, null if operation can proceed + */ + private static BError getCircuitBreakerErrorIfOpen(BObject clientConnector) { + CircuitBreaker cb = getCircuitBreaker(clientConnector); + if (cb != null && cb.isOpen()) { + return cb.createServiceUnavailableError(); + } + return null; + } + public static Object initClientEndpoint(BObject clientEndpoint, BMap config) { String protocol = extractProtocol(config); BError basicConfigError = configureClientEndpointBasic(clientEndpoint, config, protocol); @@ -116,8 +183,8 @@ public static Object initClientEndpoint(BObject clientEndpoint, BMap config) { @@ -287,22 +354,30 @@ private static Object extractVfsConfigurations(BMap config, Map< } } - private static Object createAndStoreConnector(BObject clientEndpoint, Map ftpConfig) { - + private static Object createAndStoreConnector(BObject clientEndpoint, Map ftpConfig, + BMap config) { + String url; try { url = FtpUtil.createUrl(clientEndpoint, ""); } catch (BallerinaFtpException e) { return FtpUtil.createError(e.getMessage(), Error.errorType()); } - + ftpConfig.put(FtpConstants.URI, url); clientEndpoint.addNativeData(FtpConstants.PROPERTY_MAP, ftpConfig); - + RemoteFileSystemConnectorFactory fileSystemConnectorFactory = new RemoteFileSystemConnectorFactoryImpl(); try { VfsClientConnector connector = fileSystemConnectorFactory.createVfsClientConnector(ftpConfig); clientEndpoint.addNativeData(VFS_CLIENT_CONNECTOR, connector); + + // Initialize circuit breaker if configured + Object circuitBreakerError = initializeCircuitBreaker(clientEndpoint, config); + if (circuitBreakerError != null) { + return circuitBreakerError; + } + return null; } catch (RemoteFileSystemConnectorException e) { String errorType = FtpUtil.getErrorTypeForException(e); @@ -310,6 +385,25 @@ private static Object createAndStoreConnector(BObject clientEndpoint, Map config) { + BMap cbConfig = (BMap) config.getMapValue( + StringUtils.fromString(FtpConstants.CIRCUIT_BREAKER)); + if (cbConfig == null) { + return null; + } + + try { + CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.fromBMap(cbConfig); + CircuitBreaker circuitBreaker = new CircuitBreaker(circuitBreakerConfig); + clientEndpoint.addNativeData(FtpConstants.CIRCUIT_BREAKER_NATIVE_DATA, circuitBreaker); + log.debug("Circuit breaker initialized for FTP client"); + return null; + } catch (BallerinaFtpException e) { + return FtpUtil.createError(e.getMessage(), Error.errorType()); + } + } + private static Object configureFtpsSecureSocket(BMap secureSocket, Map ftpConfig) { FtpUtil.configureFtpsMode(secureSocket, ftpConfig); FtpUtil.configureFtpsDataChannelProtection(secureSocket, ftpConfig); @@ -442,8 +536,16 @@ public static Object getBytesAsStream(Environment env, BObject clientConnector, if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, false, remoteFileSystemBaseMessage -> @@ -451,7 +553,8 @@ public static Object getBytesAsStream(Environment env, BObject clientConnector, balFuture, TypeCreator.createArrayType(PredefinedTypes.TYPE_BYTE), laxDataBinding)); connector.addListener(connectorListener); connector.send(null, FtpAction.GET, filePath.getValue(), null); - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } @@ -461,8 +564,16 @@ public static Object getCsvAsStream(Environment env, BObject clientConnector, BS if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, false, remoteFileSystemBaseMessage -> @@ -470,7 +581,8 @@ public static Object getCsvAsStream(Environment env, BObject clientConnector, BS balFuture, typeDesc.getDescribingType(), laxDataBinding)); connector.addListener(connectorListener); connector.send(null, FtpAction.GET, filePath.getValue(), null); - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } @@ -479,14 +591,23 @@ private static Object getAllContent(Environment env, BObject clientConnector, BS if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, false, remoteFileSystemBaseMessage -> FtpClientHelper.executeGetAllAction(remoteFileSystemBaseMessage, balFuture)); connector.addListener(connectorListener); connector.send(null, FtpAction.GET_ALL, filePath.getValue(), null); - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } @@ -791,7 +912,15 @@ private static Object putGenericAction(Environment env, BObject clientConnector, if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, true, remoteFileSystemBaseMessage -> FtpClientHelper.executeGenericAction()); @@ -802,7 +931,8 @@ private static Object putGenericAction(Environment env, BObject clientConnector, } else { connector.send(message, FtpAction.APPEND, filePath, null); } - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } @@ -894,14 +1024,23 @@ private static Object executeSinglePathAction(Environment env, BObject clientCon if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); Function messageHandler = messageHandlerFactory.apply(balFuture); FtpClientListener connectorListener = new FtpClientListener(balFuture, closeInput, messageHandler); connector.addListener(connectorListener); connector.send(null, action, filePath.getValue(), null); - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } @@ -921,6 +1060,13 @@ private static Object executeTwoPathAction(Environment env, BObject clientConnec if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + String destinationUrl; try { destinationUrl = FtpUtil.createUrl(clientConnector, destinationPath.getValue()); @@ -928,12 +1074,14 @@ private static Object executeTwoPathAction(Environment env, BObject clientConnec return FtpUtil.createError(e.getMessage(), Error.errorType()); } return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, true, remoteFileSystemBaseMessage -> FtpClientHelper.executeGenericAction()); connector.addListener(connectorListener); connector.send(null, action, sourcePath.getValue(), destinationUrl); - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java new file mode 100644 index 000000000..a490caa34 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +import java.time.Instant; + +/** + * Represents a discrete time segment in the rolling window. + * Each bucket tracks the number of total requests and failures within its time period. + */ +public class Bucket { + private int totalCount; + private int failureCount; + private Instant lastUpdatedTime; + + /** + * Creates a new empty bucket. + */ + public Bucket() { + this.totalCount = 0; + this.failureCount = 0; + this.lastUpdatedTime = null; + } + + /** + * Increments the total request count. + */ + public void incrementTotalCount() { + this.totalCount++; + this.lastUpdatedTime = Instant.now(); + } + + /** + * Increments the failure count. + */ + public void incrementFailureCount() { + this.failureCount++; + this.lastUpdatedTime = Instant.now(); + } + + /** + * Resets the bucket to its initial state. + */ + public void reset() { + this.totalCount = 0; + this.failureCount = 0; + this.lastUpdatedTime = null; + } + + public int getTotalCount() { + return totalCount; + } + + public int getFailureCount() { + return failureCount; + } + + public Instant getLastUpdatedTime() { + return lastUpdatedTime; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java new file mode 100644 index 000000000..382db427f --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +import io.ballerina.runtime.api.values.BError; +import io.ballerina.stdlib.ftp.util.FtpUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.Set; +import java.util.concurrent.Callable; + +/** + * Circuit breaker implementation for FTP client operations. + * Prevents cascade failures by temporarily blocking requests when the server is experiencing issues. + */ +public class CircuitBreaker { + private static final Logger log = LoggerFactory.getLogger(CircuitBreaker.class); + + private final CircuitBreakerConfig config; + private final CircuitHealth health; + private volatile CircuitState state; + private final Object lock = new Object(); + + /** + * Creates a new CircuitBreaker with the specified configuration. + * + * @param config The circuit breaker configuration + */ + public CircuitBreaker(CircuitBreakerConfig config) { + this.config = config; + this.health = new CircuitHealth( + config.getNumberOfBuckets(), + config.getBucketSizeMillis(), + config.getTimeWindowMillis() + ); + this.state = CircuitState.CLOSED; + log.debug("Circuit breaker initialized with {} buckets, {}ms window, {}% failure threshold", + config.getNumberOfBuckets(), config.getTimeWindowMillis(), + config.getFailureThreshold() * 100); + } + + /** + * Executes an operation with circuit breaker protection. + * + * @param operation The operation to execute + * @param The return type of the operation + * @return The result of the operation + * @throws Exception If the operation fails or circuit is open + */ + public T execute(Callable operation) throws Exception { + synchronized (lock) { + updateState(); + + if (state == CircuitState.OPEN) { + throw createServiceUnavailableError(); + } + + // Record the request + health.prepareRollingWindow(); + health.recordRequest(); + } + + try { + T result = operation.call(); + synchronized (lock) { + recordSuccess(); + } + return result; + } catch (Exception e) { + synchronized (lock) { + if (shouldCountAsFailure(e)) { + recordFailure(); + log.debug("Circuit breaker recorded failure: {}", e.getMessage()); + } + } + throw e; + } + } + + /** + * Checks if the circuit is currently open (blocking requests). + * + * @return true if the circuit is open + */ + public boolean isOpen() { + synchronized (lock) { + updateState(); + return state == CircuitState.OPEN; + } + } + + /** + * Records that a request is about to be made. + * Call this BEFORE starting the operation. + */ + public void recordRequestStart() { + synchronized (lock) { + health.prepareRollingWindow(); + health.recordRequest(); + } + } + + /** + * Records the outcome of an operation. + * Call this AFTER the operation completes. + * + * @param error The throwable if the operation failed, null if successful + */ + public void recordOutcome(Throwable error) { + synchronized (lock) { + if (error == null) { + recordSuccess(); + } else if (shouldCountAsFailure(error)) { + recordFailure(); + log.debug("Circuit breaker recorded failure: {}", error.getMessage()); + } + // Update state after recording the outcome + updateState(); + } + } + + /** + * Creates an error to throw when the circuit is open. + * + * @return A BError representing the service unavailable state + */ + public BError createServiceUnavailableError() { + Instant lastError = health.getLastErrorTime(); + long remainingMillis = 0; + if (lastError != null) { + long elapsedMillis = Instant.now().toEpochMilli() - lastError.toEpochMilli(); + remainingMillis = Math.max(0, config.getResetTimeMillis() - elapsedMillis); + } + + String message = String.format( + "FTP server unavailable. Circuit breaker is OPEN. Retry in %.0f seconds.", + remainingMillis / 1000.0 + ); + + return FtpUtil.createError(message, FtpUtil.ErrorType.CircuitBreakerOpenError.errorType()); + } + + /** + * Updates the circuit state based on current metrics. + * Must be called within synchronized block. + */ + private void updateState() { + health.prepareRollingWindow(); + + switch (state) { + case OPEN: + if (resetTimeElapsed()) { + state = CircuitState.HALF_OPEN; + log.info("Circuit breaker transitioning from OPEN to HALF_OPEN"); + } + break; + + case HALF_OPEN: + if (health.isLastRequestSuccess()) { + state = CircuitState.CLOSED; + health.resetAllBuckets(); + log.info("Circuit breaker transitioning from HALF_OPEN to CLOSED (trial succeeded)"); + } else { + state = CircuitState.OPEN; + log.info("Circuit breaker transitioning from HALF_OPEN to OPEN (trial failed)"); + } + break; + + case CLOSED: + if (health.getTotalRequestCount() >= config.getRequestVolumeThreshold()) { + float failureRatio = health.getFailureRatio(); + if (failureRatio > config.getFailureThreshold()) { + state = CircuitState.OPEN; + log.info("Circuit breaker transitioning from CLOSED to OPEN " + + "(failure ratio {} > threshold {})", + failureRatio, config.getFailureThreshold()); + } + } + break; + } + } + + /** + * Checks if the reset time has elapsed since the circuit opened. + * + * @return true if reset time has elapsed + */ + private boolean resetTimeElapsed() { + Instant lastError = health.getLastErrorTime(); + if (lastError == null) { + return true; + } + long elapsedMillis = Instant.now().toEpochMilli() - lastError.toEpochMilli(); + return elapsedMillis >= config.getResetTimeMillis(); + } + + /** + * Records a successful operation. + * Must be called within synchronized block. + */ + private void recordSuccess() { + health.recordSuccess(); + } + + /** + * Records a failed operation. + * Must be called within synchronized block. + */ + private void recordFailure() { + health.recordFailure(); + } + + /** + * Determines if an error should count as a failure for circuit breaker purposes. + * + * @param e The throwable to evaluate + * @return true if the error should count as a failure + */ + private boolean shouldCountAsFailure(Throwable e) { + Set configuredCategories = config.getFailureCategories(); + + // If ALL_ERRORS is configured, any exception trips the circuit + if (configuredCategories.contains(FailureCategory.ALL_ERRORS)) { + return true; + } + + // Categorize the exception + FailureCategory category = FailureCategorizer.categorize(e); + if (category == null) { + // Exception doesn't match any specific category + return false; + } + + // Check if the category is in the configured list + return configuredCategories.contains(category); + } + + /** + * Gets the current circuit state. + * + * @return The current state + */ + public CircuitState getState() { + synchronized (lock) { + return state; + } + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java new file mode 100644 index 000000000..d4273301d --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BArray; +import io.ballerina.runtime.api.values.BDecimal; +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BString; +import io.ballerina.stdlib.ftp.exception.BallerinaFtpException; + +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for circuit breaker behavior. + * Parsed from Ballerina CircuitBreakerConfig record. + */ +public class CircuitBreakerConfig { + // Rolling window configuration + private final int requestVolumeThreshold; + private final long timeWindowMillis; + private final long bucketSizeMillis; + private final int numberOfBuckets; + + // Circuit breaker thresholds + private final float failureThreshold; + private final long resetTimeMillis; + + // Failure categories + private final Set failureCategories; + + // Ballerina field names + private static final BString ROLLING_WINDOW = StringUtils.fromString("rollingWindow"); + private static final BString REQUEST_VOLUME_THRESHOLD = StringUtils.fromString("requestVolumeThreshold"); + private static final BString TIME_WINDOW = StringUtils.fromString("timeWindow"); + private static final BString BUCKET_SIZE = StringUtils.fromString("bucketSize"); + private static final BString FAILURE_THRESHOLD = StringUtils.fromString("failureThreshold"); + private static final BString RESET_TIME = StringUtils.fromString("resetTime"); + private static final BString FAILURE_CATEGORIES = StringUtils.fromString("failureCategories"); + + private CircuitBreakerConfig(int requestVolumeThreshold, long timeWindowMillis, long bucketSizeMillis, + float failureThreshold, long resetTimeMillis, + Set failureCategories) { + this.requestVolumeThreshold = requestVolumeThreshold; + this.timeWindowMillis = timeWindowMillis; + this.bucketSizeMillis = bucketSizeMillis; + this.numberOfBuckets = (int) (timeWindowMillis / bucketSizeMillis); + this.failureThreshold = failureThreshold; + this.resetTimeMillis = resetTimeMillis; + this.failureCategories = failureCategories; + } + + /** + * Creates a CircuitBreakerConfig from a Ballerina BMap. + * + * @param config The Ballerina CircuitBreakerConfig record + * @return A new CircuitBreakerConfig instance + * @throws BallerinaFtpException if the configuration is invalid + */ + @SuppressWarnings("unchecked") + public static CircuitBreakerConfig fromBMap(BMap config) throws BallerinaFtpException { + // Extract rolling window configuration + BMap rollingWindow = (BMap) config.getMapValue(ROLLING_WINDOW); + int requestVolumeThreshold = rollingWindow.getIntValue(REQUEST_VOLUME_THRESHOLD).intValue(); + long timeWindowMillis = decimalToMillis(rollingWindow.get(TIME_WINDOW)); + long bucketSizeMillis = decimalToMillis(rollingWindow.get(BUCKET_SIZE)); + + // Extract thresholds + float failureThreshold = ((Number) config.get(FAILURE_THRESHOLD)).floatValue(); + long resetTimeMillis = decimalToMillis(config.get(RESET_TIME)); + + // Extract failure categories + BArray categoriesArray = config.getArrayValue(FAILURE_CATEGORIES); + Set failureCategories = new HashSet<>(); + for (int i = 0; i < categoriesArray.size(); i++) { + String categoryStr = categoriesArray.getBString(i).getValue(); + FailureCategory category = FailureCategory.fromString(categoryStr); + if (category != null) { + failureCategories.add(category); + } + } + + // Validate configuration + validate(failureThreshold, timeWindowMillis, bucketSizeMillis); + + return new CircuitBreakerConfig(requestVolumeThreshold, timeWindowMillis, bucketSizeMillis, + failureThreshold, resetTimeMillis, failureCategories); + } + + private static long decimalToMillis(Object value) { + if (value instanceof BDecimal) { + return (long) (((BDecimal) value).floatValue() * 1000); + } + return (long) (((Number) value).doubleValue() * 1000); + } + + private static void validate(float failureThreshold, long timeWindowMillis, long bucketSizeMillis) + throws BallerinaFtpException { + if (failureThreshold < 0.0f || failureThreshold > 1.0f) { + throw new BallerinaFtpException("Circuit breaker failureThreshold must be between 0.0 and 1.0"); + } + if (bucketSizeMillis <= 0) { + throw new BallerinaFtpException("Circuit breaker bucketSize must be greater than 0"); + } + if (timeWindowMillis < bucketSizeMillis) { + throw new BallerinaFtpException("Circuit breaker timeWindow must be greater than or equal to bucketSize"); + } + } + + public int getRequestVolumeThreshold() { + return requestVolumeThreshold; + } + + public long getTimeWindowMillis() { + return timeWindowMillis; + } + + public long getBucketSizeMillis() { + return bucketSizeMillis; + } + + public int getNumberOfBuckets() { + return numberOfBuckets; + } + + public float getFailureThreshold() { + return failureThreshold; + } + + public long getResetTimeMillis() { + return resetTimeMillis; + } + + public Set getFailureCategories() { + return failureCategories; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java new file mode 100644 index 000000000..2805a4360 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +import java.time.Instant; + +/** + * Tracks the health metrics of the circuit breaker using a sliding window of buckets. + */ +public class CircuitHealth { + private final Bucket[] buckets; + private final int numberOfBuckets; + private final long bucketSizeMillis; + private final long timeWindowMillis; + + private boolean lastRequestSuccess; + private int lastUsedBucketId; + private final Instant startTime; + private Instant lastRequestTime; + private Instant lastErrorTime; + + /** + * Creates a new CircuitHealth with the specified configuration. + * + * @param numberOfBuckets Number of buckets in the rolling window + * @param bucketSizeMillis Size of each bucket in milliseconds + * @param timeWindowMillis Total time window in milliseconds + */ + public CircuitHealth(int numberOfBuckets, long bucketSizeMillis, long timeWindowMillis) { + this.numberOfBuckets = numberOfBuckets; + this.bucketSizeMillis = bucketSizeMillis; + this.timeWindowMillis = timeWindowMillis; + this.buckets = new Bucket[numberOfBuckets]; + for (int i = 0; i < numberOfBuckets; i++) { + buckets[i] = new Bucket(); + } + this.startTime = Instant.now(); + this.lastRequestTime = Instant.now(); + this.lastRequestSuccess = true; + this.lastUsedBucketId = 0; + } + + /** + * Gets the current bucket ID based on elapsed time. + * + * @return The current bucket index + */ + public int getCurrentBucketId() { + long elapsedMillis = Instant.now().toEpochMilli() - startTime.toEpochMilli(); + long windowElapsed = elapsedMillis % timeWindowMillis; + return (int) (((windowElapsed / bucketSizeMillis) + 1) % numberOfBuckets); + } + + /** + * Prepares the rolling window by resetting stale buckets. + */ + public void prepareRollingWindow() { + Instant currentTime = Instant.now(); + long idleTimeMillis = currentTime.toEpochMilli() - lastRequestTime.toEpochMilli(); + + // If idle longer than entire window, reset all buckets + if (idleTimeMillis > timeWindowMillis) { + resetAllBuckets(); + return; + } + + int currentBucketId = getCurrentBucketId(); + + // Reset buckets that have become stale + if (currentBucketId == lastUsedBucketId && idleTimeMillis > bucketSizeMillis) { + // Same bucket but stale - reset all + resetAllBuckets(); + } else if (currentBucketId < lastUsedBucketId) { + // Wrapped around - reset from 0 to current and from lastUsed+1 to end + for (int i = 0; i <= currentBucketId; i++) { + buckets[i].reset(); + } + for (int i = lastUsedBucketId + 1; i < numberOfBuckets; i++) { + buckets[i].reset(); + } + } else if (currentBucketId > lastUsedBucketId) { + // Reset buckets between last used and current + for (int i = lastUsedBucketId + 1; i <= currentBucketId; i++) { + buckets[i].reset(); + } + } + } + + /** + * Resets all buckets to their initial state. + */ + public void resetAllBuckets() { + for (Bucket bucket : buckets) { + bucket.reset(); + } + } + + /** + * Gets the total request count across all buckets. + * + * @return Total number of requests + */ + public int getTotalRequestCount() { + int total = 0; + for (Bucket bucket : buckets) { + total += bucket.getTotalCount(); + } + return total; + } + + /** + * Gets the total failure count across all buckets. + * + * @return Total number of failures + */ + public int getTotalFailureCount() { + int total = 0; + for (Bucket bucket : buckets) { + total += bucket.getFailureCount(); + } + return total; + } + + /** + * Calculates the current failure ratio across all buckets. + * + * @return Failure ratio between 0.0 and 1.0 + */ + public float getFailureRatio() { + int totalCount = getTotalRequestCount(); + if (totalCount == 0) { + return 0.0f; + } + return (float) getTotalFailureCount() / totalCount; + } + + /** + * Records a request in the current bucket. + */ + public void recordRequest() { + int bucketId = getCurrentBucketId(); + buckets[bucketId].incrementTotalCount(); + lastUsedBucketId = bucketId; + lastRequestTime = Instant.now(); + } + + /** + * Records a successful request. + */ + public void recordSuccess() { + lastRequestSuccess = true; + } + + /** + * Records a failed request in the current bucket. + */ + public void recordFailure() { + int bucketId = getCurrentBucketId(); + buckets[bucketId].incrementFailureCount(); + lastRequestSuccess = false; + lastErrorTime = Instant.now(); + } + + public boolean isLastRequestSuccess() { + return lastRequestSuccess; + } + + public Instant getLastErrorTime() { + return lastErrorTime; + } + + public void setLastUsedBucketId(int bucketId) { + this.lastUsedBucketId = bucketId; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitState.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitState.java new file mode 100644 index 000000000..053796c40 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitState.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +/** + * Represents the possible states of a circuit breaker. + * + *

State transitions: + *

    + *
  • CLOSED -> OPEN: When failure threshold is exceeded
  • + *
  • OPEN -> HALF_OPEN: After reset time has elapsed
  • + *
  • HALF_OPEN -> CLOSED: When trial request succeeds
  • + *
  • HALF_OPEN -> OPEN: When trial request fails
  • + *
+ */ +public enum CircuitState { + /** + * Circuit is closed - normal operation, all requests pass through. + * Failures are tracked and counted towards the threshold. + */ + CLOSED, + + /** + * Circuit is open - requests are blocked immediately. + * Returns CircuitBreakerOpenError without attempting the operation. + */ + OPEN, + + /** + * Circuit is in recovery mode - allows one trial request. + * If the trial succeeds, transitions to CLOSED. + * If the trial fails, transitions back to OPEN. + */ + HALF_OPEN +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java new file mode 100644 index 000000000..e7817b24f --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +import io.ballerina.stdlib.ftp.exception.ErrorTypeProvider; +import io.ballerina.stdlib.ftp.util.FtpErrorCodeAnalyzer; +import io.ballerina.stdlib.ftp.util.FtpUtil; + +import java.io.EOFException; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.util.Locale; + +/** + * Categorizes exceptions into failure categories for circuit breaker evaluation. + * Uses a three-tier approach: + * 1. Check if exception implements ErrorTypeProvider (structured error types) + * 2. Check exception type in cause chain + * 3. Fall back to message-based categorization with FtpErrorCodeAnalyzer + */ +public final class FailureCategorizer { + + private FailureCategorizer() { + // Utility class + } + + /** + * Categorizes an exception into a FailureCategory. + * First tries ErrorTypeProvider, then exception types, then message patterns. + * + * @param throwable The exception to categorize + * @return The FailureCategory, or null if the exception doesn't match any specific category + */ + public static FailureCategory categorize(Throwable throwable) { + if (throwable == null) { + return null; + } + + // First, check if any exception in the chain implements ErrorTypeProvider + Throwable current = throwable; + while (current != null) { + FailureCategory category = categorizeByErrorTypeProvider(current); + if (category != null) { + return category; + } + + Throwable next = current.getCause(); + if (next == current) { + break; + } + current = next; + } + + // Second, try to categorize by walking the cause chain and checking exception types + current = throwable; + while (current != null) { + FailureCategory category = categorizeByExceptionType(current); + if (category != null) { + return category; + } + + // Also check message at each level using FtpErrorCodeAnalyzer + category = categorizeByMessage(current.getMessage()); + if (category != null) { + return category; + } + + Throwable next = current.getCause(); + if (next == current) { + break; + } + current = next; + } + + return null; + } + + /** + * Categorizes based on ErrorTypeProvider interface. + * This leverages the structured error type system. + * + * @param throwable The exception to categorize + * @return The FailureCategory, or null if not an ErrorTypeProvider or no match + */ + private static FailureCategory categorizeByErrorTypeProvider(Throwable throwable) { + if (!(throwable instanceof ErrorTypeProvider provider)) { + return null; + } + + String errorType = provider.errorType(); + if (errorType == null) { + return null; + } + + // Map error types to failure categories + if (errorType.equals(FtpUtil.ErrorType.ConnectionError.errorType())) { + return FailureCategory.CONNECTION_ERROR; + } + if (errorType.equals(FtpUtil.ErrorType.ServiceUnavailableError.errorType())) { + return FailureCategory.TRANSIENT_ERROR; + } + + return null; + } + + /** + * Categorizes based on exception type. + * + * @param throwable The exception to categorize + * @return The FailureCategory, or null if no match + */ + private static FailureCategory categorizeByExceptionType(Throwable throwable) { + // Connection errors + if (throwable instanceof ConnectException || + throwable instanceof SocketTimeoutException || + throwable instanceof UnknownHostException || + throwable instanceof NoRouteToHostException) { + return FailureCategory.CONNECTION_ERROR; + } + + // Server disconnect (EOF indicates unexpected connection close) - now TRANSIENT_ERROR + if (throwable instanceof EOFException) { + return FailureCategory.TRANSIENT_ERROR; + } + + // SocketException could be either connection or transient + if (throwable instanceof SocketException) { + String message = throwable.getMessage(); + if (message != null) { + String lower = message.toLowerCase(Locale.ROOT); + if (lower.contains("connection reset") || lower.contains("broken pipe")) { + return FailureCategory.TRANSIENT_ERROR; + } + } + return FailureCategory.CONNECTION_ERROR; + } + + return null; + } + + /** + * Categorizes based on error message patterns using FtpErrorCodeAnalyzer. + * + * @param message The error message to categorize + * @return The FailureCategory, or null if no match + */ + private static FailureCategory categorizeByMessage(String message) { + if (message == null) { + return null; + } + + // Use FtpErrorCodeAnalyzer for FTP response codes + if (FtpErrorCodeAnalyzer.isServiceUnavailable(message)) { + return FailureCategory.TRANSIENT_ERROR; + } + + if (FtpErrorCodeAnalyzer.isAuthenticationError(message)) { + return FailureCategory.AUTHENTICATION_ERROR; + } + + String lower = message.toLowerCase(Locale.ROOT); + + // Connection errors - network/connectivity issues + if (lower.contains("connection refused") || + lower.contains("connection timed out") || + lower.contains("connect timed out") || + lower.contains("network is unreachable") || + lower.contains("no route to host") || + lower.contains("host is unreachable") || + lower.contains("unknown host") || + lower.contains("connect failed") || + lower.contains("failed to connect") || + lower.contains("could not connect")) { + return FailureCategory.CONNECTION_ERROR; + } + + // Additional authentication error patterns (beyond FTP codes) + if (lower.contains("authentication failed") || + lower.contains("invalid credentials") || + lower.contains("auth fail") || + lower.contains("login failed") || + lower.contains("access denied") || + lower.contains("not authorized") || + lower.contains("login incorrect")) { + return FailureCategory.AUTHENTICATION_ERROR; + } + + // Transient errors - connection lost during operation + if (lower.contains("connection reset") || + lower.contains("broken pipe") || + lower.contains("connection closed") || + lower.contains("unexpected end") || + lower.contains("stream closed") || + lower.contains("socket closed") || + lower.contains("remote host closed")) { + return FailureCategory.TRANSIENT_ERROR; + } + + return null; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategory.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategory.java new file mode 100644 index 000000000..987f8a90e --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategory.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +/** + * Categories of errors that can trip the circuit breaker. + * Maps to the Ballerina FailureCategory enum. + */ +public enum FailureCategory { + /** + * Connection-level failures (timeout, refused, reset, DNS resolution). + * Maps to ConnectionError type. + */ + CONNECTION_ERROR, + + /** + * Authentication failures (invalid credentials, key rejected). + * Detected via FTP 530 response code. + */ + AUTHENTICATION_ERROR, + + /** + * Transient server errors that may succeed on retry. + * Maps to ServiceUnavailableError type (FTP codes 421, 425, 426, 450, 451, 452). + */ + TRANSIENT_ERROR, + + /** + * All errors regardless of type. When configured, any error will trip the circuit. + */ + ALL_ERRORS; + + /** + * Converts a Ballerina enum string value to the corresponding Java enum. + * + * @param value The string value from Ballerina (e.g., "CONNECTION_ERROR") + * @return The corresponding FailureCategory, or null if not found + */ + public static FailureCategory fromString(String value) { + if (value == null) { + return null; + } + try { + return FailureCategory.valueOf(value); + } catch (IllegalArgumentException e) { + return null; + } + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java index 7fc507fd6..c40a3de92 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java @@ -213,4 +213,9 @@ private FtpConstants() { public static final String OP_GET_JSON = "getJson"; public static final String OP_GET_XML = "getXml"; public static final String OP_GET_CSV = "getCsv"; + + // Circuit breaker configuration + public static final String CIRCUIT_BREAKER = "circuitBreaker"; + public static final String CIRCUIT_BREAKER_NATIVE_DATA = "CircuitBreaker"; + public static final String CIRCUIT_BREAKER_OPEN_ERROR = "CircuitBreakerOpenError"; } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java index f5a32b565..dc586ef54 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java @@ -624,7 +624,8 @@ public enum ErrorType { FileAlreadyExistsError("FileAlreadyExistsError"), InvalidConfigError("InvalidConfigError"), ServiceUnavailableError("ServiceUnavailableError"), - AllRetryAttemptsFailedError("AllRetryAttemptsFailedError"); + AllRetryAttemptsFailedError("AllRetryAttemptsFailedError"), + CircuitBreakerOpenError("CircuitBreakerOpenError"); private String errorType; From 5f99bf11396a42ba75717efd481509f91de0cb3b Mon Sep 17 00:00:00 2001 From: Niveathika Date: Sun, 1 Feb 2026 21:50:28 +0530 Subject: [PATCH 02/15] Add testcases --- ballerina/tests/circuit_breaker_test.bal | 193 +++++++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 ballerina/tests/circuit_breaker_test.bal diff --git a/ballerina/tests/circuit_breaker_test.bal b/ballerina/tests/circuit_breaker_test.bal new file mode 100644 index 000000000..9cc431e88 --- /dev/null +++ b/ballerina/tests/circuit_breaker_test.bal @@ -0,0 +1,193 @@ +// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; + +// Test client creation with invalid failure threshold (> 1.0) +@test:Config {} +public function testClientWithInvalidFailureThresholdHigh() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + failureThreshold: 1.5, + resetTime: 30 + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail with invalid failure threshold > 1.0"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("failureThreshold"), + msg = "Error message should mention failureThreshold"); + } +} + +// Test client creation with invalid failure threshold (< 0.0) +@test:Config {} +public function testClientWithInvalidFailureThresholdLow() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + failureThreshold: -0.1, + resetTime: 30 + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail with invalid failure threshold < 0.0"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("failureThreshold"), + msg = "Error message should mention failureThreshold"); + } +} + +// Test client creation with bucketSize greater than timeWindow +@test:Config {} +public function testClientWithInvalidBucketSize() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + timeWindow: 30, + bucketSize: 60 // bucket larger than window + } + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail when bucketSize > timeWindow"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("timeWindow") || cbClient.message().includes("bucketSize"), + msg = "Error message should mention timeWindow or bucketSize"); + } +} + +// Test that operations work normally with circuit breaker enabled +@test:Config {} +public function testOperationsWithCircuitBreaker() returns error? { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 5, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 30 + } + }; + Client cbClient = check new (config); + + // Test basic operations work with circuit breaker enabled + boolean exists = check cbClient->exists("/home/in"); + test:assertTrue(exists, msg = "Directory should exist"); + + FileInfo[] files = check cbClient->list("/home/in"); + test:assertTrue(files.length() >= 0, msg = "List operation should return files array"); + + check cbClient->close(); +} + +// Test that client creation fails when server doesn't exist (connection error, not circuit breaker) +@test:Config {} +public function testClientCreationFailsForNonExistentServer() { + // Create a client pointing to a non-existent server + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21299, // Non-existent port + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 5, + failureCategories: [CONNECTION_ERROR] + } + }; + + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail for non-existent server"); +} + +// Test that circuit breaker opens after failures and returns CircuitBreakerOpenError +@test:Config {} +public function testCircuitBreakerOpensAfterFailures() returns error? { + // Configure circuit breaker with low thresholds to trigger quickly + // - requestVolumeThreshold: 2 (only need 2 requests before circuit can trip) + // - failureThreshold: 0.5 (50% failure rate trips the circuit) + // - ALL_ERRORS: any error counts as failure + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 2, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 30, + failureCategories: [ALL_ERRORS] + } + }; + + Client cbClient = check new (config); + + // Make requests that will fail (reading non-existent files) + // These failures should accumulate and trip the circuit + string nonExistentPath = "/non/existent/file/path/that/does/not/exist.txt"; + + // First failure + byte[]|Error result1 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result1 is Error, msg = "First request should fail (file not found)"); + test:assertFalse(result1 is CircuitBreakerOpenError, + msg = "First failure should be regular error, not circuit breaker error"); + + // Second failure - after this, we have 2 requests with 100% failure rate + byte[]|Error result2 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result2 is Error, msg = "Second request should fail"); + test:assertFalse(result2 is CircuitBreakerOpenError, + msg = "Second failure should be regular error, not circuit breaker error"); + + // Third request - circuit should now be OPEN and reject immediately + byte[]|Error result3 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result3 is CircuitBreakerOpenError, + msg = "Third request should fail with CircuitBreakerOpenError (circuit is open)"); + + if result3 is CircuitBreakerOpenError { + string message = result3.message(); + test:assertTrue(message.includes("Circuit breaker is OPEN"), + msg = "Error message should indicate circuit breaker is open"); + } + + check cbClient->close(); +} From 361ae0c9bd9ea9eb614e8aa272f290bf3535fa67 Mon Sep 17 00:00:00 2001 From: Niveathika Date: Sun, 1 Feb 2026 21:53:06 +0530 Subject: [PATCH 03/15] Add documentation --- changelog.md | 2 + docs/spec/spec.md | 104 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 102 insertions(+), 4 deletions(-) diff --git a/changelog.md b/changelog.md index 8995364f4..b68d050c4 100644 --- a/changelog.md +++ b/changelog.md @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## unreleased ### Added + +- [Add Circuit breaker support](https://github.com/ballerina-platform/ballerina-library/issues/8382) - [Add automatic retry support with exponential backoff for FTP client](https://github.com/ballerina-platform/ballerina-library/issues/8585) - [Add FTP Listener Coordination Support](https://github.com/ballerina-platform/ballerina-library/issues/8490) - [Add distinct error types](https://github.com/ballerina-platform/ballerina-library/issues/8597) diff --git a/docs/spec/spec.md b/docs/spec/spec.md index 50fa64433..2ab33c626 100644 --- a/docs/spec/spec.md +++ b/docs/spec/spec.md @@ -3,7 +3,7 @@ _Owners_: @shafreenAnfar @dilanSachi @Bhashinee _Reviewers_: @shafreenAnfar @Bhashinee _Created_: 2020/10/28 -_Updated_: 2026/01/26 +_Updated_: 2026/02/03 _Edition_: Swan Lake ## Introduction @@ -32,6 +32,10 @@ The conforming implementation of the specification is released and included in t - [3.2.2. Secure Client](#322-secure-client) - [3.2.3. Client with Retry Configuration](#323-client-with-retry-configuration) - [3.3. Functions](#33-functions) + - [3.4. Circuit Breaker](#34-circuit-breaker) + - [State Machine](#state-machine) + - [Configuration](#configuration) + - [Usage Example](#usage-example) - [4. Listener](#4-listener) - [4.1. Configurations](#41-configurations) - [4.2. Initialization](#42-initialization) @@ -163,7 +167,6 @@ public type Error distinct error; * `ConnectionError` - Represents errors that occur when connecting to the FTP/SFTP server. This includes network failures, host unreachable, connection refused, etc. ```ballerina -# Represents an error that occurs when connecting to the FTP/SFTP server. public type ConnectionError distinct Error; ``` @@ -182,24 +185,37 @@ public type FileAlreadyExistsError distinct Error; public type InvalidConfigError distinct Error; ``` -* `ServiceUnavailableError` - Represents errors that occur when the FTP/SFTP service is temporarily unavailable. This is a transient error indicating the operation may succeed on retry. Common causes include server overload (FTP code 421), connection issues (425, 426), temporary file locks (450), or server-side processing errors (451). This error type is designed for use with retry and circuit breaker patterns. +* `ServiceUnavailableError` - Represents errors that occur when the FTP/SFTP service is temporarily unavailable. This is a transient error indicating the operation may succeed on retry. Common causes include server overload (FTP code 421), connection issues (425, 426), temporary file locks (450), or server-side processing errors (451). ```ballerina public type ServiceUnavailableError distinct Error; ``` +* `AllRetryAttemptsFailedError` - Represents an error that occurs when all retry attempts have been exhausted. This error wraps the last failure encountered during retry attempts. +```ballerina +public type AllRetryAttemptsFailedError distinct Error; +``` + +* `CircuitBreakerOpenError` - Error returned when the circuit breaker is in OPEN state. This indicates the FTP server is unavailable and requests are being blocked to prevent cascade failures. This is a subtype of `ServiceUnavailableError`. +```ballerina +public type CircuitBreakerOpenError distinct ServiceUnavailableError; +``` + All specific error types are subtypes of the base `Error` type, allowing for both specific and general error handling: ```ballerina // Handle specific error types ftp:Client|ftp:Error result = new(config); if result is ftp:ConnectionError { // Handle connection failures specifically +} else if result is ftp:CircuitBreakerOpenError { + // Circuit breaker is open - implement fallback logic +} else if result is ftp:AllRetryAttemptsFailedError { + // All retries exhausted - consider alerting } else if result is ftp:ServiceUnavailableError { // Transient error - retry the operation } else if result is ftp:Error { // Handle any other FTP error } ``` - ### 2.4. Retry Configuration * `RetryConfig` record represents the configuration for automatic retries of operations. ```ballerina @@ -241,6 +257,8 @@ public type ClientConfiguration record {| boolean laxDataBinding = false; # Retry configuration for read operations RetryConfig retryConfig?; + # Circuit breaker configuration to prevent cascade failures + CircuitBreakerConfig circuitBreaker?; |}; ``` * InputContent record represents the configurations for the input given for `put` and `append` operations. @@ -663,6 +681,84 @@ remote isolated function list(string path) returns FileInfo[]|Error; # an error occurred while checking the path remote isolated function isDirectory(string path) returns boolean|Error; ``` + +### 3.4. Circuit Breaker +The circuit breaker pattern prevents cascade failures by temporarily blocking requests to an FTP server experiencing issues. When failures reach a threshold, the circuit "opens" and subsequent requests fail fast with `CircuitBreakerOpenError` without attempting the actual operation. + +#### State Machine +The circuit breaker operates in three states: +- **CLOSED**: Normal operation. Requests are allowed and failures are tracked. +- **OPEN**: Circuit is tripped. All requests are rejected immediately with `CircuitBreakerOpenError`. +- **HALF_OPEN**: After the reset time elapses, one trial request is allowed. Success returns to CLOSED; failure returns to OPEN. + +#### Configuration +* CircuitBreakerConfig record contains the circuit breaker settings. +```ballerina +public type CircuitBreakerConfig record {| + # Rolling window configuration for failure tracking + RollingWindow rollingWindow = {}; + # Failure ratio threshold (0.0 to 1.0) that trips the circuit + float failureThreshold = 0.5; + # Time in seconds to wait before transitioning from OPEN to HALF_OPEN + decimal resetTime = 30; + # Categories of failures that count towards tripping the circuit + FailureCategory[] failureCategories = [CONNECTION_ERROR, TRANSIENT_ERROR]; +|}; +``` +* RollingWindow record configures the sliding window for tracking failures. +```ballerina +public type RollingWindow record {| + # Minimum number of requests in the window before the circuit can trip + int requestVolumeThreshold = 10; + # Time window in seconds for tracking failures + decimal timeWindow = 60; + # Size of each time bucket in seconds (timeWindow / bucketSize = number of buckets) + decimal bucketSize = 10; +|}; +``` +* FailureCategory enum specifies which types of failures count towards tripping the circuit. +```ballerina +public enum FailureCategory { + # Connection failures (network issues, timeouts) + CONNECTION_ERROR, + # Authentication failures (invalid credentials) + AUTHENTICATION_ERROR, + # Server disconnection during operation + TRANSIENT_ERROR, + # All errors count as failures + ALL_ERRORS +} +``` +#### Usage Example +```ballerina +ftp:ClientConfiguration ftpConfig = { + protocol: ftp:FTP, + host: "ftp.example.com", + port: 21, + auth: { + credentials: {username: "user", password: "pass"} + }, + circuitBreaker: { + failureThreshold: 0.5, + resetTime: 30, + rollingWindow: { + requestVolumeThreshold: 5, + timeWindow: 60, + bucketSize: 10 + }, + failureCategories: [ftp:CONNECTION_ERROR, ftp:TRANSIENT_ERROR] + } +}; + +ftp:Client ftpClient = check new(ftpConfig); + +// Operations will fail fast with CircuitBreakerOpenError when circuit is open +byte[]|ftp:Error content = ftpClient->getBytes("/file.txt"); +if content is ftp:CircuitBreakerOpenError { + // Handle circuit breaker open state - server is unavailable +} +``` + ## 4. Listener The `ftp:Listener` is used to listen to a remote FTP location and trigger a `WatchEvent` type of event when new files are added to or deleted from the directory. The `onFileChange` function is invoked when a new file is added From 124829a1355b8c906bf08cdc661501b3e324c5ce Mon Sep 17 00:00:00 2001 From: Niveathika Date: Tue, 3 Feb 2026 20:40:39 +0530 Subject: [PATCH 04/15] Fix logic flows --- .../client/circuitbreaker/CircuitBreaker.java | 74 ++++++++++++++++--- .../circuitbreaker/CircuitBreakerConfig.java | 14 +++- .../client/circuitbreaker/CircuitHealth.java | 2 +- 3 files changed, 76 insertions(+), 14 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java index 382db427f..b40674a51 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java @@ -37,6 +37,7 @@ public class CircuitBreaker { private final CircuitBreakerConfig config; private final CircuitHealth health; private volatile CircuitState state; + private boolean trialRequestInProgress; private final Object lock = new Object(); /** @@ -52,6 +53,7 @@ public CircuitBreaker(CircuitBreakerConfig config) { config.getTimeWindowMillis() ); this.state = CircuitState.CLOSED; + this.trialRequestInProgress = false; log.debug("Circuit breaker initialized with {} buckets, {}ms window, {}% failure threshold", config.getNumberOfBuckets(), config.getTimeWindowMillis(), config.getFailureThreshold() * 100); @@ -69,9 +71,12 @@ public T execute(Callable operation) throws Exception { synchronized (lock) { updateState(); - if (state == CircuitState.OPEN) { + if (state == CircuitState.OPEN || (state == CircuitState.HALF_OPEN && trialRequestInProgress)) { throw createServiceUnavailableError(); } + if (state == CircuitState.HALF_OPEN) { + trialRequestInProgress = true; + } // Record the request health.prepareRollingWindow(); @@ -81,14 +86,36 @@ public T execute(Callable operation) throws Exception { try { T result = operation.call(); synchronized (lock) { - recordSuccess(); + if (state == CircuitState.HALF_OPEN) { + recordSuccess(); + state = CircuitState.CLOSED; + trialRequestInProgress = false; + health.resetAllBuckets(); + log.info("Circuit breaker transitioning from HALF_OPEN to CLOSED (trial succeeded)"); + } else { + recordSuccess(); + updateState(); + } } return result; } catch (Exception e) { synchronized (lock) { - if (shouldCountAsFailure(e)) { + if (state == CircuitState.HALF_OPEN) { + if (shouldCountAsFailure(e)) { + recordFailure(); + state = CircuitState.OPEN; + log.info("Circuit breaker transitioning from HALF_OPEN to OPEN (trial failed)"); + } else { + recordSuccess(); + state = CircuitState.CLOSED; + health.resetAllBuckets(); + log.info("Circuit breaker transitioning from HALF_OPEN to CLOSED (trial succeeded)"); + } + trialRequestInProgress = false; + } else if (shouldCountAsFailure(e)) { recordFailure(); log.debug("Circuit breaker recorded failure: {}", e.getMessage()); + updateState(); } } throw e; @@ -103,6 +130,9 @@ public T execute(Callable operation) throws Exception { public boolean isOpen() { synchronized (lock) { updateState(); + if (state == CircuitState.HALF_OPEN) { + return trialRequestInProgress; + } return state == CircuitState.OPEN; } } @@ -113,6 +143,13 @@ public boolean isOpen() { */ public void recordRequestStart() { synchronized (lock) { + updateState(); + if (state == CircuitState.OPEN || (state == CircuitState.HALF_OPEN && trialRequestInProgress)) { + return; + } + if (state == CircuitState.HALF_OPEN) { + trialRequestInProgress = true; + } health.prepareRollingWindow(); health.recordRequest(); } @@ -126,6 +163,27 @@ public void recordRequestStart() { */ public void recordOutcome(Throwable error) { synchronized (lock) { + if (state == CircuitState.HALF_OPEN) { + if (error == null) { + recordSuccess(); + state = CircuitState.CLOSED; + health.resetAllBuckets(); + log.info("Circuit breaker transitioning from HALF_OPEN to CLOSED (trial succeeded)"); + } else if (shouldCountAsFailure(error)) { + recordFailure(); + log.debug("Circuit breaker recorded failure: {}", error.getMessage()); + state = CircuitState.OPEN; + log.info("Circuit breaker transitioning from HALF_OPEN to OPEN (trial failed)"); + } else { + recordSuccess(); + state = CircuitState.CLOSED; + health.resetAllBuckets(); + log.info("Circuit breaker transitioning from HALF_OPEN to CLOSED (trial succeeded)"); + } + trialRequestInProgress = false; + return; + } + if (error == null) { recordSuccess(); } else if (shouldCountAsFailure(error)) { @@ -169,19 +227,13 @@ private void updateState() { case OPEN: if (resetTimeElapsed()) { state = CircuitState.HALF_OPEN; + trialRequestInProgress = false; log.info("Circuit breaker transitioning from OPEN to HALF_OPEN"); } break; case HALF_OPEN: - if (health.isLastRequestSuccess()) { - state = CircuitState.CLOSED; - health.resetAllBuckets(); - log.info("Circuit breaker transitioning from HALF_OPEN to CLOSED (trial succeeded)"); - } else { - state = CircuitState.OPEN; - log.info("Circuit breaker transitioning from HALF_OPEN to OPEN (trial failed)"); - } + // HALF_OPEN transition is evaluated after the trial request completes. break; case CLOSED: diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java index d4273301d..0d4ff1b2d 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java @@ -98,7 +98,7 @@ public static CircuitBreakerConfig fromBMap(BMap config) throws } // Validate configuration - validate(failureThreshold, timeWindowMillis, bucketSizeMillis); + validate(failureThreshold, timeWindowMillis, bucketSizeMillis, requestVolumeThreshold, resetTimeMillis); return new CircuitBreakerConfig(requestVolumeThreshold, timeWindowMillis, bucketSizeMillis, failureThreshold, resetTimeMillis, failureCategories); @@ -111,17 +111,27 @@ private static long decimalToMillis(Object value) { return (long) (((Number) value).doubleValue() * 1000); } - private static void validate(float failureThreshold, long timeWindowMillis, long bucketSizeMillis) + private static void validate(float failureThreshold, long timeWindowMillis, long bucketSizeMillis, + int requestVolumeThreshold, long resetTimeMillis) throws BallerinaFtpException { if (failureThreshold < 0.0f || failureThreshold > 1.0f) { throw new BallerinaFtpException("Circuit breaker failureThreshold must be between 0.0 and 1.0"); } + if (requestVolumeThreshold <= 0) { + throw new BallerinaFtpException("Circuit breaker requestVolumeThreshold must be greater than 0"); + } + if (resetTimeMillis <= 0) { + throw new BallerinaFtpException("Circuit breaker resetTime must be greater than 0"); + } if (bucketSizeMillis <= 0) { throw new BallerinaFtpException("Circuit breaker bucketSize must be greater than 0"); } if (timeWindowMillis < bucketSizeMillis) { throw new BallerinaFtpException("Circuit breaker timeWindow must be greater than or equal to bucketSize"); } + if (timeWindowMillis % bucketSizeMillis != 0) { + throw new BallerinaFtpException("Circuit breaker timeWindow must be evenly divisible by bucketSize"); + } } public int getRequestVolumeThreshold() { diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java index 2805a4360..8a6ffd187 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java @@ -64,7 +64,7 @@ public CircuitHealth(int numberOfBuckets, long bucketSizeMillis, long timeWindow public int getCurrentBucketId() { long elapsedMillis = Instant.now().toEpochMilli() - startTime.toEpochMilli(); long windowElapsed = elapsedMillis % timeWindowMillis; - return (int) (((windowElapsed / bucketSizeMillis) + 1) % numberOfBuckets); + return (int) ((windowElapsed / bucketSizeMillis) % numberOfBuckets); } /** From cb25070a890950b9d8010811097cee1406af9f7c Mon Sep 17 00:00:00 2001 From: Niveathika Date: Tue, 3 Feb 2026 20:44:30 +0530 Subject: [PATCH 05/15] Add testcases --- ballerina/tests/circuit_breaker_test.bal | 122 +++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/ballerina/tests/circuit_breaker_test.bal b/ballerina/tests/circuit_breaker_test.bal index 9cc431e88..54735153c 100644 --- a/ballerina/tests/circuit_breaker_test.bal +++ b/ballerina/tests/circuit_breaker_test.bal @@ -15,6 +15,7 @@ // under the License. import ballerina/test; +import ballerina/lang.runtime; // Test client creation with invalid failure threshold (> 1.0) @test:Config {} @@ -81,6 +82,79 @@ public function testClientWithInvalidBucketSize() { } } +// Test client creation with invalid requestVolumeThreshold (<= 0) +@test:Config {} +public function testClientWithInvalidRequestVolumeThreshold() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 0, + timeWindow: 60, + bucketSize: 10 + } + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail when requestVolumeThreshold <= 0"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("requestVolumeThreshold"), + msg = "Error message should mention requestVolumeThreshold"); + } +} + +// Test client creation with invalid resetTime (<= 0) +@test:Config {} +public function testClientWithInvalidResetTime() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 10 + }, + resetTime: 0 + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail when resetTime <= 0"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("resetTime"), + msg = "Error message should mention resetTime"); + } +} + +// Test client creation with timeWindow not divisible by bucketSize +@test:Config {} +public function testClientWithNonDivisibleTimeWindow() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 11 + } + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail when timeWindow is not divisible by bucketSize"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("timeWindow") || cbClient.message().includes("bucketSize"), + msg = "Error message should mention timeWindow or bucketSize"); + } +} + // Test that operations work normally with circuit breaker enabled @test:Config {} public function testOperationsWithCircuitBreaker() returns error? { @@ -191,3 +265,51 @@ public function testCircuitBreakerOpensAfterFailures() returns error? { check cbClient->close(); } + +// Test HALF_OPEN transition: allow a single trial request after reset time, then reopen on failure +@test:Config {} +public function testCircuitBreakerHalfOpenTrial() returns error? { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 1, + failureCategories: [ALL_ERRORS] + } + }; + + Client cbClient = check new (config); + string nonExistentPath = "/non/existent/file/path/half-open-test.txt"; + + // Trip the circuit quickly + byte[]|Error result1 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result1 is Error, msg = "Initial request should fail"); + + // Circuit should now be OPEN + byte[]|Error result2 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result2 is CircuitBreakerOpenError, + msg = "Circuit should be OPEN after failure threshold is met"); + + // Wait for reset time to elapse + runtime:sleep(2); + + // First request after reset should be allowed (HALF_OPEN trial) + byte[]|Error trialResult = cbClient->getBytes(nonExistentPath); + test:assertFalse(trialResult is CircuitBreakerOpenError, + msg = "HALF_OPEN should allow a single trial request"); + + // Trial fails, circuit should move back to OPEN and block immediately + byte[]|Error result3 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result3 is CircuitBreakerOpenError, + msg = "Circuit should reopen after failed HALF_OPEN trial"); + + check cbClient->close(); +} From 6b92b24b269636cd8404a14e14b33d63e34eb7ba Mon Sep 17 00:00:00 2001 From: Niveathika Date: Wed, 4 Feb 2026 15:02:15 +0530 Subject: [PATCH 06/15] Remove unused function --- .../client/circuitbreaker/CircuitBreaker.java | 64 ------------------- 1 file changed, 64 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java index b40674a51..d5af98b11 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java @@ -25,7 +25,6 @@ import java.time.Instant; import java.util.Set; -import java.util.concurrent.Callable; /** * Circuit breaker implementation for FTP client operations. @@ -59,69 +58,6 @@ public CircuitBreaker(CircuitBreakerConfig config) { config.getFailureThreshold() * 100); } - /** - * Executes an operation with circuit breaker protection. - * - * @param operation The operation to execute - * @param The return type of the operation - * @return The result of the operation - * @throws Exception If the operation fails or circuit is open - */ - public T execute(Callable operation) throws Exception { - synchronized (lock) { - updateState(); - - if (state == CircuitState.OPEN || (state == CircuitState.HALF_OPEN && trialRequestInProgress)) { - throw createServiceUnavailableError(); - } - if (state == CircuitState.HALF_OPEN) { - trialRequestInProgress = true; - } - - // Record the request - health.prepareRollingWindow(); - health.recordRequest(); - } - - try { - T result = operation.call(); - synchronized (lock) { - if (state == CircuitState.HALF_OPEN) { - recordSuccess(); - state = CircuitState.CLOSED; - trialRequestInProgress = false; - health.resetAllBuckets(); - log.info("Circuit breaker transitioning from HALF_OPEN to CLOSED (trial succeeded)"); - } else { - recordSuccess(); - updateState(); - } - } - return result; - } catch (Exception e) { - synchronized (lock) { - if (state == CircuitState.HALF_OPEN) { - if (shouldCountAsFailure(e)) { - recordFailure(); - state = CircuitState.OPEN; - log.info("Circuit breaker transitioning from HALF_OPEN to OPEN (trial failed)"); - } else { - recordSuccess(); - state = CircuitState.CLOSED; - health.resetAllBuckets(); - log.info("Circuit breaker transitioning from HALF_OPEN to CLOSED (trial succeeded)"); - } - trialRequestInProgress = false; - } else if (shouldCountAsFailure(e)) { - recordFailure(); - log.debug("Circuit breaker recorded failure: {}", e.getMessage()); - updateState(); - } - } - throw e; - } - } - /** * Checks if the circuit is currently open (blocking requests). * From 7585ca223c65854330dc986851500376a9e6e929 Mon Sep 17 00:00:00 2001 From: Niveathika Date: Wed, 4 Feb 2026 15:02:26 +0530 Subject: [PATCH 07/15] Improve doc --- docs/spec/spec.md | 2 +- .../stdlib/ftp/client/circuitbreaker/CircuitHealth.java | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/spec/spec.md b/docs/spec/spec.md index 2ab33c626..496b85d3a 100644 --- a/docs/spec/spec.md +++ b/docs/spec/spec.md @@ -195,7 +195,7 @@ public type ServiceUnavailableError distinct Error; public type AllRetryAttemptsFailedError distinct Error; ``` -* `CircuitBreakerOpenError` - Error returned when the circuit breaker is in OPEN state. This indicates the FTP server is unavailable and requests are being blocked to prevent cascade failures. This is a subtype of `ServiceUnavailableError`. +* `CircuitBreakerOpenError` - Error returned when the circuit breaker is in OPEN state. This indicates the FTP server is unavailable and requests are being blocked to prevent cascade failures. This is a distinct subtype of `ServiceUnavailableError`. ```ballerina public type CircuitBreakerOpenError distinct ServiceUnavailableError; ``` diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java index 8a6ffd187..e23b8d46a 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java @@ -22,6 +22,11 @@ /** * Tracks the health metrics of the circuit breaker using a sliding window of buckets. + *

+ * This class is NOT thread-safe. All methods must be called within synchronized blocks + * from the CircuitBreaker class. The CircuitBreaker maintains synchronization at a higher + * level to ensure thread-safe access to CircuitHealth state. + *

*/ public class CircuitHealth { private final Bucket[] buckets; @@ -83,8 +88,8 @@ public void prepareRollingWindow() { int currentBucketId = getCurrentBucketId(); // Reset buckets that have become stale - if (currentBucketId == lastUsedBucketId && idleTimeMillis > bucketSizeMillis) { - // Same bucket but stale - reset all + if (currentBucketId == lastUsedBucketId && idleTimeMillis > timeWindowMillis) { + // Same bucket but idle exceeded the full time window - reset all resetAllBuckets(); } else if (currentBucketId < lastUsedBucketId) { // Wrapped around - reset from 0 to current and from lastUsed+1 to end From ac99f003f91c62e90ed2f8c090e6befbd48e7539 Mon Sep 17 00:00:00 2001 From: Niveathika Date: Wed, 4 Feb 2026 17:46:36 +0530 Subject: [PATCH 08/15] Fail fast for cicuit open --- .../stdlib/ftp/client/FtpRetryHelper.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java index 40088a794..957e61040 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java @@ -52,6 +52,12 @@ protected static Object executeWithRetry(BObject clientConnector, Supplier Date: Wed, 4 Feb 2026 15:02:33 +0530 Subject: [PATCH 09/15] Add test cases --- ballerina/tests/circuit_breaker_test.bal | 109 ++++++++++++++++++++++- 1 file changed, 108 insertions(+), 1 deletion(-) diff --git a/ballerina/tests/circuit_breaker_test.bal b/ballerina/tests/circuit_breaker_test.bal index 54735153c..9da7b7b07 100644 --- a/ballerina/tests/circuit_breaker_test.bal +++ b/ballerina/tests/circuit_breaker_test.bal @@ -1,4 +1,4 @@ -// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. +// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) // // WSO2 LLC. licenses this file to you under the Apache License, // Version 2.0 (the "License"); you may not use this file except @@ -313,3 +313,110 @@ public function testCircuitBreakerHalfOpenTrial() returns error? { check cbClient->close(); } + +// Test HALF_OPEN transition: successful trial request should close the circuit +@test:Config {} +public function testCircuitBreakerHalfOpenSuccessfulTrial() returns error? { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 1, + failureCategories: [ALL_ERRORS] + } + }; + + Client cbClient = check new (config); + string nonExistentPath = "/non/existent/file/path/half-open-success-test.txt"; + + // Trip the circuit quickly + byte[]|Error result1 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result1 is Error, msg = "Initial request should fail"); + + // Circuit should now be OPEN + byte[]|Error result2 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result2 is CircuitBreakerOpenError, + msg = "Circuit should be OPEN after failure threshold is met"); + + // Wait for reset time to elapse + runtime:sleep(2); + + // First request after reset should be allowed (HALF_OPEN trial) + // This time, make a successful request + boolean|Error trialResult = cbClient->exists("/home/in"); + test:assertFalse(trialResult is CircuitBreakerOpenError, + msg = "HALF_OPEN should allow a single trial request"); + test:assertTrue(trialResult is boolean, + msg = "Trial request should succeed"); + + // Circuit should now be CLOSED, so next request should succeed + boolean|Error result3 = cbClient->exists("/home/in"); + test:assertFalse(result3 is CircuitBreakerOpenError, + msg = "Circuit should be CLOSED after successful HALF_OPEN trial"); + test:assertTrue(result3 is boolean, + msg = "Request should succeed when circuit is CLOSED"); + + check cbClient->close(); +} + +// Test interaction between retry and circuit breaker mechanisms +@test:Config {} +public function testRetryAndCircuitBreakerInteraction() returns error? { + // Configure both retry and circuit breaker + // Each retry attempt counts as a separate request in circuit breaker + // With 1 initial + 3 retries = 4 attempts per request + // Setting threshold to 8 means circuit trips after 2 full request cycles + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + retryConfig: { + count: 3, + interval: 0.5, + backOffFactor: 1.0, + maxWaitInterval: 2.0 + }, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 8, // 2 full requests (4 attempts each) + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, // 50% failure rate trips circuit + resetTime: 30, + failureCategories: [ALL_ERRORS] + } + }; + + Client cbClient = check new (config); + string nonExistentPath = "/non/existent/file/retry-cb-test.txt"; + + // First request: Will fail after all retries (4 attempts total) + // Circuit breaker records 4 failures but threshold (8) not met yet + byte[]|Error result1 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result1 is AllRetryAttemptsFailedError, + msg = "First request should fail after all retries"); + + // Second request: Will also fail after all retries (4 more attempts) + // After 8 total failures, circuit trips at 100% failure rate + byte[]|Error result2 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result2 is AllRetryAttemptsFailedError, + msg = "Second request should fail after all retries"); + + // Third request: Circuit should now be OPEN and block immediately + // This should NOT go through retries, should fail fast + byte[]|Error result3 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result3 is CircuitBreakerOpenError, + msg = "Third request should fail fast with circuit breaker open"); + + check cbClient->close(); +} From d9d7d582cd6398eeadaf9814f337197f6a01efb0 Mon Sep 17 00:00:00 2001 From: Niveathika Date: Wed, 4 Feb 2026 20:28:13 +0530 Subject: [PATCH 10/15] Fix license --- .../java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java | 6 +++--- .../ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java | 2 +- .../stdlib/ftp/client/circuitbreaker/CircuitBreaker.java | 6 +++--- .../ftp/client/circuitbreaker/CircuitBreakerConfig.java | 6 +++--- .../stdlib/ftp/client/circuitbreaker/CircuitHealth.java | 6 +++--- .../stdlib/ftp/client/circuitbreaker/CircuitState.java | 6 +++--- .../ftp/client/circuitbreaker/FailureCategorizer.java | 6 +++--- .../stdlib/ftp/client/circuitbreaker/FailureCategory.java | 6 +++--- 8 files changed, 22 insertions(+), 22 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java index 957e61040..318191414 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java @@ -1,17 +1,17 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java index a490caa34..a608da2ce 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2025 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java index d5af98b11..315c3e7a9 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java @@ -1,17 +1,17 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java index 0d4ff1b2d..2bf2e1744 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java @@ -1,17 +1,17 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java index e23b8d46a..348a976e6 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java @@ -1,17 +1,17 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitState.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitState.java index 053796c40..153b86842 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitState.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitState.java @@ -1,17 +1,17 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java index e7817b24f..7d652ab3f 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java @@ -1,17 +1,17 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategory.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategory.java index 987f8a90e..e2cb68803 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategory.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategory.java @@ -1,17 +1,17 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ From 32b14d08f3bb8aece6286cf966c130a81c6aaaaa Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Feb 2026 05:21:08 +0000 Subject: [PATCH 11/15] Initial plan From ab22298ebad3efa829a44f3468f9fd0e132cb0e9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 6 Feb 2026 05:28:20 +0000 Subject: [PATCH 12/15] Replace synchronized blocks with StampedLock in CircuitBreaker for better concurrency Co-authored-by: niveathika <27669465+niveathika@users.noreply.github.com> --- .../client/circuitbreaker/CircuitBreaker.java | 60 +++++++++++++++---- .../client/circuitbreaker/CircuitHealth.java | 5 +- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java index 315c3e7a9..6cf298259 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java @@ -25,10 +25,15 @@ import java.time.Instant; import java.util.Set; +import java.util.concurrent.locks.StampedLock; /** * Circuit breaker implementation for FTP client operations. * Prevents cascade failures by temporarily blocking requests when the server is experiencing issues. + * + *

Uses {@link StampedLock} with optimistic reads on the hot path ({@code isOpen()}) + * to avoid contention when the circuit is CLOSED—the common case. Write locks are + * acquired only when state or health metrics need to be mutated.

*/ public class CircuitBreaker { private static final Logger log = LoggerFactory.getLogger(CircuitBreaker.class); @@ -36,8 +41,8 @@ public class CircuitBreaker { private final CircuitBreakerConfig config; private final CircuitHealth health; private volatile CircuitState state; - private boolean trialRequestInProgress; - private final Object lock = new Object(); + private volatile boolean trialRequestInProgress; + private final StampedLock lock = new StampedLock(); /** * Creates a new CircuitBreaker with the specified configuration. @@ -60,16 +65,36 @@ public CircuitBreaker(CircuitBreakerConfig config) { /** * Checks if the circuit is currently open (blocking requests). + * Uses an optimistic read first; falls back to a write lock only when a + * state transition may be required. * * @return true if the circuit is open */ public boolean isOpen() { - synchronized (lock) { + // Fast path: optimistic read – succeeds without blocking when the + // circuit is CLOSED and no state transition is needed. + long stamp = lock.tryOptimisticRead(); + CircuitState currentState = this.state; + boolean trialInProgress = this.trialRequestInProgress; + if (lock.validate(stamp)) { + if (currentState == CircuitState.CLOSED) { + return false; + } + if (currentState == CircuitState.HALF_OPEN) { + return trialInProgress; + } + } + + // Slow path: acquire a write lock so we can run updateState(). + stamp = lock.writeLock(); + try { updateState(); if (state == CircuitState.HALF_OPEN) { return trialRequestInProgress; } return state == CircuitState.OPEN; + } finally { + lock.unlockWrite(stamp); } } @@ -78,7 +103,8 @@ public boolean isOpen() { * Call this BEFORE starting the operation. */ public void recordRequestStart() { - synchronized (lock) { + long stamp = lock.writeLock(); + try { updateState(); if (state == CircuitState.OPEN || (state == CircuitState.HALF_OPEN && trialRequestInProgress)) { return; @@ -88,6 +114,8 @@ public void recordRequestStart() { } health.prepareRollingWindow(); health.recordRequest(); + } finally { + lock.unlockWrite(stamp); } } @@ -98,7 +126,8 @@ public void recordRequestStart() { * @param error The throwable if the operation failed, null if successful */ public void recordOutcome(Throwable error) { - synchronized (lock) { + long stamp = lock.writeLock(); + try { if (state == CircuitState.HALF_OPEN) { if (error == null) { recordSuccess(); @@ -128,6 +157,8 @@ public void recordOutcome(Throwable error) { } // Update state after recording the outcome updateState(); + } finally { + lock.unlockWrite(stamp); } } @@ -154,7 +185,7 @@ public BError createServiceUnavailableError() { /** * Updates the circuit state based on current metrics. - * Must be called within synchronized block. + * Must be called while holding the write lock. */ private void updateState() { health.prepareRollingWindow(); @@ -202,7 +233,7 @@ private boolean resetTimeElapsed() { /** * Records a successful operation. - * Must be called within synchronized block. + * Must be called while holding the write lock. */ private void recordSuccess() { health.recordSuccess(); @@ -210,7 +241,7 @@ private void recordSuccess() { /** * Records a failed operation. - * Must be called within synchronized block. + * Must be called while holding the write lock. */ private void recordFailure() { health.recordFailure(); @@ -247,8 +278,17 @@ private boolean shouldCountAsFailure(Throwable e) { * @return The current state */ public CircuitState getState() { - synchronized (lock) { - return state; + long stamp = lock.tryOptimisticRead(); + CircuitState currentState = this.state; + if (lock.validate(stamp)) { + return currentState; + } + // Fall back to a read lock if the optimistic read was invalidated. + stamp = lock.readLock(); + try { + return this.state; + } finally { + lock.unlockRead(stamp); } } } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java index 348a976e6..9c8148e46 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java @@ -23,9 +23,8 @@ /** * Tracks the health metrics of the circuit breaker using a sliding window of buckets. *

- * This class is NOT thread-safe. All methods must be called within synchronized blocks - * from the CircuitBreaker class. The CircuitBreaker maintains synchronization at a higher - * level to ensure thread-safe access to CircuitHealth state. + * This class is NOT thread-safe. All methods must be called while holding the + * write lock from the {@link CircuitBreaker}'s {@link java.util.concurrent.locks.StampedLock}. *

*/ public class CircuitHealth { From bb9415e10963190c629607e5cd9668b5f392d06b Mon Sep 17 00:00:00 2001 From: Niveathika Date: Fri, 6 Feb 2026 11:50:48 +0530 Subject: [PATCH 13/15] Fix unnecessary comments and warning suppression --- .../stdlib/ftp/client/FtpClient.java | 3 --- .../stdlib/ftp/client/FtpClientHelper.java | 1 - .../ftp/client/circuitbreaker/Bucket.java | 6 ++--- .../client/circuitbreaker/CircuitBreaker.java | 27 ------------------- .../circuitbreaker/FailureCategorizer.java | 19 ------------- 5 files changed, 3 insertions(+), 53 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java index 160f40cee..13b1aca99 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java @@ -385,7 +385,6 @@ private static Object createAndStoreConnector(BObject clientEndpoint, Map config) { BMap cbConfig = (BMap) config.getMapValue( StringUtils.fromString(FtpConstants.CIRCUIT_BREAKER)); @@ -878,14 +877,12 @@ private InputStream fetchNextStream() { private byte[] toBytes(Object value) { // Each element is a record with a 'value' field. - @SuppressWarnings("unchecked") BMap streamRecord = (BMap) value; Object val = streamRecord.get(FtpConstants.FIELD_VALUE); if (val instanceof BArray array) { return bytesFromArray(array); } - @SuppressWarnings("unchecked") BMap recordValue = (BMap) val; return bytesFromRecord(recordValue, isFirstRow); } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java index ea47098cc..cc52490ef 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java @@ -295,7 +295,6 @@ public void close() { } } - @SuppressWarnings("unchecked") private static void callStreamNext(Environment env, BObject entity, BufferHolder bufferHolder, BObject iteratorObj) { Object result = env.getRuntime().callMethod(iteratorObj, BYTE_STREAM_NEXT_FUNC, null); diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java index a608da2ce..9658113e7 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/Bucket.java @@ -1,17 +1,17 @@ /* - * Copyright (c) 2025 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java index 6cf298259..994a52408 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreaker.java @@ -30,10 +30,6 @@ /** * Circuit breaker implementation for FTP client operations. * Prevents cascade failures by temporarily blocking requests when the server is experiencing issues. - * - *

Uses {@link StampedLock} with optimistic reads on the hot path ({@code isOpen()}) - * to avoid contention when the circuit is CLOSED—the common case. Write locks are - * acquired only when state or health metrics need to be mutated.

*/ public class CircuitBreaker { private static final Logger log = LoggerFactory.getLogger(CircuitBreaker.class); @@ -183,10 +179,6 @@ public BError createServiceUnavailableError() { return FtpUtil.createError(message, FtpUtil.ErrorType.CircuitBreakerOpenError.errorType()); } - /** - * Updates the circuit state based on current metrics. - * Must be called while holding the write lock. - */ private void updateState() { health.prepareRollingWindow(); @@ -217,11 +209,6 @@ private void updateState() { } } - /** - * Checks if the reset time has elapsed since the circuit opened. - * - * @return true if reset time has elapsed - */ private boolean resetTimeElapsed() { Instant lastError = health.getLastErrorTime(); if (lastError == null) { @@ -231,28 +218,14 @@ private boolean resetTimeElapsed() { return elapsedMillis >= config.getResetTimeMillis(); } - /** - * Records a successful operation. - * Must be called while holding the write lock. - */ private void recordSuccess() { health.recordSuccess(); } - /** - * Records a failed operation. - * Must be called while holding the write lock. - */ private void recordFailure() { health.recordFailure(); } - /** - * Determines if an error should count as a failure for circuit breaker purposes. - * - * @param e The throwable to evaluate - * @return true if the error should count as a failure - */ private boolean shouldCountAsFailure(Throwable e) { Set configuredCategories = config.getFailureCategories(); diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java index 7d652ab3f..ae3042d79 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java @@ -94,13 +94,6 @@ public static FailureCategory categorize(Throwable throwable) { return null; } - /** - * Categorizes based on ErrorTypeProvider interface. - * This leverages the structured error type system. - * - * @param throwable The exception to categorize - * @return The FailureCategory, or null if not an ErrorTypeProvider or no match - */ private static FailureCategory categorizeByErrorTypeProvider(Throwable throwable) { if (!(throwable instanceof ErrorTypeProvider provider)) { return null; @@ -122,12 +115,6 @@ private static FailureCategory categorizeByErrorTypeProvider(Throwable throwable return null; } - /** - * Categorizes based on exception type. - * - * @param throwable The exception to categorize - * @return The FailureCategory, or null if no match - */ private static FailureCategory categorizeByExceptionType(Throwable throwable) { // Connection errors if (throwable instanceof ConnectException || @@ -157,12 +144,6 @@ private static FailureCategory categorizeByExceptionType(Throwable throwable) { return null; } - /** - * Categorizes based on error message patterns using FtpErrorCodeAnalyzer. - * - * @param message The error message to categorize - * @return The FailureCategory, or null if no match - */ private static FailureCategory categorizeByMessage(String message) { if (message == null) { return null; From e64f85f6b0d13005e1bf3dd682699616601779dd Mon Sep 17 00:00:00 2001 From: Niveathika Date: Fri, 6 Feb 2026 11:46:14 +0530 Subject: [PATCH 14/15] Add empty failure condition validation --- ballerina/tests/circuit_breaker_test.bal | 27 +++++++++++++++++++ .../circuitbreaker/CircuitBreakerConfig.java | 10 ++++--- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/ballerina/tests/circuit_breaker_test.bal b/ballerina/tests/circuit_breaker_test.bal index 9da7b7b07..dc1a81215 100644 --- a/ballerina/tests/circuit_breaker_test.bal +++ b/ballerina/tests/circuit_breaker_test.bal @@ -131,6 +131,33 @@ public function testClientWithInvalidResetTime() { } } +// Test client creation with empty failureCategories +@test:Config {} +public function testClientWithEmptyFailureCategories() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 30, + failureCategories: [] + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail when failureCategories is empty"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("failureCategories"), + msg = "Error message should mention failureCategories"); + } +} + // Test client creation with timeWindow not divisible by bucketSize @test:Config {} public function testClientWithNonDivisibleTimeWindow() { diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java index 2bf2e1744..b50b66f2e 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java @@ -74,7 +74,6 @@ private CircuitBreakerConfig(int requestVolumeThreshold, long timeWindowMillis, * @return A new CircuitBreakerConfig instance * @throws BallerinaFtpException if the configuration is invalid */ - @SuppressWarnings("unchecked") public static CircuitBreakerConfig fromBMap(BMap config) throws BallerinaFtpException { // Extract rolling window configuration BMap rollingWindow = (BMap) config.getMapValue(ROLLING_WINDOW); @@ -98,7 +97,8 @@ public static CircuitBreakerConfig fromBMap(BMap config) throws } // Validate configuration - validate(failureThreshold, timeWindowMillis, bucketSizeMillis, requestVolumeThreshold, resetTimeMillis); + validate(failureThreshold, timeWindowMillis, bucketSizeMillis, requestVolumeThreshold, resetTimeMillis, + failureCategories); return new CircuitBreakerConfig(requestVolumeThreshold, timeWindowMillis, bucketSizeMillis, failureThreshold, resetTimeMillis, failureCategories); @@ -112,8 +112,12 @@ private static long decimalToMillis(Object value) { } private static void validate(float failureThreshold, long timeWindowMillis, long bucketSizeMillis, - int requestVolumeThreshold, long resetTimeMillis) + int requestVolumeThreshold, long resetTimeMillis, + Set failureCategories) throws BallerinaFtpException { + if (failureCategories == null || failureCategories.isEmpty()) { + throw new BallerinaFtpException("Circuit breaker failureCategories must not be empty"); + } if (failureThreshold < 0.0f || failureThreshold > 1.0f) { throw new BallerinaFtpException("Circuit breaker failureThreshold must be between 0.0 and 1.0"); } From 9d506b3946c0f38b936d3531798065a88776ff9d Mon Sep 17 00:00:00 2001 From: Niveathika Date: Fri, 6 Feb 2026 11:46:21 +0530 Subject: [PATCH 15/15] Improve code --- .../ftp/client/circuitbreaker/CircuitBreakerConfig.java | 4 ++-- .../stdlib/ftp/client/circuitbreaker/CircuitHealth.java | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java index b50b66f2e..888f2c20a 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java @@ -105,8 +105,8 @@ public static CircuitBreakerConfig fromBMap(BMap config) throws } private static long decimalToMillis(Object value) { - if (value instanceof BDecimal) { - return (long) (((BDecimal) value).floatValue() * 1000); + if (value instanceof BDecimal decimalValue) { + return (long) (decimalValue.floatValue() * 1000); } return (long) (((Number) value).doubleValue() * 1000); } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java index 9c8148e46..d9b54f5e1 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java @@ -148,10 +148,7 @@ public int getTotalFailureCount() { */ public float getFailureRatio() { int totalCount = getTotalRequestCount(); - if (totalCount == 0) { - return 0.0f; - } - return (float) getTotalFailureCount() / totalCount; + return totalCount == 0 ? 0.0f : (float) getTotalFailureCount() / totalCount; } /**