-
Notifications
You must be signed in to change notification settings - Fork 27
Introduce support for liveness check for MSSQL CDC listener #1170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
177e71f
b08c1ec
41ca431
fdd727b
b9661d5
4de9558
7322dc0
a4bb5b5
262b760
ad6a9e7
5a489b7
7bf8a80
39aed1d
15fe2f2
119c08c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,21 +19,21 @@ import ballerinax/cdc; | |
| public isolated class CdcListener { | ||
| *cdc:Listener; | ||
|
|
||
| private final map<string> & readonly config; | ||
| private final map<anydata> & readonly config; | ||
Nuvindu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| private boolean isStarted = false; | ||
| private boolean hasAttachedService = false; | ||
|
|
||
| # Initializes the MSSQL listener with the given configuration. | ||
| # | ||
| # + config - The configuration for the MSSQL connector | ||
| public isolated function init(*MsSqlListenerConfiguration config) { | ||
| map<string> configMap = {}; | ||
| map<string> debeziumConfigs = {}; | ||
| cdc:populateDebeziumProperties({ | ||
| engineName: config.engineName, | ||
| offsetStorage: config.offsetStorage, | ||
| internalSchemaStorage: config.internalSchemaStorage, | ||
| options: config.options | ||
| }, configMap); | ||
| }, debeziumConfigs); | ||
| cdc:populateDatabaseConfigurations({ | ||
| connectorClass: config.database.connectorClass, | ||
| hostname: config.database.hostname, | ||
|
|
@@ -47,9 +47,13 @@ public isolated class CdcListener { | |
| excludedTables: config.database.excludedTables, | ||
| includedColumns: config.database.includedColumns, | ||
| excludedColumns: config.database.excludedColumns | ||
| }, configMap); | ||
| populateMsSqlConfigurations(config.database, configMap); | ||
| self.config = configMap.cloneReadOnly(); | ||
| }, debeziumConfigs); | ||
| populateMsSqlConfigurations(config.database, debeziumConfigs); | ||
| map<anydata> listenerConfigs = { | ||
| ...debeziumConfigs | ||
| }; | ||
| listenerConfigs["livenessInterval"] = config.livenessInterval; | ||
| self.config = listenerConfigs.cloneReadOnly(); | ||
|
Comment on lines
+52
to
+56
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Find MsSqlListenerConfiguration definition and livenessInterval usage
rg -n "MsSqlListenerConfiguration" -g '*.bal' -C 5Repository: ballerina-platform/module-ballerinax-mssql Length of output: 1585 🏁 Script executed: #!/bin/bash
# Search for livenessInterval declarations and usages
rg -n "livenessInterval" -g '*.bal' -B 2 -A 2Repository: ballerina-platform/module-ballerinax-mssql Length of output: 409 🏁 Script executed: #!/bin/bash
# Check the cdc_listener.bal file around the target lines
cat -n ballerina/cdc_listener.bal | sed -n '40,70p'Repository: ballerina-platform/module-ballerinax-mssql Length of output: 2016 🏁 Script executed: #!/bin/bash
# Search for cdc:ListenerConfiguration in the codebase
rg -n "ListenerConfiguration" -g '*.bal' -B 2 -A 8Repository: ballerina-platform/module-ballerinax-mssql Length of output: 1862 🏁 Script executed: #!/bin/bash
# Check types.bal for the full type definitions
cat -n ballerina/types.balRepository: ballerina-platform/module-ballerinax-mssql Length of output: 10105 🏁 Script executed: #!/bin/bash
# Search for any cdc-related files in the repository
find . -name "*cdc*" -type f 2>/dev/null | head -20Repository: ballerina-platform/module-ballerinax-mssql Length of output: 162 🏁 Script executed: #!/bin/bash
# Search for imports and usage of livenessInterval across the codebase
rg -n "livenessInterval" -g '*.bal' -B 3 -A 3Repository: ballerina-platform/module-ballerinax-mssql Length of output: 511 🏁 Script executed: #!/bin/bash
# Check if there are any Ballerina lock files or dependencies
fd -e "lock" -e "Ballerina.lock" 2>/dev/nullRepository: ballerina-platform/module-ballerinax-mssql Length of output: 68 🏁 Script executed: #!/bin/bash
# Find and read Ballerina.toml
fd -name "Ballerina.toml" -o -name "*.toml" 2>/dev/null | head -5Repository: ballerina-platform/module-ballerinax-mssql Length of output: 68 🏁 Script executed: #!/bin/bash
# Read the Ballerina.toml file
cat Ballerina.toml 2>/dev/null || cat ballerina/Ballerina.toml 2>/dev/nullRepository: ballerina-platform/module-ballerinax-mssql Length of output: 809 🏁 Script executed: #!/bin/bash
# Check how config parameter is passed and used - look for calls to init function
rg -n "init\(" -g '*.bal' -B 2 -A 5Repository: ballerina-platform/module-ballerinax-mssql Length of output: 6113 🏁 Script executed: #!/bin/bash
# Search for test files that use MsSqlListener initialization
find . -name "*test*.bal" -o -name "*Test.bal" | head -10Repository: ballerina-platform/module-ballerinax-mssql Length of output: 510 🏁 Script executed: #!/bin/bash
# Look for test files and examples
fd -name "*test*" -type f -g "*.bal" 2>/dev/null | head -20Repository: ballerina-platform/module-ballerinax-mssql Length of output: 68 🏁 Script executed: #!/bin/bash
# Search for how the listener is instantiated in tests
rg -n "new MsSqlListener" -g '*.bal' -B 3 -A 5Repository: ballerina-platform/module-ballerinax-mssql Length of output: 68 🏁 Script executed: #!/bin/bash
# Check the listener test file
cat -n ballerina/tests/listener_tests.bal | head -100Repository: ballerina-platform/module-ballerinax-mssql Length of output: 3957 🏁 Script executed: #!/bin/bash
# Search for MsSqlListener in test files
rg -n "MsSqlListener" -g '*.bal' -B 2 -A 5Repository: ballerina-platform/module-ballerinax-mssql Length of output: 4091 🏁 Script executed: #!/bin/bash
# Get the full cdc_listener.bal file to see complete context
cat -n ballerina/cdc_listener.balRepository: ballerina-platform/module-ballerinax-mssql Length of output: 5188 🏁 Script executed: #!/antml:thinking
Looking at the test file, I can see that the `getDummyMsSqlListener()` function creates a listener without passing `livenessInterval`. This suggests it's optional. Let me check the full init function to understand the context better.Repository: ballerina-platform/module-ballerinax-mssql Length of output: 154 🏁 Script executed: #!/bin/bash
# Search for any comments or documentation about config handling
rg -n "config\[" -g '*.bal' -B 2 -A 2Repository: ballerina-platform/module-ballerinax-mssql Length of output: 68 🏁 Script executed: #!/bin/bash
# Look for patterns of guarding assignments in similar codebases
rg -n "if .* != \(\)" -g '*.bal' -B 1 -A 1Repository: ballerina-platform/module-ballerinax-mssql Length of output: 68 🌐 Web query:
💡 Result: In the current Ballerina CDC module (
So Source: Ballerina Central API docs for [1] (central.ballerina.io) Citations: 🌐 Web query:
💡 Result: In
Source: Citations: 🏁 Script executed: #!/bin/bash
# Check if the code compiles and get more context around config handling
rg -n "config\." -g 'cdc_listener.bal' -B 1 -A 1Repository: ballerina-platform/module-ballerinax-mssql Length of output: 2673 🏁 Script executed: #!/antml:thinking
Based on the web search results, cdc:ListenerConfiguration does NOT have a livenessInterval field. But the code is accessing config.livenessInterval. This is strange. Let me check if maybe the record allows additional fields (open record) or if there's a recent addition.
Actually, looking at the record definition again:Repository: ballerina-platform/module-ballerinax-mssql Length of output: 444 🏁 Script executed: #!/an/bash
# Check if there are type definitions or imports that might add livenessInterval
cat -n ballerina/cdc_listener.bal | grep -E "import|type"Repository: ballerina-platform/module-ballerinax-mssql Length of output: 115 Guard optional Since Suggested fix- listenerConfigs["livenessInterval"] = config.livenessInterval;
+ if config.livenessInterval != () {
+ listenerConfigs["livenessInterval"] = config.livenessInterval;
+ }🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| # Attaches a CDC service to the MSSQL listener. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| // Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). | ||
| // | ||
| // WSO2 LLC. licenses this file to you under the Apache License, | ||
| // Version 2.0 (the "License"); you may not use this file except | ||
| // in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| import ballerina/lang.runtime; | ||
| import ballerina/test; | ||
| import ballerinax/cdc; | ||
|
|
||
| @test:Config { | ||
| groups: ["liveness"] | ||
| } | ||
| function testLivenessBeforeListenerStart() returns error? { | ||
| CdcListener mssqlListener = new ({ | ||
| database: { | ||
| username: cdcUsername, | ||
| password: cdcPassword, | ||
| port: cdcPort, | ||
| databaseNames: cdcDatabase | ||
| }, | ||
| options: { | ||
| snapshotMode: cdc:NO_DATA | ||
| } | ||
| }); | ||
| check mssqlListener.attach(testService); | ||
| boolean liveness = check cdc:isLive(mssqlListener); | ||
| test:assertFalse(liveness, "Liveness check passes even before listener starts"); | ||
| } | ||
|
|
||
| @test:Config { | ||
| groups: ["liveness"] | ||
| } | ||
| function testLivenessWithStartedListener() returns error? { | ||
| CdcListener mssqlListener = new ({ | ||
| database: { | ||
| username: cdcUsername, | ||
| password: cdcPassword, | ||
| port: cdcPort, | ||
| databaseNames: cdcDatabase | ||
| }, | ||
| options: { | ||
| snapshotMode: cdc:NO_DATA | ||
| } | ||
| }); | ||
| check mssqlListener.attach(testService); | ||
| check mssqlListener.'start(); | ||
| boolean liveness = check cdc:isLive(mssqlListener); | ||
| test:assertTrue(liveness, "Liveness fails for a started listener"); | ||
| check mssqlListener.gracefulStop(); | ||
| } | ||
|
|
||
| @test:Config { | ||
| groups: ["liveness"] | ||
| } | ||
| function testLivenessAfterListenerStop() returns error? { | ||
| CdcListener mssqlListener = new ({ | ||
| database: { | ||
| username: cdcUsername, | ||
| password: cdcPassword, | ||
| port: cdcPort, | ||
| databaseNames: cdcDatabase | ||
| }, | ||
| options: { | ||
| snapshotMode: cdc:NO_DATA | ||
| } | ||
| }); | ||
| check mssqlListener.attach(testService); | ||
| check mssqlListener.'start(); | ||
| check mssqlListener.gracefulStop(); | ||
| boolean liveness = check cdc:isLive(mssqlListener); | ||
| test:assertFalse(liveness, "Liveness check passes after the listener has stopped"); | ||
| } | ||
|
|
||
| @test:Config { | ||
| groups: ["liveness"] | ||
| } | ||
| function testLivenessWithoutReceivingEvents() returns error? { | ||
| CdcListener mssqlListener = new ({ | ||
| database: { | ||
| username: cdcUsername, | ||
| password: cdcPassword, | ||
| port: cdcPort, | ||
| databaseNames: cdcDatabase | ||
| }, | ||
| options: { | ||
| snapshotMode: cdc:NO_DATA | ||
| }, | ||
| livenessInterval: 5.0 | ||
| }); | ||
| check mssqlListener.attach(testService); | ||
| check mssqlListener.'start(); | ||
| runtime:sleep(10); | ||
| boolean liveness = check cdc:isLive(mssqlListener); | ||
| test:assertFalse(liveness, "Liveness check passes even after not receiving events within the liveness interval"); | ||
| check mssqlListener.gracefulStop(); | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.