Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
de3e2fb
Add enableCsvFailSafe field into configs
Nuvindu Dec 2, 2025
e733b31
Update native code to incorporate csv fail safe support in ftp APIs
Nuvindu Dec 2, 2025
3b1439b
Update test case to validate csv fail safe support in ftp listener
Nuvindu Dec 2, 2025
19f0a9c
Add resource for fail safe test cases
Nuvindu Dec 2, 2025
2694f9d
Add uuid dependency
Nuvindu Dec 2, 2025
47ceffd
Update csv data dependency version
Nuvindu Dec 2, 2025
3927790
Update toml files
Nuvindu Dec 2, 2025
affc2e1
[Automated] Update the native jar versions
Nuvindu Dec 3, 2025
b1da130
Add test cases to validate the fail safe support
Nuvindu Dec 3, 2025
a94e6d0
Merge branch 'master' of https://github.com/ballerina-platform/module…
Nuvindu Dec 7, 2025
4ff8ad3
[Automated] Update the native jar versions
Nuvindu Dec 7, 2025
5da7747
Add new configs to csv fail safe
Nuvindu Dec 7, 2025
4a71a9f
Update native code for new design changes in fail safe support
Nuvindu Dec 7, 2025
67574b0
Fix configs in fail safe test cases
Nuvindu Dec 8, 2025
f50a9cd
Add file name prefix to the fail safe error log file
Nuvindu Dec 8, 2025
e8fe7ad
Remove generated error files in fail safe test cases
Nuvindu Dec 8, 2025
787f2cd
Add docs for the csv fail safe support
Nuvindu Dec 8, 2025
d1da848
Merge branch 'master' of https://github.com/ballerina-platform/module…
Nuvindu Dec 22, 2025
d9534f5
[Automated] Update the native jar versions
Nuvindu Dec 22, 2025
c83fffd
Fix the field name in csv fail safe config
Nuvindu Dec 22, 2025
1a34c7e
Add the data csv timestamped version
Nuvindu Dec 22, 2025
2f35440
Merge branch 'fail-safe-support' of https://github.com/Nuvindu/module…
Nuvindu Dec 22, 2025
24fe18e
Add file separator for a path
Nuvindu Dec 22, 2025
b1d47fc
[Automated] Update the native jar versions
Nuvindu Dec 22, 2025
ff919c6
[Automated] Update the native jar versions
Nuvindu Dec 23, 2025
f0c4f8f
Update log version to 2.14.0
Nuvindu Dec 23, 2025
ed83980
Fix file path in a test case
Nuvindu Dec 23, 2025
b7e2350
Update dependency versions
Nuvindu Dec 23, 2025
e28f6ac
[Automated] Update the native jar versions
Nuvindu Dec 23, 2025
b3b651a
Fix test failures
Nuvindu Dec 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,36 @@ listener ftp:Listener ftpsListener = check new({
});
```

##### CSV fail-safe mode

The FTP listener supports a fail-safe mode for CSV file processing When enabled, malformed CSV records are automatically skipped and written to a separate error log file in the current directory (`<filename>_error.log`), allowing the processing to continue without interruption.

This is only supported in `onFileCsv` trigger methods in ftp listener.

**Listener Configuration**:

```ballerina
listener ftp:Listener remoteServer = check new({
protocol: ftp:FTP,
host: "<The FTP host>",
port: <The FTP port>,
path: "<The remote FTP directory location>",
pollingInterval: <Polling interval>,
auth: {
credentials: {username: "<The FTP username>", password: "<The FTP password>"}
},
csvFailSafe: {
contentType: ftp:RAW_AND_METADATA
}
});
```

The `contentType` field in `csvFailSafe` specifies what information to log when a malformed CSV record is encountered.

- `METADATA` (default) - Logs only metadata about the error (location, error message)
- `RAW` - Logs the raw content of the malformed record (offending row)
- `RAW_AND_METADATA` - Logs both the raw content and metadata for comprehensive debugging

### Secure access with SFTP

SFTP is a secure protocol alternative to the FTP, which runs on top of the SSH protocol.
Expand Down
11 changes: 5 additions & 6 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerina"
name = "ftp"
version = "2.16.0"
version = "2.16.1"
authors = ["Ballerina"]
keywords = ["FTP", "SFTP", "remote file", "file transfer", "client", "service"]
repository = "https://github.com/ballerina-platform/module-ballerina-ftp"
Expand Down Expand Up @@ -45,8 +45,8 @@ path = "./lib/commons-lang3-3.18.0.jar"
[[platform.java21.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "ftp-native"
version = "2.16.0"
path = "../native/build/libs/ftp-native-2.16.0.jar"
version = "2.16.1"
path = "../native/build/libs/ftp-native-2.16.1-SNAPSHOT.jar"

[[platform.java21.dependency]]
groupId = "io.ballerina.lib"
Expand All @@ -60,9 +60,8 @@ artifactId = "data.xmldata-native"
version = "1.5.2"
path = "./lib/data.xmldata-native-1.5.2.jar"


[[platform.java21.dependency]]
groupId = "io.ballerina.lib"
artifactId = "data.csv-native"
version = "0.8.1"
path = "./lib/data.csv-native-0.8.1.jar"
version = "0.9.0-20251209-183500-7b18d54"
path = "./lib/data.csv-native-0.9.0-20251209-183500-7b18d54.jar"
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "ftp-compiler-plugin"
class = "io.ballerina.stdlib.ftp.plugin.FtpCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/ftp-compiler-plugin-2.16.0.jar"
path = "../compiler-plugin/build/libs/ftp-compiler-plugin-2.16.1-SNAPSHOT.jar"
41 changes: 34 additions & 7 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ distribution-version = "2201.12.0"
[[package]]
org = "ballerina"
name = "crypto"
version = "2.9.2"
version = "2.10.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "time"}
Expand All @@ -22,9 +22,10 @@ modules = [
[[package]]
org = "ballerina"
name = "data.csv"
version = "0.8.1"
version = "0.9.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "log"}
]
modules = [
{org = "ballerina", packageName = "data.csv", moduleName = "data.csv"}
Expand Down Expand Up @@ -54,15 +55,31 @@ modules = [
{org = "ballerina", packageName = "data.xmldata", moduleName = "data.xmldata"}
]

[[package]]
org = "ballerina"
name = "file"
version = "1.11.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "os"},
{org = "ballerina", name = "time"}
]
modules = [
{org = "ballerina", packageName = "file", moduleName = "file"}
]

[[package]]
org = "ballerina"
name = "ftp"
version = "2.16.0"
version = "2.16.1"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "data.csv"},
{org = "ballerina", name = "data.jsondata"},
{org = "ballerina", name = "data.xmldata"},
{org = "ballerina", name = "file"},
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.runtime"},
Expand Down Expand Up @@ -184,7 +201,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "log"
version = "2.13.0"
version = "2.14.0"
dependencies = [
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
Expand All @@ -203,10 +220,20 @@ dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "os"
version = "1.10.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "task"
version = "2.10.0"
version = "2.11.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "time"},
Expand All @@ -233,7 +260,7 @@ modules = [
[[package]]
org = "ballerina"
name = "time"
version = "2.7.0"
version = "2.8.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
Expand Down
30 changes: 30 additions & 0 deletions ballerina/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,36 @@ listener ftp:Listener ftpsListener = check new({
});
```

##### CSV fail-safe mode

The FTP listener supports a fail-safe mode for CSV file processing When enabled, malformed CSV records are automatically skipped and written to a separate error log file in the current directory (`<filename>_error.log`), allowing the processing to continue without interruption.

This is only supported in `onFileCsv` trigger methods in ftp listener.

**Listener Configuration**:

```ballerina
listener ftp:Listener remoteServer = check new({
protocol: ftp:FTP,
host: "<The FTP host>",
port: <The FTP port>,
path: "<The remote FTP directory location>",
pollingInterval: <Polling interval>,
auth: {
credentials: {username: "<The FTP username>", password: "<The FTP password>"}
},
csvFailSafe: {
contentType: ftp:RAW_AND_METADATA
}
});
```

The `contentType` field in `csvFailSafe` specifies what information to log when a malformed CSV record is encountered.

- `METADATA` (default) - Logs only metadata about the error (location, error message)
- `RAW` - Logs the raw content of the malformed record (offending row)
- `RAW_AND_METADATA` - Logs both the raw content and metadata for comprehensive debugging

### Secure access with SFTP

SFTP is a secure protocol alternative to the FTP, which runs on top of the SSH protocol.
Expand Down
71 changes: 71 additions & 0 deletions ballerina/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,76 @@ task startTestUtilAsDaemon(type: com.github.psxpaul.task.ExecFork) {
standardOutput = new File("$buildDir/tempUtilLogs.log")
}

task waitForFtpsTestServer() {
dependsOn startTestUtilAsDaemon
doLast {
def host = "127.0.0.1"
def explicitPort = 21214
def implicitPort = 21217

def connectTimeoutMs = 500
def readTimeoutMs = 500
def maxWaitMs = 30_000
def pollIntervalMs = 500

def isExplicitFtpsReady = { ->
java.net.Socket socket = null
try {
socket = new java.net.Socket()
socket.connect(new java.net.InetSocketAddress(host, explicitPort as int), connectTimeoutMs)
socket.setSoTimeout(readTimeoutMs)
def reader = new java.io.BufferedReader(new java.io.InputStreamReader(socket.getInputStream()))
def line = reader.readLine()
try {
if (line != null && line.startsWith("220")) {
def out = new java.io.OutputStreamWriter(socket.getOutputStream())
out.write("QUIT\r\n")
out.flush()
}
} catch (Exception ignored) {
// ignore
}
return (line != null && line.startsWith("220"))
} catch (Exception ignored) {
return false
} finally {
try {
socket?.close()
} catch (Exception ignored) {
// ignore
}
}
}

def isTcpPortOpen = { int port ->
java.net.Socket socket = null
try {
socket = new java.net.Socket()
socket.connect(new java.net.InetSocketAddress(host, port as int), connectTimeoutMs)
return true
} catch (Exception ignored) {
return false
} finally {
try {
socket?.close()
} catch (Exception ignored) {
// ignore
}
}
}

long start = System.currentTimeMillis()
while (System.currentTimeMillis() - start < maxWaitMs) {
if (isExplicitFtpsReady() && isTcpPortOpen(implicitPort)) {
return
}
sleep(pollIntervalMs)
}

throw new GradleException("FTPS mock server was not ready on ${host}:${explicitPort} after waiting ${maxWaitMs / 1000} seconds")
}
}

publishing {
publications {
maven(MavenPublication) {
Expand Down Expand Up @@ -188,6 +258,7 @@ test.dependsOn ":${packageName}-native:build"
populateTestUtilExecutableName.dependsOn ":${packageName}-test-utils:build"
startTestUtilAsDaemon.dependsOn populateTestUtilExecutableName
test.dependsOn startTestUtilAsDaemon
test.dependsOn waitForFtpsTestServer
test.dependsOn ":${packageName}-compiler-plugin:build"

publishToMavenLocal.dependsOn build
Expand Down
3 changes: 3 additions & 0 deletions ballerina/client_endpoint.bal
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ public enum Compression {
# + sftpCompression - Compression algorithms (SFTP only)
# + sftpSshKnownHosts - Path to SSH known_hosts file (SFTP only)
# + proxy - Proxy configuration for SFTP connections (SFTP only)
# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode,
# malformed CSV records are skipped and written to a separate file in the current directory
public type ClientConfiguration record {|
Protocol protocol = FTP;
string host = "127.0.0.1";
Expand All @@ -463,6 +465,7 @@ public type ClientConfiguration record {|
FileTransferMode fileTransferMode = BINARY;
TransferCompression[] sftpCompression = [NO];
string sftpSshKnownHosts?;
FailSafeOptions csvFailSafe?;
|};

isolated function getInputContent(string path, stream<byte[] & readonly, io:Error?>|string|xml|json content,
Expand Down
17 changes: 17 additions & 0 deletions ballerina/listener_endpoint.bal
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class Job {
# + sftpCompression - Compression algorithms (SFTP only)
# + sftpSshKnownHosts - Path to SSH known_hosts file (SFTP only)
# + proxy - Proxy configuration for SFTP connections (SFTP only)
# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode,
# malformed CSV records are skipped and written to a separate file in the current directory
public type ListenerConfiguration record {|
Protocol protocol = FTP;
string host = "127.0.0.1";
Expand All @@ -190,8 +192,23 @@ public type ListenerConfiguration record {|
FileTransferMode fileTransferMode = BINARY;
TransferCompression[] sftpCompression = [NO];
string sftpSshKnownHosts?;
FailSafeOptions csvFailSafe?;
|};

# Fail-safe options for CSV content processing.
#
# + contentType - Specifies the type of content to log in case of errors
public type FailSafeOptions record {|
ErrorLogContentType contentType = METADATA;
|};

# Specifies the type of content to log in case of errors during fail-safe CSV processing.
public enum ErrorLogContentType {
METADATA,
RAW,
RAW_AND_METADATA
};

# FTP service for handling file system change events.
public type Service distinct service object {
};
Loading