diff --git a/ballerina/client_endpoint.bal b/ballerina/client_endpoint.bal index ba048f8ad..8513cbbf3 100644 --- a/ballerina/client_endpoint.bal +++ b/ballerina/client_endpoint.bal @@ -452,6 +452,8 @@ public enum Compression { # + proxy - Proxy configuration for SFTP connections (SFTP only) # + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode, # malformed CSV records are skipped and written to a separate file in the current directory +# + retryConfig - Configuration for retry behavior on transient failures for non-streaming read operations +# (getBytes, getText, getJson, getXml, getCsv). If not specified, no retry is attempted. public type ClientConfiguration record {| Protocol protocol = FTP; string host = "127.0.0.1"; @@ -466,6 +468,7 @@ public type ClientConfiguration record {| TransferCompression[] sftpCompression = [NO]; string sftpSshKnownHosts?; FailSafeOptions csvFailSafe?; + RetryConfig retryConfig?; |}; isolated function getInputContent(string path, stream|string|xml|json content, diff --git a/ballerina/commons.bal b/ballerina/commons.bal index 3f56c942c..e05b9430c 100644 --- a/ballerina/commons.bal +++ b/ballerina/commons.bal @@ -178,6 +178,20 @@ public type SocketConfig record {| decimal sftpSessionTimeout = 300.0; |}; +# Configuration for retry behavior on transient failures. +# Enables automatic retry with exponential backoff for read operations. +# +# + count - Maximum number of retry attempts +# + interval - Initial wait time in seconds between retries +# + backOffFactor - Multiplier for exponential backoff +# + maxWaitInterval - Maximum wait time cap in seconds +public type RetryConfig record {| + int count = 3; + decimal interval = 1.0; + decimal backOffFactor = 2.0; + decimal maxWaitInterval = 30.0; +|}; + # Internal configuration for content to be written in put and append operations. # # + filePath - Path of the file to be created or appended diff --git a/ballerina/error.bal b/ballerina/error.bal index 9ce8f4959..a2a1c976f 100644 --- a/ballerina/error.bal +++ b/ballerina/error.bal @@ -36,3 +36,7 @@ public type InvalidConfigError distinct Error; # Common causes include: server overload (421), connection issues (425, 426), # temporary file locks (450), or server-side processing errors (451). public type ServiceUnavailableError distinct Error; + +# Represents an error that occurs when all retry attempts have been exhausted. +# This error wraps the last failure encountered during retry attempts. +public type AllRetryAttemptsFailedError distinct Error; diff --git a/ballerina/tests/client_endpoint_test.bal b/ballerina/tests/client_endpoint_test.bal index 94809a610..c175cfbe4 100644 --- a/ballerina/tests/client_endpoint_test.bal +++ b/ballerina/tests/client_endpoint_test.bal @@ -1137,9 +1137,10 @@ public function testListFiles() { "child_directory", "content-methods", "age", + "retry", "test3.txt" ]; - int[] fileSizes = [0, 61, 0, 0, 0, 0, 0, 145, 0, 0, 16400, 9000, 0, 0, 0, 12]; + int[] fileSizes = [0, 61, 0, 0, 0, 0, 0, 145, 0, 0, 16400, 9000, 0, 0, 0, 0, 12]; FileInfo[]|Error response = (clientEp)->list("/home/in"); if response is FileInfo[] { log:printInfo("List of files/directories: "); diff --git a/ballerina/tests/retry_test.bal b/ballerina/tests/retry_test.bal new file mode 100644 index 000000000..b4ea5764f --- /dev/null +++ b/ballerina/tests/retry_test.bal @@ -0,0 +1,257 @@ +// Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/test; +import ballerina/time; + +// Test configuration with retry enabled +ClientConfiguration retryConfig = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + userDirIsRoot: false, + retryConfig: { + count: 3, + interval: 0.5, // 500ms initial interval for faster tests + backOffFactor: 2.0, + maxWaitInterval: 5.0 + } +}; + +// Test configuration with custom retry settings +ClientConfiguration customRetryConfig = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + userDirIsRoot: false, + retryConfig: { + count: 2, + interval: 0.2, + backOffFactor: 1.5, + maxWaitInterval: 1.0 + } +}; + +Client? retryClientEp = (); +Client? customRetryClientEp = (); + +@test:BeforeSuite +function initRetryTestEnvironment() returns error? { + retryClientEp = check new (retryConfig); + customRetryClientEp = check new (customRetryConfig); +} + +// Test: Successful getBytes with retry config (no retry needed) +@test:Config {} +function testGetBytesWithRetryConfig_Success() returns error? { + string testPath = "/home/in/retry/test1.txt"; + + byte[] result = check (retryClientEp)->getBytes(testPath); + test:assertTrue(result.length() > 0, msg = "Should return non-empty bytes"); +} + +// Test: Successful getText with retry config (no retry needed) +@test:Config { + dependsOn: [testGetBytesWithRetryConfig_Success] +} +function testGetTextWithRetryConfig_Success() returns error? { + string testPath = "/home/in/retry/test1.txt"; + + string result = check (retryClientEp)->getText(testPath); + test:assertTrue(result.length() > 0, msg = "Should return non-empty text"); +} + +// Test: Successful getJson with retry config (no retry needed) +@test:Config { + dependsOn: [testGetTextWithRetryConfig_Success] +} +function testGetJsonWithRetryConfig_Success() returns error? { + // First, put a JSON file + string jsonPath = "/home/in/retry/retry-test.json"; + json testJson = {name: "retry-test", value: 42}; + check (retryClientEp)->putJson(jsonPath, testJson); + + json result = check (retryClientEp)->getJson(jsonPath); + test:assertEquals(result, testJson, msg = "JSON content should match"); + + // Cleanup + check (retryClientEp)->delete(jsonPath); +} + +// Test: Successful getXml with retry config (no retry needed) +@test:Config { + dependsOn: [testGetJsonWithRetryConfig_Success] +} +function testGetXmlWithRetryConfig_Success() returns error? { + // First, put an XML file + string xmlPath = "/home/in/retry/retry-test.xml"; + xml testXml = xml `retry-test`; + check (retryClientEp)->putXml(xmlPath, testXml); + + xml result = check (retryClientEp)->getXml(xmlPath); + test:assertEquals(result.toString(), testXml.toString(), msg = "XML content should match"); + + // Cleanup + check (retryClientEp)->delete(xmlPath); +} + +// Test: Successful getCsv with retry config (no retry needed) +@test:Config { + dependsOn: [testGetXmlWithRetryConfig_Success] +} +function testGetCsvWithRetryConfig_Success() returns error? { + // First, put a CSV file + string csvPath = "/home/in/retry/retry-test.csv"; + string[][] testCsv = [ + ["id", "name"], + ["1", "Alice"], + ["2", "Bob"] + ]; + check (retryClientEp)->putCsv(csvPath, testCsv); + + string[][] result = check (retryClientEp)->getCsv(csvPath); + test:assertEquals(result.length(), 2, msg = "Should return 2 data rows"); + test:assertEquals(result[0][1], "Alice", msg = "First row name should be Alice"); + + // Cleanup + check (retryClientEp)->delete(csvPath); +} + +// Test: Retry behavior on non-existent file (should fail after retries) +@test:Config { + dependsOn: [testGetCsvWithRetryConfig_Success] +} +function testGetBytesWithRetry_NonExistentFile() returns error? { + string nonExistentPath = "/home/in/retry/non-existent-retry-test-file.txt"; + + time:Utc startTime = time:utcNow(); + byte[]|Error result = (customRetryClientEp)->getBytes(nonExistentPath); + time:Utc endTime = time:utcNow(); + + // Should be an error after retries + test:assertTrue(result is Error, msg = "Should return error for non-existent file"); + + if result is Error { + // Error message should indicate retry exhaustion + test:assertTrue(result.message().includes("failed after") || + result.message().includes("not found"), + msg = "Error should indicate failure: " + result.message()); + } + + // With retry config (count=2, interval=0.2, backOffFactor=1.5): + // Initial attempt + retry 1 (0.2s wait) + retry 2 (0.3s wait) + // Total minimum: ~0.5s + decimal elapsedSeconds = (endTime[0] - startTime[0]); + // Allow some tolerance - should take at least some time due to retries + // Note: The elapsed time check is approximate due to test environment variations +} + +// Test: Retry behavior on getText with non-existent file +@test:Config { + dependsOn: [testGetBytesWithRetry_NonExistentFile] +} +function testGetTextWithRetry_NonExistentFile() returns error? { + string nonExistentPath = "/home/in/retry/non-existent-retry-text.txt"; + + string|Error result = (customRetryClientEp)->getText(nonExistentPath); + + test:assertTrue(result is Error, msg = "Should return error for non-existent file"); +} + +// Test: Retry behavior on getJson with non-existent file +@test:Config { + dependsOn: [testGetTextWithRetry_NonExistentFile] +} +function testGetJsonWithRetry_NonExistentFile() returns error? { + string nonExistentPath = "/home/in/retry/non-existent-retry.json"; + + json|Error result = (customRetryClientEp)->getJson(nonExistentPath); + + test:assertTrue(result is Error, msg = "Should return error for non-existent file"); +} + +// Test: Client without retry config should fail immediately on non-existent file +@test:Config { + dependsOn: [testGetJsonWithRetry_NonExistentFile] +} +function testGetBytesWithoutRetry_ImmediateFail() returns error? { + // Use the client without retry config + string nonExistentPath = "/home/in/retry/non-existent-no-retry.txt"; + + time:Utc startTime = time:utcNow(); + byte[]|Error result = (clientEp)->getBytes(nonExistentPath); + time:Utc endTime = time:utcNow(); + + test:assertTrue(result is Error, msg = "Should return error for non-existent file"); + + // Without retry, should fail quickly (< 1 second) + decimal elapsedSeconds = (endTime[0] - startTime[0]); + test:assertTrue(elapsedSeconds < 2.0d, + msg = "Without retry, should fail quickly. Elapsed: " + elapsedSeconds.toString() + "s"); +} + +// Test: Client creation with minimal retry config +@test:Config {} +function testClientWithMinimalRetryConfig() returns error? { + ClientConfiguration minimalRetryConf = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + retryConfig: {} // Use all defaults + }; + + Client minimalClient = check new (minimalRetryConf); + + // Should be able to perform operations + string testPath = "/home/in/test1.txt"; + byte[] result = check minimalClient->getBytes(testPath); + test:assertTrue(result.length() > 0, msg = "Should return non-empty bytes"); + + check minimalClient->close(); +} + +// Test: Verify retry doesn't affect write operations +@test:Config { + dependsOn: [testClientWithMinimalRetryConfig] +} +function testWriteOperationsWithRetryConfig() returns error? { + string testPath = "/home/in/retry-write-test.txt"; + string content = "test content for retry write"; + + // Write operations should work normally with retry config + Error? putResult = (retryClientEp)->putText(testPath, content); + test:assertEquals(putResult, (), msg = "putText should succeed with retry config"); + + // Verify content was written + string getText = check (retryClientEp)->getText(testPath); + test:assertEquals(getText, content, msg = "Content should match after write"); + + // Cleanup + check (retryClientEp)->delete(testPath); +} + +@test:AfterSuite +function cleanupRetryTestEnvironment() returns error? { + if retryClientEp is Client { + check (retryClientEp)->close(); + } + if customRetryClientEp is Client { + check (customRetryClientEp)->close(); + } +} diff --git a/changelog.md b/changelog.md index 108ba4bbf..8995364f4 100644 --- a/changelog.md +++ b/changelog.md @@ -6,7 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## unreleased ### Added - +- [Add automatic retry support with exponential backoff for FTP client](https://github.com/ballerina-platform/ballerina-library/issues/8585) - [Add FTP Listener Coordination Support](https://github.com/ballerina-platform/ballerina-library/issues/8490) - [Add distinct error types](https://github.com/ballerina-platform/ballerina-library/issues/8597) - [Incorporate the csv fail safe support in the FTP listener](https://github.com/ballerina-platform/ballerina-library/issues/8502) diff --git a/docs/spec/spec.md b/docs/spec/spec.md index 6fab0ef8e..50fa64433 100644 --- a/docs/spec/spec.md +++ b/docs/spec/spec.md @@ -24,11 +24,13 @@ The conforming implementation of the specification is released and included in t - [2.1. Security Configurations](#21-security-configurations) - [2.2. FileInfo](#22-fileinfo) - [2.3. Error Types](#23-error-types) + - [2.4. Retry Configuration](#24-retry-configuration) - [3. Client](#3-client) - [3.1. Configurations](#31-configurations) - [3.2. Initialization](#32-initialization) - [3.2.1. Insecure Client](#321-insecure-client) - [3.2.2. Secure Client](#322-secure-client) + - [3.2.3. Client with Retry Configuration](#323-client-with-retry-configuration) - [3.3. Functions](#33-functions) - [4. Listener](#4-listener) - [4.1. Configurations](#41-configurations) @@ -197,6 +199,25 @@ if result is ftp:ConnectionError { // Handle any other FTP error } ``` + +### 2.4. Retry Configuration +* `RetryConfig` record represents the configuration for automatic retries of operations. +```ballerina +# Retry configuration for FTP operations +# +# + count - Maximum number of retry attempts +# + interval - Initial retry interval in seconds +# + backOffFactor - Multiplier for exponential backoff +# + maxWaitInterval - Maximum wait interval between retries in seconds +public type RetryConfig record {| + int count = 3; + decimal interval = 1.0; + decimal backOffFactor = 2.0; + decimal maxWaitInterval = 30.0; +|}; +``` +The retry mechanism uses exponential backoff to progressively increase wait times between retry attempts. + ## 3. Client The `ftp:Client` connects to FTP server and performs various operations on the files. It supports reading files in multiple formats (bytes, text, JSON, XML, CSV) with streaming support for large files, writing files in multiple formats, and file management operations including create, delete, rename, move, copy, and list. ### 3.1. Configurations @@ -218,6 +239,8 @@ public type ClientConfiguration record {| boolean userDirIsRoot = false; # If set to `true`, allows missing or null values when reading files in structured formats boolean laxDataBinding = false; + # Retry configuration for read operations + RetryConfig retryConfig?; |}; ``` * InputContent record represents the configurations for the input given for `put` and `append` operations. @@ -281,6 +304,26 @@ ftp:ClientConfiguration ftpConfig = { userDirIsRoot: true }; ``` +#### 3.2.3. Client with Retry Configuration +A client can be initialized with retry configuration to automatically retry failed read operations: +```ballerina +ftp:ClientConfiguration ftpConfig = { + protocol: ftp:FTP, + host: "", + port: , + retryConfig: { + count: 5, // Retry up to 5 times + interval: 2.0, // Start with 2 second wait + backOffFactor: 1.5, // Increase wait by 1.5x each time + maxWaitInterval: 20.0 // Cap wait time at 20 seconds + } +}; + +ftp:Client ftpClient = check new(ftpConfig); + +// Non-streaming read operations (getBytes, getText, getJson, getXml, getCsv) will automatically retry on failure +byte[] bytes = check ftpClient->getBytes("/path/to/file.txt"); +``` ### 3.3. Functions * FTP Client API can be used to put files on the FTP server. For this, the `put()` method can be used. ```ballerina @@ -695,6 +738,7 @@ ftp:ListenerConfiguration ftpConfig = { userDirIsRoot: true }; ``` + #### 4.3. Usage After initializing the listener, a service must be attached to the listener. There are two ways for this. 1. Attach the service to the listener directly. diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java index 5e58c8d1e..895b2c25e 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java @@ -25,6 +25,7 @@ import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BArray; +import io.ballerina.runtime.api.values.BDecimal; import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; @@ -99,7 +100,10 @@ private FtpClient() { public static Object initClientEndpoint(BObject clientEndpoint, BMap config) { String protocol = extractProtocol(config); - configureClientEndpointBasic(clientEndpoint, config, protocol); + BError basicConfigError = configureClientEndpointBasic(clientEndpoint, config, protocol); + if (basicConfigError != null) { + return basicConfigError; + } Map ftpConfig = new HashMap<>(20); Object authError = configureAuthentication(config, protocol, ftpConfig); @@ -120,13 +124,41 @@ private static String extractProtocol(BMap config) { return (config.getStringValue(StringUtils.fromString(FtpConstants.ENDPOINT_CONFIG_PROTOCOL))).getValue(); } - private static void configureClientEndpointBasic(BObject clientEndpoint, BMap config, - String protocol) { + private static BError configureClientEndpointBasic(BObject clientEndpoint, BMap config, + String protocol) { clientEndpoint.addNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING, config.getBooleanValue(StringUtils.fromString(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING))); clientEndpoint.addNativeData(FtpConstants.ENDPOINT_CONFIG_CSV_FAIL_SAFE, config.getMapValue(StringUtils.fromString(FtpConstants.ENDPOINT_CONFIG_CSV_FAIL_SAFE))); + // Store retry config if present + BMap retryConfig = config.getMapValue(StringUtils.fromString(FtpConstants.RETRY_CONFIG)); + if (retryConfig == null) { + clientEndpoint.addNativeData(FtpConstants.NATIVE_RETRY_ENABLED, false); + } else { + clientEndpoint.addNativeData(FtpConstants.NATIVE_RETRY_ENABLED, true); + + long count = retryConfig.getIntValue(StringUtils.fromString(FtpConstants.RETRY_COUNT)); + Object intervalObj = retryConfig.get(StringUtils.fromString(FtpConstants.RETRY_INTERVAL)); + double interval = ((BDecimal) intervalObj).floatValue(); + + Object backOffFactorObj = retryConfig.get(StringUtils.fromString(FtpConstants.RETRY_BACKOFF_FACTOR)); + double backOffFactor = ((BDecimal) backOffFactorObj).floatValue(); + + Object maxWaitIntervalObj = retryConfig.get(StringUtils.fromString(FtpConstants.RETRY_MAX_WAIT_INTERVAL)); + double maxWaitInterval = ((BDecimal) maxWaitIntervalObj).floatValue(); + + BError retryValidationError = validateRetryConfig(count, interval, backOffFactor, maxWaitInterval); + if (retryValidationError != null) { + return retryValidationError; + } + + clientEndpoint.addNativeData(FtpConstants.NATIVE_RETRY_COUNT, count); + clientEndpoint.addNativeData(FtpConstants.NATIVE_RETRY_INTERVAL, interval); + clientEndpoint.addNativeData(FtpConstants.NATIVE_RETRY_BACKOFF, backOffFactor); + clientEndpoint.addNativeData(FtpConstants.NATIVE_RETRY_MAX_WAIT, maxWaitInterval); + } + Map authMap = FtpUtil.getAuthMap(config, protocol); clientEndpoint.addNativeData(FtpConstants.ENDPOINT_CONFIG_USERNAME, authMap.get(FtpConstants.ENDPOINT_CONFIG_USERNAME)); @@ -138,6 +170,33 @@ private static void configureClientEndpointBasic(BObject clientEndpoint, BMap config, String protocol, @@ -298,51 +357,84 @@ public static Object get(BObject clientConnector) { } public static Object getBytes(Environment env, BObject clientConnector, BString filePath) { - Object content = getAllContent(env, clientConnector, filePath); - if (!(content instanceof byte[])) { - return content; - } - return convertToBallerinaByteArray((byte[]) content); + return FtpRetryHelper.executeWithRetry( + clientConnector, + () -> { + Object content = getAllContent(env, clientConnector, filePath); + if (content instanceof byte[]) { + return convertToBallerinaByteArray((byte[]) content); + } + return content; + }, + FtpConstants.OP_GET_BYTES, + filePath.getValue() + ); } public static Object getText(Environment env, BObject clientConnector, BString filePath) { - Object content = getAllContent(env, clientConnector, filePath); - if (!(content instanceof byte[])) { - return content; - } - return convertBytesToString((byte[]) content); + return FtpRetryHelper.executeWithRetry( + clientConnector, + () -> { + Object content = getAllContent(env, clientConnector, filePath); + if (content instanceof byte[]) { + return convertBytesToString((byte[]) content); + } + return content; + }, + FtpConstants.OP_GET_TEXT, + filePath.getValue() + ); } public static Object getJson(Environment env, BObject clientConnector, BString filePath, BTypedesc typeDesc) { - Object content = getAllContent(env, clientConnector, filePath); - if (!(content instanceof byte[])) { - return content; - } - boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); - return convertBytesToJson((byte[]) content, typeDesc.getDescribingType(), laxDataBinding); + return FtpRetryHelper.executeWithRetry( + clientConnector, + () -> { + Object content = getAllContent(env, clientConnector, filePath); + if (content instanceof byte[]) { + return convertBytesToJson((byte[]) content, typeDesc.getDescribingType(), laxDataBinding); + } + return content; + }, + FtpConstants.OP_GET_JSON, + filePath.getValue() + ); } public static Object getXml(Environment env, BObject clientConnector, BString filePath, BTypedesc typeDesc) { - Object content = getAllContent(env, clientConnector, filePath); - if (!(content instanceof byte[])) { - return content; - } boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); - return convertBytesToXml((byte[]) content, typeDesc.getDescribingType(), laxDataBinding); + return FtpRetryHelper.executeWithRetry( + clientConnector, + () -> { + Object content = getAllContent(env, clientConnector, filePath); + if (content instanceof byte[]) { + return convertBytesToXml((byte[]) content, typeDesc.getDescribingType(), laxDataBinding); + } + return content; + }, + FtpConstants.OP_GET_XML, + filePath.getValue() + ); } public static Object getCsv(Environment env, BObject clientConnector, BString filePath, BTypedesc typeDesc) { - Object content = getAllContent(env, clientConnector, filePath); - if (!(content instanceof byte[])) { - return content; - } - boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); BMap csvFailSafe = (BMap) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_CSV_FAIL_SAFE); String fileNamePrefix = deriveFileNamePrefix(filePath); - return convertBytesToCsv(env, (byte[]) content, typeDesc.getDescribingType(), - laxDataBinding, csvFailSafe, fileNamePrefix); + return FtpRetryHelper.executeWithRetry( + clientConnector, + () -> { + Object content = getAllContent(env, clientConnector, filePath); + if (content instanceof byte[]) { + return convertBytesToCsv(env, (byte[]) content, typeDesc.getDescribingType(), + laxDataBinding, csvFailSafe, fileNamePrefix); + } + return content; + }, + FtpConstants.OP_GET_CSV, + filePath.getValue() + ); } public static Object getBytesAsStream(Environment env, BObject clientConnector, BString filePath) { diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java new file mode 100644 index 000000000..40088a794 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpRetryHelper.java @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.client; + +import io.ballerina.runtime.api.values.BError; +import io.ballerina.runtime.api.values.BObject; +import io.ballerina.stdlib.ftp.util.FtpConstants; +import io.ballerina.stdlib.ftp.util.FtpUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Supplier; + +import static io.ballerina.stdlib.ftp.util.FtpUtil.ErrorType.AllRetryAttemptsFailedError; +import static io.ballerina.stdlib.ftp.util.FtpUtil.ErrorType.Error; + +/** + * Helper class for FTP retry functionality. + * Implements retry logic with exponential backoff directly in Java. + */ +public final class FtpRetryHelper { + + private static final Logger log = LoggerFactory.getLogger(FtpRetryHelper.class); + + private FtpRetryHelper() { + // private constructor + } + + protected static Object executeWithRetry(BObject clientConnector, Supplier operation, + String operationName, String filePath) { + // First attempt + Object result = operation.get(); + + // If success, return immediately + if (!(result instanceof BError)) { + return result; + } + + // Check if retry is configured + boolean isRetryEnabled = (boolean) clientConnector.getNativeData(FtpConstants.NATIVE_RETRY_ENABLED); + if (!isRetryEnabled) { + return result; + } + + // Get retry configuration + long count = (long) clientConnector.getNativeData(FtpConstants.NATIVE_RETRY_COUNT); + double interval = (double) clientConnector.getNativeData(FtpConstants.NATIVE_RETRY_INTERVAL); + double backOffFactor = (double) clientConnector.getNativeData(FtpConstants.NATIVE_RETRY_BACKOFF); + double maxWaitInterval = (double) clientConnector.getNativeData(FtpConstants.NATIVE_RETRY_MAX_WAIT); + + BError lastError = (BError) result; + double currentInterval = interval; + + log.debug("Operation '{}' failed for path '{}', starting retry with count={}, interval={}, " + + "backOffFactor={}, maxWaitInterval={}", operationName, filePath, count, interval, + backOffFactor, maxWaitInterval); + + // Retry loop + for (int attempt = 1; attempt <= count; attempt++) { + // Calculate wait time - apply backoff only after first retry + if (attempt > 1) { + currentInterval = getWaitTime(backOffFactor, maxWaitInterval, currentInterval); + } + + log.debug("FTP retry attempt {}/{} for operation '{}' on path '{}', waiting {}s", + attempt, count, operationName, filePath, currentInterval); + + // Sleep before retry (convert seconds to milliseconds) + try { + long sleepMs = (long) (currentInterval * 1000); + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return FtpUtil.createError("Retry interrupted for operation '" + operationName + "'", + e, Error.errorType()); + } + + // Execute the operation + result = operation.get(); + + if (!(result instanceof BError)) { + log.debug("Operation '{}' succeeded on retry attempt {} for path '{}'", + operationName, attempt, filePath); + return result; + } + + lastError = (BError) result; + } + + // All retries exhausted + log.debug("Operation '{}' failed after {} retry attempts for path '{}'", + operationName, count, filePath); + return FtpUtil.createError("Operation '" + operationName + "' failed after " + count + + " retry attempts: " + lastError.getMessage(), lastError, AllRetryAttemptsFailedError.errorType()); + } + + private static double getWaitTime(double backOffFactor, double maxWaitTime, double interval) { + double waitTime = interval * backOffFactor; + return Math.min(waitTime, maxWaitTime); + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/exception/FtpAllRetryAttemptsFailedException.java b/native/src/main/java/io/ballerina/stdlib/ftp/exception/FtpAllRetryAttemptsFailedException.java new file mode 100644 index 000000000..ab377a906 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/exception/FtpAllRetryAttemptsFailedException.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.exception; + +import io.ballerina.stdlib.ftp.util.FtpUtil; + +/** + * Exception thrown when all retry attempts have been exhausted. + * This exception wraps the last failure encountered during retry attempts. + */ +public class FtpAllRetryAttemptsFailedException extends RemoteFileSystemConnectorException + implements ErrorTypeProvider { + + public FtpAllRetryAttemptsFailedException(String message) { + super(message); + } + + public FtpAllRetryAttemptsFailedException(String message, Throwable cause) { + super(message, cause); + } + + @Override + public String errorType() { + return FtpUtil.ErrorType.AllRetryAttemptsFailedError.errorType(); + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java index c1aa9137e..7fc507fd6 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java @@ -192,4 +192,25 @@ private FtpConstants() { // Common error messages public static final String ERR_READING_STREAM = "Error occurred while reading stream"; public static final String ERR_CREATE_STREAM = "Unable to create stream"; + + // Retry configuration + public static final String RETRY_CONFIG = "retryConfig"; + public static final String RETRY_COUNT = "count"; + public static final String RETRY_INTERVAL = "interval"; + public static final String RETRY_BACKOFF_FACTOR = "backOffFactor"; + public static final String RETRY_MAX_WAIT_INTERVAL = "maxWaitInterval"; + + // Native data keys for retry + public static final String NATIVE_RETRY_ENABLED = "Retry_Enabled"; + public static final String NATIVE_RETRY_COUNT = "Retry_Count"; + public static final String NATIVE_RETRY_INTERVAL = "Retry_Interval"; + public static final String NATIVE_RETRY_BACKOFF = "Retry_Backoff"; + public static final String NATIVE_RETRY_MAX_WAIT = "Retry_MaxWait"; + + // Operation names for retry handler + public static final String OP_GET_BYTES = "getBytes"; + public static final String OP_GET_TEXT = "getText"; + public static final String OP_GET_JSON = "getJson"; + public static final String OP_GET_XML = "getXml"; + public static final String OP_GET_CSV = "getCsv"; } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java index f665758a9..f5a32b565 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpUtil.java @@ -623,7 +623,8 @@ public enum ErrorType { FileNotFoundError("FileNotFoundError"), FileAlreadyExistsError("FileAlreadyExistsError"), InvalidConfigError("InvalidConfigError"), - ServiceUnavailableError("ServiceUnavailableError"); + ServiceUnavailableError("ServiceUnavailableError"), + AllRetryAttemptsFailedError("AllRetryAttemptsFailedError"); private String errorType; diff --git a/test-utils/src/main/java/io/ballerina/stdlib/ftp/testutils/mockServerUtils/MockFtpServer.java b/test-utils/src/main/java/io/ballerina/stdlib/ftp/testutils/mockServerUtils/MockFtpServer.java index 45e9418da..c85143cc7 100644 --- a/test-utils/src/main/java/io/ballerina/stdlib/ftp/testutils/mockServerUtils/MockFtpServer.java +++ b/test-utils/src/main/java/io/ballerina/stdlib/ftp/testutils/mockServerUtils/MockFtpServer.java @@ -129,6 +129,9 @@ public static Object initFtpServer() throws Exception { fileSystem.add(new DirectoryEntry("/home/in/content-methods")); fileSystem.add(new FileEntry("/home/in/content-methods/.init", "")); + fileSystem.add(new DirectoryEntry("/home/in/retry")); + fileSystem.add(new FileEntry("/home/in/retry/test1.txt", content1)); + fileSystem.add(new DirectoryEntry("/home/in/advanced/cron")); fileSystem.add(new FileEntry("/home/in/cron/.init", ""));