Describe the bug
Source vertex autoscaling does not trigger in the Rust runtime. The source emits vertex_pending_messages_raw with partition_name="Source", but the Go daemon queries for partition_name="<vertex-name>" (e.g., "my-source"). This mismatch causes CalculatePending() to return PendingNotAvailable, so the autoscaler does not see pending messages for source vertices.
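To make the failure mode concrete, here is a minimal, self-contained sketch of the lookup (illustrative only; the map and the -1 sentinel mirror the daemon-side code quoted in the Evidence section below, not the actual Numaflow types):

```go
package main

import "fmt"

func main() {
	// What the daemon scrapes from a Rust source pod: the pending
	// gauge is labeled partition_name="Source".
	pendingCount := map[string]int64{"Source": 4998}

	// The daemon looks the source up by its vertex name.
	partitionName := "my-source"

	if val, ok := pendingCount[partitionName]; ok {
		fmt.Println("pending:", val)
	} else {
		// The lookup misses, so the autoscaler sees
		// PendingNotAvailable (-1) and never scales the source.
		fmt.Println("pending: -1 (PendingNotAvailable)")
	}
}
```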
I verified this by running the example pipeline against an external NATS server in a kind cluster and could see that it does not autoscale. Trying the code in #3205, it does autoscale. I do see the problem with renaming the metric, but it was at least helpful to see it working.
To Reproduce
- Deploy a pipeline with a source vertex that has autoscaling configured (`scale.min: 1`, `max: 5`) and a slow consumer (e.g., a source transformer with a 1-second sleep)
- Publish enough messages to create a pending backlog (e.g., 5000 messages)
- Port-forward to the source pod and scrape the metrics endpoint:

  ```sh
  SOURCE_POD=$(kubectl -n numaflow-system get pod \
    -l "numaflow.numaproj.io/pipeline-name=<pipeline>,numaflow.numaproj.io/vertex-name=<vertex>" \
    -o jsonpath='{.items[0].metadata.name}')
  kubectl -n numaflow-system port-forward "$SOURCE_POD" 9090:2469 &
  curl -sk https://localhost:9090/metrics | grep pending_messages_raw
  ```

- Observe the metric has `partition_name="Source"` instead of the vertex name
- Wait several minutes; the source never scales beyond 1 replica despite thousands of pending messages
Reproduction pipeline:

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: test-autoscale
  namespace: numaflow-system
spec:
  vertices:
    - name: my-source
      scale:
        min: 1
        max: 5
      limits:
        readBatchSize: 1
      source:
        jetstream:
          url: nats://nats-external.default.svc:4222
          stream: test-source-stream
          consumer: numaflow-consumer
        transformer:
          container:
            image: sleep-transformer:latest # sleeps 1s per message
            imagePullPolicy: Never
    - name: out
      sink:
        log: {}
  edges:
    - from: my-source
      to: out
```

Expected behavior
With ~5000 pending messages and a source configured with max: 5 replicas, the autoscaler should scale up the source vertex to handle the backlog.
Actual behavior
Source stays at 1 replica indefinitely. The daemon cannot find the pending count because the metric label doesn't match what it queries for.
Evidence
Scraping the source pod's metrics endpoint on v1.7.1:
```
vertex_pending_messages_raw{partition_name="Source",pipeline="test-autoscale",vertex="my-source",vertex_type="source"} 4998
```
The Go daemon queries with partitionName="my-source" (the vertex name), per pipeline_metrics_query.go:152-154:
```go
if abstractVertex.IsASource() {
	bufferList = append(bufferList, vertexName) // "my-source"
}
```

In `CalculatePending()` (helper.go:72), the lookup `pendingCount["my-source"]` fails because the map contains "Source":

```go
val, ok := pendingCount[partitionName] // partitionName="my-source", but map key is "Source"
// ok is always false → returns PendingNotAvailable
```

On v1.7.1, the source stays at 1 replica with ~5000 pending:
Time | Replicas | Pending (daemon sees)
-----------|----------|----------------------
17:31:43 | 1 | -1 (PendingNotAvailable)
17:31:58 | 1 | -1
17:32:13 | 1 | -1
17:32:28 | 1 | -1
17:32:43 | 1 | -1
(stayed at 1 for 3+ minutes)
With the fix applied, the source scales to 3 replicas within 15 seconds:
Time | Replicas | Pending
-----------|----------|--------
17:26:43 | 1 | 4978
17:26:58 | 3 | 4955
17:27:13 | 3 | 4910
17:27:29 | 3 | 4865
17:27:44 | 3 | 4820
Root cause
The Rust source runtime and the Go daemon ended up using different values for the source partition_name.
Rust side — rust/numaflow-core/src/metrics.rs uses "Source":

```rust
metric_labels.push((
    PIPELINE_PARTITION_NAME_LABEL.to_string(),
    "Source".to_string(),
));
```

Go side — pkg/daemon/server/service/pipeline_metrics_query.go:152-153 uses the vertex name:
```go
// source vertex will have a single partition, which is the vertex name itself
if abstractVertex.IsASource() {
	bufferList = append(bufferList, vertexName)
}
```

Since sources don't have real partitions (no upstream ISB), the partition name is just a key that both sides need to agree on. These currently differ, which causes the lookup to fail.
A fix in either place would work: either change the Rust source to emit the vertex name, or change the Go daemon to query for "Source". We have a PR open with a Rust-side fix (#3204), but we're happy to adjust the approach to whatever the team prefers. A sketch of the daemon-side option is below.
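For illustration only, a minimal sketch of what the daemon-side option could look like, assuming the change lands in the `bufferList` construction quoted above (this is not the actual patch, and the `"Source"` literal would need to stay in sync with the Rust constant):

```go
// Hypothetical daemon-side variant: for sources, query the key the
// Rust runtime currently emits instead of the vertex name, so the
// CalculatePending() lookup finds the entry.
if abstractVertex.IsASource() {
	bufferList = append(bufferList, "Source")
}
```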
Environment (please complete the following information):
- Kubernetes: v1.32.0 (kind)
- Numaflow: v1.7.1 (Rust runtime, default since v1.6)
Additional context
- This affects all source types (JetStream, Kafka, HTTP, etc.) since they all go through the same `expose_pending_metrics` code path
- Sink and UDF autoscaling are not affected — they use ISB reader metrics, which correctly use stream names as partition labels
- The mismatch can be fixed on either side: change the Rust metric to emit the vertex name, or change the Go daemon to query for `"Source"`
How this worked in the Go runtime
I had Claude dig through the git history to check how this worked before the Rust rewrite. I have not fully understood it, but maybe it helps someone who knows the code better (or maybe you can see where Claude was wrong).
In the Go runtime (removed in a8bb86b9), source autoscaling used the `vertex_pending_messages` metric (not `vertex_pending_messages_raw`). The Go metrics server set `partition_name` to the value of `reader.GetName()`, where all source implementations (kafka, jetstream, etc.) returned the vertex name. The flow was:
- `GetName()` returns `vertexName` on all source implementations
- That value is used as the map key in `lagReaders`
- The key becomes the `partition_name` label in `exposePendingMetrics`
So in the Go runtime, the source pending metric used `partition_name=<vertex-name>`, which matched what the daemon queried for. The Rust rewrite changed this to the hardcoded `"Source"`, which broke the contract.
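A rough, self-contained sketch of that old flow, reconstructed from the description above (names and types are simplified stand-ins, not the actual Go-runtime code):

```go
package metrics

import "github.com/prometheus/client_golang/prometheus"

var pending = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "vertex_pending_messages"},
	[]string{"partition_name"},
)

// LagReader stands in for the Go runtime's source readers; per the
// history above, every source implementation returned the vertex
// name from GetName().
type LagReader interface {
	GetName() string
	Pending() int64
}

// The lagReaders map was keyed by GetName(), and that key became the
// partition_name label, so the daemon's query by vertex name matched.
func exposePendingMetrics(lagReaders map[string]LagReader) {
	for name, r := range lagReaders {
		pending.WithLabelValues(name).Set(float64(r.Pending()))
	}
}
```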