Introduce liveness-check mechanism for the CDC listener#24
Introduce liveness-check mechanism for the CDC listener#24ayeshLK merged 33 commits intoballerina-platform:mainfrom
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds a liveness-check feature for CDC listeners: exposes Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant Listener
participant BalChangeConsumer
participant CdcCompletionCallback
participant DebeziumEngine
User->>Listener: start()
Listener->>BalChangeConsumer: create (track lastEventReceivedTime)
Listener->>CdcCompletionCallback: create (track invoked)
Listener->>DebeziumEngine: register changeConsumer & completionCallback
DebeziumEngine->>BalChangeConsumer: handleBatch(records)
BalChangeConsumer->>BalChangeConsumer: updateLastEventReceivedTime()
User->>Listener: isLive()
rect rgba(100,150,200,0.5)
Listener->>CdcCompletionCallback: isInvoked()?
Listener->>BalChangeConsumer: getLastEventReceivedTime()
Listener->>Listener: compare with livenessInterval
end
Listener->>User: return boolean|Error
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@native/src/main/java/io/ballerina/lib/cdc/BalChangeConsumer.java`:
- Line 73: The field lastEventReceivedTime is written in handleBatch() (Debezium
thread) and read in getLastEventReceivedTime()/isLive() on another thread,
causing visibility races; fix by making lastEventReceivedTime volatile (or use
an AtomicReference<Instant>) so writes in handleBatch() are visible to readers
in getLastEventReceivedTime()/isLive(); update the declaration of
lastEventReceivedTime accordingly and keep usages in handleBatch() and
getLastEventReceivedTime() unchanged.
In `@native/src/main/java/io/ballerina/lib/cdc/Listener.java`:
- Around line 279-285: In Listener.java fix the inverted time-diff in the
liveness check: currently ChronoUnit.MILLIS.between(current, lastEventReceived)
yields a negative value; change the call to compute elapsed =
ChronoUnit.MILLIS.between(lastEventReceived, current) (or use
Duration.between(lastEventReceived, current).toMillis()) and compare that
elapsed long against the stored liveness interval (from LIVENESS_INTERVAL_KEY)
so the condition in the method that uses lastEventReceivedTime and
LIVENESS_INTERVAL_KEY can correctly return false on timeout.
- Line 71: Rename the misspelled constant LIVENSS_INTERVAL_CONFIG_KEY to
LIVENESS_INTERVAL_CONFIG_KEY and update all references to it (e.g., the usage
currently referring to LIVENSS_INTERVAL_CONFIG_KEY around the listener code) so
the symbol name matches; ensure the public static final BString declaration uses
StringUtils.fromString("livenessInterval") with the corrected identifier and
update any imports/usages accordingly to avoid compilation errors.
- Around line 260-295: The isLive method currently treats a missing
changeConsumer or an empty lastEventReceivedTime as live; change this so that if
listener.getNativeData(CHANGE_CONSUMER_KEY) is null the method returns false
(not live), and if ((BalChangeConsumer)
changeConsumer).getLastEventReceivedTime() is empty it also returns false until
the first event is received; alternatively, if you prefer explicit start
semantics, read an IS_STARTED_KEY native flag (if present) and return false when
that flag is absent/false; keep existing liveness timeout logic using
LIVENESS_INTERVAL_KEY and preserve the COMP_CALLBACK_KEY callback check, lock
handling, and error-return behavior in isLive.
🧹 Nitpick comments (2)
native/src/main/java/io/ballerina/lib/cdc/CdcCompletionCallback.java (1)
27-29: Javadoc is misleading.The Javadoc says "shuts-down gracefully" but this callback is invoked for both successful completions and unexpected terminations. Consider updating the documentation to reflect this.
-/** - * This class contains the logic to be executed when the CDC engine shuts-down gracefully. - */ +/** + * Completion callback invoked when the Debezium CDC engine terminates (gracefully or unexpectedly). + */ballerina/extern_functions.bal (1)
66-75: Minor typo in documentation.Line 72 has "fail" which should be "fails" for grammatical correctness.
-# or an error if the liveness check fail +# or an error if the liveness check fails
native/src/main/java/io/ballerina/lib/cdc/BalChangeConsumer.java
Outdated
Show resolved
Hide resolved
Codecov Report❌ Patch coverage is ❌ Your project status has failed because the head coverage (76.70%) is below the target coverage (80.00%). You can increase the head coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #24 +/- ##
============================================
- Coverage 77.07% 76.70% -0.37%
- Complexity 214 224 +10
============================================
Files 29 30 +1
Lines 1025 1082 +57
Branches 159 169 +10
============================================
+ Hits 790 830 +40
- Misses 163 175 +12
- Partials 72 77 +5 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
native/src/main/java/io/ballerina/lib/cdc/Listener.java (1)
14-14:⚠️ Potential issue | 🟡 MinorTypo in license header.
Line 14 has an extra character: "express q or implied" should be "express or implied".
📝 Proposed fix
- * specific language governing permissions and limitations + * specific language governing permissions and limitationsNote: The actual typo is on line 14 where "express q or implied" should be "express or implied".
🤖 Fix all issues with AI agents
In `@ballerina/Dependencies.toml`:
- Around line 6-8: The Dependencies.toml lists an unavailable dependency version
for ballerinax/cdc (1.1.1); verify whether 1.1.1 was meant to be published and
if not, change the ballerinax/cdc entry in Dependencies.toml to a published
version (e.g., 1.1.0 or 1.0.3) so the distribution (distribution-version =
"2201.13.1") can resolve all dependencies; update the dependency line for
ballerinax/cdc accordingly and commit the change.
In `@docs/spec/spec.md`:
- Line 43: Update the table-of-contents entry that currently reads "- [2.2 Using
health checks](`#23-using-health-checks`)" so the visible link text matches the
actual section number; change "2.2" to "2.3" in the TOC line (the string "- [2.2
Using health checks](`#23-using-health-checks`)"), leaving the anchor unchanged.
- Around line 316-322: The example assigns cdc:isLive(mssqlCdc) to a boolean
without handling that cdc:isLive returns boolean|Error; update the resource
function get liveness() to capture the result into a variable (e.g.,
isLiveResult), test for error with an "is error" check, handle/log the error
(using the existing logger or return http:ServiceUnavailable), and only treat
the success branch as a boolean (e.g., boolean isLive = isLiveResult; or use a
guarded check) so both the boolean and Error cases from cdc:isLive are handled.
- Line 305: The function signature for isLive has a syntax error in the
parameter type declaration; update the parameter from "cdc: Listener cdc" to
remove the space after the colon so it reads "cdc:Listener cdc" in the isLive
declaration to match the module-qualified type syntax.
🧹 Nitpick comments (1)
ballerina/tests/dynamic_attachment.bal (1)
197-218: Consider test execution time impact.This test uses
runtime:sleep(10)(10 seconds) to wait beyond the 5-second liveness interval. While functionally correct, this adds significant execution time to the test suite.Consider reducing both values proportionally (e.g.,
livenessInterval: 2.0andsleep(5)) to speed up test execution while maintaining the same verification logic.⏱️ Optional: Reduce test execution time
MockListener mysqlListener = new ({ database: { username, password, port }, options: { snapshotMode: NO_DATA }, - livenessInterval: 5.0 + livenessInterval: 2.0 }); check mysqlListener.attach(testService); check mysqlListener.'start(); - runtime:sleep(10); + runtime:sleep(5); boolean liveness = check isLive(mysqlListener);
| - [2.2.4.5 `onTruncate`](#2245-ontruncate) | ||
| - [2.2.4.6 `onError`](#2246-onerror) | ||
| - [2.2.5 Service Configuration](#225-service-configuration) | ||
| - [2.2 Using health checks](#23-using-health-checks) |
There was a problem hiding this comment.
Fix the section numbering in the table of contents.
The link text says "2.2 Using health checks" but it should be "2.3" to match the actual section number.
📝 Proposed fix
- - [2.2 Using health checks](`#23-using-health-checks`)
+ - [2.3 Using health checks](`#23-using-health-checks`) 🤖 Prompt for AI Agents
In `@docs/spec/spec.md` at line 43, Update the table-of-contents entry that
currently reads "- [2.2 Using health checks](`#23-using-health-checks`)" so the
visible link text matches the actual section number; change "2.2" to "2.3" in
the TOC line (the string "- [2.2 Using health
checks](`#23-using-health-checks`)"), leaving the anchor unchanged.
| # + cdc - The CDC listener instance to be checked | ||
| # + return - Returns `true` if the listener is considered live, `false` otherwise, | ||
| # or an error if the liveness check fails | ||
| public function isLive(cdc: Listener cdc) returns boolean|Error; |
There was a problem hiding this comment.
Syntax error in function signature.
The parameter type has an incorrect space: cdc: Listener cdc should be cdc:Listener cdc (no space after the colon in the module-qualified type).
📝 Proposed fix
-public function isLive(cdc: Listener cdc) returns boolean|Error;
+public function isLive(cdc:Listener cdc) returns boolean|Error;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| public function isLive(cdc: Listener cdc) returns boolean|Error; | |
| public function isLive(cdc:Listener cdc) returns boolean|Error; |
🤖 Prompt for AI Agents
In `@docs/spec/spec.md` at line 305, The function signature for isLive has a
syntax error in the parameter type declaration; update the parameter from "cdc:
Listener cdc" to remove the space after the colon so it reads "cdc:Listener cdc"
in the isLive declaration to match the module-qualified type syntax.
docs/spec/spec.md
Outdated
| resource function get liveness() returns http:Ok|http:ServiceUnavailable { | ||
| boolean isLive = cdc:isLive(mssqlCdc); | ||
| if isLive { | ||
| return http:OK; | ||
| } | ||
| return http:SERVICE_UNAVAILABLE; | ||
| } |
There was a problem hiding this comment.
Example ignores error case from isLive return type.
The isLive function returns boolean|Error, but the example assigns it directly to a boolean without handling the potential error. This could mislead users.
📝 Proposed fix showing error handling
resource function get liveness() returns http:Ok|http:ServiceUnavailable {
- boolean isLive = cdc:isLive(mssqlCdc);
- if isLive {
+ boolean|cdc:Error isLiveResult = cdc:isLive(mssqlCdc);
+ if isLiveResult is boolean && isLiveResult {
return http:OK;
}
return http:SERVICE_UNAVAILABLE;
}🤖 Prompt for AI Agents
In `@docs/spec/spec.md` around lines 316 - 322, The example assigns
cdc:isLive(mssqlCdc) to a boolean without handling that cdc:isLive returns
boolean|Error; update the resource function get liveness() to capture the result
into a variable (e.g., isLiveResult), test for error with an "is error" check,
handle/log the error (using the existing logger or return
http:ServiceUnavailable), and only treat the success branch as a boolean (e.g.,
boolean isLive = isLiveResult; or use a guarded check) so both the boolean and
Error cases from cdc:isLive are handled.
This reverts commit 1194337.
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
native/src/main/java/io/ballerina/lib/cdc/Listener.java (1)
173-193:⚠️ Potential issue | 🟠 MajorAdd timeout to
comFuture.get()to prevent indefinite blocking if engine startup fails.When the Debezium engine fails to start (bad configuration, connection failure, task initialization error, etc.), the
taskStarted()callback is never invoked, leavingcomFutureuncompleted. ThecomFuture.get()call at line 192 will then block indefinitely. The registeredCdcCompletionCallbackonly logs errors and does not complete the future.Add a timeout to detect and handle startup failures:
⏱️ Timeout handling example
- EngineResult engineResult = comFuture.get(); + EngineResult engineResult; + try { + engineResult = comFuture.get(30, TimeUnit.SECONDS); + } catch (TimeoutException e) { + return createCdcError("Debezium engine did not start within 30 seconds"); + }
🤖 Fix all issues with AI agents
In `@ballerina/Dependencies.toml`:
- Line 33: The Dependencies.toml currently pins ballerinax/cdc to a
non-published version (1.2.0); update the dependency entry for ballerinax/cdc in
Dependencies.toml to the latest available published version (1.1.0) or add a
valid repository override/source that hosts 1.2.0 so the build can resolve for
distribution 2201.12.0; look for the ballerinax/cdc version line in
Dependencies.toml (the current version = "1.2.0" entry) and change it to "1.1.0"
or configure an alternate registry/source that actually contains 1.2.0.
In `@native/src/main/java/io/ballerina/lib/cdc/Listener.java`:
- Around line 165-168: Validate the liveness config value before converting to
milliseconds: when reading the config for LIVENESS_INTERVAL_CONFIG_KEY in
Listener (the code that computes livenessInterval), first check the config entry
is non-null and its numeric value > 0, and if not return/throw a clear error
(e.g., BallerinaException or the existing error path used by this component)
explaining the invalid liveness interval instead of proceeding to
multiply/longValue; this ensures livenessInterval is never zero or negative and
avoids downstream misbehavior.
- Around line 262-305: In isLive(BObject listener) ensure the listener is
considered not live when it has been stopped or liveness data is missing: first
check the IS_STARTED_KEY native data and return false if absent/false (this
prevents reporting live after immediateStop), then validate
LIVENESS_INTERVAL_KEY is present (return false if null) before using it, and
ensure CHANGE_CONSUMER_KEY presence is enforced (return false if null) rather
than skipping that liveness check; update the logic around COMP_CALLBACK_KEY,
LISTENER_START_TIME_KEY, and the BalChangeConsumer/CdcCompletionCallback checks
to early-return false on any missing required native data so partial
initialization never yields true.
ballerina/Dependencies.toml
Outdated
| org = "ballerina" | ||
| name = "data.jsondata" | ||
| version = "1.1.0" | ||
| version = "1.1.3" |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cat ballerina/Dependencies.tomlRepository: ballerina-platform/module-ballerinax-cdc
Length of output: 7202
🌐 Web query:
Ballerina Central ballerina/data.jsondata 1.1.3
💡 Result:
ballerina/data.jsondata v1.1.3 (Ballerina Central) is the Ballerina JSON Data Library for converting and working with JSON as Ballerina anydata (including typed record/array projections), plus JSON navigation and prettification. It provides APIs such as parseAsType, parseString, parseBytes, parseStream, read (JSONPath-style access), prettify, and toJson. [1]
Import:
import ballerina/data.jsondata;[1]
Compatibility (v1.1.3): Ballerina 2201.12.0, Platform java21, GraalVM compatible. [1]
Package listing: The Ballerina standard library index also lists data.jsondata at 1.1.3. [2]
Sources:
[1] Ballerina Central module docs for ballerina/data.jsondata v1.1.3
[2] Ballerina Central “Ballerina library” package index
🌐 Web query:
Ballerina Central ballerina/observe 1.5.1
💡 Result:
ballerina/observe v1.5.1 is the Ballerina standard library package that provides APIs for observing Ballerina services (observability features such as tracing and metrics). It is published on Ballerina Central and is marked as having a newer latest version (1.7.0). [1]
How to use (v1.5.1):
- Import:
[1]
import ballerina/observe;
- Enable observability when running (as shown in the package docs):
[1]
bal run <your_service>.bal --b7a.observability.enabled=true
Compatibility (v1.5.1 on Central):
- Platform: java21
- Ballerina version: 2201.12.0
- GraalVM compatible: Yes [1]
Pinning the exact version (so you get 1.5.1, not a newer compatible one):
- Ballerina’s dependency resolution picks the latest compatible version by default (Semantic Versioning rules). [2]
- To force
1.5.1, specify that version in your project’s dependency configuration (e.g., inBallerina.toml). [2]
Sources
- Ballerina Central docs for
ballerina/observev1.5.1 [1] - Ballerina “Manage dependencies” documentation (version selection rules) [2]
🌐 Web query:
Ballerina Central ballerinax/cdc 1.2.0
💡 Result:
ballerinax/cdc does not have a 1.2.0 release on Ballerina Central. The latest version is 1.1.0 (other available versions shown: 1.0.3, 1.0.2, 1.0.1, 1.0.0). [1]
To use the latest:
- Import:
import ballerinax/cdc;[1] - And include the appropriate Debezium driver package (e.g.,
ballerinax/mysql.cdc.driver,ballerinax/mssql.cdc.driver,ballerinax/postgresql.cdc.driver). [1]
If you were looking for a 1.2.0 package, ballerinax/cdata.connect is at 1.2.0 on Central. [2]
Sources:
[1] https://central.ballerina.io/ballerinax/cdc/latest
[2] https://central.ballerina.io/ballerinax/cdata.connect/latest
🌐 Web query:
Ballerina Central ballerinax/java.jdbc 1.14.1
💡 Result:
ballerinax/java.jdbc v1.14.1 is the Ballerina JDBC client library for accessing relational databases via the Java JDBC API. It’s compatible with Ballerina 2201.12.0 and Java 21 (GraalVM compatible). [1]
Use it
- Import:
[1]
import ballerinax/java.jdbc;
- Add your database’s JDBC driver JAR as a native dependency in
Ballerina.toml(e.g., via[[platform.java21.dependency]]with a Maven coordinate or file path). [1][2]
Docs / source
- Ballerina Central package page: [1]
- GitHub repo (
module-ballerinax-java.jdbc): [2]
Sources
[1] https://central.ballerina.io/ballerinax/java.jdbc/latest
[2] https://github.com/ballerina-platform/module-ballerinax-java.jdbc
ballerinax/cdc 1.2.0 is not published on Ballerina Central.
Three packages verified:
- ✓ ballerina/data.jsondata 1.1.3 — exists on Ballerina Central, compatible with 2201.12.0
- ✓ ballerina/observe 1.5.1 — exists on Ballerina Central, compatible with 2201.12.0
- ✓ ballerinax/java.jdbc 1.14.1 — exists on Ballerina Central, compatible with 2201.12.0
However, ballerinax/cdc 1.2.0 does not exist on Ballerina Central; the latest available version is 1.1.0. This version bump cannot resolve from Ballerina Central for distribution 2201.12.0.
🤖 Prompt for AI Agents
In `@ballerina/Dependencies.toml` at line 33, The Dependencies.toml currently pins
ballerinax/cdc to a non-published version (1.2.0); update the dependency entry
for ballerinax/cdc in Dependencies.toml to the latest available published
version (1.1.0) or add a valid repository override/source that hosts 1.2.0 so
the build can resolve for distribution 2201.12.0; look for the ballerinax/cdc
version line in Dependencies.toml (the current version = "1.2.0" entry) and
change it to "1.1.0" or configure an alternate registry/source that actually
contains 1.2.0.
| Long livenessInterval = ((BDecimal) config.get(LIVENESS_INTERVAL_CONFIG_KEY)) | ||
| .decimalValue() | ||
| .multiply(BigDecimal.valueOf(1000)) | ||
| .longValue(); |
There was a problem hiding this comment.
Validate livenessInterval before converting to milliseconds.
If the config is null or ≤ 0, liveness checks can misbehave; return a clear error early.
🛠️ Suggested guard
- Long livenessInterval = ((BDecimal) config.get(LIVENESS_INTERVAL_CONFIG_KEY))
- .decimalValue()
- .multiply(BigDecimal.valueOf(1000))
- .longValue();
+ BDecimal intervalSec = (BDecimal) config.get(LIVENESS_INTERVAL_CONFIG_KEY);
+ if (intervalSec == null || intervalSec.decimalValue().compareTo(BigDecimal.ZERO) <= 0) {
+ return createCdcError("livenessInterval must be a positive decimal value");
+ }
+ Long livenessInterval = intervalSec.decimalValue()
+ .multiply(BigDecimal.valueOf(1000))
+ .longValue();📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| Long livenessInterval = ((BDecimal) config.get(LIVENESS_INTERVAL_CONFIG_KEY)) | |
| .decimalValue() | |
| .multiply(BigDecimal.valueOf(1000)) | |
| .longValue(); | |
| BDecimal intervalSec = (BDecimal) config.get(LIVENESS_INTERVAL_CONFIG_KEY); | |
| if (intervalSec == null || intervalSec.decimalValue().compareTo(BigDecimal.ZERO) <= 0) { | |
| return createCdcError("livenessInterval must be a positive decimal value"); | |
| } | |
| Long livenessInterval = intervalSec.decimalValue() | |
| .multiply(BigDecimal.valueOf(1000)) | |
| .longValue(); |
🤖 Prompt for AI Agents
In `@native/src/main/java/io/ballerina/lib/cdc/Listener.java` around lines 165 -
168, Validate the liveness config value before converting to milliseconds: when
reading the config for LIVENESS_INTERVAL_CONFIG_KEY in Listener (the code that
computes livenessInterval), first check the config entry is non-null and its
numeric value > 0, and if not return/throw a clear error (e.g.,
BallerinaException or the existing error path used by this component) explaining
the invalid liveness interval instead of proceeding to multiply/longValue; this
ensures livenessInterval is never zero or negative and avoids downstream
misbehavior.
| public static Object isLive(BObject listener) { | ||
| String id = getListenerId(listener); | ||
| ReentrantLock lock = lockMap.computeIfAbsent(id, k -> new ReentrantLock()); | ||
|
|
||
| lock.lock(); | ||
| try { | ||
| Object completionCallback = listener.getNativeData(COMP_CALLBACK_KEY); | ||
| if (completionCallback != null) { | ||
| boolean invoked = ((CdcCompletionCallback) completionCallback).isInvoked(); | ||
| if (invoked) { | ||
| return false; | ||
| } | ||
| } else { | ||
| return false; | ||
| } | ||
|
|
||
| Long livenessInterval = (Long) listener.getNativeData(LIVENESS_INTERVAL_KEY); | ||
|
|
||
| Object listenerStartTime = listener.getNativeData(LISTENER_START_TIME_KEY); | ||
| if (listenerStartTime != null) { | ||
| Instant current = Instant.now(); | ||
| long diff = ChronoUnit.MILLIS.between((Instant) listenerStartTime, current); | ||
| if (diff < livenessInterval) { | ||
| return true; | ||
| } | ||
| } else { | ||
| return false; | ||
| } | ||
|
|
||
| Object changeConsumer = listener.getNativeData(CHANGE_CONSUMER_KEY); | ||
| if (changeConsumer != null) { | ||
| Optional<Instant> lastEventReceivedTime = ((BalChangeConsumer) changeConsumer) | ||
| .getLastEventReceivedTime(); | ||
| if (lastEventReceivedTime.isEmpty()) { | ||
| return false; | ||
| } | ||
|
|
||
| Instant current = Instant.now(); | ||
| Instant lastEventReceived = lastEventReceivedTime.get(); | ||
| long diff = ChronoUnit.MILLIS.between(lastEventReceived, current); | ||
| if (diff > livenessInterval) { | ||
| return false; | ||
| } | ||
| } |
There was a problem hiding this comment.
Ensure isLive() returns false after stop / when liveness data is missing.
isLive() doesn’t check IS_STARTED_KEY and doesn’t guard missing LIVENESS_INTERVAL_KEY/CHANGE_CONSUMER_KEY, so it can still report live after immediateStop or partial initialization.
✅ Suggested guards
+ Object isStarted = listener.getNativeData(IS_STARTED_KEY);
+ if (!(isStarted instanceof Boolean) || !((Boolean) isStarted)) {
+ return false;
+ }
+
+ Long livenessInterval = (Long) listener.getNativeData(LIVENESS_INTERVAL_KEY);
+ if (livenessInterval == null) {
+ return false;
+ }
...
- Object changeConsumer = listener.getNativeData(CHANGE_CONSUMER_KEY);
- if (changeConsumer != null) {
+ Object changeConsumer = listener.getNativeData(CHANGE_CONSUMER_KEY);
+ if (changeConsumer == null) {
+ return false;
+ } else {🤖 Prompt for AI Agents
In `@native/src/main/java/io/ballerina/lib/cdc/Listener.java` around lines 262 -
305, In isLive(BObject listener) ensure the listener is considered not live when
it has been stopped or liveness data is missing: first check the IS_STARTED_KEY
native data and return false if absent/false (this prevents reporting live after
immediateStop), then validate LIVENESS_INTERVAL_KEY is present (return false if
null) before using it, and ensure CHANGE_CONSUMER_KEY presence is enforced
(return false if null) rather than skipping that liveness check; update the
logic around COMP_CALLBACK_KEY, LISTENER_START_TIME_KEY, and the
BalChangeConsumer/CdcCompletionCallback checks to early-return false on any
missing required native data so partial initialization never yields true.
ballerina/Ballerina.toml
Outdated
| org = "ballerinax" | ||
| name = "cdc" | ||
| version = "1.1.0" | ||
| version = "1.1.1" |
There was a problem hiding this comment.
Don't we need to bump this to 1.2.0?
Purpose
Fixes: ballerina-platform/ballerina-library#8589
Examples
Checklist
Summary
This pull request adds a liveness-check mechanism for the CDC listener to allow users to determine whether the underlying CDC engine is running and responsive.
Key Changes
Public API
Implementation
Tests & Documentation
Versioning & Dependencies
This change improves operational visibility and stability by enabling proactive monitoring of CDC listener health.