Source autoscaling not working #3208

@bobo

Describe the bug

Source vertex autoscaling does not trigger in the Rust runtime. The source emits vertex_pending_messages_raw with partition_name="Source", but the Go daemon queries for partition_name="<vertex-name>" (e.g., "my-source"). This mismatch causes CalculatePending() to return PendingNotAvailable, so the autoscaler does not see pending messages for source vertices.
I verified this by running the example pipeline against an external NATS server in a kind cluster: the source does not autoscale. With the code from #3205 it does autoscale. I do see the problem with renaming the metric, but it was at least helpful to see it working.

To Reproduce

  1. Deploy a pipeline with a source vertex that has autoscaling configured (scale.min: 1, max: 5) and a slow consumer (e.g., a source transformer with a 1-second sleep)
  2. Publish enough messages to create a pending backlog (e.g., 5000 messages)
  3. Port-forward to the source pod and scrape the metrics endpoint:
    SOURCE_POD=$(kubectl -n numaflow-system get pod \
      -l "numaflow.numaproj.io/pipeline-name=<pipeline>,numaflow.numaproj.io/vertex-name=<vertex>" \
      -o jsonpath='{.items[0].metadata.name}')
    kubectl -n numaflow-system port-forward "$SOURCE_POD" 9090:2469 &
    curl -sk https://localhost:9090/metrics | grep pending_messages_raw
  4. Observe the metric has partition_name="Source" instead of the vertex name
  5. Wait several minutes — the source never scales beyond 1 replica despite thousands of pending messages
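The check in step 4 can also be done mechanically. The following is a minimal Go sketch, not part of Numaflow: parseLabels and partitionMatchesVertex are hypothetical helper names, and the label parsing is deliberately simple (it does not handle escaped quotes inside label values). It takes one scraped sample line and reports whether partition_name equals the vertex label.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// labelRe matches one key="value" pair inside a Prometheus label set.
// Simplification: label values containing escaped quotes are not supported.
var labelRe = regexp.MustCompile(`(\w+)="([^"]*)"`)

// parseLabels (hypothetical helper) extracts the labels of a single
// exposition-format sample line into a map.
func parseLabels(line string) map[string]string {
	labels := map[string]string{}
	open := strings.Index(line, "{")
	end := strings.LastIndex(line, "}")
	if open < 0 || end < open {
		return labels
	}
	for _, m := range labelRe.FindAllStringSubmatch(line[open+1:end], -1) {
		labels[m[1]] = m[2]
	}
	return labels
}

// partitionMatchesVertex (hypothetical helper) reports whether the sample's
// partition_name label is present and equal to its vertex label.
func partitionMatchesVertex(line string) bool {
	l := parseLabels(line)
	return l["partition_name"] != "" && l["partition_name"] == l["vertex"]
}

func main() {
	// Sample line copied from the Evidence section of this issue.
	line := `vertex_pending_messages_raw{partition_name="Source",pipeline="test-autoscale",vertex="my-source",vertex_type="source"} 4998`
	fmt.Println(partitionMatchesVertex(line)) // false on v1.7.1
}
```

Feeding it the output of the curl command from step 3, one line at a time, would flag any source pod that still emits the hardcoded label.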

Reproduction pipeline:

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: test-autoscale
  namespace: numaflow-system
spec:
  vertices:
    - name: my-source
      scale:
        min: 1
        max: 5
      limits:
        readBatchSize: 1
      source:
        jetstream:
          url: nats://nats-external.default.svc:4222
          stream: test-source-stream
          consumer: numaflow-consumer
        transformer:
          container:
            image: sleep-transformer:latest # sleeps 1s per message
            imagePullPolicy: Never
    - name: out
      sink:
        log: {}
  edges:
    - from: my-source
      to: out

Expected behavior

With ~5000 pending messages and a source configured with max: 5 replicas, the autoscaler should scale up the source vertex to handle the backlog.

Actual behavior

Source stays at 1 replica indefinitely. The daemon cannot find the pending count because the metric label doesn't match what it queries for.

Evidence

Scraping the source pod's metrics endpoint on v1.7.1:

vertex_pending_messages_raw{partition_name="Source",pipeline="test-autoscale",vertex="my-source",vertex_type="source"} 4998

The Go daemon queries with partitionName="my-source" (the vertex name), per pipeline_metrics_query.go:152-154:

if abstractVertex.IsASource() {
    bufferList = append(bufferList, vertexName)  // "my-source"
}

In CalculatePending() (helper.go:72), the lookup pendingCount["my-source"] fails because the map contains "Source":

val, ok := pendingCount[partitionName]  // partitionName="my-source", but map key is "Source"
// ok is always false → returns PendingNotAvailable
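The failing lookup can be reproduced in isolation. This is a reduced sketch, not the daemon's code: lookupPending and the pendingNotAvailable constant are stand-in names, with -1 taken from the value shown in the table in this issue.

```go
package main

import "fmt"

// pendingNotAvailable mirrors the -1 sentinel reported in this issue;
// the real constant's name and location in Numaflow may differ.
const pendingNotAvailable int64 = -1

// lookupPending is a hypothetical stand-in for the map lookup in
// CalculatePending: it returns the sentinel when the key is absent.
func lookupPending(pendingCount map[string]int64, partitionName string) int64 {
	val, ok := pendingCount[partitionName]
	if !ok {
		return pendingNotAvailable
	}
	return val
}

func main() {
	// The scraped metric is keyed by the partition_name label the source emits.
	scraped := map[string]int64{"Source": 4998}

	fmt.Println(lookupPending(scraped, "my-source")) // key the daemon uses: -1
	fmt.Println(lookupPending(scraped, "Source"))    // key the source emits: 4998
}
```

The value is present in the map the whole time; only the key differs, which is why the autoscaler sees no pending count rather than a pending count of zero.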

v1.7.1 — source stays at 1 replica with ~5000 pending:

Time       | Replicas | Pending (daemon sees)
-----------|----------|----------------------
17:31:43   | 1        | -1 (PendingNotAvailable)
17:31:58   | 1        | -1
17:32:13   | 1        | -1
17:32:28   | 1        | -1
17:32:43   | 1        | -1
(stayed at 1 for 3+ minutes)

With fix applied — source scales to 3 replicas within 15 seconds:

Time       | Replicas | Pending
-----------|----------|--------
17:26:43   | 1        | 4978
17:26:58   | 3        | 4955
17:27:13   | 3        | 4910
17:27:29   | 3        | 4865
17:27:44   | 3        | 4820
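As a rough sanity check on the second table, the drain rate after scale-up can be computed from the samples. The sketch below assumes no new messages were published during the window; sample and drainRate are illustrative names. With readBatchSize: 1 and a 1-second sleep per message, each replica should handle at most about 1 message per second, so 3 replicas should drain close to 3 messages per second.

```go
package main

import (
	"fmt"
	"time"
)

// sample is one row of the table: wall-clock time and pending count.
type sample struct {
	at      string // HH:MM:SS
	pending float64
}

// drainRate returns messages drained per second between two samples.
func drainRate(from, to sample) (float64, error) {
	t0, err := time.Parse("15:04:05", from.at)
	if err != nil {
		return 0, err
	}
	t1, err := time.Parse("15:04:05", to.at)
	if err != nil {
		return 0, err
	}
	secs := t1.Sub(t0).Seconds()
	if secs <= 0 {
		return 0, fmt.Errorf("non-positive interval: %v", secs)
	}
	return (from.pending - to.pending) / secs, nil
}

func main() {
	// First and last rows at 3 replicas, taken from the table above.
	rate, err := drainRate(sample{"17:26:58", 4955}, sample{"17:27:44", 4820})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%.2f msg/s\n", rate) // 135 messages over 46 s: 2.93 msg/s
}
```

The result is consistent with three replicas each working through roughly one message per second.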

Root cause

The Rust source runtime and the Go daemon ended up using different values for the source partition_name.

Rust side, rust/numaflow-core/src/metrics.rs, uses "Source":

metric_labels.push((
    PIPELINE_PARTITION_NAME_LABEL.to_string(),
    "Source".to_string(),
));

Go side, pkg/daemon/server/service/pipeline_metrics_query.go:152-153, uses the vertex name:

// source vertex will have a single partition, which is the vertex name itself
if abstractVertex.IsASource() {
    bufferList = append(bufferList, vertexName)
}

Since sources don't have real partitions (no upstream ISB), the partition name is just a key that both sides need to agree on. These currently differ, which causes the lookup to fail.

A fix in either place would work — either change the Rust source to emit the vertex name, or change the Go daemon to query for "Source". We have a PR open with a Rust-side fix (#3204), but happy to adjust the approach to whatever the team prefers.

Environment (please complete the following information):

  • Kubernetes: v1.32.0 (kind)
  • Numaflow: v1.7.1 (Rust runtime, default since v1.6)

Additional context

  • This affects all source types (JetStream, Kafka, HTTP, etc.) since they all go through the same expose_pending_metrics code path
  • Sink and UDF autoscaling are not affected — they use ISB reader metrics which correctly use stream names as partition labels
  • The mismatch can be fixed on either side: change the Rust metric to emit the vertex name, or change the Go daemon to query for "Source"

How this worked in the Go runtime

I had Claude dig through the git history to check how this worked before the Rust rewrite. I have not fully understood it, but maybe it helps someone who knows the code better (or maybe you can see that Claude was wrong).
In the Go runtime (removed in a8bb86b9), source autoscaling used the vertex_pending_messages metric (not vertex_pending_messages_raw). The Go metrics server set partition_name to the value of reader.GetName(), where all source implementations (kafka, jetstream, etc.) returned the vertex name. The flow was:

  1. GetName() returns vertexName on all source implementations
  2. Used as map key in lagReaders
  3. That key becomes the partition_name label in exposePendingMetrics

So in the Go runtime, the source pending metric used partition_name=<vertex-name>, which matched what the daemon queried for. The Rust rewrite changed this to the hardcoded "Source", which broke the contract.
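The three-step flow described above can be sketched roughly as follows. This is an illustration of the described behavior, not the removed Go runtime code: the lagReader interface is reduced to two methods, and fakeSource, buildLagReaders, and pendingByPartition are hypothetical names.

```go
package main

import "fmt"

// lagReader is a hypothetical, reduced form of the reader interface:
// only the two methods needed for this illustration.
type lagReader interface {
	GetName() string
	Pending() int64
}

// fakeSource stands in for any source implementation (kafka, jetstream, ...).
type fakeSource struct {
	vertexName string
	pending    int64
}

// GetName returns the vertex name (step 1).
func (s fakeSource) GetName() string { return s.vertexName }

// Pending returns the current backlog.
func (s fakeSource) Pending() int64 { return s.pending }

// buildLagReaders keys each reader by GetName() (step 2).
func buildLagReaders(readers ...lagReader) map[string]lagReader {
	m := make(map[string]lagReader, len(readers))
	for _, r := range readers {
		m[r.GetName()] = r
	}
	return m
}

// pendingByPartition uses the map key as the partition_name label value
// when reporting pending counts (step 3).
func pendingByPartition(lagReaders map[string]lagReader) map[string]int64 {
	out := make(map[string]int64, len(lagReaders))
	for partitionName, r := range lagReaders {
		out[partitionName] = r.Pending()
	}
	return out
}

func main() {
	readers := buildLagReaders(fakeSource{"my-source", 4998})
	// The daemon's query key (the vertex name) now finds the value.
	fmt.Println(pendingByPartition(readers)["my-source"]) // 4998
}
```

Because the label value is derived from GetName() rather than a literal, the emitted key and the daemon's query key stay in agreement without either side hardcoding a string.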

Metadata

Labels

  • backport: Back port the commit to previous stable release.
  • bug: Something isn't working
  • rust: Rust
