-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Fix/memory leak whatsapp reasoning #884
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7276a2d
cdbc9c4
c7d75a1
1d0220f
9b80fdf
fc28c26
d1b10a0
7f425f1
871b2d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ package agent | |
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "path/filepath" | ||
| "strings" | ||
|
|
@@ -574,11 +575,36 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan | |
| return | ||
| } | ||
|
|
||
| al.bus.PublishOutbound(ctx, bus.OutboundMessage{ | ||
| // Use a short timeout so the goroutine does not block indefinitely when | ||
| // the outbound bus is full. Reasoning output is best-effort; dropping it | ||
| // is acceptable to avoid goroutine accumulation. | ||
| pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second) | ||
| defer pubCancel() | ||
|
|
||
| if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{ | ||
| Channel: channelName, | ||
| ChatID: channelID, | ||
| Content: reasoningContent, | ||
| }) | ||
| }); err != nil { | ||
| // Treat context.DeadlineExceeded / context.Canceled as expected | ||
| // (bus full under load, or parent canceled). Check the error | ||
| // itself rather than ctx.Err(), because pubCtx may time out | ||
| // (5 s) while the parent ctx is still active. | ||
| // Also treat ErrBusClosed as expected β it occurs during normal | ||
| // shutdown when the bus is closed before all goroutines finish. | ||
| if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) || | ||
| errors.Is(err, bus.ErrBusClosed) { | ||
| logger.DebugCF("agent", "Reasoning publish skipped (timeout/cancel)", map[string]any{ | ||
| "channel": channelName, | ||
| "error": err.Error(), | ||
| }) | ||
| } else { | ||
| logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ | ||
| "channel": channelName, | ||
| "error": err.Error(), | ||
| }) | ||
| } | ||
|
Comment on lines
588
to
606
|
||
| } | ||
|
Comment on lines
588
to
607
|
||
| } | ||
|
|
||
| // runLLMIteration executes the LLM call loop with tool handling. | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -15,6 +15,7 @@ import ( | |||||||||||||||
| "path/filepath" | ||||||||||||||||
| "strings" | ||||||||||||||||
| "sync" | ||||||||||||||||
| "sync/atomic" | ||||||||||||||||
| "time" | ||||||||||||||||
|
|
||||||||||||||||
| "github.com/mdp/qrterminal/v3" | ||||||||||||||||
|
|
@@ -56,6 +57,8 @@ type WhatsAppNativeChannel struct { | |||||||||||||||
| runCancel context.CancelFunc | ||||||||||||||||
| reconnectMu sync.Mutex | ||||||||||||||||
| reconnecting bool | ||||||||||||||||
| stopping atomic.Bool // set once Stop begins; prevents new wg.Add calls | ||||||||||||||||
| wg sync.WaitGroup // tracks background goroutines (QR handler, reconnect) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // NewWhatsAppNativeChannel creates a WhatsApp channel that uses whatsmeow for connection. | ||||||||||||||||
|
|
@@ -80,6 +83,14 @@ func NewWhatsAppNativeChannel( | |||||||||||||||
| func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { | ||||||||||||||||
| logger.InfoCF("whatsapp", "Starting WhatsApp native channel (whatsmeow)", map[string]any{"store": c.storePath}) | ||||||||||||||||
|
|
||||||||||||||||
| // Reset lifecycle state from any previous Stop() so a restarted channel | ||||||||||||||||
| // behaves correctly. Use reconnectMu to be consistent with eventHandler | ||||||||||||||||
| // and Stop() which coordinate under the same lock. | ||||||||||||||||
| c.reconnectMu.Lock() | ||||||||||||||||
| c.stopping.Store(false) | ||||||||||||||||
| c.reconnecting = false | ||||||||||||||||
| c.reconnectMu.Unlock() | ||||||||||||||||
|
|
||||||||||||||||
| if err := os.MkdirAll(c.storePath, 0o700); err != nil { | ||||||||||||||||
| return fmt.Errorf("create session store dir: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
|
|
@@ -112,63 +123,142 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { | |||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| client := whatsmeow.NewClient(deviceStore, waLogger) | ||||||||||||||||
|
|
||||||||||||||||
| // Create runCtx/runCancel BEFORE registering event handler and starting | ||||||||||||||||
| // goroutines so that Stop() can cancel them at any time, including during | ||||||||||||||||
| // the QR-login flow. | ||||||||||||||||
| c.runCtx, c.runCancel = context.WithCancel(ctx) | ||||||||||||||||
|
|
||||||||||||||||
|
Comment on lines
+127
to
+131
|
||||||||||||||||
| client.AddEventHandler(c.eventHandler) | ||||||||||||||||
|
|
||||||||||||||||
| c.mu.Lock() | ||||||||||||||||
| c.container = container | ||||||||||||||||
| c.client = client | ||||||||||||||||
| c.mu.Unlock() | ||||||||||||||||
|
|
||||||||||||||||
| // cleanupOnError clears struct references and releases resources when | ||||||||||||||||
| // Start() fails after fields are already assigned. This prevents | ||||||||||||||||
| // Stop() from operating on stale references (double-close, disconnect | ||||||||||||||||
| // of a partially-initialized client, or stray event handler callbacks). | ||||||||||||||||
| startOK := false | ||||||||||||||||
| defer func() { | ||||||||||||||||
| if startOK { | ||||||||||||||||
| return | ||||||||||||||||
| } | ||||||||||||||||
| c.runCancel() | ||||||||||||||||
| client.Disconnect() | ||||||||||||||||
| c.mu.Lock() | ||||||||||||||||
| c.client = nil | ||||||||||||||||
| c.container = nil | ||||||||||||||||
| c.mu.Unlock() | ||||||||||||||||
| _ = container.Close() | ||||||||||||||||
| }() | ||||||||||||||||
|
|
||||||||||||||||
| if client.Store.ID == nil { | ||||||||||||||||
| qrChan, err := client.GetQRChannel(ctx) | ||||||||||||||||
| qrChan, err := client.GetQRChannel(c.runCtx) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| _ = container.Close() | ||||||||||||||||
| return fmt.Errorf("get QR channel: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| if err := client.Connect(); err != nil { | ||||||||||||||||
| _ = container.Close() | ||||||||||||||||
| return fmt.Errorf("connect: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| for evt := range qrChan { | ||||||||||||||||
| if evt.Event == "code" { | ||||||||||||||||
| logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil) | ||||||||||||||||
| qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{ | ||||||||||||||||
| Level: qrterminal.L, | ||||||||||||||||
| Writer: os.Stdout, | ||||||||||||||||
| HalfBlocks: true, | ||||||||||||||||
| }) | ||||||||||||||||
| } else { | ||||||||||||||||
| logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event}) | ||||||||||||||||
| } | ||||||||||||||||
| // Handle QR events in a background goroutine so Start() returns | ||||||||||||||||
| // promptly. The goroutine is tracked via c.wg and respects | ||||||||||||||||
| // c.runCtx for cancellation. | ||||||||||||||||
| // Guard wg.Add with reconnectMu + stopping check (same protocol | ||||||||||||||||
| // as eventHandler) so a concurrent Stop() cannot enter wg.Wait() | ||||||||||||||||
| // while we call wg.Add(1). | ||||||||||||||||
| c.reconnectMu.Lock() | ||||||||||||||||
| if c.stopping.Load() { | ||||||||||||||||
| c.reconnectMu.Unlock() | ||||||||||||||||
| return fmt.Errorf("channel stopped during QR setup") | ||||||||||||||||
| } | ||||||||||||||||
| c.wg.Add(1) | ||||||||||||||||
| c.reconnectMu.Unlock() | ||||||||||||||||
| go func() { | ||||||||||||||||
| defer c.wg.Done() | ||||||||||||||||
|
Comment on lines
166
to
179
|
||||||||||||||||
| // promptly. The goroutine is tracked via c.wg and respects | |
| // c.runCtx for cancellation. | |
| c.wg.Add(1) | |
| go func() { | |
| defer c.wg.Done() | |
| // promptly. The goroutine respects c.runCtx for cancellation. | |
| go func() { |
Copilot
AI
Feb 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the QR-login path (Store.ID == nil), Start() now returns immediately but still calls SetRunning(true). That means the channel manager will start outbound workers while the client may not be paired yet, and Send() currently only checks IsConnected() (not Store.ID / login state). Consider delaying SetRunning(true) until pairing completes, or have Send() explicitly detect the unpaired state (e.g., Store.ID == nil) and return a clear temporary/not-running error to avoid futile retries/log noise during QR setup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are tests for
handleReasoning, but none cover the new βbus is full / publish times outβ path. Add a test that fills the outbound buffer and uses a short-deadline parent context to asserthandleReasoningreturns promptly (and does not publish) whenPublishOutboundcanβt enqueue before the deadline.