Skip to content

Commit f36187f

Browse files
authored
stress test for the new RPC streaming primitives (+ bug fixes) (#2828)
This pull request introduces a new integration test tool for the StreamManager streaming system, adding a standalone test binary with supporting modules for simulating and verifying high-throughput data transfer. The changes include a test driver, a configurable in-memory delivery pipe for simulating network conditions, a data generator, a verifier for end-to-end integrity, and a metrics tracker. Additionally, several improvements are made to the circular buffer and StreamManager for better handling of blocking writes and out-of-order acknowledgments. **New StreamManager Integration Test Tool** * Added a new test binary `cmd/test-streammanager` with a main driver (`main-test-streammanager.go`) that orchestrates end-to-end streaming tests, including configuration for data size, delivery delay/skew, window size, slow reader simulation, and verbose logging. * Implemented a configurable `DeliveryPipe` (`deliverypipe.go`) for simulating network delivery with delay and skew, supporting separate data and ack channels, out-of-order delivery, and high water mark tracking. * Added `WriterBridge` and `ReaderBridge` modules for interfacing between brokers and the delivery pipe, enforcing correct directionality of data and acks. * Created a sequential test data generator (`generator.go`) and a verifier (`verifier.go`) for checking data integrity and reporting mismatches. [[1]](diffhunk://#diff-3f2d6e0349089e3748c001791a383687b33a2c2391fd3baccfceb83e76e6ee0dR1-R40) [[2]](diffhunk://#diff-cb3aab0bae9bec15ef0c06fe5d9e0e96094affcf4720680605a92054ab717575R1-R61) * Introduced a metrics module (`metrics.go`) for tracking throughput, packet counts, out-of-order events, and pipe usage, with a summary report at test completion. **StreamManager and CirBuf Improvements** * Refactored circular buffer (`pkg/jobmanager/cirbuf.go`) to replace blocking writes with a non-blocking `WriteAvailable` method, returning a wait channel for buffer-full scenarios, and removed context-based cancellation logic. * Updated StreamManager (`pkg/jobmanager/streammanager.go`) to track the maximum acknowledged sequence/rwnd tuple, ignoring stale or out-of-order ACKs, and resetting this state on disconnect. * Modified StreamManager's data handling to use the new non-blocking buffer write logic, ensuring correct signaling and waiting for space when needed. **Minor Cleanup** * Removed unused context import from `cirbuf.go`. * Minor whitespace cleanup in `streambroker.go`.
1 parent 0fb25da commit f36187f

File tree

9 files changed

+816
-54
lines changed

9 files changed

+816
-54
lines changed

cmd/test-streammanager/bridge.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2026, Command Line Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package main
5+
6+
import (
7+
"fmt"
8+
9+
"github.com/wavetermdev/waveterm/pkg/wshrpc"
10+
)
11+
12+
// WriterBridge - used by the writer broker
13+
// Sends data to the pipe, receives acks from the pipe
14+
type WriterBridge struct {
15+
pipe *DeliveryPipe
16+
}
17+
18+
func (b *WriterBridge) StreamDataCommand(data wshrpc.CommandStreamData, opts *wshrpc.RpcOpts) error {
19+
b.pipe.EnqueueData(data)
20+
return nil
21+
}
22+
23+
func (b *WriterBridge) StreamDataAckCommand(ack wshrpc.CommandStreamAckData, opts *wshrpc.RpcOpts) error {
24+
return fmt.Errorf("writer bridge should not send acks")
25+
}
26+
27+
// ReaderBridge - used by the reader broker
28+
// Sends acks to the pipe, receives data from the pipe
29+
type ReaderBridge struct {
30+
pipe *DeliveryPipe
31+
}
32+
33+
func (b *ReaderBridge) StreamDataCommand(data wshrpc.CommandStreamData, opts *wshrpc.RpcOpts) error {
34+
return fmt.Errorf("reader bridge should not send data")
35+
}
36+
37+
func (b *ReaderBridge) StreamDataAckCommand(ack wshrpc.CommandStreamAckData, opts *wshrpc.RpcOpts) error {
38+
b.pipe.EnqueueAck(ack)
39+
return nil
40+
}
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
// Copyright 2026, Command Line Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package main
5+
6+
import (
7+
"encoding/base64"
8+
"math/rand"
9+
"sort"
10+
"sync"
11+
"time"
12+
13+
"github.com/wavetermdev/waveterm/pkg/wshrpc"
14+
)
15+
16+
type DeliveryConfig struct {
17+
Delay time.Duration
18+
Skew time.Duration
19+
}
20+
21+
type taggedPacket struct {
22+
seq uint64
23+
deliveryTime time.Time
24+
isData bool
25+
dataPk wshrpc.CommandStreamData
26+
ackPk wshrpc.CommandStreamAckData
27+
dataSize int
28+
}
29+
30+
type DeliveryPipe struct {
31+
lock sync.Mutex
32+
config DeliveryConfig
33+
34+
// Sequence counters (separate for data and ack)
35+
dataSeq uint64
36+
ackSeq uint64
37+
38+
// Pending packets sorted by (deliveryTime, seq)
39+
dataPending []taggedPacket
40+
ackPending []taggedPacket
41+
42+
// Delivery targets
43+
dataTarget func(wshrpc.CommandStreamData)
44+
ackTarget func(wshrpc.CommandStreamAckData)
45+
46+
// Control
47+
closed bool
48+
wg sync.WaitGroup
49+
50+
// Metrics
51+
metrics *Metrics
52+
lastDataSeqNum int64
53+
lastAckSeqNum int64
54+
55+
// Byte tracking for high water mark
56+
currentBytes int64
57+
}
58+
59+
func NewDeliveryPipe(config DeliveryConfig, metrics *Metrics) *DeliveryPipe {
60+
return &DeliveryPipe{
61+
config: config,
62+
metrics: metrics,
63+
lastDataSeqNum: -1,
64+
lastAckSeqNum: -1,
65+
}
66+
}
67+
68+
func (dp *DeliveryPipe) SetDataTarget(fn func(wshrpc.CommandStreamData)) {
69+
dp.lock.Lock()
70+
defer dp.lock.Unlock()
71+
dp.dataTarget = fn
72+
}
73+
74+
func (dp *DeliveryPipe) SetAckTarget(fn func(wshrpc.CommandStreamAckData)) {
75+
dp.lock.Lock()
76+
defer dp.lock.Unlock()
77+
dp.ackTarget = fn
78+
}
79+
80+
func (dp *DeliveryPipe) EnqueueData(pkt wshrpc.CommandStreamData) {
81+
dp.lock.Lock()
82+
defer dp.lock.Unlock()
83+
84+
if dp.closed {
85+
return
86+
}
87+
88+
dataSize := base64.StdEncoding.DecodedLen(len(pkt.Data64))
89+
dp.dataSeq++
90+
tagged := taggedPacket{
91+
seq: dp.dataSeq,
92+
deliveryTime: dp.computeDeliveryTime(),
93+
isData: true,
94+
dataPk: pkt,
95+
dataSize: dataSize,
96+
}
97+
98+
dp.dataPending = append(dp.dataPending, tagged)
99+
dp.sortPending(&dp.dataPending)
100+
101+
dp.currentBytes += int64(dataSize)
102+
if dp.metrics != nil {
103+
dp.metrics.AddDataPacket()
104+
dp.metrics.UpdatePipeHighWaterMark(dp.currentBytes)
105+
}
106+
}
107+
108+
func (dp *DeliveryPipe) EnqueueAck(pkt wshrpc.CommandStreamAckData) {
109+
dp.lock.Lock()
110+
defer dp.lock.Unlock()
111+
112+
if dp.closed {
113+
return
114+
}
115+
116+
dp.ackSeq++
117+
tagged := taggedPacket{
118+
seq: dp.ackSeq,
119+
deliveryTime: dp.computeDeliveryTime(),
120+
isData: false,
121+
ackPk: pkt,
122+
}
123+
124+
dp.ackPending = append(dp.ackPending, tagged)
125+
dp.sortPending(&dp.ackPending)
126+
127+
if dp.metrics != nil {
128+
dp.metrics.AddAckPacket()
129+
}
130+
}
131+
132+
func (dp *DeliveryPipe) computeDeliveryTime() time.Time {
133+
base := time.Now().Add(dp.config.Delay)
134+
135+
if dp.config.Skew == 0 {
136+
return base
137+
}
138+
139+
// Random skew: -skew to +skew
140+
skewNs := dp.config.Skew.Nanoseconds()
141+
randomSkew := time.Duration(rand.Int63n(2*skewNs+1) - skewNs)
142+
return base.Add(randomSkew)
143+
}
144+
145+
func (dp *DeliveryPipe) sortPending(pending *[]taggedPacket) {
146+
sort.Slice(*pending, func(i, j int) bool {
147+
pi, pj := (*pending)[i], (*pending)[j]
148+
if pi.deliveryTime.Equal(pj.deliveryTime) {
149+
return pi.seq < pj.seq
150+
}
151+
return pi.deliveryTime.Before(pj.deliveryTime)
152+
})
153+
}
154+
155+
func (dp *DeliveryPipe) Start() {
156+
dp.wg.Add(2)
157+
go dp.dataDeliveryLoop()
158+
go dp.ackDeliveryLoop()
159+
}
160+
161+
func (dp *DeliveryPipe) dataDeliveryLoop() {
162+
defer dp.wg.Done()
163+
dp.deliveryLoop(
164+
func() *[]taggedPacket { return &dp.dataPending },
165+
func(pkt taggedPacket) {
166+
if dp.dataTarget != nil {
167+
// Track out-of-order packets
168+
if dp.metrics != nil && dp.lastDataSeqNum != -1 {
169+
if pkt.dataPk.Seq < dp.lastDataSeqNum {
170+
dp.metrics.AddOOOPacket()
171+
}
172+
}
173+
dp.lastDataSeqNum = pkt.dataPk.Seq
174+
dp.dataTarget(pkt.dataPk)
175+
176+
dp.lock.Lock()
177+
dp.currentBytes -= int64(pkt.dataSize)
178+
dp.lock.Unlock()
179+
}
180+
},
181+
)
182+
}
183+
184+
func (dp *DeliveryPipe) ackDeliveryLoop() {
185+
defer dp.wg.Done()
186+
dp.deliveryLoop(
187+
func() *[]taggedPacket { return &dp.ackPending },
188+
func(pkt taggedPacket) {
189+
if dp.ackTarget != nil {
190+
// Track out-of-order acks
191+
if dp.metrics != nil && dp.lastAckSeqNum != -1 {
192+
if pkt.ackPk.Seq < dp.lastAckSeqNum {
193+
dp.metrics.AddOOOPacket()
194+
}
195+
}
196+
dp.lastAckSeqNum = pkt.ackPk.Seq
197+
dp.ackTarget(pkt.ackPk)
198+
}
199+
},
200+
)
201+
}
202+
203+
func (dp *DeliveryPipe) deliveryLoop(
204+
getPending func() *[]taggedPacket,
205+
deliver func(taggedPacket),
206+
) {
207+
for {
208+
dp.lock.Lock()
209+
if dp.closed {
210+
dp.lock.Unlock()
211+
return
212+
}
213+
214+
pending := getPending()
215+
now := time.Now()
216+
217+
// Find all packets ready for delivery (deliveryTime <= now)
218+
readyCount := 0
219+
for _, pkt := range *pending {
220+
if pkt.deliveryTime.After(now) {
221+
break
222+
}
223+
readyCount++
224+
}
225+
226+
// Extract ready packets
227+
ready := make([]taggedPacket, readyCount)
228+
copy(ready, (*pending)[:readyCount])
229+
*pending = (*pending)[readyCount:]
230+
231+
dp.lock.Unlock()
232+
233+
// Deliver all ready packets (outside lock)
234+
for _, pkt := range ready {
235+
deliver(pkt)
236+
}
237+
238+
// Always sleep 1ms - simple busy loop
239+
time.Sleep(1 * time.Millisecond)
240+
}
241+
}
242+
243+
func (dp *DeliveryPipe) Close() {
244+
dp.lock.Lock()
245+
dp.closed = true
246+
dp.lock.Unlock()
247+
248+
dp.wg.Wait()
249+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2026, Command Line Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package main
5+
6+
import (
7+
"io"
8+
)
9+
10+
// Base64 charset: all printable, easy to inspect manually
11+
const Base64Chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
12+
13+
type TestDataGenerator struct {
14+
totalBytes int64
15+
generated int64
16+
}
17+
18+
func NewTestDataGenerator(totalBytes int64) *TestDataGenerator {
19+
return &TestDataGenerator{totalBytes: totalBytes}
20+
}
21+
22+
func (g *TestDataGenerator) Read(p []byte) (n int, err error) {
23+
if g.generated >= g.totalBytes {
24+
return 0, io.EOF
25+
}
26+
27+
remaining := g.totalBytes - g.generated
28+
toRead := int64(len(p))
29+
if toRead > remaining {
30+
toRead = remaining
31+
}
32+
33+
// Sequential pattern using base64 chars (0-63 cycling)
34+
for i := int64(0); i < toRead; i++ {
35+
p[i] = Base64Chars[(g.generated+i)%64]
36+
}
37+
38+
g.generated += toRead
39+
return int(toRead), nil
40+
}

0 commit comments

Comments
 (0)