feat(reconciliation): split worker into ingestion and matching workers #383
- Split single reconciliation worker into two dedicated workers:
  - Ingestion worker (`worker ingestion`): consumes ledger/payments topics
  - Matching worker (`worker matching`): consumes reconciliation topic
- Each worker has its own BrokerConsumer with distinct NATS deliver queue groups
- Ingestion worker publishes to `{stack}.reconciliation` topic
- Add cleanup of legacy single worker consumer and deployment
- Add Elasticsearch env vars support for reconciliation deployments
- Fix duplicate POD_NAME env var when both OTEL traces and metrics are enabled
Link with formancehq/reconciliation#46
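For illustration only, here is a minimal Go sketch of the topic split described above; the `WorkerType` type and `kafkaTopicsFor` helper are hypothetical names, not the operator's actual API, and the topic strings follow the PR description and the reviewer's suggested test updates further down.

```go
package reconciliations

import "fmt"

// WorkerType distinguishes the two dedicated workers (hypothetical name).
type WorkerType string

const (
	WorkerIngestion WorkerType = "ingestion" // consumes ledger/payments topics
	WorkerMatching  WorkerType = "matching"  // consumes the reconciliation topic
)

// kafkaTopicsFor sketches how each worker's KAFKA_TOPICS value is derived
// from the stack name, matching the per-worker split in this PR.
func kafkaTopicsFor(stackName string, worker WorkerType) string {
	switch worker {
	case WorkerIngestion:
		return fmt.Sprintf("%s.ledger %s.payments", stackName, stackName)
	case WorkerMatching:
		return fmt.Sprintf("%s.reconciliation", stackName)
	default:
		return ""
	}
}
```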
Walkthrough

This change refactors the Kubernetes reconciliation controller to orchestrate multiple deployment types (API and workers), introduces broker consumer creation with legacy resource cleanup, integrates Elasticsearch and broker environment variables into deployments, and adds comprehensive test coverage validating the new behavior.
Sequence Diagram

```mermaid
sequenceDiagram
participant RC as Reconciliation<br/>Controller
participant LR as deleteLegacy<br/>Resources
participant BC as BrokerConsumer<br/>Factory
participant DEP as createDeployments<br/>Orchestrator
participant API as API<br/>Deployment
participant WI as Worker<br/>Ingestion
participant WM as Worker<br/>Matching
RC->>LR: Clean up legacy BrokerConsumer<br/>& reconciliation-worker
LR-->>RC: Legacy resources removed
RC->>BC: Create ingestion consumer
BC-->>RC: Ingestion consumer Ready
RC->>BC: Create matching consumer
BC-->>RC: Matching consumer Ready
RC->>DEP: Orchestrate deployments
DEP->>WI: Create worker-ingestion<br/>(Kafka topics, Elasticsearch)
WI-->>DEP: Ready
DEP->>WM: Create worker-matching<br/>(Kafka topics, Elasticsearch)
WM-->>DEP: Ready
DEP->>API: Create API deployment<br/>(sans Elasticsearch vars)
API-->>DEP: Ready
DEP-->>RC: All deployments created
RC-->>RC: Reconciliation status = Ready
```
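Read alongside the diagram, the reconcile flow could be sketched roughly as follows; every helper here is a hypothetical stub named after the diagram participants, not the controller's real code.

```go
package reconciliations

import (
	"context"
	"fmt"
)

// Hypothetical stubs standing in for the controller's helpers; the real
// signatures in the operator differ.
func deleteLegacyResources(ctx context.Context, stack string) error          { return nil }
func createBrokerConsumer(ctx context.Context, stack, worker string) error   { return nil }
func createWorkerDeployment(ctx context.Context, stack, worker string) error { return nil }
func createAPIDeployment(ctx context.Context, stack string) error            { return nil }

// reconcileSketch mirrors the sequence above: clean up the legacy
// single-worker resources, create one BrokerConsumer per worker, create
// both worker deployments, then create the API deployment.
func reconcileSketch(ctx context.Context, stack string) error {
	if err := deleteLegacyResources(ctx, stack); err != nil {
		return fmt.Errorf("cleaning up legacy resources: %w", err)
	}
	for _, worker := range []string{"ingestion", "matching"} {
		if err := createBrokerConsumer(ctx, stack, worker); err != nil {
			return fmt.Errorf("creating %s consumer: %w", worker, err)
		}
	}
	for _, worker := range []string{"ingestion", "matching"} {
		if err := createWorkerDeployment(ctx, stack, worker); err != nil {
			return fmt.Errorf("creating worker %s deployment: %w", worker, err)
		}
	}
	return createAPIDeployment(ctx, stack)
}
```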
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@internal/resources/reconciliations/deployments.go`:
- Around line 147-153: The Elasticsearch env var injection currently calls
settings.GetElasticsearchEnvVars(ctx, stack.Name) and appends esEnvVars into the
shared env used for the API deployment (variable env); move this call and the
append so ES vars are only added when constructing the worker deployment
environment (e.g., where the worker-specific env is built or where worker
container spec is created) instead of the API path, or alternatively update the
tests to expect ES vars on the API if that was intended; ensure references to
esEnvVars and settings.GetElasticsearchEnvVars remain in the worker-deployment
code paths and remove the append from the API deployment construction.
In `@internal/tests/reconciliation_controller_test.go`:
- Around line 112-192: The test must be updated to assert both split worker
deployments and consumers instead of a single "reconciliation-worker" and
BrokerConsumer: load and assert two deployments (e.g., names like
"reconciliation-ingestion" and "reconciliation-matching" or whatever Create
uses), verify their container Args are the expected per-worker values (e.g.,
"ingestion" vs "matching"), check each worker's KAFKA_TOPICS env is correct
(ingestion should contain "<stack>.ledger <stack>.payments", matching should
contain "<stack>.reconciliation"), and assert two BrokerConsumer resources exist
with the correct Service lists and names matching the naming used by
brokerconsumers.Create (reference BrokerConsumer type, brokerconsumers.Create,
LoadResource, and the reconciliation/stack naming in your existing tests to
locate where to change assertions).
```go
	// Add Elasticsearch env vars for both API and Worker
	esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
	if err != nil {
		return err
	}
	env = append(env, esEnvVars...)
```
Elasticsearch env vars are injected into the API deployment.
The tests in this PR assert the API deployment should not carry ELASTICSEARCH_* env vars. If that expectation is correct, gate these env vars to the worker deployments only (or update the tests/spec to match the intended behavior).
💡 Suggested fix (apply ES env vars to workers only)

```diff
-	// Add Elasticsearch env vars for both API and Worker
-	esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
-	if err != nil {
-		return err
-	}
-	env = append(env, esEnvVars...)
+	// Add Elasticsearch env vars only for worker deployments
+	if deploymentType == DeploymentTypeWorkerIngestion || deploymentType == DeploymentTypeWorkerMatching {
+		esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
+		if err != nil {
+			return err
+		}
+		env = append(env, esEnvVars...)
+	}
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	// Add Elasticsearch env vars only for worker deployments
	if deploymentType == DeploymentTypeWorkerIngestion || deploymentType == DeploymentTypeWorkerMatching {
		esEnvVars, err := settings.GetElasticsearchEnvVars(ctx, stack.Name)
		if err != nil {
			return err
		}
		env = append(env, esEnvVars...)
	}
```
| By("Should create an API deployment", func() { | ||
| deployment := &appsv1.Deployment{} | ||
| Eventually(func() error { | ||
| return LoadResource(stack.Name, "reconciliation", deployment) | ||
| }).Should(Succeed()) | ||
| Expect(deployment).To(BeControlledBy(reconciliation)) | ||
| Expect(deployment.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation")) | ||
| Expect(deployment.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"serve"})) | ||
| }) | ||
| By("Should create a worker deployment", func() { | ||
| deployment := &appsv1.Deployment{} | ||
| Eventually(func() error { | ||
| return LoadResource(stack.Name, "reconciliation-worker", deployment) | ||
| }).Should(Succeed()) | ||
| Expect(deployment).To(BeControlledBy(reconciliation)) | ||
| Expect(deployment.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation-worker")) | ||
| Expect(deployment.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"worker"})) | ||
| }) | ||
| By("API deployment should have broker environment variables", func() { | ||
| deployment := &appsv1.Deployment{} | ||
| Eventually(func() error { | ||
| return LoadResource(stack.Name, "reconciliation", deployment) | ||
| }).Should(Succeed()) | ||
| Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElements( | ||
| core.Env("BROKER", "nats"), | ||
| core.Env("PUBLISHER_NATS_ENABLED", "true"), | ||
| )) | ||
| }) | ||
| By("Worker deployment should have broker environment variables", func() { | ||
| deployment := &appsv1.Deployment{} | ||
| Eventually(func() error { | ||
| return LoadResource(stack.Name, "reconciliation-worker", deployment) | ||
| }).Should(Succeed()) | ||
| Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElements( | ||
| core.Env("BROKER", "nats"), | ||
| core.Env("PUBLISHER_NATS_ENABLED", "true"), | ||
| )) | ||
| }) | ||
| By("Worker deployment should have topics environment variable", func() { | ||
| deployment := &appsv1.Deployment{} | ||
| Eventually(func() error { | ||
| return LoadResource(stack.Name, "reconciliation-worker", deployment) | ||
| }).Should(Succeed()) | ||
| Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElement( | ||
| core.Env("KAFKA_TOPICS", fmt.Sprintf("%s.ledger %s.payments", stack.Name, stack.Name)), | ||
| )) | ||
| }) | ||
| By("API deployment should NOT have topics environment variable", func() { | ||
| deployment := &appsv1.Deployment{} | ||
| Eventually(func() error { | ||
| return LoadResource(stack.Name, "reconciliation", deployment) | ||
| }).Should(Succeed()) | ||
| for _, envVar := range deployment.Spec.Template.Spec.Containers[0].Env { | ||
| Expect(envVar.Name).NotTo(Equal("KAFKA_TOPICS")) | ||
| } | ||
| }) | ||
| By("Should create a new GatewayHTTPAPI object", func() { | ||
| httpService := &v1beta1.GatewayHTTPAPI{} | ||
| Eventually(func() error { | ||
| return LoadResource("", core.GetObjectName(stack.Name, "reconciliation"), httpService) | ||
| }).Should(Succeed()) | ||
| }) | ||
| By("Should create a new AuthClient object", func() { | ||
| authClient := &v1beta1.AuthClient{} | ||
| Eventually(func() error { | ||
| return LoadResource("", core.GetObjectName(stack.Name, "reconciliation"), authClient) | ||
| }).Should(Succeed()) | ||
| }) | ||
| By("Should create a new BrokerConsumer object", func() { | ||
| consumer := &v1beta1.BrokerConsumer{} | ||
| Eventually(func() error { | ||
| return LoadResource("", reconciliation.Name+"-reconciliation", consumer) | ||
| }).Should(Succeed()) | ||
| }) | ||
| By("BrokerConsumer should have correct services", func() { | ||
| consumer := &v1beta1.BrokerConsumer{} | ||
| Eventually(func(g Gomega) []string { | ||
| g.Expect(LoadResource("", reconciliation.Name+"-reconciliation", consumer)).To(Succeed()) | ||
| return consumer.Spec.Services | ||
| }).Should(ContainElements("ledger", "payments")) | ||
| }) |
Update assertions for split ingestion/matching workers and consumers.
These assertions still expect a single reconciliation-worker deployment and a single BrokerConsumer. With the split, update the test to validate both worker deployments, their args, per‑worker KAFKA_TOPICS, and the two BrokerConsumers (ingestion: ledger/payments; matching: reconciliation). Also confirm the BrokerConsumer names match the naming in brokerconsumers.Create.
🔧 Example updates

```diff
- By("Should create a worker deployment", func() {
- deployment := &appsv1.Deployment{}
- Eventually(func() error {
- return LoadResource(stack.Name, "reconciliation-worker", deployment)
- }).Should(Succeed())
- Expect(deployment).To(BeControlledBy(reconciliation))
- Expect(deployment.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation-worker"))
- Expect(deployment.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"worker"}))
- })
+ By("Should create ingestion and matching worker deployments", func() {
+ ingestion := &appsv1.Deployment{}
+ Eventually(func() error {
+ return LoadResource(stack.Name, "reconciliation-worker-ingestion", ingestion)
+ }).Should(Succeed())
+ Expect(ingestion).To(BeControlledBy(reconciliation))
+ Expect(ingestion.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation-worker-ingestion"))
+ Expect(ingestion.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"worker", "ingestion"}))
+
+ matching := &appsv1.Deployment{}
+ Eventually(func() error {
+ return LoadResource(stack.Name, "reconciliation-worker-matching", matching)
+ }).Should(Succeed())
+ Expect(matching).To(BeControlledBy(reconciliation))
+ Expect(matching.Spec.Template.Spec.Containers[0].Name).To(Equal("reconciliation-worker-matching"))
+ Expect(matching.Spec.Template.Spec.Containers[0].Args).To(Equal([]string{"worker", "matching"}))
+ })
...
- By("Worker deployment should have topics environment variable", func() {
- deployment := &appsv1.Deployment{}
- Eventually(func() error {
- return LoadResource(stack.Name, "reconciliation-worker", deployment)
- }).Should(Succeed())
- Expect(deployment.Spec.Template.Spec.Containers[0].Env).To(ContainElement(
- core.Env("KAFKA_TOPICS", fmt.Sprintf("%s.ledger %s.payments", stack.Name, stack.Name)),
- ))
- })
+ By("Worker deployments should have correct topics environment variable", func() {
+ ingestion := &appsv1.Deployment{}
+ Eventually(func() error {
+ return LoadResource(stack.Name, "reconciliation-worker-ingestion", ingestion)
+ }).Should(Succeed())
+ Expect(ingestion.Spec.Template.Spec.Containers[0].Env).To(ContainElement(
+ core.Env("KAFKA_TOPICS", fmt.Sprintf("%s.ledger %s.payments", stack.Name, stack.Name)),
+ ))
+
+ matching := &appsv1.Deployment{}
+ Eventually(func() error {
+ return LoadResource(stack.Name, "reconciliation-worker-matching", matching)
+ }).Should(Succeed())
+ Expect(matching.Spec.Template.Spec.Containers[0].Env).To(ContainElement(
+ core.Env("KAFKA_TOPICS", fmt.Sprintf("%s.reconciliation", stack.Name)),
+ ))
+ })
...
- By("Should create a new BrokerConsumer object", func() {
- consumer := &v1beta1.BrokerConsumer{}
- Eventually(func() error {
- return LoadResource("", reconciliation.Name+"-reconciliation", consumer)
- }).Should(Succeed())
- })
+ By("Should create ingestion and matching BrokerConsumer objects", func() {
+ ingestion := &v1beta1.BrokerConsumer{}
+ Eventually(func() error {
+ return LoadResource("", reconciliation.Name+"-reconciliation-ingestion", ingestion)
+ }).Should(Succeed())
+
+ matching := &v1beta1.BrokerConsumer{}
+ Eventually(func() error {
+ return LoadResource("", reconciliation.Name+"-reconciliation-matching", matching)
+ }).Should(Succeed())
+ })
...
- By("BrokerConsumer should have correct services", func() {
- consumer := &v1beta1.BrokerConsumer{}
- Eventually(func(g Gomega) []string {
- g.Expect(LoadResource("", reconciliation.Name+"-reconciliation", consumer)).To(Succeed())
- return consumer.Spec.Services
- }).Should(ContainElements("ledger", "payments"))
- })
+ By("BrokerConsumers should have correct services", func() {
+ ingestion := &v1beta1.BrokerConsumer{}
+ Eventually(func(g Gomega) []string {
+ g.Expect(LoadResource("", reconciliation.Name+"-reconciliation-ingestion", ingestion)).To(Succeed())
+ return ingestion.Spec.Services
+ }).Should(ContainElements("ledger", "payments"))
+
+ matching := &v1beta1.BrokerConsumer{}
+ Eventually(func(g Gomega) []string {
+ g.Expect(LoadResource("", reconciliation.Name+"-reconciliation-matching", matching)).To(Succeed())
+ return matching.Spec.Services
+ }).Should(ContainElements("reconciliation"))
+ })
```
Summary

- Split the reconciliation worker into an ingestion worker (`worker ingestion`) and a matching worker (`worker matching`)
- The ingestion worker consumes the `ledger`/`payments` broker topics and publishes to `{stack}.reconciliation`
- The matching worker consumes the `reconciliation` topic via its own BrokerConsumer with a distinct NATS deliver queue group
- Fix the duplicate `POD_NAME` env var when both OTEL traces and metrics are enabled

Test plan

- Ingestion worker gets `KAFKA_TOPICS` for ledger/payments and `PUBLISHER_TOPIC_MAPPING` for reconciliation
- Matching worker gets `KAFKA_TOPICS` for reconciliation only
- Each worker uses a distinct NATS deliver queue group (`reconciliation_ingestion`/`reconciliation_matching`)
- Legacy `reconciliation-worker` deployment and old BrokerConsumer are cleaned up
- `POD_NAME` env var appears only once per pod
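The `POD_NAME` fix itself is not shown in the excerpts above; as a purely illustrative sketch (an assumed approach, not the actual patch), deduplicating env vars by name before building the container spec could look like this:

```go
package reconciliations

import corev1 "k8s.io/api/core/v1"

// dedupEnvVars keeps the first occurrence of each env var name, so a
// POD_NAME entry added by both the OTEL traces and OTEL metrics paths ends
// up in the container spec only once. Illustrative only.
func dedupEnvVars(in []corev1.EnvVar) []corev1.EnvVar {
	seen := make(map[string]struct{}, len(in))
	out := make([]corev1.EnvVar, 0, len(in))
	for _, e := range in {
		if _, ok := seen[e.Name]; ok {
			continue
		}
		seen[e.Name] = struct{}{}
		out = append(out, e)
	}
	return out
}
```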