Skip to content

Sink optimizations - reduce message cloning#3189

Draft
BulkBeing wants to merge 4 commits intomainfrom
sink-optimizations
Draft

Sink optimizations - reduce message cloning#3189
BulkBeing wants to merge 4 commits intomainfrom
sink-optimizations

Conversation

@BulkBeing
Copy link
Contributor

@BulkBeing BulkBeing commented Feb 4, 2026

What this PR does / why we need it

Changes based on CPU profiling done on below Monovertex:

apiVersion: numaflow.numaproj.io/v1alpha1
kind: MonoVertex
metadata:
  name: simple-monovertex
spec:
  replicas: 1
  containerTemplate:
    resources:
      requests:
        cpu: 1
        memory: 500Mi
      limits:
        cpu: 1
        memory: 500Mi
  source:
    udsource:
      container:
        image: quay.io/numaio/numaflow-rs/simple-source:stable
        resources:
          requests:
            cpu: 500m
            memory: 500Mi
          limits:
            cpu: 500m
            memory: 500Mi
    transformer:
      container:
        image: quay.io/numaio/numaflow-rs/source-transformer-now:stable
        resources:
          requests:
            cpu: 500m
            memory: 500Mi
          limits:
            cpu: 500m
            memory: 500Mi
  sink:
    udsink:
      container:
        image: sink-blackhole:stable
        resources:
          requests:
            cpu: 500m
            memory: 500Mi
          limits:
            cpu: 500m
            memory: 500Mi

I'm seeing around 15% increase in processing rate with the above Monovertex (ran the benchmark a few times).
Screenshot 2026-02-05 at 10 05 28 AM

Related issues

Fixes #

Testing

Running above Monovertex.

Special notes for reviewers

Anything notable for review (risk, rollout, follow-ups).

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@vaibhavtiwari33
Copy link
Contributor

This requires changes in all the SDKs as well correct?
Since we're changing the type of value, that might break backward compatibility?

-        #[prost(bytes = "vec", tag = "2")]
-        pub value: ::prost::alloc::vec::Vec<u8>,
+        #[prost(bytes = "bytes", tag = "2")]
+        pub value: ::prost::bytes::Bytes,

@BulkBeing
Copy link
Contributor Author

This requires changes in all the SDKs as well correct? Since we're changing the type of value, that might break backward compatibility?

-        #[prost(bytes = "vec", tag = "2")]
-        pub value: ::prost::alloc::vec::Vec<u8>,
+        #[prost(bytes = "bytes", tag = "2")]
+        pub value: ::prost::bytes::Bytes,

It shouldn't. Encoded form of Vec<u8> and Bytes should be just an array of bytes. I've tested with a Monovertex that involves udsource, transformer and a udsink.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants