Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 36 additions & 31 deletions ballerina/types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public type KafkaOffsetStorage record {|

# Represents the base configuration for a database connection.
#
# + connectorClass - The class name of the database connector implementation to use
# + hostname - The hostname of the database server
# + port - The port number of the database server
# + username - The username for the database connection
Expand All @@ -181,6 +182,7 @@ public type KafkaOffsetStorage record {|
# + includedColumns - A list of regular expressions matching fully-qualified column identifiers to capture changes from (should not be used alongside columnExclude)
# + excludedColumns - A list of regular expressions matching fully-qualified column identifiers to exclude from change capture (should not be used alongside columnInclude)
public type DatabaseConnection record {|
string connectorClass;
string hostname;
int port;
string username;
Expand All @@ -196,6 +198,7 @@ public type DatabaseConnection record {|

# Represents the configuration for a MySQL database connection.
#
# + connectorClass - The class name of the MySQL connector implementation to use
# + hostname - The hostname of the MySQL server
# + port - The port number of the MySQL server
# + databaseServerId - The unique identifier for the MySQL server
Expand All @@ -205,6 +208,7 @@ public type DatabaseConnection record {|
# + secure - The connector establishes an encrypted connection if the server supports secure connections
public type MySqlDatabaseConnection record {|
*DatabaseConnection;
string connectorClass = "io.debezium.connector.mysql.MySqlConnector";
string hostname = "localhost";
int port = 3306;
string databaseServerId = (checkpanic random:createIntInRange(0, 100000)).toString();
Expand All @@ -216,6 +220,7 @@ public type MySqlDatabaseConnection record {|

# Represents the configuration for an MSSQL database connection.
#
# + connectorClass - The class name of the MSSQL connector implementation to use
# + hostname - The hostname of the MSSQL server
# + port - The port number of the MSSQL server
# + databaseInstance - The name of the database instance
Expand All @@ -225,6 +230,7 @@ public type MySqlDatabaseConnection record {|
# + tasksMax - The maximum number of tasks to create for this connector. If the `databaseNames` contains more than one element, you can increase the value of this property to a number less than or equal to the number of elements in the list
public type MsSqlDatabaseConnection record {|
*DatabaseConnection;
string connectorClass = "io.debezium.connector.sqlserver.SqlServerConnector";
string hostname = "localhost";
int port = 1433;
string databaseInstance?;
Expand All @@ -236,6 +242,7 @@ public type MsSqlDatabaseConnection record {|

# Represents the configuration for a PostgreSQL CDC connector.
#
# + connectorClass - The class name of the PostgreSQL connector implementation to use
# + hostname - The hostname of the PostgreSQL server
# + port - The port number of the PostgreSQL server
# + databaseName - The name of the PostgreSQL database from which to stream the changes.
Expand All @@ -247,6 +254,7 @@ public type MsSqlDatabaseConnection record {|
# + publicationName - The name of the PostgreSQL publication created for streaming changes when using pgoutput.
public type PostgresDatabaseConnection record {|
*DatabaseConnection;
string connectorClass = "io.debezium.connector.postgresql.PostgresConnector";
string hostname = "localhost";
int port = 5432;
string databaseName;
Expand All @@ -260,6 +268,7 @@ public type PostgresDatabaseConnection record {|

# Represents the configuration for a Oracle CDC connector.
#
# + connectorClass - The class name of the Oracle connector implementation to use
# + hostname - The hostname of the Oracle server
# + port - The port number of the Oracle server
# + url - JDBC url
Expand All @@ -271,6 +280,7 @@ public type PostgresDatabaseConnection record {|
# + tasksMax - The Oracle connector always uses a single task and therefore does not use this value, so the default is always acceptable
public type OracleDatabaseConnection record {|
*DatabaseConnection;
string connectorClass = "io.debezium.connector.oracle.OracleConnector";
string hostname = "localhost";
int port = 1521;
string url?;
Expand All @@ -282,73 +292,68 @@ public type OracleDatabaseConnection record {|
int tasksMax = 1;
|};

# Represents the base configuration for the CDC engine.
# Provides a set of additional configurations related to the cdc connection.
#
# + engineName - The name of the CDC engine
# + connectorClass - The class name of the connector implementation to use
# + maxQueueSize - The maximum size of the queue for events
# + maxBatchSize - The maximum size of the batch for events
# + queryTimeout - Specifies the time, in seconds, that the connector waits for a query to complete. Set the value to 0 (zero) to remove the timeout l
# + internalSchemaStorage - The internal schema history configuration
# + offsetStorage - The offset storage configuration
# + eventProcessingFailureHandlingMode - The mode for handling event processing failures
# + snapshotMode - The mode for capturing snapshots
# + eventProcessingFailureHandlingMode - The mode for handling event processing failures
# + skippedOperations - The list of operations to skip
# + skipMessagesWithoutChange - Whether to skip messages without changes
# + sendTombstonesOnDelete - Whether to include tombstones on delete operations
# + decimalHandlingMode - The mode for handling decimal values from the database
public type ListenerConfiguration record {|
string engineName = "ballerina-cdc-connector";
string connectorClass;
# + maxQueueSize - The maximum size of the queue for events
# + maxBatchSize - The maximum size of the batch for events
# + queryTimeout - Specifies the time, in seconds, that the connector waits for a query to complete. Set the value to 0 (zero) to remove the timeout
public type Options record {|
SnapshotMode snapshotMode = INITIAL;
EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode = WARN;
Operation[] skippedOperations = [TRUNCATE];
boolean skipMessagesWithoutChange = false;
DecimalHandlingMode decimalHandlingMode = DOUBLE;
int maxQueueSize = 8192;
int maxBatchSize = 2048;
decimal queryTimeout = 60;
|};

# Represents the base configuration for the CDC engine.
#
# + engineName - The name of the CDC engine
# + internalSchemaStorage - The internal schema history configuration
# + offsetStorage - The offset storage configuration
# + options - The additional options for the CDC engine
public type ListenerConfiguration record {|
string engineName = "ballerina-cdc-connector";
FileInternalSchemaStorage|KafkaInternalSchemaStorage internalSchemaStorage = {};
FileOffsetStorage|KafkaOffsetStorage offsetStorage = {};
EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode = WARN;
SnapshotMode snapshotMode = INITIAL;
Operation[] skippedOperations = [TRUNCATE];
boolean skipMessagesWithoutChange = false;
boolean sendTombstonesOnDelete = false;
DecimalHandlingMode decimalHandlingMode = DOUBLE;
Options options = {};
|};

# Represents the configuration for a MySQL CDC connector.
#
# + connectorClass - The class name of the MySQL connector implementation to use
# + database - The MySQL database connection configuration
public type MySqlListenerConfiguration record {|
*ListenerConfiguration;
string connectorClass = "io.debezium.connector.mysql.MySqlConnector";
MySqlDatabaseConnection database;
*ListenerConfiguration;
|};

# Represents the configuration for an MSSQL CDC connector.
#
# + connectorClass - The class name of the MSSQL connector implementation to use
# + database - The MSSQL database connection configuration
public type MsSqlListenerConfiguration record {|
*ListenerConfiguration;
string connectorClass = "io.debezium.connector.sqlserver.SqlServerConnector";
MsSqlDatabaseConnection database;
*ListenerConfiguration;
|};

# Represents the configuration for a Postgres CDC connector.
#
# + connectorClass - The class name of the Postgres connector implementation to use
# + database - The Postgres database connection configuration
public type PostgresListenerConfiguration record {|
*ListenerConfiguration;
string connectorClass = "io.debezium.connector.postgresql.PostgresConnector";
PostgresDatabaseConnection database;
*ListenerConfiguration;
|};

# Represents the configuration for an Oracle CDC connector.
#
# + connectorClass - The class name of the Oracle connector implementation to use
# + database - The Oracle database connection configuration
public type OracleListenerConfiguration record {|
*ListenerConfiguration;
string connectorClass = "io.debezium.connector.oracle.OracleConnector";
OracleDatabaseConnection database;
*ListenerConfiguration;
|};
36 changes: 17 additions & 19 deletions ballerina/utils.bal
Original file line number Diff line number Diff line change
Expand Up @@ -77,38 +77,35 @@ const string ORACLE_URL = "database.url";
const string ORACLE_PDB_NAME = "database.dbname";
const string ORACLE_CONNECTION_ADAPTER = "database.connection.adapter";

isolated function getDebeziumProperties(MySqlListenerConfiguration|MsSqlListenerConfiguration|PostgresListenerConfiguration|OracleListenerConfiguration config) returns map<string> & readonly{
isolated function getDebeziumProperties(MySqlListenerConfiguration|MsSqlListenerConfiguration|PostgresListenerConfiguration|OracleListenerConfiguration config) returns map<string> & readonly {
map<string> configMap = {};
configMap[NAME] = config.engineName;

// Common configurations
populateCommonConfigurations(config, configMap);

// Schema history storage configurations
populateSchemaHistoryConfigurations(config.internalSchemaStorage, configMap);

// Offset storage configurations
populateOffsetStorageConfigurations(config.offsetStorage, configMap);

// Database-specific configurations
populateDatabaseConfigurations(config.database, configMap);

populateOptions(config.options, configMap);

// The following values cannot be overridden by the user
configMap[TOMBSTONES_ON_DELETE] = "false";
configMap[INCLUDE_SCHEMA_CHANGES] = "false";

return configMap.cloneReadOnly();
}

// Populates common configurations shared across all databases
isolated function populateCommonConfigurations(MySqlListenerConfiguration|MsSqlListenerConfiguration|PostgresListenerConfiguration|OracleListenerConfiguration config, map<string> configMap) {
configMap[NAME] = config.engineName;
configMap[CONNECTOR_CLASS] = config.connectorClass;
configMap[MAX_QUEUE_SIZE] = config.maxQueueSize.toString();
configMap[MAX_BATCH_SIZE] = config.maxBatchSize.toString();
configMap[EVENT_PROCESSING_FAILURE_HANDLING_MODE] = config.eventProcessingFailureHandlingMode;
configMap[SNAPSHOT_MODE] = config.snapshotMode;
configMap[SKIPPED_OPERATIONS] = string:'join(",", ...config.skippedOperations);
configMap[SKIP_MESSAGES_WITHOUT_CHANGE] = config.skipMessagesWithoutChange.toString();
configMap[TOMBSTONES_ON_DELETE] = config.sendTombstonesOnDelete.toString();
configMap[DECIMAL_HANDLING_MODE] = config.decimalHandlingMode;
configMap[DATABASE_QUERY_TIMEOUTS_MS] = getMillisecondValueOf(config.queryTimeout);
isolated function populateOptions(Options options, map<string> configMap) {
configMap[MAX_QUEUE_SIZE] = options.maxQueueSize.toString();
configMap[MAX_BATCH_SIZE] = options.maxBatchSize.toString();
configMap[EVENT_PROCESSING_FAILURE_HANDLING_MODE] = options.eventProcessingFailureHandlingMode;
configMap[SNAPSHOT_MODE] = options.snapshotMode;
configMap[SKIPPED_OPERATIONS] = string:'join(",", ...options.skippedOperations);
configMap[SKIP_MESSAGES_WITHOUT_CHANGE] = options.skipMessagesWithoutChange.toString();
configMap[DECIMAL_HANDLING_MODE] = options.decimalHandlingMode;
configMap[DATABASE_QUERY_TIMEOUTS_MS] = getMillisecondValueOf(options.queryTimeout);
}

// Populates schema history storage configurations
Expand Down Expand Up @@ -144,6 +141,7 @@ isolated function populateOffsetStorageConfigurations(FileOffsetStorage|KafkaOff

// Populates database-specific configurations
isolated function populateDatabaseConfigurations(MySqlDatabaseConnection|MsSqlDatabaseConnection|PostgresDatabaseConnection|OracleDatabaseConnection connection, map<string> configMap) {
configMap[CONNECTOR_CLASS] = connection.connectorClass;
configMap[DATABASE_HOSTNAME] = connection.hostname;
configMap[DATABASE_PORT] = connection.port.toString();
configMap[DATABASE_USER] = connection.username;
Expand Down
Loading