diff --git a/.gitignore b/.gitignore index 0f2eb75..9aabdb1 100644 --- a/.gitignore +++ b/.gitignore @@ -62,3 +62,5 @@ examples/**/Config.toml # Environment files *.env + +examples/*/tmp diff --git a/README.md b/README.md index c97a9a1..b7afc73 100644 --- a/README.md +++ b/README.md @@ -218,6 +218,11 @@ bal run ## Examples +The `cdc` module provides practical examples illustrating its usage in various real-world scenarios. Explore these [examples](https://github.com/ballerina-platform/module-ballerinax-cdc/tree/main/examples) to understand how to capture and process database change events effectively. + +1. [Fraud Detection](https://github.com/ballerina-platform/module-ballerinax-cdc/tree/main/examples/fraud-detection) - Detect suspicious transactions in a financial database and send fraud alerts via email. This example showcases how to integrate the CDC module with the Gmail connector to notify stakeholders of potential fraud. + +2. [Cache Management](https://github.com/ballerina-platform/module-ballerinax-cdc/tree/main/examples/cache-management) - Synchronize a Redis cache with changes in a MySQL database. It listens to changes in the `products`, `vendors`, and `product_reviews` tables and updates the Redis cache accordingly. ## Issues and projects diff --git a/ballerina/README.md b/ballerina/README.md index c3577b4..5d02fb0 100644 --- a/ballerina/README.md +++ b/ballerina/README.md @@ -209,3 +209,9 @@ bal run ``` ## Examples + +The `cdc` module provides practical examples illustrating its usage in various real-world scenarios. Explore these [examples](https://github.com/ballerina-platform/module-ballerinax-cdc/tree/main/examples) to understand how to capture and process database change events effectively. + +1. [Fraud Detection](https://github.com/ballerina-platform/module-ballerinax-cdc/tree/main/examples/fraud-detection) - Detect suspicious transactions in a financial database and send fraud alerts via email. This example showcases how to integrate the CDC module with the Gmail connector to notify stakeholders of potential fraud. + +2. [Cache Management](https://github.com/ballerina-platform/module-ballerinax-cdc/tree/main/examples/cache-management) - Synchronize a Redis cache with changes in a MySQL database. It listens to changes in the `products`, `vendors`, and `product_reviews` tables and updates the Redis cache accordingly. diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..bd51766 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,41 @@ +# Examples + +The `cdc` module provides practical examples illustrating its usage in various real-world scenarios. Explore these [examples](https://github.com/ballerina-platform/module-ballerinax-cdc/tree/main/examples) to understand how to capture and process database change events effectively. + +1. [Fraud Detection](https://github.com/ballerina-platform/module-ballerinax-cdc/tree/main/examples/fraud-detection) - Detect suspicious transactions in a financial database and send fraud alerts via email. This example showcases how to integrate the CDC module with the Gmail connector to notify stakeholders of potential fraud. + +2. [Cache Management](https://github.com/ballerina-platform/module-ballerinax-cdc/tree/main/examples/cache-management) - Synchronize a Redis cache with changes in a MySQL database. It listens to changes in the `products`, `vendors`, and `product_reviews` tables and updates the Redis cache accordingly. + +## Running an Example + +Execute the following commands to build an example from the source: + +* To build an example: + + ```bash + bal build + ``` + +* To run an example: + + ```bash + bal run + ``` + +## Building the Examples with the Local Module + +**Warning**: Due to the absence of support for reading local repositories for single Ballerina files, the Bala of the module is manually written to the central repository as a workaround. Consequently, the bash script may modify your local Ballerina repositories. + +Execute the following commands to build all the examples against the changes you have made to the module locally: + +* To build all the examples: + + ```bash + ./build.sh build + ``` + +* To run all the examples: + + ```bash + ./build.sh run + ``` diff --git a/examples/cache-management/.github/README.md b/examples/cache-management/.github/README.md new file mode 120000 index 0000000..7a8e27c --- /dev/null +++ b/examples/cache-management/.github/README.md @@ -0,0 +1 @@ +../Cache Management.md \ No newline at end of file diff --git a/examples/cache-management/Ballerina.toml b/examples/cache-management/Ballerina.toml new file mode 100644 index 0000000..9c54c94 --- /dev/null +++ b/examples/cache-management/Ballerina.toml @@ -0,0 +1,17 @@ +[package] +org = "wso2" +name = "cache_management" +version = "0.1.0" +distribution = "2201.12.2" + +[[dependency]] +org="ballerinax" +name="cdc" +version="0.1.0" +repository="local" + +[[dependency]] +org="ballerinax" +name="cdc.mysql.driver" +version="0.1.0" +repository="local" diff --git a/examples/cache-management/Cache Management.md b/examples/cache-management/Cache Management.md new file mode 100644 index 0000000..1d338eb --- /dev/null +++ b/examples/cache-management/Cache Management.md @@ -0,0 +1,78 @@ +# Cache Management + +This example demonstrates how to use the Ballerina Change Data Capture (CDC) module to synchronize a Redis cache with changes in a MySQL database. It listens to changes in the `products`, `vendors`, and `product_reviews` tables and updates the Redis cache accordingly. + +## Setup Guide + +### 1. MySQL Database + +1. Refer to the [Setup Guide](https://central.ballerina.io/ballerinax/cdc/latest#setup-guide) for the necessary steps to enable CDC in the MySQL server. + +2. Add the necessary schema and data using the `setup.sql` script: + ```bash + mysql -u -p < db_scripts/setup.sql + ``` + +### 2. Redis Server + +Ensure a Redis server is running on `localhost:6379`. + +### 3. Configuration + +Configure MySQL database credentials in the `Config.toml` file located in the example directory: + +```toml +username = "" +password = "" +``` + +Replace `` and `` with your MySQL database credentials. + +## Setup Guide: Using Docker Compose + +You can use Docker Compose to set up both MySQL and Redis services for this example. Follow these steps: + +### 1. Start the services + +Run the following command to start both MySQL and Redis services: + +```bash +docker-compose up -d +``` + +### 2. Verify the services + +Ensure both `mysql` and `redis` services are in a healthy state: + +```bash +docker-compose ps +``` + +### 3. Configuration + +Ensure the `Config.toml` file is updated with the following credentials: + +```toml +username = "cdc_user" +password = "cdc_password" +``` + +## Run the Example + +1. Execute the following command to run the example: + + ```bash + bal run + ``` + +2. Use the provided `test.sql` script to insert sample transactions into the `products`, `vendors`, and `product_reviews` tables to test the synchronization. Run the following command: + + ```bash + mysql -u -p < db_scripts/test.sql + ``` + +If using docker services, + + ```bash + docker exec -i mysql-cdc mysql -u cdc_user -pcdc_password < db-scripts/test.sql + ``` diff --git a/examples/cache-management/db-scripts/setup.sql b/examples/cache-management/db-scripts/setup.sql new file mode 100644 index 0000000..1093cb2 --- /dev/null +++ b/examples/cache-management/db-scripts/setup.sql @@ -0,0 +1,38 @@ +CREATE DATABASE IF NOT EXISTS store_db; +USE store_db; + +CREATE TABLE vendors ( + id INT PRIMARY KEY, + name VARCHAR(255), + contact_info TEXT +); + +INSERT INTO vendors VALUES +(1, 'Samsung', 'contact@samsung.com'), +(2, 'Apple', 'contact@apple.com'); + +CREATE TABLE products ( + id INT PRIMARY KEY, + name VARCHAR(255), + price DECIMAL(10,2), + description TEXT, + vendor_id INT, + FOREIGN KEY (vendor_id) REFERENCES vendors(id) +); + +INSERT INTO products VALUES +(1001, 'Samsung Galaxy S24', 999.99, 'Flagship phone with AI camera', 1), +(1002, 'Apple iPhone 15 Pro', 1099.00, 'New titanium design', 2); + +CREATE TABLE product_reviews ( + review_id INT PRIMARY KEY, + product_id INT, + rating INT CHECK (rating BETWEEN 1 AND 5), + comment TEXT, + FOREIGN KEY (product_id) REFERENCES products(id) +); + +INSERT INTO product_reviews VALUES +(1, 1001, 5, 'Amazing camera'), +(2, 1001, 4, 'Great battery life'), +(3, 1002, 5, 'Best iPhone yet'); diff --git a/examples/cache-management/db-scripts/test.sql b/examples/cache-management/db-scripts/test.sql new file mode 100644 index 0000000..c693ef8 --- /dev/null +++ b/examples/cache-management/db-scripts/test.sql @@ -0,0 +1,14 @@ +USE store_db; + +UPDATE products +SET price = price * 0.9 +WHERE id = 1002; + +UPDATE product_reviews +SET rating = rating - 1 +WHERE product_id = 1002; + +INSERT products VALUES (1003, "Samsung Galaxy S20", 499.99, "Old Smartphone", 2); + +DELETE FROM products +WHERE id = 1003; \ No newline at end of file diff --git a/examples/cache-management/docker-compose.yml b/examples/cache-management/docker-compose.yml new file mode 100644 index 0000000..fff59ad --- /dev/null +++ b/examples/cache-management/docker-compose.yml @@ -0,0 +1,31 @@ +name: cache-management-example + +services: + mysql: + image: mysql:8.0 + container_name: mysql-cdc + ports: + - "3306:3306" + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: store_db + MYSQL_USER: cdc_user + MYSQL_PASSWORD: cdc_password + volumes: + - ./db-scripts/setup.sql:/docker-entrypoint-initdb.d/setup.sql + healthcheck: + test: [ "CMD", "mysqladmin", "ping", "-h", "localhost" ] + interval: 10s + timeout: 5s + retries: 5 + + redis: + image: redis:latest + container_name: redis-cache + ports: + - "6379:6379" + healthcheck: + test: [ "CMD", "redis-cli", "ping" ] + interval: 10s + timeout: 5s + retries: 5 diff --git a/examples/cache-management/main.bal b/examples/cache-management/main.bal new file mode 100644 index 0000000..8871816 --- /dev/null +++ b/examples/cache-management/main.bal @@ -0,0 +1,144 @@ +// Copyright (c) 2025, WSO2 LLC. (http://www.wso2.org). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/log; +import ballerina/os; +import ballerinax/cdc; +import ballerinax/cdc.mysql.driver as _; +import ballerinax/redis; + +configurable string username = os:getEnv("DB_USERNAME"); +configurable string password = os:getEnv("DB_PASSWORD"); + +listener cdc:MySqlListener mysqlListener = new ( + database = { + username, + password, + includedDatabases: "store_db", + includedTables: ["store_db.products", "store_db.vendors", "store_db.product_reviews"] + }, + snapshotMode = cdc:NO_DATA +); + +final redis:Client redis = check new ( + connection = { + host: "localhost", + port: 6379 + } +); + +type Entity record { + int id; +}; + +type ProductReviews record { + int product_id; + int rating; +}; + +@cdc:ServiceConfig { + tables: ["store_db.products", "store_db.vendors"] +} +service cdc:Service on mysqlListener { + + remote function onRead(Entity after, string tableName) returns error? { + _ = check redis->set(string `${tableName}:${after.id}`, after.toJsonString()); + log:printInfo(`'${tableName}' cache entry created for Id: ${after.id}`); + } + + remote function onCreate(Entity after, string tableName) returns error? { + _ = check redis->set(string `product:${after.id}`, after.toJsonString()); + log:printInfo(`'${tableName}' cache entry created for Id: ${after.id}`); + } + + remote function onUpdate(Entity before, Entity after, string tableName) returns error? { + _ = check redis->set(string `product:${after.id}`, after.toJsonString()); + log:printInfo(`'${tableName}' cache entry updated for Id: ${after.id}.`); + } + + remote function onDelete(Entity before, string tableName) returns error? { + int delVal = check redis->del([ + string `${tableName}:${before.id}` + ]); + if tableName == "products" { + _ = check redis->del([ + string `product_tot_rating:${before.id}`, + string `product_reviews:${before.id}` + ]); + log:printInfo(`'products' cache entry deleted for Id: ${before.id}. Redis delete count: ${delVal}`); + } else { + log:printInfo(`'vendors' cache entry deleted for Id: ${before.id}. Redis delete count: ${delVal}`); + } + } + + remote function onError(cdc:Error 'error) returns error? { + log:printInfo(`Error occurred while processing events. Error: ${'error.message()}`); + if 'error is cdc:PayloadBindingError { + log:printInfo(`Error occurred while processing events. Error: ${'error.detail().payload.toBalString()}`); + } + } +} + +@cdc:ServiceConfig { + tables: ["store_db.product_reviews"] +} +service cdc:Service on mysqlListener { + + remote function onRead(ProductReviews after, string tableName) returns error? { + int totalRating = check redis->incrBy(string `product_tot_rating:${after.product_id}`, after.rating); + log:printInfo(`'product_tot_rating' cache added for Product Id: ${after.product_id}. Current total rating: ${totalRating}`); + + int reviews = check redis->incr(string `product_reviews:${after.product_id}`); + log:printInfo(`'product_reviews' cache entry added for Product Id: ${after.product_id}. Current total reviews: ${reviews}`); + } + + remote function onCreate(ProductReviews after, string tableName) returns error? { + int totalRating = check redis->incrBy(string `product_tot_rating:${after.product_id}`, after.rating); + log:printInfo(`'product_tot_rating' cache added for Product Id: ${after.product_id}. Current total rating: ${totalRating}`); + + int reviews = check redis->incr(string `product_reviews:${after.product_id}`); + log:printInfo(`'product_reviews' cache entry added for Product Id: ${after.product_id}. Current total reviews: ${reviews}`); + } + + remote function onUpdate(ProductReviews before, ProductReviews after, string tableName) returns error? { + int ratingDiff = after.rating - before.rating; + + if ratingDiff > 0 { + int updatedRating = check redis->incrBy(string `product_tot_rating:${after.product_id}`, ratingDiff); + log:printInfo(`'product_tot_rating' cache updated for Product Id: ${after.product_id}. Current total rating: ${updatedRating}`); + return; + } + + if ratingDiff < 0 { + int updatedRating = check redis->decrBy(string `product_tot_rating:${after.product_id}`, ratingDiff); + log:printInfo(`'product_tot_rating' cache updated for Product Id: ${after.product_id}. Current total rating: ${updatedRating}`); + return; + } + log:printInfo(`No change in rating for Product ID: ${after.product_id} from table '${tableName}'`); + + } + + remote function onDelete(ProductReviews before, string tableName) returns error? { + int deletedRating = check redis->decrBy(string `product_tot_rating:${before.product_id}`, before.rating); + log:printInfo(`'product_tot_rating' cache deleted for Product Id: ${before.product_id}. Current total rating: ${deletedRating}`); + int reviews = check redis->decr(string `product_reviews:${before.product_id}`); + log:printInfo(`'product_reviews' cache entry deleted for Product Id: ${before.product_id}. Current total reviews: ${reviews}`); + } + + remote function onError(cdc:Error 'error) returns error? { + log:printInfo(`Error occurred while processing events. Error: ${'error.message()}`); + } +} diff --git a/examples/fraud-detection/.github/README.md b/examples/fraud-detection/.github/README.md new file mode 120000 index 0000000..01fef8d --- /dev/null +++ b/examples/fraud-detection/.github/README.md @@ -0,0 +1 @@ +../Fraud Detection.md \ No newline at end of file diff --git a/examples/fraud-detection/Ballerina.toml b/examples/fraud-detection/Ballerina.toml new file mode 100644 index 0000000..8f27ab4 --- /dev/null +++ b/examples/fraud-detection/Ballerina.toml @@ -0,0 +1,17 @@ +[package] +org = "wso2" +name = "fraud_detection" +version = "0.1.0" +distribution = "2201.12.2" + +[[dependency]] +org="ballerinax" +name="cdc" +version="0.1.0" +repository="local" + +[[dependency]] +org="ballerinax" +name="cdc.mysql.driver" +version="0.1.0" +repository="local" diff --git a/examples/fraud-detection/Fraud Detection.md b/examples/fraud-detection/Fraud Detection.md new file mode 100644 index 0000000..d356652 --- /dev/null +++ b/examples/fraud-detection/Fraud Detection.md @@ -0,0 +1,90 @@ +# Fraud Detection + +This example demonstrates how to use the Ballerina CDC module to implement a fraud detection system. The system listens to table changes and processes them to identify potential fraudulent activities. + +## Setup Guide + +### 1. MySQL Database + +1. Refer to the [Setup Guide](https://central.ballerina.io/ballerinax/cdc/latest#setup-guide) for the necessary steps to enable CDC in the MySQL server. + +2. Add the necessary schema and data using the `setup.sql` script: + ```bash + mysql -u -p < db_scripts/setup.sql + ``` + +### 2. Configuration + +Configure MySQL Database and Gmail API credentials in the `Config.toml` file located in the example directory: + +```toml +username = "" +password = "" + +refreshToken = "" +clientId = "" +clientSecret = "" +recipient = "" +sender = "" +``` + +Replace `` and `` with your MySQL database credentials. + +Replace the Gmail API placeholders (``, ``, ``, ``, ``) with your Gmail API credentials and email addresses. + +## Setup Guide: Using Docker Compose + +You can use Docker Compose to set up MySQL for this example. Follow these steps: + +### 1. Start the service + +Run the following command to start the MySQL service: + +```bash +docker-compose up -d +``` + +### 2. Verify the service + +Ensure `mysql` service is in a healthy state: + +```bash +docker-compose ps +``` + +### 3. Configuration + +Ensure the `Config.toml` file is updated with the following credentials: + +```toml +username = "cdc_user" +password = "cdc_password" + +refreshToken = "" +clientId = "" +clientSecret = "" +recipient = "" +sender = "" +``` + +Replace the Gmail API placeholders (``, ``, ``, ``, ``) with your Gmail API credentials and email addresses. + +## Run the Example + +1. Execute the following command to run the example: + + ```bash + bal run + ``` + +2. Use the provided `test.sql` script to insert a sample transactions into the `trx` table to test the fraud detection system. Use the following SQL command: + + ```bash + mysql -u -p < db_scripts/test.sql + ``` + +If using docker services, + + ```bash + docker exec -i mysql-cdc mysql -u cdc_user -pcdc_password < db-scripts/test.sql + ``` diff --git a/examples/fraud-detection/db-scripts/setup.sql b/examples/fraud-detection/db-scripts/setup.sql new file mode 100644 index 0000000..6110253 --- /dev/null +++ b/examples/fraud-detection/db-scripts/setup.sql @@ -0,0 +1,17 @@ +CREATE DATABASE IF NOT EXISTS finance_db; +USE finance_db; + +-- transactions table +CREATE TABLE transactions ( + tx_id INT AUTO_INCREMENT PRIMARY KEY, + user_id INT, + amount DECIMAL(10,2), + status VARCHAR(50), + created_at DATETIME +); + +-- Sample data +INSERT INTO transactions (user_id, amount, status, created_at) VALUES +(10, 9000.00, 'COMPLETED', '2025-04-01 08:00:00'), +(11, 12000.00, 'COMPLETED', '2025-04-01 08:10:00'), -- this one should trigger fraud logic +(12, 4500.00, 'PENDING', '2025-04-01 08:30:00'); diff --git a/examples/fraud-detection/db-scripts/test.sql b/examples/fraud-detection/db-scripts/test.sql new file mode 100644 index 0000000..8fb542b --- /dev/null +++ b/examples/fraud-detection/db-scripts/test.sql @@ -0,0 +1,7 @@ +USE finance_db; + +INSERT INTO transactions (user_id, amount, status, created_at) VALUES +(11, 2000.00, 'COMPLETED', '2025-04-01 08:10:00'); + +INSERT INTO transactions (user_id, amount, status, created_at) VALUES +(11, 12000.00, 'COMPLETED', '2025-04-01 08:10:00'); diff --git a/examples/fraud-detection/docker-compose.yml b/examples/fraud-detection/docker-compose.yml new file mode 100644 index 0000000..4ddbd36 --- /dev/null +++ b/examples/fraud-detection/docker-compose.yml @@ -0,0 +1,20 @@ +name: fraud-detection-example + +services: + mysql: + image: mysql:8.0 + container_name: mysql-cdc + ports: + - "3306:3306" + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: finance_db + MYSQL_USER: cdc_user + MYSQL_PASSWORD: cdc_password + volumes: + - ./db-scripts/setup.sql:/docker-entrypoint-initdb.d/setup.sql + healthcheck: + test: [ "CMD", "mysqladmin", "ping", "-h", "localhost" ] + interval: 10s + timeout: 5s + retries: 5 diff --git a/examples/fraud-detection/main.bal b/examples/fraud-detection/main.bal new file mode 100644 index 0000000..d1b693e --- /dev/null +++ b/examples/fraud-detection/main.bal @@ -0,0 +1,78 @@ +// Copyright (c) 2025, WSO2 LLC. (http://www.wso2.org). +// +// WSO2 LLC. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import ballerina/log; +import ballerina/os; +import ballerinax/cdc; +import ballerinax/cdc.mysql.driver as _; +import ballerinax/googleapis.gmail; + +configurable string refreshToken = os:getEnv("REFRESH_TOKEN"); +configurable string clientId = os:getEnv("CLIENT_ID"); +configurable string clientSecret = os:getEnv("CLIENT_SECRET"); +configurable string recipient = os:getEnv("RECIPIENT"); +configurable string sender = os:getEnv("SENDER"); + +configurable string username = os:getEnv("DB_USERNAME"); +configurable string password = os:getEnv("DB_PASSWORD"); + +listener cdc:MySqlListener financeDBListener = new ( + database = { + username, + password, + includedDatabases: "finance_db", + includedTables: "finance_db.transactions" + }, + snapshotMode = cdc:NO_DATA, + skippedOperations = [cdc:TRUNCATE, cdc:UPDATE, cdc:DELETE] +); + +final gmail:Client gmail = check new ({ + auth: { + refreshToken, + clientId, + clientSecret + } +}); + +service cdc:Service on financeDBListener { + isolated remote function onCreate(Transactions trx) returns error? { + log:printInfo(`Create trx event received Transaction Id: ${trx.tx_id}`); + if trx.amount > 10000.00 { + string fraudAlert = string `Fraud detected! Transaction Id: ${trx.tx_id}, User Id: ${trx.user_id}, Amount: $${trx.amount}`; + + gmail:MessageRequest message = { + to: [recipient], + subject: "Fraud Alert: Suspicious Transaction Detected", + bodyInText: fraudAlert + }; + + gmail:Message sendResult = check gmail->/users/me/messages/send.post(message); + log:printInfo(`Email sent. Message ID: ${sendResult.id}`); + } + } + + isolated remote function onError(cdc:Error e) { + log:printInfo(`Error occurred: ${e.message()}`); + } +} + +type Transactions record {| + int tx_id; + int user_id; + float amount; + string status; + int created_at; +|};