diff --git a/ballerina/types.bal b/ballerina/types.bal index d304b39..996233b 100644 --- a/ballerina/types.bal +++ b/ballerina/types.bal @@ -169,6 +169,7 @@ public type KafkaOffsetStorage record {| # Represents the base configuration for a database connection. # +# + connectorClass - The class name of the database connector implementation to use # + hostname - The hostname of the database server # + port - The port number of the database server # + username - The username for the database connection @@ -181,6 +182,7 @@ public type KafkaOffsetStorage record {| # + includedColumns - A list of regular expressions matching fully-qualified column identifiers to capture changes from (should not be used alongside columnExclude) # + excludedColumns - A list of regular expressions matching fully-qualified column identifiers to exclude from change capture (should not be used alongside columnInclude) public type DatabaseConnection record {| + string connectorClass; string hostname; int port; string username; @@ -196,6 +198,7 @@ public type DatabaseConnection record {| # Represents the configuration for a MySQL database connection. # +# + connectorClass - The class name of the MySQL connector implementation to use # + hostname - The hostname of the MySQL server # + port - The port number of the MySQL server # + databaseServerId - The unique identifier for the MySQL server @@ -205,6 +208,7 @@ public type DatabaseConnection record {| # + secure - The connector establishes an encrypted connection if the server supports secure connections public type MySqlDatabaseConnection record {| *DatabaseConnection; + string connectorClass = "io.debezium.connector.mysql.MySqlConnector"; string hostname = "localhost"; int port = 3306; string databaseServerId = (checkpanic random:createIntInRange(0, 100000)).toString(); @@ -216,6 +220,7 @@ public type MySqlDatabaseConnection record {| # Represents the configuration for an MSSQL database connection. # +# + connectorClass - The class name of the MSSQL connector implementation to use # + hostname - The hostname of the MSSQL server # + port - The port number of the MSSQL server # + databaseInstance - The name of the database instance @@ -225,6 +230,7 @@ public type MySqlDatabaseConnection record {| # + tasksMax - The maximum number of tasks to create for this connector. If the `databaseNames` contains more than one element, you can increase the value of this property to a number less than or equal to the number of elements in the list public type MsSqlDatabaseConnection record {| *DatabaseConnection; + string connectorClass = "io.debezium.connector.sqlserver.SqlServerConnector"; string hostname = "localhost"; int port = 1433; string databaseInstance?; @@ -236,6 +242,7 @@ public type MsSqlDatabaseConnection record {| # Represents the configuration for a PostgreSQL CDC connector. # +# + connectorClass - The class name of the PostgreSQL connector implementation to use # + hostname - The hostname of the PostgreSQL server # + port - The port number of the PostgreSQL server # + databaseName - The name of the PostgreSQL database from which to stream the changes. @@ -247,6 +254,7 @@ public type MsSqlDatabaseConnection record {| # + publicationName - The name of the PostgreSQL publication created for streaming changes when using pgoutput. public type PostgresDatabaseConnection record {| *DatabaseConnection; + string connectorClass = "io.debezium.connector.postgresql.PostgresConnector"; string hostname = "localhost"; int port = 5432; string databaseName; @@ -260,6 +268,7 @@ public type PostgresDatabaseConnection record {| # Represents the configuration for a Oracle CDC connector. # +# + connectorClass - The class name of the Oracle connector implementation to use # + hostname - The hostname of the Oracle server # + port - The port number of the Oracle server # + url - JDBC url @@ -271,6 +280,7 @@ public type PostgresDatabaseConnection record {| # + tasksMax - The Oracle connector always uses a single task and therefore does not use this value, so the default is always acceptable public type OracleDatabaseConnection record {| *DatabaseConnection; + string connectorClass = "io.debezium.connector.oracle.OracleConnector"; string hostname = "localhost"; int port = 1521; string url?; @@ -282,73 +292,68 @@ public type OracleDatabaseConnection record {| int tasksMax = 1; |}; -# Represents the base configuration for the CDC engine. +# Provides a set of additional configurations related to the cdc connection. # -# + engineName - The name of the CDC engine -# + connectorClass - The class name of the connector implementation to use -# + maxQueueSize - The maximum size of the queue for events -# + maxBatchSize - The maximum size of the batch for events -# + queryTimeout - Specifies the time, in seconds, that the connector waits for a query to complete. Set the value to 0 (zero) to remove the timeout l -# + internalSchemaStorage - The internal schema history configuration -# + offsetStorage - The offset storage configuration -# + eventProcessingFailureHandlingMode - The mode for handling event processing failures # + snapshotMode - The mode for capturing snapshots +# + eventProcessingFailureHandlingMode - The mode for handling event processing failures # + skippedOperations - The list of operations to skip # + skipMessagesWithoutChange - Whether to skip messages without changes -# + sendTombstonesOnDelete - Whether to include tombstones on delete operations # + decimalHandlingMode - The mode for handling decimal values from the database -public type ListenerConfiguration record {| - string engineName = "ballerina-cdc-connector"; - string connectorClass; +# + maxQueueSize - The maximum size of the queue for events +# + maxBatchSize - The maximum size of the batch for events +# + queryTimeout - Specifies the time, in seconds, that the connector waits for a query to complete. Set the value to 0 (zero) to remove the timeout +public type Options record {| + SnapshotMode snapshotMode = INITIAL; + EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode = WARN; + Operation[] skippedOperations = [TRUNCATE]; + boolean skipMessagesWithoutChange = false; + DecimalHandlingMode decimalHandlingMode = DOUBLE; int maxQueueSize = 8192; int maxBatchSize = 2048; decimal queryTimeout = 60; +|}; + +# Represents the base configuration for the CDC engine. +# +# + engineName - The name of the CDC engine +# + internalSchemaStorage - The internal schema history configuration +# + offsetStorage - The offset storage configuration +# + options - The additional options for the CDC engine +public type ListenerConfiguration record {| + string engineName = "ballerina-cdc-connector"; FileInternalSchemaStorage|KafkaInternalSchemaStorage internalSchemaStorage = {}; FileOffsetStorage|KafkaOffsetStorage offsetStorage = {}; - EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode = WARN; - SnapshotMode snapshotMode = INITIAL; - Operation[] skippedOperations = [TRUNCATE]; - boolean skipMessagesWithoutChange = false; - boolean sendTombstonesOnDelete = false; - DecimalHandlingMode decimalHandlingMode = DOUBLE; + Options options = {}; |}; # Represents the configuration for a MySQL CDC connector. # -# + connectorClass - The class name of the MySQL connector implementation to use # + database - The MySQL database connection configuration public type MySqlListenerConfiguration record {| - *ListenerConfiguration; - string connectorClass = "io.debezium.connector.mysql.MySqlConnector"; MySqlDatabaseConnection database; + *ListenerConfiguration; |}; # Represents the configuration for an MSSQL CDC connector. # -# + connectorClass - The class name of the MSSQL connector implementation to use # + database - The MSSQL database connection configuration public type MsSqlListenerConfiguration record {| - *ListenerConfiguration; - string connectorClass = "io.debezium.connector.sqlserver.SqlServerConnector"; MsSqlDatabaseConnection database; + *ListenerConfiguration; |}; # Represents the configuration for a Postgres CDC connector. # -# + connectorClass - The class name of the Postgres connector implementation to use # + database - The Postgres database connection configuration public type PostgresListenerConfiguration record {| - *ListenerConfiguration; - string connectorClass = "io.debezium.connector.postgresql.PostgresConnector"; PostgresDatabaseConnection database; + *ListenerConfiguration; |}; # Represents the configuration for an Oracle CDC connector. # -# + connectorClass - The class name of the Oracle connector implementation to use # + database - The Oracle database connection configuration public type OracleListenerConfiguration record {| - *ListenerConfiguration; - string connectorClass = "io.debezium.connector.oracle.OracleConnector"; OracleDatabaseConnection database; + *ListenerConfiguration; |}; diff --git a/ballerina/utils.bal b/ballerina/utils.bal index ac21493..b0de5e6 100644 --- a/ballerina/utils.bal +++ b/ballerina/utils.bal @@ -77,38 +77,35 @@ const string ORACLE_URL = "database.url"; const string ORACLE_PDB_NAME = "database.dbname"; const string ORACLE_CONNECTION_ADAPTER = "database.connection.adapter"; -isolated function getDebeziumProperties(MySqlListenerConfiguration|MsSqlListenerConfiguration|PostgresListenerConfiguration|OracleListenerConfiguration config) returns map & readonly{ +isolated function getDebeziumProperties(MySqlListenerConfiguration|MsSqlListenerConfiguration|PostgresListenerConfiguration|OracleListenerConfiguration config) returns map & readonly { map configMap = {}; + configMap[NAME] = config.engineName; - // Common configurations - populateCommonConfigurations(config, configMap); - - // Schema history storage configurations populateSchemaHistoryConfigurations(config.internalSchemaStorage, configMap); - // Offset storage configurations populateOffsetStorageConfigurations(config.offsetStorage, configMap); - // Database-specific configurations populateDatabaseConfigurations(config.database, configMap); + populateOptions(config.options, configMap); + + // The following values cannot be overridden by the user + configMap[TOMBSTONES_ON_DELETE] = "false"; configMap[INCLUDE_SCHEMA_CHANGES] = "false"; + return configMap.cloneReadOnly(); } // Populates common configurations shared across all databases -isolated function populateCommonConfigurations(MySqlListenerConfiguration|MsSqlListenerConfiguration|PostgresListenerConfiguration|OracleListenerConfiguration config, map configMap) { - configMap[NAME] = config.engineName; - configMap[CONNECTOR_CLASS] = config.connectorClass; - configMap[MAX_QUEUE_SIZE] = config.maxQueueSize.toString(); - configMap[MAX_BATCH_SIZE] = config.maxBatchSize.toString(); - configMap[EVENT_PROCESSING_FAILURE_HANDLING_MODE] = config.eventProcessingFailureHandlingMode; - configMap[SNAPSHOT_MODE] = config.snapshotMode; - configMap[SKIPPED_OPERATIONS] = string:'join(",", ...config.skippedOperations); - configMap[SKIP_MESSAGES_WITHOUT_CHANGE] = config.skipMessagesWithoutChange.toString(); - configMap[TOMBSTONES_ON_DELETE] = config.sendTombstonesOnDelete.toString(); - configMap[DECIMAL_HANDLING_MODE] = config.decimalHandlingMode; - configMap[DATABASE_QUERY_TIMEOUTS_MS] = getMillisecondValueOf(config.queryTimeout); +isolated function populateOptions(Options options, map configMap) { + configMap[MAX_QUEUE_SIZE] = options.maxQueueSize.toString(); + configMap[MAX_BATCH_SIZE] = options.maxBatchSize.toString(); + configMap[EVENT_PROCESSING_FAILURE_HANDLING_MODE] = options.eventProcessingFailureHandlingMode; + configMap[SNAPSHOT_MODE] = options.snapshotMode; + configMap[SKIPPED_OPERATIONS] = string:'join(",", ...options.skippedOperations); + configMap[SKIP_MESSAGES_WITHOUT_CHANGE] = options.skipMessagesWithoutChange.toString(); + configMap[DECIMAL_HANDLING_MODE] = options.decimalHandlingMode; + configMap[DATABASE_QUERY_TIMEOUTS_MS] = getMillisecondValueOf(options.queryTimeout); } // Populates schema history storage configurations @@ -144,6 +141,7 @@ isolated function populateOffsetStorageConfigurations(FileOffsetStorage|KafkaOff // Populates database-specific configurations isolated function populateDatabaseConfigurations(MySqlDatabaseConnection|MsSqlDatabaseConnection|PostgresDatabaseConnection|OracleDatabaseConnection connection, map configMap) { + configMap[CONNECTOR_CLASS] = connection.connectorClass; configMap[DATABASE_HOSTNAME] = connection.hostname; configMap[DATABASE_PORT] = connection.port.toString(); configMap[DATABASE_USER] = connection.username;