Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ballerina/client_endpoint.bal
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ public enum Compression {
# malformed CSV records are skipped and written to a separate file in the current directory
# + retryConfig - Configuration for retry behavior on transient failures for non-streaming read operations
# (getBytes, getText, getJson, getXml, getCsv). If not specified, no retry is attempted.
# + circuitBreaker - Circuit breaker configuration for handling server failures gracefully.
# When enabled, the client will fail fast if the server is experiencing issues.
public type ClientConfiguration record {|
Protocol protocol = FTP;
string host = "127.0.0.1";
Expand All @@ -469,6 +471,7 @@ public type ClientConfiguration record {|
string sftpSshKnownHosts?;
FailSafeOptions csvFailSafe?;
RetryConfig retryConfig?;
CircuitBreakerConfig circuitBreaker?;
|};

isolated function getInputContent(string path, stream<byte[] & readonly, io:Error?>|string|xml|json content,
Expand Down
45 changes: 45 additions & 0 deletions ballerina/commons.bal
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,48 @@ public type FileDependencyCondition record {|
DependencyMatchingMode matchingMode = ALL;
int requiredFileCount = 1;
|};

# Categories of errors that can trip the circuit breaker.
# Used to configure which types of failures should count towards the circuit breaker threshold.
public enum FailureCategory {
# Connection-level failures (timeout, refused, reset, DNS resolution).
# Maps to ConnectionError type.
CONNECTION_ERROR,
# Authentication failures (invalid credentials, key rejected).
# Detected via FTP 530 response code.
AUTHENTICATION_ERROR,
# Transient server errors that may succeed on retry.
# Maps to ServiceUnavailableError type (FTP codes 421, 425, 426, 450, 451, 452).
TRANSIENT_ERROR,
# All errors regardless of type
ALL_ERRORS
}

# Configuration for the sliding time window used in failure calculation.
# The rolling window divides time into discrete buckets for efficient tracking of request success/failure rates.
#
# + requestVolumeThreshold - Minimum number of requests in the window before evaluating failure threshold.
# Circuit breaker will not trip until this many requests have been made.
# + timeWindow - Time period in seconds for the sliding window
# + bucketSize - Granularity of time buckets in seconds. Must be less than timeWindow.
public type RollingWindow record {|
int requestVolumeThreshold = 10;
decimal timeWindow = 60;
decimal bucketSize = 10;
|};

# Configuration for circuit breaker behavior.
# The circuit breaker prevents cascade failures by temporarily blocking requests when the server is experiencing issues.
#
# + rollingWindow - Time window configuration for failure tracking
# + failureThreshold - Failure ratio threshold (0.0 to 1.0) that trips the circuit.
# For example, 0.5 means the circuit will open when 50% of requests fail.
# + resetTime - Seconds to wait in OPEN state before transitioning to HALF_OPEN to test recovery
# + failureCategories - Error categories that count as failures for the circuit breaker.
# Only errors matching these categories will contribute to the failure ratio.
public type CircuitBreakerConfig record {|
RollingWindow rollingWindow = {};
float failureThreshold = 0.5;
decimal resetTime = 30;
FailureCategory[] failureCategories = [CONNECTION_ERROR, TRANSIENT_ERROR];
|};
6 changes: 6 additions & 0 deletions ballerina/error.bal
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,9 @@ public type ServiceUnavailableError distinct Error;
# Represents an error that occurs when all retry attempts have been exhausted.
# This error wraps the last failure encountered during retry attempts.
public type AllRetryAttemptsFailedError distinct Error;

# Error returned when the circuit breaker is in OPEN state.
# This indicates the FTP server is unavailable and requests are being blocked
# to prevent cascade failures. The client should implement fallback logic
# or wait for the circuit to transition to HALF_OPEN state.
public type CircuitBreakerOpenError distinct ServiceUnavailableError;
193 changes: 193 additions & 0 deletions ballerina/tests/circuit_breaker_test.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/test;

// Test client creation with invalid failure threshold (> 1.0)
@test:Config {}
public function testClientWithInvalidFailureThresholdHigh() {
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
failureThreshold: 1.5,
resetTime: 30
}
};
Client|Error cbClient = new (config);
test:assertTrue(cbClient is Error, msg = "Client creation should fail with invalid failure threshold > 1.0");
if cbClient is Error {
test:assertTrue(cbClient.message().includes("failureThreshold"),
msg = "Error message should mention failureThreshold");
}
}

// Test client creation with invalid failure threshold (< 0.0)
@test:Config {}
public function testClientWithInvalidFailureThresholdLow() {
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
failureThreshold: -0.1,
resetTime: 30
}
};
Client|Error cbClient = new (config);
test:assertTrue(cbClient is Error, msg = "Client creation should fail with invalid failure threshold < 0.0");
if cbClient is Error {
test:assertTrue(cbClient.message().includes("failureThreshold"),
msg = "Error message should mention failureThreshold");
}
}

// Test client creation with bucketSize greater than timeWindow
@test:Config {}
public function testClientWithInvalidBucketSize() {
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
rollingWindow: {
timeWindow: 30,
bucketSize: 60 // bucket larger than window
}
}
};
Client|Error cbClient = new (config);
test:assertTrue(cbClient is Error, msg = "Client creation should fail when bucketSize > timeWindow");
if cbClient is Error {
test:assertTrue(cbClient.message().includes("timeWindow") || cbClient.message().includes("bucketSize"),
msg = "Error message should mention timeWindow or bucketSize");
}
}

// Test that operations work normally with circuit breaker enabled
@test:Config {}
public function testOperationsWithCircuitBreaker() returns error? {
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
rollingWindow: {
requestVolumeThreshold: 5,
timeWindow: 60,
bucketSize: 10
},
failureThreshold: 0.5,
resetTime: 30
}
};
Client cbClient = check new (config);

// Test basic operations work with circuit breaker enabled
boolean exists = check cbClient->exists("/home/in");
test:assertTrue(exists, msg = "Directory should exist");

FileInfo[] files = check cbClient->list("/home/in");
test:assertTrue(files.length() >= 0, msg = "List operation should return files array");

check cbClient->close();
}

// Test that client creation fails when server doesn't exist (connection error, not circuit breaker)
@test:Config {}
public function testClientCreationFailsForNonExistentServer() {
// Create a client pointing to a non-existent server
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21299, // Non-existent port
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
rollingWindow: {
requestVolumeThreshold: 1,
timeWindow: 60,
bucketSize: 10
},
failureThreshold: 0.5,
resetTime: 5,
failureCategories: [CONNECTION_ERROR]
}
};

Client|Error cbClient = new (config);
test:assertTrue(cbClient is Error, msg = "Client creation should fail for non-existent server");
}

// Test that circuit breaker opens after failures and returns CircuitBreakerOpenError
@test:Config {}
public function testCircuitBreakerOpensAfterFailures() returns error? {
// Configure circuit breaker with low thresholds to trigger quickly
// - requestVolumeThreshold: 2 (only need 2 requests before circuit can trip)
// - failureThreshold: 0.5 (50% failure rate trips the circuit)
// - ALL_ERRORS: any error counts as failure
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
rollingWindow: {
requestVolumeThreshold: 2,
timeWindow: 60,
bucketSize: 10
},
failureThreshold: 0.5,
resetTime: 30,
failureCategories: [ALL_ERRORS]
}
};

Client cbClient = check new (config);

// Make requests that will fail (reading non-existent files)
// These failures should accumulate and trip the circuit
string nonExistentPath = "/non/existent/file/path/that/does/not/exist.txt";

// First failure
byte[]|Error result1 = cbClient->getBytes(nonExistentPath);
test:assertTrue(result1 is Error, msg = "First request should fail (file not found)");
test:assertFalse(result1 is CircuitBreakerOpenError,
msg = "First failure should be regular error, not circuit breaker error");

// Second failure - after this, we have 2 requests with 100% failure rate
byte[]|Error result2 = cbClient->getBytes(nonExistentPath);
test:assertTrue(result2 is Error, msg = "Second request should fail");
test:assertFalse(result2 is CircuitBreakerOpenError,
msg = "Second failure should be regular error, not circuit breaker error");

// Third request - circuit should now be OPEN and reject immediately
byte[]|Error result3 = cbClient->getBytes(nonExistentPath);
test:assertTrue(result3 is CircuitBreakerOpenError,
msg = "Third request should fail with CircuitBreakerOpenError (circuit is open)");

if result3 is CircuitBreakerOpenError {
string message = result3.message();
test:assertTrue(message.includes("Circuit breaker is OPEN"),
msg = "Error message should indicate circuit breaker is open");
}

check cbClient->close();
}
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## unreleased

### Added

- [Add Circuit breaker support](https://github.com/ballerina-platform/ballerina-library/issues/8382)
- [Add automatic retry support with exponential backoff for FTP client](https://github.com/ballerina-platform/ballerina-library/issues/8585)
- [Add FTP Listener Coordination Support](https://github.com/ballerina-platform/ballerina-library/issues/8490)
- [Add distinct error types](https://github.com/ballerina-platform/ballerina-library/issues/8597)
Expand Down
Loading
Loading