diff --git a/README.md b/README.md index 39d354add..00ebff8b4 100644 --- a/README.md +++ b/README.md @@ -407,6 +407,36 @@ listener ftp:Listener ftpsListener = check new({ }); ``` +##### CSV fail-safe mode + +The FTP listener supports a fail-safe mode for CSV file processing. When enabled, malformed CSV records are automatically skipped and written to a separate error log file in the current directory (`<file-name>_error.log`, e.g. `csvstream_error.log` for `csvstream.csv`), allowing the processing to continue without interruption. + +This is only supported in the `onFileCsv` trigger methods of the FTP listener. + +**Listener Configuration**: + +```ballerina +listener ftp:Listener remoteServer = check new({ + protocol: ftp:FTP, + host: "", + port: , + path: "", + pollingInterval: , + auth: { + credentials: {username: "", password: ""} + }, + csvFailSafe: { + contentType: ftp:RAW_AND_METADATA + } +}); +``` + +The `contentType` field in `csvFailSafe` specifies what information to log when a malformed CSV record is encountered. + +- `METADATA` (default) - Logs only metadata about the error (location, error message) +- `RAW` - Logs the raw content of the malformed record (offending row) +- `RAW_AND_METADATA` - Logs both the raw content and metadata for comprehensive debugging + ### Secure access with SFTP SFTP is a secure protocol alternative to the FTP, which runs on top of the SSH protocol. 
diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 78b582f82..3bd712377 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerina" name = "ftp" -version = "2.16.0" +version = "2.16.1" authors = ["Ballerina"] keywords = ["FTP", "SFTP", "remote file", "file transfer", "client", "service"] repository = "https://github.com/ballerina-platform/module-ballerina-ftp" @@ -45,8 +45,8 @@ path = "./lib/commons-lang3-3.18.0.jar" [[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "ftp-native" -version = "2.16.0" -path = "../native/build/libs/ftp-native-2.16.0.jar" +version = "2.16.1" +path = "../native/build/libs/ftp-native-2.16.1-SNAPSHOT.jar" [[platform.java21.dependency]] groupId = "io.ballerina.lib" @@ -60,9 +60,8 @@ artifactId = "data.xmldata-native" version = "1.5.2" path = "./lib/data.xmldata-native-1.5.2.jar" - [[platform.java21.dependency]] groupId = "io.ballerina.lib" artifactId = "data.csv-native" -version = "0.8.1" -path = "./lib/data.csv-native-0.8.1.jar" +version = "0.9.0-20251209-183500-7b18d54" +path = "./lib/data.csv-native-0.9.0-20251209-183500-7b18d54.jar" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index cdd5081b2..d5859e71f 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "ftp-compiler-plugin" class = "io.ballerina.stdlib.ftp.plugin.FtpCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/ftp-compiler-plugin-2.16.0.jar" +path = "../compiler-plugin/build/libs/ftp-compiler-plugin-2.16.1-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index b9722980f..3a6dbd88d 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -10,7 +10,7 @@ distribution-version = "2201.12.0" [[package]] org = "ballerina" name = "crypto" -version = "2.9.2" +version = "2.10.0" dependencies = [ {org = "ballerina", name = 
"jballerina.java"}, {org = "ballerina", name = "time"} @@ -22,9 +22,10 @@ modules = [ [[package]] org = "ballerina" name = "data.csv" -version = "0.8.1" +version = "0.9.0" dependencies = [ - {org = "ballerina", name = "jballerina.java"} + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "log"} ] modules = [ {org = "ballerina", packageName = "data.csv", moduleName = "data.csv"} @@ -54,15 +55,31 @@ modules = [ {org = "ballerina", packageName = "data.xmldata", moduleName = "data.xmldata"} ] +[[package]] +org = "ballerina" +name = "file" +version = "1.11.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "os"}, + {org = "ballerina", name = "time"} +] +modules = [ + {org = "ballerina", packageName = "file", moduleName = "file"} +] + [[package]] org = "ballerina" name = "ftp" -version = "2.16.0" +version = "2.16.1" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "data.csv"}, {org = "ballerina", name = "data.jsondata"}, {org = "ballerina", name = "data.xmldata"}, + {org = "ballerina", name = "file"}, {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "lang.runtime"}, @@ -184,7 +201,7 @@ dependencies = [ [[package]] org = "ballerina" name = "log" -version = "2.13.0" +version = "2.14.0" dependencies = [ {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, @@ -203,10 +220,20 @@ dependencies = [ {org = "ballerina", name = "jballerina.java"} ] +[[package]] +org = "ballerina" +name = "os" +version = "1.10.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "io"}, + {org = "ballerina", name = "jballerina.java"} +] + [[package]] org = "ballerina" name = "task" -version = "2.10.0" +version = "2.11.0" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "time"}, @@ -233,7 +260,7 @@ 
modules = [ [[package]] org = "ballerina" name = "time" -version = "2.7.0" +version = "2.8.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] diff --git a/ballerina/README.md b/ballerina/README.md index 8da21b2e0..3dcf2f5b3 100644 --- a/ballerina/README.md +++ b/ballerina/README.md @@ -399,6 +399,36 @@ listener ftp:Listener ftpsListener = check new({ }); ``` +##### CSV fail-safe mode + +The FTP listener supports a fail-safe mode for CSV file processing. When enabled, malformed CSV records are automatically skipped and written to a separate error log file in the current directory (`<file-name>_error.log`, e.g. `csvstream_error.log` for `csvstream.csv`), allowing the processing to continue without interruption. + +This is only supported in the `onFileCsv` trigger methods of the FTP listener. + +**Listener Configuration**: + +```ballerina +listener ftp:Listener remoteServer = check new({ + protocol: ftp:FTP, + host: "", + port: , + path: "", + pollingInterval: , + auth: { + credentials: {username: "", password: ""} + }, + csvFailSafe: { + contentType: ftp:RAW_AND_METADATA + } +}); +``` + +The `contentType` field in `csvFailSafe` specifies what information to log when a malformed CSV record is encountered. + +- `METADATA` (default) - Logs only metadata about the error (location, error message) +- `RAW` - Logs the raw content of the malformed record (offending row) +- `RAW_AND_METADATA` - Logs both the raw content and metadata for comprehensive debugging + ### Secure access with SFTP SFTP is a secure protocol alternative to the FTP, which runs on top of the SSH protocol. 
diff --git a/ballerina/build.gradle b/ballerina/build.gradle index 3de2b07b2..223a716de 100644 --- a/ballerina/build.gradle +++ b/ballerina/build.gradle @@ -158,6 +158,76 @@ task startTestUtilAsDaemon(type: com.github.psxpaul.task.ExecFork) { standardOutput = new File("$buildDir/tempUtilLogs.log") } +task waitForFtpsTestServer() { + dependsOn startTestUtilAsDaemon + doLast { + def host = "127.0.0.1" + def explicitPort = 21214 + def implicitPort = 21217 + + def connectTimeoutMs = 500 + def readTimeoutMs = 500 + def maxWaitMs = 30_000 + def pollIntervalMs = 500 + + def isExplicitFtpsReady = { -> + java.net.Socket socket = null + try { + socket = new java.net.Socket() + socket.connect(new java.net.InetSocketAddress(host, explicitPort as int), connectTimeoutMs) + socket.setSoTimeout(readTimeoutMs) + def reader = new java.io.BufferedReader(new java.io.InputStreamReader(socket.getInputStream())) + def line = reader.readLine() + try { + if (line != null && line.startsWith("220")) { + def out = new java.io.OutputStreamWriter(socket.getOutputStream()) + out.write("QUIT\r\n") + out.flush() + } + } catch (Exception ignored) { + // ignore + } + return (line != null && line.startsWith("220")) + } catch (Exception ignored) { + return false + } finally { + try { + socket?.close() + } catch (Exception ignored) { + // ignore + } + } + } + + def isTcpPortOpen = { int port -> + java.net.Socket socket = null + try { + socket = new java.net.Socket() + socket.connect(new java.net.InetSocketAddress(host, port as int), connectTimeoutMs) + return true + } catch (Exception ignored) { + return false + } finally { + try { + socket?.close() + } catch (Exception ignored) { + // ignore + } + } + } + + long start = System.currentTimeMillis() + while (System.currentTimeMillis() - start < maxWaitMs) { + if (isExplicitFtpsReady() && isTcpPortOpen(implicitPort)) { + return + } + sleep(pollIntervalMs) + } + + throw new GradleException("FTPS mock server was not ready on ${host}:${explicitPort} after 
waiting ${maxWaitMs / 1000} seconds") + } +} + publishing { publications { maven(MavenPublication) { @@ -188,6 +258,7 @@ test.dependsOn ":${packageName}-native:build" populateTestUtilExecutableName.dependsOn ":${packageName}-test-utils:build" startTestUtilAsDaemon.dependsOn populateTestUtilExecutableName test.dependsOn startTestUtilAsDaemon +test.dependsOn waitForFtpsTestServer test.dependsOn ":${packageName}-compiler-plugin:build" publishToMavenLocal.dependsOn build diff --git a/ballerina/client_endpoint.bal b/ballerina/client_endpoint.bal index 9529bccb8..ba048f8ad 100644 --- a/ballerina/client_endpoint.bal +++ b/ballerina/client_endpoint.bal @@ -450,6 +450,8 @@ public enum Compression { # + sftpCompression - Compression algorithms (SFTP only) # + sftpSshKnownHosts - Path to SSH known_hosts file (SFTP only) # + proxy - Proxy configuration for SFTP connections (SFTP only) +# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode, +# malformed CSV records are skipped and written to a separate file in the current directory public type ClientConfiguration record {| Protocol protocol = FTP; string host = "127.0.0.1"; @@ -463,6 +465,7 @@ public type ClientConfiguration record {| FileTransferMode fileTransferMode = BINARY; TransferCompression[] sftpCompression = [NO]; string sftpSshKnownHosts?; + FailSafeOptions csvFailSafe?; |}; isolated function getInputContent(string path, stream|string|xml|json content, diff --git a/ballerina/listener_endpoint.bal b/ballerina/listener_endpoint.bal index 770181d7d..2ae02d8cb 100644 --- a/ballerina/listener_endpoint.bal +++ b/ballerina/listener_endpoint.bal @@ -172,6 +172,8 @@ class Job { # + sftpCompression - Compression algorithms (SFTP only) # + sftpSshKnownHosts - Path to SSH known_hosts file (SFTP only) # + proxy - Proxy configuration for SFTP connections (SFTP only) +# + csvFailSafe - Configuration for fail-safe CSV content processing. 
In the fail-safe mode, +# malformed CSV records are skipped and written to a separate file in the current directory public type ListenerConfiguration record {| Protocol protocol = FTP; string host = "127.0.0.1"; @@ -190,8 +192,23 @@ public type ListenerConfiguration record {| FileTransferMode fileTransferMode = BINARY; TransferCompression[] sftpCompression = [NO]; string sftpSshKnownHosts?; + FailSafeOptions csvFailSafe?; |}; +# Fail-safe options for CSV content processing. +# +# + contentType - Specifies the type of content to log in case of errors +public type FailSafeOptions record {| + ErrorLogContentType contentType = METADATA; +|}; + +# Specifies the type of content to log in case of errors during fail-safe CSV processing. +public enum ErrorLogContentType { + METADATA, + RAW, + RAW_AND_METADATA +}; + # FTP service for handling file system change events. public type Service distinct service object { }; diff --git a/ballerina/tests/listener_content_methods_test.bal b/ballerina/tests/listener_content_methods_test.bal index 87635ed25..dfa0e8b6c 100644 --- a/ballerina/tests/listener_content_methods_test.bal +++ b/ballerina/tests/listener_content_methods_test.bal @@ -14,6 +14,7 @@ // specific language governing permissions and limitations // under the License. +import ballerina/file; import ballerina/io; import ballerina/lang.runtime; import ballerina/log; @@ -73,6 +74,7 @@ int xmlRecordFilesProcessed = 0; const string JSON_TEST_FILE = "tests/resources/datafiles/test_data.json"; const string XML_TEST_FILE = "tests/resources/datafiles/test_data.xml"; const string CSV_TEST_FILE = "tests/resources/datafiles/test_data.csv"; +const string CSV_TEST_FILE_WITH_ERROR = "tests/resources/datafiles/test_data_with_error.csv"; const string TEXT_TEST_FILE = "tests/resources/datafiles/test_text.txt"; const string GENERIC_TEST_FILE = "tests/resources/datafiles/file2.txt"; @@ -682,6 +684,67 @@ public function testOnFileCsvStream() returns error? 
{ test:assertTrue(fileInfo.name.endsWith(".csv"), "Should process .csvstream files"); } +@test:Config { + dependsOn: [testOptionalParametersWithoutCaller] +} +public function testOnFileCsvStreamWithFailSafe() returns error? { + // Reset state + csvStreamRowsProcessed = 0; + lastFileInfo = (); + contentMethodInvoked = false; + + // Service with onFileCsv (stream variant) + Service csvStreamService = service object { + remote function onFileCsv(stream content, FileInfo fileInfo, Caller caller) returns error? { + log:printInfo(string `onFileCsv (stream) invoked for: ${fileInfo.name}`); + lastFileInfo = fileInfo; + contentMethodInvoked = true; + + // Process stream + error? processStream = content.forEach(function(string[] row) { + csvStreamRowsProcessed += 1; + log:printInfo(string `Processing CSV row: ${row.length()} columns`); + }); + + if processStream is error { + log:printError("Error processing CSV stream", processStream); + return processStream; + } + } + }; + + // Create listener for .csvstream files + Listener csvStreamListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: CONTENT_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "csvstream.*\\.csv", + csvFailSafe: {} + }); + + check csvStreamListener.attach(csvStreamService); + check csvStreamListener.'start(); + runtime:registerListener(csvStreamListener); + + // Upload CSV file for stream processing + stream csvStream = check io:fileReadBlocksAsStream(CSV_TEST_FILE_WITH_ERROR, 5); + check (clientEp)->put(CONTENT_TEST_DIR + "/csvstream.csv", csvStream); + runtime:sleep(15); + + // Cleanup + runtime:deregisterListener(csvStreamListener); + check csvStreamListener.gracefulStop(); + + test:assertTrue(contentMethodInvoked, "onFileCsv (stream) should have been invoked"); + test:assertTrue(csvStreamRowsProcessed > 0, string `Should have processed CSV rows, got ${csvStreamRowsProcessed}`); + + FileInfo fileInfo = check 
lastFileInfo.ensureType(); + test:assertTrue(fileInfo.name.endsWith(".csv"), "Should process .csvstream files"); +} + @test:Config { dependsOn: [testOnFileCsvStream] } @@ -719,7 +782,7 @@ public function testOnFileCsvRecordArray() returns error? { runtime:registerListener(csvRecordListener); // Upload CSV file for record deserialization - stream csvStream = check io:fileReadBlocksAsStream(CSV_TEST_FILE, 5); + stream csvStream = check io:fileReadBlocksAsStream(CSV_TEST_FILE); check (clientEp)->put(CONTENT_TEST_DIR + "/csvrecord.csv", csvStream); runtime:sleep(15); @@ -737,6 +800,63 @@ public function testOnFileCsvRecordArray() returns error? { test:assertEquals(firstEmployee.age, 25, "First employee's age should match"); } +@test:Config { + dependsOn: [testOnFileCsvStream] +} +public function testOnFileCsvRecordArrayWithFailSafe() returns error? { + // Reset state + csvRecordArrayReceived = []; + csvRecordFilesProcessed = 0; + lastFileInfo = (); + contentMethodInvoked = false; + + // Service with onFileCsv (record array variant) + Service csvRecordService = service object { + remote function onFileCsv(Employee[] content, FileInfo fileInfo, Caller caller) returns error? 
{ + log:printInfo(string `onFileCsv (Employee[]) invoked for: ${fileInfo.name}, records: ${content.length()}`); + csvRecordArrayReceived = content; + lastFileInfo = fileInfo; + csvRecordFilesProcessed += 1; + contentMethodInvoked = true; + } + }; + + // Create listener for .csvrecord files + Listener csvRecordListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: CONTENT_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "csvstream.*\\.csv", + csvFailSafe: {} + }); + + check csvRecordListener.attach(csvRecordService); + check csvRecordListener.'start(); + runtime:registerListener(csvRecordListener); + + // Upload CSV file for record deserialization + stream csvStream = check io:fileReadBlocksAsStream(CSV_TEST_FILE_WITH_ERROR); + check (clientEp)->put(CONTENT_TEST_DIR + "/csvstream.csv", csvStream); + runtime:sleep(15); + + // Cleanup + runtime:deregisterListener(csvRecordListener); + check csvRecordListener.gracefulStop(); + + test:assertTrue(contentMethodInvoked, "onFileCsv (record array) should have been invoked"); + test:assertTrue(csvRecordArrayReceived.length() >= 2, + string `Should have deserialized at least 2 Employee records, got ${csvRecordArrayReceived.length()}`); + + // Verify first record + Employee firstEmployee = csvRecordArrayReceived[0]; + test:assertEquals(firstEmployee.name, "Bob Williams", "First employee's name should match"); + test:assertEquals(firstEmployee.age, 35, "First employee's age should match"); + check file:remove("csvstream_error.log"); +} + @test:Config { dependsOn: [testOnFileCsvRecordArray] } diff --git a/ballerina/tests/resources/datafiles/test_data_with_error.csv b/ballerina/tests/resources/datafiles/test_data_with_error.csv new file mode 100644 index 000000000..4cbcff936 --- /dev/null +++ b/ballerina/tests/resources/datafiles/test_data_with_error.csv @@ -0,0 +1,4 @@ +name,age,city,salary +Alice Johnson,invalid,Seattle,75000 +Bob 
Williams,35,Boston,95000 +Carol Davis,42,Chicago,110000 diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index 4b892c86c..dee596fac 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -60,7 +60,6 @@ artifactId = "data.xmldata-native" version = "@data.xmldata.version@" path = "./lib/data.xmldata-native-@data.xmldata.version@.jar" - [[platform.java21.dependency]] groupId = "io.ballerina.lib" artifactId = "data.csv-native" diff --git a/build.gradle b/build.gradle index 3f483e2a5..f07494074 100644 --- a/build.gradle +++ b/build.gradle @@ -74,10 +74,13 @@ subprojects { /* Standard libraries */ ballerinaStdLibs "io.ballerina.stdlib:uuid-ballerina:${stdlibUuidVersion}" ballerinaStdLibs "io.ballerina.stdlib:log-ballerina:${stdlibLogVersion}" + ballerinaStdLibs "io.ballerina.stdlib:os-ballerina:${stdlibOsVersion}" ballerinaStdLibs "io.ballerina.stdlib:io-ballerina:${stdlibIoVersion}" ballerinaStdLibs "io.ballerina.stdlib:crypto-ballerina:${stdlibCryptoVersion}" ballerinaStdLibs "io.ballerina.stdlib:task-ballerina:${stdlibTaskVersion}" + ballerinaStdLibs "io.ballerina.stdlib:uuid-ballerina:${stdlibUuidVersion}" ballerinaStdLibs "io.ballerina.stdlib:time-ballerina:${stdlibTimeVersion}" + ballerinaStdLibs "io.ballerina.stdlib:file-ballerina:${stdlibFileVersion}" ballerinaStdLibs "io.ballerina.stdlib:observe-ballerina:${observeVersion}" ballerinaStdLibs "io.ballerina:observe-ballerina:${observeInternalVersion}" diff --git a/gradle.properties b/gradle.properties index 1756a311d..9cfbd8697 100644 --- a/gradle.properties +++ b/gradle.properties @@ -33,19 +33,21 @@ ballerinaLangVersion=2201.12.0 # Level 1 stdlibIoVersion=1.8.0 -stdlibTimeVersion=2.7.0 +stdlibTimeVersion=2.8.0 # Level 2 -stdlibCryptoVersion=2.9.2 -stdlibLogVersion=2.13.0 +stdlibCryptoVersion=2.10.0 +stdlibLogVersion=2.14.0 +stdlibOsVersion=1.10.0 # Level 3 -stdlibTaskVersion=2.10.0 +stdlibTaskVersion=2.11.0 
stdlibDataJsonDataVersion=1.1.3 stdlibDataXmlDataVersion=1.5.2 +stdlibFileVersion=1.11.0 # Level 4 -stdlibDataCsvVersion=0.8.1 +stdlibDataCsvVersion=0.9.0-20251209-183500-7b18d54 stdlibUuidVersion=1.10.0 observeVersion=1.5.0 diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/ContentCsvStreamIteratorUtils.java b/native/src/main/java/io/ballerina/stdlib/ftp/ContentCsvStreamIteratorUtils.java index 45b0e9842..ff75bb865 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/ContentCsvStreamIteratorUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/ContentCsvStreamIteratorUtils.java @@ -19,6 +19,7 @@ import io.ballerina.lib.data.csvdata.csv.Native; import io.ballerina.lib.data.csvdata.utils.ModuleUtils; +import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.creators.TypeCreator; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.PredefinedTypes; @@ -94,7 +95,7 @@ public static Object createRecordStream(InputStream content, Type streamValueTyp return ValueCreator.createStreamValue(streamType, contentCsvStreamObject); } - public static Object next(BObject recordIterator) { + public static Object next(Environment environment, BObject recordIterator) { final Type elementType = (Type) recordIterator.getNativeData(NATIVE_STREAM_VALUE_TYPE); final String recordTypeName = resolveRecordTypeName(elementType); final BMap streamEntry = @@ -118,9 +119,9 @@ public static Object next(BObject recordIterator) { BMap parseOptions = ValueCreator.createRecordValue(ModuleUtils.getModule(), "ParseOptions"); parseOptions.put(StringUtils.fromString("allowDataProjection"), laxDataBinding); - // Parse Object parsed = Native.parseBytes( + environment, ValueCreator.createArrayValue(bytes), parseOptions, ValueCreator.createTypedescValue(TypeCreator.createArrayType(elementType))); diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java 
b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java index 01963149f..24a5dd27a 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java @@ -72,6 +72,7 @@ import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertBytesToString; import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertBytesToXml; import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertToBallerinaByteArray; +import static io.ballerina.stdlib.ftp.util.FtpContentConverter.deriveFileNamePrefix; import static io.ballerina.stdlib.ftp.util.FtpUtil.ErrorType.Error; import static io.ballerina.stdlib.ftp.util.FtpUtil.extractCompressionConfiguration; import static io.ballerina.stdlib.ftp.util.FtpUtil.extractFileTransferConfiguration; @@ -121,6 +122,8 @@ private static void configureClientEndpointBasic(BObject clientEndpoint, BMap authMap = FtpUtil.getAuthMap(config, protocol); clientEndpoint.addNativeData(FtpConstants.ENDPOINT_CONFIG_USERNAME, @@ -333,7 +336,10 @@ public static Object getCsv(Environment env, BObject clientConnector, BString fi } boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); - return convertBytesToCsv((byte[]) content, typeDesc.getDescribingType(), laxDataBinding); + BMap csvFailSafe = (BMap) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_CSV_FAIL_SAFE); + String fileNamePrefix = deriveFileNamePrefix(filePath); + return convertBytesToCsv(env, (byte[]) content, typeDesc.getDescribingType(), + laxDataBinding, csvFailSafe, fileNamePrefix); } public static Object getBytesAsStream(Environment env, BObject clientConnector, BString filePath) { diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java index 7dde69d02..a013a5eb3 100644 --- 
a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java @@ -18,6 +18,7 @@ package io.ballerina.stdlib.ftp.server; +import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Module; import io.ballerina.runtime.api.Runtime; import io.ballerina.runtime.api.concurrent.StrandMetadata; @@ -65,6 +66,7 @@ import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertBytesToString; import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertBytesToXml; import static io.ballerina.stdlib.ftp.util.FtpContentConverter.convertToBallerinaByteArray; +import static io.ballerina.stdlib.ftp.util.FtpContentConverter.deriveFileNamePrefix; /** * Handles content-based callbacks for FTP listener. @@ -76,20 +78,23 @@ public class FtpContentCallbackHandler { private final FileSystemManager fileSystemManager; private final FileSystemOptions fileSystemOptions; private final boolean laxDataBinding; + private final BMap csvFailSafe; public FtpContentCallbackHandler(Runtime ballerinaRuntime, FileSystemManager fileSystemManager, - FileSystemOptions fileSystemOptions, boolean laxDataBinding) { + FileSystemOptions fileSystemOptions, boolean laxDataBinding, + BMap csvFailSafe) { this.ballerinaRuntime = ballerinaRuntime; this.fileSystemManager = fileSystemManager; this.fileSystemOptions = fileSystemOptions; this.laxDataBinding = laxDataBinding; + this.csvFailSafe = csvFailSafe; } /** * Processes content callbacks for added files in the event. * Routes each file to the appropriate content handler based on file extension and annotations. 
*/ - public void processContentCallbacks(BObject service, RemoteFileSystemEvent event, + public void processContentCallbacks(Environment env, BObject service, RemoteFileSystemEvent event, FormatMethodsHolder holder, BObject callerObject) { List addedFiles = event.getAddedFiles(); @@ -110,7 +115,7 @@ public void processContentCallbacks(BObject service, RemoteFileSystemEvent event // Convert content based on method signature MethodType methodType = methodTypeOpt.get(); - Object convertedContent = convertFileContent(fileObject, inputStream, methodType); + Object convertedContent = convertFileContent(env, fileObject, inputStream, methodType); // Prepare method arguments Object[] methodArguments = prepareContentMethodArguments(methodType, convertedContent, @@ -153,7 +158,8 @@ private byte[] fetchAllFileContentFromRemote(FileObject fileObject, InputStream /** * Converts file content to the appropriate Ballerina type based on the method signature. */ - private Object convertFileContent(FileObject fileObject, InputStream inputStream, MethodType methodType) + private Object convertFileContent(Environment environment, FileObject fileObject, InputStream inputStream, + MethodType methodType) throws Exception { String methodName = methodType.getName(); Parameter firstParameter = methodType.getParameters()[0]; @@ -178,12 +184,14 @@ private Object convertFileContent(FileObject fileObject, InputStream inputStream } } else { byte[] fileContent = fetchAllFileContentFromRemote(fileObject, inputStream); + String fileNamePrefix = deriveFileNamePrefix(fileObject); return switch (methodName) { case ON_FILE_REMOTE_FUNCTION -> convertToBallerinaByteArray(fileContent); case ON_FILE_TEXT_REMOTE_FUNCTION -> convertBytesToString(fileContent); case ON_FILE_JSON_REMOTE_FUNCTION -> convertBytesToJson(fileContent, firstParamType, laxDataBinding); case ON_FILE_XML_REMOTE_FUNCTION -> convertBytesToXml(fileContent, firstParamType, laxDataBinding); - case ON_FILE_CSV_REMOTE_FUNCTION -> 
convertBytesToCsv(fileContent, firstParamType, laxDataBinding); + case ON_FILE_CSV_REMOTE_FUNCTION -> convertBytesToCsv(environment, fileContent, firstParamType, + laxDataBinding, csvFailSafe, fileNamePrefix); default -> throw new IllegalArgumentException("Unknown content method: " + methodName); }; } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java index aac0979ac..ef559f5fd 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java @@ -18,6 +18,7 @@ package io.ballerina.stdlib.ftp.server; +import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Module; import io.ballerina.runtime.api.Runtime; import io.ballerina.runtime.api.concurrent.StrandMetadata; @@ -77,16 +78,23 @@ public class FtpListener implements RemoteFileSystemListener { private static final Logger log = LoggerFactory.getLogger(FtpListener.class); private final Runtime runtime; + private Environment environment = null; private Map registeredServices = new HashMap<>(); private BObject caller; private FileSystemManager fileSystemManager; private FileSystemOptions fileSystemOptions; private boolean laxDataBinding; + private BMap csvFailSafe = ValueCreator.createMapValue(); FtpListener(Runtime runtime) { this.runtime = runtime; } + FtpListener(Environment environment) { + this.environment = environment; + this.runtime = environment.getRuntime(); + } + public void setFileSystemManager(FileSystemManager fileSystemManager) { this.fileSystemManager = fileSystemManager; } @@ -99,6 +107,10 @@ public void setLaxDataBinding(boolean laxDataBinding) { this.laxDataBinding = laxDataBinding; } + public void setCsvFailSafeConfigs(BMap csvFailSafe) { + this.csvFailSafe = csvFailSafe; + } + @Override public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage) { if 
(remoteFileSystemBaseMessage instanceof RemoteFileSystemEvent) { @@ -106,7 +118,7 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage if (runtime != null) { for (BObject service : registeredServices.values()) { - dispatchFileEventToService(service, event); + dispatchFileEventToService(this.environment, service, event); } } else { log.error("Runtime should not be null."); @@ -115,13 +127,13 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage return true; } - private void dispatchFileEventToService(BObject service, RemoteFileSystemEvent event) { + private void dispatchFileEventToService(Environment env, BObject service, RemoteFileSystemEvent event) { FormatMethodsHolder formatMethodHolder = new FormatMethodsHolder(service); Optional onFileDeletedMethodType = getOnFileDeletedMethod(service); // Dispatch Strategy: Check handler availability in order if (formatMethodHolder.hasContentMethods()) { - processContentBasedCallbacks(service, event, formatMethodHolder); + processContentBasedCallbacks(env, service, event, formatMethodHolder); } else if (onFileDeletedMethodType.isPresent()) { if (!event.getDeletedFiles().isEmpty()) { processDeletionCallback(service, event, onFileDeletedMethodType.get()); @@ -145,7 +157,7 @@ private void dispatchFileEventToService(BObject service, RemoteFileSystemEvent e * Uses ContentMethodRouter to dispatch files to appropriate handlers. * Also handles file deletion events via onFileDeleted method if available. 
*/ - private void processContentBasedCallbacks(BObject service, RemoteFileSystemEvent event, + private void processContentBasedCallbacks(Environment env, BObject service, RemoteFileSystemEvent event, FormatMethodsHolder holder) { // Process added files with content methods if (!event.getAddedFiles().isEmpty()) { @@ -155,8 +167,8 @@ private void processContentBasedCallbacks(BObject service, RemoteFileSystemEvent } else { try { FtpContentCallbackHandler contentHandler = new FtpContentCallbackHandler( - runtime, fileSystemManager, fileSystemOptions, laxDataBinding); - contentHandler.processContentCallbacks(service, event, holder, caller); + runtime, fileSystemManager, fileSystemOptions, laxDataBinding, csvFailSafe); + contentHandler.processContentCallbacks(env, service, event, holder, caller); } catch (Exception e) { log.error("Error in content callback processing for added files", e); } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java index 11be3fe53..af2e642af 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java @@ -70,6 +70,8 @@ public class FtpListenerHelper { private static final String CLOSE_CALLER_ERROR = "Error occurred while closing the caller: "; private static final BString CLIENT_INSTANCE = StringUtils.fromString("client"); + public static final BString CSV_FAIL_SAFE = StringUtils.fromString("csvFailSafe"); + private FtpListenerHelper() { // private constructor } @@ -84,7 +86,7 @@ public static Object init(Environment env, BObject ftpListener, BMap paramMap = getServerConnectorParamMap(serviceEndpointConfig); RemoteFileSystemConnectorFactory fileSystemConnectorFactory = new RemoteFileSystemConnectorFactoryImpl(); - final FtpListener listener = new FtpListener(env.getRuntime()); + final FtpListener listener = new 
FtpListener(env); RemoteFileSystemServerConnector serverConnector; List dependencyConditions = parseFileDependencyConditions(serviceEndpointConfig); @@ -107,6 +109,8 @@ public static Object init(Environment env, BObject ftpListener, BMap csvFailSafe = serviceEndpointConfig.getMapValue(CSV_FAIL_SAFE); + listener.setCsvFailSafeConfigs(csvFailSafe); ftpListener.addNativeData(FtpConstants.FTP_SERVER_CONNECTOR, serverConnector); // This is a temporary solution diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java index 1d0eed4d1..c1aa9137e 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java @@ -105,6 +105,7 @@ private FtpConstants() { public static final String FTPS_DATA_CHANNEL_PROTECTION_PRIVATE = "PRIVATE"; public static final String FTPS_DATA_CHANNEL_PROTECTION_SAFE = "SAFE"; public static final String FTPS_DATA_CHANNEL_PROTECTION_CONFIDENTIAL = "CONFIDENTIAL"; + public static final String ENDPOINT_CONFIG_CSV_FAIL_SAFE = "csvFailSafe"; // Advanced file selection configuration constants public static final String ENDPOINT_CONFIG_FILE_AGE_FILTER = "fileAgeFilter"; diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java index 10b871dda..a2c7c2b02 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpContentConverter.java @@ -20,6 +20,7 @@ import io.ballerina.lib.data.ModuleUtils; import io.ballerina.lib.data.xmldata.xml.Native; +import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Module; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.Type; @@ -31,11 +32,14 @@ import io.ballerina.runtime.api.values.BMap; 
import io.ballerina.runtime.api.values.BString; import io.ballerina.runtime.api.values.BTypedesc; +import org.apache.commons.vfs2.FileObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.nio.charset.StandardCharsets; +import static io.ballerina.lib.data.csvdata.csv.Native.parseBytes; import static io.ballerina.stdlib.ftp.util.FtpConstants.FTP_ERROR; import static io.ballerina.stdlib.ftp.util.FtpUtil.ErrorType.Error; @@ -47,6 +51,17 @@ public final class FtpContentConverter { private static final Logger log = LoggerFactory.getLogger(FtpContentConverter.class); private static final BString ALLOW_DATA_PROJECTION = StringUtils.fromString("allowDataProjection"); + public static final BString FILE_PATH = StringUtils.fromString("filePath"); + public static final String CURRENT_DIRECTORY_PATH = System.getProperty("user.dir"); + public static final String ERROR_LOG_FILE_NAME = "error.log"; + public static final BString APPEND = StringUtils.fromString("APPEND"); + public static final BString FILE_WRITE_OPTION = StringUtils.fromString("fileWriteOption"); + public static final BString CONTENT_TYPE = StringUtils.fromString("contentType"); + public static final BString RAW_TYPE = StringUtils.fromString("RAW"); + public static final BString FILE_OUTPUT_MODE = StringUtils.fromString("fileOutputMode"); + public static final BString FAIL_SAFE = StringUtils.fromString("failSafe"); + public static final String FAIL_SAFE_OPTIONS = "FailSafeOptions"; + public static final String FILE_OUTPUT_MODE_TYPE = "FileOutputMode"; private FtpContentConverter() { // private constructor @@ -120,15 +135,16 @@ public static Object convertBytesToXml(byte[] content, Type targetType, boolean * @param targetType The target Ballerina type for data binding * @return Ballerina CSV data (string[][], record[][], or custom type) or BError */ - public static Object convertBytesToCsv(byte[] content, Type targetType, boolean laxDataBinding) { + public static Object 
convertBytesToCsv(Environment env, byte[] content, Type targetType, boolean laxDataBinding, + BMap csvFailSafeConfigs, String fileNamePrefix) { try { BArray byteArray = ValueCreator.createArrayValue(content); - BMap options = createCsvParseOptions(laxDataBinding); + BMap options = createCsvParseOptions(laxDataBinding, csvFailSafeConfigs, fileNamePrefix); Type referredType = TypeUtils.getReferredType(targetType); BTypedesc typedesc = ValueCreator.createTypedescValue(referredType); - Object result = io.ballerina.lib.data.csvdata.csv.Native.parseBytes(byteArray, options, typedesc); + Object result = parseBytes(env, byteArray, options, typedesc); if (result instanceof BError) { log.error("Failed to parse CSV content: {}", ((BError) result).getMessage()); @@ -182,9 +198,25 @@ private static BMap createXmlParseOptions(boolean laxDataBindin * * @return BMap containing parse options */ - private static BMap createCsvParseOptions(boolean laxDataBinding) { + private static BMap createCsvParseOptions(boolean laxDataBinding, + BMap csvFailSafeConfigs, String fileNamePrefix) { BMap mapValue = ValueCreator.createRecordValue( io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), "ParseOptions"); + if (csvFailSafeConfigs != null) { + BString contentType = csvFailSafeConfigs.getStringValue(CONTENT_TYPE); + BMap failSafe = + ValueCreator.createRecordValue(io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), + FAIL_SAFE_OPTIONS); + BMap fileOutputMode = + ValueCreator.createRecordValue(io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), + FILE_OUTPUT_MODE_TYPE); + String filePath = CURRENT_DIRECTORY_PATH + File.separator + fileNamePrefix + "_" + ERROR_LOG_FILE_NAME; + fileOutputMode.put(FILE_PATH, StringUtils.fromString(filePath)); + fileOutputMode.put(FILE_WRITE_OPTION, APPEND); + fileOutputMode.put(CONTENT_TYPE, contentType); + failSafe.put(FILE_OUTPUT_MODE, fileOutputMode); + mapValue.put(FAIL_SAFE, failSafe); + } if (laxDataBinding) { BMap 
allowDataProjection = mapValue.getMapValue(StringUtils.fromString("allowDataProjection")); allowDataProjection.put(StringUtils.fromString("nilAsOptionalField"), Boolean.TRUE); @@ -196,6 +228,16 @@ private static BMap createCsvParseOptions(boolean laxDataBindin return mapValue; } + public static String deriveFileNamePrefix(Object fileDetails) { + if (fileDetails instanceof FileObject fileObject) { + String baseName = fileObject.getName().getBaseName(); + String extension = fileObject.getName().getExtension(); + return extension.isEmpty() ? baseName : baseName.substring(0, baseName.length() - extension.length() - 1); + } + String filePath = fileDetails.toString(); + return filePath.replaceAll("\\.[^.]+$", ""); + } + /** * Creates a byte array from input stream. *