Kafka topics/sources disappear from deployment plan after adding a PTF-based “expand” step #1804

@ferenc-csaky

Description

While working on a SQRL script, I observed that after introducing an additional Flink Process Table Function (PTF) for “array expansion” (replacing a previous UNNEST/fanout step), Kafka-backed CREATE TABLE sources/outputs stopped being planned as Kafka topics in the generated deployment artifacts.
This is an observation/correlation, not proven causation: the only functional change at the time was replacing one expansion step with a PTF-based expansion, but the exact root cause has not been isolated yet.
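
For context, the PTF in question has roughly the shape sketched below. This is a hypothetical stand-in, not the actual class (which I can attach): the class, method, and column names are made up, and the annotations follow the Flink 2.0 ProcessTableFunction API (FLIP-440), so details may differ from the real implementation.

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// Hypothetical stand-in for the "array expansion" PTF: emits one output row per
// element of an array column of the input row. Illustrative only; the real class
// and annotations can be attached on request.
public class ExpandArrayPtf extends ProcessTableFunction<String> {

  // Row semantics: the function is invoked once per input row, no PARTITION BY required.
  // Trait name as proposed in FLIP-440; the exact enum constant may differ by Flink version.
  public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_ROW) Row input) {
    String[] items = input.getFieldAs("items"); // assumes an ARRAY<STRING> column named "items"
    if (items == null) {
      return;
    }
    for (String item : items) {
      collect(item); // one output row per array element
    }
  }
}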

Observed behavior

  • Compilation succeeds.
  • Kafka physical plan is generated with no topics:
    • build/deploy/plan/kafka.json contains:
      • "topics": []
      • "testRunnerTopics": []

As a consequence, dev deployment that relies on that plan no longer creates/configures the expected Kafka topics/sources.

Expected behavior

  • Kafka-backed CREATE TABLE ... WITH ('connector'='kafka'...) tables should still appear as topics in the Kafka physical plan.
  • build/deploy/plan/kafka.json should include the corresponding NewTopic entries.

Why this looks like a planner/metadata issue

From reading KafkaLogEngine.plan(...), topic creation is derived from the stage plan's tables and mutations:

Streams.concat(stagePlan.getTables().stream(), stagePlan.getMutations().stream()).map(NewTopic.class::cast).toList();

In the failing case, this ends up empty even though the generated Flink SQL (build/deploy/plan/flink-sql.sql) still includes CREATE TABLE ... WITH ('connector'='kafka'...) statements. That suggests the Kafka connector tables are not being surfaced into MaterializationStagePlan in the way the Kafka log engine expects.
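
As a quick way to see where things diverge, a temporary guard around that derivation would surface the mismatch at plan time. This is only a sketch, assuming the stagePlan/NewTopic types quoted above, Guava's Streams, and some logger being in scope; it is not a proposed fix.

// Temporary diagnostic inside KafkaLogEngine.plan(...); assumes the types quoted
// above plus an available logger, just to confirm where the topics get lost.
List<NewTopic> topics =
    Streams.concat(stagePlan.getTables().stream(), stagePlan.getMutations().stream())
        .map(NewTopic.class::cast)
        .toList();

if (topics.isEmpty()) {
  // In the failing case this fires even though build/deploy/plan/flink-sql.sql still
  // contains Kafka CREATE TABLE statements, which points at how the tables are
  // classified/surfaced upstream rather than at the topic serialization step itself.
  log.warn("Kafka stage plan contains no tables/mutations; kafka.json will have empty topics");
}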

Change that coincided with the regression

  • Before: the script used a standard fanout/expansion step (e.g. UNNEST(...) / correlate-style expansion), and Kafka topics were present in kafka.json.
  • After: that expansion was replaced with an additional Flink PTF-based expansion step (no correlate variables), and the kafka.json topics became empty.

To be clear, it has not been proven that the PTF on its own is the root cause; it is simply the change that coincided with the behavior. A rough illustration of the shape of that change follows below.
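
All table and function names in the sketch below are made up, and the real change is in the SQRL script rather than Java; the PTF invocation syntax follows the Flink 2.0 PTF documentation and may not match the actual script verbatim. It only shows the kind of before/after involved.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Illustrative only: assumes a table Orders(id INT, items ARRAY<STRING>) is already
// registered and ExpandArrayPtf is the hypothetical PTF sketched earlier.
public class ExpansionChangeSketch {
  public static void main(String[] args) {
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    env.createTemporarySystemFunction("ExpandArray", ExpandArrayPtf.class);

    // Before: correlate-style fanout via UNNEST; Kafka topics were present in kafka.json.
    env.executeSql("SELECT o.id, item FROM Orders o CROSS JOIN UNNEST(o.items) AS t(item)");

    // After: PTF-based expansion over the same source; kafka.json topics became empty.
    env.executeSql("SELECT * FROM ExpandArray(input => TABLE Orders)");
  }
}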

Suggested diagnostics / attachments

To help debug this, the following artifacts show the mismatch most clearly:

  • build/deploy/plan/kafka.json (empty topics)
  • build/deploy/plan/flink-sql.sql (still contains Kafka CREATE TABLE definitions)
  • build/pipeline_explain.txt and/or build/pipeline_visual.html (to see how the DAG classifies the relevant tables)
  • The minimal SQRL diff showing the expansion being replaced by a PTF call, plus the PTF signature (Java class + annotations)

Impact

Kafka topics are not created/planned, breaking local/dev deployments that rely on the generated Kafka physical plan.

Metadata

Labels: bug (Something isn't working)
Status: Backlog