From de3e2fb8811a2cdeaa02586100e347ea95f99186 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Wed, 3 Dec 2025 00:34:15 +0530 Subject: [PATCH 01/27] Add enableCsvFailSafe field into configs --- ballerina/client_endpoint.bal | 4 ++++ ballerina/listener_endpoint.bal | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/ballerina/client_endpoint.bal b/ballerina/client_endpoint.bal index 8b3ea79eb..100a1b30d 100644 --- a/ballerina/client_endpoint.bal +++ b/ballerina/client_endpoint.bal @@ -432,6 +432,9 @@ public enum Compression { # + laxDataBinding - If set to `true`, enables relaxed data binding for XML and JSON responses. # null values in JSON/XML are allowed to be mapped to optional fields # missing fields in JSON/XML are allowed to be mapped as null values +# + enableCsvFailSafe - If set to `true`, enables fail-safe mode for CSV content processing. +# In fail-safe mode, malformed CSV records are logged and skipped, +# allowing processing to continue for well-formed records public type ClientConfiguration record {| Protocol protocol = FTP; string host = "127.0.0.1"; @@ -439,6 +442,7 @@ public type ClientConfiguration record {| AuthConfiguration auth?; boolean userDirIsRoot = false; boolean laxDataBinding = false; + boolean enableCsvFailSafe = false; |}; isolated function getInputContent(string path, stream|string|xml|json content, diff --git a/ballerina/listener_endpoint.bal b/ballerina/listener_endpoint.bal index 3df0ace6c..b81179011 100644 --- a/ballerina/listener_endpoint.bal +++ b/ballerina/listener_endpoint.bal @@ -180,6 +180,9 @@ class Job { # + laxDataBinding - If set to `true`, enables relaxed data binding for XML and JSON responses. # null values in JSON/XML are allowed to be mapped to optional fields # missing fields in JSON/XML are allowed to be mapped as null values +# + enableCsvFailSafe - If set to `true`, enables fail-safe mode for CSV content processing. +# In fail-safe mode, malformed CSV records are logged and skipped, +# allowing processing to continue for well-formed records public type ListenerConfiguration record {| Protocol protocol = FTP; string host = "127.0.0.1"; @@ -192,6 +195,7 @@ public type ListenerConfiguration record {| FileAgeFilter fileAgeFilter?; FileDependencyCondition[] fileDependencyConditions = []; boolean laxDataBinding = false; + boolean enableCsvFailSafe = false; |}; # FTP service for handling file system change events. From e733b3155bc9ef7d7a552caae1075f02685ba975 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Wed, 3 Dec 2025 00:35:43 +0530 Subject: [PATCH 02/27] Update native code to incorporate csv fail safe support in ftp APIs --- .../ftp/ContentCsvStreamIteratorUtils.java | 5 +-- .../stdlib/ftp/client/FtpClient.java | 6 +++- .../ftp/server/FtpContentCallbackHandler.java | 16 ++++++--- .../stdlib/ftp/server/FtpListener.java | 24 +++++++++---- .../stdlib/ftp/server/FtpListenerHelper.java | 6 +++- .../stdlib/ftp/util/FtpConstants.java | 1 + .../stdlib/ftp/util/FtpContentConverter.java | 35 ++++++++++++++++--- 7 files changed, 74 insertions(+), 19 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/ContentCsvStreamIteratorUtils.java b/native/src/main/java/io/ballerina/stdlib/ftp/ContentCsvStreamIteratorUtils.java index 45b0e9842..ff75bb865 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/ContentCsvStreamIteratorUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/ContentCsvStreamIteratorUtils.java @@ -19,6 +19,7 @@ import io.ballerina.lib.data.csvdata.csv.Native; import io.ballerina.lib.data.csvdata.utils.ModuleUtils; +import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.creators.TypeCreator; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.PredefinedTypes; @@ -94,7 +95,7 @@ public static Object createRecordStream(InputStream content, Type streamValueTyp return ValueCreator.createStreamValue(streamType, contentCsvStreamObject); } - public static Object next(BObject recordIterator) { + public static Object next(Environment environment, BObject recordIterator) { final Type elementType = (Type) recordIterator.getNativeData(NATIVE_STREAM_VALUE_TYPE); final String recordTypeName = resolveRecordTypeName(elementType); final BMap streamEntry = @@ -118,9 +119,9 @@ public static Object next(BObject recordIterator) { BMap parseOptions = ValueCreator.createRecordValue(ModuleUtils.getModule(), "ParseOptions"); parseOptions.put(StringUtils.fromString("allowDataProjection"), laxDataBinding); - // Parse Object parsed = Native.parseBytes( + environment, ValueCreator.createArrayValue(bytes), parseOptions, ValueCreator.createTypedescValue(TypeCreator.createArrayType(elementType))); diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java index 0194778a0..0491dd7b7 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java @@ -93,6 +93,8 @@ public static Object initClientEndpoint(BObject clientEndpoint, BMap authMap = FtpUtil.getAuthMap(config, protocol); clientEndpoint.addNativeData(FtpConstants.ENDPOINT_CONFIG_USERNAME, @@ -213,7 +215,9 @@ public static Object getCsv(Environment env, BObject clientConnector, BString fi } boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); - return convertBytesToCsv((byte[]) content, typeDesc.getDescribingType(), laxDataBinding); + boolean enableCsvFailSafe = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_CSV_FAIL_SAFE); + return convertBytesToCsv(env, (byte[]) content, typeDesc.getDescribingType(), + laxDataBinding, enableCsvFailSafe); } public static Object getBytesAsStream(Environment env, BObject clientConnector, BString filePath) { diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java index 7dde69d02..6f565383f 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java @@ -18,6 +18,7 @@ package io.ballerina.stdlib.ftp.server; +import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Module; import io.ballerina.runtime.api.Runtime; import io.ballerina.runtime.api.concurrent.StrandMetadata; @@ -76,20 +77,23 @@ public class FtpContentCallbackHandler { private final FileSystemManager fileSystemManager; private final FileSystemOptions fileSystemOptions; private final boolean laxDataBinding; + private final boolean enableCsvFailSafe; public FtpContentCallbackHandler(Runtime ballerinaRuntime, FileSystemManager fileSystemManager, - FileSystemOptions fileSystemOptions, boolean laxDataBinding) { + FileSystemOptions fileSystemOptions, boolean laxDataBinding, + boolean enableCsvFailSafe) { this.ballerinaRuntime = ballerinaRuntime; this.fileSystemManager = fileSystemManager; this.fileSystemOptions = fileSystemOptions; this.laxDataBinding = laxDataBinding; + this.enableCsvFailSafe = enableCsvFailSafe; } /** * Processes content callbacks for added files in the event. * Routes each file to the appropriate content handler based on file extension and annotations. */ - public void processContentCallbacks(BObject service, RemoteFileSystemEvent event, + public void processContentCallbacks(Environment env, BObject service, RemoteFileSystemEvent event, FormatMethodsHolder holder, BObject callerObject) { List addedFiles = event.getAddedFiles(); @@ -110,7 +114,7 @@ public void processContentCallbacks(BObject service, RemoteFileSystemEvent event // Convert content based on method signature MethodType methodType = methodTypeOpt.get(); - Object convertedContent = convertFileContent(fileObject, inputStream, methodType); + Object convertedContent = convertFileContent(env, fileObject, inputStream, methodType); // Prepare method arguments Object[] methodArguments = prepareContentMethodArguments(methodType, convertedContent, @@ -153,7 +157,8 @@ private byte[] fetchAllFileContentFromRemote(FileObject fileObject, InputStream /** * Converts file content to the appropriate Ballerina type based on the method signature. */ - private Object convertFileContent(FileObject fileObject, InputStream inputStream, MethodType methodType) + private Object convertFileContent(Environment environment, FileObject fileObject, InputStream inputStream, + MethodType methodType) throws Exception { String methodName = methodType.getName(); Parameter firstParameter = methodType.getParameters()[0]; @@ -183,7 +188,8 @@ private Object convertFileContent(FileObject fileObject, InputStream inputStream case ON_FILE_TEXT_REMOTE_FUNCTION -> convertBytesToString(fileContent); case ON_FILE_JSON_REMOTE_FUNCTION -> convertBytesToJson(fileContent, firstParamType, laxDataBinding); case ON_FILE_XML_REMOTE_FUNCTION -> convertBytesToXml(fileContent, firstParamType, laxDataBinding); - case ON_FILE_CSV_REMOTE_FUNCTION -> convertBytesToCsv(fileContent, firstParamType, laxDataBinding); + case ON_FILE_CSV_REMOTE_FUNCTION -> convertBytesToCsv(environment, fileContent, firstParamType, + laxDataBinding, enableCsvFailSafe); default -> throw new IllegalArgumentException("Unknown content method: " + methodName); }; } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java index c90e2c917..e1afbc1e1 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java @@ -18,6 +18,7 @@ package io.ballerina.stdlib.ftp.server; +import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Module; import io.ballerina.runtime.api.Runtime; import io.ballerina.runtime.api.concurrent.StrandMetadata; @@ -75,16 +76,23 @@ public class FtpListener implements RemoteFileSystemListener { private static final Logger log = LoggerFactory.getLogger(FtpListener.class); private final Runtime runtime; + private Environment environment = null; private Map registeredServices = new HashMap<>(); private BObject caller; private FileSystemManager fileSystemManager; private FileSystemOptions fileSystemOptions; private boolean laxDataBinding; + private boolean enableCsvFailSafe = false; FtpListener(Runtime runtime) { this.runtime = runtime; } + FtpListener(Environment environment) { + this.environment = environment; + this.runtime = environment.getRuntime(); + } + public void setFileSystemManager(FileSystemManager fileSystemManager) { this.fileSystemManager = fileSystemManager; } @@ -97,6 +105,10 @@ public void setLaxDataBinding(boolean laxDataBinding) { this.laxDataBinding = laxDataBinding; } + public void setEnableCsvFailSafe(boolean enableCsvFailSafe) { + this.enableCsvFailSafe = enableCsvFailSafe; + } + @Override public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage) { if (remoteFileSystemBaseMessage instanceof RemoteFileSystemEvent) { @@ -104,7 +116,7 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage if (runtime != null) { for (BObject service : registeredServices.values()) { - dispatchFileEventToService(service, event); + dispatchFileEventToService(this.environment, service, event); } } else { log.error("Runtime should not be null."); @@ -113,13 +125,13 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage return true; } - private void dispatchFileEventToService(BObject service, RemoteFileSystemEvent event) { + private void dispatchFileEventToService(Environment env, BObject service, RemoteFileSystemEvent event) { FormatMethodsHolder formatMethodHolder = new FormatMethodsHolder(service); Optional onFileDeletedMethodType = getOnFileDeletedMethod(service); // Dispatch Strategy: Check handler availability in order if (formatMethodHolder.hasContentMethods()) { - processContentBasedCallbacks(service, event, formatMethodHolder); + processContentBasedCallbacks(env, service, event, formatMethodHolder); } else if (onFileDeletedMethodType.isPresent()) { if (!event.getDeletedFiles().isEmpty()) { processFileDeletedCallback(service, event, onFileDeletedMethodType.get()); @@ -143,7 +155,7 @@ private void dispatchFileEventToService(BObject service, RemoteFileSystemEvent e * Uses ContentMethodRouter to dispatch files to appropriate handlers. * Also handles file deletion events via onFileDeleted method if available. */ - private void processContentBasedCallbacks(BObject service, RemoteFileSystemEvent event, + private void processContentBasedCallbacks(Environment env, BObject service, RemoteFileSystemEvent event, FormatMethodsHolder holder) { // Process added files with content methods if (!event.getAddedFiles().isEmpty()) { @@ -153,8 +165,8 @@ private void processContentBasedCallbacks(BObject service, RemoteFileSystemEvent } else { try { FtpContentCallbackHandler contentHandler = new FtpContentCallbackHandler( - runtime, fileSystemManager, fileSystemOptions, laxDataBinding); - contentHandler.processContentCallbacks(service, event, holder, caller); + runtime, fileSystemManager, fileSystemOptions, laxDataBinding, enableCsvFailSafe); + contentHandler.processContentCallbacks(env, service, event, holder, caller); } catch (Exception e) { log.error("Error in content callback processing for added files", e); } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java index f22e4f20d..8c57c22b6 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java @@ -61,6 +61,8 @@ */ public class FtpListenerHelper { + public static final BString ENABLE_CSV_FAIL_SAFE = StringUtils.fromString("enableCsvFailSafe"); + private FtpListenerHelper() { // private constructor } @@ -75,7 +77,7 @@ public static Object init(Environment env, BObject ftpListener, BMap paramMap = getServerConnectorParamMap(serviceEndpointConfig); RemoteFileSystemConnectorFactory fileSystemConnectorFactory = new RemoteFileSystemConnectorFactoryImpl(); - final FtpListener listener = new FtpListener(env.getRuntime()); + final FtpListener listener = new FtpListener(env); RemoteFileSystemServerConnector serverConnector; List dependencyConditions = parseFileDependencyConditions(serviceEndpointConfig); @@ -98,6 +100,8 @@ public static Object init(Environment env, BObject ftpListener, BMap options = createCsvParseOptions(laxDataBinding); + BMap options = createCsvParseOptions(laxDataBinding, csvFailSafeConfigs); Type referredType = TypeUtils.getReferredType(targetType); BTypedesc typedesc = ValueCreator.createTypedescValue(referredType); - Object result = io.ballerina.lib.data.csvdata.csv.Native.parseBytes(byteArray, options, typedesc); + Object result = parseBytes(env, byteArray, options, typedesc); if (result instanceof BError) { log.error("Failed to parse CSV content: {}", ((BError) result).getMessage()); @@ -182,9 +196,22 @@ private static BMap createXmlParseOptions(boolean laxDataBindin * * @return BMap containing parse options */ - private static BMap createCsvParseOptions(boolean laxDataBinding) { + private static BMap createCsvParseOptions(boolean laxDataBinding, boolean enableCsvFailSafe) { BMap mapValue = ValueCreator.createRecordValue( io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), "ParseOptions"); + if (enableCsvFailSafe) { + BMap failSafe = + ValueCreator.createRecordValue(io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), + FAIL_SAFE_OPTIONS); + BMap fileOutputMode = + ValueCreator.createRecordValue(io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), + FILE_OUTPUT_MODE_TYPE); + fileOutputMode.put(FILE_PATH, StringUtils.fromString(CURRENT_DIRECTORY_PATH + ERROR_LOG_FILE_NAME)); + fileOutputMode.put(FILE_WRITE_OPTION, APPEND); + fileOutputMode.put(CONTENT_TYPE, RAW_TYPE); + failSafe.put(FILE_OUTPUT_MODE, fileOutputMode); + mapValue.put(FAIL_SAFE, failSafe); + } if (laxDataBinding) { BMap allowDataProjection = mapValue.getMapValue(StringUtils.fromString("allowDataProjection")); allowDataProjection.put(StringUtils.fromString("nilAsOptionalField"), Boolean.TRUE); From 3b1439b5db24f04f07068490e1c00c0277a70d15 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Wed, 3 Dec 2025 00:36:42 +0530 Subject: [PATCH 03/27] Update test case to validate csv fail safe support in ftp listener --- .../tests/listener_content_methods_test.bal | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/ballerina/tests/listener_content_methods_test.bal b/ballerina/tests/listener_content_methods_test.bal index 87635ed25..0a98e86d0 100644 --- a/ballerina/tests/listener_content_methods_test.bal +++ b/ballerina/tests/listener_content_methods_test.bal @@ -14,6 +14,7 @@ // specific language governing permissions and limitations // under the License. +import ballerina/file; import ballerina/io; import ballerina/lang.runtime; import ballerina/log; @@ -73,6 +74,7 @@ int xmlRecordFilesProcessed = 0; const string JSON_TEST_FILE = "tests/resources/datafiles/test_data.json"; const string XML_TEST_FILE = "tests/resources/datafiles/test_data.xml"; const string CSV_TEST_FILE = "tests/resources/datafiles/test_data.csv"; +const string CSV_TEST_FILE_WITH_ERROR = "tests/resources/datafiles/test_data_with_error.csv"; const string TEXT_TEST_FILE = "tests/resources/datafiles/test_text.txt"; const string GENERIC_TEST_FILE = "tests/resources/datafiles/file2.txt"; @@ -711,7 +713,8 @@ public function testOnFileCsvRecordArray() returns error? { port: 21212, path: CONTENT_TEST_DIR, pollingInterval: 4, - fileNamePattern: "csvrecord.*\\.csv" + fileNamePattern: "csvrecord.*\\.csv", + enableCsvFailSafe: true }); check csvRecordListener.attach(csvRecordService); @@ -719,7 +722,7 @@ public function testOnFileCsvRecordArray() returns error? { runtime:registerListener(csvRecordListener); // Upload CSV file for record deserialization - stream csvStream = check io:fileReadBlocksAsStream(CSV_TEST_FILE, 5); + stream csvStream = check io:fileReadBlocksAsStream(CSV_TEST_FILE_WITH_ERROR); check (clientEp)->put(CONTENT_TEST_DIR + "/csvrecord.csv", csvStream); runtime:sleep(15); @@ -728,13 +731,14 @@ public function testOnFileCsvRecordArray() returns error? { check csvRecordListener.gracefulStop(); test:assertTrue(contentMethodInvoked, "onFileCsv (record array) should have been invoked"); - test:assertTrue(csvRecordArrayReceived.length() >= 3, - string `Should have deserialized at least 3 Employee records, got ${csvRecordArrayReceived.length()}`); + test:assertTrue(csvRecordArrayReceived.length() >= 2, + string `Should have deserialized at least 2 Employee records, got ${csvRecordArrayReceived.length()}`); // Verify first record Employee firstEmployee = csvRecordArrayReceived[0]; - test:assertEquals(firstEmployee.name, "Alice Johnson", "First employee's name should match"); - test:assertEquals(firstEmployee.age, 25, "First employee's age should match"); + test:assertEquals(firstEmployee.name, "Bob Williams", "First employee's name should match"); + test:assertEquals(firstEmployee.age, 35, "First employee's age should match"); + check file:remove("error.log"); } @test:Config { From 19f0a9c9b58d0be97c199363a5fa76203b8737c0 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Wed, 3 Dec 2025 00:37:10 +0530 Subject: [PATCH 04/27] Add resource for fail safe test cases --- ballerina/tests/resources/datafiles/test_data_with_error.csv | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 ballerina/tests/resources/datafiles/test_data_with_error.csv diff --git a/ballerina/tests/resources/datafiles/test_data_with_error.csv b/ballerina/tests/resources/datafiles/test_data_with_error.csv new file mode 100644 index 000000000..4cbcff936 --- /dev/null +++ b/ballerina/tests/resources/datafiles/test_data_with_error.csv @@ -0,0 +1,4 @@ +name,age,city,salary +Alice Johnson,invalid,Seattle,75000 +Bob Williams,35,Boston,95000 +Carol Davis,42,Chicago,110000 From 2694f9de15bfaf33f98c898831ca87533394d41e Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Wed, 3 Dec 2025 00:37:56 +0530 Subject: [PATCH 05/27] Add uuid dependency --- build.gradle | 1 + gradle.properties | 1 + 2 files changed, 2 insertions(+) diff --git a/build.gradle b/build.gradle index 519f9b836..64dc31d91 100644 --- a/build.gradle +++ b/build.gradle @@ -75,6 +75,7 @@ subprojects { ballerinaStdLibs "io.ballerina.stdlib:log-ballerina:${stdlibLogVersion}" ballerinaStdLibs "io.ballerina.stdlib:io-ballerina:${stdlibIoVersion}" ballerinaStdLibs "io.ballerina.stdlib:task-ballerina:${stdlibTaskVersion}" + ballerinaStdLibs "io.ballerina.stdlib:uuid-ballerina:${stdlibUuidVersion}" ballerinaStdLibs "io.ballerina.stdlib:time-ballerina:${stdlibTimeVersion}" ballerinaStdLibs "io.ballerina.stdlib:observe-ballerina:${observeVersion}" ballerinaStdLibs "io.ballerina:observe-ballerina:${observeInternalVersion}" diff --git a/gradle.properties b/gradle.properties index 1e351d309..1476fbddc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -31,6 +31,7 @@ ballerinaGradlePluginVersion=2.3.0 ballerinaLangVersion=2201.12.0 stdlibTaskVersion=2.7.0 +stdlibUuidVersion=1.10.0 stdlibLogVersion=2.12.0 stdlibIoVersion=1.8.0 stdlibTimeVersion=2.7.0 From 47ceffd281f9e056f533831f4b0e30975156382f Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Wed, 3 Dec 2025 00:38:37 +0530 Subject: [PATCH 06/27] Update csv data dependency version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 1476fbddc..3c9a5890e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,4 +39,4 @@ observeVersion=1.5.0 observeInternalVersion=1.5.0 stdlibDataJsonDataVersion=1.1.2 stdlibDataXmlDataVersion=1.5.2 -stdlibDataCsvVersion=0.8.1 +stdlibDataCsvVersion=0.9.0 From 392779019c41263f3312ff606af413544c9cc05a Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Wed, 3 Dec 2025 00:38:58 +0530 Subject: [PATCH 07/27] Update toml files --- ballerina/Ballerina.toml | 4 +- ballerina/Dependencies.toml | 153 ++++++++++++++++++++++++++++++++++-- 2 files changed, 147 insertions(+), 10 deletions(-) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index d69c2349f..5da0424fd 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -64,5 +64,5 @@ path = "./lib/data.xmldata-native-1.5.2.jar" [[platform.java21.dependency]] groupId = "io.ballerina.lib" artifactId = "data.csv-native" -version = "0.8.1" -path = "./lib/data.csv-native-0.8.1.jar" +version = "0.9.0" +path = "./lib/data.csv-native-0.9.0.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index c1b284ab4..2b9cca49a 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -7,12 +7,22 @@ dependencies-toml-version = "2" distribution-version = "2201.12.0" +[[package]] +org = "ballerina" +name = "crypto" +version = "2.9.2" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "time"} +] + [[package]] org = "ballerina" name = "data.csv" -version = "0.8.1" +version = "0.9.0" dependencies = [ - {org = "ballerina", name = "jballerina.java"} + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "log"} ] modules = [ {org = "ballerina", packageName = "data.csv", moduleName = "data.csv"} @@ -42,6 +52,21 @@ modules = [ {org = "ballerina", packageName = "data.xmldata", moduleName = "data.xmldata"} ] +[[package]] +org = "ballerina" +name = "file" +version = "1.11.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "os"}, + {org = "ballerina", name = "time"} +] +modules = [ + {org = "ballerina", packageName = "file", moduleName = "file"} +] + [[package]] org = "ballerina" name = "ftp" @@ -50,10 +75,14 @@ dependencies = [ {org = "ballerina", name = "data.csv"}, {org = "ballerina", name = "data.jsondata"}, {org = "ballerina", name = "data.xmldata"}, + {org = "ballerina", name = "file"}, {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.runtime"}, + {org = "ballerina", name = "lang.string"}, {org = "ballerina", name = "log"}, - {org = "ballerina", name = "task"} + {org = "ballerina", name = "task"}, + {org = "ballerina", name = "test"} ] modules = [ {org = "ballerina", packageName = "ftp", moduleName = "ftp"} @@ -79,11 +108,83 @@ modules = [ {org = "ballerina", packageName = "jballerina.java", moduleName = "jballerina.java"} ] +[[package]] +org = "ballerina" +name = "lang.__internal" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.object"} +] + +[[package]] +org = "ballerina" +name = "lang.array" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"} +] + +[[package]] +org = "ballerina" +name = "lang.error" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.int" +version = "0.0.0" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.__internal"}, + {org = "ballerina", name = "lang.object"} +] + [[package]] org = "ballerina" name = "lang.object" version = "0.0.0" +[[package]] +org = "ballerina" +name = "lang.regexp" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + +[[package]] +org = "ballerina" +name = "lang.runtime" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] +modules = [ + {org = "ballerina", packageName = "lang.runtime", moduleName = "lang.runtime"} +] + +[[package]] +org = "ballerina" +name = "lang.string" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.regexp"} +] +modules = [ + {org = "ballerina", packageName = "lang.string", moduleName = "lang.string"} +] + [[package]] org = "ballerina" name = "lang.value" @@ -95,7 +196,7 @@ dependencies = [ [[package]] org = "ballerina" name = "log" -version = "2.12.0" +version = "2.14.0" dependencies = [ {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, @@ -109,28 +210,64 @@ modules = [ [[package]] org = "ballerina" name = "observe" -version = "1.5.1" +version = "1.6.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] +[[package]] +org = "ballerina" +name = "os" +version = "1.9.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"} +] + [[package]] org = "ballerina" name = "task" -version = "2.7.0" +version = "2.11.0" dependencies = [ {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "time"} + {org = "ballerina", name = "time"}, + {org = "ballerina", name = "uuid"} ] modules = [ {org = "ballerina", packageName = "task", moduleName = "task"} ] +[[package]] +org = "ballerina" +name = "test" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.array"}, + {org = "ballerina", name = "lang.error"} +] +modules = [ + {org = "ballerina", packageName = "test", moduleName = "test"} +] + [[package]] org = "ballerina" name = "time" -version = "2.7.0" +version = "2.8.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] +[[package]] +org = "ballerina" +name = "uuid" +version = "1.10.0" +dependencies = [ + {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.int"}, + {org = "ballerina", name = "time"} +] + From affc2e1ce2bd455c1f610d3c0ebf774fdf29ef43 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Wed, 3 Dec 2025 10:15:45 +0530 Subject: [PATCH 08/27] [Automated] Update the native jar versions --- ballerina/Ballerina.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 5da0424fd..b31a14288 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -64,5 +64,5 @@ path = "./lib/data.xmldata-native-1.5.2.jar" [[platform.java21.dependency]] groupId = "io.ballerina.lib" artifactId = "data.csv-native" -version = "0.9.0" -path = "./lib/data.csv-native-0.9.0.jar" +version = "0.9.0-SNAPSHOT" +path = "./lib/data.csv-native-0.9.0-SNAPSHOT.jar" From b1da13045cc74a3e9b872b780a3395f1c22f59f8 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Wed, 3 Dec 2025 11:26:53 +0530 Subject: [PATCH 09/27] Add test cases to validate the fail safe support --- .../tests/listener_content_methods_test.bal | 117 ++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/ballerina/tests/listener_content_methods_test.bal b/ballerina/tests/listener_content_methods_test.bal index 0a98e86d0..60a27796e 100644 --- a/ballerina/tests/listener_content_methods_test.bal +++ b/ballerina/tests/listener_content_methods_test.bal @@ -684,6 +684,67 @@ public function testOnFileCsvStream() returns error? { test:assertTrue(fileInfo.name.endsWith(".csv"), "Should process .csvstream files"); } +@test:Config { + dependsOn: [testOptionalParametersWithoutCaller] +} +public function testOnFileCsvStreamWithFailSafe() returns error? { + // Reset state + csvStreamRowsProcessed = 0; + lastFileInfo = (); + contentMethodInvoked = false; + + // Service with onFileCsv (stream variant) + Service csvStreamService = service object { + remote function onFileCsv(stream content, FileInfo fileInfo, Caller caller) returns error? { + log:printInfo(string `onFileCsv (stream) invoked for: ${fileInfo.name}`); + lastFileInfo = fileInfo; + contentMethodInvoked = true; + + // Process stream + error? processStream = content.forEach(function(string[] row) { + csvStreamRowsProcessed += 1; + log:printInfo(string `Processing CSV row: ${row.length()} columns`); + }); + + if processStream is error { + log:printError("Error processing CSV stream", processStream); + return processStream; + } + } + }; + + // Create listener for .csvstream files + Listener csvStreamListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: CONTENT_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "csvstream.*\\.csv", + enableCsvFailSafe: true + }); + + check csvStreamListener.attach(csvStreamService); + check csvStreamListener.'start(); + runtime:registerListener(csvStreamListener); + + // Upload CSV file for stream processing + stream csvStream = check io:fileReadBlocksAsStream(CSV_TEST_FILE_WITH_ERROR, 5); + check (clientEp)->put(CONTENT_TEST_DIR + "/csvstream.csv", csvStream); + runtime:sleep(15); + + // Cleanup + runtime:deregisterListener(csvStreamListener); + check csvStreamListener.gracefulStop(); + + test:assertTrue(contentMethodInvoked, "onFileCsv (stream) should have been invoked"); + test:assertTrue(csvStreamRowsProcessed > 0, string `Should have processed CSV rows, got ${csvStreamRowsProcessed}`); + + FileInfo fileInfo = check lastFileInfo.ensureType(); + test:assertTrue(fileInfo.name.endsWith(".csv"), "Should process .csvstream files"); +} + @test:Config { dependsOn: [testOnFileCsvStream] } @@ -705,6 +766,62 @@ public function testOnFileCsvRecordArray() returns error? { } }; + // Create listener for .csvrecord files + Listener csvRecordListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: CONTENT_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "csvrecord.*\\.csv" + }); + + check csvRecordListener.attach(csvRecordService); + check csvRecordListener.'start(); + runtime:registerListener(csvRecordListener); + + // Upload CSV file for record deserialization + stream csvStream = check io:fileReadBlocksAsStream(CSV_TEST_FILE); + check (clientEp)->put(CONTENT_TEST_DIR + "/csvrecord.csv", csvStream); + runtime:sleep(15); + + // Cleanup + runtime:deregisterListener(csvRecordListener); + check csvRecordListener.gracefulStop(); + + test:assertTrue(contentMethodInvoked, "onFileCsv (record array) should have been invoked"); + test:assertTrue(csvRecordArrayReceived.length() >= 3, + string `Should have deserialized at least 3 Employee records, got ${csvRecordArrayReceived.length()}`); + + // Verify first record + Employee firstEmployee = csvRecordArrayReceived[0]; + test:assertEquals(firstEmployee.name, "Alice Johnson", "First employee's name should match"); + test:assertEquals(firstEmployee.age, 25, "First employee's age should match"); +} + +@test:Config { + groups: ["csv"], + dependsOn: [testOnFileCsvStream] +} +public function testOnFileCsvRecordArrayWithFailSafe() returns error? { + // Reset state + csvRecordArrayReceived = []; + csvRecordFilesProcessed = 0; + lastFileInfo = (); + contentMethodInvoked = false; + + // Service with onFileCsv (record array variant) + Service csvRecordService = service object { + remote function onFileCsv(Employee[] content, FileInfo fileInfo, Caller caller) returns error? { + log:printInfo(string `onFileCsv (Employee[]) invoked for: ${fileInfo.name}, records: ${content.length()}`); + csvRecordArrayReceived = content; + lastFileInfo = fileInfo; + csvRecordFilesProcessed += 1; + contentMethodInvoked = true; + } + }; + // Create listener for .csvrecord files Listener csvRecordListener = check new ({ protocol: FTP, From 4ff8ad358bb02e0212f70c98b6451debf59e5467 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Sun, 7 Dec 2025 16:31:48 +0530 Subject: [PATCH 10/27] [Automated] Update the native jar versions --- ballerina/Dependencies.toml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 2b9cca49a..99448823e 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -15,6 +15,9 @@ dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "time"} ] +modules = [ + {org = "ballerina", packageName = "crypto", moduleName = "crypto"} +] [[package]] org = "ballerina" @@ -72,6 +75,7 @@ org = "ballerina" name = "ftp" version = "2.16.0" dependencies = [ + {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "data.csv"}, {org = "ballerina", name = "data.jsondata"}, {org = "ballerina", name = "data.xmldata"}, @@ -82,7 +86,8 @@ dependencies = [ {org = "ballerina", name = "lang.string"}, {org = "ballerina", name = "log"}, {org = "ballerina", name = "task"}, - {org = "ballerina", name = "test"} + {org = "ballerina", name = "test"}, + {org = "ballerina", name = "time"} ] modules = [ {org = "ballerina", packageName = "ftp", moduleName = "ftp"} @@ -259,6 +264,9 @@ version = "2.8.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] +modules = [ + {org = "ballerina", packageName = "time", moduleName = "time"} +] [[package]] org = "ballerina" From 5da7747e338b58ed74ea48aa4d6be5b3a98d547c Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Sun, 7 Dec 2025 21:47:53 +0530 Subject: [PATCH 11/27] Add new configs to csv fail safe --- ballerina/client_endpoint.bal | 7 +++---- ballerina/listener_endpoint.bal | 21 +++++++++++++++++---- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/ballerina/client_endpoint.bal b/ballerina/client_endpoint.bal index ddfff07fd..a6b7da955 100644 --- a/ballerina/client_endpoint.bal +++ b/ballerina/client_endpoint.bal @@ -438,9 +438,8 @@ public enum Compression { # + sftpCompression - Compression algorithms (SFTP only) # + sftpSshKnownHosts - Path to SSH known_hosts file (SFTP only) # + proxy - Proxy configuration for SFTP connections (SFTP only) -# + enableCsvFailSafe - If set to `true`, enables fail-safe mode for CSV content processing. -# In fail-safe mode, malformed CSV records are logged and skipped, -# allowing processing to continue for well-formed records +# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode, +# malformed CSV records are skipped and written to a separate file in the current directory public type ClientConfiguration record {| Protocol protocol = FTP; string host = "127.0.0.1"; @@ -454,7 +453,7 @@ public type ClientConfiguration record {| FtpFileTransfer ftpFileTransfer = BINARY; TransferCompression[] sftpCompression = [NO]; string sftpSshKnownHosts?; - boolean enableCsvFailSafe = false; + FailSafeOptions csvFailSafe?; |}; isolated function getInputContent(string path, stream|string|xml|json content, diff --git a/ballerina/listener_endpoint.bal b/ballerina/listener_endpoint.bal index 30e75dd4a..b00f96c5d 100644 --- a/ballerina/listener_endpoint.bal +++ b/ballerina/listener_endpoint.bal @@ -186,9 +186,8 @@ class Job { # + sftpCompression - Compression algorithms (SFTP only) # + sftpSshKnownHosts - Path to SSH known_hosts file (SFTP only) # + proxy - Proxy configuration for SFTP connections (SFTP only) -# + enableCsvFailSafe - If set to `true`, enables fail-safe mode for CSV content processing. -# In fail-safe mode, malformed CSV records are logged and skipped, -# allowing processing to continue for well-formed records +# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode, +# malformed CSV records are skipped and written to a separate file in the current directory public type ListenerConfiguration record {| Protocol protocol = FTP; string host = "127.0.0.1"; @@ -207,9 +206,23 @@ public type ListenerConfiguration record {| FtpFileTransfer ftpFileTransfer = BINARY; TransferCompression[] sftpCompression = [NO]; string sftpSshKnownHosts?; - boolean enableCsvFailSafe = false; + FailSafeOptions csvFailSafe?; |}; +# Fail-safe options for CSV content processing. +# +# + contentType - Specifies the type of content to log in case of errors +public type FailSafeOptions record {| + ErrorLogContentType contentType = METADATA; +|}; + +# Specifies the type of content to log in case of errors during fail-safe CSV processing. +public enum ErrorLogContentType { + METADATA, + RAW, + RAW_AND_METADATA +}; + # FTP service for handling file system change events. public type Service distinct service object { }; From 4a71a9f34bca6f3c78b8e7272de84aa47317ce02 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Sun, 7 Dec 2025 21:48:19 +0530 Subject: [PATCH 12/27] Update native code for new design changes in fail safe support --- .../java/io/ballerina/stdlib/ftp/client/FtpClient.java | 6 +++--- .../stdlib/ftp/server/FtpContentCallbackHandler.java | 8 ++++---- .../java/io/ballerina/stdlib/ftp/server/FtpListener.java | 8 ++++---- .../ballerina/stdlib/ftp/server/FtpListenerHelper.java | 6 +++--- .../java/io/ballerina/stdlib/ftp/util/FtpConstants.java | 2 +- .../ballerina/stdlib/ftp/util/FtpContentConverter.java | 9 +++++---- 6 files changed, 20 insertions(+), 19 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java index e52f70acb..2a5ab43c4 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java @@ -99,7 +99,7 @@ public static Object initClientEndpoint(BObject clientEndpoint, BMap authMap = FtpUtil.getAuthMap(config, protocol); clientEndpoint.addNativeData(FtpConstants.ENDPOINT_CONFIG_USERNAME, @@ -232,9 +232,9 @@ public static Object getCsv(Environment env, BObject clientConnector, BString fi } boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); - boolean enableCsvFailSafe = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_CSV_FAIL_SAFE); + BMap csvFailSafe = (BMap) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_CSV_FAIL_SAFE); return convertBytesToCsv(env, (byte[]) content, typeDesc.getDescribingType(), - laxDataBinding, enableCsvFailSafe); + laxDataBinding, csvFailSafe); } public static Object getBytesAsStream(Environment env, BObject clientConnector, BString filePath) { diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java index 6f565383f..8d9e22250 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java @@ -77,16 +77,16 @@ public class FtpContentCallbackHandler { private final FileSystemManager fileSystemManager; private final FileSystemOptions fileSystemOptions; private final boolean laxDataBinding; - private final boolean enableCsvFailSafe; + private BMap csvFailSafe = ValueCreator.createMapValue(); public FtpContentCallbackHandler(Runtime ballerinaRuntime, FileSystemManager fileSystemManager, FileSystemOptions fileSystemOptions, boolean laxDataBinding, - boolean enableCsvFailSafe) { + BMap csvFailSafe) { this.ballerinaRuntime = ballerinaRuntime; this.fileSystemManager = fileSystemManager; this.fileSystemOptions = fileSystemOptions; this.laxDataBinding = laxDataBinding; - this.enableCsvFailSafe = enableCsvFailSafe; + this.csvFailSafe = csvFailSafe; } /** @@ -189,7 +189,7 @@ private Object convertFileContent(Environment environment, FileObject fileObject case ON_FILE_JSON_REMOTE_FUNCTION -> convertBytesToJson(fileContent, firstParamType, laxDataBinding); case ON_FILE_XML_REMOTE_FUNCTION -> convertBytesToXml(fileContent, firstParamType, laxDataBinding); case ON_FILE_CSV_REMOTE_FUNCTION -> convertBytesToCsv(environment, fileContent, firstParamType, - laxDataBinding, enableCsvFailSafe); + laxDataBinding, csvFailSafe); default -> throw new IllegalArgumentException("Unknown content method: " + methodName); }; } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java index e1afbc1e1..446c5a28b 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java @@ -82,7 +82,7 @@ public class FtpListener implements RemoteFileSystemListener { private FileSystemManager fileSystemManager; private FileSystemOptions fileSystemOptions; private boolean laxDataBinding; - private boolean enableCsvFailSafe = false; + private BMap csvFailSafe = ValueCreator.createMapValue(); FtpListener(Runtime runtime) { this.runtime = runtime; @@ -105,8 +105,8 @@ public void setLaxDataBinding(boolean laxDataBinding) { this.laxDataBinding = laxDataBinding; } - public void setEnableCsvFailSafe(boolean enableCsvFailSafe) { - this.enableCsvFailSafe = enableCsvFailSafe; + public void setCsvFailSafeConfigs(BMap csvFailSafe) { + this.csvFailSafe = csvFailSafe; } @Override @@ -165,7 +165,7 @@ private void processContentBasedCallbacks(Environment env, BObject service, Remo } else { try { FtpContentCallbackHandler contentHandler = new FtpContentCallbackHandler( - runtime, fileSystemManager, fileSystemOptions, laxDataBinding, enableCsvFailSafe); + runtime, fileSystemManager, fileSystemOptions, laxDataBinding, csvFailSafe); contentHandler.processContentCallbacks(env, service, event, holder, caller); } catch (Exception e) { log.error("Error in content callback processing for added files", e); diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java index 8a829a15c..476c8ad3f 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java @@ -66,7 +66,7 @@ */ public class FtpListenerHelper { - public static final BString ENABLE_CSV_FAIL_SAFE = StringUtils.fromString("enableCsvFailSafe"); + public static final BString CSV_FAIL_SAFE = StringUtils.fromString("csvFailSafe"); private FtpListenerHelper() { // private constructor @@ -105,8 +105,8 @@ public static Object init(Environment env, BObject ftpListener, BMap csvFailSafe = serviceEndpointConfig.getMapValue(CSV_FAIL_SAFE); + listener.setCsvFailSafeConfigs(csvFailSafe); ftpListener.addNativeData(FtpConstants.FTP_SERVER_CONNECTOR, serverConnector); // This is a temporary solution diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java index a35d82300..8ef442e32 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java @@ -80,7 +80,7 @@ private FtpConstants() { public static final String ENDPOINT_CONFIG_PRIVATE_KEY = "privateKey"; public static final String ENDPOINT_CONFIG_PREFERRED_METHODS = "preferredMethods"; public static final String ENDPOINT_CONFIG_LAX_DATABINDING = "laxDataBinding"; - public static final String ENDPOINT_CONFIG_CSV_FAIL_SAFE = "enableCsvFailSafe"; + public static final String ENDPOINT_CONFIG_CSV_FAIL_SAFE = "csvFailSafe"; // Advanced file selection configuration constants public static final String ENDPOINT_CONFIG_POLLING_INTERVAL = "pollingInterval"; diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java index e4c6ddfc1..6af3e77db 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java @@ -134,7 +134,7 @@ public static Object convertBytesToXml(byte[] content, Type targetType, boolean * @return Ballerina CSV data (string[][], record[][], or custom type) or BError */ public static Object convertBytesToCsv(Environment env, byte[] content, Type targetType, boolean laxDataBinding, - boolean csvFailSafeConfigs) { + BMap csvFailSafeConfigs) { try { BArray byteArray = ValueCreator.createArrayValue(content); BMap options = createCsvParseOptions(laxDataBinding, csvFailSafeConfigs); @@ -196,10 +196,11 @@ private static BMap createXmlParseOptions(boolean laxDataBindin * * @return BMap containing parse options */ - private static BMap createCsvParseOptions(boolean laxDataBinding, boolean enableCsvFailSafe) { + private static BMap createCsvParseOptions(boolean laxDataBinding, BMap csvFailSafeConfigs) { BMap mapValue = ValueCreator.createRecordValue( io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), "ParseOptions"); - if (enableCsvFailSafe) { + if (csvFailSafeConfigs != null) { + BString contentType = csvFailSafeConfigs.getStringValue(CONTENT_TYPE); BMap failSafe = ValueCreator.createRecordValue(io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), FAIL_SAFE_OPTIONS); @@ -208,7 +209,7 @@ private static BMap createCsvParseOptions(boolean laxDataBindin FILE_OUTPUT_MODE_TYPE); fileOutputMode.put(FILE_PATH, StringUtils.fromString(CURRENT_DIRECTORY_PATH + ERROR_LOG_FILE_NAME)); fileOutputMode.put(FILE_WRITE_OPTION, APPEND); - fileOutputMode.put(CONTENT_TYPE, RAW_TYPE); + fileOutputMode.put(CONTENT_TYPE, contentType); failSafe.put(FILE_OUTPUT_MODE, fileOutputMode); mapValue.put(FAIL_SAFE, failSafe); } From 67574b08431fc3b5ff476ce8ed5b43129f1a72e3 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Mon, 8 Dec 2025 09:40:56 +0530 Subject: [PATCH 13/27] Fix configs in fail safe test cases --- ballerina/tests/listener_content_methods_test.bal | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ballerina/tests/listener_content_methods_test.bal b/ballerina/tests/listener_content_methods_test.bal index 60a27796e..a8ccd4115 100644 --- a/ballerina/tests/listener_content_methods_test.bal +++ b/ballerina/tests/listener_content_methods_test.bal @@ -722,7 +722,7 @@ public function testOnFileCsvStreamWithFailSafe() returns error? { path: CONTENT_TEST_DIR, pollingInterval: 4, fileNamePattern: "csvstream.*\\.csv", - enableCsvFailSafe: true + csvFailSafe: {} }); check csvStreamListener.attach(csvStreamService); @@ -831,7 +831,7 @@ public function testOnFileCsvRecordArrayWithFailSafe() returns error? { path: CONTENT_TEST_DIR, pollingInterval: 4, fileNamePattern: "csvrecord.*\\.csv", - enableCsvFailSafe: true + csvFailSafe: {} }); check csvRecordListener.attach(csvRecordService); From f50a9cddae79216745cc2c6165b219eea16b3e54 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Mon, 8 Dec 2025 10:47:56 +0530 Subject: [PATCH 14/27] Add file name prefix to the fail safe error log file --- .../stdlib/ftp/client/FtpClient.java | 4 +++- .../ftp/server/FtpContentCallbackHandler.java | 6 +++-- .../stdlib/ftp/util/FtpContentConverter.java | 23 +++++++++++++++---- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java index 2a5ab43c4..9913ca06f 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java @@ -72,6 +72,7 @@ import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertBytesToString; import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertBytesToXml; import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertToBallerinaByteArray; +import static io.ballerina.stdlib.ftp.util.FtpContentConverter.deriveFileNamePrefix; import static io.ballerina.stdlib.ftp.util.FtpUtil.ErrorType.Error; import static io.ballerina.stdlib.ftp.util.FtpUtil.extractCompressionConfiguration; import static io.ballerina.stdlib.ftp.util.FtpUtil.extractFileTransferConfiguration; @@ -233,8 +234,9 @@ public static Object getCsv(Environment env, BObject clientConnector, BString fi boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); BMap csvFailSafe = (BMap) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_CSV_FAIL_SAFE); + String fileNamePrefix = deriveFileNamePrefix(filePath); return convertBytesToCsv(env, (byte[]) content, typeDesc.getDescribingType(), - laxDataBinding, csvFailSafe); + laxDataBinding, csvFailSafe, fileNamePrefix); } public static Object getBytesAsStream(Environment env, BObject clientConnector, BString filePath) { diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java index 8d9e22250..a013a5eb3 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java @@ -66,6 +66,7 @@ import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertBytesToString; import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertBytesToXml; import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertToBallerinaByteArray; +import static io.ballerina.stdlib.ftp.util.FtpContentConverter.deriveFileNamePrefix; /** * Handles content-based callbacks for FTP listener. @@ -77,7 +78,7 @@ public class FtpContentCallbackHandler { private final FileSystemManager fileSystemManager; private final FileSystemOptions fileSystemOptions; private final boolean laxDataBinding; - private BMap csvFailSafe = ValueCreator.createMapValue(); + private final BMap csvFailSafe; public FtpContentCallbackHandler(Runtime ballerinaRuntime, FileSystemManager fileSystemManager, FileSystemOptions fileSystemOptions, boolean laxDataBinding, @@ -183,13 +184,14 @@ private Object convertFileContent(Environment environment, FileObject fileObject } } else { byte[] fileContent = fetchAllFileContentFromRemote(fileObject, inputStream); + String fileNamePrefix = deriveFileNamePrefix(fileObject); return switch (methodName) { case ON_FILE_REMOTE_FUNCTION -> convertToBallerinaByteArray(fileContent); case ON_FILE_TEXT_REMOTE_FUNCTION -> convertBytesToString(fileContent); case ON_FILE_JSON_REMOTE_FUNCTION -> convertBytesToJson(fileContent, firstParamType, laxDataBinding); case ON_FILE_XML_REMOTE_FUNCTION -> convertBytesToXml(fileContent, firstParamType, laxDataBinding); case ON_FILE_CSV_REMOTE_FUNCTION -> convertBytesToCsv(environment, fileContent, firstParamType, - laxDataBinding, csvFailSafe); + laxDataBinding, csvFailSafe, fileNamePrefix); default -> throw new IllegalArgumentException("Unknown content method: " + methodName); }; } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java index 6af3e77db..283872b3c 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java @@ -32,6 +32,7 @@ import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BString; import io.ballerina.runtime.api.values.BTypedesc; +import org.apache.commons.vfs2.FileObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +52,7 @@ public final class FtpContentConverter { private static final BString ALLOW_DATA_PROJECTION = StringUtils.fromString("allowDataProjection"); public static final BString FILE_PATH = StringUtils.fromString("filePath"); public static final String CURRENT_DIRECTORY_PATH = System.getProperty("user.dir"); - public static final String ERROR_LOG_FILE_NAME = "/error.log"; + public static final String ERROR_LOG_FILE_NAME = "error.log"; public static final BString APPEND = StringUtils.fromString("APPEND"); public static final BString FILE_WRITE_OPTION = StringUtils.fromString("fileWriteOption"); public static final BString CONTENT_TYPE = StringUtils.fromString("contentType"); @@ -134,10 +135,10 @@ public static Object convertBytesToXml(byte[] content, Type targetType, boolean * @return Ballerina CSV data (string[][], record[][], or custom type) or BError */ public static Object convertBytesToCsv(Environment env, byte[] content, Type targetType, boolean laxDataBinding, - BMap csvFailSafeConfigs) { + BMap csvFailSafeConfigs, String fileNamePrefix) { try { BArray byteArray = ValueCreator.createArrayValue(content); - BMap options = createCsvParseOptions(laxDataBinding, csvFailSafeConfigs); + BMap options = createCsvParseOptions(laxDataBinding, csvFailSafeConfigs, fileNamePrefix); Type referredType = TypeUtils.getReferredType(targetType); BTypedesc typedesc = ValueCreator.createTypedescValue(referredType); @@ -196,7 +197,8 @@ private static BMap createXmlParseOptions(boolean laxDataBindin * * @return BMap containing parse options */ - private static BMap createCsvParseOptions(boolean laxDataBinding, BMap csvFailSafeConfigs) { + private static BMap createCsvParseOptions(boolean laxDataBinding, + BMap csvFailSafeConfigs, String fileNamePrefix) { BMap mapValue = ValueCreator.createRecordValue( io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), "ParseOptions"); if (csvFailSafeConfigs != null) { @@ -207,7 +209,8 @@ private static BMap createCsvParseOptions(boolean laxDataBindin BMap fileOutputMode = ValueCreator.createRecordValue(io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), FILE_OUTPUT_MODE_TYPE); - fileOutputMode.put(FILE_PATH, StringUtils.fromString(CURRENT_DIRECTORY_PATH + ERROR_LOG_FILE_NAME)); + String filePath = CURRENT_DIRECTORY_PATH + "/" + fileNamePrefix + "_" + ERROR_LOG_FILE_NAME; + fileOutputMode.put(FILE_PATH, StringUtils.fromString(filePath)); fileOutputMode.put(FILE_WRITE_OPTION, APPEND); fileOutputMode.put(CONTENT_TYPE, contentType); failSafe.put(FILE_OUTPUT_MODE, fileOutputMode); @@ -224,6 +227,16 @@ private static BMap createCsvParseOptions(boolean laxDataBindin return mapValue; } + public static String deriveFileNamePrefix(Object fileDetails) { + if (fileDetails instanceof FileObject fileObject) { + String baseName = fileObject.getName().getBaseName(); + String extension = fileObject.getName().getExtension(); + return extension.isEmpty() ? baseName : baseName.substring(0, baseName.length() - extension.length() - 1); + } + String filePath = fileDetails.toString(); + return filePath.replaceAll("\\.[^.]+$", ""); + } + /** * Creates a byte array from input stream. * From e8fe7ad5ae8eac69f57f428097c554145c9fc11f Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Mon, 8 Dec 2025 10:48:18 +0530 Subject: [PATCH 15/27] Remove generated error files in fail safe test cases --- ballerina/tests/listener_content_methods_test.bal | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ballerina/tests/listener_content_methods_test.bal b/ballerina/tests/listener_content_methods_test.bal index a8ccd4115..13f0b273d 100644 --- a/ballerina/tests/listener_content_methods_test.bal +++ b/ballerina/tests/listener_content_methods_test.bal @@ -743,6 +743,7 @@ public function testOnFileCsvStreamWithFailSafe() returns error? { FileInfo fileInfo = check lastFileInfo.ensureType(); test:assertTrue(fileInfo.name.endsWith(".csv"), "Should process .csvstream files"); + check file:remove("csvrecord_error.log"); } @test:Config { @@ -855,7 +856,7 @@ public function testOnFileCsvRecordArrayWithFailSafe() returns error? { Employee firstEmployee = csvRecordArrayReceived[0]; test:assertEquals(firstEmployee.name, "Bob Williams", "First employee's name should match"); test:assertEquals(firstEmployee.age, 35, "First employee's age should match"); - check file:remove("error.log"); + check file:remove("csvrecord_error.log"); } @test:Config { From 787f2cda8b4c50b847b00d4a319a21d453c6756b Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Mon, 8 Dec 2025 11:01:46 +0530 Subject: [PATCH 16/27] Add docs for the csv fail safe support --- README.md | 30 ++++++++++++++++++++++++++++++ ballerina/README.md | 30 ++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/README.md b/README.md index 0cbce4eec..951ed11af 100644 --- a/README.md +++ b/README.md @@ -353,6 +353,36 @@ service on remoteServer { The FTP listener automatically routes files to the appropriate content handler based on file extension: `.txt` → `onFileText()`, `.json` → `onFileJson()`, `.xml` → `onFileXml()`, `.csv` → `onFileCsv()`, and other extensions → `onFile()` (fallback handler). You can override the default routing using the `@ftp:FunctionConfig` annotation to specify a custom file name pattern for each handler method. +##### CSV fail-safe mode + +The FTP listener supports a fail-safe mode for CSV file processing When enabled, malformed CSV records are automatically skipped and written to a separate error log file in the current directory (`_error.log`), allowing the processing to continue without interruption. + +This is only supported in `onFileCsv` trigger methods in ftp listener. + +**Listener Configuration**: + +```ballerina +listener ftp:Listener remoteServer = check new({ + protocol: ftp:FTP, + host: "", + port: , + path: "", + pollingInterval: , + auth: { + credentials: {username: "", password: ""} + }, + csvFailSafe: { + contentType: ftp:RAW_AND_METADATA + } +}); +``` + +The `contentType` field in `csvFailSafe` specifies what information to log when a malformed CSV record is encountered. + +- `METADATA` (default) - Logs only metadata about the error (location, error message) +- `RAW` - Logs the raw content of the malformed record (offending row) +- `RAW_AND_METADATA` - Logs both the raw content and metadata for comprehensive debugging + ### Secure access with SFTP SFTP is a secure protocol alternative to the FTP, which runs on top of the SSH protocol. diff --git a/ballerina/README.md b/ballerina/README.md index 17baa2ece..122417963 100644 --- a/ballerina/README.md +++ b/ballerina/README.md @@ -345,6 +345,36 @@ service on remoteServer { The FTP listener automatically routes files to the appropriate content handler based on file extension: `.txt` → `onFileText()`, `.json` → `onFileJson()`, `.xml` → `onFileXml()`, `.csv` → `onFileCsv()`, and other extensions → `onFile()` (fallback handler). You can override the default routing using the `@ftp:FunctionConfig` annotation to specify a custom file name pattern for each handler method. +##### CSV fail-safe mode + +The FTP listener supports a fail-safe mode for CSV file processing When enabled, malformed CSV records are automatically skipped and written to a separate error log file in the current directory (`_error.log`), allowing the processing to continue without interruption. + +This is only supported in `onFileCsv` trigger methods in ftp listener. + +**Listener Configuration**: + +```ballerina +listener ftp:Listener remoteServer = check new({ + protocol: ftp:FTP, + host: "", + port: , + path: "", + pollingInterval: , + auth: { + credentials: {username: "", password: ""} + }, + csvFailSafe: { + contentType: ftp:RAW_AND_METADATA + } +}); +``` + +The `contentType` field in `csvFailSafe` specifies what information to log when a malformed CSV record is encountered. + +- `METADATA` (default) - Logs only metadata about the error (location, error message) +- `RAW` - Logs the raw content of the malformed record (offending row) +- `RAW_AND_METADATA` - Logs both the raw content and metadata for comprehensive debugging + ### Secure access with SFTP SFTP is a secure protocol alternative to the FTP, which runs on top of the SSH protocol. From d9534f543bb7d8f0506fafc8305b2a83a549392d Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Mon, 22 Dec 2025 16:14:41 +0530 Subject: [PATCH 17/27] [Automated] Update the native jar versions --- ballerina/Ballerina.toml | 10 +++++----- ballerina/CompilerPlugin.toml | 2 +- ballerina/Dependencies.toml | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index d15b4edc5..c19c77f25 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerina" name = "ftp" -version = "2.16.0" +version = "2.16.1" authors = ["Ballerina"] keywords = ["FTP", "SFTP", "remote file", "file transfer", "client", "service"] repository = "https://github.com/ballerina-platform/module-ballerina-ftp" @@ -45,8 +45,8 @@ path = "./lib/commons-lang3-3.18.0.jar" [[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "ftp-native" -version = "2.16.0" -path = "../native/build/libs/ftp-native-2.16.0.jar" +version = "2.16.1" +path = "../native/build/libs/ftp-native-2.16.1-SNAPSHOT.jar" [[platform.java21.dependency]] groupId = "io.ballerina.lib" @@ -64,5 +64,5 @@ path = "./lib/data.xmldata-native-1.5.2.jar" [[platform.java21.dependency]] groupId = "io.ballerina.lib" artifactId = "data.csv-native" -version = "0.9.0-SNAPSHOT" -path = "./lib/data.csv-native-0.9.0-SNAPSHOT.jar" +version = "0.9.0-20251209-183500-7b18d54" +path = "./lib/data.csv-native-0.9.0-20251209-183500-7b18d54.jar" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index cdd5081b2..d5859e71f 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "ftp-compiler-plugin" class = "io.ballerina.stdlib.ftp.plugin.FtpCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/ftp-compiler-plugin-2.16.0.jar" +path = "../compiler-plugin/build/libs/ftp-compiler-plugin-2.16.1-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 770dda858..23a7d013d 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -73,7 +73,7 @@ modules = [ [[package]] org = "ballerina" name = "ftp" -version = "2.16.0" +version = "2.16.1" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "data.csv"}, @@ -201,7 +201,7 @@ dependencies = [ [[package]] org = "ballerina" name = "log" -version = "2.13.0" +version = "2.14.0" dependencies = [ {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, @@ -215,7 +215,7 @@ modules = [ [[package]] org = "ballerina" name = "observe" -version = "1.5.0" +version = "1.5.1" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] From c83fffdd35e71ffe549cf1398c4d38f0392f7660 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Mon, 22 Dec 2025 16:15:45 +0530 Subject: [PATCH 18/27] Fix the field name in csv fail safe config --- ballerina/tests/listener_content_methods_test.bal | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ballerina/tests/listener_content_methods_test.bal b/ballerina/tests/listener_content_methods_test.bal index 60a27796e..a8ccd4115 100644 --- a/ballerina/tests/listener_content_methods_test.bal +++ b/ballerina/tests/listener_content_methods_test.bal @@ -722,7 +722,7 @@ public function testOnFileCsvStreamWithFailSafe() returns error? { path: CONTENT_TEST_DIR, pollingInterval: 4, fileNamePattern: "csvstream.*\\.csv", - enableCsvFailSafe: true + csvFailSafe: {} }); check csvStreamListener.attach(csvStreamService); @@ -831,7 +831,7 @@ public function testOnFileCsvRecordArrayWithFailSafe() returns error? { path: CONTENT_TEST_DIR, pollingInterval: 4, fileNamePattern: "csvrecord.*\\.csv", - enableCsvFailSafe: true + csvFailSafe: {} }); check csvRecordListener.attach(csvRecordService); From 1a34c7e65e561bb0849028e37574e30d6dd0bde5 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Mon, 22 Dec 2025 16:16:27 +0530 Subject: [PATCH 19/27] Add the data csv timestamped version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 3157ae1fd..6df20148e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -45,7 +45,7 @@ stdlibDataJsonDataVersion=1.1.3 stdlibDataXmlDataVersion=1.5.2 # Level 4 -stdlibDataCsvVersion=0.9.0-SNAPSHOT +stdlibDataCsvVersion=0.9.0-20251209-183500-7b18d54 stdlibUuidVersion=1.10.0 observeVersion=1.5.0 From 24fe18ee20ef0bddbc423d493c93c5455964c404 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Mon, 22 Dec 2025 17:20:10 +0530 Subject: [PATCH 20/27] Add file separator for a path --- .../java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java index 283872b3c..a2c7c2b02 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.nio.charset.StandardCharsets; import static io.ballerina.lib.data.csvdata.csv.Native.parseBytes; @@ -209,7 +210,7 @@ private static BMap createCsvParseOptions(boolean laxDataBindin BMap fileOutputMode = ValueCreator.createRecordValue(io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), FILE_OUTPUT_MODE_TYPE); - String filePath = CURRENT_DIRECTORY_PATH + "/" + fileNamePrefix + "_" + ERROR_LOG_FILE_NAME; + String filePath = CURRENT_DIRECTORY_PATH + File.separator + fileNamePrefix + "_" + ERROR_LOG_FILE_NAME; fileOutputMode.put(FILE_PATH, StringUtils.fromString(filePath)); fileOutputMode.put(FILE_WRITE_OPTION, APPEND); fileOutputMode.put(CONTENT_TYPE, contentType); From b1d47fc7d881881d0275ee541f6f3988601dda06 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Mon, 22 Dec 2025 17:20:38 +0530 Subject: [PATCH 21/27] [Automated] Update the native jar versions --- ballerina/Dependencies.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 23a7d013d..4208e1119 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -10,7 +10,7 @@ distribution-version = "2201.12.0" [[package]] org = "ballerina" name = "crypto" -version = "2.9.2" +version = "2.10.0" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "time"} @@ -215,7 +215,7 @@ modules = [ [[package]] org = "ballerina" name = "observe" -version = "1.5.1" +version = "1.6.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -223,7 +223,7 @@ dependencies = [ [[package]] org = "ballerina" name = "os" -version = "1.9.0" +version = "1.10.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, @@ -233,7 +233,7 @@ dependencies = [ [[package]] org = "ballerina" name = "task" -version = "2.10.0" +version = "2.11.0" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "time"}, From ff919c62772a855db1ec9e68fa0e60cbea0f509a Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Tue, 23 Dec 2025 10:05:35 +0530 Subject: [PATCH 22/27] [Automated] Update the native jar versions --- ballerina/Ballerina.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index c19c77f25..3bd712377 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -60,7 +60,6 @@ artifactId = "data.xmldata-native" version = "1.5.2" path = "./lib/data.xmldata-native-1.5.2.jar" - [[platform.java21.dependency]] groupId = "io.ballerina.lib" artifactId = "data.csv-native" From f0c4f8fcbfa1379c1019758ccc341beb90fb3d2f Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Tue, 23 Dec 2025 10:06:05 +0530 Subject: [PATCH 23/27] Update log version to 2.14.0 --- build-config/resources/Ballerina.toml | 1 - gradle.properties | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index 4b892c86c..dee596fac 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -60,7 +60,6 @@ artifactId = "data.xmldata-native" version = "@data.xmldata.version@" path = "./lib/data.xmldata-native-@data.xmldata.version@.jar" - [[platform.java21.dependency]] groupId = "io.ballerina.lib" artifactId = "data.csv-native" diff --git a/gradle.properties b/gradle.properties index 6df20148e..39ea944db 100644 --- a/gradle.properties +++ b/gradle.properties @@ -37,7 +37,7 @@ stdlibTimeVersion=2.7.0 # Level 2 stdlibCryptoVersion=2.9.2 -stdlibLogVersion=2.13.0 +stdlibLogVersion=2.14.0 # Level 3 stdlibTaskVersion=2.10.0 From ed83980da41cb8e847ff8f4b999261790537c86c Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Tue, 23 Dec 2025 10:38:50 +0530 Subject: [PATCH 24/27] Fix file path in a test case --- ballerina/tests/listener_content_methods_test.bal | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ballerina/tests/listener_content_methods_test.bal b/ballerina/tests/listener_content_methods_test.bal index 13f0b273d..124d4ac13 100644 --- a/ballerina/tests/listener_content_methods_test.bal +++ b/ballerina/tests/listener_content_methods_test.bal @@ -743,7 +743,7 @@ public function testOnFileCsvStreamWithFailSafe() returns error? { FileInfo fileInfo = check lastFileInfo.ensureType(); test:assertTrue(fileInfo.name.endsWith(".csv"), "Should process .csvstream files"); - check file:remove("csvrecord_error.log"); + check file:remove("csvstream_error.log"); } @test:Config { @@ -802,7 +802,6 @@ public function testOnFileCsvRecordArray() returns error? { } @test:Config { - groups: ["csv"], dependsOn: [testOnFileCsvStream] } public function testOnFileCsvRecordArrayWithFailSafe() returns error? { @@ -831,7 +830,7 @@ public function testOnFileCsvRecordArrayWithFailSafe() returns error? { port: 21212, path: CONTENT_TEST_DIR, pollingInterval: 4, - fileNamePattern: "csvrecord.*\\.csv", + fileNamePattern: "csvstream.*\\.csv", csvFailSafe: {} }); @@ -841,7 +840,7 @@ public function testOnFileCsvRecordArrayWithFailSafe() returns error? { // Upload CSV file for record deserialization stream csvStream = check io:fileReadBlocksAsStream(CSV_TEST_FILE_WITH_ERROR); - check (clientEp)->put(CONTENT_TEST_DIR + "/csvrecord.csv", csvStream); + check (clientEp)->put(CONTENT_TEST_DIR + "/csvstream.csv", csvStream); runtime:sleep(15); // Cleanup @@ -856,7 +855,7 @@ public function testOnFileCsvRecordArrayWithFailSafe() returns error? { Employee firstEmployee = csvRecordArrayReceived[0]; test:assertEquals(firstEmployee.name, "Bob Williams", "First employee's name should match"); test:assertEquals(firstEmployee.age, 35, "First employee's age should match"); - check file:remove("csvrecord_error.log"); + check file:remove("csvstream_error.log"); } @test:Config { From b7e235008a1a202e7a5d7522b15e2d7033f7a806 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Tue, 23 Dec 2025 10:39:13 +0530 Subject: [PATCH 25/27] Update dependency versions --- build.gradle | 1 + gradle.properties | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index 87cbec55d..08911be26 100644 --- a/build.gradle +++ b/build.gradle @@ -79,6 +79,7 @@ subprojects { ballerinaStdLibs "io.ballerina.stdlib:task-ballerina:${stdlibTaskVersion}" ballerinaStdLibs "io.ballerina.stdlib:uuid-ballerina:${stdlibUuidVersion}" ballerinaStdLibs "io.ballerina.stdlib:time-ballerina:${stdlibTimeVersion}" + ballerinaStdLibs "io.ballerina.stdlib:file-ballerina:${stdlibFileVersion}" ballerinaStdLibs "io.ballerina.stdlib:observe-ballerina:${observeVersion}" ballerinaStdLibs "io.ballerina:observe-ballerina:${observeInternalVersion}" diff --git a/gradle.properties b/gradle.properties index 39ea944db..2fc1f5bf9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -33,16 +33,17 @@ ballerinaLangVersion=2201.12.0 # Level 1 stdlibIoVersion=1.8.0 -stdlibTimeVersion=2.7.0 +stdlibTimeVersion=2.8.0 # Level 2 -stdlibCryptoVersion=2.9.2 +stdlibCryptoVersion=2.10.0 stdlibLogVersion=2.14.0 # Level 3 -stdlibTaskVersion=2.10.0 +stdlibTaskVersion=2.11.0 stdlibDataJsonDataVersion=1.1.3 stdlibDataXmlDataVersion=1.5.2 +stdlibFileVersion=1.11.0 # Level 4 stdlibDataCsvVersion=0.9.0-20251209-183500-7b18d54 From e28f6ac1cef682d13e22cf1604b9ba931c335b88 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Tue, 23 Dec 2025 10:52:15 +0530 Subject: [PATCH 26/27] [Automated] Update the native jar versions --- ballerina/Dependencies.toml | 4 ++-- build.gradle | 1 + gradle.properties | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 4208e1119..3a6dbd88d 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -215,7 +215,7 @@ modules = [ [[package]] org = "ballerina" name = "observe" -version = "1.6.0" +version = "1.5.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -223,7 +223,7 @@ dependencies = [ [[package]] org = "ballerina" name = "os" -version = "1.10.1" +version = "1.10.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, diff --git a/build.gradle b/build.gradle index 08911be26..f07494074 100644 --- a/build.gradle +++ b/build.gradle @@ -74,6 +74,7 @@ subprojects { /* Standard libraries */ ballerinaStdLibs "io.ballerina.stdlib:uuid-ballerina:${stdlibUuidVersion}" ballerinaStdLibs "io.ballerina.stdlib:log-ballerina:${stdlibLogVersion}" + ballerinaStdLibs "io.ballerina.stdlib:os-ballerina:${stdlibOsVersion}" ballerinaStdLibs "io.ballerina.stdlib:io-ballerina:${stdlibIoVersion}" ballerinaStdLibs "io.ballerina.stdlib:crypto-ballerina:${stdlibCryptoVersion}" ballerinaStdLibs "io.ballerina.stdlib:task-ballerina:${stdlibTaskVersion}" diff --git a/gradle.properties b/gradle.properties index 2fc1f5bf9..9cfbd8697 100644 --- a/gradle.properties +++ b/gradle.properties @@ -38,6 +38,7 @@ stdlibTimeVersion=2.8.0 # Level 2 stdlibCryptoVersion=2.10.0 stdlibLogVersion=2.14.0 +stdlibOsVersion=1.10.0 # Level 3 stdlibTaskVersion=2.11.0 From b3b651a65f1a66600d5b27d00e8a14ca65111c73 Mon Sep 17 00:00:00 2001 From: Nuvindu Date: Tue, 23 Dec 2025 11:28:30 +0530 Subject: [PATCH 27/27] Fix test failures --- ballerina/build.gradle | 71 +++++++++++++++++++ .../tests/listener_content_methods_test.bal | 1 - 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/ballerina/build.gradle b/ballerina/build.gradle index 3de2b07b2..223a716de 100644 --- a/ballerina/build.gradle +++ b/ballerina/build.gradle @@ -158,6 +158,76 @@ task startTestUtilAsDaemon(type: com.github.psxpaul.task.ExecFork) { standardOutput = new File("$buildDir/tempUtilLogs.log") } +task waitForFtpsTestServer() { + dependsOn startTestUtilAsDaemon + doLast { + def host = "127.0.0.1" + def explicitPort = 21214 + def implicitPort = 21217 + + def connectTimeoutMs = 500 + def readTimeoutMs = 500 + def maxWaitMs = 30_000 + def pollIntervalMs = 500 + + def isExplicitFtpsReady = { -> + java.net.Socket socket = null + try { + socket = new java.net.Socket() + socket.connect(new java.net.InetSocketAddress(host, explicitPort as int), connectTimeoutMs) + socket.setSoTimeout(readTimeoutMs) + def reader = new java.io.BufferedReader(new java.io.InputStreamReader(socket.getInputStream())) + def line = reader.readLine() + try { + if (line != null && line.startsWith("220")) { + def out = new java.io.OutputStreamWriter(socket.getOutputStream()) + out.write("QUIT\r\n") + out.flush() + } + } catch (Exception ignored) { + // ignore + } + return (line != null && line.startsWith("220")) + } catch (Exception ignored) { + return false + } finally { + try { + socket?.close() + } catch (Exception ignored) { + // ignore + } + } + } + + def isTcpPortOpen = { int port -> + java.net.Socket socket = null + try { + socket = new java.net.Socket() + socket.connect(new java.net.InetSocketAddress(host, port as int), connectTimeoutMs) + return true + } catch (Exception ignored) { + return false + } finally { + try { + socket?.close() + } catch (Exception ignored) { + // ignore + } + } + } + + long start = System.currentTimeMillis() + while (System.currentTimeMillis() - start < maxWaitMs) { + if (isExplicitFtpsReady() && isTcpPortOpen(implicitPort)) { + return + } + sleep(pollIntervalMs) + } + + throw new GradleException("FTPS mock server was not ready on ${host}:${explicitPort} after waiting ${maxWaitMs / 1000} seconds") + } +} + publishing { publications { maven(MavenPublication) { @@ -188,6 +258,7 @@ test.dependsOn ":${packageName}-native:build" populateTestUtilExecutableName.dependsOn ":${packageName}-test-utils:build" startTestUtilAsDaemon.dependsOn populateTestUtilExecutableName test.dependsOn startTestUtilAsDaemon +test.dependsOn waitForFtpsTestServer test.dependsOn ":${packageName}-compiler-plugin:build" publishToMavenLocal.dependsOn build diff --git a/ballerina/tests/listener_content_methods_test.bal b/ballerina/tests/listener_content_methods_test.bal index 124d4ac13..dfa0e8b6c 100644 --- a/ballerina/tests/listener_content_methods_test.bal +++ b/ballerina/tests/listener_content_methods_test.bal @@ -743,7 +743,6 @@ public function testOnFileCsvStreamWithFailSafe() returns error? { FileInfo fileInfo = check lastFileInfo.ensureType(); test:assertTrue(fileInfo.name.endsWith(".csv"), "Should process .csvstream files"); - check file:remove("csvstream_error.log"); } @test:Config {