Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "cdc"
version = "1.0.2"
version = "1.0.3"
distribution = "2201.12.0"
authors = ["Ballerina"]
repository = "https://github.com/ballerina-platform/module-ballerinax-cdc"
Expand All @@ -15,9 +15,9 @@ graalvmCompatible=true

[[platform.java21.dependency]]
groupId = "io.ballerina.lib.cdc"
artifactId = "cdc-native-1.0.2"
version = "1.0.2"
path = "../native/build/libs/cdc-native-1.0.2.jar"
artifactId = "cdc-native-1.0.3-SNAPSHOT"
version = "1.0.3-SNAPSHOT"
path = "../native/build/libs/cdc-native-1.0.3-SNAPSHOT.jar"

[[platform.java21.dependency]]
groupId = "io.debezium"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "cdc-compiler-plugin"
class = "io.ballerina.lib.cdc.compiler.CdcCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/cdc-compiler-plugin-1.0.2.jar"
path = "../compiler-plugin/build/libs/cdc-compiler-plugin-1.0.3-SNAPSHOT.jar"
31 changes: 25 additions & 6 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ org = "ballerina"
name = "lang.object"
version = "0.0.0"

[[package]]
org = "ballerina"
name = "lang.regexp"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "lang.runtime"
Expand All @@ -95,6 +104,16 @@ modules = [
{org = "ballerina", packageName = "lang.runtime", moduleName = "lang.runtime"}
]

[[package]]
org = "ballerina"
name = "lang.string"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.regexp"}
]

[[package]]
org = "ballerina"
name = "lang.value"
Expand Down Expand Up @@ -174,7 +193,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "cdc"
version = "1.0.2"
version = "1.0.3"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "data.jsondata"},
Expand All @@ -183,7 +202,7 @@ dependencies = [
{org = "ballerina", name = "random"},
{org = "ballerina", name = "test"},
{org = "ballerinai", name = "observe"},
{org = "ballerinax", name = "mysql"},
{org = "ballerinax", name = "java.jdbc"},
{org = "ballerinax", name = "mysql.cdc.driver"}
]
modules = [
Expand All @@ -192,18 +211,18 @@ modules = [

[[package]]
org = "ballerinax"
name = "mysql"
version = "1.15.0"
name = "java.jdbc"
version = "1.14.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "crypto"},
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.string"},
{org = "ballerina", name = "sql"},
{org = "ballerina", name = "time"}
]
modules = [
{org = "ballerinax", packageName = "mysql", moduleName = "mysql"}
{org = "ballerinax", packageName = "java.jdbc", moduleName = "java.jdbc"}
]

[[package]]
Expand Down
9 changes: 4 additions & 5 deletions ballerina/tests/listener_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import ballerina/lang.runtime;
import ballerina/test;
import ballerinax/mysql;
import ballerinax/java.jdbc;

Service mysqlTestService =
@ServiceConfig {tables: "store_db.products"}
Expand Down Expand Up @@ -55,11 +55,10 @@ type WrongVendor record {|
int test;
|};

final mysql:Client mysqlClient = check new (host = "localhost",
port = port,
final jdbc:Client mysqlClient = check new (
url = string `jdbc:mysql://localhost:${port}/${database}`,
user = username,
password = password,
database = database
password = password
);

int createEventCount = 0;
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ subprojects {
ballerinaStdLibs "io.ballerina.lib:data.jsondata-ballerina:${stdlibDataJsonDataVersion}"

ballerinaStdLibs "io.ballerina.stdlib:sql-ballerina:${stdlibSqlVersion}"
ballerinaStdLibs "io.ballerina.stdlib:mysql-ballerina:${stdlibMySqlVersion}"
ballerinaStdLibs "io.ballerina.stdlib:java.jdbc-ballerina:${stdlibJdbcVersion}"

ballerinaStdLibs "io.ballerina.lib:mysql.cdc.driver-ballerina:${stdlibMysqlCdcDriverVersion}"
ballerinaStdLibs "io.ballerina.stdlib:mysql.driver-ballerina:${stdlibMysqlDriverVersion}"
Expand Down
18 changes: 18 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

### Changed
- Fixed schema not included in service map key
- Fix data binding error being invoked incorrectly
- Fix payload member throwing null pointer exception

## [1.0.2] - 2025-05-27

### Added
- GraalVM configurations for MSSQL

## [1.0.1] - 2025-05-26

### Added
- GraalVM configurations

## [1.0.0] - 2025-05-22

### Added
- Base cdc:Service, utils, types and compiler plugin
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ stdlibCryptoVersion=2.9.0
stdlibRandomVersion=1.7.0

stdlibSqlVersion=1.16.0
stdlibMySqlVersion=1.15.0
stdlibJdbcVersion=1.14.0

# Ballerinax ObseObserver
observeVersion=1.5.0
Expand Down
31 changes: 19 additions & 12 deletions native/src/main/java/io/ballerina/lib/cdc/BalChangeConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,13 @@
if (this.isSingleServiceAttached) {
return this.singleService;
}
String serviceMapKey = payload.getDatabase() + "." + payload.getTable();
StringBuilder serviceMapKeyBuilder = new StringBuilder(payload.getDatabase()).append(".");
if (payload.getSchema() != null && !payload.getSchema().isEmpty()) {
serviceMapKeyBuilder.append(payload.getSchema()).append(".");

Check warning on line 126 in native/src/main/java/io/ballerina/lib/cdc/BalChangeConsumer.java

View check run for this annotation

Codecov / codecov/patch

native/src/main/java/io/ballerina/lib/cdc/BalChangeConsumer.java#L126

Added line #L126 was not covered by tests
}
serviceMapKeyBuilder.append(payload.getTable());
String serviceMapKey = serviceMapKeyBuilder.toString();

if (this.serviceMap.containsKey(serviceMapKey)) {
return this.serviceMap.get(serviceMapKey);
}
Expand Down Expand Up @@ -162,22 +168,23 @@
}

private Object processParameterToIntendedType(Payload payload, String memberKey, Type type) {
try {
Map<String, Object> jsonDataOptions = new HashMap<>();
jsonDataOptions.put(ENABLE_CONSTRAINT_VALIDATION, FALSE);
jsonDataOptions.put(ALLOW_DATA_PROJECTION, FALSE);
BMap<BString, Object> mapValue = ValueCreator.createRecordValue(
io.ballerina.lib.data.ModuleUtils.getModule(),
PARSER_AS_TYPE_OPTIONS, jsonDataOptions);
BTypedesc typeDescValue = ValueCreator.createTypedescValue(TypeUtils.getReferredType(type));
return Native.parseString(
StringUtils.fromString(payload.getPayloadMember(memberKey).toString()), mapValue, typeDescValue);
} catch (BError e) {
Map<String, Object> jsonDataOptions = new HashMap<>();
jsonDataOptions.put(ENABLE_CONSTRAINT_VALIDATION, FALSE);
jsonDataOptions.put(ALLOW_DATA_PROJECTION, FALSE);
BMap<BString, Object> mapValue = ValueCreator.createRecordValue(
io.ballerina.lib.data.ModuleUtils.getModule(),
PARSER_AS_TYPE_OPTIONS, jsonDataOptions);
BTypedesc typeDescValue = ValueCreator.createTypedescValue(TypeUtils.getReferredType(type));
JsonObject payloadMember = payload.getPayloadMember(memberKey);
String memberString = payloadMember == null ? "{}" : payloadMember.toString();
Object parsedRecord = Native.parseString(StringUtils.fromString(memberString), mapValue, typeDescValue);
if (parsedRecord instanceof BError e) {
BMap<BString, Object> detail = ValueCreator.createMapValue();
detail.put(StringUtils.fromString(EVENT_PROCESSING_ERROR_DETAIL_PAYLOAD_FIELD),
JsonUtils.parse(payload.toString()));
throw createError(PAYLOAD_BINDING_ERROR, "Payload binding failed. " + e.getMessage(), e, detail);
}
return parsedRecord;
}

@SuppressWarnings("CallToPrintStackTrace")
Expand Down
11 changes: 10 additions & 1 deletion native/src/main/java/io/ballerina/lib/cdc/models/Payload.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import static io.ballerina.lib.cdc.utils.Constants.EventMembers.DB;
import static io.ballerina.lib.cdc.utils.Constants.EventMembers.OP;
import static io.ballerina.lib.cdc.utils.Constants.EventMembers.SCHEMA;
import static io.ballerina.lib.cdc.utils.Constants.EventMembers.SOURCE;
import static io.ballerina.lib.cdc.utils.Constants.EventMembers.TABLE;

Expand All @@ -39,13 +40,21 @@
}

public JsonObject getPayloadMember(String key) {
return payload.getAsJsonObject(key);
JsonElement element = payload.get(key);
if (element != null && element.isJsonObject()) {
return element.getAsJsonObject();
}
return null;

Check warning on line 47 in native/src/main/java/io/ballerina/lib/cdc/models/Payload.java

View check run for this annotation

Codecov / codecov/patch

native/src/main/java/io/ballerina/lib/cdc/models/Payload.java#L47

Added line #L47 was not covered by tests
}

public String getDatabase() {
return getSourceMember(DB);
}

public String getSchema() {
return getSourceMember(SCHEMA);
}

public String getTable() {
return getSourceMember(TABLE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public static class EventMembers {
public static final String OP = "op";
public static final String PAYLOAD = "payload";
public static final String DB = "db";
public static final String SCHEMA = "schema";
public static final String TABLE = "table";

private EventMembers() {
Expand Down