diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 3bd71237..151bcd38 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerina" name = "ftp" -version = "2.16.1" +version = "2.17.0" authors = ["Ballerina"] keywords = ["FTP", "SFTP", "remote file", "file transfer", "client", "service"] repository = "https://github.com/ballerina-platform/module-ballerina-ftp" @@ -45,8 +45,8 @@ path = "./lib/commons-lang3-3.18.0.jar" [[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "ftp-native" -version = "2.16.1" -path = "../native/build/libs/ftp-native-2.16.1-SNAPSHOT.jar" +version = "2.17.0" +path = "../native/build/libs/ftp-native-2.17.0-SNAPSHOT.jar" [[platform.java21.dependency]] groupId = "io.ballerina.lib" diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index d5859e71..9bc037a4 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "ftp-compiler-plugin" class = "io.ballerina.stdlib.ftp.plugin.FtpCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/ftp-compiler-plugin-2.16.1-SNAPSHOT.jar" +path = "../compiler-plugin/build/libs/ftp-compiler-plugin-2.17.0-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index af691651..0a55d029 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -73,7 +73,7 @@ modules = [ [[package]] org = "ballerina" name = "ftp" -version = "2.16.1" +version = "2.17.0" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "data.csv"}, diff --git a/ballerina/annotations.bal b/ballerina/annotations.bal index 346c0963..58d33244 100644 --- a/ballerina/annotations.bal +++ b/ballerina/annotations.bal @@ -14,17 +14,43 @@ // specific language governing permissions and limitations // under the License. +# Represents the delete action for file processing. +# When specified, the file will be deleted after processing. +public const DELETE = "DELETE"; + +# Configuration for moving a file after processing. +# +# + moveTo - Destination directory path where the file will be moved +# + preserveSubDirs - If true, preserves the subdirectory structure relative to the +# listener's root path. Defaults to true. +public type Move record {| + string moveTo; + boolean preserveSubDirs = true; +|}; + +# Type alias for Move record, used in union types for post-processing actions. +public type MOVE Move; + # Configuration for FTP service remote functions. -# Use this to override default file extension routing for content methods. +# Use this to override default file extension routing for content methods and +# specify automatic file actions after processing. # # + fileNamePattern - File name pattern (regex) that should be routed to this method. # Must be a subset of the listener's `fileNamePattern`. +# + afterProcess - Action to perform after successful processing. Can be DELETE or MOVE. +# If not specified, no action is taken (file remains in place). +# + afterError - Action to perform after processing error. Can be DELETE or MOVE. +# This action is executed immediately after the content handler returns an error or panics. +# If not specified, no action is taken (file remains in place). public type FtpFunctionConfig record {| - string fileNamePattern; + string fileNamePattern?; + MOVE|DELETE afterProcess?; + MOVE|DELETE afterError?; |}; # Annotation to configure FTP service remote functions. -# This can be used to specify which file patterns should be handled by a particular content method. +# This can be used to specify which file patterns should be handled by a particular content method +# and what actions to perform after processing. public annotation FtpFunctionConfig FunctionConfig on service remote function; # Configuration for FTP service monitoring. diff --git a/ballerina/tests/client_endpoint_test.bal b/ballerina/tests/client_endpoint_test.bal index 9faa29be..755bacfd 100644 --- a/ballerina/tests/client_endpoint_test.bal +++ b/ballerina/tests/client_endpoint_test.bal @@ -1222,7 +1222,10 @@ public function testListFiles() { "/home/in/sc-route-a": 0, "/home/in/sc-route-b": 0, "/home/in/sc-single": 0, - "/home/in/sc-legacy": 0 + "/home/in/sc-legacy": 0, + "/home/in/post-process": 0, + "/home/in/post-process-archive": 0, + "/home/in/post-process-error": 0 }; FileInfo[]|Error response = (clientEp)->list("/home/in"); if response is FileInfo[] { diff --git a/ballerina/tests/listener_content_methods_test.bal b/ballerina/tests/listener_content_methods_test.bal index dfa0e8b6..8883e5a5 100644 --- a/ballerina/tests/listener_content_methods_test.bal +++ b/ballerina/tests/listener_content_methods_test.bal @@ -71,15 +71,15 @@ PersonRecord? xmlRecordTypeReceived = (); int xmlRecordFilesProcessed = 0; // Test data file paths -const string JSON_TEST_FILE = "tests/resources/datafiles/test_data.json"; -const string XML_TEST_FILE = "tests/resources/datafiles/test_data.xml"; -const string CSV_TEST_FILE = "tests/resources/datafiles/test_data.csv"; -const string CSV_TEST_FILE_WITH_ERROR = "tests/resources/datafiles/test_data_with_error.csv"; -const string TEXT_TEST_FILE = "tests/resources/datafiles/test_text.txt"; -const string GENERIC_TEST_FILE = "tests/resources/datafiles/file2.txt"; +const JSON_TEST_FILE = "tests/resources/datafiles/test_data.json"; +const XML_TEST_FILE = "tests/resources/datafiles/test_data.xml"; +const CSV_TEST_FILE = "tests/resources/datafiles/test_data.csv"; +const CSV_TEST_FILE_WITH_ERROR = "tests/resources/datafiles/test_data_with_error.csv"; +const TEXT_TEST_FILE = "tests/resources/datafiles/test_text.txt"; +const GENERIC_TEST_FILE = "tests/resources/datafiles/file2.txt"; // Isolated directory for content listener tests to avoid interfering with testListFiles -const string CONTENT_TEST_DIR = "/home/in/content-methods"; +const CONTENT_TEST_DIR = "/home/in/content-methods"; @test:Config { } diff --git a/ballerina/tests/listener_on_error_test.bal b/ballerina/tests/listener_on_error_test.bal index 8b3573bf..b0e58413 100644 --- a/ballerina/tests/listener_on_error_test.bal +++ b/ballerina/tests/listener_on_error_test.bal @@ -1,16 +1,16 @@ -// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). // // WSO2 LLC. licenses this file to you under the Apache License, // Version 2.0 (the "License"); you may not use this file except // in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the +// KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. @@ -32,7 +32,7 @@ string lastErrorFilePath = ""; int onErrorInvocationCount = 0; // Directory for onError tests -const string ON_ERROR_TEST_DIR = "/home/in/onerror-tests"; +const ON_ERROR_TEST_DIR = "/home/in/onerror-tests"; @test:Config { enable: true diff --git a/ballerina/tests/listener_on_file_deleted_test.bal b/ballerina/tests/listener_on_file_deleted_test.bal index b3a85918..e6f48bdc 100644 --- a/ballerina/tests/listener_on_file_deleted_test.bal +++ b/ballerina/tests/listener_on_file_deleted_test.bal @@ -27,9 +27,9 @@ boolean deleteCallerEventReceived = false; string[] deletedFilesWithCaller = []; // Test file paths -const string DELETE_TEST_FILE_PATH = "tests/resources/datafiles/file2.txt"; +const DELETE_TEST_FILE_PATH = "tests/resources/datafiles/file2.txt"; // Isolated directory for file deletion tests -const string DELETE_TEST_DIR = "/home/in/delete"; +const DELETE_TEST_DIR = "/home/in/delete"; @test:Config { groups: ["onDelete"] diff --git a/ballerina/tests/post_process_action_test.bal b/ballerina/tests/post_process_action_test.bal new file mode 100644 index 00000000..abbd48c0 --- /dev/null +++ b/ballerina/tests/post_process_action_test.bal @@ -0,0 +1,608 @@ +// Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/lang.runtime; +import ballerina/log; +import ballerina/test; + +// Test directory for post-process action tests +const POST_PROCESS_TEST_DIR = "/home/in/post-process"; +const POST_PROCESS_ARCHIVE_DIR = "/home/in/post-process-archive"; +const POST_PROCESS_ERROR_DIR = "/home/in/post-process-error"; + +// Global tracking variables for post-process tests +boolean deleteAfterProcessInvoked = false; +boolean moveAfterProcessInvoked = false; +boolean deleteAfterErrorInvoked = false; +boolean moveAfterErrorInvoked = false; +string lastProcessedFilePath = ""; + +@test:Config { +} +public function testAfterProcessDelete() returns error? { + // Reset state + deleteAfterProcessInvoked = false; + lastProcessedFilePath = ""; + + // Service with afterProcess: DELETE + Service deleteAfterProcessService = service object { + @FunctionConfig { + fileNamePattern: "delete-test.*\\.json", + afterProcess: DELETE + } + remote function onFileJson(json content, FileInfo fileInfo, Caller caller) returns error? { + log:printInfo(string `Processing file for deletion: ${fileInfo.path}`); + lastProcessedFilePath = fileInfo.path; + deleteAfterProcessInvoked = true; + } + }; + + // Create listener + Listener deleteListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: POST_PROCESS_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "delete-test.*\\.json" + }); + + check deleteListener.attach(deleteAfterProcessService); + check deleteListener.'start(); + runtime:registerListener(deleteListener); + + // Upload test file + string testFilePath = POST_PROCESS_TEST_DIR + "/delete-test.json"; + check (clientEp)->putJson(testFilePath, {"name": "test", "action": "delete"}); + runtime:sleep(15); + + // Cleanup listener + runtime:deregisterListener(deleteListener); + check deleteListener.gracefulStop(); + + // Verify file was processed + test:assertTrue(deleteAfterProcessInvoked, "onFileJson should have been invoked"); + test:assertTrue(lastProcessedFilePath.includes("delete-test"), "Should have processed delete-test file"); + + // Verify file was deleted + boolean fileExists = check (clientEp)->exists(testFilePath); + test:assertFalse(fileExists, "File should have been deleted after processing"); +} + +@test:Config { + dependsOn: [testAfterProcessDelete] +} +public function testAfterProcessMove() returns error? { + // Reset state + moveAfterProcessInvoked = false; + lastProcessedFilePath = ""; + + // Create archive directory + boolean archiveExists = check (clientEp)->exists(POST_PROCESS_ARCHIVE_DIR); + if !archiveExists { + check (clientEp)->mkdir(POST_PROCESS_ARCHIVE_DIR); + } + + // Service with afterProcess: MOVE + Service moveAfterProcessService = service object { + @FunctionConfig { + fileNamePattern: "move-test.*\\.json", + afterProcess: { + moveTo: POST_PROCESS_ARCHIVE_DIR + } + } + remote function onFileJson(json content, FileInfo fileInfo, Caller caller) returns error? { + log:printInfo(string `Processing file for move: ${fileInfo.path}`); + lastProcessedFilePath = fileInfo.path; + moveAfterProcessInvoked = true; + } + }; + + // Create listener + Listener moveListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: POST_PROCESS_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "move-test.*\\.json" + }); + + check moveListener.attach(moveAfterProcessService); + check moveListener.'start(); + runtime:registerListener(moveListener); + + // Upload test file + string testFilePath = POST_PROCESS_TEST_DIR + "/move-test.json"; + string archiveFilePath = POST_PROCESS_ARCHIVE_DIR + "/move-test.json"; + check (clientEp)->putJson(testFilePath, {"name": "test", "action": "move"}); + runtime:sleep(15); + + // Cleanup listener + runtime:deregisterListener(moveListener); + check moveListener.gracefulStop(); + + // Verify file was processed + test:assertTrue(moveAfterProcessInvoked, "onFileJson should have been invoked"); + test:assertTrue(lastProcessedFilePath.includes("move-test"), "Should have processed move-test file"); + + // Verify file was moved + boolean sourceExists = check (clientEp)->exists(testFilePath); + test:assertFalse(sourceExists, "Source file should not exist after move"); + + boolean archiveExists2 = check (clientEp)->exists(archiveFilePath); + test:assertTrue(archiveExists2, "File should exist in archive directory after move"); + + // Cleanup + check (clientEp)->delete(archiveFilePath); +} + +@test:Config { + dependsOn: [testAfterProcessMove] +} +public function testAfterProcessMoveWithPreserveSubDirsFalse() returns error? { + // Reset state + moveAfterProcessInvoked = false; + lastProcessedFilePath = ""; + + // Create subdirectory for testing + string subDir = POST_PROCESS_TEST_DIR + "/subdir"; + boolean subDirExists = check (clientEp)->exists(subDir); + if !subDirExists { + check (clientEp)->mkdir(subDir); + } + + // Service with afterProcess: MOVE with preserveSubDirs: false + Service moveNoPreserveService = service object { + @FunctionConfig { + fileNamePattern: "nopreserve-test.*\\.json", + afterProcess: { + moveTo: POST_PROCESS_ARCHIVE_DIR, + preserveSubDirs: false + } + } + remote function onFileJson(json content, FileInfo fileInfo, Caller caller) returns error? { + log:printInfo(string `Processing file (no preserve): ${fileInfo.path}`); + lastProcessedFilePath = fileInfo.path; + moveAfterProcessInvoked = true; + } + }; + + // Create listener for subdirectory + Listener noPreserveListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: POST_PROCESS_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "(subdir|nopreserve-test.*\\.json)" + }); + + check noPreserveListener.attach(moveNoPreserveService); + check noPreserveListener.'start(); + runtime:registerListener(noPreserveListener); + + // Upload test file to subdirectory + string testFilePath = subDir + "/nopreserve-test.json"; + string archiveFilePath = POST_PROCESS_ARCHIVE_DIR + "/nopreserve-test.json"; + check (clientEp)->putJson(testFilePath, {"name": "test", "preserveSubDirs": false}); + runtime:sleep(15); + + // Cleanup listener + runtime:deregisterListener(noPreserveListener); + check noPreserveListener.gracefulStop(); + + // Verify file was processed + test:assertTrue(moveAfterProcessInvoked, "onFileJson should have been invoked"); + + // Verify file was moved directly to archive (not preserving subdirectory) + boolean sourceExists = check (clientEp)->exists(testFilePath); + test:assertFalse(sourceExists, "Source file should not exist after move"); + + boolean archiveExists = check (clientEp)->exists(archiveFilePath); + test:assertTrue(archiveExists, "File should exist directly in archive directory (not in subdir)"); + + // Cleanup + check (clientEp)->delete(archiveFilePath); +} + +@test:Config { + dependsOn: [testAfterProcessMoveWithPreserveSubDirsFalse] +} +public function testAfterProcessMoveWithPreserveSubDirsTrue() returns error? { + // Reset state + moveAfterProcessInvoked = false; + lastProcessedFilePath = ""; + + // Create subdirectory for testing + string subDir = POST_PROCESS_TEST_DIR + "/preserve-subdir"; + boolean subDirExists = check (clientEp)->exists(subDir); + if !subDirExists { + check (clientEp)->mkdir(subDir); + } + + // Service with afterProcess: MOVE (preserveSubDirs defaults to true) + Service movePreserveService = service object { + @FunctionConfig { + fileNamePattern: "preserve-test.*\\.json", + afterProcess: { + moveTo: POST_PROCESS_ARCHIVE_DIR + } + } + remote function onFileJson(json content, FileInfo fileInfo, Caller caller) returns error? { + log:printInfo(string `Processing file (preserve): ${fileInfo.path}`); + lastProcessedFilePath = fileInfo.path; + moveAfterProcessInvoked = true; + } + }; + + // Create listener for subdirectory + Listener preserveListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: POST_PROCESS_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "(preserve-subdir|preserve-test.*\\.json)" + }); + + check preserveListener.attach(movePreserveService); + check preserveListener.'start(); + runtime:registerListener(preserveListener); + + // Upload test file to subdirectory + string testFilePath = subDir + "/preserve-test.json"; + string archiveFilePath = POST_PROCESS_ARCHIVE_DIR + "/preserve-subdir/preserve-test.json"; + check (clientEp)->putJson(testFilePath, {"name": "test", "preserveSubDirs": true}); + runtime:sleep(15); + + // Cleanup listener + runtime:deregisterListener(preserveListener); + check preserveListener.gracefulStop(); + + // Verify file was processed + test:assertTrue(moveAfterProcessInvoked, "onFileJson should have been invoked"); + + // Verify file was moved preserving subdirectory + boolean sourceExists = check (clientEp)->exists(testFilePath); + test:assertFalse(sourceExists, "Source file should not exist after move"); + + boolean archiveExists = check (clientEp)->exists(archiveFilePath); + test:assertTrue(archiveExists, "File should exist in archive subdirectory after move"); + + // Cleanup + check (clientEp)->delete(archiveFilePath); + check (clientEp)->rmdir(POST_PROCESS_ARCHIVE_DIR + "/preserve-subdir"); + check (clientEp)->rmdir(subDir); +} + +@test:Config { + dependsOn: [testAfterProcessMoveWithPreserveSubDirsTrue] +} +public function testAfterErrorDelete() returns error? { + // Reset state + deleteAfterErrorInvoked = false; + lastProcessedFilePath = ""; + + // Service with afterError: DELETE that throws an error + Service errorDeleteService = service object { + @FunctionConfig { + fileNamePattern: "error-delete-test.*\\.json", + afterError: DELETE + } + remote function onFileJson(json content, FileInfo fileInfo, Caller caller) returns error? { + log:printInfo(string `Processing file that will error: ${fileInfo.path}`); + lastProcessedFilePath = fileInfo.path; + deleteAfterErrorInvoked = true; + // Return an error to trigger afterError action + return error("Simulated processing error"); + } + }; + + // Create listener + Listener errorDeleteListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: POST_PROCESS_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "error-delete-test.*\\.json" + }); + + check errorDeleteListener.attach(errorDeleteService); + check errorDeleteListener.'start(); + runtime:registerListener(errorDeleteListener); + + // Upload test file + string testFilePath = POST_PROCESS_TEST_DIR + "/error-delete-test.json"; + check (clientEp)->putJson(testFilePath, {"name": "test", "action": "error-delete"}); + runtime:sleep(15); + + // Cleanup listener + runtime:deregisterListener(errorDeleteListener); + check errorDeleteListener.gracefulStop(); + + // Verify file was processed (even though it errored) + test:assertTrue(deleteAfterErrorInvoked, "onFileJson should have been invoked"); + + // Verify file was deleted after error + boolean fileExists = check (clientEp)->exists(testFilePath); + test:assertFalse(fileExists, "File should have been deleted after error"); +} + +@test:Config { + dependsOn: [testAfterErrorDelete] +} +public function testAfterErrorMove() returns error? { + // Reset state + moveAfterErrorInvoked = false; + lastProcessedFilePath = ""; + + // Create error directory + boolean errorDirExists = check (clientEp)->exists(POST_PROCESS_ERROR_DIR); + if !errorDirExists { + check (clientEp)->mkdir(POST_PROCESS_ERROR_DIR); + } + + // Service with afterError: MOVE that throws an error + Service errorMoveService = service object { + @FunctionConfig { + fileNamePattern: "error-move-test.*\\.json", + afterError: { + moveTo: POST_PROCESS_ERROR_DIR + } + } + remote function onFileJson(json content, FileInfo fileInfo, Caller caller) returns error? { + log:printInfo(string `Processing file that will error and move: ${fileInfo.path}`); + lastProcessedFilePath = fileInfo.path; + moveAfterErrorInvoked = true; + // Return an error to trigger afterError action + return error("Simulated processing error for move"); + } + }; + + // Create listener + Listener errorMoveListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: POST_PROCESS_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "error-move-test.*\\.json" + }); + + check errorMoveListener.attach(errorMoveService); + check errorMoveListener.'start(); + runtime:registerListener(errorMoveListener); + + // Upload test file + string testFilePath = POST_PROCESS_TEST_DIR + "/error-move-test.json"; + string errorFilePath = POST_PROCESS_ERROR_DIR + "/error-move-test.json"; + check (clientEp)->putJson(testFilePath, {"name": "test", "action": "error-move"}); + runtime:sleep(15); + + // Cleanup listener + runtime:deregisterListener(errorMoveListener); + check errorMoveListener.gracefulStop(); + + // Verify file was processed + test:assertTrue(moveAfterErrorInvoked, "onFileJson should have been invoked"); + + // Verify file was moved to error directory + boolean sourceExists = check (clientEp)->exists(testFilePath); + test:assertFalse(sourceExists, "Source file should not exist after error move"); + + boolean errorExists = check (clientEp)->exists(errorFilePath); + test:assertTrue(errorExists, "File should exist in error directory after error move"); + + // Cleanup + check (clientEp)->delete(errorFilePath); +} + +@test:Config { + dependsOn: [testAfterErrorMove] +} +public function testBothAfterProcessAndAfterError() returns error? { + // Reset state + moveAfterProcessInvoked = false; + lastProcessedFilePath = ""; + + // Service with both afterProcess and afterError configured + Service bothActionsService = service object { + @FunctionConfig { + fileNamePattern: "both-success-test.*\\.json", + afterProcess: { + moveTo: POST_PROCESS_ARCHIVE_DIR + }, + afterError: { + moveTo: POST_PROCESS_ERROR_DIR + } + } + remote function onFileJson(json content, FileInfo fileInfo, Caller caller) returns error? { + log:printInfo(string `Processing file with both actions: ${fileInfo.path}`); + lastProcessedFilePath = fileInfo.path; + moveAfterProcessInvoked = true; + // Success case - should trigger afterProcess + } + }; + + // Create listener + Listener bothListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: POST_PROCESS_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "both-success-test.*\\.json" + }); + + check bothListener.attach(bothActionsService); + check bothListener.'start(); + runtime:registerListener(bothListener); + + // Upload test file + string testFilePath = POST_PROCESS_TEST_DIR + "/both-success-test.json"; + string archiveFilePath = POST_PROCESS_ARCHIVE_DIR + "/both-success-test.json"; + string errorFilePath = POST_PROCESS_ERROR_DIR + "/both-success-test.json"; + check (clientEp)->putJson(testFilePath, {"name": "test", "action": "both-success"}); + runtime:sleep(15); + + // Cleanup listener + runtime:deregisterListener(bothListener); + check bothListener.gracefulStop(); + + // Verify file was processed + test:assertTrue(moveAfterProcessInvoked, "onFileJson should have been invoked"); + + // Verify file was moved to archive (success case) + boolean sourceExists = check (clientEp)->exists(testFilePath); + test:assertFalse(sourceExists, "Source file should not exist"); + + boolean archiveExists = check (clientEp)->exists(archiveFilePath); + test:assertTrue(archiveExists, "File should be in archive directory on success"); + + boolean errorExists = check (clientEp)->exists(errorFilePath); + test:assertFalse(errorExists, "File should NOT be in error directory on success"); + + // Cleanup + check (clientEp)->delete(archiveFilePath); +} + +boolean noActionInvoked = false; +string noActionFilePath = ""; + +@test:Config { + dependsOn: [testBothAfterProcessAndAfterError] +} +public function testNoActionWhenNotConfigured() returns error? { + // Reset state + noActionInvoked = false; + noActionFilePath = ""; + + // Service without any afterProcess or afterError + Service noActionService = service object { + @FunctionConfig { + fileNamePattern: "no-action-test.*\\.json" + } + remote function onFileJson(json content, FileInfo fileInfo, Caller caller) returns error? { + log:printInfo(string `Processing file with no post-action: ${fileInfo.path}`); + noActionFilePath = fileInfo.path; + noActionInvoked = true; + } + }; + + // Create listener + Listener noActionListener = check new ({ + protocol: FTP, + host: "127.0.0.1", + auth: {credentials: {username: "wso2", password: "wso2123"}}, + port: 21212, + path: POST_PROCESS_TEST_DIR, + pollingInterval: 4, + fileNamePattern: "no-action-test.*\\.json" + }); + + check noActionListener.attach(noActionService); + check noActionListener.'start(); + runtime:registerListener(noActionListener); + + // Upload test file + string testFilePath = POST_PROCESS_TEST_DIR + "/no-action-test.json"; + check (clientEp)->putJson(testFilePath, {"name": "test", "action": "none"}); + runtime:sleep(15); + + // Cleanup listener + runtime:deregisterListener(noActionListener); + check noActionListener.gracefulStop(); + + // Verify file was processed + test:assertTrue(noActionInvoked, "onFileJson should have been invoked"); + + // Verify file still exists (no action configured) + boolean fileExists = check (clientEp)->exists(testFilePath); + test:assertTrue(fileExists, "File should still exist when no action is configured"); + + // Cleanup + check (clientEp)->delete(testFilePath); +} + +@test:AfterSuite +function cleanupPostProcessTestDirectories() returns error? { + // Clean up test directories + boolean testDirExists = check (clientEp)->exists(POST_PROCESS_TEST_DIR); + if testDirExists { + FileInfo[] files = check (clientEp)->list(POST_PROCESS_TEST_DIR); + foreach FileInfo file in files { + if file.isFile { + check (clientEp)->delete(file.path); + } + } + // Try to remove subdirectories + boolean subDirExists = check (clientEp)->exists(POST_PROCESS_TEST_DIR + "/subdir"); + if subDirExists { + check (clientEp)->rmdir(POST_PROCESS_TEST_DIR + "/subdir"); + } + boolean preserveSubDirExists = check (clientEp)->exists(POST_PROCESS_TEST_DIR + "/preserve-subdir"); + if preserveSubDirExists { + FileInfo[] preserveFiles = check (clientEp)->list(POST_PROCESS_TEST_DIR + "/preserve-subdir"); + foreach FileInfo file in preserveFiles { + if file.isFile { + check (clientEp)->delete(file.path); + } + } + check (clientEp)->rmdir(POST_PROCESS_TEST_DIR + "/preserve-subdir"); + } + } + + boolean archiveDirExists = check (clientEp)->exists(POST_PROCESS_ARCHIVE_DIR); + if archiveDirExists { + FileInfo[] archiveFiles = check (clientEp)->list(POST_PROCESS_ARCHIVE_DIR); + foreach FileInfo file in archiveFiles { + if file.isFile { + check (clientEp)->delete(file.path); + } + } + boolean preserveArchiveDirExists = check (clientEp)->exists(POST_PROCESS_ARCHIVE_DIR + + "/preserve-subdir"); + if preserveArchiveDirExists { + FileInfo[] preserveArchiveFiles = check (clientEp)->list(POST_PROCESS_ARCHIVE_DIR + + "/preserve-subdir"); + foreach FileInfo file in preserveArchiveFiles { + if file.isFile { + check (clientEp)->delete(file.path); + } + } + check (clientEp)->rmdir(POST_PROCESS_ARCHIVE_DIR + "/preserve-subdir"); + } + } + + boolean errorDirExists = check (clientEp)->exists(POST_PROCESS_ERROR_DIR); + if errorDirExists { + FileInfo[] errorFiles = check (clientEp)->list(POST_PROCESS_ERROR_DIR); + foreach FileInfo file in errorFiles { + if file.isFile { + check (clientEp)->delete(file.path); + } + } + } +} diff --git a/ballerina/tests/secure_ftps_client_endpoint_test.bal b/ballerina/tests/secure_ftps_client_endpoint_test.bal index 709d6e25..00b3b5d2 100644 --- a/ballerina/tests/secure_ftps_client_endpoint_test.bal +++ b/ballerina/tests/secure_ftps_client_endpoint_test.bal @@ -20,10 +20,10 @@ import ballerina/lang.runtime as runtime; import ballerina/test; // Constants for test configuration -const string KEYSTORE_PATH = "tests/resources/keystore.jks"; -const string KEYSTORE_PASSWORD = "changeit"; -const string FTPS_CLIENT_ROOT = "/ftps-client"; -const string PUT_FILE_PATH = "tests/resources/datafiles/file2.txt"; +const KEYSTORE_PATH = "tests/resources/keystore.jks"; +const KEYSTORE_PASSWORD = "changeit"; +const FTPS_CLIENT_ROOT = "/ftps-client"; +const PUT_FILE_PATH = "tests/resources/datafiles/file2.txt"; // Create the config to access mock FTPS server in EXPLICIT mode ClientConfiguration ftpsExplicitConfig = { diff --git a/ballerina/tests/secure_ftps_listener_endpoint_test.bal b/ballerina/tests/secure_ftps_listener_endpoint_test.bal index a9881839..d4945f49 100644 --- a/ballerina/tests/secure_ftps_listener_endpoint_test.bal +++ b/ballerina/tests/secure_ftps_listener_endpoint_test.bal @@ -20,8 +20,8 @@ import ballerina/log; import ballerina/test; // Constants for test configuration -const string KEYSTORE_PATH = "tests/resources/keystore.jks"; -const string KEYSTORE_PASSWORD = "changeit"; +const KEYSTORE_PATH = "tests/resources/keystore.jks"; +const KEYSTORE_PASSWORD = "changeit"; // --- Global State for Event Capture (Managed per test) --- isolated boolean ftpsEventReceived = false; diff --git a/ballerina/tests/service_config_test.bal b/ballerina/tests/service_config_test.bal index 9832eb9f..93052a56 100644 --- a/ballerina/tests/service_config_test.bal +++ b/ballerina/tests/service_config_test.bal @@ -1,16 +1,16 @@ -// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). // // WSO2 LLC. licenses this file to you under the Apache License, // Version 2.0 (the "License"); you may not use this file except // in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the +// KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. @@ -23,10 +23,10 @@ import ballerina/test; // Isolated directories – each test group gets its own FTP subtree so that // polling from one listener never picks up files meant for another. // --------------------------------------------------------------------------- -const string SC_ROUTE_A_DIR = "/home/in/sc-route-a"; -const string SC_ROUTE_B_DIR = "/home/in/sc-route-b"; -const string SC_SINGLE_DIR = "/home/in/sc-single"; -const string SC_LEGACY_DIR = "/home/in/sc-legacy"; +const SC_ROUTE_A_DIR = "/home/in/sc-route-a"; +const SC_ROUTE_B_DIR = "/home/in/sc-route-b"; +const SC_SINGLE_DIR = "/home/in/sc-single"; +const SC_LEGACY_DIR = "/home/in/sc-legacy"; // --------------------------------------------------------------------------- // Test-specific client for file uploads diff --git a/changelog.md b/changelog.md index a8eb1fd1..de4e1873 100644 --- a/changelog.md +++ b/changelog.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added +- [Add post processing action support](https://github.com/ballerina-platform/ballerina-library/issues/8604) - [Add service level monitoring config](https://github.com/ballerina-platform/ballerina-library/issues/8606) - [Introduce onError remote function](https://github.com/ballerina-platform/ballerina-library/issues/8605) - [Add Circuit breaker support](https://github.com/ballerina-platform/ballerina-library/issues/8382) diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_1/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_1/service.bal index a99e31fc..cf12a426 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_1/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_1/service.bal @@ -1,11 +1,11 @@ -// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). // // WSO2 LLC. licenses this file to you under the Apache License, // Version 2.0 (the "License"); you may not use this file except // in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_3/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_3/service.bal index 6bdae417..823a116d 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_3/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_3/service.bal @@ -1,11 +1,11 @@ -// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). // // WSO2 LLC. licenses this file to you under the Apache License, // Version 2.0 (the "License"); you may not use this file except // in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_4/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_4/service.bal index e7098d9c..4d0321d1 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_4/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/invalid_on_error_service_4/service.bal @@ -1,11 +1,11 @@ -// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). // // WSO2 LLC. licenses this file to you under the Apache License, // Version 2.0 (the "License"); you may not use this file except // in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_1/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_1/service.bal index becb5f60..5711affa 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_1/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_1/service.bal @@ -1,11 +1,11 @@ -// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). // // WSO2 LLC. licenses this file to you under the Apache License, // Version 2.0 (the "License"); you may not use this file except // in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_2/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_2/service.bal index 29eb94e7..db3819c6 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_2/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_2/service.bal @@ -1,11 +1,11 @@ -// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). // // WSO2 LLC. licenses this file to you under the Apache License, // Version 2.0 (the "License"); you may not use this file except // in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_3/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_3/service.bal index 6b64c6f2..f1ffffce 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_3/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_3/service.bal @@ -1,11 +1,11 @@ -// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). // // WSO2 LLC. licenses this file to you under the Apache License, // Version 2.0 (the "License"); you may not use this file except // in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an diff --git a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_4/service.bal b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_4/service.bal index abc33008..70126751 100644 --- a/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_4/service.bal +++ b/compiler-plugin-tests/src/test/resources/ballerina_sources/valid_on_error_service_4/service.bal @@ -1,11 +1,11 @@ -// Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. +// Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). // // WSO2 LLC. licenses this file to you under the Apache License, // Version 2.0 (the "License"); you may not use this file except // in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an diff --git a/compiler-plugin/src/main/java/io/ballerina/stdlib/ftp/plugin/FtpOnErrorValidator.java b/compiler-plugin/src/main/java/io/ballerina/stdlib/ftp/plugin/FtpOnErrorValidator.java index 6e91d153..183a2304 100644 --- a/compiler-plugin/src/main/java/io/ballerina/stdlib/ftp/plugin/FtpOnErrorValidator.java +++ b/compiler-plugin/src/main/java/io/ballerina/stdlib/ftp/plugin/FtpOnErrorValidator.java @@ -1,12 +1,12 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an diff --git a/docs/spec/spec.md b/docs/spec/spec.md index b3f66a8b..c759855a 100644 --- a/docs/spec/spec.md +++ b/docs/spec/spec.md @@ -45,6 +45,7 @@ The conforming implementation of the specification is released and included in t - [4.4. Usage](#44-usage) - [4.4.1. Service-Level Monitoring Configuration](#441-service-level-monitoring-configuration) - [4.4.2. Format-Specific Listener Callbacks](#442-format-specific-listener-callbacks) + - [4.4.3. Post-Processing Actions](#443-post-processing-actions) - [4.5. Distributed Coordination](#45-distributed-coordination) - [4.5.1. Coordination Configuration](#451-coordination-configuration) - [4.5.2. Coordination Mechanism](#452-coordination-mechanism) @@ -838,7 +839,6 @@ public type WatchEvent record {| string[] deletedFiles; |}; ``` - ### 4.2. Service Configuration Annotation The `@ftp:ServiceConfig` annotation allows configuring monitoring paths and file patterns at the service level rather than the listener level. This enables multiple services attached to a single listener to monitor different directories independently. @@ -1164,6 +1164,120 @@ If `onError` is not defined, binding errors are logged and the file is skipped. All format-specific callbacks receive `fileInfo` (metadata about the file) and optionally `caller` (to perform additional FTP operations). The data is automatically parsed based on the callback type. +#### 4.4.3. Post-Processing Actions + +The `@ftp:FunctionConfig` annotation supports automatic file actions after processing completes. This enables common patterns like moving processed files to an archive directory or deleting files after successful processing. + +**Configuration Types:** + +* `DELETE` - Delete the file after processing +```ballerina +public const DELETE = "DELETE"; +``` + +* `Move` - Move the file to a specified directory +```ballerina +# Type alias for Move record, used in union types for post-processing actions. +public type MOVE Move; + +# Configuration for moving a file after processing. +# +# + moveTo - Destination directory path where the file will be moved +# + preserveSubDirs - If true, preserves the subdirectory structure relative to the +# listener's root path. Defaults to true. +public type Move record {| + string moveTo; + boolean preserveSubDirs = true; +|}; +``` + +**Updated FtpFunctionConfig:** +```ballerina +public type FtpFunctionConfig record {| + string fileNamePattern?; + MOVE|DELETE afterProcess?; + MOVE|DELETE afterError?; +|}; +``` + +**Action Fields:** +- `afterProcess` - Action to perform after successful processing. If not specified, no action is taken. +- `afterError` - Action to perform after the handler returns an error or panics. If not specified, no action is taken. + +**Prerequisites:** +- When using `MOVE` action with `preserveSubDirs: true`, the destination directory structure must be pre-created on the FTP server. The move operation will fail if the destination parent directories do not exist. + +**Behavior:** +1. **After Successful Processing:** If the handler returns successfully (no error), the `afterProcess` action is executed. +2. **After Error:** If the handler returns an error, the `afterError` action is executed. +3. **Subdirectory Preservation:** When using `MOVE` with `preserveSubDirs: true` (default), the subdirectory structure relative to the listener's monitored path is preserved. For example, if listening to `/input/` and processing `/input/orders/2024/file.csv` with `moveTo: "/archive/"`, the file is moved to `/archive/orders/2024/file.csv`. + +**Examples:** + +Delete after successful processing: +```ballerina +service on ftpListener { + @ftp:FunctionConfig { + fileNamePattern: ".*\\.json", + afterProcess: ftp:DELETE + } + remote function onFileJson(json content, ftp:FileInfo fileInfo) returns error? { + // Process JSON - file is automatically deleted after successful return + processJson(content); + } +} +``` + +Move to archive after processing: +```ballerina +service on ftpListener { + @ftp:FunctionConfig { + fileNamePattern: ".*\\.csv", + afterProcess: { + moveTo: "/archive/processed/" + } + } + remote function onFileCsv(Employee[] content, ftp:FileInfo fileInfo) returns error? { + // Process CSV - file is moved to archive after success + saveEmployees(content); + } +} +``` + +Different actions for success and error: +```ballerina +service on ftpListener { + @ftp:FunctionConfig { + fileNamePattern: ".*\\.xml", + afterProcess: { + moveTo: "/archive/success/" + }, + afterError: { + moveTo: "/archive/failed/" + } + } + remote function onFileXml(xml content, ftp:FileInfo fileInfo) returns error? { + // On success: moved to /archive/success/ + // On error: moved to /archive/failed/ + check processXml(content); + } +} +``` + +Move without preserving subdirectories: +```ballerina +service on ftpListener { + @ftp:FunctionConfig { + afterProcess: { + moveTo: "/archive/flat/", + preserveSubDirs: false + } + } + remote function onFile(byte[] content, ftp:FileInfo fileInfo) returns error? { + // All files moved directly to /archive/flat/ regardless of source subdirectory + } +} +``` The Listener has following functions to manage a service. * `attach()` - can be used to bind a service to the `ftp:Listener`. diff --git a/gradle.properties b/gradle.properties index 9cfbd869..ffa33c4a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ org.gradle.caching=true group=io.ballerina.stdlib -version=2.16.1-SNAPSHOT +version=2.17.0-SNAPSHOT checkstylePluginVersion=10.12.0 testngVersion=7.6.1 diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FormatMethodsHolder.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FormatMethodsHolder.java index b0d10634..48bf7731 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FormatMethodsHolder.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FormatMethodsHolder.java @@ -20,9 +20,13 @@ import io.ballerina.runtime.api.types.AnnotatableType; import io.ballerina.runtime.api.types.MethodType; +import io.ballerina.runtime.api.types.TypeTags; +import io.ballerina.runtime.api.utils.StringUtils; +import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; +import io.ballerina.stdlib.ftp.exception.FtpInvalidConfigException; import io.ballerina.stdlib.ftp.transport.message.FileInfo; import io.ballerina.stdlib.ftp.util.FtpConstants; import io.ballerina.stdlib.ftp.util.FtpFileExtensionMapper; @@ -35,6 +39,11 @@ import java.util.Optional; import java.util.regex.Pattern; +import static io.ballerina.stdlib.ftp.util.FtpConstants.ANNOTATION_AFTER_ERROR; +import static io.ballerina.stdlib.ftp.util.FtpConstants.ANNOTATION_AFTER_PROCESS; +import static io.ballerina.stdlib.ftp.util.FtpConstants.ANNOTATION_MOVE_TO; +import static io.ballerina.stdlib.ftp.util.FtpConstants.ANNOTATION_PRESERVE_SUB_DIRS; + /** * Routes files to appropriate content handler methods based on file extension and annotations. */ @@ -48,19 +57,20 @@ public class FormatMethodsHolder { private final Map annotationPatternToMethod; private final Map availableContentMethods; private final MethodType onErrorMethod; + private final Map methodAfterProcessAction; + private final Map methodAfterErrorAction; - public FormatMethodsHolder(BObject service) { + public FormatMethodsHolder(BObject service) throws FtpInvalidConfigException { this.service = service; this.annotationPatternToMethod = new HashMap<>(); this.availableContentMethods = new HashMap<>(); this.onErrorMethod = FtpUtil.getOnErrorMethod(service).orElse(null); + this.methodAfterProcessAction = new HashMap<>(); + this.methodAfterErrorAction = new HashMap<>(); initializeMethodMappings(); } - /** - * Initializes method mappings by scanning all content methods and their annotations. - */ - private void initializeMethodMappings() { + private void initializeMethodMappings() throws FtpInvalidConfigException { MethodType[] contentMethods = FtpUtil.getAllContentHandlerMethods(service); for (MethodType method : contentMethods) { @@ -71,17 +81,82 @@ private void initializeMethodMappings() { Optional> annotationOpt = getFileConfigAnnotation(method); if (annotationOpt.isPresent()) { BMap annotation = annotationOpt.get(); - Object patternObj = annotation.get(io.ballerina.runtime.api.utils.StringUtils. - fromString(ANNOTATION_PATTERN_FIELD)); + + // Parse fileNamePattern + Object patternObj = annotation.get(StringUtils.fromString(ANNOTATION_PATTERN_FIELD)); if (patternObj != null) { String pattern = patternObj.toString(); annotationPatternToMethod.put(pattern, method); log.debug("Registered annotation pattern '{}' for method '{}'", pattern, methodName); } + + // Parse afterProcess action + PostProcessAction afterProcess = parsePostProcessAction(annotation, ANNOTATION_AFTER_PROCESS, + methodName); + if (afterProcess != null) { + methodAfterProcessAction.put(methodName, afterProcess); + log.debug("Registered afterProcess action '{}' for method '{}'", afterProcess, methodName); + } + + // Parse afterError action + PostProcessAction afterError = parsePostProcessAction(annotation, ANNOTATION_AFTER_ERROR, + methodName); + if (afterError != null) { + methodAfterErrorAction.put(methodName, afterError); + log.debug("Registered afterError action '{}' for method '{}'", afterError, methodName); + } + } + } + + // Parse post-processing actions for onError method + if (onErrorMethod != null) { + Optional> annotationOpt = getFileConfigAnnotation(onErrorMethod); + if (annotationOpt.isPresent()) { + BMap annotation = annotationOpt.get(); + PostProcessAction afterProcess = parsePostProcessAction(annotation, ANNOTATION_AFTER_PROCESS, + onErrorMethod.getName()); + if (afterProcess != null) { + methodAfterProcessAction.put(onErrorMethod.getName(), afterProcess); + log.debug("Registered afterProcess action '{}' for onError method", afterProcess); + } + PostProcessAction afterError = parsePostProcessAction(annotation, ANNOTATION_AFTER_ERROR, + onErrorMethod.getName()); + if (afterError != null) { + methodAfterErrorAction.put(onErrorMethod.getName(), afterError); + log.debug("Registered afterError action '{}' for onError method", afterError); + } } } } + private PostProcessAction parsePostProcessAction(BMap annotation, String fieldName, + String methodName) throws FtpInvalidConfigException { + Object actionObj = annotation.get(StringUtils.fromString(fieldName)); + if (actionObj == null) { + return null; + } + + // Check if it's the DELETE constant (string) + if (TypeUtils.getType(actionObj).getTag() == TypeTags.STRING_TAG) { + return PostProcessAction.delete(); + } + + @SuppressWarnings("unchecked") + BMap moveRecord = (BMap) actionObj; + + String moveTo = moveRecord.getStringValue(StringUtils.fromString(ANNOTATION_MOVE_TO)).getValue(); + if (moveTo.trim().isEmpty()) { + throw new FtpInvalidConfigException(String.format( + "Move action in '%s' for method '%s' has an empty 'moveTo' path.", + fieldName, methodName)); + } + + // preserveSubDirs defaults to true + boolean preserveSubDirs = moveRecord.getBooleanValue(StringUtils.fromString(ANNOTATION_PRESERVE_SUB_DIRS)); + + return PostProcessAction.move(moveTo, preserveSubDirs); + } + /** * Routes a file to the appropriate content handler method. * Priority: Annotation override > Extension mapping > Generic onFile > onFileChange fallback @@ -195,4 +270,33 @@ public Optional getOnErrorMethod() { public boolean hasOnErrorMethod() { return onErrorMethod != null; } + + /** + * Gets the afterProcess action for a method. + * + * @param methodName The method name + * @return Optional containing the PostProcessAction if configured + */ + public Optional getAfterProcessAction(String methodName) { + return Optional.ofNullable(methodAfterProcessAction.get(methodName)); + } + + /** + * Gets the afterError action for a method. + * + * @param methodName The method name + * @return Optional containing the PostProcessAction if configured + */ + public Optional getAfterErrorAction(String methodName) { + return Optional.ofNullable(methodAfterErrorAction.get(methodName)); + } + + /** + * Checks if any content method has post-processing actions configured. + * + * @return true if any method has afterProcess or afterError actions + */ + public boolean hasPostProcessingActions() { + return !methodAfterProcessAction.isEmpty() || !methodAfterErrorAction.isEmpty(); + } } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java index 2fb37fb8..91637e46 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpContentCallbackHandler.java @@ -29,6 +29,7 @@ import io.ballerina.runtime.api.types.StreamType; import io.ballerina.runtime.api.types.Type; import io.ballerina.runtime.api.types.TypeTags; +import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; @@ -97,6 +98,7 @@ public FtpContentCallbackHandler(Runtime ballerinaRuntime, FileSystemManager fil public void processContentCallbacks(Environment env, BObject service, RemoteFileSystemEvent event, FormatMethodsHolder holder, BObject callerObject) { List addedFiles = event.getAddedFiles(); + String listenerPath = event.getSourcePath(); for (FileInfo fileInfo : addedFiles) { try { @@ -120,7 +122,7 @@ public void processContentCallbacks(Environment env, BObject service, RemoteFile // Check if content conversion returned an error (ContentBindingError) if (convertedContent instanceof BError bError) { log.error("Content binding failed for file: {}", fileInfo.getPath()); - routeToOnError(service, holder, bError, callerObject); + routeToOnError(service, holder, bError, callerObject, fileInfo, listenerPath); continue; } @@ -128,8 +130,13 @@ public void processContentCallbacks(Environment env, BObject service, RemoteFile Object[] methodArguments = prepareContentMethodArguments(methodType, convertedContent, fileInfo, callerObject); - // Invoke method asynchronously - invokeContentMethodAsync(service, methodType.getName(), methodArguments); + // Get post-processing actions + Optional afterProcess = holder.getAfterProcessAction(methodType.getName()); + Optional afterError = holder.getAfterErrorAction(methodType.getName()); + + // Invoke method asynchronously with post-processing + invokeContentMethodAsync(service, methodType.getName(), methodArguments, + fileInfo, callerObject, listenerPath, afterProcess, afterError); } catch (Exception exception) { log.error("Failed to process file: " + fileInfo.getPath(), exception); @@ -138,9 +145,6 @@ public void processContentCallbacks(Environment env, BObject service, RemoteFile } } - /** - * Fetches file content from the remote FTP/SFTP server. - */ private byte[] fetchAllFileContentFromRemote(FileObject fileObject, InputStream inputStream) throws Exception { try { return FtpContentConverter.convertInputStreamToByteArray(inputStream); @@ -162,9 +166,6 @@ private byte[] fetchAllFileContentFromRemote(FileObject fileObject, InputStream } } - /** - * Converts file content to the appropriate Ballerina type based on the method signature. - */ private Object convertFileContent(Environment environment, FileObject fileObject, InputStream inputStream, MethodType methodType) throws Exception { @@ -207,9 +208,6 @@ private Object convertFileContent(Environment environment, FileObject fileObject } } - /** - * Prepares method arguments for content handler methods. - */ private Object[] prepareContentMethodArguments(MethodType methodType, Object convertedContent, FileInfo fileInfo, BObject callerObject) { Parameter[] parameters = methodType.getParameters(); @@ -236,9 +234,6 @@ private Object[] prepareContentMethodArguments(MethodType methodType, Object con return new Object[]{convertedContent}; } - /** - * Creates a Ballerina FileInfo record from Java FileInfo object. - */ private BMap createFileInfoRecord(FileInfo fileInfo) { Map fileInfoParams = new HashMap<>(); fileInfoParams.put("path", fileInfo.getPath()); @@ -277,10 +272,8 @@ private BMap createFileInfoRecord(FileInfo fileInfo) { ); } - /** - * Routes error to onError handler if available. - */ - private void routeToOnError(BObject service, FormatMethodsHolder holder, BError error, BObject callerObject) { + private void routeToOnError(BObject service, FormatMethodsHolder holder, BError error, BObject callerObject, + FileInfo fileInfo, String listenerPath) { if (!holder.hasOnErrorMethod()) { // No onError handler, error is already logged return; @@ -293,17 +286,20 @@ private void routeToOnError(BObject service, FormatMethodsHolder holder, BError MethodType onErrorMethod = onErrorMethodOpt.get(); + Optional onErrorAfterProcess = holder.getAfterProcessAction(onErrorMethod.getName()); + Optional onErrorAfterError = holder.getAfterErrorAction(onErrorMethod.getName()); + boolean hasOnErrorActions = onErrorAfterProcess.isPresent() || onErrorAfterError.isPresent(); + // Prepare arguments for onError method Object[] methodArguments = prepareOnErrorMethodArguments(onErrorMethod, error, callerObject); - // Invoke onError asynchronously - invokeContentMethodAsync(service, onErrorMethod.getName(), methodArguments); + // Invoke onError asynchronously and apply afterProcess/afterError actions + invokeOnErrorMethodAsync(service, onErrorMethod.getName(), methodArguments, fileInfo, callerObject, + listenerPath, + hasOnErrorActions ? onErrorAfterProcess : Optional.empty(), + hasOnErrorActions ? onErrorAfterError : Optional.empty()); } - /** - * Prepares method arguments for onError handler. - * onError accepts: (error) or (error, caller) - */ private Object[] prepareOnErrorMethodArguments(MethodType methodType, BError error, BObject callerObject) { Parameter[] parameters = methodType.getParameters(); @@ -316,11 +312,45 @@ private Object[] prepareOnErrorMethodArguments(MethodType methodType, BError err return new Object[]{error}; } - /** - * Invokes the content handler method asynchronously. - */ - private void invokeContentMethodAsync(BObject service, String methodName, Object[] methodArguments) { + private void invokeOnErrorMethodAsync(BObject service, String methodName, Object[] methodArguments, + FileInfo fileInfo, BObject callerObject, String listenerPath, + Optional afterProcess, + Optional afterError) { Thread.startVirtualThread(() -> { + boolean isSuccess = false; + try { + ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(service)); + boolean isConcurrentSafe = serviceType.isIsolated() && serviceType.isIsolated(methodName); + StrandMetadata strandMetadata = new StrandMetadata(isConcurrentSafe, null); + + Object result = ballerinaRuntime.callMethod(service, methodName, strandMetadata, methodArguments); + if (result instanceof BError) { + ((BError) result).printStackTrace(); + } else { + isSuccess = true; + } + } catch (BError error) { + error.printStackTrace(); + } catch (Exception exception) { + log.error("Error invoking onError method: " + methodName, exception); + } + + if (isSuccess) { + afterProcess.ifPresent(action -> executePostProcessAction(action, fileInfo, callerObject, + listenerPath, "afterProcess")); + } else { + afterError.ifPresent(action -> executePostProcessAction(action, fileInfo, callerObject, + listenerPath, "afterError")); + } + }); + } + + private void invokeContentMethodAsync(BObject service, String methodName, Object[] methodArguments, + FileInfo fileInfo, BObject callerObject, String listenerPath, + Optional afterProcess, + Optional afterError) { + Thread.startVirtualThread(() -> { + boolean isSuccess = false; try { ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(service)); boolean isConcurrentSafe = serviceType.isIsolated() && serviceType.isIsolated(methodName); @@ -330,12 +360,146 @@ private void invokeContentMethodAsync(BObject service, String methodName, Object if (result instanceof BError) { ((BError) result).printStackTrace(); + // Method returned an error - execute afterError action + afterError.ifPresent(action -> executePostProcessAction(action, fileInfo, callerObject, + listenerPath, "afterError")); + } else { + isSuccess = true; } } catch (BError error) { error.printStackTrace(); + // Method threw an error - execute afterError action + afterError.ifPresent(action -> executePostProcessAction(action, fileInfo, callerObject, + listenerPath, "afterError")); } catch (Exception exception) { log.error("Error invoking content method: " + methodName, exception); + // Method threw an exception - execute afterError action + afterError.ifPresent(action -> executePostProcessAction(action, fileInfo, callerObject, + listenerPath, "afterError")); + } + + // Execute afterProcess action on success + if (isSuccess) { + afterProcess.ifPresent(action -> executePostProcessAction(action, fileInfo, callerObject, + listenerPath, "afterProcess")); } }); } + + private void executePostProcessAction(PostProcessAction action, FileInfo fileInfo, BObject callerObject, + String listenerPath, String actionContext) { + + String filePath; + try { + filePath = fileInfo.getFileName().getPathDecoded(); + } catch (Exception e) { + log.warn("Cannot execute {} action: Failed to retrieve file path from FileInfo: {}", actionContext, + e.getMessage()); + return; + } + + try { + if (action.isDelete()) { + executeDeleteAction(callerObject, filePath, actionContext); + } else if (action.isMove()) { + executeMoveAction(callerObject, filePath, listenerPath, action, actionContext); + } + } catch (Exception e) { + log.error("Failed to execute {} action on file: {}", actionContext, filePath, e); + } + } + + private void executeDeleteAction(BObject callerObject, String filePath, String actionContext) { + try { + BObject clientObj = callerObject.getObjectValue(StringUtils.fromString("client")); + StrandMetadata strandMetadata = new StrandMetadata(true, null); + Object result = ballerinaRuntime.callMethod(clientObj, "delete", strandMetadata, + StringUtils.fromString(filePath)); + + if (result instanceof BError) { + log.error("Failed to delete file during {}: {} - {}", actionContext, filePath, + ((BError) result).getErrorMessage()); + } else { + log.debug("Successfully deleted file during {}: {}", actionContext, filePath); + } + } catch (Exception e) { + log.error("Exception during delete action ({}): {}", actionContext, filePath, e); + } + } + + private void executeMoveAction(BObject callerObject, String filePath, String listenerPath, + PostProcessAction action, String actionContext) { + try { + String destinationPath = calculateMoveDestination(filePath, listenerPath, action); + + BObject clientObj = callerObject.getObjectValue(StringUtils.fromString("client")); + StrandMetadata strandMetadata = new StrandMetadata(true, null); + Object result = ballerinaRuntime.callMethod(clientObj, "move", strandMetadata, + StringUtils.fromString(filePath), StringUtils.fromString(destinationPath)); + + if (result instanceof BError) { + log.error("Failed to move file during {}: {} -> {} - {}", actionContext, filePath, + destinationPath, ((BError) result).getErrorMessage()); + } else { + log.debug("Successfully moved file during {}: {} -> {}", actionContext, filePath, destinationPath); + } + } catch (Exception e) { + log.error("Exception during move action ({}): {}", actionContext, filePath, e); + } + } + + private String calculateMoveDestination(String filePath, String listenerPath, PostProcessAction action) { + String moveTo = action.getMoveTo(); + String normalizedFilePath = normalizeFilePath(filePath); + String fileName = extractFileName(normalizedFilePath); + if (fileName.isEmpty()) { + return ensureTrailingSlash(moveTo); + } + + if (!action.isPreserveSubDirs() || listenerPath == null || listenerPath.isEmpty()) { + // Simple case: just append filename to moveTo directory + return ensureTrailingSlash(moveTo) + fileName; + } + + // Calculate relative path from listener root + String normalizedListenerPath = ensureTrailingSlash(listenerPath); + + if (normalizedFilePath.startsWith(normalizedListenerPath)) { + // Extract relative path including subdirectories + String relativePath = normalizedFilePath.substring(normalizedListenerPath.length()); + if (relativePath.isEmpty()) { + return ensureTrailingSlash(moveTo) + fileName; + } + return ensureTrailingSlash(moveTo) + relativePath; + } else { + // File path doesn't start with listener path, fall back to simple append + return ensureTrailingSlash(moveTo) + fileName; + } + } + + private String ensureTrailingSlash(String path) { + if (path == null || path.isEmpty()) { + return "/"; + } + return path.endsWith("/") ? path : path + "/"; + } + + private String normalizeFilePath(String path) { + if (path == null || path.isEmpty()) { + return ""; + } + int end = path.length(); + while (end > 1 && path.charAt(end - 1) == '/') { + end--; + } + return path.substring(0, end); + } + + private String extractFileName(String path) { + if (path == null || path.isEmpty()) { + return ""; + } + int lastSlashIndex = path.lastIndexOf('/'); + return lastSlashIndex >= 0 ? path.substring(lastSlashIndex + 1) : path; + } } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java index e65be2b3..280de202 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListener.java @@ -37,6 +37,7 @@ import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; +import io.ballerina.stdlib.ftp.exception.FtpInvalidConfigException; import io.ballerina.stdlib.ftp.exception.RemoteFileSystemConnectorException; import io.ballerina.stdlib.ftp.transport.listener.RemoteFileSystemListener; import io.ballerina.stdlib.ftp.transport.message.FileInfo; @@ -81,14 +82,14 @@ public class FtpListener implements RemoteFileSystemListener { private static final Logger log = LoggerFactory.getLogger(FtpListener.class); private final Runtime runtime; private Environment environment = null; - private Map registeredServices = new ConcurrentHashMap<>(); - private Map serviceConfigurations = new ConcurrentHashMap<>(); - private Map pathToService = new ConcurrentHashMap<>(); + private Map serviceContexts = new ConcurrentHashMap<>(); + private Map pathToServiceContext = new ConcurrentHashMap<>(); private AtomicBoolean usesServiceLevelConfig = new AtomicBoolean(false); private BObject caller; private FileSystemManager fileSystemManager; private FileSystemOptions fileSystemOptions; private boolean laxDataBinding; + private String legacyListenerPath; private BMap csvFailSafe = ValueCreator.createMapValue(); FtpListener(Runtime runtime) { @@ -124,14 +125,17 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage if (runtime != null) { if (usesServiceLevelConfig.get() && event.getSourcePath() != null) { // Path-keyed routing: dispatch only to the service monitoring this path - BObject service = pathToService.get(event.getSourcePath()); - if (service != null) { - dispatchFileEventToService(this.environment, service, event); + ServiceContext context = pathToServiceContext.get(event.getSourcePath()); + if (context != null) { + dispatchFileEventToService(this.environment, context, event); } } else { + if (event.getSourcePath() == null && legacyListenerPath != null) { + event.setSourcePath(legacyListenerPath); + } // Legacy single-path mode: dispatch to all registered services - for (BObject service : registeredServices.values()) { - dispatchFileEventToService(this.environment, service, event); + for (ServiceContext context : serviceContexts.values()) { + dispatchFileEventToService(this.environment, context, event); } } } else { @@ -141,23 +145,34 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseMessage return true; } - private void dispatchFileEventToService(Environment env, BObject service, RemoteFileSystemEvent event) { - FormatMethodsHolder formatMethodHolder = new FormatMethodsHolder(service); + private void dispatchFileEventToService(Environment env, ServiceContext context, RemoteFileSystemEvent event) { + BObject service = context.getService(); + BObject caller = context.getCaller(); + FormatMethodsHolder formatMethodHolder = context.getFormatMethodsHolder(); + if (formatMethodHolder == null) { + try { + formatMethodHolder = new FormatMethodsHolder(service); + } catch (FtpInvalidConfigException e) { + // This should not happen as validation occurs during attach + log.error("Invalid post-process action configuration: {}", e.getMessage()); + return; + } + } Optional onFileDeletedMethodType = getOnFileDeletedMethod(service); // Dispatch Strategy: Check handler availability in order if (formatMethodHolder.hasContentMethods()) { - processContentBasedCallbacks(env, service, event, formatMethodHolder); + processContentBasedCallbacks(env, service, event, formatMethodHolder, caller); } else if (onFileDeletedMethodType.isPresent()) { if (!event.getDeletedFiles().isEmpty()) { - processDeletionCallback(service, event, onFileDeletedMethodType.get()); + processDeletionCallback(service, event, onFileDeletedMethodType.get(), caller); } } else { // Strategy 3: Fall back to legacy onFileChange handler Optional onFileChangeMethodType = getOnFileChangeMethod(service); if (onFileChangeMethodType.isPresent()) { log.debug("Service uses deprecated onFileChange handler for file events."); - processMetadataOnlyCallbacks(service, event, onFileChangeMethodType.get()); + processMetadataOnlyCallbacks(service, event, onFileChangeMethodType.get(), caller); } else { log.error("Service has no valid handler method. Must have one of: " + "onFile, onFileText, onFileJson, onFileXml, onFileCsv (format-specific), " + @@ -172,7 +187,7 @@ private void dispatchFileEventToService(Environment env, BObject service, Remote * Also handles file deletion events via onFileDeleted method if available. */ private void processContentBasedCallbacks(Environment env, BObject service, RemoteFileSystemEvent event, - FormatMethodsHolder holder) { + FormatMethodsHolder holder, BObject caller) { // Process added files with content methods if (!event.getAddedFiles().isEmpty()) { if (fileSystemManager == null || fileSystemOptions == null) { @@ -193,7 +208,7 @@ private void processContentBasedCallbacks(Environment env, BObject service, Remo if (!event.getDeletedFiles().isEmpty()) { Optional onFileDeletedMethodType = getOnFileDeletedMethod(service); if (onFileDeletedMethodType.isPresent()) { - processDeletionCallback(service, event, onFileDeletedMethodType.get()); + processDeletionCallback(service, event, onFileDeletedMethodType.get(), caller); } else { log.debug("No onFileDeleted method found. Skipping deletion event processing for {} deleted files.", event.getDeletedFiles().size()); @@ -202,7 +217,7 @@ private void processContentBasedCallbacks(Environment env, BObject service, Remo } private void processDeletionCallback(BObject service, RemoteFileSystemEvent event, - MethodType methodType) { + MethodType methodType, BObject caller) { Parameter[] params = methodType.getParameters(); // Check first parameter type to determine method variant @@ -211,10 +226,10 @@ private void processDeletionCallback(BObject service, RemoteFileSystemEvent even if (isArrayType) { // onFileDeleted(string[] deletedFiles) - call once with all files - processFileDeletedCallback(service, event, methodType); + processFileDeletedCallback(service, event, methodType, caller); } else { // onFileDelete(string deletedFile) - call once per file - processFileDeleteCallback(service, event, methodType); + processFileDeleteCallback(service, event, methodType, caller); } } @@ -223,14 +238,14 @@ private void processDeletionCallback(BObject service, RemoteFileSystemEvent even * Calls the method once per deleted file with a single string parameter. */ private void processFileDeleteCallback(BObject service, RemoteFileSystemEvent event, - MethodType methodType) { + MethodType methodType, BObject caller) { List deletedFilesList = event.getDeletedFiles(); Parameter[] params = methodType.getParameters(); // Call onFileDelete once per deleted file for (String deletedFile : deletedFilesList) { BString deletedFileBString = StringUtils.fromString(deletedFile); - Object[] args = getOnFileDeleteMethodArguments(params, deletedFileBString); + Object[] args = getOnFileDeleteMethodArguments(params, deletedFileBString, caller); if (args != null) { invokeOnFileDeleteAsync(service, args); } @@ -241,7 +256,7 @@ private void processFileDeleteCallback(BObject service, RemoteFileSystemEvent ev * Processes file deletion callback for onFileDeleted method (deprecated). */ private void processFileDeletedCallback(BObject service, RemoteFileSystemEvent event, - MethodType methodType) { + MethodType methodType, BObject caller) { List deletedFilesList = event.getDeletedFiles(); BString[] deletedFilesBStringArray = new BString[deletedFilesList.size()]; for (int i = 0; i < deletedFilesList.size(); i++) { @@ -252,7 +267,7 @@ private void processFileDeletedCallback(BObject service, RemoteFileSystemEvent e BArray deletedFilesArray = ValueCreator.createArrayValue(deletedFilesBStringArray); Parameter[] params = methodType.getParameters(); - Object[] args = getOnFileDeletedMethodArguments(params, deletedFilesArray); + Object[] args = getOnFileDeletedMethodArguments(params, deletedFilesArray, caller); if (args != null) { invokeOnFileDeletedAsync(service, args); } @@ -262,16 +277,17 @@ private void processFileDeletedCallback(BObject service, RemoteFileSystemEvent e * Processes metadata-only callbacks for the traditional onFileChange method. */ private void processMetadataOnlyCallbacks(BObject service, RemoteFileSystemEvent event, - MethodType methodType) { + MethodType methodType, BObject caller) { Map watchEventParamValues = processWatchEventParamValues(event); Parameter[] params = methodType.getParameters(); - Object[] args = getMethodArguments(params, watchEventParamValues); + Object[] args = getMethodArguments(params, watchEventParamValues, caller); if (args != null) { invokeMethodAsync(service, args); } } - private Object[] getMethodArguments(Parameter[] params, Map watchEventParamValues) { + private Object[] getMethodArguments(Parameter[] params, Map watchEventParamValues, + BObject caller) { if (params.length == 1) { return new Object[] {getWatchEvent(params[0], watchEventParamValues)}; } else if (params.length == 2) { @@ -290,7 +306,7 @@ private Object[] getMethodArguments(Parameter[] params, Map watc return null; } - private Object[] getOnFileDeleteMethodArguments(Parameter[] params, BString deletedFile) { + private Object[] getOnFileDeleteMethodArguments(Parameter[] params, BString deletedFile, BObject caller) { if (params.length == 1) { // Only deletedFile parameter return new Object[] {deletedFile}; @@ -303,7 +319,7 @@ private Object[] getOnFileDeleteMethodArguments(Parameter[] params, BString dele return null; } - private Object[] getOnFileDeletedMethodArguments(Parameter[] params, BArray deletedFiles) { + private Object[] getOnFileDeletedMethodArguments(Parameter[] params, BArray deletedFiles, BObject caller) { if (params.length == 1) { // Only deletedFiles parameter return new Object[] {deletedFiles}; @@ -477,9 +493,9 @@ public void onError(Throwable throwable) { @Override public BError done() { - Set> serviceEntries = registeredServices.entrySet(); - for (Map.Entry serviceEntry : serviceEntries) { - BObject service = serviceEntry.getValue(); + Set> serviceEntries = serviceContexts.entrySet(); + for (Map.Entry serviceEntry : serviceEntries) { + BObject service = serviceEntry.getValue().getService(); try { Object serverConnectorObject = service.getNativeData(FtpConstants.FTP_SERVER_CONNECTOR); if (serverConnectorObject instanceof RemoteFileSystemServerConnector) { @@ -500,31 +516,54 @@ public BError done() { return null; } - protected void addService(BObject service) { - Type serviceType = TypeUtils.getType(service); - if (service != null && serviceType != null && serviceType.getName() != null) { - registeredServices.put(serviceType.getName(), service); - } - } - /** - * Registers a service with its path-based configuration. - * Path must be unique across all services on this listener. + * Registers a service context. * - * @param service The service object - * @param config The service configuration - * @return true if added successfully, false if path is already registered + * @param context The service context + * @return true if added successfully, false if path is already registered or service type is invalid */ - public boolean addServiceConfiguration(BObject service, ServiceConfiguration config) { - String path = config.getPath(); - if (pathToService.putIfAbsent(path, service) != null) { + public boolean addServiceContext(ServiceContext context) { + BObject service = context.getService(); + Type serviceType = TypeUtils.getType(service); + if (service == null || serviceType == null || serviceType.getName() == null) { + return false; + } + ServiceConfiguration config = context.getConfiguration(); + if (config != null) { + String path = config.getPath(); + if (pathToServiceContext.putIfAbsent(path, context) != null) { + return false; + } + usesServiceLevelConfig.set(true); + } + if (serviceContexts.putIfAbsent(serviceType.getName(), context) != null) { + if (config != null) { + pathToServiceContext.remove(config.getPath(), context); + if (pathToServiceContext.isEmpty()) { + usesServiceLevelConfig.set(false); + } + } return false; } - serviceConfigurations.put(path, config); - usesServiceLevelConfig.set(true); return true; } + public void removeServiceContext(ServiceContext context) { + BObject service = context.getService(); + Type serviceType = TypeUtils.getType(service); + if (serviceType == null || serviceType.getName() == null) { + return; + } + serviceContexts.remove(serviceType.getName(), context); + ServiceConfiguration config = context.getConfiguration(); + if (config != null) { + pathToServiceContext.remove(config.getPath(), context); + if (pathToServiceContext.isEmpty()) { + usesServiceLevelConfig.set(false); + } + } + } + /** * Gets the service configuration for a given path. * @@ -532,7 +571,8 @@ public boolean addServiceConfiguration(BObject service, ServiceConfiguration con * @return The service configuration, or null if not configured */ public ServiceConfiguration getServiceConfiguration(String path) { - return serviceConfigurations.get(path); + ServiceContext context = pathToServiceContext.get(path); + return context == null ? null : context.getConfiguration(); } /** @@ -550,16 +590,24 @@ public boolean usesServiceLevelConfig() { * @return The count of registered services */ public int getServiceCount() { - return registeredServices.size(); + return serviceContexts.size(); + } + + public Iterable getServiceContexts() { + return serviceContexts.values(); } /** * Gets all service configurations. * - * @return Map of service name to configuration + * @return Map of path to configuration */ public Map getServiceConfigurations() { - return serviceConfigurations; + Map configs = new HashMap<>(); + for (Map.Entry entry : pathToServiceContext.entrySet()) { + configs.put(entry.getKey(), entry.getValue().getConfiguration()); + } + return configs; } public void setCaller(BObject caller) { @@ -570,13 +618,17 @@ public BObject getCaller() { return caller; } + public void setLegacyListenerPath(String legacyListenerPath) { + this.legacyListenerPath = legacyListenerPath; + } + void cleanup() { - registeredServices.clear(); + serviceContexts.clear(); usesServiceLevelConfig.set(false); - serviceConfigurations.clear(); - pathToService.clear(); + pathToServiceContext.clear(); caller = null; fileSystemManager = null; fileSystemOptions = null; + legacyListenerPath = null; } } diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java index daccb8b0..e0552f53 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/FtpListenerHelper.java @@ -24,6 +24,7 @@ import io.ballerina.runtime.api.types.MethodType; import io.ballerina.runtime.api.types.ObjectType; import io.ballerina.runtime.api.types.Parameter; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BArray; @@ -130,6 +131,7 @@ public static Object register(BObject ftpListener, BObject service) { // Check for @ftp:ServiceConfig annotation Optional> serviceConfigAnnotation = getServiceConfigAnnotation(service); + ServiceConfiguration serviceConfiguration = null; // First service registration - create the appropriate connector type if (ftpConnector == null) { @@ -137,7 +139,7 @@ public static Object register(BObject ftpListener, BObject service) { // First service has @ServiceConfig - create multi-path connector try { ServiceConfiguration config = parseServiceConfiguration(serviceConfigAnnotation.get()); - listener.addServiceConfiguration(service, config); + serviceConfiguration = config; log.warn("Creating multi-path connector for service with @ftp:ServiceConfig annotation " + "and path '{}'. The listener-level 'path' configuration will be ignored.", config.getPath()); @@ -203,30 +205,36 @@ public static Object register(BObject ftpListener, BObject service) { if (serviceConfigAnnotation.isPresent()) { try { ServiceConfiguration config = parseServiceConfiguration(serviceConfigAnnotation.get()); + serviceConfiguration = config; // Path must be unique across services on the same listener - if (!listener.addServiceConfiguration(service, config)) { + if (listener.getServiceConfiguration(config.getPath()) != null) { return FtpUtil.createError( "Duplicate path '" + config.getPath() + "' in @ftp:ServiceConfig. " + "Each service must monitor a unique path.", null, InvalidConfigError.errorType()); } - log.debug("Service registered with path: {}", config.getPath()); // Add consumer for this service's path - if (ftpConnector instanceof MultiPathServerConnector) { - MultiPathServerConnector multiPathConnector = (MultiPathServerConnector) ftpConnector; - if (!multiPathConnector.hasConsumerForPath(config.getPath())) { - log.debug("Adding path consumer for: {}", config.getPath()); - multiPathConnector.addPathConsumer( - config.getPath(), - config.getFileNamePattern(), - config.getMinAge(), - config.getMaxAge(), - config.getAgeCalculationMode(), - config.getDependencyConditions() - ); - } + if (!(ftpConnector instanceof MultiPathServerConnector)) { + return FtpUtil.createError( + "Invalid connector state: expected multi-path connector for services with " + + "@ftp:ServiceConfig annotation.", null, InvalidConfigError.errorType()); + } + + MultiPathServerConnector multiPathConnector = (MultiPathServerConnector) ftpConnector; + if (!multiPathConnector.hasConsumerForPath(config.getPath())) { + log.debug("Adding path consumer for: {}", config.getPath()); + multiPathConnector.addPathConsumer( + config.getPath(), + config.getFileNamePattern(), + config.getMinAge(), + config.getMaxAge(), + config.getAgeCalculationMode(), + config.getDependencyConditions() + ); } + + log.debug("Service registered with path: {}", config.getPath()); } catch (FtpInvalidConfigException e) { return FtpUtil.createError(e.getMessage(), findRootCause(e), InvalidConfigError.errorType()); } catch (RemoteFileSystemConnectorException e) { @@ -235,7 +243,12 @@ public static Object register(BObject ftpListener, BObject service) { } } - listener.addService(service); + FormatMethodsHolder formatMethodsHolder; + try { + formatMethodsHolder = new FormatMethodsHolder(service); + } catch (FtpInvalidConfigException e) { + return FtpUtil.createError(e.getMessage(), e, FtpUtil.ErrorType.InvalidConfigError.errorType()); + } // Check if caller is needed (for onFileChange with 2 params or content methods with caller param) boolean needsCaller = false; @@ -265,18 +278,46 @@ public static Object register(BObject ftpListener, BObject service) { } } - if (!needsCaller) { - return null; - } - if (listener.getCaller() != null) { - return null; + // Post-processing actions require a caller even if the handler doesn't accept one + if (!needsCaller && formatMethodsHolder.hasPostProcessingActions()) { + needsCaller = true; } - BMap serviceEndpointConfig = (BMap) ftpListener.getNativeData(FTP_SERVICE_ENDPOINT_CONFIG); - BObject caller = createCaller(serviceEndpointConfig); - if (caller instanceof BError) { - return caller; - } else { - listener.setCaller(caller); + + ServiceContext context = new ServiceContext(service, serviceConfiguration, formatMethodsHolder, null); + if (!listener.addServiceContext(context)) { + if (serviceConfiguration != null) { + return FtpUtil.createError( + "Duplicate path '" + serviceConfiguration.getPath() + "' in @ftp:ServiceConfig. " + + "Each service must monitor a unique path.", null, InvalidConfigError.errorType()); + } + return FtpUtil.createError("Failed to register service.", null, InvalidConfigError.errorType()); + } + + if (needsCaller) { + BMap serviceEndpointConfig = + (BMap) ftpListener.getNativeData(FTP_SERVICE_ENDPOINT_CONFIG); + BObject caller; + if (serviceConfiguration != null) { + BMap callerConfig = + createCallerConfigWithPath(serviceEndpointConfig, serviceConfiguration.getPath()); + BObject createdCaller = createCaller(callerConfig); + if (TypeUtils.getType(createdCaller).getTag() == TypeTags.ERROR_TAG) { + listener.removeServiceContext(context); + return createdCaller; + } + caller = createdCaller; + } else { + if (listener.getCaller() == null) { + BObject createdCaller = createCaller(serviceEndpointConfig); + if (TypeUtils.getType(createdCaller).getTag() == TypeTags.ERROR_TAG) { + listener.removeServiceContext(context); + return createdCaller; + } + listener.setCaller(createdCaller); + } + caller = listener.getCaller(); + } + context.setCaller(caller); } return null; } @@ -330,6 +371,11 @@ private static Object createLegacySinglePathConnector(BObject ftpListener, FtpLi (RemoteFileSystemServerConnectorImpl) serverConnector; listener.setFileSystemManager(connectorImpl.getFileSystemManager()); listener.setFileSystemOptions(connectorImpl.getFileSystemOptions()); + BString path = serviceEndpointConfig.getStringValue( + StringUtils.fromString(FtpConstants.ENDPOINT_CONFIG_PATH)); + if (path != null && !path.getValue().isEmpty()) { + listener.setLegacyListenerPath(path.getValue()); + } } ftpListener.addNativeData(FtpConstants.FTP_SERVER_CONNECTOR, serverConnector); @@ -782,6 +828,15 @@ private static BObject createCaller(BMap serviceEndpointConfig) return ValueCreator.createObjectValue(ModuleUtils.getModule(), FTP_CALLER, client); } + private static BMap createCallerConfigWithPath(BMap baseConfig, String path) { + Map configCopy = new HashMap<>(); + for (BString key : baseConfig.getKeys()) { + configCopy.put(key.getValue(), baseConfig.get(key)); + } + configCopy.put(FtpConstants.ENDPOINT_CONFIG_PATH, StringUtils.fromString(path)); + return ValueCreator.createRecordValue(ModuleUtils.getModule(), "ListenerConfiguration", configCopy); + } + public static Object closeCaller(Environment env, BObject ftpListener) { RemoteFileSystemServerConnector ftpConnector = (RemoteFileSystemServerConnector) ftpListener .getNativeData(FtpConstants.FTP_SERVER_CONNECTOR); @@ -789,10 +844,28 @@ public static Object closeCaller(Environment env, BObject ftpListener) { return null; } FtpListener listener = ftpConnector.getFtpListener(); + if (listener.usesServiceLevelConfig()) { + for (ServiceContext context : listener.getServiceContexts()) { + BObject caller = context.getCaller(); + if (caller == null) { + continue; + } + Object result = closeCallerClient(env, caller); + if (result != null) { + return result; + } + } + return null; + } + BObject caller = listener.getCaller(); if (caller == null) { return null; } + return closeCallerClient(env, caller); + } + + private static Object closeCallerClient(Environment env, BObject caller) { BObject ftpClient = caller.getObjectValue(CLIENT_INSTANCE); return env.yieldAndRun(() -> { StrandMetadata strandMetadata = new StrandMetadata(true, diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/PostProcessAction.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/PostProcessAction.java new file mode 100644 index 00000000..29a34e27 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/PostProcessAction.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.server; + +/** + * Represents a post-processing action to perform on a file after processing. + * Can be either DELETE or MOVE action. + */ +public class PostProcessAction { + + public enum ActionType { + DELETE, + MOVE + } + + private final ActionType actionType; + private final String moveTo; + private final boolean preserveSubDirs; + + private PostProcessAction(ActionType actionType, String moveTo, boolean preserveSubDirs) { + this.actionType = actionType; + this.moveTo = moveTo; + this.preserveSubDirs = preserveSubDirs; + } + + /** + * Creates a DELETE action. + * + * @return A PostProcessAction representing DELETE + */ + public static PostProcessAction delete() { + return new PostProcessAction(ActionType.DELETE, null, false); + } + + /** + * Creates a MOVE action. + * + * @param moveTo The destination directory path + * @param preserveSubDirs Whether to preserve subdirectory structure + * @return A PostProcessAction representing MOVE + */ + public static PostProcessAction move(String moveTo, boolean preserveSubDirs) { + return new PostProcessAction(ActionType.MOVE, moveTo, preserveSubDirs); + } + + public ActionType getActionType() { + return actionType; + } + + public String getMoveTo() { + return moveTo; + } + + public boolean isPreserveSubDirs() { + return preserveSubDirs; + } + + public boolean isDelete() { + return actionType == ActionType.DELETE; + } + + public boolean isMove() { + return actionType == ActionType.MOVE; + } + + @Override + public String toString() { + if (isDelete()) { + return "DELETE"; + } else { + return "MOVE{moveTo='" + moveTo + "', preserveSubDirs=" + preserveSubDirs + "}"; + } + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/ServiceConfiguration.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/ServiceConfiguration.java index bbf706cb..410ac1e4 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/server/ServiceConfiguration.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/ServiceConfiguration.java @@ -1,12 +1,12 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/server/ServiceContext.java b/native/src/main/java/io/ballerina/stdlib/ftp/server/ServiceContext.java new file mode 100644 index 00000000..5619a145 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/ftp/server/ServiceContext.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.ftp.server; + +import io.ballerina.runtime.api.values.BObject; + +/** + * Encapsulates per-service state required by the FTP listener. + */ +public class ServiceContext { + + private final BObject service; + private final ServiceConfiguration configuration; + private final FormatMethodsHolder formatMethodsHolder; + private volatile BObject caller; + + public ServiceContext(BObject service, ServiceConfiguration configuration, + FormatMethodsHolder formatMethodsHolder, BObject caller) { + this.service = service; + this.configuration = configuration; + this.formatMethodsHolder = formatMethodsHolder; + this.caller = caller; + } + + public BObject getService() { + return service; + } + + public ServiceConfiguration getConfiguration() { + return configuration; + } + + public FormatMethodsHolder getFormatMethodsHolder() { + return formatMethodsHolder; + } + + public BObject getCaller() { + return caller; + } + + public void setCaller(BObject caller) { + this.caller = caller; + } + + public String getPath() { + return configuration == null ? null : configuration.getPath(); + } +} diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/transport/server/connector/contractimpl/MultiPathServerConnector.java b/native/src/main/java/io/ballerina/stdlib/ftp/transport/server/connector/contractimpl/MultiPathServerConnector.java index 0b95b0d4..9617774f 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/transport/server/connector/contractimpl/MultiPathServerConnector.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/transport/server/connector/contractimpl/MultiPathServerConnector.java @@ -1,12 +1,12 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/transport/server/connector/contractimpl/PathSpecificListener.java b/native/src/main/java/io/ballerina/stdlib/ftp/transport/server/connector/contractimpl/PathSpecificListener.java index 7f43e8e8..868fad13 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/transport/server/connector/contractimpl/PathSpecificListener.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/transport/server/connector/contractimpl/PathSpecificListener.java @@ -1,12 +1,12 @@ /* - * Copyright (c) 2026 WSO2 LLC. (http://www.wso2.com) All Rights Reserved. + * Copyright (c) 2026, WSO2 LLC. (http://www.wso2.com). * * WSO2 LLC. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an diff --git a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java index 0c888781..997ba61a 100644 --- a/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java +++ b/native/src/main/java/io/ballerina/stdlib/ftp/util/FtpConstants.java @@ -226,4 +226,10 @@ private FtpConstants() { // Service-level configuration annotation public static final String SERVICE_CONFIG_ANNOTATION = "ServiceConfig"; public static final String SERVICE_CONFIG_PATH = "path"; + + // Post-processing action annotation fields + public static final String ANNOTATION_AFTER_PROCESS = "afterProcess"; + public static final String ANNOTATION_AFTER_ERROR = "afterError"; + public static final String ANNOTATION_MOVE_TO = "moveTo"; + public static final String ANNOTATION_PRESERVE_SUB_DIRS = "preserveSubDirs"; } diff --git a/test-utils/src/main/java/io/ballerina/stdlib/ftp/testutils/mockServerUtils/MockFtpServer.java b/test-utils/src/main/java/io/ballerina/stdlib/ftp/testutils/mockServerUtils/MockFtpServer.java index b2da7825..64a03ff3 100644 --- a/test-utils/src/main/java/io/ballerina/stdlib/ftp/testutils/mockServerUtils/MockFtpServer.java +++ b/test-utils/src/main/java/io/ballerina/stdlib/ftp/testutils/mockServerUtils/MockFtpServer.java @@ -149,6 +149,11 @@ public static Object initFtpServer() throws Exception { fileSystem.add(new DirectoryEntry("/home/in/advanced/dependency")); fileSystem.add(new FileEntry("/home/in/dependency/.init", "")); + fileSystem.add(new DirectoryEntry("/home/in/post-process")); + fileSystem.add(new FileEntry("/home/in/post-process/.init", "")); + fileSystem.add(new DirectoryEntry("/home/in/post-process-archive")); + fileSystem.add(new DirectoryEntry("/home/in/post-process-error")); + ftpServer.setFileSystem(fileSystem); ftpServer.start(); logger.info("Starting FTP server...");