diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index e9a2bdcee..d69c2349f 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerina" name = "ftp" -version = "2.15.1" +version = "2.16.0" authors = ["Ballerina"] keywords = ["FTP", "SFTP", "remote file", "file transfer", "client", "service"] repository = "https://github.com/ballerina-platform/module-ballerina-ftp" @@ -45,8 +45,8 @@ path = "./lib/commons-lang3-3.18.0.jar" [[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "ftp-native" -version = "2.15.1" -path = "../native/build/libs/ftp-native-2.15.1.jar" +version = "2.16.0" +path = "../native/build/libs/ftp-native-2.16.0-SNAPSHOT.jar" [[platform.java21.dependency]] groupId = "io.ballerina.lib" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index 305a0fffa..bfb8913c1 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "ftp-compiler-plugin" class = "io.ballerina.stdlib.ftp.plugin.FtpCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/ftp-compiler-plugin-2.15.1.jar" +path = "../compiler-plugin/build/libs/ftp-compiler-plugin-2.16.0-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 931abc14d..289d90ccf 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -7,10 +7,21 @@ dependencies-toml-version = "2" distribution-version = "2201.12.0" +[[package]] +org = "ballerina" +name = "data.csv" +version = "0.8.1" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] +modules = [ + {org = "ballerina", packageName = "data.csv", moduleName = "data.csv"} +] + [[package]] org = "ballerina" name = "data.jsondata" -version = "1.1.2" +version = "1.1.3" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "lang.object"} @@ -34,8 +45,9 @@ modules = [ [[package]] org = "ballerina" name = "ftp" -version = "2.15.1" +version = "2.16.0" dependencies = [ + {org = "ballerina", name = "data.csv"}, {org = "ballerina", name = "data.jsondata"}, {org = "ballerina", name = "data.xmldata"}, {org = "ballerina", name = "io"}, @@ -163,7 +175,7 @@ modules = [ [[package]] org = "ballerina" name = "observe" -version = "1.5.0" +version = "1.5.1" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] diff --git a/ballerina/client_endpoint.bal b/ballerina/client_endpoint.bal index 7ceff8074..d3b94464e 100644 --- a/ballerina/client_endpoint.bal +++ b/ballerina/client_endpoint.bal @@ -14,12 +14,13 @@ // specific language governing permissions and limitations // under the License. -import ballerina/io; -import ballerina/jballerina.java; +import ballerina/data.csv as _; import ballerina/data.jsondata as _; import ballerina/data.xmldata; +import ballerina/io; +import ballerina/jballerina.java; -# Represents an FTP client that intracts with an FTP server +# Represents an FTP client that interacts with an FTP server public isolated client class Client { private final readonly & ClientConfiguration config; @@ -36,12 +37,11 @@ public isolated client class Client { } # Retrieves the file content from a remote resource. + # Deprecated: Use `getText`, `getJson`, `getXml`, `getCsv`, `getBytes`, or the streaming variants `getBytesAsStream`/`getCsvAsStream` instead. # ```ballerina # stream|ftp:Error channel = client->get(path); # ``` # - # Deprecated: Use `getText` or `getJson` instead for retrieving file content. - # # + path - The resource path # + return - A byte stream from which the file can be read or `ftp:Error` in case of errors @deprecated @@ -59,7 +59,7 @@ public isolated client class Client { # ```ballerina # byte[] content = check client->getBytes(path); # ``` - # + # # + path - The resource path # + return - content as a byte array or `ftp:Error` in case of errors remote isolated function getBytes(string path) returns byte[]|Error { @@ -86,7 +86,7 @@ public isolated client class Client { # + targetType - The target type of the json content # (e.g., `json`, `record {}`, or a user-defined record type) # + return - content as a json or `ftp:Error` in case of errors - remote isolated function getJson(string path, typedesc targetType = <>) returns targetType|Error = @java:Method { + remote isolated function getJson(string path, typedesc targetType = <>) returns targetType|Error = @java:Method { name: "getJson", 'class: "io.ballerina.stdlib.ftp.client.FtpClient" } external; @@ -100,7 +100,7 @@ public isolated client class Client { # + targetType - The target type of the xml content # (e.g., `xml`, `record {}`, or a user-defined record type) # + return - content as a xml or `ftp:Error` in case of errors - remote isolated function getXml(string path, typedesc targetType = <>) returns targetType|Error = @java:Method { + remote isolated function getXml(string path, typedesc targetType = <>) returns targetType|Error = @java:Method { name: "getXml", 'class: "io.ballerina.stdlib.ftp.client.FtpClient" } external; @@ -110,12 +110,12 @@ public isolated client class Client { # ```ballerina # string[][] content = check client->getCsv(path); # ``` - # + # # + path - The path to the file on the FTP server # + targetType - The target type of the CSV content # (e.g., `string[][]`, `record {}[]`, or an array of user-defined record type) # + return - content as a string[][] or `ftp:Error` in case of errors - remote isolated function getCsv(string path, typedesc targetType = <>) returns targetType|Error = @java:Method { + remote isolated function getCsv(string path, typedesc targetType = <>) returns targetType|Error = @java:Method { name: "getCsv", 'class: "io.ballerina.stdlib.ftp.client.FtpClient" } external; @@ -132,16 +132,29 @@ public isolated client class Client { 'class: "io.ballerina.stdlib.ftp.client.FtpClient" } external; + # Retrieves the file content as a CSV stream from a remote resource. + # ```ballerina + # stream response = check client->getCsvAsStream(path); + # ``` + # + # + path - The path to the file on the FTP server + # + targetType - The target element type of the stream (e.g., `string[]` or `record {}`) + # + return - A stream from which the file can be read or `ftp:Error` in case of errors + remote isolated function getCsvAsStream(string path, typedesc targetType = <>) returns stream|Error = @java:Method { + name: "getCsvAsStream", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" + } external; + # Appends the content to an existing file in an FTP server. # ```ballerina # ftp:Error? response = client->append(path, channel); # ``` - # Deprecated: Use `putText`, `putJson`, `putXml`, `putCsv`, `putBytes` or `putCsvFromStream` # # + path - The resource path # + content - Content to be written to the file in server # + return - `()` or else an `ftp:Error` if failed to establish # the communication with the FTP server + # Deprecated: Use `putText`, `putJson`, `putXml`, `putCsv`, `putBytes` or `putCsvAsStream` with option `APPEND`. @deprecated remote isolated function append(string path, stream|string|xml|json content) returns Error? { @@ -152,7 +165,6 @@ public isolated client class Client { # ```ballerina # ftp:Error? response = client->put(path, channel); # ``` - # Deprecated: Use `putText`, `putJson`, `putXml`, `putCsv`, `putBytes` or `putCsvFromStream` # # + path - The resource path # + content - Content to be written to the file in server @@ -161,6 +173,7 @@ public isolated client class Client { # uploading # + return - `()` or else an `ftp:Error` if failed to establish # the communication with the FTP server + # Deprecated: Use `putText`, `putJson`, `putXml`, `putCsv`, `putBytes` or `putCsvAsStream`. @deprecated remote isolated function put(string path, stream |string|xml|json content, Compression compressionType = NONE) returns Error? { @@ -175,7 +188,7 @@ public isolated client class Client { # ```ballerina # ftp:Error? response = client->putBytes(path, content, option); # ``` - # + # # + path - The resource path # + content - Content to be written to the file in server # + option - To indicate whether to overwrite or append the given content @@ -189,7 +202,7 @@ public isolated client class Client { # ```ballerina # ftp:Error? response = client->putText(path, content, option); # ``` - # + # # + path - The resource path # + content - Content to be written to the file in server # + option - To indicate whether to overwrite or append the given content @@ -203,26 +216,25 @@ public isolated client class Client { # ```ballerina # ftp:Error? response = client->putJson(path, content, option); # ``` - # + # # + path - The resource path # + content - Content to be written to the file in server # + option - To indicate whether to overwrite or append the given content # + return - `()` or else an `ftp:Error` if failed to write - remote isolated function putJson(string path, json|record{} content, FileWriteOption option = OVERWRITE) returns Error? { - string jsonVal = content is json ? content.toJsonString() : content.toJsonString(); - return putJson(self, path, jsonVal, option); + remote isolated function putJson(string path, json|record {} content, FileWriteOption option = OVERWRITE) returns Error? { + return putJson(self, path, content.toJsonString(), option); } # Adds a file to an FTP server with the specified write option. # ```ballerina # ftp:Error? response = client->putXml(path, content, option); # ``` - # + # # + path - The resource path # + content - Content to be written to the file in server # + option - To indicate whether to overwrite or append the given content # + return - `()` or else an `ftp:Error` if failed to write - remote isolated function putXml(string path, xml|record{} content, FileWriteOption option = OVERWRITE) returns Error? { + remote isolated function putXml(string path, xml|record {} content, FileWriteOption option = OVERWRITE) returns Error? { xml|error xmldata = content is xml ? content : xmldata:toXml(content); if xmldata is error { return error Error("Failed to convert record to XML: " + xmldata.message()); @@ -230,6 +242,47 @@ public isolated client class Client { return putXml(self, path, xmldata, option); } + # Adds a CSV file to an FTP server with the specified write option. + # ```ballerina + # ftp:Error? response = client->putCsv(path, content, option); + # ``` + # + # + path - The resource path + # + content - Content to be written to the file in server + # + option - To indicate whether to overwrite or append the given content + # + return - `()` or else an `ftp:Error` if failed to write + remote isolated function putCsv(string path, string[][]|record {}[] content, FileWriteOption option = OVERWRITE) returns Error? { + return putCsv(self, path, content, option); + } + + # Adds a byte[] stream as a file to an FTP server with the specified write option. + # ```ballerina + # ftp:Error? response = client->putBytesAsStream(path, content, option); + # ``` + # + # + path - The resource path + # + content - Content to be written to the file in server + # + option - To indicate whether to overwrite or append the given content + # + return - `()` or else an `ftp:Error` if failed to write + remote isolated function putBytesAsStream(string path, stream content, FileWriteOption option = OVERWRITE) returns Error? = @java:Method { + name: "putBytesAsStream", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" + } external; + + # Adds a CSV file from string[][] or record{}[] elements as a file to an FTP server with the specified write option. + # ```ballerina + # ftp:Error? response = client->putCsvAsStream(path, content, option); + # ``` + # + # + path - The resource path + # + content - Content to be written to the file in server + # + option - To indicate whether to overwrite or append the given content + # + return - `()` or else an `ftp:Error` if failed to write + remote isolated function putCsvAsStream(string path, stream content, FileWriteOption option = OVERWRITE) returns Error? = @java:Method { + name: "putCsvAsStream", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" + } external; + # Creates a new directory in an FTP server. # ```ballerina # ftp:Error? response = client->mkdir(path); diff --git a/ballerina/content_csv_record_stream.bal b/ballerina/content_csv_record_stream.bal new file mode 100644 index 000000000..724dcc8b8 --- /dev/null +++ b/ballerina/content_csv_record_stream.bal @@ -0,0 +1,69 @@ +// Copyright (c) 2025 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/jballerina.java; + +# Represents the type of the record which returned from the contentByteStream.next() call. +# +# + value - The record +public type ContentCsvRecordStreamEntry record {| + record {} value; +|}; + +# `ContentCsvRecordStream` used to initialize a stream of type string[] for content callbacks. +public class ContentCsvRecordStream { + + private boolean isClosed = false; + private Error? err; + + public isolated function init(Error? err = ()) { + self.err = err; + } + + # Reads and return the next CSV record as `record{}`. + # + # + return - A record containing a `record{}` value when the stream is available, + # `()` if the stream has reached the end or else an `error` + public isolated function next() returns record {|record {} value;|}|error? { + return externGetContentCsvRecordStreamEntry(self); + } + + # Closes the stream. The primary usage of this function is to close the stream without reaching the end. + # If the stream reaches the end, the `contentByteStream.next` will automatically close the stream. + # + # + return - Returns `()` when the closing was successful or an `error` + public isolated function close() returns error? { + if !self.isClosed { + var closeResult = externCloseContentCsvRecordStream(self); + if closeResult is () { + self.isClosed = true; + } + return closeResult; + } + return (); + } +} + +isolated function externGetContentCsvRecordStreamEntry(ContentCsvRecordStream iterator) + returns record {|record {} value;|}|error? = @java:Method { + 'class: "io.ballerina.stdlib.ftp.ContentCsvStreamIteratorUtils", + name: "next" +} external; + +isolated function externCloseContentCsvRecordStream(ContentCsvRecordStream iterator) returns error? = @java:Method { + 'class: "io.ballerina.stdlib.ftp.ContentCsvStreamIteratorUtils", + name: "close" +} external; diff --git a/ballerina/content_csv_string_stream.bal b/ballerina/content_csv_string_stream.bal new file mode 100644 index 000000000..4e70cef4f --- /dev/null +++ b/ballerina/content_csv_string_stream.bal @@ -0,0 +1,69 @@ +// Copyright (c) 2025 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/jballerina.java; + +# Represents the type of the record which returned from the contentByteStream.next() call. +# +# + value - The array of strings +public type ContentCsvStringArrayStreamEntry record {| + string[] value; +|}; + +# `ContentCsvStringStream` used to initialize a stream of type string[] for content callbacks. +public class ContentCsvStringArrayStream { + + private boolean isClosed = false; + private Error? err; + + public isolated function init(Error? err = ()) { + self.err = err; + } + + # Reads and return the next CSV record as `string[]`. + # + # + return - A record containing a string array when the stream is available, + # `()` if the stream has reached the end or else an `error` + public isolated function next() returns record {|string[] value;|}|error? { + return externGetContentCsvStreamEntry(self); + } + + # Closes the stream. The primary usage of this function is to close the stream without reaching the end. + # If the stream reaches the end, the `contentByteStream.next` will automatically close the stream. + # + # + return - Returns `()` when the closing was successful or an `error` + public isolated function close() returns error? { + if !self.isClosed { + var closeResult = externCloseContentCsvStream(self); + if closeResult is () { + self.isClosed = true; + } + return closeResult; + } + return (); + } +} + +isolated function externGetContentCsvStreamEntry(ContentCsvStringArrayStream iterator) + returns record {|string[] value;|}|error? = @java:Method { + 'class: "io.ballerina.stdlib.ftp.ContentCsvStreamIteratorUtils", + name: "next" +} external; + +isolated function externCloseContentCsvStream(ContentCsvStringArrayStream iterator) returns error? = @java:Method { + 'class: "io.ballerina.stdlib.ftp.ContentCsvStreamIteratorUtils", + name: "close" +} external; diff --git a/ballerina/external_functions.bal b/ballerina/external_functions.bal index ab15529b7..32e474227 100644 --- a/ballerina/external_functions.bal +++ b/ballerina/external_functions.bal @@ -56,6 +56,11 @@ isolated function putXml(Client clientEndpoint, string path, xml content, FileWr 'class: "io.ballerina.stdlib.ftp.client.FtpClient" } external; +isolated function putCsv(Client clientEndpoint, string path, string[][]|record {}[] content, FileWriteOption option) returns Error? = @java:Method { + name: "putCsv", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" +} external; + isolated function mkdir(Client clientEndpoint, string path) returns Error? = @java:Method { name: "mkdir", 'class: "io.ballerina.stdlib.ftp.client.FtpClient" diff --git a/ballerina/tests/client_endpoint_test.bal b/ballerina/tests/client_endpoint_test.bal index b4c7b137d..df0be73f6 100644 --- a/ballerina/tests/client_endpoint_test.bal +++ b/ballerina/tests/client_endpoint_test.bal @@ -15,10 +15,10 @@ // under the License. import ballerina/io; -import ballerina/test; -import ballerina/lang.runtime as runtime; import ballerina/lang.'string as strings; +import ballerina/lang.runtime as runtime; import ballerina/log; +import ballerina/test; string filePath = "/home/in/test1.txt"; string nonFittingFilePath = "/home/in/test4.txt"; @@ -45,11 +45,21 @@ ClientConfiguration config = { userDirIsRoot: false }; +// Create the config to access mock FTP server +ClientConfiguration configLaxDataBinding = { + protocol: FTP, + host: "127.0.0.1", + port: 21212, + auth: {credentials: {username: "wso2", password: "wso2123"}}, + userDirIsRoot: false, + laxDataBinding: true +}; + ClientConfiguration ftpUserHomeRootConfig = { protocol: FTP, host: "127.0.0.1", port: 21212, - auth: { credentials: { username: "wso2", password: "wso2123" } }, + auth: {credentials: {username: "wso2", password: "wso2123"}}, userDirIsRoot: true }; @@ -86,6 +96,7 @@ ClientConfiguration sftpConfigUserDirRoot = { Client? anonClientEp = (); Client? clientEp = (); +Client? clientEpLaxDataBinding = (); Client? sftpClientEp = (); Client? sftpClientUserDirRootEp = (); Client? ftpUserHomeRootClientEp = (); @@ -100,6 +111,7 @@ function initTestEnvironment() returns error? { io:println("Starting servers"); anonClientEp = check new (anonConfig); clientEp = check new (config); + clientEpLaxDataBinding = check new (configLaxDataBinding); sftpClientEp = check new (sftpConfig); sftpClientUserDirRootEp = check new (sftpConfigUserDirRoot); ftpUserHomeRootClientEp = check new (ftpUserHomeRootConfig); @@ -242,7 +254,7 @@ public function testPutCompressedFileContent() returns error? { } } -@test:Config { dependsOn: [testPutFileContent]} +@test:Config {dependsOn: [testPutFileContent]} function testPutBytes() returns error? { byte[] content = "hello-bytes".toBytes(); string path = "/home/in/put-bytes.txt"; @@ -256,7 +268,7 @@ function testPutBytes() returns error? { } } -@test:Config { dependsOn: [testPutFileContent]} +@test:Config {dependsOn: [testPutFileContent]} function testGetBytesAsStream() returns error? { byte[] content = "hello-bytes".toBytes(); string path = "/home/in/put-bytes.txt"; @@ -264,10 +276,10 @@ function testGetBytesAsStream() returns error? { check (clientEp)->putBytes(path, content); stream got = check (clientEp)->getBytesAsStream(path); byte[] accumulatedBytes = []; - check from byte[] byteChunk in got - do { - accumulatedBytes.push(...byteChunk); - }; + check from byte[] byteChunk in got + do { + accumulatedBytes.push(...byteChunk); + }; test:assertEquals(accumulatedBytes.length(), content.length(), msg = "Byte length mismatch"); foreach int i in 0 ..< content.length() { @@ -275,9 +287,9 @@ function testGetBytesAsStream() returns error? { } } -@test:Config { dependsOn: [testPutFileContent]} +@test:Config {dependsOn: [testPutFileContent]} function testPutJson() returns error? { - json j = { name: "wso2", count: 2, ok: true }; + json j = {name: "wso2", count: 2, ok: true}; string path = "/home/in/put.json"; check (clientEp)->putJson(path, j); @@ -286,9 +298,9 @@ function testPutJson() returns error? { test:assertEquals(got, j, msg = "JSON content mismatch"); } -@test:Config { dependsOn: [testPutFileContent]} +@test:Config {dependsOn: [testPutFileContent]} function testPutXml() returns error? { - xml x = xml`42`; + xml x = xml `42`; string path = "/home/in/put.xml"; check (clientEp)->putXml(path, x); @@ -296,10 +308,21 @@ function testPutXml() returns error? { // Compare string representations for stability test:assertEquals(got.toString(), x.toString(), msg = "XML content mismatch"); - return; } -@test:Config { dependsOn: [testPutFileContent]} +@test:Config {dependsOn: [testPutFileContent]} +function testPutXmlFailure() returns error? { + string txt = "hello text content"; + string path = "/home/in/putXml.txt"; + + check (clientEp)->putText(path, txt); + xml|Error got = (clientEp)->getXml(path); + + // Compare string representations for stability + test:assertTrue(got is Error, msg = "XML content binding should have failed for non-XML content"); +} + +@test:Config {dependsOn: [testPutFileContent]} function testPutText() returns error? { string txt = "hello text content"; string path = "/home/in/put.txt"; @@ -308,7 +331,82 @@ function testPutText() returns error? { string got = check (clientEp)->getText(path); test:assertEquals(got, txt, msg = "Text content mismatch"); - return; +} + +@test:Config {dependsOn: [testPutFileContent]} +public function testPutCsvStringAndReadAllAndStream() returns error? { + string csvPath = "/home/in/test-csv-string.csv"; + string[][] csvData = [ + ["id", "name", "email", "age", "registered_date", "active"], + ["1", "Alice Smith", "alice@example.com", "25", "2024-01-15", "true"], + ["2", "Bob Johnson", "bob@example.com", "30", "2024-02-20", "false"], + ["3", "Carol White", "carol@example.com", "28", "2024-03-10", "true"] + ]; + + Error? resp = (clientEp)->putCsv(csvPath, csvData); + if resp is Error { + test:assertFail(msg = "Error while putting CSV string: " + resp.message()); + } else { + log:printInfo("Executed `put` operation for CSV string"); + } + + string[][] gotAll = check (clientEp)->getCsv(csvPath); + test:assertEquals(gotAll, csvData.slice(1), msg = "CSV text mismatch when reading all at once"); + + stream str = check (clientEp)->getCsvAsStream(csvPath); + string[][] actual = []; + check from string[] row in str + do { + actual.push(row); + }; + test:assertEquals(actual, csvData.slice(1), msg = "CSV stream content mismatch when reading as stream"); +} + +@test:Config { + dependsOn: [testPutCsvStringAndReadAllAndStream] +} +public function testPutCsvFromRecordsAndReadAllAndStream() returns error? { + string csvPath = "/home/in/test-csv-records.csv"; + + record {|string name; int age;|}[] records = [ + {name: "Charlie", age: 22}, + {name: "Dana", age: 28}, + {name: "Eve", age: 35} + ]; + + Error? resp = (clientEp)->putCsv(csvPath, records); + if resp is Error { + test:assertFail(msg = "Error while putting CSV string: " + resp.message()); + } else { + log:printInfo("Executed `put` operation for CSV string"); + } + + record {|string name; int age;|}[] gotAll = check (clientEp)->getCsv(csvPath); + test:assertEquals(gotAll, records, msg = "CSV text mismatch when reading all at once"); + + stream str = check (clientEp)->getCsvAsStream(csvPath); + record {|string name; int age;|}[] actual = []; + check from record {|string name; int age;|} row in str + do { + actual.push(row); + }; + test:assertEquals(actual, records, msg = "CSV stream content mismatch when reading as stream"); +} + +@test:Config { + dependsOn: [testPutCsvFromRecordsAndReadAllAndStream] +} +public function testStreamClose() returns error? { + string csvPath = "/home/in/test-csv-records.csv"; + stream str = check (clientEp)->getCsvAsStream(csvPath); + check str.close(); + + stream str1 = check (clientEp)->getCsvAsStream(csvPath); + check str1.close(); + + string path = "/home/in/put-bytes.txt"; + stream got = check (clientEp)->getBytesAsStream(path); + check got.close(); } @test:Config {} @@ -445,6 +543,244 @@ public function testPutLargeFileContent() returns error? { } } +@test:Config { dependsOn: [testPutLargeFileContent] } +function testPutBytesAsStreamNew() returns error? { + string path = "/home/in/put-bytes-stream.txt"; + (byte[])[] & readonly chunks = [ + "hello-".toBytes().cloneReadOnly(), + "world".toBytes().cloneReadOnly() + ]; + stream s = chunks.toStream(); + + Error? resp = (clientEp)->putBytesAsStream(path, s); + if resp is Error { + test:assertFail("putBytesAsStream failed: " + resp.message()); + } + + string got = check (clientEp)->getText(path); + test:assertEquals(got, "hello-world", msg = "putBytesAsStream content mismatch"); +} + +@test:Config {dependsOn: [testPutBytesAsStreamNew]} +function testPutCsvAsStreamWithStringRows() returns error? { + string csvPath = "/home/in/csv-stream-rows.csv"; + string[][] rows = [ + ["id", "name"], + ["1", "A"], + ["2", "B"] + ]; + stream s = rows.toStream(); + + Error? resp = (clientEp)->putCsvAsStream(csvPath, s); + if resp is Error { + test:assertFail("putCsvAsStream (string[]) failed: " + resp.message()); + } + + // getCsv should return only data rows when targetType is string[][] + string[][] gotAll = check (clientEp)->getCsv(csvPath); + test:assertEquals(gotAll, rows.slice(1), msg = "CSV stream rows mismatch (get all)"); + + stream str = check (clientEp)->getCsvAsStream(csvPath); + string[][] actual = []; + check from string[] row in str + do { + actual.push(row); + }; + test:assertEquals(actual, rows.slice(1), msg = "CSV stream rows mismatch (get stream)"); +} + +type Rec record {| + string name; + int age; +|}; + +type PersonStrict record {| + string name; + int age; +|}; + +type PersonLax record {| + string name; + int? age; +|}; + +@test:Config {dependsOn: [testPutCsvAsStreamWithStringRows]} +function testPutCsvAsStreamWithRecords() returns error? { + string csvPath = "/home/in/csv-stream-records.csv"; + Rec[] records = [ + {name: "Charlie", age: 22}, + {name: "Dana", age: 28} + ]; + + stream s = records.toStream(); + Error? resp = (clientEp)->putCsvAsStream(csvPath, s); + if resp is Error { + test:assertFail("putCsvAsStream (records) failed: " + resp.message()); + } + + Rec[] gotAll = check (clientEp)->getCsv(csvPath); + test:assertEquals(gotAll, records, msg = "CSV record stream mismatch (get all)"); + + stream str = check (clientEp)->getCsvAsStream(csvPath); + Rec[] actual = []; + check from Rec row in str + do { + actual.push(row); + }; + test:assertEquals(actual, records, msg = "CSV record stream mismatch (get stream)"); +} + +@test:Config {dependsOn: [testPutCsvAsStreamWithRecords]} +function testGetBytesAndTextNonExistent() returns error? { + string missing = "/home/in/does-not-exist.txt"; + var r1 = (clientEp)->getBytes(missing); + test:assertTrue(r1 is Error, msg = "getBytes should error for missing file"); + + var r2 = (clientEp)->getText(missing); + test:assertTrue(r2 is Error, msg = "getText should error for missing file"); +} + +// Strict vs lax data binding for JSON +@test:Config {dependsOn: [testGetBytesAndTextNonExistent]} +function testJsonTypedBinding_strict_and_lax() returns error? { + string path = "/home/in/json-typed-projection.json"; + json content = {name: "Alice"}; // missing age + check (clientEp)->putJson(path, content); + + PersonStrict|Error strictRes = (clientEp)->getJson(path); + test:assertTrue(strictRes is Error, msg = "Strict JSON binding should fail when required field absent"); + + PersonLax laxVal = check (clientEpLaxDataBinding)->getJson(path); + test:assertEquals(laxVal.name, "Alice"); + test:assertEquals(laxVal.age is (), true, msg = "Lax binding should map absent field to nil"); +} + +type XPersonStrict record {| + string name; + int age; +|}; + +type XPersonLax record {| + string name; + int age; +|}; + +// Strict vs lax data binding for XML +@test:Config {dependsOn: [testJsonTypedBinding_strict_and_lax]} +function testXmlTypedBinding_strict_and_lax() returns error? { + string path = "/home/in/xml-typed-projection.xml"; + xml x = xml `Alice32
12132131
`; + check (clientEp)->putXml(path, x); + + var strictRes = (clientEp)->getXml(path, XPersonStrict); + test:assertTrue(strictRes is Error, msg = "Strict XML binding should fail when required field absent"); + + XPersonLax laxVal = check (clientEpLaxDataBinding)->getXml(path, XPersonLax); + test:assertEquals(laxVal.name, "Alice"); +} + +type CsvPersonStrict record {| + string name; + int age; +|}; + +type CsvPersonLax record {| + string name; +|}; + +// Strict vs lax data binding for CSV +@test:Config {dependsOn: [testXmlTypedBinding_strict_and_lax]} +function testCsvTypedBinding_strict_and_lax() returns error? { + string csvPath = "/home/in/csv-typed-projection.csv"; + // Write CSV with missing age field for second row + string[][] csvData = [ + ["name", "age"], + ["Alice", "25"], + ["Bob", ""] // Missing age value + ]; + check (clientEp)->putCsv(csvPath, csvData, OVERWRITE); + + // Strict binding should fail when required field is absent + CsvPersonStrict[]|Error strictRes = (clientEp)->getCsv(csvPath); + test:assertTrue(strictRes is Error, msg = "Strict CSV binding should fail when required field absent"); + + // Lax binding should succeed with nil for missing field + CsvPersonLax[] laxVal = check (clientEpLaxDataBinding)->getCsv(csvPath); + test:assertEquals(laxVal.length(), 2, msg = "Should have 2 records"); + test:assertEquals(laxVal[0].name, "Alice"); + test:assertEquals(laxVal[1].name, "Bob"); +} + +// Strict vs lax data binding for CSV with streaming +@test:Config {dependsOn: [testCsvTypedBinding_strict_and_lax]} +function testCsvStreamTypedBinding_strict_and_lax() returns error? { + string csvPath = "/home/in/csv-stream-typed-projection.csv"; + // Write CSV with missing age field + string[][] csvData = [ + ["name", "age"], + ["Charlie", "30"], + ["Diana", ""] // Missing age + ]; + check (clientEp)->putCsv(csvPath, csvData, OVERWRITE); + + // Strict streaming should fail + stream|Error strictStreamRes = (clientEp)->getCsvAsStream(csvPath); + if strictStreamRes is stream { + // Try to consume the stream - should error when hitting the row with missing age + CsvPersonStrict[]|error consumed = from CsvPersonStrict row in strictStreamRes + select row; + test:assertTrue(consumed is error, msg = "Strict CSV stream should error on missing required field"); + } else { + // Also acceptable if the stream creation itself fails + test:assertTrue(true, msg = "Strict CSV stream failed at creation, which is acceptable"); + } + + // Lax streaming should succeed + stream laxStream = check (clientEpLaxDataBinding)->getCsvAsStream(csvPath); + CsvPersonLax[] laxRecords = []; + check from CsvPersonLax row in laxStream + do { + laxRecords.push(row); + }; + + test:assertEquals(laxRecords.length(), 2, msg = "Should have 2 records in lax stream"); + test:assertEquals(laxRecords[0].name, "Charlie"); + test:assertEquals(laxRecords[1].name, "Diana"); +} + +@test:Config {dependsOn: [testCsvStreamTypedBinding_strict_and_lax]} +function testPutTextWithAppendOption() returns error? { + string path = "/home/in/append-option.txt"; + check (clientEp)->putText(path, "Hello", OVERWRITE); + check (clientEp)->putText(path, " + world", APPEND); + string got = check (clientEp)->getText(path); + test:assertEquals(got, "Hello + world", msg = "APPEND option should append content"); +} + +@test:Config {dependsOn: [testPutTextWithAppendOption]} +function testPutCsvWithAppendOption() returns error? { + string path = "/home/in/csv-append.csv"; + string[][] first = [ + ["id", "name"], + ["1", "Alpha"] + ]; + Error? r1 = (clientEp)->putCsv(path, first, OVERWRITE); + if r1 is Error { + test:assertFail("putCsv overwrite failed: " + r1.message()); + } + + // Append data rows (no header) + string[][] more = [["2", "Beta"], ["3", "Gamma"]]; + Error? r2 = (clientEp)->putCsv(path, more, APPEND); + if r2 is Error { + test:assertFail("putCsv append failed: " + r2.message()); + } + + string[][] gotAll = check (clientEp)->getCsv(path); + test:assertEquals(gotAll, [["1", "Alpha"], ["2", "Beta"], ["3", "Gamma"]], + msg = "CSV append should not duplicate header and should add rows"); +} + isolated function matchStreamContent(stream binaryStream, string matchedString) returns boolean|error { string fullContent = ""; string tempContent = ""; diff --git a/build.gradle b/build.gradle index 5f90d3ea7..519f9b836 100644 --- a/build.gradle +++ b/build.gradle @@ -81,6 +81,7 @@ subprojects { ballerinaStdLibs "io.ballerina.lib:data.jsondata-ballerina:${stdlibDataJsonDataVersion}" ballerinaStdLibs "io.ballerina.lib:data.xmldata-ballerina:${stdlibDataXmlDataVersion}" + ballerinaStdLibs "io.ballerina.lib:data.csv-ballerina:${stdlibDataCsvVersion}" } } diff --git a/gradle.properties b/gradle.properties index 38af3dd09..1e351d309 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ org.gradle.caching=true group=io.ballerina.stdlib -version=2.15.2-SNAPSHOT +version=2.16.0-SNAPSHOT checkstylePluginVersion=10.12.0 testngVersion=7.6.1 diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/ContentByteStreamIteratorUtils.java b/native/src/main/java/io/ballerina/stdlib/ftp/ContentByteStreamIteratorUtils.java index ffff80eec..7ae1596fd 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/ContentByteStreamIteratorUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/ContentByteStreamIteratorUtils.java @@ -17,12 +17,12 @@ */ package io.ballerina.stdlib.ftp; -import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; +import io.ballerina.stdlib.ftp.util.FtpUtil; import java.io.IOException; import java.io.InputStream; @@ -30,10 +30,15 @@ import static io.ballerina.stdlib.ftp.util.FtpConstants.ARRAY_SIZE; import static io.ballerina.stdlib.ftp.util.FtpConstants.FIELD_VALUE; +import static io.ballerina.stdlib.ftp.util.FtpConstants.FTP_ERROR; import static io.ballerina.stdlib.ftp.util.FtpUtil.getFtpPackage; public class ContentByteStreamIteratorUtils { + private ContentByteStreamIteratorUtils() { + // private constructor + } + /** * Gets the next chunk of bytes from the stream. * This method is called by Ballerina runtime when iterating the stream. @@ -58,10 +63,10 @@ public static Object next(BObject recordIterator) { returnArray = buffer; } streamEntry.put(FIELD_VALUE, ValueCreator.createArrayValue(returnArray)); + return streamEntry; } catch (IOException e) { - streamEntry.put(FIELD_VALUE, ErrorCreator.createError(StringUtils.fromString("Unable to parse value"))); + return FtpUtil.createError("Unable to parse value", e, FTP_ERROR); } - return streamEntry; } /** @@ -74,7 +79,7 @@ public static Object close(BObject recordIterator) { try { inputStream.close(); } catch (IOException e) { - throw ErrorCreator.createError(StringUtils.fromString("Unable to clean input stream"), e); + return FtpUtil.createError("Unable to clean input stream value", e, FTP_ERROR); } return null; } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/ContentCsvStreamIteratorUtils.java b/native/src/main/java/io/ballerina/stdlib/ftp/ContentCsvStreamIteratorUtils.java new file mode 100644 index 000000000..ccbff4e37 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/ContentCsvStreamIteratorUtils.java @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2025 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.ballerina.stdlib.ftp; + +import io.ballerina.lib.data.csvdata.csv.Native; +import io.ballerina.lib.data.csvdata.utils.ModuleUtils; +import io.ballerina.runtime.api.creators.TypeCreator; +import io.ballerina.runtime.api.creators.ValueCreator; +import io.ballerina.runtime.api.types.Type; +import io.ballerina.runtime.api.types.TypeTags; +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BArray; +import io.ballerina.runtime.api.values.BError; +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BString; +import io.ballerina.stdlib.ftp.util.FtpUtil; + +import java.io.IOException; +import java.io.InputStream; + +import static io.ballerina.stdlib.ftp.util.FtpConstants.FIELD_VALUE; +import static io.ballerina.stdlib.ftp.util.FtpConstants.FTP_ERROR; +import static io.ballerina.stdlib.ftp.util.FtpConstants.NATIVE_INPUT_STREAM; +import static io.ballerina.stdlib.ftp.util.FtpConstants.NATIVE_LAX_DATABINDING; +import static io.ballerina.stdlib.ftp.util.FtpConstants.NATIVE_STREAM_VALUE_TYPE; +import static io.ballerina.stdlib.ftp.util.FtpUtil.getFtpPackage; + +/** + * Iterator utilities for streaming CSV content over an InputStream. + * Parses the CSV once into a BArray on first access and iterates entries subsequently. + */ +public class ContentCsvStreamIteratorUtils { + + private ContentCsvStreamIteratorUtils() { + // private constructor + } + + // Native data keys + private static final String KEY_INDEX = "index"; + private static final String KEY_DATA = "data"; + private static final String KEY_LENGTH = "length"; + + // Record names + private static final String REC_STRING_ARRAY_ENTRY = "ContentCsvStringArrayStreamEntry"; + private static final String REC_RECORD_ENTRY = "ContentCsvRecordStreamEntry"; + + private static final BString PROP_IS_CLOSED = StringUtils.fromString("isClosed"); + + public static Object next(BObject recordIterator) { + final Type elementType = (Type) recordIterator.getNativeData(NATIVE_STREAM_VALUE_TYPE); + final String recordTypeName = resolveRecordTypeName(elementType); + final BMap streamEntry = + ValueCreator.createRecordValue(getFtpPackage(), recordTypeName); + + Object dataIndex = recordIterator.getNativeData(KEY_INDEX); + if (dataIndex == null) { + // First access: parse entire stream into memory (behavior preserved) + final InputStream inputStream = (InputStream) recordIterator.getNativeData(NATIVE_INPUT_STREAM); + if (inputStream == null) { + recordIterator.set(PROP_IS_CLOSED, true); + return FtpUtil.createError("Input stream is not available", FTP_ERROR); + } + + try { + byte[] bytes = inputStream.readAllBytes(); + inputStream.close(); + final boolean laxDataBinding = getBoolean(recordIterator.getNativeData(NATIVE_LAX_DATABINDING)); + + // Build ParseOptions record + BMap parseOptions = + ValueCreator.createRecordValue(ModuleUtils.getModule(), "ParseOptions"); + parseOptions.put(StringUtils.fromString("allowDataProjection"), laxDataBinding); + + // Parse + Object parsed = Native.parseBytes( + ValueCreator.createArrayValue(bytes), + parseOptions, + ValueCreator.createTypedescValue(TypeCreator.createArrayType(elementType))); + + if (parsed instanceof BError) { + recordIterator.set(PROP_IS_CLOSED, true); + return FtpUtil.createError(((BError) parsed).getErrorMessage().getValue(), FTP_ERROR); + } + + if (!(parsed instanceof BArray dataArray)) { + recordIterator.set(PROP_IS_CLOSED, true); + return FtpUtil.createError("Unexpected parse result type", FTP_ERROR); + } + + long length = dataArray.getLength(); + if (length == 0) { + recordIterator.set(PROP_IS_CLOSED, true); + return null; + } + + int index = 0; + // Cache for subsequent iterations + recordIterator.addNativeData(KEY_DATA, dataArray); + recordIterator.addNativeData(KEY_INDEX, index + 1); + recordIterator.addNativeData(KEY_LENGTH, length); + + streamEntry.put(FIELD_VALUE, dataArray.get(index)); + return streamEntry; + } catch (IOException e) { + recordIterator.set(PROP_IS_CLOSED, true); + return FtpUtil.createError("Unable to read input stream: " + e.getMessage(), e, + FTP_ERROR); + } catch (Throwable t) { + recordIterator.set(PROP_IS_CLOSED, true); + return FtpUtil.createError("CSV parsing failed: " + t.getMessage(), t, + FTP_ERROR); + } + } + + // Subsequent access: iterate cached array + int index = (int) dataIndex; + long count = toLong(recordIterator.getNativeData(KEY_LENGTH)); + if (index >= count) { + recordIterator.set(PROP_IS_CLOSED, true); + return null; + } + + BArray dataArray = (BArray) recordIterator.getNativeData(KEY_DATA); + if (dataArray == null) { + recordIterator.set(PROP_IS_CLOSED, true); + return FtpUtil.createError("Iterator state corrupted: data is missing", FTP_ERROR); + } + + recordIterator.addNativeData(KEY_INDEX, index + 1); + streamEntry.put(FIELD_VALUE, dataArray.get(index)); + return streamEntry; + } + + public static Object close(BObject recordIterator) { + InputStream inputStream = (InputStream) recordIterator.getNativeData(NATIVE_INPUT_STREAM); + if (inputStream == null) { + return null; + } + try { + inputStream.close(); + } catch (IOException e) { + return FtpUtil.createError("Unable to clean input stream", e, FTP_ERROR); + } finally { + recordIterator.set(PROP_IS_CLOSED, true); + } + return null; + } + + // -------- helpers -------- + + private static String resolveRecordTypeName(Type type) { + return (type != null && type.getTag() == TypeTags.ARRAY_TAG) + ? REC_STRING_ARRAY_ENTRY + : REC_RECORD_ENTRY; + } + + private static boolean getBoolean(Object value) { + return value instanceof Boolean && (Boolean) value; + } + + private static long toLong(Object value) { + if (value instanceof Long) { + return (Long) value; + } + return 0L; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java index b39954b9c..1dc9017eb 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java @@ -31,6 +31,7 @@ import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BStream; import io.ballerina.runtime.api.values.BString; import io.ballerina.runtime.api.values.BTypedesc; import io.ballerina.runtime.api.values.BXml; @@ -43,6 +44,7 @@ import io.ballerina.stdlib.ftp.transport.impl.RemoteFileSystemConnectorFactoryImpl; import io.ballerina.stdlib.ftp.transport.message.RemoteFileSystemMessage; import io.ballerina.stdlib.ftp.util.BufferHolder; +import io.ballerina.stdlib.ftp.util.CSVUtils; import io.ballerina.stdlib.ftp.util.FtpConstants; import io.ballerina.stdlib.ftp.util.FtpUtil; import io.ballerina.stdlib.io.utils.IOUtils; @@ -52,12 +54,18 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.SequenceInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Enumeration; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; import static io.ballerina.stdlib.ftp.util.FtpConstants.ENDPOINT_CONFIG_PREFERRED_METHODS; import static io.ballerina.stdlib.ftp.util.FtpConstants.ENTITY_BYTE_STREAM; +import static io.ballerina.stdlib.ftp.util.FtpConstants.FTP_ERROR; import static io.ballerina.stdlib.ftp.util.FtpConstants.READ_INPUT_STREAM; import static io.ballerina.stdlib.ftp.util.FtpConstants.VFS_CLIENT_CONNECTOR; import static io.ballerina.stdlib.ftp.util.FtpUtil.ErrorType.Error; @@ -132,6 +140,10 @@ public static Object initClientEndpoint(BObject clientEndpoint, BMap { @@ -147,6 +159,10 @@ public static Object getFirst(Environment env, BObject clientConnector, BString }); } + /** + * @deprecated : use getBytesAsStream or getCsvAsStream instead of this legacy accessor. + */ + @Deprecated public static Object get(BObject clientConnector) { return FtpClientHelper.generateInputStreamEntry((InputStream) clientConnector.getNativeData(READ_INPUT_STREAM)); } @@ -154,7 +170,7 @@ public static Object get(BObject clientConnector) { public static Object getBytes(Environment env, BObject clientConnector, BString filePath) { Object content = getAllContent(env, clientConnector, filePath); if (!(content instanceof byte[])) { - throw (BError) content; + return content; } return ValueCreator.createArrayValue((byte[]) content); } @@ -162,15 +178,15 @@ public static Object getBytes(Environment env, BObject clientConnector, BString public static Object getText(Environment env, BObject clientConnector, BString filePath) { Object content = getAllContent(env, clientConnector, filePath); if (!(content instanceof byte[])) { - throw (BError) content; + return content; } - return StringUtils.fromString(new String((byte[]) content)); + return StringUtils.fromString(new String((byte[]) content, StandardCharsets.UTF_8)); } public static Object getJson(Environment env, BObject clientConnector, BString filePath, BTypedesc typeDesc) { Object content = getAllContent(env, clientConnector, filePath); if (!(content instanceof byte[])) { - throw (BError) content; + return content; } boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); @@ -186,7 +202,7 @@ public static Object getJson(Environment env, BObject clientConnector, BString f } Object bJson = Native.parseBytes(ValueCreator.createArrayValue((byte[]) content), mapValue, typeDesc); if (bJson instanceof BError) { - throw ErrorCreator.createError(((BError) bJson).getErrorMessage()); + return FtpUtil.createError(((BError) bJson).getErrorMessage().getValue(), FTP_ERROR); } return bJson; } @@ -194,15 +210,15 @@ public static Object getJson(Environment env, BObject clientConnector, BString f public static Object getXml(Environment env, BObject clientConnector, BString filePath, BTypedesc typeDesc) { Object content = getAllContent(env, clientConnector, filePath); if (!(content instanceof byte[])) { - throw (BError) content; + return content; } if (typeDesc.getDescribingType().getQualifiedName().equals("xml")) { - Object bXml = XmlUtils.parse(StringUtils.fromString(new String((byte[]) content))); - if (bXml instanceof BError) { - throw ErrorCreator.createError(((BError) bXml).getErrorMessage()); + try { + return XmlUtils.parse(StringUtils.fromString(new String((byte[]) content, StandardCharsets.UTF_8))); + } catch (BError e) { + return FtpUtil.createError(e.getErrorMessage().getValue(), FTP_ERROR); } - return bXml; } boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); @@ -214,7 +230,7 @@ public static Object getXml(Environment env, BObject clientConnector, BString fi Object bXml = io.ballerina.lib.data.xmldata.xml.Native.parseBytes( ValueCreator.createArrayValue((byte[]) content), mapValue, typeDesc); if (bXml instanceof BError) { - throw ErrorCreator.createError(((BError) bXml).getErrorMessage()); + return FtpUtil.createError(((BError) bXml).getErrorMessage().getValue(), FTP_ERROR); } return bXml; } @@ -222,29 +238,54 @@ public static Object getXml(Environment env, BObject clientConnector, BString fi public static Object getCsv(Environment env, BObject clientConnector, BString filePath, BTypedesc typeDesc) { Object content = getAllContent(env, clientConnector, filePath); if (!(content instanceof byte[])) { - throw (BError) content; + return content; } boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); BMap mapValue = ValueCreator.createRecordValue( io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), "ParseOptions"); - mapValue.put(StringUtils.fromString("allowDataProjection"), laxDataBinding); + if (laxDataBinding) { + BMap allowDataProjection = mapValue.getMapValue(StringUtils.fromString("allowDataProjection")); + allowDataProjection.put(StringUtils.fromString("nilAsOptionalField"), Boolean.TRUE); + allowDataProjection.put(StringUtils.fromString("absentAsNilableType"), Boolean.TRUE); + mapValue.put(StringUtils.fromString("allowDataProjection"), allowDataProjection); + } else { + mapValue.put(StringUtils.fromString("allowDataProjection"), Boolean.FALSE); + } Object csv = io.ballerina.lib.data.csvdata.csv.Native.parseBytes( ValueCreator.createArrayValue((byte[]) content), mapValue, typeDesc); if (csv instanceof BError) { - throw ErrorCreator.createError(((BError) csv).getErrorMessage()); + return FtpUtil.createError(((BError) csv).getErrorMessage().getValue(), FTP_ERROR); } return csv; } public static Object getBytesAsStream(Environment env, BObject clientConnector, BString filePath) { + boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + FtpClientListener connectorListener = new FtpClientListener(balFuture, false, + remoteFileSystemBaseMessage -> + FtpClientHelper.executeStreamingAction(remoteFileSystemBaseMessage, + balFuture, TypeCreator.createArrayType(PredefinedTypes.TYPE_BYTE), laxDataBinding)); + VfsClientConnectorImpl connector = (VfsClientConnectorImpl) clientConnector. + getNativeData(VFS_CLIENT_CONNECTOR); + connector.addListener(connectorListener); + connector.send(null, FtpAction.GET, filePath.getValue(), null); + return getResult(balFuture); + }); + } + + public static Object getCsvAsStream(Environment env, BObject clientConnector, BString filePath, + BTypedesc typeDesc) { + boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); return env.yieldAndRun(() -> { CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, false, remoteFileSystemBaseMessage -> FtpClientHelper.executeStreamingAction(remoteFileSystemBaseMessage, - balFuture, TypeCreator.createArrayType(PredefinedTypes.TYPE_BYTE))); + balFuture, typeDesc.getDescribingType(), laxDataBinding)); VfsClientConnectorImpl connector = (VfsClientConnectorImpl) clientConnector. getNativeData(VFS_CLIENT_CONNECTOR); connector.addListener(connectorListener); @@ -283,6 +324,10 @@ public static Object closeInputByteStream(BObject clientObject) { } } + /** + * @deprecated : use putBytes/putText/putJson/putXml/putCsv or the streaming variants with APPEND option. + */ + @Deprecated public static Object append(Environment env, BObject clientConnector, BMap inputContent) { boolean isFile = inputContent.getBooleanValue(StringUtils.fromString( FtpConstants.INPUT_CONTENT_IS_FILE_KEY)); @@ -309,6 +354,10 @@ public static Object append(Environment env, BObject clientConnector, BMap inputContent) { Map propertyMap = new HashMap<>( (Map) clientConnector.getNativeData(FtpConstants.PROPERTY_MAP)); @@ -372,26 +421,170 @@ public static Object putBytes(Environment env, BObject clientConnector, BString } public static Object putText(Environment env, BObject clientConnector, BString path, BString inputContent, - BString options) { - InputStream stream = new ByteArrayInputStream(inputContent.getValue().getBytes()); + BString options) { + InputStream stream = new ByteArrayInputStream(inputContent.getValue().getBytes(StandardCharsets.UTF_8)); RemoteFileSystemMessage message = new RemoteFileSystemMessage(stream); return putGenericAction(env, clientConnector, path, options, message); } public static Object putJson(Environment env, BObject clientConnector, BString path, BString inputContent, BString options) { - InputStream stream = new ByteArrayInputStream(inputContent.getValue().getBytes()); + InputStream stream = new ByteArrayInputStream(inputContent.getValue().getBytes(StandardCharsets.UTF_8)); RemoteFileSystemMessage message = new RemoteFileSystemMessage(stream); return putGenericAction(env, clientConnector, path, options, message); } public static Object putXml(Environment env, BObject clientConnector, BString path, BXml inputContent, - BString options) { - InputStream stream = new ByteArrayInputStream(inputContent.toString().getBytes()); + BString options) { + InputStream stream = new ByteArrayInputStream(inputContent.toString().getBytes(StandardCharsets.UTF_8)); RemoteFileSystemMessage message = new RemoteFileSystemMessage(stream); return putGenericAction(env, clientConnector, path, options, message); } + public static Object putCsv(Environment env, BObject clientConnector, BString path, BArray inputContent, + BString options) { + boolean addHeader = !options.getValue().equals(FtpConstants.WRITE_OPTION_APPEND); + String convertToCsv = CSVUtils.convertToCsv(inputContent, addHeader); + InputStream stream = new ByteArrayInputStream(convertToCsv.getBytes(StandardCharsets.UTF_8)); + RemoteFileSystemMessage message = new RemoteFileSystemMessage(stream); + return putGenericAction(env, clientConnector, path, options, message); + } + + public static Object putBytesAsStream(Environment env, BObject clientConnector, BString path, BStream inputContent, + BString options) { + try { + InputStream stream = createInputStreamFromIterator(env, inputContent.getIteratorObj()); + RemoteFileSystemMessage message = new RemoteFileSystemMessage(stream); + return putGenericAction(env, clientConnector, path, options, message); + } catch (Exception e) { + return FtpUtil.createError(e.getMessage(), FTP_ERROR); + } + } + + public static Object putCsvAsStream(Environment env, BObject clientConnector, BString path, BStream inputContent, + BString options) { + try { + InputStream stream = createInputStreamFromIterator(env, inputContent.getIteratorObj()); + RemoteFileSystemMessage message = new RemoteFileSystemMessage(stream); + return putGenericAction(env, clientConnector, path, options, message); + } catch (Exception e) { + return FtpUtil.createError(e.getMessage(), FTP_ERROR); + } + } + + /** + * Creates an InputStream from a Ballerina iterator using SequenceInputStream. + */ + private static InputStream createInputStreamFromIterator(Environment environment, BObject iterator) { + IteratorToInputStream streamIterator = new IteratorToInputStream(environment, iterator); + return new SequenceInputStream(asEnumeration(streamIterator)); + } + + /** + * Converts an Iterator to an Enumeration for compatibility with SequenceInputStream. + */ + private static Enumeration asEnumeration(Iterator iterator) { + return new Enumeration() { + @Override + public boolean hasMoreElements() { + return iterator.hasNext(); + } + + @Override + public T nextElement() { + return iterator.next(); + } + }; + } + + /** + * Lightweight adapter that turns a Ballerina iterator into a sequence of InputStreams. + * Keeps state minimal to reduce nesting and cognitive complexity in the outer method. + */ + private static final class IteratorToInputStream implements Iterator { + private final Environment env; + private final BObject iterator; + private InputStream nextStream; + private boolean hasChecked; + private boolean isFirstRow = true; + + IteratorToInputStream(Environment env, BObject iterator) { + this.env = env; + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + if (hasChecked) { + return nextStream != null; + } + nextStream = fetchNextStream(); + hasChecked = true; + return nextStream != null; + } + + @Override + public InputStream next() { + if (!hasChecked && !hasNext()) { + throw new NoSuchElementException(); + } + hasChecked = false; + InputStream result = nextStream; + nextStream = null; + return result; + } + + private InputStream fetchNextStream() { + final Object next; + try { + next = env.getRuntime().callMethod(iterator, "next", null); + } catch (Exception e) { + throw FtpUtil.createError("Failed to read iterator", e, FTP_ERROR); + } + if (next == null) { + return null; + } + if (next instanceof BError err) { + throw FtpUtil.createError("Iterator error: " + err.getMessage(), FTP_ERROR); + } + + byte[] bytes = toBytes(next); + if (bytes.length == 0) { + return null; + } + isFirstRow = false; + return new ByteArrayInputStream(bytes); + } + + private byte[] toBytes(Object value) { + // Each element is a record with a 'value' field. + @SuppressWarnings("unchecked") + BMap streamRecord = (BMap) value; + Object val = streamRecord.get(FtpConstants.FIELD_VALUE); + + if (val instanceof BArray array) { + return bytesFromArray(array); + } + @SuppressWarnings("unchecked") + BMap recordValue = (BMap) val; + return bytesFromRecord(recordValue, isFirstRow); + } + + private static byte[] bytesFromArray(BArray array) { + // If it's a byte[] just return it; else it's CSV row from string[] + if (array.getElementType().getTag() == io.ballerina.runtime.api.types.TypeTags.BYTE_TAG) { + return array.getBytes(); + } + String csvRow = CSVUtils.convertArrayToCsvRow(array) + System.lineSeparator(); + return csvRow.getBytes(StandardCharsets.UTF_8); + } + + private static byte[] bytesFromRecord(BMap balRecord, boolean includeHeader) { + String csvRow = CSVUtils.convertRecordToCsvRow(balRecord, includeHeader) + System.lineSeparator(); + return csvRow.getBytes(StandardCharsets.UTF_8); + } + } + private static Object putGenericAction(Environment env, BObject clientConnector, BString path, BString options, RemoteFileSystemMessage message) { return env.yieldAndRun(() -> { @@ -402,7 +595,7 @@ private static Object putGenericAction(Environment env, BObject clientConnector, = (VfsClientConnectorImpl) clientConnector.getNativeData(VFS_CLIENT_CONNECTOR); connector.addListener(connectorListener); String filePath = path.getValue(); - if (options.getValue().equals("OVERWRITE")) { + if (options.getValue().equals(FtpConstants.WRITE_OPTION_OVERWRITE)) { connector.send(message, FtpAction.PUT, filePath, null); } else { connector.send(message, FtpAction.APPEND, filePath, null); @@ -523,9 +716,10 @@ public static Object getResult(CompletableFuture balFuture) { try { return balFuture.get(); } catch (InterruptedException e) { - throw ErrorCreator.createError(e); + Thread.currentThread().interrupt(); + return ErrorCreator.createError(e); } catch (Throwable throwable) { - throw ErrorCreator.createError(throwable); + return ErrorCreator.createError(throwable); } } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java index ef1567bdc..3d6d5a916 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java @@ -20,12 +20,13 @@ import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Module; -import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.creators.TypeCreator; import io.ballerina.runtime.api.creators.ValueCreator; +import io.ballerina.runtime.api.types.ArrayType; import io.ballerina.runtime.api.types.PredefinedTypes; import io.ballerina.runtime.api.types.StreamType; import io.ballerina.runtime.api.types.Type; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BArray; import io.ballerina.runtime.api.values.BMap; @@ -53,6 +54,7 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -63,6 +65,7 @@ import static io.ballerina.stdlib.ftp.util.FtpConstants.BYTE_STREAM_NEXT_FUNC; import static io.ballerina.stdlib.ftp.util.FtpConstants.ENTITY_BYTE_STREAM; import static io.ballerina.stdlib.ftp.util.FtpConstants.FIELD_VALUE; +import static io.ballerina.stdlib.ftp.util.FtpConstants.FTP_ERROR; import static io.ballerina.stdlib.ftp.util.FtpConstants.READ_INPUT_STREAM; import static io.ballerina.stdlib.ftp.util.FtpConstants.STREAM_ENTRY_RECORD; import static io.ballerina.stdlib.ftp.util.FtpUtil.getFtpPackage; @@ -97,46 +100,63 @@ static boolean executeGetAction(RemoteFileSystemBaseMessage remoteFileSystemBase balFuture.complete(streamEntry); } } catch (IOException e) { - log.error("Error occurred while reading stream: ", e); + log.error("{}", FtpConstants.ERR_READING_STREAM, e); + balFuture.complete(FtpUtil.createError(FtpConstants.ERR_READING_STREAM, e, FTP_ERROR)); } return true; } static boolean executeStreamingAction(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage, - CompletableFuture balFuture, Type streamValueType) { + CompletableFuture balFuture, Type streamValueType, + boolean laxDataBinding) { try { if (remoteFileSystemBaseMessage instanceof RemoteFileSystemMessage) { final InputStream in = ((RemoteFileSystemMessage) remoteFileSystemBaseMessage).getInputStream(); ByteChannel byteChannel = new FtpByteChannel(in); Channel channel = new FtpChannel(byteChannel); InputStream inputStream = channel.getInputStream(); - Object streamEntry = createStreamWithContent(inputStream, streamValueType); + Object streamEntry = createStreamWithContent(inputStream, streamValueType, laxDataBinding); balFuture.complete(streamEntry); } } catch (IOException e) { - log.error("Error occurred while reading stream: ", e); + log.error("{}", FtpConstants.ERR_READING_STREAM, e); + balFuture.complete(FtpUtil.createError(FtpConstants.ERR_READING_STREAM, e, FTP_ERROR)); } return true; } - private static Object createStreamWithContent(InputStream content, Type streamValueType) { + private static Object createStreamWithContent(InputStream content, Type streamValueType, + boolean laxDataBinding) { try { + String streamName = ""; + if (streamValueType.getTag() == TypeTags.ARRAY_TAG) { + if (((ArrayType) streamValueType).getElementType().getTag() == TypeTags.BYTE_TAG) { + streamName = "ContentByteStream"; + } else { + streamName = "ContentCsvStringArrayStream"; + } + } else { + streamName = "ContentCsvRecordStream"; + } + BObject contentByteStreamObject = ValueCreator.createObjectValue( - ModuleUtils.getModule(), "ContentByteStream", null, null + ModuleUtils.getModule(), streamName, null, null ); - contentByteStreamObject.addNativeData("Input_Stream", content); - StreamType streamType = TypeCreator.createStreamType(streamValueType, PredefinedTypes.TYPE_NULL); + contentByteStreamObject.addNativeData(FtpConstants.NATIVE_INPUT_STREAM, content); + contentByteStreamObject.addNativeData(FtpConstants.NATIVE_LAX_DATABINDING, laxDataBinding); + contentByteStreamObject.addNativeData(FtpConstants.NATIVE_STREAM_VALUE_TYPE, streamValueType); + StreamType streamType = TypeCreator.createStreamType(streamValueType, + TypeCreator.createUnionType(PredefinedTypes.TYPE_ERROR, PredefinedTypes.TYPE_NULL)); return ValueCreator.createStreamValue(streamType, contentByteStreamObject); } catch (Exception e) { log.error("Failed to create stream with content", e); - // Fallback to returning byte array if stream creation fails - return ErrorCreator.createError(StringUtils.fromString("Unable to create stream"), e); + return FtpUtil.createError(FtpConstants.ERR_CREATE_STREAM, e, FTP_ERROR); } } static boolean executeGetAllAction(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage, - CompletableFuture balFuture) { + CompletableFuture balFuture) { if (remoteFileSystemBaseMessage instanceof RemoteFileSystemMessage) { byte[] content = ((RemoteFileSystemMessage) remoteFileSystemBaseMessage).getBytesArray(); balFuture.complete(content); @@ -283,10 +303,11 @@ public void close() { } else { String textContent = (inputContent.getStringValue(StringUtils.fromString( FtpConstants.INPUT_CONTENT_TEXT_CONTENT_KEY))).getValue(); - return new ByteArrayInputStream(textContent.getBytes()); + return new ByteArrayInputStream(textContent.getBytes(StandardCharsets.UTF_8)); } } + @SuppressWarnings("unchecked") private static void callStreamNext(Environment env, BObject entity, BufferHolder bufferHolder, BObject iteratorObj) { Object result = env.getRuntime().callMethod(iteratorObj, BYTE_STREAM_NEXT_FUNC, null); @@ -294,7 +315,8 @@ private static void callStreamNext(Environment env, BObject entity, BufferHolder handleStreamEnd(entity, bufferHolder); return; } - BArray arrayValue = ((BMap) result).getArrayValue(FIELD_VALUE); + BMap recordVal = (BMap) result; + BArray arrayValue = recordVal.getArrayValue(FIELD_VALUE); if (arrayValue == null) { handleStreamEnd(entity, bufferHolder); return; diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/CSVUtils.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/CSVUtils.java new file mode 100644 index 000000000..411182d66 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/CSVUtils.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2025 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.ballerina.stdlib.ftp.util; + +import io.ballerina.runtime.api.values.BArray; +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BString; + +public class CSVUtils { + + private CSVUtils() { + // private constructor + } + + public static String convertToCsv(BArray inputContent, boolean addHeader) { + if (inputContent.isEmpty()) { + return ""; + } + + String lineSeparator = System.lineSeparator(); + StringBuilder csvBuilder = new StringBuilder(); + + // Check the type of the first element + Object firstElement = inputContent.get(0); + + if (firstElement instanceof BArray) { + // Handle string[][] or any[][] - array of arrays + for (int i = 0; i < inputContent.size(); i++) { + BArray row = (BArray) inputContent.get(i); + csvBuilder.append(convertArrayToCsvRow(row)); + csvBuilder.append(lineSeparator); + } + } else { + // Handle record[] - array of records + BMap firstRecord = (BMap) firstElement; + BString[] keys = firstRecord.getKeys(); + + // Add header row with field names + if (addHeader) { + csvBuilder.append(keysToRow(keys)); + csvBuilder.append(lineSeparator); + } + + // Add data rows + for (int i = 0; i < inputContent.size(); i++) { + BMap recordVal = (BMap) inputContent.get(i); + csvBuilder.append(recordToCsvRow(recordVal, keys)); + csvBuilder.append(lineSeparator); + } + } + + return csvBuilder.toString(); + } + + /** + * Converts a BMap (record) to a CSV row with optional header (public method for streaming). + * + * @param balRecord The record to convert + * @param includeHeader If true and this is the first row, includes header row + * @return CSV formatted string + */ + public static String convertRecordToCsvRow(BMap balRecord, boolean includeHeader) { + BString[] keys = balRecord.getKeys(); + StringBuilder result = new StringBuilder(); + + if (includeHeader) { + result.append(keysToRow(keys)); + result.append(System.lineSeparator()); + } + + result.append(recordToCsvRow(balRecord, keys)); + return result.toString(); + } + + /** + * Converts an array of keys to a CSV row. + */ + private static String keysToRow(BString[] keys) { + StringBuilder row = new StringBuilder(); + for (int i = 0; i < keys.length; i++) { + row.append(escapeCsvValue(keys[i].getValue())); + if (i < keys.length - 1) { + row.append(","); + } + } + return row.toString(); + } + + /** + * Converts a BArray to a CSV row. + */ + public static String convertArrayToCsvRow(BArray array) { + StringBuilder row = new StringBuilder(); + for (int i = 0; i < array.size(); i++) { + Object value = array.get(i); + row.append(escapeCsvValue(valueToString(value))); + if (i < array.size() - 1) { + row.append(","); + } + } + return row.toString(); + } + + /** + * Converts a BMap (record) to a CSV row using the specified key order. + */ + private static String recordToCsvRow(BMap balRecord, BString[] keys) { + StringBuilder row = new StringBuilder(); + for (int i = 0; i < keys.length; i++) { + Object value = balRecord.get(keys[i]); + row.append(escapeCsvValue(valueToString(value))); + if (i < keys.length - 1) { + row.append(","); + } + } + return row.toString(); + } + + /** + * Converts a value to its string representation. + */ + private static String valueToString(Object value) { + if (value == null) { + return ""; + } + if (value instanceof BString) { + return ((BString) value).getValue(); + } + return value.toString(); + } + + /** + * Escapes a CSV value according to RFC 4180. + * - Wrap in quotes if contains comma, quote, or newline + * - Escape internal quotes by doubling them + */ + private static String escapeCsvValue(String value) { + if (value == null || value.isEmpty()) { + return ""; + } + + boolean needsQuotes = value.contains(",") || value.contains("\"") || + value.contains("\n") || value.contains("\r"); + + if (needsQuotes) { + // Escape internal quotes by doubling them + String escaped = value.replace("\"", "\"\""); + return "\"" + escaped + "\""; + } + + return value; + } + +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java index fe841df1c..0c821517b 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java @@ -56,6 +56,7 @@ private FtpConstants() { public static final BString FIELD_VALUE = StringUtils.fromString("value"); public static final String FTP_CALLER = "Caller"; public static final String FTP_CLIENT = "Client"; + public static final String FTP_ERROR = "Error"; public static final String FTP_SERVER_EVENT = "WatchEvent"; public static final String FTP_WATCHEVENT_ADDED_FILES = "addedFiles"; @@ -100,4 +101,17 @@ private FtpConstants() { public static final String BALLERINA_FTP_PACKAGE_NAME = "io.ballerina.stdlib.ftp"; public static final String NO_AUTH_METHOD_ERROR = "No preferred auth method is specified in the " + "SFTP client configurations"; + + // Native data keys used in client helper streams + public static final String NATIVE_INPUT_STREAM = "Input_Stream"; + public static final String NATIVE_LAX_DATABINDING = "Lax_Data_Binding"; + public static final String NATIVE_STREAM_VALUE_TYPE = "Type"; + + // Write options (string value of Ballerina enum) + public static final String WRITE_OPTION_OVERWRITE = "OVERWRITE"; + public static final String WRITE_OPTION_APPEND = "APPEND"; + + // Common error messages + public static final String ERR_READING_STREAM = "Error occurred while reading stream"; + public static final String ERR_CREATE_STREAM = "Unable to create stream"; }