diff --git a/ballerina/build.gradle b/ballerina/build.gradle index 4fb2ee3a5..3de2b07b2 100644 --- a/ballerina/build.gradle +++ b/ballerina/build.gradle @@ -79,6 +79,16 @@ dependencies { externalJars(group: 'commons-io', name: 'commons-io', version: "${commonsIoVersion}") { transitive = false } + + externalJars(group: 'io.ballerina.lib', name: 'data.jsondata-native', version: "${stdlibDataJsonDataVersion}") { + transitive = false + } + externalJars(group: 'io.ballerina.lib', name: 'data.xmldata-native', version: "${stdlibDataXmlDataVersion}") { + transitive = false + } + externalJars(group: 'io.ballerina.lib', name: 'data.csv-native', version: "${stdlibDataCsvVersion}") { + transitive = false + } } task updateTomlFiles { @@ -110,6 +120,9 @@ task updateTomlFiles { newConfig = newConfig.replace("@jcl.slf4j.version@", stdlibDependentJclSlf4jVersion) newConfig = newConfig.replace("@commons.io.version@", stdlibDependentCommonsIoVersion) newConfig = newConfig.replace("@commons.lang3.version@", stdlibDependentCommonsLang3Version) + newConfig = newConfig.replace("@data.jsondata.version@", stdlibDataJsonDataVersion) + newConfig = newConfig.replace("@data.xmldata.version@", stdlibDataXmlDataVersion) + newConfig = newConfig.replace("@data.csv.version@", stdlibDataCsvVersion) ballerinaTomlFile.text = newConfig diff --git a/ballerina/client_endpoint.bal b/ballerina/client_endpoint.bal index c6923dbda..7ceff8074 100644 --- a/ballerina/client_endpoint.bal +++ b/ballerina/client_endpoint.bal @@ -15,6 +15,9 @@ // under the License. import ballerina/io; +import ballerina/jballerina.java; +import ballerina/data.jsondata as _; +import ballerina/data.xmldata; # Represents an FTP client that intracts with an FTP server public isolated client class Client { @@ -37,8 +40,11 @@ public isolated client class Client { # stream|ftp:Error channel = client->get(path); # ``` # + # Deprecated: Use `getText` or `getJson` instead for retrieving file content. + # # + path - The resource path # + return - A byte stream from which the file can be read or `ftp:Error` in case of errors + @deprecated remote isolated function get(string path) returns stream|Error { ByteStream|Error byteStream = new (self, path); if byteStream is ByteStream { @@ -49,15 +55,94 @@ public isolated client class Client { } + # Retrieves the file content as bytes from a remote resource. + # ```ballerina + # byte[] content = check client->getBytes(path); + # ``` + # + # + path - The resource path + # + return - content as a byte array or `ftp:Error` in case of errors + remote isolated function getBytes(string path) returns byte[]|Error { + return getBytes(self, path); + } + + # Retrieves the file content as text from a remote resource. + # ```ballerina + # string content = check client->getText(path); + # ``` + # + # + path - The resource path + # + return - content as a string or `ftp:Error` in case of errors + remote isolated function getText(string path) returns string|Error { + return getText(self, path); + } + + # Retrieves the file content as json from a remote resource. + # ```ballerina + # json content = check client->getJson(path); + # ``` + # + # + path - The resource path + # + targetType - The target type of the json content + # (e.g., `json`, `record {}`, or a user-defined record type) + # + return - content as a json or `ftp:Error` in case of errors + remote isolated function getJson(string path, typedesc targetType = <>) returns targetType|Error = @java:Method { + name: "getJson", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" + } external; + + # Retrieves the file content as xml from a remote resource. + # ```ballerina + # xml content = check client->getXml(path); + # ``` + # + # + path - The resource path + # + targetType - The target type of the xml content + # (e.g., `xml`, `record {}`, or a user-defined record type) + # + return - content as a xml or `ftp:Error` in case of errors + remote isolated function getXml(string path, typedesc targetType = <>) returns targetType|Error = @java:Method { + name: "getXml", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" + } external; + + # Fetches file content from the FTP server as CSV. + # When the expected data type is record[], the first entry of the csv file should contain matching headers. + # ```ballerina + # string[][] content = check client->getCsv(path); + # ``` + # + # + path - The path to the file on the FTP server + # + targetType - The target type of the CSV content + # (e.g., `string[][]`, `record {}[]`, or an array of user-defined record type) + # + return - content as a string[][] or `ftp:Error` in case of errors + remote isolated function getCsv(string path, typedesc targetType = <>) returns targetType|Error = @java:Method { + name: "getCsv", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" + } external; + + # Retrieves the file content as a byte stream from a remote resource. + # ```ballerina + # stream response = check client->getBytesAsStream(path); + # ``` + # + # + path - The path to the file on the FTP server + # + return - A byte stream from which the file can be read or `ftp:Error` in case of errors + remote isolated function getBytesAsStream(string path) returns stream|Error = @java:Method { + name: "getBytesAsStream", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" + } external; + # Appends the content to an existing file in an FTP server. # ```ballerina # ftp:Error? response = client->append(path, channel); # ``` + # Deprecated: Use `putText`, `putJson`, `putXml`, `putCsv`, `putBytes` or `putCsvFromStream` # # + path - The resource path # + content - Content to be written to the file in server # + return - `()` or else an `ftp:Error` if failed to establish # the communication with the FTP server + @deprecated remote isolated function append(string path, stream|string|xml|json content) returns Error? { return append(self, getInputContent(path, content)); @@ -67,6 +152,7 @@ public isolated client class Client { # ```ballerina # ftp:Error? response = client->put(path, channel); # ``` + # Deprecated: Use `putText`, `putJson`, `putXml`, `putCsv`, `putBytes` or `putCsvFromStream` # # + path - The resource path # + content - Content to be written to the file in server @@ -75,6 +161,7 @@ public isolated client class Client { # uploading # + return - `()` or else an `ftp:Error` if failed to establish # the communication with the FTP server + @deprecated remote isolated function put(string path, stream |string|xml|json content, Compression compressionType = NONE) returns Error? { boolean compress = false; @@ -84,6 +171,65 @@ public isolated client class Client { return put(self, getInputContent(path, content, compress)); } + # Adds a byte array as a file to an FTP server with the specified write option. + # ```ballerina + # ftp:Error? response = client->putBytes(path, content, option); + # ``` + # + # + path - The resource path + # + content - Content to be written to the file in server + # + option - To indicate whether to overwrite or append the given content + # + return - `()` or else an `ftp:Error` if failed to write + remote isolated function putBytes(string path, byte[] content, FileWriteOption option = OVERWRITE) returns Error? = @java:Method { + name: "putBytes", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" + } external; + + # Adds a file to an FTP server with the specified write option. + # ```ballerina + # ftp:Error? response = client->putText(path, content, option); + # ``` + # + # + path - The resource path + # + content - Content to be written to the file in server + # + option - To indicate whether to overwrite or append the given content + # + return - `()` or else an `ftp:Error` if failed to write + remote isolated function putText(string path, string content, FileWriteOption option = OVERWRITE) returns Error? = @java:Method { + name: "putText", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" + } external; + + # Adds a file to an FTP server with the specified write option. + # ```ballerina + # ftp:Error? response = client->putJson(path, content, option); + # ``` + # + # + path - The resource path + # + content - Content to be written to the file in server + # + option - To indicate whether to overwrite or append the given content + # + return - `()` or else an `ftp:Error` if failed to write + remote isolated function putJson(string path, json|record{} content, FileWriteOption option = OVERWRITE) returns Error? { + string jsonVal = content is json ? content.toJsonString() : content.toJsonString(); + return putJson(self, path, jsonVal, option); + } + + # Adds a file to an FTP server with the specified write option. + # ```ballerina + # ftp:Error? response = client->putXml(path, content, option); + # ``` + # + # + path - The resource path + # + content - Content to be written to the file in server + # + option - To indicate whether to overwrite or append the given content + # + return - `()` or else an `ftp:Error` if failed to write + remote isolated function putXml(string path, xml|record{} content, FileWriteOption option = OVERWRITE) returns Error? { + xml|error xmldata = content is xml ? content : xmldata:toXml(content); + if xmldata is error { + return error Error("Failed to convert record to XML: " + xmldata.message()); + } + return putXml(self, path, xmldata, option); + } + # Creates a new directory in an FTP server. # ```ballerina # ftp:Error? response = client->mkdir(path); @@ -171,6 +317,15 @@ public isolated client class Client { } } +# Represents a file opening options for writing. +# +# + OVERWRITE - Overwrite(truncate the existing content) +# + APPEND - Append to the existing content +public enum FileWriteOption { + OVERWRITE, + APPEND +} + # Compression type. # # + ZIP - Zip compression @@ -190,12 +345,16 @@ public enum Compression { # prevents the underlying VFS from attempting to change to the actual server root. # If `false`, treats the actual server root as `/`, which may cause a `CWD /` command # that can fail on servers restricting root access (e.g., chrooted environments). +# + laxDataBinding - If set to `true`, enables relaxed data binding for XML and JSON responses. +# null values in JSON/XML are allowed to be mapped to optional fields +# missing fields in JSON/XML are allowed to be mapped as null values public type ClientConfiguration record {| Protocol protocol = FTP; string host = "127.0.0.1"; int port = 21; AuthConfiguration auth?; boolean userDirIsRoot = false; + boolean laxDataBinding = false; |}; isolated function getInputContent(string path, stream|string|xml|json content, diff --git a/ballerina/content_byte_stream.bal b/ballerina/content_byte_stream.bal new file mode 100644 index 000000000..df03c38b3 --- /dev/null +++ b/ballerina/content_byte_stream.bal @@ -0,0 +1,70 @@ +// Copyright (c) 2025 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/jballerina.java; + +# Represents the type of the record which returned from the contentByteStream.next() call. +# +# + value - The array of byte +public type ContentStreamEntry record {| + byte[] value; +|}; + +# `ContentByteStream` used to initialize a stream of type byte[] for content callbacks. +# This stream wraps byte array content and provides it as chunks. +public class ContentByteStream { + + private boolean isClosed = false; + private Error? err; + + public isolated function init(Error? err = ()) { + self.err = err; + } + + # Reads and return the next `byte[]` chunk of the stream. + # + # + return - A `record` of `byte[]`s when the stream is available, + # `()` if the stream has reached the end or else an `error` + public isolated function next() returns record {|byte[] value;|}|error? { + return externGetContentStreamEntry(self); + } + + # Closes the stream. The primary usage of this function is to close the stream without reaching the end. + # If the stream reaches the end, the `contentByteStream.next` will automatically close the stream. + # + # + return - Returns `()` when the closing was successful or an `error` + public isolated function close() returns error? { + if !self.isClosed { + var closeResult = externCloseContentStream(self); + if closeResult is () { + self.isClosed = true; + } + return closeResult; + } + return (); + } +} + +isolated function externGetContentStreamEntry(ContentByteStream iterator) + returns record {|byte[] value;|}|error? = @java:Method { + 'class: "io.ballerina.stdlib.ftp.ContentByteStreamIteratorUtils", + name: "next" +} external; + +isolated function externCloseContentStream(ContentByteStream iterator) returns error? = @java:Method { + 'class: "io.ballerina.stdlib.ftp.ContentByteStreamIteratorUtils", + name: "close" +} external; diff --git a/ballerina/external_functions.bal b/ballerina/external_functions.bal index 908b33faf..ab15529b7 100644 --- a/ballerina/external_functions.bal +++ b/ballerina/external_functions.bal @@ -21,6 +21,16 @@ isolated function initEndpoint(Client clientEndpoint, map config) retur 'class: "io.ballerina.stdlib.ftp.client.FtpClient" } external; +isolated function getBytes(Client clientEndpoint, string path) returns byte[]|Error = @java:Method { + name: "getBytes", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" +} external; + +isolated function getText(Client clientEndpoint, string path) returns string|Error = @java:Method { + name: "getText", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" +} external; + isolated function delete(Client clientEndpoint, string path) returns Error? = @java:Method { name: "delete", 'class: "io.ballerina.stdlib.ftp.client.FtpClient" @@ -36,6 +46,16 @@ isolated function put(Client clientEndpoint, InputContent inputContent) returns 'class: "io.ballerina.stdlib.ftp.client.FtpClient" } external; +isolated function putJson(Client clientEndpoint, string path, string content, FileWriteOption option) returns Error? = @java:Method { + name: "putJson", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" +} external; + +isolated function putXml(Client clientEndpoint, string path, xml content, FileWriteOption option) returns Error? = @java:Method { + name: "putXml", + 'class: "io.ballerina.stdlib.ftp.client.FtpClient" +} external; + isolated function mkdir(Client clientEndpoint, string path) returns Error? = @java:Method { name: "mkdir", 'class: "io.ballerina.stdlib.ftp.client.FtpClient" diff --git a/ballerina/tests/client_endpoint_test.bal b/ballerina/tests/client_endpoint_test.bal index 2e8e48316..b4c7b137d 100644 --- a/ballerina/tests/client_endpoint_test.bal +++ b/ballerina/tests/client_endpoint_test.bal @@ -242,6 +242,75 @@ public function testPutCompressedFileContent() returns error? { } } +@test:Config { dependsOn: [testPutFileContent]} +function testPutBytes() returns error? { + byte[] content = "hello-bytes".toBytes(); + string path = "/home/in/put-bytes.txt"; + + check (clientEp)->putBytes(path, content); + byte[] got = check (clientEp)->getBytes(path); + + test:assertEquals(got.length(), content.length(), msg = "Byte length mismatch"); + foreach int i in 0 ..< content.length() { + test:assertEquals(got[i], content[i], msg = "Byte content mismatch at index " + i.toString()); + } +} + +@test:Config { dependsOn: [testPutFileContent]} +function testGetBytesAsStream() returns error? { + byte[] content = "hello-bytes".toBytes(); + string path = "/home/in/put-bytes.txt"; + + check (clientEp)->putBytes(path, content); + stream got = check (clientEp)->getBytesAsStream(path); + byte[] accumulatedBytes = []; + check from byte[] byteChunk in got + do { + accumulatedBytes.push(...byteChunk); + }; + + test:assertEquals(accumulatedBytes.length(), content.length(), msg = "Byte length mismatch"); + foreach int i in 0 ..< content.length() { + test:assertEquals(accumulatedBytes[i], content[i], msg = "Byte content mismatch at index " + i.toString()); + } +} + +@test:Config { dependsOn: [testPutFileContent]} +function testPutJson() returns error? { + json j = { name: "wso2", count: 2, ok: true }; + string path = "/home/in/put.json"; + + check (clientEp)->putJson(path, j); + json got = check (clientEp)->getJson(path); + + test:assertEquals(got, j, msg = "JSON content mismatch"); +} + +@test:Config { dependsOn: [testPutFileContent]} +function testPutXml() returns error? { + xml x = xml`42`; + string path = "/home/in/put.xml"; + + check (clientEp)->putXml(path, x); + xml got = check (clientEp)->getXml(path); + + // Compare string representations for stability + test:assertEquals(got.toString(), x.toString(), msg = "XML content mismatch"); + return; +} + +@test:Config { dependsOn: [testPutFileContent]} +function testPutText() returns error? { + string txt = "hello text content"; + string path = "/home/in/put.txt"; + + check (clientEp)->putText(path, txt); + string got = check (clientEp)->getText(path); + + test:assertEquals(got, txt, msg = "Text content mismatch"); + return; +} + @test:Config {} function testFtpUserDirIsRootTrue() returns error? { stream|Error res = (ftpUserHomeRootClientEp)->get("test1.txt"); diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index 4002bc2e3..4b892c86c 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -47,3 +47,22 @@ groupId = "io.ballerina.stdlib" artifactId = "ftp-native" version = "@toml.version@" path = "../native/build/libs/ftp-native-@project.version@.jar" + +[[platform.java21.dependency]] +groupId = "io.ballerina.lib" +artifactId = "data.jsondata-native" +version = "@data.jsondata.version@" +path = "./lib/data.jsondata-native-@data.jsondata.version@.jar" + +[[platform.java21.dependency]] +groupId = "io.ballerina.lib" +artifactId = "data.xmldata-native" +version = "@data.xmldata.version@" +path = "./lib/data.xmldata-native-@data.xmldata.version@.jar" + + +[[platform.java21.dependency]] +groupId = "io.ballerina.lib" +artifactId = "data.csv-native" +version = "@data.csv.version@" +path = "./lib/data.csv-native-@data.csv.version@.jar" diff --git a/build.gradle b/build.gradle index e3089d31c..5f90d3ea7 100644 --- a/build.gradle +++ b/build.gradle @@ -78,6 +78,9 @@ subprojects { ballerinaStdLibs "io.ballerina.stdlib:time-ballerina:${stdlibTimeVersion}" ballerinaStdLibs "io.ballerina.stdlib:observe-ballerina:${observeVersion}" ballerinaStdLibs "io.ballerina:observe-ballerina:${observeInternalVersion}" + + ballerinaStdLibs "io.ballerina.lib:data.jsondata-ballerina:${stdlibDataJsonDataVersion}" + ballerinaStdLibs "io.ballerina.lib:data.xmldata-ballerina:${stdlibDataXmlDataVersion}" } } diff --git a/gradle.properties b/gradle.properties index 794d5a3b6..ca61906ea 100644 --- a/gradle.properties +++ b/gradle.properties @@ -36,3 +36,6 @@ stdlibIoVersion=1.8.0 stdlibTimeVersion=2.7.0 observeVersion=1.5.0 observeInternalVersion=1.5.0 +stdlibDataJsonDataVersion=1.1.2 +stdlibDataXmlDataVersion=1.5.2 +stdlibDataCsvVersion=0.8.1 diff --git a/native/build.gradle b/native/build.gradle index 18322e4dd..93265d64e 100644 --- a/native/build.gradle +++ b/native/build.gradle @@ -35,6 +35,9 @@ dependencies { implementation group: 'org.apache.commons', name: 'commons-vfs2', version: "${commonsVfsVersion}" implementation group: 'org.apache.commons', name: 'commons-lang3', version: "${commonsLang3Version}" implementation group: 'commons-io', name: 'commons-io', version: "${commonsIoVersion}" + implementation group: 'io.ballerina.lib', name: 'data.jsondata-native', version: "${stdlibDataJsonDataVersion}" + implementation group: 'io.ballerina.lib', name: 'data.xmldata-native', version: "${stdlibDataXmlDataVersion}" + implementation group: 'io.ballerina.lib', name: 'data.csv-native', version: "${stdlibDataCsvVersion}" } checkstyle { diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/ContentByteStreamIteratorUtils.java b/native/src/main/java/io/ballerina/stdlib/ftp/ContentByteStreamIteratorUtils.java new file mode 100644 index 000000000..ffff80eec --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/ContentByteStreamIteratorUtils.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2025 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.ballerina.stdlib.ftp; + +import io.ballerina.runtime.api.creators.ErrorCreator; +import io.ballerina.runtime.api.creators.ValueCreator; +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.values.BMap; +import io.ballerina.runtime.api.values.BObject; +import io.ballerina.runtime.api.values.BString; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +import static io.ballerina.stdlib.ftp.util.FtpConstants.ARRAY_SIZE; +import static io.ballerina.stdlib.ftp.util.FtpConstants.FIELD_VALUE; +import static io.ballerina.stdlib.ftp.util.FtpUtil.getFtpPackage; + +public class ContentByteStreamIteratorUtils { + + /** + * Gets the next chunk of bytes from the stream. + * This method is called by Ballerina runtime when iterating the stream. + * + * @return The next record with byte array value, or null if stream is exhausted + */ + public static Object next(BObject recordIterator) { + InputStream inputStream = (InputStream) recordIterator.getNativeData("Input_Stream"); + BMap streamEntry = ValueCreator.createRecordValue(getFtpPackage(), "ContentStreamEntry"); + try { + byte[] buffer = new byte[ARRAY_SIZE]; + int readNumber = inputStream.read(buffer); + if (readNumber == -1) { + inputStream.close(); + recordIterator.set(StringUtils.fromString("isClosed"), true); + return null; + } + byte[] returnArray; + if (readNumber < ARRAY_SIZE) { + returnArray = Arrays.copyOfRange(buffer, 0, readNumber); + } else { + returnArray = buffer; + } + streamEntry.put(FIELD_VALUE, ValueCreator.createArrayValue(returnArray)); + } catch (IOException e) { + streamEntry.put(FIELD_VALUE, ErrorCreator.createError(StringUtils.fromString("Unable to parse value"))); + } + return streamEntry; + } + + /** + * Closes the stream iterator. + * + * @return null (no error) + */ + public static Object close(BObject recordIterator) { + InputStream inputStream = (InputStream) recordIterator.getNativeData("Input_Stream"); + try { + inputStream.close(); + } catch (IOException e) { + throw ErrorCreator.createError(StringUtils.fromString("Unable to clean input stream"), e); + } + return null; + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java index e74115721..b39954b9c 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClient.java @@ -18,12 +18,22 @@ package io.ballerina.stdlib.ftp.client; +import io.ballerina.lib.data.jsondata.json.Native; import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.Module; import io.ballerina.runtime.api.creators.ErrorCreator; +import io.ballerina.runtime.api.creators.TypeCreator; +import io.ballerina.runtime.api.creators.ValueCreator; +import io.ballerina.runtime.api.types.PredefinedTypes; import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.utils.XmlUtils; +import io.ballerina.runtime.api.values.BArray; +import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; +import io.ballerina.runtime.api.values.BTypedesc; +import io.ballerina.runtime.api.values.BXml; import io.ballerina.stdlib.ftp.exception.BallerinaFtpException; import io.ballerina.stdlib.ftp.exception.RemoteFileSystemConnectorException; import io.ballerina.stdlib.ftp.transport.RemoteFileSystemConnectorFactory; @@ -67,6 +77,11 @@ private FtpClient() { public static Object initClientEndpoint(BObject clientEndpoint, BMap config) { String protocol = (config.getStringValue(StringUtils.fromString(FtpConstants.ENDPOINT_CONFIG_PROTOCOL))) .getValue(); + + // Keep databinding config for later + clientEndpoint.addNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING, + config.getBooleanValue(StringUtils.fromString(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING))); + Map authMap = FtpUtil.getAuthMap(config, protocol); clientEndpoint.addNativeData(FtpConstants.ENDPOINT_CONFIG_USERNAME, authMap.get(FtpConstants.ENDPOINT_CONFIG_USERNAME)); @@ -136,6 +151,122 @@ public static Object get(BObject clientConnector) { return FtpClientHelper.generateInputStreamEntry((InputStream) clientConnector.getNativeData(READ_INPUT_STREAM)); } + public static Object getBytes(Environment env, BObject clientConnector, BString filePath) { + Object content = getAllContent(env, clientConnector, filePath); + if (!(content instanceof byte[])) { + throw (BError) content; + } + return ValueCreator.createArrayValue((byte[]) content); + } + + public static Object getText(Environment env, BObject clientConnector, BString filePath) { + Object content = getAllContent(env, clientConnector, filePath); + if (!(content instanceof byte[])) { + throw (BError) content; + } + return StringUtils.fromString(new String((byte[]) content)); + } + + public static Object getJson(Environment env, BObject clientConnector, BString filePath, BTypedesc typeDesc) { + Object content = getAllContent(env, clientConnector, filePath); + if (!(content instanceof byte[])) { + throw (BError) content; + } + + boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); + BMap mapValue = ValueCreator.createRecordValue( + io.ballerina.lib.data.ModuleUtils.getModule(), "Options"); + if (laxDataBinding) { + BMap allowDataProjection = mapValue.getMapValue(StringUtils.fromString("allowDataProjection")); + allowDataProjection.put(StringUtils.fromString("nilAsOptionalField"), Boolean.TRUE); + allowDataProjection.put(StringUtils.fromString("absentAsNilableType"), Boolean.TRUE); + mapValue.put(StringUtils.fromString("allowDataProjection"), allowDataProjection); + } else { + mapValue.put(StringUtils.fromString("allowDataProjection"), Boolean.FALSE); + } + Object bJson = Native.parseBytes(ValueCreator.createArrayValue((byte[]) content), mapValue, typeDesc); + if (bJson instanceof BError) { + throw ErrorCreator.createError(((BError) bJson).getErrorMessage()); + } + return bJson; + } + + public static Object getXml(Environment env, BObject clientConnector, BString filePath, BTypedesc typeDesc) { + Object content = getAllContent(env, clientConnector, filePath); + if (!(content instanceof byte[])) { + throw (BError) content; + } + + if (typeDesc.getDescribingType().getQualifiedName().equals("xml")) { + Object bXml = XmlUtils.parse(StringUtils.fromString(new String((byte[]) content))); + if (bXml instanceof BError) { + throw ErrorCreator.createError(((BError) bXml).getErrorMessage()); + } + return bXml; + } + + boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); + BMap mapValue = ValueCreator.createRecordValue( + new Module("ballerina", "data.xmldata", "1"), + "SourceOptions"); + mapValue.put(StringUtils.fromString("allowDataProjection"), laxDataBinding); + + Object bXml = io.ballerina.lib.data.xmldata.xml.Native.parseBytes( + ValueCreator.createArrayValue((byte[]) content), mapValue, typeDesc); + if (bXml instanceof BError) { + throw ErrorCreator.createError(((BError) bXml).getErrorMessage()); + } + return bXml; + } + + public static Object getCsv(Environment env, BObject clientConnector, BString filePath, BTypedesc typeDesc) { + Object content = getAllContent(env, clientConnector, filePath); + if (!(content instanceof byte[])) { + throw (BError) content; + } + + boolean laxDataBinding = (boolean) clientConnector.getNativeData(FtpConstants.ENDPOINT_CONFIG_LAX_DATABINDING); + BMap mapValue = ValueCreator.createRecordValue( + io.ballerina.lib.data.csvdata.utils.ModuleUtils.getModule(), "ParseOptions"); + mapValue.put(StringUtils.fromString("allowDataProjection"), laxDataBinding); + + Object csv = io.ballerina.lib.data.csvdata.csv.Native.parseBytes( + ValueCreator.createArrayValue((byte[]) content), mapValue, typeDesc); + if (csv instanceof BError) { + throw ErrorCreator.createError(((BError) csv).getErrorMessage()); + } + return csv; + } + + public static Object getBytesAsStream(Environment env, BObject clientConnector, BString filePath) { + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + FtpClientListener connectorListener = new FtpClientListener(balFuture, false, + remoteFileSystemBaseMessage -> + FtpClientHelper.executeStreamingAction(remoteFileSystemBaseMessage, + balFuture, TypeCreator.createArrayType(PredefinedTypes.TYPE_BYTE))); + VfsClientConnectorImpl connector = (VfsClientConnectorImpl) clientConnector. + getNativeData(VFS_CLIENT_CONNECTOR); + connector.addListener(connectorListener); + connector.send(null, FtpAction.GET, filePath.getValue(), null); + return getResult(balFuture); + }); + } + + private static Object getAllContent(Environment env, BObject clientConnector, BString filePath) { + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + FtpClientListener connectorListener = new FtpClientListener(balFuture, false, + remoteFileSystemBaseMessage -> FtpClientHelper.executeGetAllAction(remoteFileSystemBaseMessage, + balFuture)); + VfsClientConnectorImpl connector = (VfsClientConnectorImpl) clientConnector. + getNativeData(VFS_CLIENT_CONNECTOR); + connector.addListener(connectorListener); + connector.send(null, FtpAction.GET_ALL, filePath.getValue(), null); + return getResult(balFuture); + }); + } + public static Object closeInputByteStream(BObject clientObject) { InputStream readInputStream = (InputStream) clientObject.getNativeData(READ_INPUT_STREAM); if (readInputStream != null) { @@ -233,6 +364,53 @@ public static Object put(Environment env, BObject clientConnector, BMap { + CompletableFuture balFuture = new CompletableFuture<>(); + FtpClientListener connectorListener = new FtpClientListener(balFuture, true, + remoteFileSystemBaseMessage -> FtpClientHelper.executeGenericAction()); + VfsClientConnectorImpl connector + = (VfsClientConnectorImpl) clientConnector.getNativeData(VFS_CLIENT_CONNECTOR); + connector.addListener(connectorListener); + String filePath = path.getValue(); + if (options.getValue().equals("OVERWRITE")) { + connector.send(message, FtpAction.PUT, filePath, null); + } else { + connector.send(message, FtpAction.APPEND, filePath, null); + } + return getResult(balFuture); + }); + } + public static Object delete(Environment env, BObject clientConnector, BString filePath) { return env.yieldAndRun(() -> { CompletableFuture balFuture = new CompletableFuture<>(); @@ -299,7 +477,7 @@ public static Object rename(Environment env, BObject clientConnector, BString or return FtpUtil.createError(e.getMessage(), Error.errorType()); } return env.yieldAndRun(() -> { - CompletableFuture balFuture = new CompletableFuture<>(); + CompletableFuture balFuture = new CompletableFuture<>(); FtpClientListener connectorListener = new FtpClientListener(balFuture, true, remoteFileSystemBaseMessage -> FtpClientHelper.executeGenericAction()); VfsClientConnectorImpl connector = (VfsClientConnectorImpl) clientConnector. diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java index 02250d2b5..ef1567bdc 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/client/FtpClientHelper.java @@ -20,8 +20,12 @@ import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Module; +import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.creators.TypeCreator; import io.ballerina.runtime.api.creators.ValueCreator; +import io.ballerina.runtime.api.types.PredefinedTypes; +import io.ballerina.runtime.api.types.StreamType; +import io.ballerina.runtime.api.types.Type; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BArray; import io.ballerina.runtime.api.values.BMap; @@ -35,6 +39,7 @@ import io.ballerina.stdlib.ftp.util.BufferHolder; import io.ballerina.stdlib.ftp.util.FtpConstants; import io.ballerina.stdlib.ftp.util.FtpUtil; +import io.ballerina.stdlib.ftp.util.ModuleUtils; import io.ballerina.stdlib.io.channels.base.Channel; import org.apache.commons.vfs2.FileSystemException; import org.slf4j.Logger; @@ -97,6 +102,48 @@ static boolean executeGetAction(RemoteFileSystemBaseMessage remoteFileSystemBase return true; } + static boolean executeStreamingAction(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage, + CompletableFuture balFuture, Type streamValueType) { + try { + if (remoteFileSystemBaseMessage instanceof RemoteFileSystemMessage) { + final InputStream in = ((RemoteFileSystemMessage) remoteFileSystemBaseMessage).getInputStream(); + ByteChannel byteChannel = new FtpByteChannel(in); + Channel channel = new FtpChannel(byteChannel); + InputStream inputStream = channel.getInputStream(); + Object streamEntry = createStreamWithContent(inputStream, streamValueType); + balFuture.complete(streamEntry); + } + } catch (IOException e) { + log.error("Error occurred while reading stream: ", e); + } + return true; + } + + private static Object createStreamWithContent(InputStream content, Type streamValueType) { + try { + BObject contentByteStreamObject = ValueCreator.createObjectValue( + ModuleUtils.getModule(), "ContentByteStream", null, null + ); + contentByteStreamObject.addNativeData("Input_Stream", content); + StreamType streamType = TypeCreator.createStreamType(streamValueType, PredefinedTypes.TYPE_NULL); + return ValueCreator.createStreamValue(streamType, contentByteStreamObject); + } catch (Exception e) { + log.error("Failed to create stream with content", e); + // Fallback to returning byte array if stream creation fails + return ErrorCreator.createError(StringUtils.fromString("Unable to create stream"), e); + } + } + + + static boolean executeGetAllAction(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage, + CompletableFuture balFuture) { + if (remoteFileSystemBaseMessage instanceof RemoteFileSystemMessage) { + byte[] content = ((RemoteFileSystemMessage) remoteFileSystemBaseMessage).getBytesArray(); + balFuture.complete(content); + } + return true; + } + public static BMap generateInputStreamEntry(InputStream inputStream) { BMap streamEntry = ValueCreator.createRecordValue(getFtpPackage(), STREAM_ENTRY_RECORD); try { @@ -167,7 +214,7 @@ static boolean executeListAction(RemoteFileSystemBaseMessage remoteFileSystemBas final BMap ballerinaFileInfo = ValueCreator.createRecordValue( new Module(FtpConstants.FTP_ORG_NAME, FtpConstants.FTP_MODULE_NAME, - FtpUtil.getFtpPackage().getMajorVersion()), FtpConstants.FTP_FILE_INFO, fileInfoParams); + getFtpPackage().getMajorVersion()), FtpConstants.FTP_FILE_INFO, fileInfoParams); arrayValue.add(i++, ballerinaFileInfo); } balFuture.complete(arrayValue); diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/transport/client/connector/contract/FtpAction.java b/native/src/main/java/io/ballerina/stdlib/ftp/transport/client/connector/contract/FtpAction.java index c4fef06e0..4d0524d79 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/transport/client/connector/contract/FtpAction.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/transport/client/connector/contract/FtpAction.java @@ -22,5 +22,5 @@ * {@link VfsClientConnector} supported FTP actions. */ public enum FtpAction { - APPEND, MKDIR, RMDIR, PUT, GET, DELETE, RENAME, SIZE, LIST, ISDIR + APPEND, MKDIR, RMDIR, PUT, GET, GET_ALL, DELETE, RENAME, SIZE, LIST, ISDIR } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/transport/client/connector/contractimpl/VfsClientConnectorImpl.java b/native/src/main/java/io/ballerina/stdlib/ftp/transport/client/connector/contractimpl/VfsClientConnectorImpl.java index 6b83f0d72..d9bb89a22 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/transport/client/connector/contractimpl/VfsClientConnectorImpl.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/transport/client/connector/contractimpl/VfsClientConnectorImpl.java @@ -190,6 +190,16 @@ public void send(RemoteFileSystemMessage message, FtpAction action, String fileP + " not found"); } break; + case GET_ALL: + if (fileObject.exists()) { + byte[] content = fileObject.getContent().getByteArray(); + remoteFileSystemListener.onMessage(new RemoteFileSystemMessage(content)); + } else { + throw new RemoteFileSystemConnectorException( + "Failed to read file: " + maskUrlPassword(fileObject.getName().getURI()) + + " not found"); + } + break; case SIZE: remoteFileSystemListener.onMessage(new RemoteFileSystemMessage(fileObject.getContent().getSize())); break; diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/transport/message/RemoteFileSystemMessage.java b/native/src/main/java/io/ballerina/stdlib/ftp/transport/message/RemoteFileSystemMessage.java index 5d620645b..90833efb9 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/transport/message/RemoteFileSystemMessage.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/transport/message/RemoteFileSystemMessage.java @@ -28,6 +28,7 @@ */ public class RemoteFileSystemMessage extends RemoteFileSystemBaseMessage { + private byte[] bytesArray; private ByteBuffer bytes; private InputStream inputStream; private String text; @@ -51,6 +52,10 @@ public RemoteFileSystemMessage(String text) { this.text = text; } + public RemoteFileSystemMessage(byte[] bytesArray) { + this.bytesArray = bytesArray; + } + public RemoteFileSystemMessage(long size) { this.size = size; } @@ -67,6 +72,10 @@ public InputStream getInputStream() { return inputStream; } + public byte[] getBytesArray() { + return bytesArray; + } + public String getText() { return text; } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java index 58057af70..0e48032a5 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java @@ -78,6 +78,7 @@ private FtpConstants() { public static final String ENDPOINT_CONFIG_CREDENTIALS = "credentials"; public static final String ENDPOINT_CONFIG_PRIVATE_KEY = "privateKey"; public static final String ENDPOINT_CONFIG_PREFERRED_METHODS = "preferredMethods"; + public static final String ENDPOINT_CONFIG_LAX_DATABINDING = "laxDataBinding"; public static final String INPUT_CONTENT_FILE_PATH_KEY = "filePath"; public static final String INPUT_CONTENT_IS_FILE_KEY = "isFile"; diff --git a/native/src/main/java/module-info.java b/native/src/main/java/module-info.java index 7ac071612..b83a28836 100644 --- a/native/src/main/java/module-info.java +++ b/native/src/main/java/module-info.java @@ -24,6 +24,9 @@ requires org.slf4j; requires java.logging; requires org.apache.commons.vfs2; + requires io.ballerina.lib.data; + requires io.ballerina.lib.data.xmldata; + requires io.ballerina.lib.data.csvdata; exports io.ballerina.stdlib.ftp.client; exports io.ballerina.stdlib.ftp.server; exports io.ballerina.stdlib.ftp.util;