
feat(reconciliation): split worker into ingestion and matching workers#383

Open
flemzord wants to merge 1 commit into main from feat/reconciliation-split-workers

Conversation

@flemzord
Member

Summary

  • Split the single reconciliation worker into two dedicated workers: ingestion (`worker ingestion`) and matching (`worker matching`)
  • The ingestion worker consumes the ledger/payments broker topics and publishes to `{stack}.reconciliation`
  • The matching worker consumes from the reconciliation topic via its own BrokerConsumer with a distinct NATS deliver queue group
  • Clean up the legacy single-worker BrokerConsumer and Deployment on reconciliation
  • Fix the duplicate POD_NAME env var when both OTEL traces and metrics are enabled
  • Add Elasticsearch env var support for reconciliation deployments

Test plan

  • Verify ingestion worker deployment has KAFKA_TOPICS for ledger/payments and PUBLISHER_TOPIC_MAPPING for reconciliation
  • Verify matching worker deployment has KAFKA_TOPICS for reconciliation only
  • Verify NATS deliver queue groups are distinct (reconciliation_ingestion / reconciliation_matching)
  • Verify legacy reconciliation-worker deployment and old BrokerConsumer are cleaned up
  • Verify POD_NAME env var appears only once per pod

- Split single reconciliation worker into two dedicated workers:
  - Ingestion worker (`worker ingestion`): consumes ledger/payments topics
  - Matching worker (`worker matching`): consumes reconciliation topic
- Each worker has its own BrokerConsumer with distinct NATS deliver queue groups
- Ingestion worker publishes to `{stack}.reconciliation` topic
- Add cleanup of legacy single worker consumer and deployment
- Add Elasticsearch env vars support for reconciliation deployments
- Fix duplicate POD_NAME env var when both OTEL traces and metrics are enabled
@flemzord flemzord requested a review from a team as a code owner January 29, 2026 15:28
@flemzord
Member Author

Link with formancehq/reconciliation#46


coderabbitai bot commented Jan 29, 2026

Walkthrough

This change refactors the Kubernetes reconciliation controller to orchestrate multiple deployment types (API and workers), introduces broker consumer creation with legacy resource cleanup, integrates Elasticsearch and broker environment variables into deployments, and adds comprehensive test coverage validating the new behavior.

Changes

- **Reconciliation Flow Orchestration** (`internal/resources/reconciliations/init.go`): Restructures reconciliation initialization to clean up legacy resources, create broker consumers (ingestion and matching), enforce database readiness before image resolution, gate deployment creation on broker consumer readiness, and extend reconciler wiring to own BrokerConsumer, GatewayHTTPAPI, Job, and ResourceReference resources.
- **Deployment Infrastructure** (`internal/resources/reconciliations/deployments.go`): Introduces deployment type constants (API, worker-ingestion, worker-matching), refactors environment variable assembly into a reusable commonEnvVars pathway, adds a createDeployments orchestrator and createDeployment configurator to build multiple deployment types in sequence, integrates Elasticsearch and Kafka topic environment variables for workers, and ensures workers are created before the API.
- **Environment Configuration** (`internal/resources/settings/elasticsearch.go`, `internal/resources/settings/opentelemetry.go`, `internal/resources/brokerconsumers/controller.go`): Adds a GetElasticsearchEnvVars helper to configure the Elasticsearch DSN and ILM-related variables; refactors the POD_NAME environment variable in GetOTELEnvVars to be appended conditionally instead of included in the helper; updates the DELIVER environment variable in the broker consumer to use the computed consumerName.
- **Testing** (`internal/tests/reconciliation_controller_test.go`): Introduces a comprehensive Ginkgo/Gomega test suite validating reconciliation controller behavior, including resource creation, owner references, deployment types, container configuration, environment variables (broker, Elasticsearch, topics), GatewayHTTPAPI creation, BrokerConsumer services, and deployment-specific environment variable presence/absence.

Sequence Diagram

```mermaid
sequenceDiagram
    participant RC as Reconciliation<br/>Controller
    participant LR as deleteLegacy<br/>Resources
    participant BC as BrokerConsumer<br/>Factory
    participant DEP as createDeployments<br/>Orchestrator
    participant API as API<br/>Deployment
    participant WI as Worker<br/>Ingestion
    participant WM as Worker<br/>Matching

    RC->>LR: Clean up legacy BrokerConsumer<br/>& reconciliation-worker
    LR-->>RC: Legacy resources removed

    RC->>BC: Create ingestion consumer
    BC-->>RC: Ingestion consumer Ready

    RC->>BC: Create matching consumer
    BC-->>RC: Matching consumer Ready

    RC->>DEP: Orchestrate deployments
    DEP->>WI: Create worker-ingestion<br/>(Kafka topics, Elasticsearch)
    WI-->>DEP: Ready

    DEP->>WM: Create worker-matching<br/>(Kafka topics, Elasticsearch)
    WM-->>DEP: Ready

    DEP->>API: Create API deployment<br/>(sans Elasticsearch vars)
    API-->>DEP: Ready

    DEP-->>RC: All deployments created
    RC-->>RC: Reconciliation status = Ready
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 Deployments now dance in ordered grace,
With workers racing first to their place,
Elasticsearch flows through the matching stream,
While APIs await—the final theme,
Legacy ghosts are swept away with care,
A rabbit's refactor beyond compare!

🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed

❌ Failed checks (1 warning)
- Docstring Coverage ⚠️ Warning: docstring coverage is 25.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.

✅ Passed checks (2 passed)
- Title check ✅ Passed: the title 'feat(reconciliation): split worker into ingestion and matching workers' directly and accurately describes the main architectural change in the pull request: splitting a single reconciliation worker into two specialized workers.
- Description check ✅ Passed: the description comprehensively explains the key changes (worker splitting, broker topic handling, legacy cleanup, the POD_NAME fix, and Elasticsearch support), all of which are present in the changeset and align with the file modifications.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@internal/resources/reconciliations/deployments.go`:
- Around line 147-153: The Elasticsearch env var injection currently calls
settings.GetElasticsearchEnvVars(ctx, stack.Name) and appends esEnvVars into the
shared env used for the API deployment (variable env); move this call and the
append so ES vars are only added when constructing the worker deployment
environment (e.g., where the worker-specific env is built or where worker
container spec is created) instead of the API path, or alternatively update the
tests to expect ES vars on the API if that was intended; ensure references to
esEnvVars and settings.GetElasticsearchEnvVars remain in the worker-deployment
code paths and remove the append from the API deployment construction.

In `@internal/tests/reconciliation_controller_test.go`:
- Around line 112-192: The test must be updated to assert both split worker
deployments and consumers instead of a single "reconciliation-worker" and
BrokerConsumer: load and assert two deployments (e.g., names like
"reconciliation-ingestion" and "reconciliation-matching" or whatever Create
uses), verify their container Args are the expected per-worker values (e.g.,
"ingestion" vs "matching"), check each worker's KAFKA_TOPICS env is correct
(ingestion should contain "<stack>.ledger <stack>.payments", matching should
contain "<stack>.reconciliation"), and assert two BrokerConsumer resources exist
with the correct Service lists and names matching the naming used by
brokerconsumers.Create (reference BrokerConsumer type, brokerconsumers.Create,
LoadResource, and the reconciliation/stack naming in your existing tests to
locate where to change assertions).

Comment on lines +147 to +153
```go
	// Add Elasticsearch env vars for both API and Worker
	esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
	if err != nil {
		return err
	}
	env = append(env, esEnvVars...)
```


⚠️ Potential issue | 🟠 Major

Elasticsearch env vars are injected into the API deployment.

The tests in this PR assert the API deployment should not carry ELASTICSEARCH_* env vars. If that expectation is correct, gate these env vars to the worker deployments only (or update the tests/spec to match the intended behavior).

💡 Suggested fix (apply ES env vars to workers only)
```diff
-	// Add Elasticsearch env vars for both API and Worker
-	esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
-	if err != nil {
-		return err
-	}
-	env = append(env, esEnvVars...)
+	// Add Elasticsearch env vars only for worker deployments
+	if deploymentType == DeploymentTypeWorkerIngestion || deploymentType == DeploymentTypeWorkerMatching {
+		esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
+		if err != nil {
+			return err
+		}
+		env = append(env, esEnvVars...)
+	}
```

Comment on lines +112 to +192
```go
By("Should create an API deployment", func() {
	deployment := &appsv1.Deployment{}
	Eventually(func() error {
		return LoadResource(stack.Name, "reconciliation", deployment)
	}).Should(Succeed())
	Expect(deployment).To(BeControlledBy(reconciliation))
	Expect(deployment.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation"))
	Expect(deployment.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"serve"}))
})
By("Should create a worker deployment", func() {
	deployment := &appsv1.Deployment{}
	Eventually(func() error {
		return LoadResource(stack.Name, "reconciliation-worker", deployment)
	}).Should(Succeed())
	Expect(deployment).To(BeControlledBy(reconciliation))
	Expect(deployment.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation-worker"))
	Expect(deployment.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"worker"}))
})
By("API deployment should have broker environment variables", func() {
	deployment := &appsv1.Deployment{}
	Eventually(func() error {
		return LoadResource(stack.Name, "reconciliation", deployment)
	}).Should(Succeed())
	Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElements(
		core.Env("BROKER", "nats"),
		core.Env("PUBLISHER_NATS_ENABLED", "true"),
	))
})
By("Worker deployment should have broker environment variables", func() {
	deployment := &appsv1.Deployment{}
	Eventually(func() error {
		return LoadResource(stack.Name, "reconciliation-worker", deployment)
	}).Should(Succeed())
	Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElements(
		core.Env("BROKER", "nats"),
		core.Env("PUBLISHER_NATS_ENABLED", "true"),
	))
})
By("Worker deployment should have topics environment variable", func() {
	deployment := &appsv1.Deployment{}
	Eventually(func() error {
		return LoadResource(stack.Name, "reconciliation-worker", deployment)
	}).Should(Succeed())
	Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElement(
		core.Env("KAFKA_TOPICS", fmt.Sprintf("%s.ledger %s.payments", stack.Name, stack.Name)),
	))
})
By("API deployment should NOT have topics environment variable", func() {
	deployment := &appsv1.Deployment{}
	Eventually(func() error {
		return LoadResource(stack.Name, "reconciliation", deployment)
	}).Should(Succeed())
	for _, envVar := range deployment.Spec.Template.Spec.Containers[0].Env {
		Expect(envVar.Name).NotTo(Equal("KAFKA_TOPICS"))
	}
})
By("Should create a new GatewayHTTPAPI object", func() {
	httpService := &v1beta1.GatewayHTTPAPI{}
	Eventually(func() error {
		return LoadResource("", core.GetObjectName(stack.Name, "reconciliation"), httpService)
	}).Should(Succeed())
})
By("Should create a new AuthClient object", func() {
	authClient := &v1beta1.AuthClient{}
	Eventually(func() error {
		return LoadResource("", core.GetObjectName(stack.Name, "reconciliation"), authClient)
	}).Should(Succeed())
})
By("Should create a new BrokerConsumer object", func() {
	consumer := &v1beta1.BrokerConsumer{}
	Eventually(func() error {
		return LoadResource("", reconciliation.Name+"-reconciliation", consumer)
	}).Should(Succeed())
})
By("BrokerConsumer should have correct services", func() {
	consumer := &v1beta1.BrokerConsumer{}
	Eventually(func(g Gomega) []string {
		g.Expect(LoadResource("", reconciliation.Name+"-reconciliation", consumer)).To(Succeed())
		return consumer.Spec.Services
	}).Should(ContainElements("ledger", "payments"))
})
```

⚠️ Potential issue | 🟠 Major

Update assertions for split ingestion/matching workers and consumers.

These assertions still expect a single reconciliation-worker deployment and a single BrokerConsumer. With the split, update the test to validate both worker deployments, their args, per‑worker KAFKA_TOPICS, and the two BrokerConsumers (ingestion: ledger/payments; matching: reconciliation). Also confirm the BrokerConsumer names match the naming in brokerconsumers.Create.

🔧 Example updates
```diff
-			By("Should create a worker deployment", func() {
-				deployment := &appsv1.Deployment{}
-				Eventually(func() error {
-					return LoadResource(stack.Name, "reconciliation-worker", deployment)
-				}).Should(Succeed())
-				Expect(deployment).To(BeControlledBy(reconciliation))
-				Expect(deployment.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation-worker"))
-				Expect(deployment.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"worker"}))
-			})
+			By("Should create ingestion and matching worker deployments", func() {
+				ingestion := &appsv1.Deployment{}
+				Eventually(func() error {
+					return LoadResource(stack.Name, "reconciliation-worker-ingestion", ingestion)
+				}).Should(Succeed())
+				Expect(ingestion).To(BeControlledBy(reconciliation))
+				Expect(ingestion.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation-worker-ingestion"))
+				Expect(ingestion.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"worker", "ingestion"}))
+
+				matching := &appsv1.Deployment{}
+				Eventually(func() error {
+					return LoadResource(stack.Name, "reconciliation-worker-matching", matching)
+				}).Should(Succeed())
+				Expect(matching).To(BeControlledBy(reconciliation))
+				Expect(matching.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation-worker-matching"))
+				Expect(matching.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"worker", "matching"}))
+			})
 ...
-			By("Worker deployment should have topics environment variable", func() {
-				deployment := &appsv1.Deployment{}
-				Eventually(func() error {
-					return LoadResource(stack.Name, "reconciliation-worker", deployment)
-				}).Should(Succeed())
-				Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElement(
-					core.Env("KAFKA_TOPICS", fmt.Sprintf("%s.ledger %s.payments", stack.Name, stack.Name)),
-				))
-			})
+			By("Worker deployments should have correct topics environment variable", func() {
+				ingestion := &appsv1.Deployment{}
+				Eventually(func() error {
+					return LoadResource(stack.Name, "reconciliation-worker-ingestion", ingestion)
+				}).Should(Succeed())
+				Expect(ingestion.Spec.Template.Spec.Containers[0].Env).To(ContainElement(
+					core.Env("KAFKA_TOPICS", fmt.Sprintf("%s.ledger %s.payments", stack.Name, stack.Name)),
+				))
+
+				matching := &appsv1.Deployment{}
+				Eventually(func() error {
+					return LoadResource(stack.Name, "reconciliation-worker-matching", matching)
+				}).Should(Succeed())
+				Expect(matching.Spec.Template.Spec.Containers[0].Env).To(ContainElement(
+					core.Env("KAFKA_TOPICS", fmt.Sprintf("%s.reconciliation", stack.Name)),
+				))
+			})
 ...
-			By("Should create a new BrokerConsumer object", func() {
-				consumer := &v1beta1.BrokerConsumer{}
-				Eventually(func() error {
-					return LoadResource("", reconciliation.Name+"-reconciliation", consumer)
-				}).Should(Succeed())
-			})
+			By("Should create ingestion and matching BrokerConsumer objects", func() {
+				ingestion := &v1beta1.BrokerConsumer{}
+				Eventually(func() error {
+					return LoadResource("", reconciliation.Name+"-reconciliation-ingestion", ingestion)
+				}).Should(Succeed())
+
+				matching := &v1beta1.BrokerConsumer{}
+				Eventually(func() error {
+					return LoadResource("", reconciliation.Name+"-reconciliation-matching", matching)
+				}).Should(Succeed())
+			})
 ...
-			By("BrokerConsumer should have correct services", func() {
-				consumer := &v1beta1.BrokerConsumer{}
-				Eventually(func(g Gomega) []string {
-					g.Expect(LoadResource("", reconciliation.Name+"-reconciliation", consumer)).To(Succeed())
-					return consumer.Spec.Services
-				}).Should(ContainElements("ledger", "payments"))
-			})
+			By("BrokerConsumers should have correct services", func() {
+				ingestion := &v1beta1.BrokerConsumer{}
+				Eventually(func(g Gomega) []string {
+					g.Expect(LoadResource("", reconciliation.Name+"-reconciliation-ingestion", ingestion)).To(Succeed())
+					return ingestion.Spec.Services
+				}).Should(ContainElements("ledger", "payments"))
+
+				matching := &v1beta1.BrokerConsumer{}
+				Eventually(func(g Gomega) []string {
+					g.Expect(LoadResource("", reconciliation.Name+"-reconciliation-matching", matching)).To(Succeed())
+					return matching.Spec.Services
+				}).Should(ContainElements("reconciliation"))
+			})
```
