Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
de3e2fb
Add enableCsvFailSafe field into configs
Nuvindu Dec 2, 2025
e733b31
Update native code to incorporate csv fail safe support in ftp APIs
Nuvindu Dec 2, 2025
3b1439b
Update test case to validate csv fail safe support in ftp listener
Nuvindu Dec 2, 2025
19f0a9c
Add resource for fail safe test cases
Nuvindu Dec 2, 2025
2694f9d
Add uuid dependency
Nuvindu Dec 2, 2025
47ceffd
Update csv data dependency version
Nuvindu Dec 2, 2025
3927790
Update toml files
Nuvindu Dec 2, 2025
affc2e1
[Automated] Update the native jar versions
Nuvindu Dec 3, 2025
b1da130
Add test cases to validate the fail safe support
Nuvindu Dec 3, 2025
a94e6d0
Merge branch 'master' of https://github.com/ballerina-platform/module…
Nuvindu Dec 7, 2025
4ff8ad3
[Automated] Update the native jar versions
Nuvindu Dec 7, 2025
5da7747
Add new configs to csv fail safe
Nuvindu Dec 7, 2025
4a71a9f
Update native code for new design changes in fail safe support
Nuvindu Dec 7, 2025
67574b0
Fix configs in fail safe test cases
Nuvindu Dec 8, 2025
f50a9cd
Add file name prefix to the fail safe error log file
Nuvindu Dec 8, 2025
e8fe7ad
Remove generated error files in fail safe test cases
Nuvindu Dec 8, 2025
787f2cd
Add docs for the csv fail safe support
Nuvindu Dec 8, 2025
d1da848
Merge branch 'master' of https://github.com/ballerina-platform/module…
Nuvindu Dec 22, 2025
d9534f5
[Automated] Update the native jar versions
Nuvindu Dec 22, 2025
c83fffd
Fix the field name in csv fail safe config
Nuvindu Dec 22, 2025
1a34c7e
Add the data csv timestamped version
Nuvindu Dec 22, 2025
2f35440
Merge branch 'fail-safe-support' of https://github.com/Nuvindu/module…
Nuvindu Dec 22, 2025
24fe18e
Add file separator for a path
Nuvindu Dec 22, 2025
b1d47fc
[Automated] Update the native jar versions
Nuvindu Dec 22, 2025
ff919c6
[Automated] Update the native jar versions
Nuvindu Dec 23, 2025
f0c4f8f
Update log version to 2.14.0
Nuvindu Dec 23, 2025
ed83980
Fix file path in a test case
Nuvindu Dec 23, 2025
b7e2350
Update dependency versions
Nuvindu Dec 23, 2025
e28f6ac
[Automated] Update the native jar versions
Nuvindu Dec 23, 2025
b3b651a
Fix test failures
Nuvindu Dec 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,36 @@ service on remoteServer {

The FTP listener automatically routes files to the appropriate content handler based on file extension: `.txt` → `onFileText()`, `.json` → `onFileJson()`, `.xml` → `onFileXml()`, `.csv` → `onFileCsv()`, and other extensions → `onFile()` (fallback handler). You can override the default routing using the `@ftp:FunctionConfig` annotation to specify a custom file name pattern for each handler method.

##### CSV fail-safe mode

The FTP listener supports a fail-safe mode for CSV file processing When enabled, malformed CSV records are automatically skipped and written to a separate error log file in the current directory (`<filename>_error.log`), allowing the processing to continue without interruption.

This is only supported in `onFileCsv` trigger methods in ftp listener.

**Listener Configuration**:

```ballerina
listener ftp:Listener remoteServer = check new({
protocol: ftp:FTP,
host: "<The FTP host>",
port: <The FTP port>,
path: "<The remote FTP directory location>",
pollingInterval: <Polling interval>,
auth: {
credentials: {username: "<The FTP username>", password: "<The FTP password>"}
},
csvFailSafe: {
contentType: ftp:RAW_AND_METADATA
}
});
```

The `contentType` field in `csvFailSafe` specifies what information to log when a malformed CSV record is encountered.

- `METADATA` (default) - Logs only metadata about the error (location, error message)
- `RAW` - Logs the raw content of the malformed record (offending row)
- `RAW_AND_METADATA` - Logs both the raw content and metadata for comprehensive debugging

### Secure access with SFTP

SFTP is a secure protocol alternative to the FTP, which runs on top of the SSH protocol.
Expand Down
4 changes: 2 additions & 2 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,5 @@ path = "./lib/data.xmldata-native-1.5.2.jar"
[[platform.java21.dependency]]
groupId = "io.ballerina.lib"
artifactId = "data.csv-native"
version = "0.8.1"
path = "./lib/data.csv-native-0.8.1.jar"
version = "0.9.0-SNAPSHOT"
path = "./lib/data.csv-native-0.9.0-SNAPSHOT.jar"
161 changes: 153 additions & 8 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,25 @@
dependencies-toml-version = "2"
distribution-version = "2201.12.0"

[[package]]
org = "ballerina"
name = "crypto"
version = "2.9.2"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "time"}
]
modules = [
{org = "ballerina", packageName = "crypto", moduleName = "crypto"}
]

[[package]]
org = "ballerina"
name = "data.csv"
version = "0.8.1"
version = "0.9.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "log"}
]
modules = [
{org = "ballerina", packageName = "data.csv", moduleName = "data.csv"}
Expand Down Expand Up @@ -42,18 +55,39 @@ modules = [
{org = "ballerina", packageName = "data.xmldata", moduleName = "data.xmldata"}
]

[[package]]
org = "ballerina"
name = "file"
version = "1.11.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "os"},
{org = "ballerina", name = "time"}
]
modules = [
{org = "ballerina", packageName = "file", moduleName = "file"}
]

[[package]]
org = "ballerina"
name = "ftp"
version = "2.16.0"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "data.csv"},
{org = "ballerina", name = "data.jsondata"},
{org = "ballerina", name = "data.xmldata"},
{org = "ballerina", name = "file"},
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.runtime"},
{org = "ballerina", name = "lang.string"},
{org = "ballerina", name = "log"},
{org = "ballerina", name = "task"}
{org = "ballerina", name = "task"},
{org = "ballerina", name = "test"},
{org = "ballerina", name = "time"}
]
modules = [
{org = "ballerina", packageName = "ftp", moduleName = "ftp"}
Expand All @@ -79,11 +113,83 @@ modules = [
{org = "ballerina", packageName = "jballerina.java", moduleName = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "lang.__internal"
version = "0.0.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.object"}
]

[[package]]
org = "ballerina"
name = "lang.array"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.__internal"}
]

[[package]]
org = "ballerina"
name = "lang.error"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "lang.int"
version = "0.0.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.__internal"},
{org = "ballerina", name = "lang.object"}
]

[[package]]
org = "ballerina"
name = "lang.object"
version = "0.0.0"

[[package]]
org = "ballerina"
name = "lang.regexp"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "lang.runtime"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
modules = [
{org = "ballerina", packageName = "lang.runtime", moduleName = "lang.runtime"}
]

[[package]]
org = "ballerina"
name = "lang.string"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.regexp"}
]
modules = [
{org = "ballerina", packageName = "lang.string", moduleName = "lang.string"}
]

[[package]]
org = "ballerina"
name = "lang.value"
Expand All @@ -95,7 +201,7 @@ dependencies = [
[[package]]
org = "ballerina"
name = "log"
version = "2.12.0"
version = "2.14.0"
dependencies = [
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
Expand All @@ -109,28 +215,67 @@ modules = [
[[package]]
org = "ballerina"
name = "observe"
version = "1.5.1"
version = "1.6.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "os"
version = "1.9.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "task"
version = "2.7.0"
version = "2.11.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "time"}
{org = "ballerina", name = "time"},
{org = "ballerina", name = "uuid"}
]
modules = [
{org = "ballerina", packageName = "task", moduleName = "task"}
]

[[package]]
org = "ballerina"
name = "test"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.array"},
{org = "ballerina", name = "lang.error"}
]
modules = [
{org = "ballerina", packageName = "test", moduleName = "test"}
]

[[package]]
org = "ballerina"
name = "time"
version = "2.7.0"
version = "2.8.0"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
modules = [
{org = "ballerina", packageName = "time", moduleName = "time"}
]

[[package]]
org = "ballerina"
name = "uuid"
version = "1.10.0"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.int"},
{org = "ballerina", name = "time"}
]

30 changes: 30 additions & 0 deletions ballerina/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,36 @@ service on remoteServer {

The FTP listener automatically routes files to the appropriate content handler based on file extension: `.txt` → `onFileText()`, `.json` → `onFileJson()`, `.xml` → `onFileXml()`, `.csv` → `onFileCsv()`, and other extensions → `onFile()` (fallback handler). You can override the default routing using the `@ftp:FunctionConfig` annotation to specify a custom file name pattern for each handler method.

##### CSV fail-safe mode

The FTP listener supports a fail-safe mode for CSV file processing When enabled, malformed CSV records are automatically skipped and written to a separate error log file in the current directory (`<filename>_error.log`), allowing the processing to continue without interruption.

This is only supported in `onFileCsv` trigger methods in ftp listener.

**Listener Configuration**:

```ballerina
listener ftp:Listener remoteServer = check new({
protocol: ftp:FTP,
host: "<The FTP host>",
port: <The FTP port>,
path: "<The remote FTP directory location>",
pollingInterval: <Polling interval>,
auth: {
credentials: {username: "<The FTP username>", password: "<The FTP password>"}
},
csvFailSafe: {
contentType: ftp:RAW_AND_METADATA
}
});
```

The `contentType` field in `csvFailSafe` specifies what information to log when a malformed CSV record is encountered.

- `METADATA` (default) - Logs only metadata about the error (location, error message)
- `RAW` - Logs the raw content of the malformed record (offending row)
- `RAW_AND_METADATA` - Logs both the raw content and metadata for comprehensive debugging

### Secure access with SFTP

SFTP is a secure protocol alternative to the FTP, which runs on top of the SSH protocol.
Expand Down
3 changes: 3 additions & 0 deletions ballerina/client_endpoint.bal
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ public enum Compression {
# + sftpCompression - Compression algorithms (SFTP only)
# + sftpSshKnownHosts - Path to SSH known_hosts file (SFTP only)
# + proxy - Proxy configuration for SFTP connections (SFTP only)
# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode,
# malformed CSV records are skipped and written to a separate file in the current directory
public type ClientConfiguration record {|
Protocol protocol = FTP;
string host = "127.0.0.1";
Expand All @@ -451,6 +453,7 @@ public type ClientConfiguration record {|
FtpFileTransfer ftpFileTransfer = BINARY;
TransferCompression[] sftpCompression = [NO];
string sftpSshKnownHosts?;
FailSafeOptions csvFailSafe?;
|};

isolated function getInputContent(string path, stream<byte[] & readonly, io:Error?>|string|xml|json content,
Expand Down
17 changes: 17 additions & 0 deletions ballerina/listener_endpoint.bal
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ class Job {
# + sftpCompression - Compression algorithms (SFTP only)
# + sftpSshKnownHosts - Path to SSH known_hosts file (SFTP only)
# + proxy - Proxy configuration for SFTP connections (SFTP only)
# + csvFailSafe - Configuration for fail-safe CSV content processing. In the fail-safe mode,
# malformed CSV records are skipped and written to a separate file in the current directory
public type ListenerConfiguration record {|
Protocol protocol = FTP;
string host = "127.0.0.1";
Expand All @@ -204,8 +206,23 @@ public type ListenerConfiguration record {|
FtpFileTransfer ftpFileTransfer = BINARY;
TransferCompression[] sftpCompression = [NO];
string sftpSshKnownHosts?;
FailSafeOptions csvFailSafe?;
|};

# Fail-safe options for CSV content processing.
#
# + contentType - Specifies the type of content to log in case of errors
public type FailSafeOptions record {|
ErrorLogContentType contentType = METADATA;
|};

# Specifies the type of content to log in case of errors during fail-safe CSV processing.
public enum ErrorLogContentType {
METADATA,
RAW,
RAW_AND_METADATA
};

# FTP service for handling file system change events.
public type Service distinct service object {
};
Loading
Loading