-
Notifications
You must be signed in to change notification settings - Fork 21
Description
While working on a SQRL script, I observed that after introducing an additional Flink Process Table Function (PTF) used for “array expansion” (replacing a previous UNNEST/fanout step), Kafka-backed CREATE TABLE sources/outputs stopped being planned as Kafka topics in the generated deployment artifacts.
This is an observation/correlation (not proven causation): the only functional change at the time was replacing one expansion step with a PTF-based expansion, but the exact root cause is not isolated yet.
Observed behavior
- Compilation succeeds.
- Kafka physical plan is generated with no topics:
build/deploy/plan/kafka.jsoncontains:"topics": []"testRunnerTopics": []
As a consequence, dev deployment that relies on that plan no longer creates/configures the expected Kafka topics/sources.
Expected behavior
- Kafka-backed
CREATE TABLE ... WITH ('connector'='kafka'...)tables should still appear as topics in the Kafka physical plan. build/deploy/plan/kafka.jsonshould include the corresponding NewTopic entries.
Why this looks like a planner/metadata issue
From reading KafkaLogEngine.plan(...), topic creation is derived from the stage plan tables/mutations:
Streams.concat(stagePlan.getTables().stream(), stagePlan.getMutations().stream()).map(NewTopic.class::cast).toList();In the failing case, this ends up empty even though the generated Flink SQL (build/deploy/plan/flink-sql.sql) still includes CREATE TABLE ... WITH ('connector'='kafka'...) statements. That suggests the Kafka connector tables are not being surfaced into MaterializationStagePlan in the way the Kafka log engine expects.
Change that coincided with the regression
- Before: script used a standard fanout/expansion step (e.g. UNNEST(...) / correlate-style expansion) and Kafka topics were present in
kafka.json. - After: replaced that expansion with an additional Flink PTF-based expansion step (no correlate variables) and
kafka.jsontopics became empty.
Again, it was not proven the PTF on its own is the root cause. It’s just the change that coincided with the behavior.
Suggested diagnostics / attachments
To help debug, these artifacts clearly show the mismatch:
build/deploy/plan/kafka.json(empty topics)build/deploy/plan/flink-sql.sql(still contains Kafka CREATE TABLE definitions)build/pipeline_explain.txtand/orbuild/pipeline_visual.html(to see how the DAG classifies the relevant tables)- The minimal SQRL diff showing the expansion being replaced by a PTF call and the PTF signature (Java class + annotations)
Impact
Kafka topics are not created/planned, breaking local/dev deployments that rely on the generated Kafka physical plan.
Metadata
Metadata
Assignees
Labels
Type
Projects
Status