
Fix gRPC broadcast latency and tune connection pool #3478

Open
kpango wants to merge 3 commits into main from fix-grpc-broadcast-latency-17105619334351837761

Conversation

@kpango (Collaborator) commented Feb 18, 2026

This PR addresses high latency and DeadlineExceeded errors in the LB Gateway broadcast.

Changes:

  1. Fix Broadcast Concurrency: pkg/gateway/lb/service/gateway.go was calling RangeConcurrent with -1, which fell back to sequential execution. Changed to use g.GetAgentCount(ctx) so the broadcast actually fans out concurrently (see the sketch after this list).
  2. Tune Connection Pool: Increased the default connection pool size from 3 to 10 in internal/net/grpc/option.go to support higher concurrency, and added keepalive parameters to maintain idle connections.
  3. Benchmarks: Added benchmarks demonstrating the fix (~60 ms sequential vs ~2 ms concurrent) and measuring pool contention.
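
For illustration, here is a minimal, self-contained sketch of why the concurrency bound matters. It uses golang.org/x/sync/errgroup to mimic the before/after fan-out with simulated 1 ms RPCs; it is not Vald's RangeConcurrent implementation, and all names and numbers below are illustrative.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// fanOut issues `targets` simulated 1 ms calls with the given concurrency
// limit; a non-positive limit mimics the old sequential fallback.
func fanOut(ctx context.Context, targets, limit int) time.Duration {
	start := time.Now()
	eg, _ := errgroup.WithContext(ctx)
	if limit > 0 {
		eg.SetLimit(limit) // bounded concurrency, e.g. the agent count
	} else {
		eg.SetLimit(1) // sequential fallback
	}
	for i := 0; i < targets; i++ {
		eg.Go(func() error {
			time.Sleep(time.Millisecond) // simulated RPC latency
			return nil
		})
	}
	_ = eg.Wait()
	return time.Since(start)
}

func main() {
	ctx := context.Background()
	fmt.Println("sequential:", fanOut(ctx, 50, -1))
	fmt.Println("concurrent:", fanOut(ctx, 50, 50))
}

With 50 targets, the bounded fan-out finishes in roughly one RPC's latency while the sequential fallback pays that latency 50 times over, which is the gap the benchmarks above report.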

Verification:

  • go test -bench . ./internal/net/grpc confirms the latency reduction (roughly 60 ms sequential vs 2 ms concurrent per broadcast).
  • go test -bench . ./internal/net/grpc/pool confirms pool performance.
  • Existing tests pass (except unrelated cgroup stats failures in sandbox).

PR created automatically by Jules for task 17105619334351837761 started by @kpango

Summary by CodeRabbit

  • Tests

    • Added new benchmarks to measure broadcast behavior and connection-pool performance in both sequential and concurrent scenarios, including end-to-end server-client runs.
  • Chores

    • Increased the default connection pool size and enabled keepalive settings to improve connection stability.
  • Performance

    • Broadcasts now cap concurrent requests to the current agent count, reducing overload and improving resource efficiency.

- fix: pass correct concurrency count to `RangeConcurrent` in LB Gateway broadcast, fixing sequential execution bug.
- feat: increase default gRPC connection pool size from 3 to 10 to better handle high concurrency.
- feat: enable gRPC keepalive parameters (30s time, 10s timeout) to prevent connection drops.
- test: add `benchmark_test.go` to `internal/net/grpc` to measure client broadcast performance.
- test: add `BenchmarkPool_HighContention` to `internal/net/grpc/pool/pool_bench_test.go` to measure pool contention.

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
@google-labs-jules

👋 Jules, reporting for duty! I'm here to lend a hand with this pull request.

When you start a review, I'll add a 👀 emoji to each comment to let you know I've read it. I'll focus on feedback directed at me and will do my best to stay out of conversations between you and other bots or reviewers to keep the noise down.

I'll push a commit with your requested changes shortly after. Please note there might be a delay between these steps, but rest assured I'm on the job!

For more direct control, you can switch me to Reactive Mode. When this mode is on, I will only act on comments where you specifically mention me with @jules. You can find this option in the Pull Request section of your global Jules UI settings. You can always switch back!

New to Jules? Learn more at jules.google/docs.


For security, I will only act on instructions from the user who triggered this task.


cloudflare-workers-and-pages bot commented Feb 18, 2026

Deploying vald with Cloudflare Pages

Latest commit: e2c528d
Status: ✅  Deploy successful!
Preview URL: https://b313f5a5.vald.pages.dev
Branch Preview URL: https://fix-grpc-broadcast-latency-1.vald.pages.dev

View logs

@vdaas-ci (Collaborator)

[CHATOPS:HELP] ChatOps commands.

  • 🙆‍♀️ /approve - approve
  • 🍱 /format - format codes and add licenses
  • /gen-test - generate test codes
  • 🏷️ /label - add labels
  • 🔚 2️⃣ 🔚 /label actions/e2e-deploy - run E2E deploy & integration test


coderabbitai bot commented Feb 18, 2026

📝 Walkthrough

Adds multiple gRPC benchmarks (unit and e2e), increases default pool size and enables keepalive, refactors pool slot iteration and connection selection, and limits Gateway.BroadCast concurrency to the current agent count.

Changes

  • gRPC benchmarks (unit + e2e) (internal/net/grpc/benchmark_test.go, internal/net/grpc/e2e_bench_test.go): Added new benchmark files: a unit BenchmarkBroadcast simulating sequential and fixed-concurrency RangeConcurrent calls, and an e2e benchmark that spins up in-process Discoverer servers and runs sequential vs concurrent broadcast measurements.
  • Pool benchmark (internal/net/grpc/pool/pool_bench_test.go): Augmented the existing benchmark to connect the pool, reset the timer, and run a parallel loop invoking Do to simulate high-concurrency usage.
  • gRPC default options (internal/net/grpc/option.go): Increased the default connection pool size from 3 to 10 and added default keepalive parameters via WithKeepaliveParams("30s", "10s", true).
  • Pool internals (internal/net/grpc/pool/pool.go): Refactored connection-slot handling to use a slots slice and its length for sizing; adjusted iteration and getHealthyConn logic, added guards for empty-slot cases, and improved slot-based loads. A hypothetical sketch of this access pattern follows the list.
  • Gateway broadcast concurrency (pkg/gateway/lb/service/gateway.go): Changed Gateway.BroadCast to pass g.GetAgentCount(ctx) as the concurrency limit to RangeConcurrent instead of -1 (unlimited/sequential).
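
For the pool-internals cohort, the slot-based access pattern presumably looks roughly like the sketch below; poolConn, slots, and pick are hypothetical stand-ins, not the actual pool.go code.

package pool

import "sync/atomic"

// Hypothetical stand-ins for the real pool internals; only the access
// pattern is the point here.
type poolConn struct {
	addr string
}

func (pc *poolConn) healthy() bool { return pc != nil }

type pool struct {
	slots []atomic.Pointer[poolConn]
}

// pick does a round-robin scan over the slots slice, loading each slot once
// and returning the first healthy connection it finds.
func (p *pool) pick(start uint64) (*poolConn, bool) {
	sz := uint64(len(p.slots))
	if sz == 0 {
		return nil, false // guard for the empty-slot case mentioned above
	}
	for i := uint64(0); i < sz; i++ {
		if pc := p.slots[(start+i)%sz].Load(); pc.healthy() {
			return pc, true
		}
	}
	return nil, false
}

Loading each slot exactly once per candidate keeps the hot path to a single atomic read per connection, which is presumably the overhead reduction the summary refers to.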

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

type/bug, type/feature, type/refactoring

Suggested reviewers

  • kmrmt
  • Matts966
  • datelier

Poem

Benchmarks wake, ten pools arise,
Keepalives steady under skies,
Slots aligned, connections found,
Broadcasts bound to agent count—sound.
Metrics hum, performance-wise.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 0.00%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title check: ✅ Passed. The title accurately summarizes the two main changes: fixing gRPC broadcast concurrency and tuning the connection pool size with keepalive parameters.



codecov bot commented Feb 18, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 17.91%. Comparing base (eb66fdc) to head (e2c528d).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3478      +/-   ##
==========================================
- Coverage   18.03%   17.91%   -0.12%     
==========================================
  Files         127      127              
  Lines       12107    12107              
==========================================
- Hits         2183     2169      -14     
- Misses       9648     9664      +16     
+ Partials      276      274       -2     

☔ View full report in Codecov by Sentry.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 7

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/net/grpc/benchmark_test.go`:
- Around line 49-86: Move the setup of the gRPCClient, population of
client.conns (mockPool entries) and atomic.StoreUint64(&client.clientCount, ...)
so it runs before starting the benchmark timers; then call b.ReportAllocs() and
b.ResetTimer() once before the sub-benchmarks in BenchmarkBroadcast. Inside each
sub-benchmark replace the old for i := 0; i < b.N; i++ loops with the Go 1.24
style for b.Loop() loop, and keep the call to client.RangeConcurrent(...) and
its simulated latency inside that loop. Ensure you reference the
BenchmarkBroadcast function, the gRPCClient instance (client),
client.conns/mockPool population, and the RangeConcurrent call when making these
changes.
- Around line 36-47: The mockPool currently hands out zero-value grpc.ClientConn
structs (see mockPool.Do, mockPool.Get, mockPool.Reconnect) which violates gRPC
construction rules; replace those returns with real, properly-initialized
*grpc.ClientConn instances created by dialing a test server (use net.Listen +
grpc.NewServer or an in-process bufconn as done in pool_bench_test.go), update
the benchmark setup to start and tear down the test gRPC server and dial a
client connection once, and have mockPool methods return that *grpc.ClientConn
instead of &grpc.ClientConn{} so any later method calls on the connection are
safe.

In `@internal/net/grpc/option.go`:
- Line 43: The change increasing the connection pool to
WithConnectionPoolSize(10) can multiply persistent gRPC connections (e.g., 100
agents → 1,000 connections) and risk hitting OS file descriptor and memory
limits; update the configuration and docs to make the pool size safe and
configurable, validate/adjust WithIdleTimeout to reclaim idle connections, and
add a checklist/test to verify target nodes' ulimit -n and pod memory limits for
the expected scale. Specifically, ensure WithConnectionPoolSize is
documented/configurable (or reduced), tune WithIdleTimeout to a shorter reclaim
window, add runtime validation or warnings if computed total FDs may exceed
ulimit, and include guidance in deployment manifests about required pod memory
and ulimit values.
- Around line 43-44: The hardcoded client keepalive in WithKeepaliveParams
currently sets PermitWithoutStream=true which will cause servers (with default
permit_without_stream=false and min_time=10m) to GOAWAY; update the
WithKeepaliveParams usage in internal/net/grpc/option.go to use
PermitWithoutStream=false (or remove/disable the aggressive 30s keepalive) so
client pings won’t be sent without active streams, or alternatively coordinate
server-side config changes before landing; change the WithKeepaliveParams call
accordingly to prevent immediate GOAWAYs from Vald servers.

In `@internal/net/grpc/pool/pool_bench_test.go`:
- Around line 237-246: The benchmark currently runs measurement without
bracketing timers or reporting allocations; update the benchmark surrounding the
b.RunParallel(...) loop to match the other benchmarks by calling b.StopTimer(),
b.ReportAllocs(), b.ResetTimer(), and b.StartTimer() before entering the
b.RunParallel block (so pool setup and connection time are excluded) and call
b.StopTimer() immediately after the parallel loop; modify the same benchmark
that invokes p.Do(...) on *ClientConn to include these calls.
- Around line 212-216: Remove the stale development-commentary block that spans
the scratch notes referencing "pool_bench_test.go" and the
ListenAndServe/pool.New ideas (the comment lines between the existing test setup
and helper usage); simply delete those lines so the test file contains only
meaningful comments and code around the test setup (where ListenAndServe and
pool.New are used) to keep the test tidy.

In `@pkg/gateway/lb/service/gateway.go`:
- Line 91: The tracing span was started on fctx but GetAgentCount was called
with the stale ctx; change the call to use the traced context (call
g.GetAgentCount(fctx)) so the address lookup is attributed to the BroadCast
trace span, and ensure any other calls inside the same return expression use
fctx consistently (e.g., client.RangeConcurrent(fctx, g.GetAgentCount(fctx),
...)).

Comment on lines +36 to +47
func (m *mockPool) Do(ctx context.Context, f func(*pool.ClientConn) error) error {
	// Pass a dummy non-nil connection to satisfy the check in executeRPC
	return f(&grpc.ClientConn{})
}
func (m *mockPool) Get(ctx context.Context) (*pool.ClientConn, bool) { return &grpc.ClientConn{}, true }
func (m *mockPool) IsHealthy(ctx context.Context) bool { return true }
func (m *mockPool) IsIPConn() bool { return true }
func (m *mockPool) Len() uint64 { return 1 }
func (m *mockPool) Size() uint64 { return 1 }
func (m *mockPool) Reconnect(ctx context.Context, force bool) (pool.Conn, error) { return m, nil }
func (m *mockPool) String() string { return m.addr }
func (m *mockPool) Close(ctx context.Context, delay time.Duration) error { return nil }

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find RangeConcurrent implementation
rg -n "func.*RangeConcurrent" internal/net/grpc/ -A 15

Repository: vdaas/vald

Length of output: 4426


🏁 Script executed:

#!/bin/bash
# Find executeRPC and related functions
rg -n "executeRPC" internal/net/grpc/ -B 2 -A 10

Repository: vdaas/vald

Length of output: 6995


🏁 Script executed:

#!/bin/bash
# Check for any method calls on ClientConn within pool usage
rg -n "\.Invoke|\.NewStream|\.GetState|\.Close" internal/net/grpc/client.go -B 3 -A 3

Repository: vdaas/vald

Length of output: 242


🏁 Script executed:

#!/bin/bash
# Locate the pool.Conn interface definition
rg -n "type Conn interface" internal/net/grpc/ -A 20

Repository: vdaas/vald

Length of output: 1707


Replace zero-value grpc.ClientConn{} with a properly initialized connection

grpc.ClientConn is an opaque struct designed to be created only via grpc.NewClient or grpc.Dial. The zero-value literal violates this design contract. While the benchmark is currently safe (the user function only calls time.Sleep), this pattern is fragile—if code within the pool infrastructure ever invokes any method on the ClientConn before dispatching to the user function, it will panic.

Adopt the pattern used in pool_bench_test.go: establish a real gRPC server with net.Listen + grpc.NewServer, or use an in-process listener. This ensures the connection is properly initialized and matches the actual runtime behavior.
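
A minimal sketch of that suggestion using the in-process bufconn listener; the helper name newTestConn and the passthrough target are illustrative, not existing code in this repository.

package grpc_test

import (
	"context"
	"net"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/test/bufconn"
)

// newTestConn starts an in-process gRPC server over bufconn and returns a
// real, properly initialized *grpc.ClientConn for the mock pool to hand out.
func newTestConn(b *testing.B) *grpc.ClientConn {
	b.Helper()
	lis := bufconn.Listen(1 << 20)
	srv := grpc.NewServer()
	go func() { _ = srv.Serve(lis) }()
	b.Cleanup(func() { srv.Stop(); _ = lis.Close() })

	conn, err := grpc.NewClient("passthrough:///bufnet",
		grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
			return lis.DialContext(ctx)
		}),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		b.Fatalf("failed to dial bufconn: %v", err)
	}
	b.Cleanup(func() { _ = conn.Close() })
	return conn
}

The mock pool methods could then return this connection instead of &grpc.ClientConn{}, so any later method call on the connection is safe.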

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/benchmark_test.go` around lines 36 - 47, The mockPool
currently hands out zero-value grpc.ClientConn structs (see mockPool.Do,
mockPool.Get, mockPool.Reconnect) which violates gRPC construction rules;
replace those returns with real, properly-initialized *grpc.ClientConn instances
created by dialing a test server (use net.Listen + grpc.NewServer or an
in-process bufconn as done in pool_bench_test.go), update the benchmark setup to
start and tear down the test gRPC server and dial a client connection once, and
have mockPool methods return that *grpc.ClientConn instead of &grpc.ClientConn{}
so any later method calls on the connection are safe.

Comment on lines +49 to +86
func BenchmarkBroadcast(b *testing.B) {
	ctx := context.Background()
	client := &gRPCClient{
		name: "benchmark",
	}

	// Setup mock connections
	numAgents := 50
	for i := 0; i < numAgents; i++ {
		addr := "agent-" + strconv.Itoa(i)
		client.conns.Store(addr, &mockPool{addr: addr})
	}
	atomic.StoreUint64(&client.clientCount, uint64(numAgents))

	b.Run("Sequential_Legacy", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			err := client.RangeConcurrent(ctx, -1, func(ctx context.Context, addr string, conn *ClientConn, copts ...CallOption) error {
				time.Sleep(1 * time.Millisecond) // Simulate RPC latency
				return nil
			})
			if err != nil {
				b.Fatal(err)
			}
		}
	})

	b.Run("Concurrent_Fixed", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			// Simulate the fixed logic where concurrency > 1
			err := client.RangeConcurrent(ctx, numAgents, func(ctx context.Context, addr string, conn *ClientConn, copts ...CallOption) error {
				time.Sleep(1 * time.Millisecond) // Simulate RPC latency
				return nil
			})
			if err != nil {
				b.Fatal(err)
			}
		}
	})

⚠️ Potential issue | 🟡 Minor

Missing benchmark hygiene (b.ResetTimer, b.ReportAllocs) and old-style loop

Three issues:

  1. Setup pollutes measurements: the 50-connection setup + atomic.StoreUint64 (lines 57–61) runs inside the benchmark timer because b.ResetTimer() is never called before the sub-benchmarks. Both sub-benchmarks inherit this contaminated baseline.

  2. No allocation tracking: b.ReportAllocs() is absent, making it impossible to distinguish latency wins from allocation regressions.

  3. Stale loop idiom: for i := 0; i < b.N; i++ should be for b.Loop() (Go 1.24+) to match the style already used in pool_bench_test.go (lines 119, 143).

🔧 Proposed fix
 func BenchmarkBroadcast(b *testing.B) {
 	ctx := context.Background()
 	client := &gRPCClient{
 		name: "benchmark",
 	}

 	// Setup mock connections
 	numAgents := 50
 	for i := 0; i < numAgents; i++ {
 		addr := "agent-" + strconv.Itoa(i)
 		client.conns.Store(addr, &mockPool{addr: addr})
 	}
 	atomic.StoreUint64(&client.clientCount, uint64(numAgents))

 	b.Run("Sequential_Legacy", func(b *testing.B) {
+		b.ReportAllocs()
+		b.ResetTimer()
-		for i := 0; i < b.N; i++ {
+		for b.Loop() {
 			err := client.RangeConcurrent(ctx, -1, func(ctx context.Context, addr string, conn *ClientConn, copts ...CallOption) error {
 				time.Sleep(1 * time.Millisecond) // Simulate RPC latency
 				return nil
 			})
 			if err != nil {
 				b.Fatal(err)
 			}
 		}
 	})

 	b.Run("Concurrent_Fixed", func(b *testing.B) {
+		b.ReportAllocs()
+		b.ResetTimer()
-		for i := 0; i < b.N; i++ {
+		for b.Loop() {
 			// Simulate the fixed logic where concurrency > 1
 			err := client.RangeConcurrent(ctx, numAgents, func(ctx context.Context, addr string, conn *ClientConn, copts ...CallOption) error {
 				time.Sleep(1 * time.Millisecond) // Simulate RPC latency
 				return nil
 			})
 			if err != nil {
 				b.Fatal(err)
 			}
 		}
 	})
 }
🧰 Tools
🪛 OpenGrep (1.16.0)

[INFO] 50-50: Avoid using context.TODO()/Background() except in top-level init/main/test setup; prefer propagated contexts with cancellation/timeouts in request-handling code

(go.context.todo-used)

🪛 Semgrep (1.151.0)

[INFO] 50-50: Avoid using context.TODO()/Background() except in top-level init/main/test setup; prefer propagated contexts with cancellation/timeouts in request-handling code

(go.context.todo-used)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/benchmark_test.go` around lines 49 - 86, Move the setup of
the gRPCClient, population of client.conns (mockPool entries) and
atomic.StoreUint64(&client.clientCount, ...) so it runs before starting the
benchmark timers; then call b.ReportAllocs() and b.ResetTimer() once before the
sub-benchmarks in BenchmarkBroadcast. Inside each sub-benchmark replace the old
for i := 0; i < b.N; i++ loops with the Go 1.24 style for b.Loop() loop, and
keep the call to client.RangeConcurrent(...) and its simulated latency inside
that loop. Ensure you reference the BenchmarkBroadcast function, the gRPCClient
instance (client), client.conns/mockPool population, and the RangeConcurrent
call when making these changes.


 var defaultOptions = []Option{
-	WithConnectionPoolSize(3),
+	WithConnectionPoolSize(10),

🧹 Nitpick | 🔵 Trivial

Pool size 3 → 10: verify FD and memory budget at scale

At 100 agents, the default pool size of 10 yields 1,000 persistent gRPC connections (up from 300). Each connection holds OS-level file descriptors and HTTP/2 flow-control buffers. Confirm the target node's ulimit -n and pod memory limits accommodate this and that the WithIdleTimeout is tuned to reclaim idle slots.
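
A rough sketch of the kind of runtime guard this comment asks for, comparing the projected descriptor count (agents × pool size) against the soft RLIMIT_NOFILE on Linux/Unix; the helper name, the 2x headroom factor, and the package are illustrative, not existing Vald code.

package fdcheck

import (
	"fmt"
	"syscall"
)

// warnIfFDBudgetTight is an illustrative guard: with pool size 10 and 100
// agents, roughly 1,000 persistent connections (and descriptors) are
// expected, which should stay well below the process soft limit.
func warnIfFDBudgetTight(agents, poolSize uint64) error {
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		return err
	}
	projected := agents * poolSize
	if projected*2 > rl.Cur { // keep at least 2x headroom for other descriptors
		return fmt.Errorf(
			"projected gRPC connections (%d) leave little headroom under RLIMIT_NOFILE (soft=%d)",
			projected, rl.Cur)
	}
	return nil
}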

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/option.go` at line 43, The change increasing the connection
pool to WithConnectionPoolSize(10) can multiply persistent gRPC connections
(e.g., 100 agents → 1,000 connections) and risk hitting OS file descriptor and
memory limits; update the configuration and docs to make the pool size safe and
configurable, validate/adjust WithIdleTimeout to reclaim idle connections, and
add a checklist/test to verify target nodes' ulimit -n and pod memory limits for
the expected scale. Specifically, ensure WithConnectionPoolSize is
documented/configurable (or reduced), tune WithIdleTimeout to a shorter reclaim
window, add runtime validation or warnings if computed total FDs may exceed
ulimit, and include guidance in deployment manifests about required pod memory
and ulimit values.

Comment on lines +43 to +44
+	WithConnectionPoolSize(10),
+	WithKeepaliveParams("30s", "10s", true),

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find and examine option.go to see the actual implementation
find . -name "option.go" -path "*/grpc/*" -type f

Repository: vdaas/vald

Length of output: 569


🏁 Script executed:

#!/bin/bash
# Search for KeepaliveEnforcementPolicy and PermitWithoutStream in the codebase
rg -n 'KeepaliveEnforcementPolicy|PermitWithoutStream|EnforcementPolicy' --type go -C 3

Repository: vdaas/vald

Length of output: 15906


🏁 Script executed:

#!/bin/bash
# Check for server-side keepalive configurations
rg -n 'keepalive\.(EnforcementPolicy|ServerParameters)' --type go -C 3

Repository: vdaas/vald

Length of output: 5538


🏁 Script executed:

#!/bin/bash
# Look at the actual implementation of WithKeepaliveParams in option.go
fd 'option.go' -path '*/net/grpc/*' -exec cat -n {} \;

Repository: vdaas/vald

Length of output: 222


🏁 Script executed:

#!/bin/bash
# Get the actual implementation of WithKeepaliveParams in option.go
cat -n internal/net/grpc/option.go | head -100

Repository: vdaas/vald

Length of output: 3645


🏁 Script executed:

#!/bin/bash
# Look at the broader context around lines 43-44 to see what these options are applied to
sed -n '1,100p' internal/net/grpc/option.go

Repository: vdaas/vald

Length of output: 2945


🏁 Script executed:

#!/bin/bash
# Check for default server keepalive configuration
rg -n 'WithGRPCKeepalivePermitWithoutStream|WithGRPCKeepaliveMinTime' --type go | head -20

Repository: vdaas/vald

Length of output: 1095


🏁 Script executed:

#!/bin/bash
# Check what the actual default values are in config and how servers are initialized
rg -n 'GRPCKeepalive|Keepalive.*struct' internal/config/server.go -A 5 | head -40

Repository: vdaas/vald

Length of output: 1752


🏁 Script executed:

#!/bin/bash
# Find where server keepalive defaults are set
rg -n 'GRPCKeepalive.*{' internal/config/server.go -A 10 | head -50

Repository: vdaas/vald

Length of output: 149


🏁 Script executed:

#!/bin/bash
# Check what the default/zero values are for PermitWithoutStream on server side
rg -n 'permitWithoutStream|PermitWithoutStream' internal/servers/server/ -B 2 -A 2 | head -60

Repository: vdaas/vald

Length of output: 2511


🏁 Script executed:

#!/bin/bash
# Check the values.yaml in charts to see what the server-side keepalive defaults are
find charts -name "values.yaml" -o -name "values.go" | xargs grep -l -i keepalive 2>/dev/null

Repository: vdaas/vald

Length of output: 212


🏁 Script executed:

#!/bin/bash
# Look for the actual initialization of server keepalive in grpc server setup
sed -n '210,240p' internal/servers/server/server.go

Repository: vdaas/vald

Length of output: 1047


🏁 Script executed:

#!/bin/bash
# Check the main vald values.yaml for server keepalive defaults
grep -A 20 "keepalive:" charts/vald/values.yaml | head -40

Repository: vdaas/vald

Length of output: 3221


🏁 Script executed:

#!/bin/bash
# Check if there's a default server keepalive configuration anywhere
rg -n 'minTime.*=|permitWithoutStream.*=' internal/servers/server/ -B 2 -A 1 | head -40

Repository: vdaas/vald

Length of output: 829


🏁 Script executed:

#!/bin/bash
# Check what happens when PermitWithoutStream is not set (zero value)
rg -n 'new\(grpcKeepalive\)' internal/servers/server/ -A 5

Repository: vdaas/vald

Length of output: 2348


🏁 Script executed:

#!/bin/bash
# Search for how the grpcKeepalive struct is initialized
ast-grep --pattern 'type grpcKeepalive struct { $$$ }'

Repository: vdaas/vald

Length of output: 661


🏁 Script executed:

#!/bin/bash
# Check the full keepalive section in values.yaml to see if permit_without_stream has a default
grep -A 1 "permit_without_stream" charts/vald/values.yaml

Repository: vdaas/vald

Length of output: 846


🏁 Script executed:

#!/bin/bash
# Verify the WithKeepaliveParams implementation to confirm all 3 parameters
sed -n '465,490p' internal/net/grpc/option.go

Repository: vdaas/vald

Length of output: 683


🏁 Script executed:

#!/bin/bash
# Check if there are any recent changes related to keepalive configuration
git log --oneline -20 -- internal/net/grpc/option.go 2>/dev/null || echo "Git not available"

Repository: vdaas/vald

Length of output: 128


🏁 Script executed:

#!/bin/bash
# Check if this PR modifies the server keepalive defaults in values.yaml
git diff HEAD~1 charts/vald/values.yaml 2>/dev/null | grep -A 5 -B 5 "permit_without_stream\|min_time" || echo "Unable to get diff"

Repository: vdaas/vald

Length of output: 72


🏁 Script executed:

#!/bin/bash
# Look for any commits that update server keepalive settings
git log --oneline --all -- charts/vald/values.yaml | head -10

Repository: vdaas/vald

Length of output: 715


🏁 Script executed:

#!/bin/bash
# Verify the current state: are servers actually configured with PermitWithoutStream=true?
# Check if any component configs explicitly set it to true
rg -n 'permit_without_stream.*true|PermitWithoutStream.*true' charts/ --type yaml --type go | head -20

Repository: vdaas/vald

Length of output: 517


🏁 Script executed:

#!/bin/bash
# Get more detail on the recent commit that mentions "tune connection pool"
git show 2ba82dec --stat 2>/dev/null | head -50

Repository: vdaas/vald

Length of output: 3158


🏁 Script executed:

#!/bin/bash
# Check if the commit modifies values.yaml
git show 2ba82dec -- charts/vald/values.yaml 2>/dev/null | head -100

Repository: vdaas/vald

Length of output: 5808


🏁 Script executed:

#!/bin/bash
# Check what the commit actually changed
git show 2ba82dec 2>/dev/null | grep -A 5 -B 5 "permit_without_stream\|WithKeepaliveParams\|WithConnectionPoolSize" | head -100

Repository: vdaas/vald

Length of output: 15959


🏁 Script executed:

#!/bin/bash
# Check the current state of values.yaml for server keepalive defaults
grep -A 1 "permit_without_stream\|min_time" charts/vald/values.yaml | grep -A 1 "server.grpc.keepalive" | head -20

Repository: vdaas/vald

Length of output: 644


🏁 Script executed:

#!/bin/bash
# Extract the exact server keepalive values from the new values.yaml in this commit
git show 2ba82dec:charts/vald/values.yaml 2>/dev/null | grep -A 10 "gRPC server keep alive" | head -20

Repository: vdaas/vald

Length of output: 1927


🏁 Script executed:

#!/bin/bash
# Confirm the pool size was changed from 3 to 10
git show 2ba82dec -- internal/net/grpc/option.go 2>/dev/null | grep -B 5 -A 5 "ConnectionPoolSize"

Repository: vdaas/vald

Length of output: 561


🏁 Script executed:

#!/bin/bash
# Check if the commit updates server defaults to permit_without_stream=true
git show 2ba82dec:charts/vald/values.yaml 2>/dev/null | grep -B 2 -A 2 'permit_without_stream.*true' | head -20

Repository: vdaas/vald

Length of output: 36


🏁 Script executed:

#!/bin/bash
# Double-check: look for ANY permit_without_stream true in the full server config section
git show 2ba82dec:charts/vald/values.yaml 2>/dev/null | sed -n '/server_config:/,/^[a-z]/p' | grep -i permit_without_stream

Repository: vdaas/vald

Length of output: 655


🏁 Script executed:

#!/bin/bash
# Verify the gRPC MinTime on server matches or supports the 30s client ping interval
git show 2ba82dec:charts/vald/values.yaml 2>/dev/null | grep -A 2 "min_time"

Repository: vdaas/vald

Length of output: 601


PermitWithoutStream: true in hardcoded client defaults will trigger GOAWAY from all unmodified Vald servers

The gRPC client defaults at line 44 hardcode WithKeepaliveParams("30s", "10s", true), setting PermitWithoutStream: true. This immediately creates two violations against the server defaults:

  1. PermitWithoutStream mismatch: Servers default to permit_without_stream: false, which rejects client pings sent without active streams. The server will respond with GOAWAY and close the connection.

  2. MinTime enforcement violation: Client pings every 30 seconds, but servers enforce a minimum ping interval of 10 minutes (min_time: "10m"). Even if PermitWithoutStream were aligned, the ping frequency still violates the server's enforcement policy.

With a pool size of 10, all 10 connections per agent will simultaneously trigger these violations, amplifying connection churn by 10×.

Required fix: Either set PermitWithoutStream: false in line 44 (making idle pings inapplicable), or ensure every Vald server component explicitly enables permit_without_stream: true and sets min_time: 30s or lower in its configuration before this change lands.
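
To make the two knobs concrete, here is roughly what the client defaults map to in raw grpc-go, alongside the server-side enforcement policy that would have to be coordinated for those pings to be accepted. This is a sketch, not Vald's configuration code (which goes through Helm values and config structs); the variable names are illustrative.

package keepalivecfg

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// Client side: roughly what WithKeepaliveParams("30s", "10s", true) expresses.
var clientKA = grpc.WithKeepaliveParams(keepalive.ClientParameters{
	Time:                30 * time.Second, // ping after 30s of inactivity
	Timeout:             10 * time.Second, // wait 10s for the ping ack
	PermitWithoutStream: true,             // ping even with no active RPCs
})

// Server side: an enforcement policy that would accept those pings. With the
// defaults described above (MinTime 10m, PermitWithoutStream false), the same
// pings trigger GOAWAY instead.
var serverKA = grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
	MinTime:             30 * time.Second,
	PermitWithoutStream: true,
})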

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/option.go` around lines 43 - 44, The hardcoded client
keepalive in WithKeepaliveParams currently sets PermitWithoutStream=true which
will cause servers (with default permit_without_stream=false and min_time=10m)
to GOAWAY; update the WithKeepaliveParams usage in internal/net/grpc/option.go
to use PermitWithoutStream=false (or remove/disable the aggressive 30s
keepalive) so client pings won’t be sent without active streams, or
alternatively coordinate server-side config changes before landing; change the
WithKeepaliveParams call accordingly to prevent immediate GOAWAYs from Vald
servers.

Comment on lines +237 to +246
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			// Simulate high frequency requests
			_ = p.Do(ctx, func(c *ClientConn) error {
				// Simulate short work
				return nil
			})
		}
	})

⚠️ Potential issue | 🟡 Minor

Missing b.ReportAllocs(), setup timer bracketing, and b.StopTimer() — inconsistent with all other benchmarks in this file

Every other benchmark (Benchmark_ConnPool, BenchmarkParallel_ConnPool, etc.) calls b.StopTimer() + b.ResetTimer() + b.ReportAllocs() + b.StartTimer() before the measurement loop, and b.StopTimer() afterward. Omitting b.ReportAllocs() hides allocation regressions; omitting the timer bracketing folds pool-connect setup time into measurements.

🔧 Proposed fix
+	b.StopTimer()
 	b.ResetTimer()
+	b.ReportAllocs()
+	b.StartTimer()
 	b.RunParallel(func(pb *testing.PB) {
 		for pb.Next() {
 			// Simulate high frequency requests
 			_ = p.Do(ctx, func(c *ClientConn) error {
 				// Simulate short work
 				return nil
 			})
 		}
 	})
+	b.StopTimer()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/pool/pool_bench_test.go` around lines 237 - 246, The
benchmark currently runs measurement without bracketing timers or reporting
allocations; update the benchmark surrounding the b.RunParallel(...) loop to
match the other benchmarks by calling b.StopTimer(), b.ReportAllocs(),
b.ResetTimer(), and b.StartTimer() before entering the b.RunParallel block (so
pool setup and connection time are excluded) and call b.StopTimer() immediately
after the parallel loop; modify the same benchmark that invokes p.Do(...) on
*ClientConn to include these calls.

}

-	return client.RangeConcurrent(fctx, -1, func(ictx context.Context,
+	return client.RangeConcurrent(fctx, g.GetAgentCount(ctx), func(ictx context.Context,

⚠️ Potential issue | 🟡 Minor

GetAgentCount uses stale ctx instead of the traced fctx

The span is started on fctx, but GetAgentCount(ctx) bypasses it. Use fctx consistently so the address lookup is correctly attributed to the BroadCast trace span.

🔧 Proposed fix
-	return client.RangeConcurrent(fctx, g.GetAgentCount(ctx), func(ictx context.Context,
+	return client.RangeConcurrent(fctx, g.GetAgentCount(fctx), func(ictx context.Context,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/gateway/lb/service/gateway.go` at line 91, The tracing span was started
on fctx but GetAgentCount was called with the stale ctx; change the call to use
the traced context (call g.GetAgentCount(fctx)) so the address lookup is
attributed to the BroadCast trace span, and ensure any other calls inside the
same return expression use fctx consistently (e.g., client.RangeConcurrent(fctx,
g.GetAgentCount(fctx), ...)).

…l.go

- fix: pass correct concurrency count to `RangeConcurrent` in LB Gateway broadcast, fixing sequential execution bug.
- feat: increase default gRPC connection pool size from 3 to 10 to better handle high concurrency.
- feat: enable gRPC keepalive parameters (30s time, 10s timeout) to prevent connection drops.
- perf: optimize `pool.getHealthyConn` and `pool.loop` to cache atomic loads of connection slots, reducing overhead in high-throughput paths.
- test: add `benchmark_test.go` to `internal/net/grpc` to measure client broadcast performance.

Benchmark results:
- `BenchmarkBroadcast/Concurrent` latency reduced from ~60ms (Sequential) to ~2ms.
- `BenchmarkPool_HighContention` throughput improved from ~131ns/op to ~92ns/op with optimized pool access.

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/net/grpc/pool/pool.go (2)

348-350: 🧹 Nitpick | 🔵 Trivial

Dead code: count == 0 is unreachable after the sz == 0 guard

The early-return at lines 328–330 guarantees sz > 0 before the loop. Since every iteration either increments count (in the default branch) or returns early via ctx.Done(), count is always ≥ 1 when execution reaches line 348. The block can never trigger.

🧹 Proposed cleanup
  }
- if count == 0 {
- 	return errors.ErrGRPCPoolConnectionNotFound
- }
  return nil
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/pool/pool.go` around lines 348 - 350, Remove the
unreachable dead branch that checks `if count == 0 { return
errors.ErrGRPCPoolConnectionNotFound }` in the function that iterates over the
pool (the loop guarded by `sz == 0` and using `count` and `ctx.Done()`). Because
`sz > 0` is ensured before the loop and every loop path either increments
`count` or returns on `ctx.Done()`, delete this `count == 0` check to clean up
dead code and leave the remaining return paths intact.

287-300: ⚠️ Potential issue | 🟡 Minor

Remove load() method — confirmed unused and contains bounds-checking bug

The method is dead code: no active calls exist in the codebase, and its test is commented out. More critically, the bounds check at line 294 is inverted: if sz < idx should be if idx >= sz. If that condition evaluates true, line 295 executes slots[sz].Load() where sz == len(slots), causing an out-of-bounds panic.
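
If the method were ever kept rather than deleted, a corrected guard would look roughly like the sketch below; the pool and poolConn definitions here are simplified stand-ins, not the actual pool.go types.

package pool

import "sync/atomic"

type poolConn struct {
	addr string
}

type pool struct {
	slots []atomic.Pointer[poolConn]
}

// load returns the slot index actually used and the connection stored there.
// The fix is the guard: reject idx values at or beyond len(slots) before
// indexing, instead of the inverted `sz < idx` form that could index past
// the slice.
func (p *pool) load(idx uint64) (ridx uint64, pc *poolConn) {
	sz := uint64(len(p.slots))
	if sz == 0 {
		return 0, nil
	}
	if idx >= sz {
		idx %= sz // wrap rather than read slots[len(slots)]
	}
	return idx, p.slots[idx].Load()
}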

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/pool/pool.go` around lines 287 - 300, Remove the dead and
unsafe method pool.load: it is unused and contains an inverted bounds check that
can index slots[len(slots)] causing an OOB panic; delete the load(idx uint64)
(ridx uint64, pc *poolConn) implementation from the pool type and any commented
tests referencing it, and run tests to confirm nothing else depends on the
function name load in type pool (search for method name "load" and type "pool"
to ensure no remaining callers).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/net/grpc/pool/pool_bench_test.go`:
- Around line 211-213: Replace the b.Fatal(err) call after p.Connect(ctx) with
b.Error(err) so the benchmark follows the file-wide convention used in
Benchmark_ConnPool, BenchmarkParallel_ConnPool and ListenAndServe; keep the
p.Connect(ctx) error path but use b.Error(err) (and ensure any subsequent code
remains safe if p is nil).

---

Outside diff comments:
In `@internal/net/grpc/pool/pool.go`:
- Around line 348-350: Remove the unreachable dead branch that checks `if count
== 0 { return errors.ErrGRPCPoolConnectionNotFound }` in the function that
iterates over the pool (the loop guarded by `sz == 0` and using `count` and
`ctx.Done()`). Because `sz > 0` is ensured before the loop and every loop path
either increments `count` or returns on `ctx.Done()`, delete this `count == 0`
check to clean up dead code and leave the remaining return paths intact.
- Around line 287-300: Remove the dead and unsafe method pool.load: it is unused
and contains an inverted bounds check that can index slots[len(slots)] causing
an OOB panic; delete the load(idx uint64) (ridx uint64, pc *poolConn)
implementation from the pool type and any commented tests referencing it, and
run tests to confirm nothing else depends on the function name load in type pool
(search for method name "load" and type "pool" to ensure no remaining callers).

---

Duplicate comments:
In `@internal/net/grpc/pool/pool_bench_test.go`:
- Around line 216-226: The benchmark BenchmarkPool_HighContention is missing the
standard measurement bracketing and is including setup latency and allocations;
before entering the parallel loop around p.Do(...) call stop the timer and call
b.ReportAllocs() then call b.StartTimer(), and after the parallel RunParallel
returns call b.StopTimer(); ensure you place these calls in the same pattern
used by the other benchmarks in this file so the allocation and setup overhead
are excluded from the measured loop.

Comment on lines +211 to +213
	p, err = p.Connect(ctx)
	if err != nil {
		b.Fatal(err)

🧹 Nitpick | 🔵 Trivial

b.Fatal vs b.Error — inconsistent with the rest of the file

Every other setup-failure path in this file (e.g., Benchmark_ConnPool, BenchmarkParallel_ConnPool, ListenAndServe) uses b.Error. Using b.Fatal here is defensible (stops execution immediately on a bad pool state rather than running with a nil connection), but pick one convention and apply it consistently across all benchmarks.

♻️ Proposed change (align with file convention)
 	if err != nil {
-		b.Fatal(err)
+		b.Error(err)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/pool/pool_bench_test.go` around lines 211 - 213, Replace
the b.Fatal(err) call after p.Connect(ctx) with b.Error(err) so the benchmark
follows the file-wide convention used in Benchmark_ConnPool,
BenchmarkParallel_ConnPool and ListenAndServe; keep the p.Connect(ctx) error
path but use b.Error(err) (and ensure any subsequent code remains safe if p is
nil).

…l.go

- fix: pass correct concurrency count to `RangeConcurrent` in LB Gateway broadcast, fixing sequential execution bug.
- feat: increase default gRPC connection pool size from 3 to 10 to better handle high concurrency.
- feat: enable gRPC keepalive parameters (30s time, 10s timeout) to prevent connection drops.
- perf: optimize `pool.getHealthyConn` and `pool.loop` to cache atomic loads of connection slots, reducing overhead in high-throughput paths.
- test: add `benchmark_test.go` to `internal/net/grpc` to measure client broadcast performance.
- test: add `e2e_bench_test.go` to `internal/net/grpc` to measure real E2E broadcast performance with real server.

Benchmark results:
- `BenchmarkBroadcast/Concurrent` latency reduced from ~60ms (Sequential) to ~2ms (Simulated).
- `Benchmark_E2E_Real_Broadcast`: 4x speedup (2.17ms -> 0.5ms per op).

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/net/grpc/e2e_bench_test.go`:
- Around line 144-146: The code currently logs a non-fatal warning when
connectivity is partial (the time.Since(start) > 5*time.Second branch) which
allows the benchmark to continue with fewer than numAgents; change this to fail
the benchmark immediately by replacing the warning log with a fatal failure (use
b.Fatalf or equivalent) so the benchmark halts when connected < numAgents after
the timeout; update the branch that references start, connected, and numAgents
in e2e_bench_test.go to call b.Fatalf with a clear message including connected
and numAgents.
- Around line 88-101: startBenchServer currently calls b.Fatalf on error which
panics when invoked from goroutines; change startBenchServer to return (addr
string, td func(), err error) instead of calling b.Fatalf, update the goroutine
to capture the returned err into a shared errors slice (protected by setupMu)
along with addrs and teardowns, and after setupWg.Wait() check the collected
errors and call b.Fatalf (or b.Fatalf with combined error info) from the
benchmark goroutine if any error occurred; update any callers of
startBenchServer to handle the new error return.
- Line 178: The test currently calls client.Close(ctx) at the end (client.Close)
which leaks connections if a sub-benchmark panics; after creating the client
(the variable named client in e2e_bench_test.go) immediately call defer
client.Close(ctx) so the connection is always cleaned up on panic or early
return, and remove the standalone client.Close(ctx) call at line 178.
- Around line 59-72: The error channel ech created for srv.ListenAndServe is
never read so startup failures are ignored; change the logic to drain/check ech
to detect bind/startup errors by launching a background goroutine that reads
from ech and records any error (or use a non-blocking select/read after the
sleep) and fail the test/benchmark if an error is received; also replace the
fragile time.Sleep with a readiness check or by waiting for either a successful
"started" signal or an error from ech (use the existing ctx/cancel and wg to
coordinate shutdown).
- Around line 83-84: The benchmark silently swallows port conflicts because it
uses hardcoded ports and discards srv.ListenAndServe() errors; fix by adding a
WithListener(server.New option) to accept a pre-bound net.Listener and a method
like BoundAddr()/Addr() on the server to retrieve the actual bound address, then
in the test do l, _ := net.Listen("tcp", ":0"), pass l via WithListener to
server.New, obtain the bound address via BoundAddr()/Addr() and use that for
agents instead of fixed ports; additionally capture srv.ListenAndServe() errors
by sending them on a startup error channel and fail fast (or log and abort) if
no successful start within the poll timeout instead of ignoring the error, or
alternatively use IsConnected() polling against the server address to ensure all
agents are reachable before proceeding.

Comment on lines +59 to +72
	ctx, cancel := context.WithCancel(context.Background())
	ech := make(chan error, 1)

	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := srv.ListenAndServe(ctx, ech); err != nil {
			// Ignore server stopped errors for benchmarking cleanup
		}
	}()

	// Give it a moment to start (or we could poll connection)
	time.Sleep(10 * time.Millisecond)

⚠️ Potential issue | 🟡 Minor

Error channel ech is created but never drained or checked — startup failures go undetected.

ListenAndServe sends errors to ech, but nothing reads from it. If the server fails to bind (e.g., port already in use), the benchmark proceeds with a dead server and produces misleading results. The 10 ms sleep on line 72 is also fragile on slow CI hosts.

Consider draining ech in a background goroutine (or at least doing a non-blocking read after the sleep to detect early failures):

Proposed improvement
 	ctx, cancel := context.WithCancel(context.Background())
 	ech := make(chan error, 1)
 
 	wg := sync.WaitGroup{}
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		if err := srv.ListenAndServe(ctx, ech); err != nil {
-			// Ignore server stopped errors for benchmarking cleanup
+			select {
+			case ech <- err:
+			default:
+			}
 		}
 	}()
 
-	// Give it a moment to start (or we could poll connection)
-	time.Sleep(10 * time.Millisecond)
+	// Give it a moment to start, then check for early startup failure.
+	time.Sleep(50 * time.Millisecond)
+	select {
+	case err := <-ech:
+		b.Fatalf("server on port %d failed to start: %v", port, err)
+	default:
+	}
🧰 Tools
🪛 OpenGrep (1.16.0)

[INFO] 59-59: Avoid using context.TODO()/Background() except in top-level init/main/test setup; prefer propagated contexts with cancellation/timeouts in request-handling code

(go.context.todo-used)

🪛 Semgrep (1.151.0)

[INFO] 59-59: Avoid using context.TODO()/Background() except in top-level init/main/test setup; prefer propagated contexts with cancellation/timeouts in request-handling code

(go.context.todo-used)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/e2e_bench_test.go` around lines 59 - 72, The error channel
ech created for srv.ListenAndServe is never read so startup failures are
ignored; change the logic to drain/check ech to detect bind/startup errors by
launching a background goroutine that reads from ech and records any error (or
use a non-blocking select/read after the sleep) and fail the test/benchmark if
an error is received; also replace the fragile time.Sleep with a readiness check
or by waiting for either a successful "started" signal or an error from ech (use
the existing ctx/cancel and wg to coordinate shutdown).

Comment on lines +83 to +84
numAgents := 10
startPort := 50050

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n internal/net/grpc/e2e_bench_test.go | head -150

Repository: vdaas/vald

Length of output: 4949


🏁 Script executed:

# Search for net.Listen with :0 pattern in tests
rg -n --type=go 'net\.Listen.*:0' --glob '*_test.go' | head -20

# Search for WithPort usage patterns
rg -n --type=go 'WithPort' --glob '*_test.go' | head -20

# Search for WithListener pattern if it exists
rg -n --type=go 'WithListener' --glob '*_test.go' | head -10

Repository: vdaas/vald

Length of output: 1494


🏁 Script executed:

# Find the server package implementation
fd -t f 'server.go' --path 'internal/servers/server/*'

# Search for the server package and its options
rg -n --type=go 'func New\(' internal/servers/server/ -A 10 | head -40

# Check if there's a WithListener or similar option
rg -n --type=go 'func With' internal/servers/server/option.go | head -30

Repository: vdaas/vald

Length of output: 2567


🏁 Script executed:

# Check the full WithPort and WithListenConfig implementation
rg -n --type=go -A 8 'func WithPort' internal/servers/server/option.go

# Check the ListenAndServe implementation to see error handling
rg -n --type=go -A 20 'func.*ListenAndServe' internal/servers/server/server.go | head -60

# Check if there's a WithListener option
rg -n --type=go 'WithListener\|WithAddr\|listener' internal/servers/server/option.go | head -20

Repository: vdaas/vald

Length of output: 847


🏁 Script executed:

# Get more of ListenAndServe to see the full implementation
sed -n '284,350p' internal/servers/server/server.go

# Check how listen and serve is implemented for gRPC
rg -n --type=go -A 30 'func.*grpcServe\|func.*grpcListen' internal/servers/server/ | head -80

Repository: vdaas/vald

Length of output: 1822


🏁 Script executed:

# Check what happens with port 0 - is there a way to retrieve the actual bound port?
rg -n --type=go 'func.*Port\|GetPort\|Addr' internal/servers/server/server.go | head -30

# Check if there's a LocalAddr or GetAddr method
rg -n --type=go 'LocalAddr\|GetAddr\|Listener' internal/servers/server/server.go | head -20

Repository: vdaas/vald

Length of output: 36


🏁 Script executed:

# Check the Server interface definition
rg -n --type=go 'type Server interface' internal/servers/server/ -A 20

# Check how the benchmark gets the actual address/port for the client
sed -n '43,80p' internal/net/grpc/e2e_bench_test.go

Repository: vdaas/vald

Length of output: 2110


🏁 Script executed:

# Let's check if there are other tests that use dynamic ports
rg -n --type=go 'net\.JoinHostPort.*0|:0\|' internal/net/grpc/ --glob '*_test.go' | head -20

# Check if there are other benchmark tests that might show a pattern
fd -t f 'bench_test.go' internal/ | xargs cat | head -100

# Verify: does the error channel in startBenchServer get read anywhere?
sed -n '43,79p' internal/net/grpc/e2e_bench_test.go | grep -n 'ech'

Repository: vdaas/vald

Length of output: 2867


🏁 Script executed:

# Check the server struct to see if it stores or exposes the listener
sed -n '70,200p' internal/servers/server/server.go | head -60

# Search for any exported method that might return the listener or port
rg -n --type=go 'func.*\(s \*server\).*\(' internal/servers/server/server.go | grep -v 'func (s'

# Check what the actual error handling looks like - maybe ListenAndServe errors aren't actually ignored
sed -n '60,90p' internal/net/grpc/e2e_bench_test.go

Repository: vdaas/vald

Length of output: 2162


🏁 Script executed:

# Check if the server implementation exposes the listener or bound address anywhere
rg -n --type=go 'type server struct' internal/servers/server/server.go -A 50 | grep -i 'listener\|addr'

# Check if there's a way to retrieve listener after ListenAndServe is called
rg -n --type=go 'func.*\(s \*server\)' internal/servers/server/server.go | grep -i 'listener\|addr\|port'

# Let's look at the full grpc serve implementation
sed -n '400,500p' internal/servers/server/server.go

Repository: vdaas/vald

Length of output: 1716


🏁 Script executed:

# Verify: are errors from ListenAndServe actually being swallowed?
# Look at the exact error handling in startBenchServer
sed -n '59,72p' internal/net/grpc/e2e_bench_test.go

# Check if there's any logging of startup errors
rg -n --type=go 'failed to.*listen|listen.*error' internal/servers/server/server.go | head -10

# Verify that port 0 is accepted (we saw WithPort already)
# Check if the server has any way to get the actual bound port
rg -n --type=go 'LocalAddr\|listener\.Addr\|listener\.Port' internal/servers/server/

Repository: vdaas/vald

Length of output: 598


🏁 Script executed:

# Check if there's any way to pass a listener or get the actual bound address
rg -n --type=go 'WithListener|listener\.Addr\|net.Listener' internal/servers/server/

# Check how other gRPC tests in Vald handle port allocation
rg -n --type=go 'startPort\|port.*50\|:0\|ephemeral' internal/net/grpc/ --glob '*test.go'

# Look for any patterns that work around the server API limitation
rg -n --type=go 'time.Sleep.*[0-9]' internal/net/grpc/e2e_bench_test.go

Repository: vdaas/vald

Length of output: 131


🏁 Script executed:

# Verify: Check if port 0 actually works in the server implementation
rg -n --type=go -B5 -A10 's.port.*=' internal/servers/server/server.go | grep -A10 'addr :='

# Double check the actual ListenAndServe flow to see if port binding errors are properly surfaced
sed -n '308,325p' internal/servers/server/server.go

Repository: vdaas/vald

Length of output: 552


🏁 Script executed:

# Verify the actual error handling flow - check if errors from ListenAndServe
# due to port binding failures are truly silently ignored
sed -n '66,69p' internal/net/grpc/e2e_bench_test.go

# Check if there's any mechanism to poll for successful startup or check server state
rg -n --type=go 'IsRunning\|IsConnected' internal/net/grpc/e2e_bench_test.go

# Final check: verify that using port 0 would actually work (accepted by WithPort)
grep -n 'port != 0' internal/servers/server/option.go

Repository: vdaas/vald

Length of output: 198


Port binding errors are silently ignored; use dynamic port allocation or refactor server API to support it.

The hardcoded port range (50050–50059) combined with error swallowing creates race conditions in concurrent test execution. When srv.ListenAndServe() fails due to port conflicts, the error is returned from the goroutine (line 66) but immediately discarded, allowing the benchmark to proceed with a non-functional server.

The server.New() API currently does not support:

  • Passing a pre-bound listener
  • Retrieving the actual bound port/address after Listen succeeds

Viable options:

  1. Extend the server package with WithListener() option and add a method to retrieve the bound address, then use net.Listen("tcp", ":0")
  2. Refactor the benchmark to use port 0 and poll IsConnected() until all agents are reachable, or implement server address introspection
  3. Use a thread-safe port allocator (e.g., finding free ports upfront) to avoid collisions entirely

Also address the error swallowing at line 67: rather than ignoring startup errors, log them or fail fast if the server doesn't start within the poll window (lines 71–72).
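
A minimal sketch of option 3, reserving free ports up front with the standard library; the freePorts helper is illustrative and does not exist in the repository.

package grpc_test

import (
	"net"
	"testing"
)

// freePorts reserves n ephemeral ports by binding to :0, records the
// kernel-assigned port numbers, and releases the listeners so the benchmark
// servers can bind them. There is a small race window between release and
// rebind, but it avoids hardcoded ranges like 50050-50059 colliding across
// parallel test runs.
func freePorts(b *testing.B, n int) []int {
	b.Helper()
	ports := make([]int, 0, n)
	for i := 0; i < n; i++ {
		lis, err := net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			b.Fatalf("failed to reserve port: %v", err)
		}
		ports = append(ports, lis.Addr().(*net.TCPAddr).Port)
		if err := lis.Close(); err != nil {
			b.Fatalf("failed to release port: %v", err)
		}
	}
	return ports
}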

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/e2e_bench_test.go` around lines 83 - 84, the benchmark
silently swallows port conflicts because it uses hardcoded ports and discards
srv.ListenAndServe() errors. Fix this by adding a WithListener option to
server.New that accepts a pre-bound net.Listener, plus a method such as
BoundAddr()/Addr() on the server to retrieve the actual bound address; then, in
the test, create a listener with net.Listen("tcp", ":0"), pass it to server.New
via WithListener, obtain the bound address via BoundAddr()/Addr(), and use that
for the agents instead of fixed ports. Additionally, capture
srv.ListenAndServe() errors by sending them on a startup error channel and fail
fast (or log and abort) if the server has not started within the poll timeout,
or alternatively poll IsConnected() against the server address to ensure all
agents are reachable before proceeding.

Comment on lines +88 to +101
// Start servers in parallel to speed up setup
var setupWg sync.WaitGroup
var setupMu sync.Mutex
for i := 0; i < numAgents; i++ {
	setupWg.Add(1)
	go func(idx int) {
		defer setupWg.Done()
		addr, td := startBenchServer(b, startPort+idx)
		setupMu.Lock()
		addrs = append(addrs, addr)
		teardowns = append(teardowns, td)
		setupMu.Unlock()
	}(i)
}

⚠️ Potential issue | 🔴 Critical

b.Fatalf called from a non-benchmark goroutine — will panic.

startBenchServer (line 95) calls b.Fatalf on line 56 if server creation fails. Go's testing package explicitly prohibits calling Fatal/Fatalf/FailNow from any goroutine other than the one running the benchmark function. This will produce a panic rather than a clean failure.

Return an error from startBenchServer instead of calling b.Fatalf, and collect errors after setupWg.Wait().

Sketch of the fix

Change startBenchServer signature:

-func startBenchServer(b *testing.B, port int) (string, func()) {
+func startBenchServer(b *testing.B, port int) (string, func(), error) {
 	...
-	b.Fatalf("failed to create server: %v", err)
+	return "", nil, fmt.Errorf("failed to create server on port %d: %w", port, err)

In the setup loop, collect errors and check after the wait:

+	var setupErrors []error
 	for i := 0; i < numAgents; i++ {
 		setupWg.Add(1)
 		go func(idx int) {
 			defer setupWg.Done()
-			addr, td := startBenchServer(b, startPort+idx)
+			addr, td, err := startBenchServer(b, startPort+idx)
 			setupMu.Lock()
+			if err != nil {
+				setupErrors = append(setupErrors, err)
+			} else {
 				addrs = append(addrs, addr)
 				teardowns = append(teardowns, td)
+			}
 			setupMu.Unlock()
 		}(i)
 	}
 	setupWg.Wait()
+	if len(setupErrors) > 0 {
+		b.Fatalf("server setup failed: %v", setupErrors)
+	}
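
For reference, the same pattern as a self-contained benchmark sketch: worker goroutines record failures into a mutex-guarded slice, and b.Fatalf runs only on the benchmark goroutine after the wait. startServer here is a placeholder for startBenchServer, and the port arithmetic is illustrative only.

package bench

import (
	"fmt"
	"sync"
	"testing"
)

// startServer is a stand-in for startBenchServer: it returns an address, a
// teardown func, and an error instead of calling b.Fatalf itself.
func startServer(idx int) (addr string, teardown func(), err error) {
	if idx < 0 {
		return "", nil, fmt.Errorf("invalid index %d", idx)
	}
	return fmt.Sprintf("127.0.0.1:%d", 50050+idx), func() {}, nil
}

func BenchmarkParallelSetup(b *testing.B) {
	const numAgents = 4
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		addrs     []string
		teardowns []func()
		errs      []error
	)
	for i := 0; i < numAgents; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			addr, td, err := startServer(idx)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				errs = append(errs, err)
				return
			}
			addrs = append(addrs, addr)
			teardowns = append(teardowns, td)
		}(i)
	}
	wg.Wait()
	// Fatalf is only ever called from the benchmark goroutine itself.
	if len(errs) > 0 {
		b.Fatalf("server setup failed: %v", errs)
	}
	defer func() {
		for _, td := range teardowns {
			td()
		}
	}()
	_ = addrs // a real benchmark would dial these agents in its sub-benchmarks
}
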
🧰 Tools
🪛 OpenGrep (1.16.0)

[WARNING] 94-94: Using defer inside a loop delays resource release and increases memory usage. Move defer outside the loop or use a function to scope it.

(go.defer.in-loop)

🪛 Semgrep (1.151.0)

[WARNING] 94-94: Using defer inside a loop delays resource release and increases memory usage. Move defer outside the loop or use a function to scope it.

(go.defer.in-loop)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/e2e_bench_test.go` around lines 88 - 101, startBenchServer
currently calls b.Fatalf on error, which panics when invoked from a goroutine.
Change startBenchServer to return (addr string, td func(), err error) instead of
calling b.Fatalf, update the goroutine to capture the returned err into a shared
errors slice (protected by setupMu) along with addrs and teardowns, and after
setupWg.Wait() check the collected errors and call b.Fatalf with the combined
error info from the benchmark goroutine if any error occurred; update any
callers of startBenchServer to handle the new error return.

Comment on lines +144 to +146
if time.Since(start) > 5*time.Second {
	b.Logf("Warning: only %d/%d agents connected", connected, numAgents)
	break

⚠️ Potential issue | 🟡 Minor

Partial connectivity should be fatal, not a warning.

If fewer than numAgents servers are connected after 5 seconds, the benchmark proceeds with degraded topology and produces misleading latency numbers. This should fail the benchmark outright.

 		if time.Since(start) > 5*time.Second {
-			b.Logf("Warning: only %d/%d agents connected", connected, numAgents)
-			break
+			b.Fatalf("only %d/%d agents connected after 5s — aborting benchmark", connected, numAgents)
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-		if time.Since(start) > 5*time.Second {
-			b.Logf("Warning: only %d/%d agents connected", connected, numAgents)
-			break
+		if time.Since(start) > 5*time.Second {
+			b.Fatalf("only %d/%d agents connected after 5s — aborting benchmark", connected, numAgents)
+		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/e2e_bench_test.go` around lines 144 - 146, the code
currently logs a non-fatal warning when connectivity is partial (the
time.Since(start) > 5*time.Second branch), which allows the benchmark to
continue with fewer than numAgents agents. Change this to fail the benchmark
immediately: replace the warning log in that branch with b.Fatalf (or
equivalent) so the benchmark halts when connected < numAgents after the
timeout, and include connected and numAgents in the failure message.

}
})

client.Close(ctx)

🛠️ Refactor suggestion | 🟠 Major

Defer client.Close to ensure cleanup on panic.

If any sub-benchmark panics (e.g., b.Fatal inside the loop), line 178 is never reached and connections leak. Move the close into a defer right after the client is created.

+	defer client.Close(ctx)
+
 	// Start monitor to connect
 	ech, err := client.StartConnectionMonitor(ctx)

And remove the standalone call at line 178.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/net/grpc/e2e_bench_test.go` at line 178, the test currently calls
client.Close(ctx) only at the end, which leaks connections if a sub-benchmark
panics. Immediately after creating the client, call defer client.Close(ctx) so
the connection is always cleaned up on panic or early return, and remove the
standalone client.Close(ctx) call at line 178.
