Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ballerina/client_endpoint.bal
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,11 @@ public enum Compression {
# + sftpCompression - Compression algorithms (SFTP only)
# + sftpSshKnownHosts - Path to SSH known_hosts file (SFTP only)
# + proxy - Proxy configuration for SFTP connections (SFTP only)
# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode,
# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode,
# malformed CSV records are skipped and written to a separate file in the current directory
# + retryConfig - Configuration for retry behavior on transient failures. If not specified, no retry is attempted.
# + circuitBreaker - Circuit breaker configuration for handling server failures gracefully.
# When enabled, the client will fail fast if the server is experiencing issues.
public type ClientConfiguration record {|
Protocol protocol = FTP;
string host = "127.0.0.1";
Expand All @@ -466,6 +469,8 @@ public type ClientConfiguration record {|
TransferCompression[] sftpCompression = [NO];
string sftpSshKnownHosts?;
FailSafeOptions csvFailSafe?;
RetryConfig retryConfig?;
CircuitBreakerConfig circuitBreaker?;
|};

isolated function getInputContent(string path, stream<byte[] & readonly, io:Error?>|string|xml|json content,
Expand Down
59 changes: 59 additions & 0 deletions ballerina/commons.bal
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,20 @@ public type SocketConfig record {|
decimal sftpSessionTimeout = 300.0;
|};

# Configuration for retry behavior on transient failures.
# Enables automatic retry with exponential backoff for read operations.
#
# + count - Maximum number of retry attempts (default: 3)
# + interval - Initial wait time in seconds between retries (default: 1.0)
# + backOffFactor - Multiplier for exponential backoff (default: 2.0)
# + maxWaitInterval - Maximum wait time cap in seconds (default: 30.0)
public type RetryConfig record {|
int count = 3;
decimal interval = 1.0;
float backOffFactor = 2.0;
decimal maxWaitInterval = 30.0;
|};

# Internal configuration for content to be written in put and append operations.
#
# + filePath - Path of the file to be created or appended
Expand Down Expand Up @@ -241,3 +255,48 @@ public type FileDependencyCondition record {|
DependencyMatchingMode matchingMode = ALL;
int requiredFileCount = 1;
|};

# Categories of errors that can trip the circuit breaker.
# Used to configure which types of failures should count towards the circuit breaker threshold.
public enum FailureCategory {
# Connection-level failures (timeout, refused, reset, DNS resolution).
# Maps to ConnectionError type.
CONNECTION_ERROR,
# Authentication failures (invalid credentials, key rejected).
# Detected via FTP 530 response code.
AUTHENTICATION_ERROR,
# Transient server errors that may succeed on retry.
# Maps to ServiceUnavailableError type (FTP codes 421, 425, 426, 450, 451, 452).
TRANSIENT_ERROR,
# All errors regardless of type
ALL_ERRORS
}

# Configuration for the sliding time window used in failure calculation.
# The rolling window divides time into discrete buckets for efficient tracking of request success/failure rates.
#
# + requestVolumeThreshold - Minimum number of requests in the window before evaluating failure threshold.
# Circuit breaker will not trip until this many requests have been made.
# + timeWindow - Time period in seconds for the sliding window
# + bucketSize - Granularity of time buckets in seconds. Must be less than timeWindow.
public type RollingWindow record {|
int requestVolumeThreshold = 10;
decimal timeWindow = 60;
decimal bucketSize = 10;
|};

# Configuration for circuit breaker behavior.
# The circuit breaker prevents cascade failures by temporarily blocking requests when the server is experiencing issues.
#
# + rollingWindow - Time window configuration for failure tracking
# + failureThreshold - Failure ratio threshold (0.0 to 1.0) that trips the circuit.
# For example, 0.5 means the circuit will open when 50% of requests fail.
# + resetTime - Seconds to wait in OPEN state before transitioning to HALF_OPEN to test recovery
# + failureCategories - Error categories that count as failures for the circuit breaker.
# Only errors matching these categories will contribute to the failure ratio.
public type CircuitBreakerConfig record {|
RollingWindow rollingWindow = {};
float failureThreshold = 0.5;
decimal resetTime = 30;
FailureCategory[] failureCategories = [CONNECTION_ERROR, TRANSIENT_ERROR];
|};
10 changes: 10 additions & 0 deletions ballerina/error.bal
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,13 @@ public type InvalidConfigError distinct Error;
# Common causes include: server overload (421), connection issues (425, 426),
# temporary file locks (450), or server-side processing errors (451).
public type ServiceUnavailableError distinct Error;

# Represents an error that occurs when all retry attempts have been exhausted.
# This error wraps the last failure encountered during retry attempts.
public type AllRetryAttemptsFailedError distinct Error;

# Error returned when the circuit breaker is in OPEN state.
# This indicates the FTP server is unavailable and requests are being blocked
# to prevent cascade failures. The client should implement fallback logic
# or wait for the circuit to transition to HALF_OPEN state.
public type CircuitBreakerOpenError distinct ServiceUnavailableError;
193 changes: 193 additions & 0 deletions ballerina/tests/circuit_breaker_test.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/test;

// Test client creation with invalid failure threshold (> 1.0)
@test:Config {}
public function testClientWithInvalidFailureThresholdHigh() {
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
failureThreshold: 1.5,
resetTime: 30
}
};
Client|Error cbClient = new (config);
test:assertTrue(cbClient is Error, msg = "Client creation should fail with invalid failure threshold > 1.0");
if cbClient is Error {
test:assertTrue(cbClient.message().includes("failureThreshold"),
msg = "Error message should mention failureThreshold");
}
}

// Test client creation with invalid failure threshold (< 0.0)
@test:Config {}
public function testClientWithInvalidFailureThresholdLow() {
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
failureThreshold: -0.1,
resetTime: 30
}
};
Client|Error cbClient = new (config);
test:assertTrue(cbClient is Error, msg = "Client creation should fail with invalid failure threshold < 0.0");
if cbClient is Error {
test:assertTrue(cbClient.message().includes("failureThreshold"),
msg = "Error message should mention failureThreshold");
}
}

// Test client creation with bucketSize greater than timeWindow
@test:Config {}
public function testClientWithInvalidBucketSize() {
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
rollingWindow: {
timeWindow: 30,
bucketSize: 60 // bucket larger than window
}
}
};
Client|Error cbClient = new (config);
test:assertTrue(cbClient is Error, msg = "Client creation should fail when bucketSize > timeWindow");
if cbClient is Error {
test:assertTrue(cbClient.message().includes("timeWindow") || cbClient.message().includes("bucketSize"),
msg = "Error message should mention timeWindow or bucketSize");
}
}

// Test that operations work normally with circuit breaker enabled
@test:Config {}
public function testOperationsWithCircuitBreaker() returns error? {
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
rollingWindow: {
requestVolumeThreshold: 5,
timeWindow: 60,
bucketSize: 10
},
failureThreshold: 0.5,
resetTime: 30
}
};
Client cbClient = check new (config);

// Test basic operations work with circuit breaker enabled
boolean exists = check cbClient->exists("/home/in");
test:assertTrue(exists, msg = "Directory should exist");

FileInfo[] files = check cbClient->list("/home/in");
test:assertTrue(files.length() >= 0, msg = "List operation should return files array");

check cbClient->close();
}

// Test that client creation fails when server doesn't exist (connection error, not circuit breaker)
@test:Config {}
public function testClientCreationFailsForNonExistentServer() {
// Create a client pointing to a non-existent server
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21299, // Non-existent port
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
rollingWindow: {
requestVolumeThreshold: 1,
timeWindow: 60,
bucketSize: 10
},
failureThreshold: 0.5,
resetTime: 5,
failureCategories: [CONNECTION_ERROR]
}
};

Client|Error cbClient = new (config);
test:assertTrue(cbClient is Error, msg = "Client creation should fail for non-existent server");
}

// Test that circuit breaker opens after failures and returns CircuitBreakerOpenError
@test:Config {}
public function testCircuitBreakerOpensAfterFailures() returns error? {
// Configure circuit breaker with low thresholds to trigger quickly
// - requestVolumeThreshold: 2 (only need 2 requests before circuit can trip)
// - failureThreshold: 0.5 (50% failure rate trips the circuit)
// - ALL_ERRORS: any error counts as failure
ClientConfiguration config = {
protocol: FTP,
host: "127.0.0.1",
port: 21212,
auth: {credentials: {username: "wso2", password: "wso2123"}},
circuitBreaker: {
rollingWindow: {
requestVolumeThreshold: 2,
timeWindow: 60,
bucketSize: 10
},
failureThreshold: 0.5,
resetTime: 30,
failureCategories: [ALL_ERRORS]
}
};

Client cbClient = check new (config);

// Make requests that will fail (reading non-existent files)
// These failures should accumulate and trip the circuit
string nonExistentPath = "/non/existent/file/path/that/does/not/exist.txt";

// First failure
byte[]|Error result1 = cbClient->getBytes(nonExistentPath);
test:assertTrue(result1 is Error, msg = "First request should fail (file not found)");
test:assertFalse(result1 is CircuitBreakerOpenError,
msg = "First failure should be regular error, not circuit breaker error");

// Second failure - after this, we have 2 requests with 100% failure rate
byte[]|Error result2 = cbClient->getBytes(nonExistentPath);
test:assertTrue(result2 is Error, msg = "Second request should fail");
test:assertFalse(result2 is CircuitBreakerOpenError,
msg = "Second failure should be regular error, not circuit breaker error");

// Third request - circuit should now be OPEN and reject immediately
byte[]|Error result3 = cbClient->getBytes(nonExistentPath);
test:assertTrue(result3 is CircuitBreakerOpenError,
msg = "Third request should fail with CircuitBreakerOpenError (circuit is open)");

if result3 is CircuitBreakerOpenError {
string message = result3.message();
test:assertTrue(message.includes("Circuit breaker is OPEN"),
msg = "Error message should indicate circuit breaker is open");
}

check cbClient->close();
}
3 changes: 2 additions & 1 deletion ballerina/tests/client_endpoint_test.bal
Original file line number Diff line number Diff line change
Expand Up @@ -1137,9 +1137,10 @@ public function testListFiles() {
"child_directory",
"content-methods",
"age",
"retry",
"test3.txt"
];
int[] fileSizes = [0, 61, 0, 0, 0, 0, 0, 145, 0, 0, 16400, 9000, 0, 0, 0, 12];
int[] fileSizes = [0, 61, 0, 0, 0, 0, 0, 145, 0, 0, 16400, 9000, 0, 0, 0, 0, 12];
FileInfo[]|Error response = (<Client>clientEp)->list("/home/in");
if response is FileInfo[] {
log:printInfo("List of files/directories: ");
Expand Down
Loading
Loading