Conversation
…ifecycle

- Move runCtx/runCancel creation before event handler registration and the QR loop so Stop() can cancel at any point during startup
- Replace the blocking QR event loop in Start() with a background goroutine that selects on runCtx.Done(), preventing Start() from hanging indefinitely while waiting for a QR scan
- Track all background goroutines (QR handler, reconnect) with a sync.WaitGroup; Stop() waits for them to finish before releasing client/container resources
- Cancel runCtx on error paths in Start() to avoid leaked contexts

Fixes resource leak introduced in sipeed#655.
Add a 5-second timeout to handleReasoning's PublishOutbound call so fire-and-forget goroutines do not block indefinitely when the outbound bus channel is full. Reasoning output is best-effort; on timeout the publish is abandoned with a warning log instead of holding the goroutine alive. Fixes goroutine leak introduced in sipeed#802.
Pull request overview
Fixes potential goroutine/memory leaks in channel integrations by ensuring background work can be canceled and does not block indefinitely on bounded queues.
Changes:
- WhatsApp native channel: move QR handling off the `Start()` call path; track QR/reconnect goroutines with a `WaitGroup` and cancel via `runCtx`.
- WhatsApp native channel: track the reconnect goroutine instead of fire-and-forget.
- Agent reasoning publishing: add a bounded publish timeout and log failures as best-effort.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `pkg/channels/whatsapp_native/whatsapp_native.go` | Introduces `runCtx` earlier, moves the QR loop to a tracked goroutine, and tracks reconnect goroutines via `WaitGroup`. |
| `pkg/agent/loop.go` | Adds a timeout and error handling for reasoning publishes to avoid indefinite blocking when the outbound bus is full. |
```diff
@@ -122,33 +129,50 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
 	if client.Store.ID == nil {
 		qrChan, err := client.GetQRChannel(ctx)
```
client.GetQRChannel is still called with the parent ctx, but the QR reader goroutine exits on c.runCtx.Done() during Stop(). If GetQRChannel uses its context to stop its internal producer, canceling only runCtx can leave the producer running and blocked trying to send into qrChan after the reader exits. Use c.runCtx (or another context canceled by Stop()) when calling GetQRChannel so the producer is also canceled/closed when stopping.
```diff
-	qrChan, err := client.GetQRChannel(ctx)
+	qrChan, err := client.GetQRChannel(c.runCtx)
```
```go
	if c.runCancel != nil {
		c.runCancel()
	}

	// Wait for background goroutines (QR handler, reconnect) to finish so
	// they don't reference the client/container after cleanup.
	c.wg.Wait()
```
Stop() calls c.wg.Wait() while eventHandler can still call c.wg.Add(1) concurrently (e.g., a *events.Disconnected arriving during shutdown). This is a WaitGroup misuse and can panic with sync: WaitGroup misuse: Add called concurrently with Wait, or let reconnect goroutines run after cleanup. Add a shutdown/stopping guard (protected by a mutex/atomic) that prevents new goroutines from being added once stopping begins, and/or ensure the client can't dispatch events before Wait() is entered.
```go
	// Wait for background goroutines (QR handler, reconnect) to finish so
	// they don't reference the client/container after cleanup.
	c.wg.Wait()

	c.mu.Lock()
	client := c.client
	container := c.container
```
Stop(ctx) now waits unconditionally on c.wg.Wait(), but the tracked goroutines can block for an unbounded time (notably reconnectWithBackoff during client.Connect()), and Stop ignores its ctx parameter. Consider making the wait context-aware (e.g., wait in a separate goroutine and select on ctx.Done()), and/or disconnect the client before waiting so Connect() can be interrupted.
```diff
-	// Wait for background goroutines (QR handler, reconnect) to finish so
-	// they don't reference the client/container after cleanup.
-	c.wg.Wait()
-	c.mu.Lock()
-	client := c.client
-	container := c.container
+	// Grab the current client/container so we can disconnect the client before waiting.
+	c.mu.Lock()
+	client := c.client
+	container := c.container
+	c.mu.Unlock()
+	// Disconnect the client first so any blocking Connect()/reconnect loops
+	// can be interrupted before we wait on the goroutines.
+	if client != nil {
+		client.Disconnect()
+	}
+	// Wait for background goroutines (QR handler, reconnect) to finish in a
+	// context-aware way so Stop can be bounded by ctx.
+	done := make(chan struct{})
+	go func() {
+		c.wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-done:
+		// All goroutines have finished.
+	case <-ctx.Done():
+		// Context canceled or timed out; log and proceed with best-effort cleanup.
+		logger.WarnC("whatsapp", "Stop context canceled before all goroutines finished: %v", ctx.Err())
+	}
+	// Now it is safe to clear and close resources.
+	c.mu.Lock()
+	client = c.client
+	container = c.container
```
```go
	// Use a short timeout so the goroutine does not block indefinitely when
	// the outbound bus is full. Reasoning output is best-effort; dropping it
	// is acceptable to avoid goroutine accumulation.
	pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second)
	defer pubCancel()

	if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{
		Channel: channelName,
		ChatID:  channelID,
		Content: reasoningContent,
	}); err != nil {
```
There are tests for handleReasoning, but none cover the new “bus is full / publish times out” path. Add a test that fills the outbound buffer and uses a short-deadline parent context to assert handleReasoning returns promptly (and does not publish) when PublishOutbound can’t enqueue before the deadline.
```go
	}); err != nil {
		logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
			"channel": channelName,
			"error":   err.Error(),
		})
	}
```
handleReasoning treats reasoning as best-effort, but logs a warning on every publish failure/timeout. Under load (when the outbound bus is full), this can become very noisy and add its own performance/ops overhead. Consider lowering the log level, sampling/rate-limiting, or only warning on unexpected errors (e.g., excluding context deadline/cancel).
Huaaudio
left a comment
Nice fix that solves a real memory leak problem. LGTM
- Use c.runCtx for GetQRChannel so the QR producer is canceled on Stop()
- Add an atomic stopping guard to prevent the wg.Add/wg.Wait race in eventHandler
- Make Stop() context-aware: disconnect the client before waiting, respect the ctx deadline
- Reduce reasoning publish log noise: use debug level for expected ctx errors
- Add a test for handleReasoning when the outbound bus is full (timeout path)
Review: TOCTOU Race Condition in eventHandler

Thanks for addressing these memory leak issues. The overall approach is sound, but there's a critical race condition that still needs to be fixed before merging.

The Problem

In the `*events.Disconnected` handler:

```go
case *events.Disconnected:
	if c.stopping.Load() { // Check
		return
	}
	c.reconnectMu.Lock()
	// ...
	c.reconnectMu.Unlock()
	c.wg.Add(1) // Use - NOT atomic with the check above
	go func() { ... }()
```

Race scenario:

1. eventHandler checks stopping (false)
2. Stop() sets stopping=true and enters wg.Wait()
3. eventHandler calls wg.Add(1) → panic or goroutine leak

As Copilot noted, this can cause a panic: `sync: WaitGroup misuse: Add called concurrently with Wait`.

Suggested Fix

Move the stopping check and wg.Add(1) inside reconnectMu:

```go
case *events.Disconnected:
	c.reconnectMu.Lock()
	if c.reconnecting {
		c.reconnectMu.Unlock()
		return
	}
	if c.stopping.Load() { // Check stopping while holding the lock
		c.reconnectMu.Unlock()
		return
	}
	c.reconnecting = true
	c.wg.Add(1) // Add to WaitGroup while holding the lock
	c.reconnectMu.Unlock()
	go func() {
		defer c.wg.Done()
		c.reconnectWithBackoff()
	}()
```

This ensures the stopping check and `wg.Add(1)` are atomic with respect to `Stop()`.

Minor Observations
nikolasdehor
left a comment
Solid fix addressing three distinct memory leak vectors in the WhatsApp native channel. Let me break down each:
1. QR loop blocking in Start() -- Previously the for evt := range qrChan loop ran synchronously in Start(), which meant Start() would not return until QR auth completed (or the channel closed). Now it runs in a goroutine tracked by c.wg. Good.
2. Fire-and-forget reconnect goroutine -- go c.reconnectWithBackoff() was not tracked, so Stop() could return while reconnection was still in progress. Now tracked via c.wg.Add(1) / defer c.wg.Done().
3. Reasoning goroutine accumulation -- handleReasoning used to call PublishOutbound without timeout, so if the outbound bus was full, goroutines would pile up indefinitely. The 5-second timeout with best-effort semantics is the right tradeoff -- reasoning output is supplementary and dropping it is acceptable.
Architecture observations:
- Moving
runCtx/runCancelcreation beforeclient.Connect()is correct -- it ensures Stop() can cancel the QR flow at any time. - The
stoppingatomic bool prevents new goroutines from being spawned after Stop() begins. This closes the race where a disconnect event fires betweenStop()start andrunCancel(). - The context-aware
wg.Wait()in Stop() with fallback to best-effort cleanup is well designed -- prevents Stop() from hanging indefinitely if a goroutine is stuck. - The test for the bus-full scenario verifies the timeout behavior correctly.
One minor concern (non-blocking): wg.Add(1) in the event handler runs on the whatsmeow event dispatch goroutine. If stopping.Store(true) and wg.Add(1) race, there is a tiny window where wg.Add(1) is called after Stop()'s wg.Wait() has already started, which would panic. The stopping.Load() check before wg.Add(1) mitigates this, but the ordering is not guaranteed by the memory model since stopping.Store and runCancel are not synchronized. In practice the client.Disconnect() call before wg.Wait() should prevent whatsmeow from dispatching more events, so this should be safe. Worth a comment noting this assumption.
LGTM.
Move the stopping check and wg.Add(1) inside reconnectMu in eventHandler, and set the stopping flag under the same lock in Stop(). This makes the two operations atomic with respect to each other, preventing the race where:

1. eventHandler checks stopping (false)
2. Stop() sets stopping=true and enters wg.Wait() (wg is 0)
3. eventHandler calls wg.Add(1) → panic or goroutine leak
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
```go
		if err := client.Connect(); err != nil {
			c.runCancel()
			_ = container.Close()
			return fmt.Errorf("connect: %w", err)
		}
	}

	c.runCtx, c.runCancel = context.WithCancel(ctx)
	c.SetRunning(true)
	logger.InfoC("whatsapp", "WhatsApp native channel connected")
```
In the QR-login path (Store.ID == nil), Start() now returns immediately but still calls SetRunning(true). That means the channel manager will start outbound workers while the client may not be paired yet, and Send() currently only checks IsConnected() (not Store.ID / login state). Consider delaying SetRunning(true) until pairing completes, or have Send() explicitly detect the unpaired state (e.g., Store.ID == nil) and return a clear temporary/not-running error to avoid futile retries/log noise during QR setup.
```go
	}); err != nil {
		// Only log unexpected errors; context deadline/cancel are expected when
		// the bus is full under load and would be too noisy to warn about.
		if ctx.Err() == nil {
			logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
				"channel": channelName,
				"error":   err.Error(),
			})
		} else {
			logger.DebugCF("agent", "Reasoning publish skipped (context done)", map[string]any{
				"channel": channelName,
				"error":   err.Error(),
			})
		}
```
The error logging path checks ctx.Err() to decide whether the PublishOutbound error is expected, but the failure may come from pubCtx timing out (5s) while the parent ctx is still active. In that case ctx.Err() is nil and this will incorrectly warn for the expected "bus full" timeout. Consider basing the decision on err / pubCtx.Err() (e.g., treat context.Canceled/DeadlineExceeded as expected) instead of ctx.Err().
`pkg/agent/loop_test.go` (Outdated)
```go
	// Fill the outbound bus buffer (default size is 64) so that the
	// next PublishOutbound will block until the context deadline.
	for i := 0; i < 64; i++ {
		err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{
			Channel: "filler",
			ChatID:  "filler",
			Content: fmt.Sprintf("filler-%d", i),
		})
		if err != nil {
			t.Fatalf("failed to fill bus: %v", err)
		}
	}
```
This test hard-codes the outbound bus buffer size (64) and uses context.Background() when filling it. If the bus buffer size changes in the future, the fill loop could block indefinitely and hang the test suite. Consider filling until PublishOutbound fails using a short timeout context (or introducing a helper that determines/fills capacity) so the test remains robust to buffer-size changes.
```diff
-	// Fill the outbound bus buffer (default size is 64) so that the
-	// next PublishOutbound will block until the context deadline.
-	for i := 0; i < 64; i++ {
-		err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{
-			Channel: "filler",
-			ChatID:  "filler",
-			Content: fmt.Sprintf("filler-%d", i),
-		})
-		if err != nil {
-			t.Fatalf("failed to fill bus: %v", err)
-		}
-	}
+	// Fill the outbound bus buffer so that the next PublishOutbound will
+	// block until the context deadline. Do not assume a specific buffer
+	// size; instead, publish until PublishOutbound fails with a short
+	// timeout context.
+	const maxFillAttempts = 10000
+	filledCount := 0
+	for i := 0; i < maxFillAttempts; i++ {
+		fillCtx, fillCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+		err := msgBus.PublishOutbound(fillCtx, bus.OutboundMessage{
+			Channel: "filler",
+			ChatID:  "filler",
+			Content: fmt.Sprintf("filler-%d", i),
+		})
+		fillCancel()
+		if err != nil {
+			// We assume the bus is full or otherwise unable to accept more
+			// messages once PublishOutbound returns an error.
+			if filledCount == 0 {
+				t.Fatalf("failed to publish to outbound bus even once: %v", err)
+			}
+			break
+		}
+		filledCount++
+	}
+	if filledCount == maxFillAttempts {
+		t.Fatalf("failed to reach outbound bus capacity after %d attempts; test may not reflect fullness semantics", maxFillAttempts)
+	}
```
- WhatsApp Send(): detect the unpaired state (Store.ID == nil) and return ErrTemporary instead of attempting to send while QR login is pending
- handleReasoning: check the returned error type (DeadlineExceeded / Canceled) instead of ctx.Err() to decide the log level, so pubCtx timeouts on a full bus are correctly classified as expected
- Test: fill the bus with a short-timeout loop instead of hardcoding the buffer size (64), making the test resilient to buffer size changes
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
```go
	// Create runCtx/runCancel BEFORE registering event handler and starting
	// goroutines so that Stop() can cancel them at any time, including during
	// the QR-login flow.
	c.runCtx, c.runCancel = context.WithCancel(ctx)
```
c.stopping is set to true in Stop() but never reset in Start(). If this channel instance is ever restarted after a Stop(), the disconnect handler will permanently skip reconnection attempts because eventHandler returns early when stopping is true. Consider resetting stopping (and any related reconnect state) at the beginning of Start(), under the same lock used in Stop()/eventHandler, so lifecycle restarts behave correctly.
```go
	// promptly. The goroutine is tracked via c.wg and respects
	// c.runCtx for cancellation.
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
```
c.wg.Add(1) for the QR handler goroutine is not coordinated with Stop() calling c.wg.Wait(). If Stop() can run concurrently with Start() (the comments imply it can, e.g. during QR-login), there is a real sync.WaitGroup misuse risk: Stop may enter Wait() while Start subsequently calls Add(1), which can panic. To fix, ensure all wg.Add calls are prevented once Stop begins (e.g., guard wg.Add with the same mutex/flag protocol used in eventHandler, or introduce a lifecycle mutex that serializes Start/Stop).
```diff
-	// promptly. The goroutine is tracked via c.wg and respects
-	// c.runCtx for cancellation.
-	c.wg.Add(1)
-	go func() {
-		defer c.wg.Done()
+	// promptly. The goroutine respects c.runCtx for cancellation.
+	go func() {
```
- WhatsApp Start(): use deferred cleanup to nil out c.client/c.container and disconnect/close resources on any error after struct fields are assigned, preventing stale references and a double close in Stop()
- handleReasoning: treat bus.ErrBusClosed as an expected condition (DEBUG level) alongside context timeout/cancel, avoiding WARN noise during normal shutdown
yinwm
left a comment
Review Summary
After multiple iterations, this PR has successfully fixed the memory leak issues:
✅ Issues Fixed
- QR loop blocking Start() - QR event handling moved to background goroutine
- Fire-and-forget reconnect goroutine - Now tracked with WaitGroup
- Reasoning goroutine accumulation - Added 5-second timeout
- TOCTOU race condition - Properly fixed in commit fc28c26
Code Quality
- Start()'s defer cleanup ensures resources are properly released
- Stop() is context-aware, supporting graceful shutdown
- reconnectWithBackoff() correctly responds to runCtx cancellation
- Tests cover timeout behavior when bus is full
Minor Suggestions (Non-blocking)
- `reconnectMu` could be renamed to `lifecycleMu` for clarity
- The 2s test threshold could be reduced to 1s for tighter bounds
- Stop() timeout behavior could be documented in comments
LGTM 👍
📝 Description
Fix memory leak issues in the WhatsApp Native Channel:
Issue 1: QR loop blocking in Start()
Issue 2: fire-and-forget reconnection goroutine
Fix issues in the Reasoning Channel that could cause memory leaks:
Each LLM response initiates an untracked goroutine
`al.handleReasoning(ctx, response.Reasoning, opts.Channel, ...)`

🗣️ Type of Change
🤖 AI Code Generation
☑️ Checklist