Fix/memory leak whatsapp reasoning #884

Merged
yinwm merged 9 commits into sipeed:main from alexhoshina:fix/memory-leak-whatsapp-reasoning
Feb 28, 2026

Conversation


alexhoshina (Collaborator) commented Feb 27, 2026

📝 Description

Fix memory leak issues in the WhatsApp Native Channel:

Issue 1: QR loop blocking in Start()
Issue 2: fire-and-forget reconnection goroutine

Fix issues in the Reasoning Channel that could cause memory leaks:

Each LLM response initiates an untracked goroutine
al.handleReasoning(ctx, response.Reasoning, opts.Channel, ...)

  • Each LLM response fire-and-forgets a goroutine to publish reasoning content
  • If the message bus's outbound channel is full, the goroutine will block on PublishOutbound
  • High concurrency scenarios may lead to goroutine accumulation, consuming significant memory and stack space

🗣️ Type of Change

  • 🐞 Bug fix (non-breaking change which fixes an issue)

🤖 AI Code Generation

  • 🛠️ Mostly AI-generated (AI draft, Human verified/modified)
  • 👨‍💻 Mostly Human-written (Human lead, AI assisted or none)

☑️ Checklist

  • My code/docs follow the style of this project.
  • I have performed a self-review of my own changes.
  • I have updated the documentation accordingly.

alexhoshina and others added 4 commits February 27, 2026 20:15
…ifecycle

- Move runCtx/runCancel creation before event handler registration and
  QR loop so Stop() can cancel at any point during startup
- Replace blocking QR event loop in Start() with a background goroutine
  that selects on runCtx.Done(), preventing Start() from hanging
  indefinitely when waiting for QR scan
- Track all background goroutines (QR handler, reconnect) with
  sync.WaitGroup; Stop() waits for them to finish before releasing
  client/container resources
- Cancel runCtx on error paths in Start() to avoid leaked contexts

Fixes resource leak introduced in sipeed#655.
Add a 5-second timeout to handleReasoning's PublishOutbound call so
fire-and-forget goroutines do not block indefinitely when the outbound
bus channel is full. Reasoning output is best-effort; on timeout the
publish is abandoned with a warning log instead of holding the
goroutine alive.

Fixes goroutine leak introduced in sipeed#802.
Copilot AI review requested due to automatic review settings February 27, 2026 18:49

Copilot AI left a comment


Pull request overview

Fixes potential goroutine/memory leaks in channel integrations by ensuring background work can be canceled and does not block indefinitely on bounded queues.

Changes:

  • WhatsApp native channel: move QR handling off the Start() call path; track QR/reconnect goroutines with a WaitGroup and cancel via runCtx.
  • WhatsApp native channel: track reconnect goroutine instead of fire-and-forget.
  • Agent reasoning publishing: add a bounded publish timeout and log failures as best-effort.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.

Files changed:

  • pkg/channels/whatsapp_native/whatsapp_native.go: introduces runCtx earlier, moves the QR loop to a tracked goroutine, and tracks reconnect goroutines via WaitGroup.
  • pkg/agent/loop.go: adds a timeout and error handling for reasoning publishes to avoid indefinite blocking when the outbound bus is full.


@@ -122,33 +129,50 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
if client.Store.ID == nil {
qrChan, err := client.GetQRChannel(ctx)

Copilot AI Feb 27, 2026


client.GetQRChannel is still called with the parent ctx, but the QR reader goroutine exits on c.runCtx.Done() during Stop(). If GetQRChannel uses its context to stop its internal producer, canceling only runCtx can leave the producer running and blocked trying to send into qrChan after the reader exits. Use c.runCtx (or another context canceled by Stop()) when calling GetQRChannel so the producer is also canceled/closed when stopping.

Suggested change

Replace:

    qrChan, err := client.GetQRChannel(ctx)

With:

    qrChan, err := client.GetQRChannel(c.runCtx)

Comment on lines 183 to 190
if c.runCancel != nil {
    c.runCancel()
}

// Wait for background goroutines (QR handler, reconnect) to finish so
// they don't reference the client/container after cleanup.
c.wg.Wait()


Copilot AI Feb 27, 2026


Stop() calls c.wg.Wait() while eventHandler can still call c.wg.Add(1) concurrently (e.g., a *events.Disconnected arriving during shutdown). This is a WaitGroup misuse and can panic with sync: WaitGroup misuse: Add called concurrently with Wait, or let reconnect goroutines run after cleanup. Add a shutdown/stopping guard (protected by a mutex/atomic) that prevents new goroutines from being added once stopping begins, and/or ensure the client can't dispatch events before Wait() is entered.

Comment on lines 187 to 193
// Wait for background goroutines (QR handler, reconnect) to finish so
// they don't reference the client/container after cleanup.
c.wg.Wait()

c.mu.Lock()
client := c.client
container := c.container

Copilot AI Feb 27, 2026


Stop(ctx) now waits unconditionally on c.wg.Wait(), but the tracked goroutines can block for an unbounded time (notably reconnectWithBackoff during client.Connect()), and Stop ignores its ctx parameter. Consider making the wait context-aware (e.g., wait in a separate goroutine and select on ctx.Done()), and/or disconnect the client before waiting so Connect() can be interrupted.

Suggested change

Replace:

    // Wait for background goroutines (QR handler, reconnect) to finish so
    // they don't reference the client/container after cleanup.
    c.wg.Wait()

    c.mu.Lock()
    client := c.client
    container := c.container

With:

    // Grab current client/container so we can disconnect the client before waiting.
    c.mu.Lock()
    client := c.client
    container := c.container
    c.mu.Unlock()

    // Disconnect the client first so any blocking Connect()/reconnect loops
    // can be interrupted before we wait on the goroutines.
    if client != nil {
        client.Disconnect()
    }

    // Wait for background goroutines (QR handler, reconnect) to finish in a
    // context-aware way so Stop can be bounded by ctx.
    done := make(chan struct{})
    go func() {
        c.wg.Wait()
        close(done)
    }()
    select {
    case <-done:
        // All goroutines have finished.
    case <-ctx.Done():
        // Context canceled or timed out; log and proceed with best-effort cleanup.
        logger.WarnC("whatsapp", "Stop context canceled before all goroutines finished: %v", ctx.Err())
    }

    // Now it is safe to clear and close resources.
    c.mu.Lock()
    client = c.client
    container = c.container

Comment on lines +577 to +587
// Use a short timeout so the goroutine does not block indefinitely when
// the outbound bus is full. Reasoning output is best-effort; dropping it
// is acceptable to avoid goroutine accumulation.
pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second)
defer pubCancel()

if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{
    Channel: channelName,
    ChatID:  channelID,
    Content: reasoningContent,
}); err != nil {

Copilot AI Feb 27, 2026


There are tests for handleReasoning, but none cover the new “bus is full / publish times out” path. Add a test that fills the outbound buffer and uses a short-deadline parent context to assert handleReasoning returns promptly (and does not publish) when PublishOutbound can’t enqueue before the deadline.

Comment on lines 587 to 592
}); err != nil {
    logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
        "channel": channelName,
        "error":   err.Error(),
    })
}

Copilot AI Feb 27, 2026


handleReasoning treats reasoning as best-effort, but logs a warning on every publish failure/timeout. Under load (when the outbound bus is full), this can become very noisy and add its own performance/ops overhead. Consider lowering the log level, sampling/rate-limiting, or only warning on unexpected errors (e.g., excluding context deadline/cancel).


Huaaudio (Collaborator) left a comment


Nice fix that resolves a real memory leak problem. LGTM

- Use c.runCtx for GetQRChannel so the QR producer is canceled on Stop()
- Add atomic stopping guard to prevent wg.Add/wg.Wait race in eventHandler
- Make Stop() context-aware: disconnect client before waiting, respect ctx deadline
- Reduce reasoning publish log noise: use debug level for expected ctx errors
- Add test for handleReasoning when outbound bus is full (timeout path)
alexhoshina requested a review from yinwm February 27, 2026 19:09

yinwm commented Feb 28, 2026

Review: TOCTOU Race Condition in eventHandler

Thanks for addressing these memory leak issues. The overall approach is sound, but there's a critical race condition that still needs to be fixed before merging.

The Problem

In eventHandler(), there's a TOCTOU (Time-Of-Check-Time-Of-Use) race between the stopping check and wg.Add(1):

case *events.Disconnected:
    if c.stopping.Load() {  // Check
        return
    }
    c.reconnectMu.Lock()
    // ...
    c.reconnectMu.Unlock()
    c.wg.Add(1)              // Use - NOT atomic with the check above
    go func() { ... }()

Race scenario:

  1. eventHandler checks stopping.Load() → returns false
  2. Stop() is called, sets stopping.Store(true)
  3. Stop() calls c.wg.Wait() → returns immediately (wg is still 0)
  4. Stop() clears c.client = nil and returns
  5. eventHandler continues, calls c.wg.Add(1), starts reconnect goroutine
  6. Result: Goroutine leak (or worse, sync: WaitGroup misuse panic)

As Copilot noted, this can cause a panic: sync: WaitGroup misuse: Add called concurrently with Wait.

Suggested Fix

Move wg.Add(1) inside the lock, and check stopping while holding the lock:

case *events.Disconnected:
    c.reconnectMu.Lock()
    if c.reconnecting {
        c.reconnectMu.Unlock()
        return
    }
    if c.stopping.Load() {  // Check stopping while holding the lock
        c.reconnectMu.Unlock()
        return
    }
    c.reconnecting = true
    c.wg.Add(1)  // Add to WaitGroup while holding the lock
    c.reconnectMu.Unlock()
    go func() {
        defer c.wg.Done()
        c.reconnectWithBackoff()
    }()

This ensures the stopping check and wg.Add(1) are atomic with respect to Stop().

Minor Observations

  • The test coverage and context-aware Stop() improvements look good
  • The log level differentiation (Warn for unexpected, Debug for context cancellation) is a nice touch


nikolasdehor left a comment


Solid fix addressing three distinct memory leak vectors in the WhatsApp native channel. Let me break down each:

1. QR loop blocking in Start() -- Previously the for evt := range qrChan loop ran synchronously in Start(), which meant Start() would not return until QR auth completed (or the channel closed). Now it runs in a goroutine tracked by c.wg. Good.

2. Fire-and-forget reconnect goroutine -- go c.reconnectWithBackoff() was not tracked, so Stop() could return while reconnection was still in progress. Now tracked via c.wg.Add(1) / defer c.wg.Done().

3. Reasoning goroutine accumulation -- handleReasoning used to call PublishOutbound without timeout, so if the outbound bus was full, goroutines would pile up indefinitely. The 5-second timeout with best-effort semantics is the right tradeoff -- reasoning output is supplementary and dropping it is acceptable.

Architecture observations:

  • Moving runCtx/runCancel creation before client.Connect() is correct -- it ensures Stop() can cancel the QR flow at any time.
  • The stopping atomic bool prevents new goroutines from being spawned after Stop() begins. This closes the race where a disconnect event fires between Stop() start and runCancel().
  • The context-aware wg.Wait() in Stop() with fallback to best-effort cleanup is well designed -- prevents Stop() from hanging indefinitely if a goroutine is stuck.
  • The test for the bus-full scenario verifies the timeout behavior correctly.

One minor concern (non-blocking): wg.Add(1) in the event handler runs on the whatsmeow event dispatch goroutine. If stopping.Store(true) and wg.Add(1) race, there is a tiny window where wg.Add(1) is called after Stop()'s wg.Wait() has already started, which would panic. The stopping.Load() check before wg.Add(1) mitigates this, but the ordering is not guaranteed by the memory model since stopping.Store and runCancel are not synchronized. In practice the client.Disconnect() call before wg.Wait() should prevent whatsmeow from dispatching more events, so this should be safe. Worth a comment noting this assumption.

LGTM.

Move the stopping check and wg.Add(1) inside reconnectMu in
eventHandler, and set the stopping flag under the same lock in Stop().
This makes the two operations atomic with respect to each other,
preventing the race where:
1. eventHandler checks stopping (false)
2. Stop() sets stopping=true and enters wg.Wait() (wg is 0)
3. eventHandler calls wg.Add(1) → panic or goroutine leak
Copilot AI review requested due to automatic review settings February 28, 2026 04:38

Copilot AI left a comment


Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.



Comment on lines 171 to 179
    if err := client.Connect(); err != nil {
        c.runCancel()
        _ = container.Close()
        return fmt.Errorf("connect: %w", err)
    }
}

c.runCtx, c.runCancel = context.WithCancel(ctx)
c.SetRunning(true)
logger.InfoC("whatsapp", "WhatsApp native channel connected")

Copilot AI Feb 28, 2026


In the QR-login path (Store.ID == nil), Start() now returns immediately but still calls SetRunning(true). That means the channel manager will start outbound workers while the client may not be paired yet, and Send() currently only checks IsConnected() (not Store.ID / login state). Consider delaying SetRunning(true) until pairing completes, or have Send() explicitly detect the unpaired state (e.g., Store.ID == nil) and return a clear temporary/not-running error to avoid futile retries/log noise during QR setup.

Comment on lines 587 to 600
}); err != nil {
    // Only log unexpected errors; context deadline/cancel are expected when
    // the bus is full under load and would be too noisy to warn about.
    if ctx.Err() == nil {
        logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
            "channel": channelName,
            "error":   err.Error(),
        })
    } else {
        logger.DebugCF("agent", "Reasoning publish skipped (context done)", map[string]any{
            "channel": channelName,
            "error":   err.Error(),
        })
    }

Copilot AI Feb 28, 2026


The error logging path checks ctx.Err() to decide whether the PublishOutbound error is expected, but the failure may come from pubCtx timing out (5s) while the parent ctx is still active. In that case ctx.Err() is nil and this will incorrectly warn for the expected "bus full" timeout. Consider basing the decision on err / pubCtx.Err() (e.g., treat context.Canceled/DeadlineExceeded as expected) instead of ctx.Err().

Comment on lines 804 to 815
// Fill the outbound bus buffer (default size is 64) so that the
// next PublishOutbound will block until the context deadline.
for i := 0; i < 64; i++ {
    err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{
        Channel: "filler",
        ChatID:  "filler",
        Content: fmt.Sprintf("filler-%d", i),
    })
    if err != nil {
        t.Fatalf("failed to fill bus: %v", err)
    }
}

Copilot AI Feb 28, 2026


This test hard-codes the outbound bus buffer size (64) and uses context.Background() when filling it. If the bus buffer size changes in the future, the fill loop could block indefinitely and hang the test suite. Consider filling until PublishOutbound fails using a short timeout context (or introducing a helper that determines/fills capacity) so the test remains robust to buffer-size changes.

Suggested change

Replace:

    // Fill the outbound bus buffer (default size is 64) so that the
    // next PublishOutbound will block until the context deadline.
    for i := 0; i < 64; i++ {
        err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{
            Channel: "filler",
            ChatID:  "filler",
            Content: fmt.Sprintf("filler-%d", i),
        })
        if err != nil {
            t.Fatalf("failed to fill bus: %v", err)
        }
    }

With:

    // Fill the outbound bus buffer so that the next PublishOutbound will
    // block until the context deadline. Do not assume a specific buffer
    // size; instead, publish until PublishOutbound fails with a short
    // timeout context.
    const maxFillAttempts = 10000
    filledCount := 0
    for i := 0; i < maxFillAttempts; i++ {
        fillCtx, fillCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
        err := msgBus.PublishOutbound(fillCtx, bus.OutboundMessage{
            Channel: "filler",
            ChatID:  "filler",
            Content: fmt.Sprintf("filler-%d", i),
        })
        fillCancel()
        if err != nil {
            // We assume the bus is full or otherwise unable to accept more
            // messages once PublishOutbound returns an error.
            if filledCount == 0 {
                t.Fatalf("failed to publish to outbound bus even once: %v", err)
            }
            break
        }
        filledCount++
    }
    if filledCount == maxFillAttempts {
        t.Fatalf("failed to reach outbound bus capacity after %d attempts; test may not reflect fullness semantics", maxFillAttempts)
    }

- WhatsApp Send(): detect unpaired state (Store.ID == nil) and return
  ErrTemporary instead of attempting to send while QR login is pending
- handleReasoning: check the returned error type (DeadlineExceeded /
  Canceled) instead of ctx.Err() to decide log level, so pubCtx
  timeouts on a full bus are correctly classified as expected
- Test: fill bus with a short-timeout loop instead of hardcoding the
  buffer size (64), making the test resilient to buffer size changes
Copilot AI review requested due to automatic review settings February 28, 2026 05:00

Copilot AI left a comment


Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.



Comment on lines +119 to +123
// Create runCtx/runCancel BEFORE registering event handler and starting
// goroutines so that Stop() can cancel them at any time, including during
// the QR-login flow.
c.runCtx, c.runCancel = context.WithCancel(ctx)


Copilot AI Feb 28, 2026


c.stopping is set to true in Stop() but never reset in Start(). If this channel instance is ever restarted after a Stop(), the disconnect handler will permanently skip reconnection attempts because eventHandler returns early when stopping is true. Consider resetting stopping (and any related reconnect state) at the beginning of Start(), under the same lock used in Stop()/eventHandler, so lifecycle restarts behave correctly.

Comment on lines 144 to 148
// promptly. The goroutine is tracked via c.wg and respects
// c.runCtx for cancellation.
c.wg.Add(1)
go func() {
defer c.wg.Done()

Copilot AI Feb 28, 2026


c.wg.Add(1) for the QR handler goroutine is not coordinated with Stop() calling c.wg.Wait(). If Stop() can run concurrently with Start() (the comments imply it can, e.g. during QR-login), there is a real sync.WaitGroup misuse risk: Stop may enter Wait() while Start subsequently calls Add(1), which can panic. To fix, ensure all wg.Add calls are prevented once Stop begins (e.g., guard wg.Add with the same mutex/flag protocol used in eventHandler, or introduce a lifecycle mutex that serializes Start/Stop).

Suggested change

Replace:

    // promptly. The goroutine is tracked via c.wg and respects
    // c.runCtx for cancellation.
    c.wg.Add(1)
    go func() {
        defer c.wg.Done()

With:

    // promptly. The goroutine respects c.runCtx for cancellation.
    go func() {

- WhatsApp Start(): use deferred cleanup to nil out c.client/c.container
  and disconnect/close resources on any error after struct fields are
  assigned, preventing stale references and double-close in Stop()
- handleReasoning: treat bus.ErrBusClosed as an expected condition
  (DEBUG level) alongside context timeout/cancel, avoiding WARN noise
  during normal shutdown

yinwm left a comment


Review Summary

After multiple iterations, this PR has successfully fixed the memory leak issues:

✅ Issues Fixed

  1. QR loop blocking Start() - QR event handling moved to background goroutine
  2. Fire-and-forget reconnect goroutine - Now tracked with WaitGroup
  3. Reasoning goroutine accumulation - Added 5-second timeout
  4. TOCTOU race condition - Properly fixed in commit fc28c26

Code Quality

  • Start()'s defer cleanup ensures resources are properly released
  • Stop() is context-aware, supporting graceful shutdown
  • reconnectWithBackoff() correctly responds to runCtx cancellation
  • Tests cover timeout behavior when bus is full

Minor Suggestions (Non-blocking)

  1. reconnectMu could be renamed to lifecycleMu for clarity
  2. Test threshold of 2s could be reduced to 1s for tighter bounds
  3. Stop() timeout behavior could be documented in comments

LGTM 👍

yinwm merged commit feee0da into sipeed:main Feb 28, 2026
2 checks passed
alexhoshina deleted the fix/memory-leak-whatsapp-reasoning branch February 28, 2026 07:00