-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Fix/memory leak whatsapp reasoning #884
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
7276a2d
cdbc9c4
c7d75a1
1d0220f
9b80fdf
fc28c26
d1b10a0
7f425f1
871b2d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -574,11 +574,31 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan | |
| return | ||
| } | ||
|
|
||
| al.bus.PublishOutbound(ctx, bus.OutboundMessage{ | ||
| // Use a short timeout so the goroutine does not block indefinitely when | ||
| // the outbound bus is full. Reasoning output is best-effort; dropping it | ||
| // is acceptable to avoid goroutine accumulation. | ||
| pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second) | ||
| defer pubCancel() | ||
|
|
||
| if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{ | ||
| Channel: channelName, | ||
| ChatID: channelID, | ||
| Content: reasoningContent, | ||
| }) | ||
| }); err != nil { | ||
| // Only log unexpected errors; context deadline/cancel are expected when | ||
| // the bus is full under load and would be too noisy to warn about. | ||
| if ctx.Err() == nil { | ||
| logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{ | ||
| "channel": channelName, | ||
| "error": err.Error(), | ||
| }) | ||
| } else { | ||
| logger.DebugCF("agent", "Reasoning publish skipped (context done)", map[string]any{ | ||
| "channel": channelName, | ||
| "error": err.Error(), | ||
| }) | ||
| } | ||
|
Comment on lines
588
to
606
|
||
| } | ||
|
Comment on lines
588
to
607
|
||
| } | ||
|
|
||
| // runLLMIteration executes the LLM call loop with tool handling. | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -797,4 +797,53 @@ func TestHandleReasoning(t *testing.T) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| t.Fatalf("expected no outbound message, got %+v", msg) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| t.Run("returns promptly when bus is full", func(t *testing.T) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| al, msgBus := newLoop(t) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Fill the outbound bus buffer (default size is 64) so that the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // next PublishOutbound will block until the context deadline. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for i := 0; i < 64; i++ { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Channel: "filler", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ChatID: "filler", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Content: fmt.Sprintf("filler-%d", i), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| t.Fatalf("failed to fill bus: %v", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Fill the outbound bus buffer (default size is 64) so that the | |
| // next PublishOutbound will block until the context deadline. | |
| for i := 0; i < 64; i++ { | |
| err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{ | |
| Channel: "filler", | |
| ChatID: "filler", | |
| Content: fmt.Sprintf("filler-%d", i), | |
| }) | |
| if err != nil { | |
| t.Fatalf("failed to fill bus: %v", err) | |
| } | |
| } | |
| // Fill the outbound bus buffer so that the next PublishOutbound will | |
| // block until the context deadline. Do not assume a specific buffer | |
| // size; instead, publish until PublishOutbound fails with a short | |
| // timeout context. | |
| const maxFillAttempts = 10000 | |
| filledCount := 0 | |
| for i := 0; i < maxFillAttempts; i++ { | |
| fillCtx, fillCancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | |
| err := msgBus.PublishOutbound(fillCtx, bus.OutboundMessage{ | |
| Channel: "filler", | |
| ChatID: "filler", | |
| Content: fmt.Sprintf("filler-%d", i), | |
| }) | |
| fillCancel() | |
| if err != nil { | |
| // We assume the bus is full or otherwise unable to accept more | |
| // messages once PublishOutbound returns an error. | |
| if filledCount == 0 { | |
| t.Fatalf("failed to publish to outbound bus even once: %v", err) | |
| } | |
| break | |
| } | |
| filledCount++ | |
| } | |
| if filledCount == maxFillAttempts { | |
| t.Fatalf("failed to reach outbound bus capacity after %d attempts; test may not reflect fullness semantics", maxFillAttempts) | |
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -15,6 +15,7 @@ import ( | |||||||||||||||
| "path/filepath" | ||||||||||||||||
| "strings" | ||||||||||||||||
| "sync" | ||||||||||||||||
| "sync/atomic" | ||||||||||||||||
| "time" | ||||||||||||||||
|
|
||||||||||||||||
| "github.com/mdp/qrterminal/v3" | ||||||||||||||||
|
|
@@ -56,6 +57,8 @@ type WhatsAppNativeChannel struct { | |||||||||||||||
| runCancel context.CancelFunc | ||||||||||||||||
| reconnectMu sync.Mutex | ||||||||||||||||
| reconnecting bool | ||||||||||||||||
| stopping atomic.Bool // set once Stop begins; prevents new wg.Add calls | ||||||||||||||||
| wg sync.WaitGroup // tracks background goroutines (QR handler, reconnect) | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // NewWhatsAppNativeChannel creates a WhatsApp channel that uses whatsmeow for connection. | ||||||||||||||||
|
|
@@ -112,6 +115,12 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { | |||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| client := whatsmeow.NewClient(deviceStore, waLogger) | ||||||||||||||||
|
|
||||||||||||||||
| // Create runCtx/runCancel BEFORE registering event handler and starting | ||||||||||||||||
| // goroutines so that Stop() can cancel them at any time, including during | ||||||||||||||||
| // the QR-login flow. | ||||||||||||||||
| c.runCtx, c.runCancel = context.WithCancel(ctx) | ||||||||||||||||
|
|
||||||||||||||||
|
Comment on lines
+127
to
+131
|
||||||||||||||||
| client.AddEventHandler(c.eventHandler) | ||||||||||||||||
|
|
||||||||||||||||
| c.mu.Lock() | ||||||||||||||||
|
|
@@ -120,55 +129,106 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error { | |||||||||||||||
| c.mu.Unlock() | ||||||||||||||||
|
|
||||||||||||||||
| if client.Store.ID == nil { | ||||||||||||||||
| qrChan, err := client.GetQRChannel(ctx) | ||||||||||||||||
| qrChan, err := client.GetQRChannel(c.runCtx) | ||||||||||||||||
| if err != nil { | ||||||||||||||||
| c.runCancel() | ||||||||||||||||
| _ = container.Close() | ||||||||||||||||
| return fmt.Errorf("get QR channel: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| if err := client.Connect(); err != nil { | ||||||||||||||||
| c.runCancel() | ||||||||||||||||
| _ = container.Close() | ||||||||||||||||
| return fmt.Errorf("connect: %w", err) | ||||||||||||||||
| } | ||||||||||||||||
| for evt := range qrChan { | ||||||||||||||||
| if evt.Event == "code" { | ||||||||||||||||
| logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil) | ||||||||||||||||
| qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{ | ||||||||||||||||
| Level: qrterminal.L, | ||||||||||||||||
| Writer: os.Stdout, | ||||||||||||||||
| HalfBlocks: true, | ||||||||||||||||
| }) | ||||||||||||||||
| } else { | ||||||||||||||||
| logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event}) | ||||||||||||||||
| // Handle QR events in a background goroutine so Start() returns | ||||||||||||||||
| // promptly. The goroutine is tracked via c.wg and respects | ||||||||||||||||
| // c.runCtx for cancellation. | ||||||||||||||||
| c.wg.Add(1) | ||||||||||||||||
| go func() { | ||||||||||||||||
| defer c.wg.Done() | ||||||||||||||||
|
Comment on lines
166
to
179
|
||||||||||||||||
| // promptly. The goroutine is tracked via c.wg and respects | |
| // c.runCtx for cancellation. | |
| c.wg.Add(1) | |
| go func() { | |
| defer c.wg.Done() | |
| // promptly. The goroutine respects c.runCtx for cancellation. | |
| go func() { |
Copilot
AI
Feb 28, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the QR-login path (Store.ID == nil), Start() now returns immediately but still calls SetRunning(true). That means the channel manager will start outbound workers while the client may not be paired yet, and Send() currently only checks IsConnected() (not Store.ID / login state). Consider delaying SetRunning(true) until pairing completes, or have Send() explicitly detect the unpaired state (e.g., Store.ID == nil) and return a clear temporary/not-running error to avoid futile retries/log noise during QR setup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are tests for
handleReasoning, but none cover the new βbus is full / publish times outβ path. Add a test that fills the outbound buffer and uses a short-deadline parent context to asserthandleReasoningreturns promptly (and does not publish) whenPublishOutboundcanβt enqueue before the deadline.