diff --git a/ballerina/client_endpoint.bal b/ballerina/client_endpoint.bal index 8513cbbf3..5a0aad6ce 100644 --- a/ballerina/client_endpoint.bal +++ b/ballerina/client_endpoint.bal @@ -454,6 +454,8 @@ public enum Compression { # malformed CSV records are skipped and written to a separate file in the current directory # + retryConfig - Configuration for retry behavior on transient failures for non-streaming read operations # (getBytes, getText, getJson, getXml, getCsv). If not specified, no retry is attempted. +# + circuitBreaker - Circuit breaker configuration for handling server failures gracefully. +# When enabled, the client will fail fast if the server is experiencing issues. public type ClientConfiguration record {| Protocol protocol = FTP; string host = "127.0.0.1"; @@ -469,6 +471,7 @@ public type ClientConfiguration record {| string sftpSshKnownHosts?; FailSafeOptions csvFailSafe?; RetryConfig retryConfig?; + CircuitBreakerConfig circuitBreaker?; |}; isolated function getInputContent(string path, stream|string|xml|json content, diff --git a/ballerina/commons.bal b/ballerina/commons.bal index e05b9430c..e231717f4 100644 --- a/ballerina/commons.bal +++ b/ballerina/commons.bal @@ -255,3 +255,48 @@ public type FileDependencyCondition record {| DependencyMatchingMode matchingMode = ALL; int requiredFileCount = 1; |}; + +# Categories of errors that can trip the circuit breaker. +# Used to configure which types of failures should count towards the circuit breaker threshold. +public enum FailureCategory { + # Connection-level failures (timeout, refused, reset, DNS resolution). + # Maps to ConnectionError type. + CONNECTION_ERROR, + # Authentication failures (invalid credentials, key rejected). + # Detected via FTP 530 response code. + AUTHENTICATION_ERROR, + # Transient server errors that may succeed on retry. + # Maps to ServiceUnavailableError type (FTP codes 421, 425, 426, 450, 451, 452). + TRANSIENT_ERROR, + # All errors regardless of type + ALL_ERRORS +} + +# Configuration for the sliding time window used in failure calculation. +# The rolling window divides time into discrete buckets for efficient tracking of request success/failure rates. +# +# + requestVolumeThreshold - Minimum number of requests in the window before evaluating failure threshold. +# Circuit breaker will not trip until this many requests have been made. +# + timeWindow - Time period in seconds for the sliding window +# + bucketSize - Granularity of time buckets in seconds. Must be less than timeWindow. +public type RollingWindow record {| + int requestVolumeThreshold = 10; + decimal timeWindow = 60; + decimal bucketSize = 10; +|}; + +# Configuration for circuit breaker behavior. +# The circuit breaker prevents cascade failures by temporarily blocking requests when the server is experiencing issues. +# +# + rollingWindow - Time window configuration for failure tracking +# + failureThreshold - Failure ratio threshold (0.0 to 1.0) that trips the circuit. +# For example, 0.5 means the circuit will open when 50% of requests fail. +# + resetTime - Seconds to wait in OPEN state before transitioning to HALF_OPEN to test recovery +# + failureCategories - Error categories that count as failures for the circuit breaker. +# Only errors matching these categories will contribute to the failure ratio. +public type CircuitBreakerConfig record {| + RollingWindow rollingWindow = {}; + float failureThreshold = 0.5; + decimal resetTime = 30; + FailureCategory[] failureCategories = [CONNECTION_ERROR, TRANSIENT_ERROR]; +|}; diff --git a/ballerina/error.bal b/ballerina/error.bal index a2a1c976f..5f3dd2209 100644 --- a/ballerina/error.bal +++ b/ballerina/error.bal @@ -40,3 +40,9 @@ public type ServiceUnavailableError distinct Error; # Represents an error that occurs when all retry attempts have been exhausted. # This error wraps the last failure encountered during retry attempts. public type AllRetryAttemptsFailedError distinct Error; + +# Error returned when the circuit breaker is in OPEN state. +# This indicates the FTP server is unavailable and requests are being blocked +# to prevent cascade failures. The client should implement fallback logic +# or wait for the circuit to transition to HALF_OPEN state. +public type CircuitBreakerOpenError distinct ServiceUnavailableError; diff --git a/ballerina/tests/circuit_breaker_test.bal b/ballerina/tests/circuit_breaker_test.bal new file mode 100644 index 000000000..dc1a81215 --- /dev/null +++ b/ballerina/tests/circuit_breaker_test.bal @@ -0,0 +1,449 @@ +// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; +import ballerina/lang.runtime; + +// Test client creation with invalid failure threshold (> 1.0) +@test:Config {} +public function testClientWithInvalidFailureThresholdHigh() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + failureThreshold: 1.5, + resetTime: 30 + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail with invalid failure threshold > 1.0"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("failureThreshold"), + msg = "Error message should mention failureThreshold"); + } +} + +// Test client creation with invalid failure threshold (< 0.0) +@test:Config {} +public function testClientWithInvalidFailureThresholdLow() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + failureThreshold: -0.1, + resetTime: 30 + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail with invalid failure threshold < 0.0"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("failureThreshold"), + msg = "Error message should mention failureThreshold"); + } +} + +// Test client creation with bucketSize greater than timeWindow +@test:Config {} +public function testClientWithInvalidBucketSize() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + timeWindow: 30, + bucketSize: 60 // bucket larger than window + } + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail when bucketSize > timeWindow"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("timeWindow") || cbClient.message().includes("bucketSize"), + msg = "Error message should mention timeWindow or bucketSize"); + } +} + +// Test client creation with invalid requestVolumeThreshold (<= 0) +@test:Config {} +public function testClientWithInvalidRequestVolumeThreshold() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 0, + timeWindow: 60, + bucketSize: 10 + } + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail when requestVolumeThreshold <= 0"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("requestVolumeThreshold"), + msg = "Error message should mention requestVolumeThreshold"); + } +} + +// Test client creation with invalid resetTime (<= 0) +@test:Config {} +public function testClientWithInvalidResetTime() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 10 + }, + resetTime: 0 + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail when resetTime <= 0"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("resetTime"), + msg = "Error message should mention resetTime"); + } +} + +// Test client creation with empty failureCategories +@test:Config {} +public function testClientWithEmptyFailureCategories() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 30, + failureCategories: [] + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail when failureCategories is empty"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("failureCategories"), + msg = "Error message should mention failureCategories"); + } +} + +// Test client creation with timeWindow not divisible by bucketSize +@test:Config {} +public function testClientWithNonDivisibleTimeWindow() { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 11 + } + } + }; + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail when timeWindow is not divisible by bucketSize"); + if cbClient is Error { + test:assertTrue(cbClient.message().includes("timeWindow") || cbClient.message().includes("bucketSize"), + msg = "Error message should mention timeWindow or bucketSize"); + } +} + +// Test that operations work normally with circuit breaker enabled +@test:Config {} +public function testOperationsWithCircuitBreaker() returns error? { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 5, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 30 + } + }; + Client cbClient = check new (config); + + // Test basic operations work with circuit breaker enabled + boolean exists = check cbClient->exists("/home/in"); + test:assertTrue(exists, msg = "Directory should exist"); + + FileInfo[] files = check cbClient->list("/home/in"); + test:assertTrue(files.length() >= 0, msg = "List operation should return files array"); + + check cbClient->close(); +} + +// Test that client creation fails when server doesn't exist (connection error, not circuit breaker) +@test:Config {} +public function testClientCreationFailsForNonExistentServer() { + // Create a client pointing to a non-existent server + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21299, // Non-existent port + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 5, + failureCategories: [CONNECTION_ERROR] + } + }; + + Client|Error cbClient = new (config); + test:assertTrue(cbClient is Error, msg = "Client creation should fail for non-existent server"); +} + +// Test that circuit breaker opens after failures and returns CircuitBreakerOpenError +@test:Config {} +public function testCircuitBreakerOpensAfterFailures() returns error? { + // Configure circuit breaker with low thresholds to trigger quickly + // - requestVolumeThreshold: 2 (only need 2 requests before circuit can trip) + // - failureThreshold: 0.5 (50% failure rate trips the circuit) + // - ALL_ERRORS: any error counts as failure + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 2, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 30, + failureCategories: [ALL_ERRORS] + } + }; + + Client cbClient = check new (config); + + // Make requests that will fail (reading non-existent files) + // These failures should accumulate and trip the circuit + string nonExistentPath = "/non/existent/file/path/that/does/not/exist.txt"; + + // First failure + byte[]|Error result1 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result1 is Error, msg = "First request should fail (file not found)"); + test:assertFalse(result1 is CircuitBreakerOpenError, + msg = "First failure should be regular error, not circuit breaker error"); + + // Second failure - after this, we have 2 requests with 100% failure rate + byte[]|Error result2 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result2 is Error, msg = "Second request should fail"); + test:assertFalse(result2 is CircuitBreakerOpenError, + msg = "Second failure should be regular error, not circuit breaker error"); + + // Third request - circuit should now be OPEN and reject immediately + byte[]|Error result3 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result3 is CircuitBreakerOpenError, + msg = "Third request should fail with CircuitBreakerOpenError (circuit is open)"); + + if result3 is CircuitBreakerOpenError { + string message = result3.message(); + test:assertTrue(message.includes("Circuit breaker is OPEN"), + msg = "Error message should indicate circuit breaker is open"); + } + + check cbClient->close(); +} + +// Test HALF_OPEN transition: allow a single trial request after reset time, then reopen on failure +@test:Config {} +public function testCircuitBreakerHalfOpenTrial() returns error? { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 1, + failureCategories: [ALL_ERRORS] + } + }; + + Client cbClient = check new (config); + string nonExistentPath = "/non/existent/file/path/half-open-test.txt"; + + // Trip the circuit quickly + byte[]|Error result1 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result1 is Error, msg = "Initial request should fail"); + + // Circuit should now be OPEN + byte[]|Error result2 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result2 is CircuitBreakerOpenError, + msg = "Circuit should be OPEN after failure threshold is met"); + + // Wait for reset time to elapse + runtime:sleep(2); + + // First request after reset should be allowed (HALF_OPEN trial) + byte[]|Error trialResult = cbClient->getBytes(nonExistentPath); + test:assertFalse(trialResult is CircuitBreakerOpenError, + msg = "HALF_OPEN should allow a single trial request"); + + // Trial fails, circuit should move back to OPEN and block immediately + byte[]|Error result3 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result3 is CircuitBreakerOpenError, + msg = "Circuit should reopen after failed HALF_OPEN trial"); + + check cbClient->close(); +} + +// Test HALF_OPEN transition: successful trial request should close the circuit +@test:Config {} +public function testCircuitBreakerHalfOpenSuccessfulTrial() returns error? { + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 1, + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, + resetTime: 1, + failureCategories: [ALL_ERRORS] + } + }; + + Client cbClient = check new (config); + string nonExistentPath = "/non/existent/file/path/half-open-success-test.txt"; + + // Trip the circuit quickly + byte[]|Error result1 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result1 is Error, msg = "Initial request should fail"); + + // Circuit should now be OPEN + byte[]|Error result2 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result2 is CircuitBreakerOpenError, + msg = "Circuit should be OPEN after failure threshold is met"); + + // Wait for reset time to elapse + runtime:sleep(2); + + // First request after reset should be allowed (HALF_OPEN trial) + // This time, make a successful request + boolean|Error trialResult = cbClient->exists("/home/in"); + test:assertFalse(trialResult is CircuitBreakerOpenError, + msg = "HALF_OPEN should allow a single trial request"); + test:assertTrue(trialResult is boolean, + msg = "Trial request should succeed"); + + // Circuit should now be CLOSED, so next request should succeed + boolean|Error result3 = cbClient->exists("/home/in"); + test:assertFalse(result3 is CircuitBreakerOpenError, + msg = "Circuit should be CLOSED after successful HALF_OPEN trial"); + test:assertTrue(result3 is boolean, + msg = "Request should succeed when circuit is CLOSED"); + + check cbClient->close(); +} + +// Test interaction between retry and circuit breaker mechanisms +@test:Config {} +public function testRetryAndCircuitBreakerInteraction() returns error? { + // Configure both retry and circuit breaker + // Each retry attempt counts as a separate request in circuit breaker + // With 1 initial + 3 retries = 4 attempts per request + // Setting threshold to 8 means circuit trips after 2 full request cycles + ClientConfiguration config = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + retryConfig: { + count: 3, + interval: 0.5, + backOffFactor: 1.0, + maxWaitInterval: 2.0 + }, + circuitBreaker: { + rollingWindow: { + requestVolumeThreshold: 8, // 2 full requests (4 attempts each) + timeWindow: 60, + bucketSize: 10 + }, + failureThreshold: 0.5, // 50% failure rate trips circuit + resetTime: 30, + failureCategories: [ALL_ERRORS] + } + }; + + Client cbClient = check new (config); + string nonExistentPath = "/non/existent/file/retry-cb-test.txt"; + + // First request: Will fail after all retries (4 attempts total) + // Circuit breaker records 4 failures but threshold (8) not met yet + byte[]|Error result1 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result1 is AllRetryAttemptsFailedError, + msg = "First request should fail after all retries"); + + // Second request: Will also fail after all retries (4 more attempts) + // After 8 total failures, circuit trips at 100% failure rate + byte[]|Error result2 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result2 is AllRetryAttemptsFailedError, + msg = "Second request should fail after all retries"); + + // Third request: Circuit should now be OPEN and block immediately + // This should NOT go through retries, should fail fast + byte[]|Error result3 = cbClient->getBytes(nonExistentPath); + test:assertTrue(result3 is CircuitBreakerOpenError, + msg = "Third request should fail fast with circuit breaker open"); + + check cbClient->close(); +} diff --git a/changelog.md b/changelog.md index 8995364f4..b68d050c4 100644 --- a/changelog.md +++ b/changelog.md @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## unreleased ### Added + +- [Add Circuit breaker support](https://github.com/ballerina-platform/ballerina-library/issues/8382) - [Add automatic retry support with exponential backoff for FTP client](https://github.com/ballerina-platform/ballerina-library/issues/8585) - [Add FTP Listener Coordination Support](https://github.com/ballerina-platform/ballerina-library/issues/8490) - [Add distinct error types](https://github.com/ballerina-platform/ballerina-library/issues/8597) diff --git a/docs/spec/spec.md b/docs/spec/spec.md index 50fa64433..496b85d3a 100644 --- a/docs/spec/spec.md +++ b/docs/spec/spec.md @@ -3,7 +3,7 @@ _Owners_: @shafreenAnfar @dilanSachi @Bhashinee _Reviewers_: @shafreenAnfar @Bhashinee _Created_: 2020/10/28 -_Updated_: 2026/01/26 +_Updated_: 2026/02/03 _Edition_: Swan Lake ## Introduction @@ -32,6 +32,10 @@ The conforming implementation of the specification is released and included in t - [3.2.2. Secure Client](#322-secure-client) - [3.2.3. Client with Retry Configuration](#323-client-with-retry-configuration) - [3.3. Functions](#33-functions) + - [3.4. Circuit Breaker](#34-circuit-breaker) + - [State Machine](#state-machine) + - [Configuration](#configuration) + - [Usage Example](#usage-example) - [4. Listener](#4-listener) - [4.1. Configurations](#41-configurations) - [4.2. Initialization](#42-initialization) @@ -163,7 +167,6 @@ public type Error distinct error; * `ConnectionError` - Represents errors that occur when connecting to the FTP/SFTP server. This includes network failures, host unreachable, connection refused, etc. ```ballerina -# Represents an error that occurs when connecting to the FTP/SFTP server. public type ConnectionError distinct Error; ``` @@ -182,24 +185,37 @@ public type FileAlreadyExistsError distinct Error; public type InvalidConfigError distinct Error; ``` -* `ServiceUnavailableError` - Represents errors that occur when the FTP/SFTP service is temporarily unavailable. This is a transient error indicating the operation may succeed on retry. Common causes include server overload (FTP code 421), connection issues (425, 426), temporary file locks (450), or server-side processing errors (451). This error type is designed for use with retry and circuit breaker patterns. +* `ServiceUnavailableError` - Represents errors that occur when the FTP/SFTP service is temporarily unavailable. This is a transient error indicating the operation may succeed on retry. Common causes include server overload (FTP code 421), connection issues (425, 426), temporary file locks (450), or server-side processing errors (451). ```ballerina public type ServiceUnavailableError distinct Error; ``` +* `AllRetryAttemptsFailedError` - Represents an error that occurs when all retry attempts have been exhausted. This error wraps the last failure encountered during retry attempts. +```ballerina +public type AllRetryAttemptsFailedError distinct Error; +``` + +* `CircuitBreakerOpenError` - Error returned when the circuit breaker is in OPEN state. This indicates the FTP server is unavailable and requests are being blocked to prevent cascade failures. This is a distinct subtype of `ServiceUnavailableError`. +```ballerina +public type CircuitBreakerOpenError distinct ServiceUnavailableError; +``` + All specific error types are subtypes of the base `Error` type, allowing for both specific and general error handling: ```ballerina // Handle specific error types ftp:Client|ftp:Error result = new(config); if result is ftp:ConnectionError { // Handle connection failures specifically +} else if result is ftp:CircuitBreakerOpenError { + // Circuit breaker is open - implement fallback logic +} else if result is ftp:AllRetryAttemptsFailedError { + // All retries exhausted - consider alerting } else if result is ftp:ServiceUnavailableError { // Transient error - retry the operation } else if result is ftp:Error { // Handle any other FTP error } ``` - ### 2.4. Retry Configuration * `RetryConfig` record represents the configuration for automatic retries of operations. ```ballerina @@ -241,6 +257,8 @@ public type ClientConfiguration record {| boolean laxDataBinding = false; # Retry configuration for read operations RetryConfig retryConfig?; + # Circuit breaker configuration to prevent cascade failures + CircuitBreakerConfig circuitBreaker?; |}; ``` * InputContent record represents the configurations for the input given for `put` and `append` operations. @@ -663,6 +681,84 @@ remote isolated function list(string path) returns FileInfo[]|Error; # an error occurred while checking the path remote isolated function isDirectory(string path) returns boolean|Error; ``` + +### 3.4. Circuit Breaker +The circuit breaker pattern prevents cascade failures by temporarily blocking requests to an FTP server experiencing issues. When failures reach a threshold, the circuit "opens" and subsequent requests fail fast with `CircuitBreakerOpenError` without attempting the actual operation. + +#### State Machine +The circuit breaker operates in three states: +- **CLOSED**: Normal operation. Requests are allowed and failures are tracked. +- **OPEN**: Circuit is tripped. All requests are rejected immediately with `CircuitBreakerOpenError`. +- **HALF_OPEN**: After the reset time elapses, one trial request is allowed. Success returns to CLOSED; failure returns to OPEN. + +#### Configuration +* CircuitBreakerConfig record contains the circuit breaker settings. +```ballerina +public type CircuitBreakerConfig record {| + # Rolling window configuration for failure tracking + RollingWindow rollingWindow = {}; + # Failure ratio threshold (0.0 to 1.0) that trips the circuit + float failureThreshold = 0.5; + # Time in seconds to wait before transitioning from OPEN to HALF_OPEN + decimal resetTime = 30; + # Categories of failures that count towards tripping the circuit + FailureCategory[] failureCategories = [CONNECTION_ERROR, TRANSIENT_ERROR]; +|}; +``` +* RollingWindow record configures the sliding window for tracking failures. +```ballerina +public type RollingWindow record {| + # Minimum number of requests in the window before the circuit can trip + int requestVolumeThreshold = 10; + # Time window in seconds for tracking failures + decimal timeWindow = 60; + # Size of each time bucket in seconds (timeWindow / bucketSize = number of buckets) + decimal bucketSize = 10; +|}; +``` +* FailureCategory enum specifies which types of failures count towards tripping the circuit. +```ballerina +public enum FailureCategory { + # Connection failures (network issues, timeouts) + CONNECTION_ERROR, + # Authentication failures (invalid credentials) + AUTHENTICATION_ERROR, + # Server disconnection during operation + TRANSIENT_ERROR, + # All errors count as failures + ALL_ERRORS +} +``` +#### Usage Example +```ballerina +ftp:ClientConfiguration ftpConfig = { + protocol: ftp:FTP, + host: "ftp.example.com", + port: 21, + auth: { + credentials: {username: "user", password: "pass"} + }, + circuitBreaker: { + failureThreshold: 0.5, + resetTime: 30, + rollingWindow: { + requestVolumeThreshold: 5, + timeWindow: 60, + bucketSize: 10 + }, + failureCategories: [ftp:CONNECTION_ERROR, ftp:TRANSIENT_ERROR] + } +}; + +ftp:Client ftpClient = check new(ftpConfig); + +// Operations will fail fast with CircuitBreakerOpenError when circuit is open +byte[]|ftp:Error content = ftpClient->getBytes("/file.txt"); +if content is ftp:CircuitBreakerOpenError { + // Handle circuit breaker open state - server is unavailable +} +``` + ## 4. Listener The `ftp:Listener` is used to listen to a remote FTP location and trigger a `WatchEvent` type of event when new files are added to or deleted from the directory. The `onFileChange` function is invoked when a new file is added diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java index 895b2c25e..13b1aca99 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java @@ -33,6 +33,8 @@ import io.ballerina.runtime.api.values.BString; import io.ballerina.runtime.api.values.BTypedesc; import io.ballerina.runtime.api.values.BXml; +import io.ballerina.stdlib.ftp.client.circuitbreaker.CircuitBreaker; +import io.ballerina.stdlib.ftp.client.circuitbreaker.CircuitBreakerConfig; import io.ballerina.stdlib.ftp.exception.BallerinaFtpException; import io.ballerina.stdlib.ftp.exception.FtpInvalidConfigException; import io.ballerina.stdlib.ftp.exception.RemoteFileSystemConnectorException; @@ -98,6 +100,71 @@ private FtpClient() { // private constructor } + /** + * Gets the circuit breaker for a client, if configured. + * + * @param clientConnector The FTP client connector + * @return The circuit breaker or null if not configured + */ + private static CircuitBreaker getCircuitBreaker(BObject clientConnector) { + return (CircuitBreaker) clientConnector.getNativeData(FtpConstants.CIRCUIT_BREAKER_NATIVE_DATA); + } + + /** + * Records that a request is starting. Call this BEFORE executing an operation. + * + * @param clientConnector The FTP client connector + */ + private static void recordCircuitBreakerRequestStart(BObject clientConnector) { + CircuitBreaker cb = getCircuitBreaker(clientConnector); + if (cb != null) { + cb.recordRequestStart(); + } + } + + /** + * Records the outcome of an operation. Call this AFTER the operation completes. + * + * @param clientConnector The FTP client connector + * @param result The result from the operation + * @return The result unchanged (circuit breaker state updated as side effect) + */ + private static Object recordCircuitBreakerOutcome(BObject clientConnector, Object result) { + CircuitBreaker cb = getCircuitBreaker(clientConnector); + if (cb == null) { + return result; + } + + if (result instanceof BError bError) { + // Pass the original cause to preserve exception type for categorization + Throwable cause = bError.getCause(); + if (cause != null) { + cb.recordOutcome(cause); + } else { + // Fallback to wrapping message if no cause available + cb.recordOutcome(new RuntimeException(bError.getMessage())); + } + } else { + cb.recordOutcome(null); + } + return result; + } + + /** + * Checks if circuit is open before starting an operation. + * Returns the circuit breaker error if open, null otherwise. + * + * @param clientConnector The FTP client connector + * @return BError if circuit is open, null if operation can proceed + */ + private static BError getCircuitBreakerErrorIfOpen(BObject clientConnector) { + CircuitBreaker cb = getCircuitBreaker(clientConnector); + if (cb != null && cb.isOpen()) { + return cb.createServiceUnavailableError(); + } + return null; + } + public static Object initClientEndpoint(BObject clientEndpoint, BMap config) { String protocol = extractProtocol(config); BError basicConfigError = configureClientEndpointBasic(clientEndpoint, config, protocol); @@ -116,8 +183,8 @@ public static Object initClientEndpoint(BObject clientEndpoint, BMap config) { @@ -287,22 +354,30 @@ private static Object extractVfsConfigurations(BMap config, Map< } } - private static Object createAndStoreConnector(BObject clientEndpoint, Map ftpConfig) { - + private static Object createAndStoreConnector(BObject clientEndpoint, Map ftpConfig, + BMap config) { + String url; try { url = FtpUtil.createUrl(clientEndpoint, ""); } catch (BallerinaFtpException e) { return FtpUtil.createError(e.getMessage(), Error.errorType()); } - + ftpConfig.put(FtpConstants.URI, url); clientEndpoint.addNativeData(FtpConstants.PROPERTY_MAP, ftpConfig); - + RemoteFileSystemConnectorFactory fileSystemConnectorFactory = new RemoteFileSystemConnectorFactoryImpl(); try { VfsClientConnector connector = fileSystemConnectorFactory.createVfsClientConnector(ftpConfig); clientEndpoint.addNativeData(VFS_CLIENT_CONNECTOR, connector); + + // Initialize circuit breaker if configured + Object circuitBreakerError = initializeCircuitBreaker(clientEndpoint, config); + if (circuitBreakerError != null) { + return circuitBreakerError; + } + return null; } catch (RemoteFileSystemConnectorException e) { String errorType = FtpUtil.getErrorTypeForException(e); @@ -310,6 +385,24 @@ private static Object createAndStoreConnector(BObject clientEndpoint, Map config) { + BMap cbConfig = (BMap) config.getMapValue( + StringUtils.fromString(FtpConstants.CIRCUIT_BREAKER)); + if (cbConfig == null) { + return null; + } + + try { + CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.fromBMap(cbConfig); + CircuitBreaker circuitBreaker = new CircuitBreaker(circuitBreakerConfig); + clientEndpoint.addNativeData(FtpConstants.CIRCUIT_BREAKER_NATIVE_DATA, circuitBreaker); + log.debug("Circuit breaker initialized for FTP client"); + return null; + } catch (BallerinaFtpException e) { + return FtpUtil.createError(e.getMessage(), Error.errorType()); + } + } + private static Object configureFtpsSecureSocket(BMap secureSocket, Map ftpConfig) { FtpUtil.configureFtpsMode(secureSocket, ftpConfig); FtpUtil.configureFtpsDataChannelProtection(secureSocket, ftpConfig); @@ -442,8 +535,16 @@ public static Object getBytesAsStream(Environment env, BObject clientConnector, if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, false, remoteFileSystemBaseMessage -> @@ -451,7 +552,8 @@ public static Object getBytesAsStream(Environment env, BObject clientConnector, balFuture, TypeCreator.createArrayType(PredefinedTypes.TYPE_BYTE), laxDataBinding)); connector.addListener(connectorListener); connector.send(null, FtpAction.GET, filePath.getValue(), null); - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } @@ -461,8 +563,16 @@ public static Object getCsvAsStream(Environment env, BObject clientConnector, BS if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, false, remoteFileSystemBaseMessage -> @@ -470,7 +580,8 @@ public static Object getCsvAsStream(Environment env, BObject clientConnector, BS balFuture, typeDesc.getDescribingType(), laxDataBinding)); connector.addListener(connectorListener); connector.send(null, FtpAction.GET, filePath.getValue(), null); - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } @@ -479,14 +590,23 @@ private static Object getAllContent(Environment env, BObject clientConnector, BS if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, false, remoteFileSystemBaseMessage -> FtpClientHelper.executeGetAllAction(remoteFileSystemBaseMessage, balFuture)); connector.addListener(connectorListener); connector.send(null, FtpAction.GET_ALL, filePath.getValue(), null); - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } @@ -757,14 +877,12 @@ private InputStream fetchNextStream() { private byte[] toBytes(Object value) { // Each element is a record with a 'value' field. - @SuppressWarnings("unchecked") BMap streamRecord = (BMap) value; Object val = streamRecord.get(FtpConstants.FIELD_VALUE); if (val instanceof BArray array) { return bytesFromArray(array); } - @SuppressWarnings("unchecked") BMap recordValue = (BMap) val; return bytesFromRecord(recordValue, isFirstRow); } @@ -791,7 +909,15 @@ private static Object putGenericAction(Environment env, BObject clientConnector, if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, true, remoteFileSystemBaseMessage -> FtpClientHelper.executeGenericAction()); @@ -802,7 +928,8 @@ private static Object putGenericAction(Environment env, BObject clientConnector, } else { connector.send(message, FtpAction.APPEND, filePath, null); } - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } @@ -894,14 +1021,23 @@ private static Object executeSinglePathAction(Environment env, BObject clientCon if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); Function messageHandler = messageHandlerFactory.apply(balFuture); FtpClientListener connectorListener = new FtpClientListener(balFuture, closeInput, messageHandler); connector.addListener(connectorListener); connector.send(null, action, filePath.getValue(), null); - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } @@ -921,6 +1057,13 @@ private static Object executeTwoPathAction(Environment env, BObject clientConnec if (connector == null) { return FtpUtil.createError(CLIENT_CLOSED_ERROR_MESSAGE, FTP_ERROR); } + + // Check circuit breaker before proceeding + BError cbError = getCircuitBreakerErrorIfOpen(clientConnector); + if (cbError != null) { + return cbError; + } + String destinationUrl; try { destinationUrl = FtpUtil.createUrl(clientConnector, destinationPath.getValue()); @@ -928,12 +1071,14 @@ private static Object executeTwoPathAction(Environment env, BObject clientConnec return FtpUtil.createError(e.getMessage(), Error.errorType()); } return env.yieldAndRun(() -> { + recordCircuitBreakerRequestStart(clientConnector); CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, true, remoteFileSystemBaseMessage -> FtpClientHelper.executeGenericAction()); connector.addListener(connectorListener); connector.send(null, action, sourcePath.getValue(), destinationUrl); - return getResult(balFuture); + Object result = getResult(balFuture); + return recordCircuitBreakerOutcome(clientConnector, result); }); } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java index ea47098cc..cc52490ef 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java @@ -295,7 +295,6 @@ public void close() { } } - @SuppressWarnings("unchecked") private static void callStreamNext(Environment env, BObject entity, BufferHolder bufferHolder, BObject iteratorObj) { Object result = env.getRuntime().callMethod(iteratorObj, BYTE_STREAM_NEXT_FUNC, null); diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java index 40088a794..318191414 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java @@ -1,17 +1,17 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -52,6 +52,12 @@ protected static Object executeWithRetry(BObject clientConnector, Supplier= config.getRequestVolumeThreshold()) { + float failureRatio = health.getFailureRatio(); + if (failureRatio > config.getFailureThreshold()) { + state = CircuitState.OPEN; + log.info("Circuit breaker transitioning from CLOSED to OPEN " + + "(failure ratio {} > threshold {})", + failureRatio, config.getFailureThreshold()); + } + } + break; + } + } + + private boolean resetTimeElapsed() { + Instant lastError = health.getLastErrorTime(); + if (lastError == null) { + return true; + } + long elapsedMillis = Instant.now().toEpochMilli() - lastError.toEpochMilli(); + return elapsedMillis >= config.getResetTimeMillis(); + } + + private void recordSuccess() { + health.recordSuccess(); + } + + private void recordFailure() { + health.recordFailure(); + } + + private boolean shouldCountAsFailure(Throwable e) { + Set configuredCategories = config.getFailureCategories(); + + // If ALL_ERRORS is configured, any exception trips the circuit + if (configuredCategories.contains(FailureCategory.ALL_ERRORS)) { + return true; + } + + // Categorize the exception + FailureCategory category = FailureCategorizer.categorize(e); + if (category == null) { + // Exception doesn't match any specific category + return false; + } + + // Check if the category is in the configured list + return configuredCategories.contains(category); + } + + /** + * Gets the current circuit state. + * + * @return The current state + */ + public CircuitState getState() { + long stamp = lock.tryOptimisticRead(); + CircuitState currentState = this.state; + if (lock.validate(stamp)) { + return currentState; + } + // Fall back to a read lock if the optimistic read was invalidated. + stamp = lock.readLock(); + try { + return this.state; + } finally { + lock.unlockRead(stamp); + } + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java new file mode 100644 index 000000000..888f2c20a --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitBreakerConfig.java @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BArray; +import io.ballerina.runtime.api.values.BDecimal; +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BString; +import io.ballerina.stdlib.ftp.exception.BallerinaFtpException; + +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for circuit breaker behavior. + * Parsed from Ballerina CircuitBreakerConfig record. + */ +public class CircuitBreakerConfig { + // Rolling window configuration + private final int requestVolumeThreshold; + private final long timeWindowMillis; + private final long bucketSizeMillis; + private final int numberOfBuckets; + + // Circuit breaker thresholds + private final float failureThreshold; + private final long resetTimeMillis; + + // Failure categories + private final Set failureCategories; + + // Ballerina field names + private static final BString ROLLING_WINDOW = StringUtils.fromString("rollingWindow"); + private static final BString REQUEST_VOLUME_THRESHOLD = StringUtils.fromString("requestVolumeThreshold"); + private static final BString TIME_WINDOW = StringUtils.fromString("timeWindow"); + private static final BString BUCKET_SIZE = StringUtils.fromString("bucketSize"); + private static final BString FAILURE_THRESHOLD = StringUtils.fromString("failureThreshold"); + private static final BString RESET_TIME = StringUtils.fromString("resetTime"); + private static final BString FAILURE_CATEGORIES = StringUtils.fromString("failureCategories"); + + private CircuitBreakerConfig(int requestVolumeThreshold, long timeWindowMillis, long bucketSizeMillis, + float failureThreshold, long resetTimeMillis, + Set failureCategories) { + this.requestVolumeThreshold = requestVolumeThreshold; + this.timeWindowMillis = timeWindowMillis; + this.bucketSizeMillis = bucketSizeMillis; + this.numberOfBuckets = (int) (timeWindowMillis / bucketSizeMillis); + this.failureThreshold = failureThreshold; + this.resetTimeMillis = resetTimeMillis; + this.failureCategories = failureCategories; + } + + /** + * Creates a CircuitBreakerConfig from a Ballerina BMap. + * + * @param config The Ballerina CircuitBreakerConfig record + * @return A new CircuitBreakerConfig instance + * @throws BallerinaFtpException if the configuration is invalid + */ + public static CircuitBreakerConfig fromBMap(BMap config) throws BallerinaFtpException { + // Extract rolling window configuration + BMap rollingWindow = (BMap) config.getMapValue(ROLLING_WINDOW); + int requestVolumeThreshold = rollingWindow.getIntValue(REQUEST_VOLUME_THRESHOLD).intValue(); + long timeWindowMillis = decimalToMillis(rollingWindow.get(TIME_WINDOW)); + long bucketSizeMillis = decimalToMillis(rollingWindow.get(BUCKET_SIZE)); + + // Extract thresholds + float failureThreshold = ((Number) config.get(FAILURE_THRESHOLD)).floatValue(); + long resetTimeMillis = decimalToMillis(config.get(RESET_TIME)); + + // Extract failure categories + BArray categoriesArray = config.getArrayValue(FAILURE_CATEGORIES); + Set failureCategories = new HashSet<>(); + for (int i = 0; i < categoriesArray.size(); i++) { + String categoryStr = categoriesArray.getBString(i).getValue(); + FailureCategory category = FailureCategory.fromString(categoryStr); + if (category != null) { + failureCategories.add(category); + } + } + + // Validate configuration + validate(failureThreshold, timeWindowMillis, bucketSizeMillis, requestVolumeThreshold, resetTimeMillis, + failureCategories); + + return new CircuitBreakerConfig(requestVolumeThreshold, timeWindowMillis, bucketSizeMillis, + failureThreshold, resetTimeMillis, failureCategories); + } + + private static long decimalToMillis(Object value) { + if (value instanceof BDecimal decimalValue) { + return (long) (decimalValue.floatValue() * 1000); + } + return (long) (((Number) value).doubleValue() * 1000); + } + + private static void validate(float failureThreshold, long timeWindowMillis, long bucketSizeMillis, + int requestVolumeThreshold, long resetTimeMillis, + Set failureCategories) + throws BallerinaFtpException { + if (failureCategories == null || failureCategories.isEmpty()) { + throw new BallerinaFtpException("Circuit breaker failureCategories must not be empty"); + } + if (failureThreshold < 0.0f || failureThreshold > 1.0f) { + throw new BallerinaFtpException("Circuit breaker failureThreshold must be between 0.0 and 1.0"); + } + if (requestVolumeThreshold <= 0) { + throw new BallerinaFtpException("Circuit breaker requestVolumeThreshold must be greater than 0"); + } + if (resetTimeMillis <= 0) { + throw new BallerinaFtpException("Circuit breaker resetTime must be greater than 0"); + } + if (bucketSizeMillis <= 0) { + throw new BallerinaFtpException("Circuit breaker bucketSize must be greater than 0"); + } + if (timeWindowMillis < bucketSizeMillis) { + throw new BallerinaFtpException("Circuit breaker timeWindow must be greater than or equal to bucketSize"); + } + if (timeWindowMillis % bucketSizeMillis != 0) { + throw new BallerinaFtpException("Circuit breaker timeWindow must be evenly divisible by bucketSize"); + } + } + + public int getRequestVolumeThreshold() { + return requestVolumeThreshold; + } + + public long getTimeWindowMillis() { + return timeWindowMillis; + } + + public long getBucketSizeMillis() { + return bucketSizeMillis; + } + + public int getNumberOfBuckets() { + return numberOfBuckets; + } + + public float getFailureThreshold() { + return failureThreshold; + } + + public long getResetTimeMillis() { + return resetTimeMillis; + } + + public Set getFailureCategories() { + return failureCategories; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java new file mode 100644 index 000000000..d9b54f5e1 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitHealth.java @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +import java.time.Instant; + +/** + * Tracks the health metrics of the circuit breaker using a sliding window of buckets. + *

+ * This class is NOT thread-safe. All methods must be called while holding the + * write lock from the {@link CircuitBreaker}'s {@link java.util.concurrent.locks.StampedLock}. + *

+ */ +public class CircuitHealth { + private final Bucket[] buckets; + private final int numberOfBuckets; + private final long bucketSizeMillis; + private final long timeWindowMillis; + + private boolean lastRequestSuccess; + private int lastUsedBucketId; + private final Instant startTime; + private Instant lastRequestTime; + private Instant lastErrorTime; + + /** + * Creates a new CircuitHealth with the specified configuration. + * + * @param numberOfBuckets Number of buckets in the rolling window + * @param bucketSizeMillis Size of each bucket in milliseconds + * @param timeWindowMillis Total time window in milliseconds + */ + public CircuitHealth(int numberOfBuckets, long bucketSizeMillis, long timeWindowMillis) { + this.numberOfBuckets = numberOfBuckets; + this.bucketSizeMillis = bucketSizeMillis; + this.timeWindowMillis = timeWindowMillis; + this.buckets = new Bucket[numberOfBuckets]; + for (int i = 0; i < numberOfBuckets; i++) { + buckets[i] = new Bucket(); + } + this.startTime = Instant.now(); + this.lastRequestTime = Instant.now(); + this.lastRequestSuccess = true; + this.lastUsedBucketId = 0; + } + + /** + * Gets the current bucket ID based on elapsed time. + * + * @return The current bucket index + */ + public int getCurrentBucketId() { + long elapsedMillis = Instant.now().toEpochMilli() - startTime.toEpochMilli(); + long windowElapsed = elapsedMillis % timeWindowMillis; + return (int) ((windowElapsed / bucketSizeMillis) % numberOfBuckets); + } + + /** + * Prepares the rolling window by resetting stale buckets. + */ + public void prepareRollingWindow() { + Instant currentTime = Instant.now(); + long idleTimeMillis = currentTime.toEpochMilli() - lastRequestTime.toEpochMilli(); + + // If idle longer than entire window, reset all buckets + if (idleTimeMillis > timeWindowMillis) { + resetAllBuckets(); + return; + } + + int currentBucketId = getCurrentBucketId(); + + // Reset buckets that have become stale + if (currentBucketId == lastUsedBucketId && idleTimeMillis > timeWindowMillis) { + // Same bucket but idle exceeded the full time window - reset all + resetAllBuckets(); + } else if (currentBucketId < lastUsedBucketId) { + // Wrapped around - reset from 0 to current and from lastUsed+1 to end + for (int i = 0; i <= currentBucketId; i++) { + buckets[i].reset(); + } + for (int i = lastUsedBucketId + 1; i < numberOfBuckets; i++) { + buckets[i].reset(); + } + } else if (currentBucketId > lastUsedBucketId) { + // Reset buckets between last used and current + for (int i = lastUsedBucketId + 1; i <= currentBucketId; i++) { + buckets[i].reset(); + } + } + } + + /** + * Resets all buckets to their initial state. + */ + public void resetAllBuckets() { + for (Bucket bucket : buckets) { + bucket.reset(); + } + } + + /** + * Gets the total request count across all buckets. + * + * @return Total number of requests + */ + public int getTotalRequestCount() { + int total = 0; + for (Bucket bucket : buckets) { + total += bucket.getTotalCount(); + } + return total; + } + + /** + * Gets the total failure count across all buckets. + * + * @return Total number of failures + */ + public int getTotalFailureCount() { + int total = 0; + for (Bucket bucket : buckets) { + total += bucket.getFailureCount(); + } + return total; + } + + /** + * Calculates the current failure ratio across all buckets. + * + * @return Failure ratio between 0.0 and 1.0 + */ + public float getFailureRatio() { + int totalCount = getTotalRequestCount(); + return totalCount == 0 ? 0.0f : (float) getTotalFailureCount() / totalCount; + } + + /** + * Records a request in the current bucket. + */ + public void recordRequest() { + int bucketId = getCurrentBucketId(); + buckets[bucketId].incrementTotalCount(); + lastUsedBucketId = bucketId; + lastRequestTime = Instant.now(); + } + + /** + * Records a successful request. + */ + public void recordSuccess() { + lastRequestSuccess = true; + } + + /** + * Records a failed request in the current bucket. + */ + public void recordFailure() { + int bucketId = getCurrentBucketId(); + buckets[bucketId].incrementFailureCount(); + lastRequestSuccess = false; + lastErrorTime = Instant.now(); + } + + public boolean isLastRequestSuccess() { + return lastRequestSuccess; + } + + public Instant getLastErrorTime() { + return lastErrorTime; + } + + public void setLastUsedBucketId(int bucketId) { + this.lastUsedBucketId = bucketId; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitState.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitState.java new file mode 100644 index 000000000..153b86842 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/CircuitState.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +/** + * Represents the possible states of a circuit breaker. + * + *

State transitions: + *

    + *
  • CLOSED -> OPEN: When failure threshold is exceeded
  • + *
  • OPEN -> HALF_OPEN: After reset time has elapsed
  • + *
  • HALF_OPEN -> CLOSED: When trial request succeeds
  • + *
  • HALF_OPEN -> OPEN: When trial request fails
  • + *
+ */ +public enum CircuitState { + /** + * Circuit is closed - normal operation, all requests pass through. + * Failures are tracked and counted towards the threshold. + */ + CLOSED, + + /** + * Circuit is open - requests are blocked immediately. + * Returns CircuitBreakerOpenError without attempting the operation. + */ + OPEN, + + /** + * Circuit is in recovery mode - allows one trial request. + * If the trial succeeds, transitions to CLOSED. + * If the trial fails, transitions back to OPEN. + */ + HALF_OPEN +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java new file mode 100644 index 000000000..ae3042d79 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategorizer.java @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +import io.ballerina.stdlib.ftp.exception.ErrorTypeProvider; +import io.ballerina.stdlib.ftp.util.FtpErrorCodeAnalyzer; +import io.ballerina.stdlib.ftp.util.FtpUtil; + +import java.io.EOFException; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.util.Locale; + +/** + * Categorizes exceptions into failure categories for circuit breaker evaluation. + * Uses a three-tier approach: + * 1. Check if exception implements ErrorTypeProvider (structured error types) + * 2. Check exception type in cause chain + * 3. Fall back to message-based categorization with FtpErrorCodeAnalyzer + */ +public final class FailureCategorizer { + + private FailureCategorizer() { + // Utility class + } + + /** + * Categorizes an exception into a FailureCategory. + * First tries ErrorTypeProvider, then exception types, then message patterns. + * + * @param throwable The exception to categorize + * @return The FailureCategory, or null if the exception doesn't match any specific category + */ + public static FailureCategory categorize(Throwable throwable) { + if (throwable == null) { + return null; + } + + // First, check if any exception in the chain implements ErrorTypeProvider + Throwable current = throwable; + while (current != null) { + FailureCategory category = categorizeByErrorTypeProvider(current); + if (category != null) { + return category; + } + + Throwable next = current.getCause(); + if (next == current) { + break; + } + current = next; + } + + // Second, try to categorize by walking the cause chain and checking exception types + current = throwable; + while (current != null) { + FailureCategory category = categorizeByExceptionType(current); + if (category != null) { + return category; + } + + // Also check message at each level using FtpErrorCodeAnalyzer + category = categorizeByMessage(current.getMessage()); + if (category != null) { + return category; + } + + Throwable next = current.getCause(); + if (next == current) { + break; + } + current = next; + } + + return null; + } + + private static FailureCategory categorizeByErrorTypeProvider(Throwable throwable) { + if (!(throwable instanceof ErrorTypeProvider provider)) { + return null; + } + + String errorType = provider.errorType(); + if (errorType == null) { + return null; + } + + // Map error types to failure categories + if (errorType.equals(FtpUtil.ErrorType.ConnectionError.errorType())) { + return FailureCategory.CONNECTION_ERROR; + } + if (errorType.equals(FtpUtil.ErrorType.ServiceUnavailableError.errorType())) { + return FailureCategory.TRANSIENT_ERROR; + } + + return null; + } + + private static FailureCategory categorizeByExceptionType(Throwable throwable) { + // Connection errors + if (throwable instanceof ConnectException || + throwable instanceof SocketTimeoutException || + throwable instanceof UnknownHostException || + throwable instanceof NoRouteToHostException) { + return FailureCategory.CONNECTION_ERROR; + } + + // Server disconnect (EOF indicates unexpected connection close) - now TRANSIENT_ERROR + if (throwable instanceof EOFException) { + return FailureCategory.TRANSIENT_ERROR; + } + + // SocketException could be either connection or transient + if (throwable instanceof SocketException) { + String message = throwable.getMessage(); + if (message != null) { + String lower = message.toLowerCase(Locale.ROOT); + if (lower.contains("connection reset") || lower.contains("broken pipe")) { + return FailureCategory.TRANSIENT_ERROR; + } + } + return FailureCategory.CONNECTION_ERROR; + } + + return null; + } + + private static FailureCategory categorizeByMessage(String message) { + if (message == null) { + return null; + } + + // Use FtpErrorCodeAnalyzer for FTP response codes + if (FtpErrorCodeAnalyzer.isServiceUnavailable(message)) { + return FailureCategory.TRANSIENT_ERROR; + } + + if (FtpErrorCodeAnalyzer.isAuthenticationError(message)) { + return FailureCategory.AUTHENTICATION_ERROR; + } + + String lower = message.toLowerCase(Locale.ROOT); + + // Connection errors - network/connectivity issues + if (lower.contains("connection refused") || + lower.contains("connection timed out") || + lower.contains("connect timed out") || + lower.contains("network is unreachable") || + lower.contains("no route to host") || + lower.contains("host is unreachable") || + lower.contains("unknown host") || + lower.contains("connect failed") || + lower.contains("failed to connect") || + lower.contains("could not connect")) { + return FailureCategory.CONNECTION_ERROR; + } + + // Additional authentication error patterns (beyond FTP codes) + if (lower.contains("authentication failed") || + lower.contains("invalid credentials") || + lower.contains("auth fail") || + lower.contains("login failed") || + lower.contains("access denied") || + lower.contains("not authorized") || + lower.contains("login incorrect")) { + return FailureCategory.AUTHENTICATION_ERROR; + } + + // Transient errors - connection lost during operation + if (lower.contains("connection reset") || + lower.contains("broken pipe") || + lower.contains("connection closed") || + lower.contains("unexpected end") || + lower.contains("stream closed") || + lower.contains("socket closed") || + lower.contains("remote host closed")) { + return FailureCategory.TRANSIENT_ERROR; + } + + return null; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategory.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategory.java new file mode 100644 index 000000000..e2cb68803 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/circuitbreaker/FailureCategory.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client.circuitbreaker; + +/** + * Categories of errors that can trip the circuit breaker. + * Maps to the Ballerina FailureCategory enum. + */ +public enum FailureCategory { + /** + * Connection-level failures (timeout, refused, reset, DNS resolution). + * Maps to ConnectionError type. + */ + CONNECTION_ERROR, + + /** + * Authentication failures (invalid credentials, key rejected). + * Detected via FTP 530 response code. + */ + AUTHENTICATION_ERROR, + + /** + * Transient server errors that may succeed on retry. + * Maps to ServiceUnavailableError type (FTP codes 421, 425, 426, 450, 451, 452). + */ + TRANSIENT_ERROR, + + /** + * All errors regardless of type. When configured, any error will trip the circuit. + */ + ALL_ERRORS; + + /** + * Converts a Ballerina enum string value to the corresponding Java enum. + * + * @param value The string value from Ballerina (e.g., "CONNECTION_ERROR") + * @return The corresponding FailureCategory, or null if not found + */ + public static FailureCategory fromString(String value) { + if (value == null) { + return null; + } + try { + return FailureCategory.valueOf(value); + } catch (IllegalArgumentException e) { + return null; + } + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java index 7fc507fd6..c40a3de92 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java @@ -213,4 +213,9 @@ private FtpConstants() { public static final String OP_GET_JSON = "getJson"; public static final String OP_GET_XML = "getXml"; public static final String OP_GET_CSV = "getCsv"; + + // Circuit breaker configuration + public static final String CIRCUIT_BREAKER = "circuitBreaker"; + public static final String CIRCUIT_BREAKER_NATIVE_DATA = "CircuitBreaker"; + public static final String CIRCUIT_BREAKER_OPEN_ERROR = "CircuitBreakerOpenError"; } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java index f5a32b565..dc586ef54 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java @@ -624,7 +624,8 @@ public enum ErrorType { FileAlreadyExistsError("FileAlreadyExistsError"), InvalidConfigError("InvalidConfigError"), ServiceUnavailableError("ServiceUnavailableError"), - AllRetryAttemptsFailedError("AllRetryAttemptsFailedError"); + AllRetryAttemptsFailedError("AllRetryAttemptsFailedError"), + CircuitBreakerOpenError("CircuitBreakerOpenError"); private String errorType;