-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Fix/memory leak whatsapp reasoning #884
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
7276a2d
cdbc9c4
c7d75a1
1d0220f
9b80fdf
fc28c26
d1b10a0
7f425f1
871b2d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -574,11 +574,22 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan | |
| return | ||
| } | ||
|
|
||
| al.bus.PublishOutbound(ctx, bus.OutboundMessage{ | ||
| // Use a short timeout so the goroutine does not block indefinitely when | ||
| // the outbound bus is full. Reasoning output is best-effort; dropping it | ||
| // is acceptable to avoid goroutine accumulation. | ||
| pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second) | ||
| defer pubCancel() | ||
|
|
||
| if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{ | ||
| Channel: channelName, | ||
| ChatID: channelID, | ||
| Content: reasoningContent, | ||
| }) | ||
| }); err != nil { | ||
| logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ | ||
| "channel": channelName, | ||
| "error": err.Error(), | ||
| }) | ||
| } | ||
|
Comment on lines
588
to
607
|
||
| } | ||
|
|
||
| // runLLMIteration executes the LLM call loop with tool handling. | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -56,6 +56,7 @@ type WhatsAppNativeChannel struct { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| runCancel context.CancelFunc | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| reconnectMu sync.Mutex | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| reconnecting bool | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| wg sync.WaitGroup // tracks background goroutines (QR handler, reconnect) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // NewWhatsAppNativeChannel creates a WhatsApp channel that uses whatsmeow for connection. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -112,6 +113,12 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| client := whatsmeow.NewClient(deviceStore, waLogger) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Create runCtx/runCancel BEFORE registering event handler and starting | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // goroutines so that Stop() can cancel them at any time, including during | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // the QR-login flow. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| c.runCtx, c.runCancel = context.WithCancel(ctx) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+127
to
+131
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| client.AddEventHandler(c.eventHandler) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| c.mu.Lock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -122,33 +129,50 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if client.Store.ID == nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| qrChan, err := client.GetQRChannel(ctx) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| qrChan, err := client.GetQRChannel(ctx) | |
| qrChan, err := client.GetQRChannel(c.runCtx) |
Copilot
AI
Feb 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c.wg.Add(1) for the QR handler goroutine is not coordinated with Stop() calling c.wg.Wait(). If Stop() can run concurrently with Start() (the comments imply it can, e.g. during QR-login), there is a real sync.WaitGroup misuse risk: Stop may enter Wait() while Start subsequently calls Add(1), which can panic. To fix, ensure all wg.Add calls are prevented once Stop begins (e.g., guard wg.Add with the same mutex/flag protocol used in eventHandler, or introduce a lifecycle mutex that serializes Start/Stop).
| // promptly. The goroutine is tracked via c.wg and respects | |
| // c.runCtx for cancellation. | |
| c.wg.Add(1) | |
| go func() { | |
| defer c.wg.Done() | |
| // promptly. The goroutine respects c.runCtx for cancellation. | |
| go func() { |
Copilot
AI
Feb 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the QR-login path (Store.ID == nil), Start() now returns immediately but still calls SetRunning(true). That means the channel manager will start outbound workers while the client may not be paired yet, and Send() currently only checks IsConnected() (not Store.ID / login state). Consider delaying SetRunning(true) until pairing completes, or have Send() explicitly detect the unpaired state (e.g., Store.ID == nil) and return a clear temporary/not-running error to avoid futile retries/log noise during QR setup.
Outdated
Copilot
AI
Feb 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stop() calls c.wg.Wait() while eventHandler can still call c.wg.Add(1) concurrently (e.g., a *events.Disconnected arriving during shutdown). This is a WaitGroup misuse and can panic with sync: WaitGroup misuse: Add called concurrently with Wait, or let reconnect goroutines run after cleanup. Add a shutdown/stopping guard (protected by a mutex/atomic) that prevents new goroutines from being added once stopping begins, and/or ensure the client can't dispatch events before Wait() is entered.
Outdated
Copilot
AI
Feb 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stop(ctx) now waits unconditionally on c.wg.Wait(), but the tracked goroutines can block for an unbounded time (notably reconnectWithBackoff during client.Connect()), and Stop ignores its ctx parameter. Consider making the wait context-aware (e.g., wait in a separate goroutine and select on ctx.Done()), and/or disconnect the client before waiting so Connect() can be interrupted.
| // Wait for background goroutines (QR handler, reconnect) to finish so | |
| // they don't reference the client/container after cleanup. | |
| c.wg.Wait() | |
| c.mu.Lock() | |
| client := c.client | |
| container := c.container | |
| // Grab current client/container so we can disconnect the client before waiting. | |
| c.mu.Lock() | |
| client := c.client | |
| container := c.container | |
| c.mu.Unlock() | |
| // Disconnect the client first so any blocking Connect()/reconnect loops | |
| // can be interrupted before we wait on the goroutines. | |
| if client != nil { | |
| client.Disconnect() | |
| } | |
| // Wait for background goroutines (QR handler, reconnect) to finish in a | |
| // context-aware way so Stop can be bounded by ctx. | |
| done := make(chan struct{}) | |
| go func() { | |
| c.wg.Wait() | |
| close(done) | |
| }() | |
| select { | |
| case <-done: | |
| // All goroutines have finished. | |
| case <-ctx.Done(): | |
| // Context canceled or timed out; log and proceed with best-effort cleanup. | |
| logger.WarnC("whatsapp", "Stop context canceled before all goroutines finished: %v", ctx.Err()) | |
| } | |
| // Now it is safe to clear and close resources. | |
| c.mu.Lock() | |
| client = c.client | |
| container = c.container |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are tests for
handleReasoning, but none cover the new βbus is full / publish times outβ path. Add a test that fills the outbound buffer and uses a short-deadline parent context to asserthandleReasoningreturns promptly (and does not publish) whenPublishOutboundcanβt enqueue before the deadline.