Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ require (
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.19.1-20240117202343-bf8f65e8876c.2
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.36.10-20240117202343-bf8f65e8876c.1
buf.build/gen/go/redpandadata/otel/protocolbuffers/go v1.36.11-20251216164002-58c749b888d8.1
buf.build/go/hyperpb v0.1.3
cloud.google.com/go/aiplatform v1.104.0
cloud.google.com/go/bigquery v1.71.0
cloud.google.com/go/pubsub v1.50.1
Expand Down Expand Up @@ -283,7 +282,6 @@ require (
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/timandy/routine v1.1.5 // indirect
github.com/twpayne/go-geom v1.6.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
Expand Down
16 changes: 0 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
buf.build/gen/go/bufbuild/hyperpb-examples/protocolbuffers/go v1.36.7-20250725192734-0dd56aa9cbbc.1 h1:bFnppdLYActzr2F0iomSrkjUnGgVufb0DtZxjKgTLGc=
buf.build/gen/go/bufbuild/hyperpb-examples/protocolbuffers/go v1.36.7-20250725192734-0dd56aa9cbbc.1/go.mod h1:x7jYNX5/7EPnsKHEq596krkOGzvR97/MsZw2fw3Mrq0=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.10-20250912141014-52f32327d4b0.1 h1:31on4W/yPcV4nZHL4+UCiCvLPsMqe/vJcNg8Rci0scc=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.10-20250912141014-52f32327d4b0.1/go.mod h1:fUl8CEN/6ZAMk6bP8ahBJPUJw7rbp+j4x+wCcYi2IG4=
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.19.1-20240117202343-bf8f65e8876c.2 h1:vK2m7N3SPeHRqfVBj4FpmjlNCBEhR05OgCgJ+xIGfAs=
Expand All @@ -8,10 +6,6 @@ buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.36.10-20240117202343-bf8
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.36.10-20240117202343-bf8f65e8876c.1/go.mod h1:dDSnTB/bSMAA9z59+0E2JWab9LyGnb+spW8nrVeEAqA=
buf.build/gen/go/redpandadata/otel/protocolbuffers/go v1.36.11-20251216164002-58c749b888d8.1 h1:4jqc94IBC9Ea9GaMbmgfhczXZzCkA4ZWfon3/uI3KV0=
buf.build/gen/go/redpandadata/otel/protocolbuffers/go v1.36.11-20251216164002-58c749b888d8.1/go.mod h1:akvBCH3f6fL10sDu4NppgjHrQITLe1m5YWLt/yiLEKI=
buf.build/go/hyperpb v0.1.3 h1:wiw2F7POvAe2VA2kkB0TAsFwj91lXbFrKM41D3ZgU1w=
buf.build/go/hyperpb v0.1.3/go.mod h1:IHXAM5qnS0/Fsnd7/HGDghFNvUET646WoHmq1FDZXIE=
buf.build/go/protovalidate v0.14.0 h1:kr/rC/no+DtRyYX+8KXLDxNnI1rINz0imk5K44ZpZ3A=
buf.build/go/protovalidate v0.14.0/go.mod h1:+F/oISho9MO7gJQNYC2VWLzcO1fTPmaTA08SDYJZncA=
cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
Expand Down Expand Up @@ -850,8 +844,6 @@ github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUS
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
github.com/apache/arrow-go/v18 v18.4.1 h1:q/jVkBWCJOB9reDgaIZIdruLQUb1kbkvOnOFezVH1C4=
github.com/apache/arrow-go/v18 v18.4.1/go.mod h1:tLyFubsAl17bvFdUAy24bsSvA/6ww95Iqi67fTpGu3E=
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0=
Expand Down Expand Up @@ -1419,8 +1411,6 @@ github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/cel-go v0.26.0 h1:DPGjXackMpJWH680oGY4lZhYjIameYmR+/6RBdDGmaI=
github.com/google/cel-go v0.26.0/go.mod h1:A9O8OU9rdvrK5MQyrqfIxo1a0u4g3sF8KB6PUIaryMM=
github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
Expand Down Expand Up @@ -2010,8 +2000,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0=
github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw=
github.com/protocolbuffers/protoscope v0.0.0-20221109213918-8e7a6aafa2c9 h1:arwj11zP0yJIxIRiDn22E0H8PxfF7TsTrc2wIPFIsf4=
github.com/protocolbuffers/protoscope v0.0.0-20221109213918-8e7a6aafa2c9/go.mod h1:SKZx6stCn03JN3BOWTwvVIO2ajMkb/zQdTceXYhKw/4=
github.com/protocolbuffers/txtpbfmt v0.0.0-20251016062345-16587c79cd91 h1:s1LvMaU6mVwoFtbxv/rCZKE7/fwDmDY684FfUe4c1Io=
github.com/protocolbuffers/txtpbfmt v0.0.0-20251016062345-16587c79cd91/go.mod h1:JSbkp0BviKovYYt9XunS95M3mLPibE9bGg+Y95DsEEY=
github.com/pusher/pusher-http-go v4.0.1+incompatible h1:4u6tomPG1WhHaST7Wi9mw83Y+MS/j2EplR2YmDh8Xp4=
Expand Down Expand Up @@ -2124,8 +2112,6 @@ github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3A
github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMpsbLuo=
github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/stoewer/go-strcase v1.3.1 h1:iS0MdW+kVTxgMoE1LAZyMiYJFKlOzLooE4MxjirtkAs=
github.com/stoewer/go-strcase v1.3.1/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
Expand Down Expand Up @@ -2173,8 +2159,6 @@ github.com/tigerbeetle/tigerbeetle-go v0.16.61 h1:ciGSFxBhpXRbTorxPV7O/vXQKupVKe
github.com/tigerbeetle/tigerbeetle-go v0.16.61/go.mod h1:d6G7n4OlD7GLHd62x0VlWPXeI/L0SoNNTfm/ee24GJI=
github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw=
github.com/tilinna/z85 v1.0.0/go.mod h1:EfpFU/DUY4ddEy6CRvk2l+UQNEzHbh+bqBQS+04Nkxs=
github.com/timandy/routine v1.1.5 h1:LSpm7Iijwb9imIPlucl4krpr2EeCeAUvifiQ9Uf5X+M=
github.com/timandy/routine v1.1.5/go.mod h1:kXslgIosdY8LW0byTyPnenDgn4/azt2euufAq9rK51w=
github.com/timeplus-io/proton-go-driver/v2 v2.1.2 h1:XPHvI4irUBBuVGAyvAzpb170IiyWK5DEBfGpC7h8bgU=
github.com/timeplus-io/proton-go-driver/v2 v2.1.2/go.mod h1:rUs4zvXvKsmuyFpzdJnnid6p8IvRJTa/n/jNQ2B6Dfw=
github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4=
Expand Down
5 changes: 1 addition & 4 deletions internal/impl/confluent/serde_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,7 @@ func (s *schemaRegistryDecoder) getProtobufDecoder(
defer mu.Unlock()
if msgDesc.FullName() != cachedMessageName {
cachedMessageName = msgDesc.FullName()
cachedDecoder = common.NewHyperPbDecoder(msgDesc, common.ProfilingOptions{
Rate: 0.01,
RecompileInterval: 100_000,
})
cachedDecoder = common.NewDynamicPbDecoder(msgDesc)
}
return cachedDecoder
}
Expand Down
63 changes: 3 additions & 60 deletions internal/impl/protobuf/common/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func loadTestFileDescriptorSet(t testing.TB) (protoreflect.MessageDescriptor, *p

// BenchmarkProtobufToMessage benchmarks the complete pipeline of decoding protobuf
// and converting to a Benthos message, testing the matrix of:
// - Decoding: dynamicpb vs hyperpb (with PGO)
// - Decoding: dynamicpb
// - Conversion: Fast (SetStructuredMut) vs Slow (SetBytes)
func BenchmarkProtobufToMessage(b *testing.B) {
md, types := loadTestFileDescriptorSet(b)
Expand Down Expand Up @@ -109,18 +109,8 @@ func BenchmarkProtobufToMessage(b *testing.B) {
},
}

b.StopTimer()
// Profile-guided optimization settings for hyperpb
pgoOpts := ProfilingOptions{
Rate: 0.01, // Profile every message during priming
RecompileInterval: 100000, // Recompile after 1000 messages
}

// Create decoders
dynamicpbDecoder := NewDynamicPbDecoder(md, ProfilingOptions{})
hyperpbDecoder := NewHyperPbDecoder(md, pgoOpts)

b.StartTimer()
// Create decoder
dynamicpbDecoder := NewDynamicPbDecoder(md)

marshalOpts := protojson.MarshalOptions{Resolver: types}

Expand All @@ -137,18 +127,6 @@ func BenchmarkProtobufToMessage(b *testing.B) {
b.Fatal(err)
}

// Prime the hyperpb decoder with sample data to build profile
// Run with enough iterations to trigger at least one recompilation
for range pgoOpts.RecompileInterval * 2 {
err := hyperpbDecoder.WithDecoded(pbBytes, func(proto.Message) error {
return nil
})
if err != nil {
b.Fatal(err)
}
}
b.StartTimer()

// Benchmark: dynamicpb decode + fast conversion + read
b.Run(tc.name+"/dynamicpb/fast", func(b *testing.B) {
b.ReportAllocs()
Expand Down Expand Up @@ -185,40 +163,5 @@ func BenchmarkProtobufToMessage(b *testing.B) {
}
})

// Benchmark: hyperpb decode + fast conversion + read
b.Run(tc.name+"/hyperpb/fast", func(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
msg := service.NewMessage(nil)
err := hyperpbDecoder.WithDecoded(pbBytes, func(decoded proto.Message) error {
return ToMessageFast(decoded.(protoreflect.Message), marshalOpts, msg)
})
if err != nil {
b.Fatal(err)
}
_, err = msg.AsStructured()
if err != nil {
b.Fatal(err)
}
}
})

// Benchmark: hyperpb decode + slow conversion + read
b.Run(tc.name+"/hyperpb/slow", func(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
msg := service.NewMessage(nil)
err := hyperpbDecoder.WithDecoded(pbBytes, func(decoded proto.Message) error {
return ToMessageSlow(decoded.(protoreflect.Message), marshalOpts, msg)
})
if err != nil {
b.Fatal(err)
}
_, err = msg.AsStructured()
if err != nil {
b.Fatal(err)
}
}
})
}
}
9 changes: 0 additions & 9 deletions internal/impl/protobuf/common/decode_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import "google.golang.org/protobuf/proto"

// ProtobufDecoder is an interface for different methods to parse protobuf
// (the binary format) in a dynamic and reflective way.
//
// Currently, there are two supported approaches: dynamicpb and hyperpb
type ProtobufDecoder interface {
// Decode the buffer into a proto message that is passed into the callback.
//
Expand All @@ -25,10 +23,3 @@ type ProtobufDecoder interface {
// the provided callback.
WithDecoded(buf []byte, cb func(msg proto.Message) error) error
}

// ProfilingOptions specifies the profiling rate and how often we recompile
// for ProtobufDecoders that support profile-guided optimizations in flight (PGO)
type ProfilingOptions struct {
Rate float64
RecompileInterval int64
}
7 changes: 2 additions & 5 deletions internal/impl/protobuf/common/decode_dynamicpb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ import (
)

// NewDynamicPbDecoder returns a new ProtobufDecoder based on standard proto reflection
// in the offical protobuf library.
func NewDynamicPbDecoder(
md protoreflect.MessageDescriptor,
_ ProfilingOptions,
) ProtobufDecoder {
// in the official protobuf library.
func NewDynamicPbDecoder(md protoreflect.MessageDescriptor) ProtobufDecoder {
return &dynamicPbParser{dynamicpb.NewMessageType(md)}
}

Expand Down
83 changes: 0 additions & 83 deletions internal/impl/protobuf/common/decode_hyperpb.go

This file was deleted.

25 changes: 0 additions & 25 deletions internal/impl/protobuf/common/decode_hyperpb_fallback.go

This file was deleted.

14 changes: 2 additions & 12 deletions internal/impl/protobuf/processor_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,7 @@ func newProtobufToJSONOperator(
if err != nil {
return nil, fmt.Errorf("unable to find protobuf type %q: %w", msg, err)
}
decoder := common.NewHyperPbDecoder(
msgType.Descriptor(),
common.ProfilingOptions{
Rate: 0.01,
RecompileInterval: 100_000,
})
decoder := common.NewDynamicPbDecoder(msgType.Descriptor())
opts.Resolver = types
return func(part *service.Message) error {
partBytes, err := part.AsBytes()
Expand Down Expand Up @@ -403,12 +398,7 @@ func newProtobufToJSONBSROperator(
if err != nil {
return nil, fmt.Errorf("unable to find message '%v' definition: %w", msg, err)
}
decoder := common.NewHyperPbDecoder(
d.Descriptor(),
common.ProfilingOptions{
Rate: 0.01,
RecompileInterval: 100_000,
})
decoder := common.NewDynamicPbDecoder(d.Descriptor())
opts.Resolver = multiModuleWatcher
return func(part *service.Message) error {
partBytes, err := part.AsBytes()
Expand Down
1 change: 0 additions & 1 deletion public/bundle/enterprise/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.10-20250912141014-52f32327d4b0.1 // indirect
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.19.1-20240117202343-bf8f65e8876c.2 // indirect
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.36.10-20240117202343-bf8f65e8876c.1 // indirect
buf.build/go/hyperpb v0.1.3 // indirect
cel.dev/expr v0.24.0 // indirect
cloud.google.com/go/aiplatform v1.104.0 // indirect
cloud.google.com/go/bigquery v1.71.0 // indirect
Expand Down
1 change: 0 additions & 1 deletion public/bundle/free/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.10-20250912141014-52f32327d4b0.1 // indirect
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.19.1-20240117202343-bf8f65e8876c.2 // indirect
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.36.10-20240117202343-bf8f65e8876c.1 // indirect
buf.build/go/hyperpb v0.1.3 // indirect
cel.dev/expr v0.24.0 // indirect
cloud.google.com/go/aiplatform v1.104.0 // indirect
cloud.google.com/go/bigquery v1.71.0 // indirect
Expand Down