Skip to content
24 changes: 22 additions & 2 deletions pkg/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,31 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan
return
}

al.bus.PublishOutbound(ctx, bus.OutboundMessage{
// Use a short timeout so the goroutine does not block indefinitely when
// the outbound bus is full. Reasoning output is best-effort; dropping it
// is acceptable to avoid goroutine accumulation.
pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second)
defer pubCancel()

if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{
Channel: channelName,
ChatID: channelID,
Content: reasoningContent,
})
}); err != nil {
Comment on lines +578 to +588
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are tests for handleReasoning, but none cover the new “bus is full / publish times out” path. Add a test that fills the outbound buffer and uses a short-deadline parent context to assert handleReasoning returns promptly (and does not publish) when PublishOutbound can’t enqueue before the deadline.

Copilot uses AI. Check for mistakes.
// Only log unexpected errors; context deadline/cancel are expected when
// the bus is full under load and would be too noisy to warn about.
if ctx.Err() == nil {
logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
} else {
logger.DebugCF("agent", "Reasoning publish skipped (context done)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
}
Comment on lines 588 to 606
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error logging path checks ctx.Err() to decide whether the PublishOutbound error is expected, but the failure may come from pubCtx timing out (5s) while the parent ctx is still active. In that case ctx.Err() is nil and this will incorrectly warn for the expected "bus full" timeout. Consider basing the decision on err / pubCtx.Err() (e.g., treat context.Canceled/DeadlineExceeded as expected) instead of ctx.Err().

Copilot uses AI. Check for mistakes.
}
Comment on lines 588 to 607
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleReasoning treats reasoning as best-effort, but logs a warning on every publish failure/timeout. Under load (when the outbound bus is full), this can become very noisy and add its own performance/ops overhead. Consider lowering the log level, sampling/rate-limiting, or only warning on unexpected errors (e.g., excluding context deadline/cancel).

Copilot uses AI. Check for mistakes.
}

// runLLMIteration executes the LLM call loop with tool handling.
Expand Down
49 changes: 49 additions & 0 deletions pkg/agent/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,4 +797,53 @@ func TestHandleReasoning(t *testing.T) {
t.Fatalf("expected no outbound message, got %+v", msg)
}
})

t.Run("returns promptly when bus is full", func(t *testing.T) {
al, msgBus := newLoop(t)

// Fill the outbound bus buffer (default size is 64) so that the
// next PublishOutbound will block until the context deadline.
for i := 0; i < 64; i++ {
err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{
Channel: "filler",
ChatID: "filler",
Content: fmt.Sprintf("filler-%d", i),
})
if err != nil {
t.Fatalf("failed to fill bus: %v", err)
}
}
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test hard-codes the outbound bus buffer size (64) and uses context.Background() when filling it. If the bus buffer size changes in the future, the fill loop could block indefinitely and hang the test suite. Consider filling until PublishOutbound fails using a short timeout context (or introducing a helper that determines/fills capacity) so the test remains robust to buffer-size changes.

Suggested change
// Fill the outbound bus buffer (default size is 64) so that the
// next PublishOutbound will block until the context deadline.
for i := 0; i < 64; i++ {
err := msgBus.PublishOutbound(context.Background(), bus.OutboundMessage{
Channel: "filler",
ChatID: "filler",
Content: fmt.Sprintf("filler-%d", i),
})
if err != nil {
t.Fatalf("failed to fill bus: %v", err)
}
}
// Fill the outbound bus buffer so that the next PublishOutbound will
// block until the context deadline. Do not assume a specific buffer
// size; instead, publish until PublishOutbound fails with a short
// timeout context.
const maxFillAttempts = 10000
filledCount := 0
for i := 0; i < maxFillAttempts; i++ {
fillCtx, fillCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
err := msgBus.PublishOutbound(fillCtx, bus.OutboundMessage{
Channel: "filler",
ChatID: "filler",
Content: fmt.Sprintf("filler-%d", i),
})
fillCancel()
if err != nil {
// We assume the bus is full or otherwise unable to accept more
// messages once PublishOutbound returns an error.
if filledCount == 0 {
t.Fatalf("failed to publish to outbound bus even once: %v", err)
}
break
}
filledCount++
}
if filledCount == maxFillAttempts {
t.Fatalf("failed to reach outbound bus capacity after %d attempts; test may not reflect fullness semantics", maxFillAttempts)
}

Copilot uses AI. Check for mistakes.

// Use a short-deadline parent context to bound the test.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

start := time.Now()
al.handleReasoning(ctx, "should timeout", "slack", "channel-full")
elapsed := time.Since(start)

// handleReasoning uses a 5s internal timeout, but the parent ctx
// expires in 500ms. It should return within ~500ms, not 5s.
if elapsed > 2*time.Second {
t.Fatalf("handleReasoning blocked too long (%v); expected prompt return", elapsed)
}

// Drain the bus and verify the reasoning message was NOT published
// (it should have been dropped due to timeout).
drainCtx, drainCancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer drainCancel()
foundReasoning := false
for {
msg, ok := msgBus.SubscribeOutbound(drainCtx)
if !ok {
break
}
if msg.Content == "should timeout" {
foundReasoning = true
}
}
if foundReasoning {
t.Fatal("expected reasoning message to be dropped when bus is full, but it was published")
}
})
}
103 changes: 87 additions & 16 deletions pkg/channels/whatsapp_native/whatsapp_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/mdp/qrterminal/v3"
Expand Down Expand Up @@ -56,6 +57,8 @@ type WhatsAppNativeChannel struct {
runCancel context.CancelFunc
reconnectMu sync.Mutex
reconnecting bool
stopping atomic.Bool // set once Stop begins; prevents new wg.Add calls
wg sync.WaitGroup // tracks background goroutines (QR handler, reconnect)
}

// NewWhatsAppNativeChannel creates a WhatsApp channel that uses whatsmeow for connection.
Expand Down Expand Up @@ -112,6 +115,12 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
}

client := whatsmeow.NewClient(deviceStore, waLogger)

// Create runCtx/runCancel BEFORE registering event handler and starting
// goroutines so that Stop() can cancel them at any time, including during
// the QR-login flow.
c.runCtx, c.runCancel = context.WithCancel(ctx)

Comment on lines +127 to +131
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.stopping is set to true in Stop() but never reset in Start(). If this channel instance is ever restarted after a Stop(), the disconnect handler will permanently skip reconnection attempts because eventHandler returns early when stopping is true. Consider resetting stopping (and any related reconnect state) at the beginning of Start(), under the same lock used in Stop()/eventHandler, so lifecycle restarts behave correctly.

Copilot uses AI. Check for mistakes.
client.AddEventHandler(c.eventHandler)

c.mu.Lock()
Expand All @@ -120,55 +129,106 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
c.mu.Unlock()

if client.Store.ID == nil {
qrChan, err := client.GetQRChannel(ctx)
qrChan, err := client.GetQRChannel(c.runCtx)
if err != nil {
c.runCancel()
_ = container.Close()
return fmt.Errorf("get QR channel: %w", err)
}
if err := client.Connect(); err != nil {
c.runCancel()
_ = container.Close()
return fmt.Errorf("connect: %w", err)
}
for evt := range qrChan {
if evt.Event == "code" {
logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil)
qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{
Level: qrterminal.L,
Writer: os.Stdout,
HalfBlocks: true,
})
} else {
logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event})
// Handle QR events in a background goroutine so Start() returns
// promptly. The goroutine is tracked via c.wg and respects
// c.runCtx for cancellation.
c.wg.Add(1)
go func() {
defer c.wg.Done()
Comment on lines 166 to 179
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.wg.Add(1) for the QR handler goroutine is not coordinated with Stop() calling c.wg.Wait(). If Stop() can run concurrently with Start() (the comments imply it can, e.g. during QR-login), there is a real sync.WaitGroup misuse risk: Stop may enter Wait() while Start subsequently calls Add(1), which can panic. To fix, ensure all wg.Add calls are prevented once Stop begins (e.g., guard wg.Add with the same mutex/flag protocol used in eventHandler, or introduce a lifecycle mutex that serializes Start/Stop).

Suggested change
// promptly. The goroutine is tracked via c.wg and respects
// c.runCtx for cancellation.
c.wg.Add(1)
go func() {
defer c.wg.Done()
// promptly. The goroutine respects c.runCtx for cancellation.
go func() {

Copilot uses AI. Check for mistakes.
for {
select {
case <-c.runCtx.Done():
return
case evt, ok := <-qrChan:
if !ok {
return
}
if evt.Event == "code" {
logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil)
qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{
Level: qrterminal.L,
Writer: os.Stdout,
HalfBlocks: true,
})
} else {
logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event})
}
}
}
}
}()
} else {
if err := client.Connect(); err != nil {
c.runCancel()
_ = container.Close()
return fmt.Errorf("connect: %w", err)
}
}

c.runCtx, c.runCancel = context.WithCancel(ctx)
c.SetRunning(true)
logger.InfoC("whatsapp", "WhatsApp native channel connected")
Comment on lines 202 to 209
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the QR-login path (Store.ID == nil), Start() now returns immediately but still calls SetRunning(true). That means the channel manager will start outbound workers while the client may not be paired yet, and Send() currently only checks IsConnected() (not Store.ID / login state). Consider delaying SetRunning(true) until pairing completes, or have Send() explicitly detect the unpaired state (e.g., Store.ID == nil) and return a clear temporary/not-running error to avoid futile retries/log noise during QR setup.

Copilot uses AI. Check for mistakes.
return nil
}

func (c *WhatsAppNativeChannel) Stop(ctx context.Context) error {
logger.InfoC("whatsapp", "Stopping WhatsApp native channel")

// Mark as stopping under reconnectMu so the flag is visible to
// eventHandler atomically with respect to its wg.Add(1) call.
// This closes the TOCTOU window where eventHandler could check
// stopping (false), then Stop sets it true + enters wg.Wait,
// then eventHandler calls wg.Add(1) — causing a panic.
c.reconnectMu.Lock()
c.stopping.Store(true)
c.reconnectMu.Unlock()

if c.runCancel != nil {
c.runCancel()
}

// Disconnect the client first so any blocking Connect()/reconnect loops
// can be interrupted before we wait on the goroutines.
c.mu.Lock()
client := c.client
container := c.container
c.client = nil
c.container = nil
c.mu.Unlock()

if client != nil {
client.Disconnect()
}

// Wait for background goroutines (QR handler, reconnect) to finish in a
// context-aware way so Stop can be bounded by ctx.
done := make(chan struct{})
go func() {
c.wg.Wait()
close(done)
}()

select {
case <-done:
// All goroutines have finished.
case <-ctx.Done():
// Context canceled or timed out; log and proceed with best-effort cleanup.
logger.WarnC("whatsapp", fmt.Sprintf("Stop context canceled before all goroutines finished: %v", ctx.Err()))
}

// Now it is safe to clear and close resources.
c.mu.Lock()
c.client = nil
c.container = nil
c.mu.Unlock()

if container != nil {
_ = container.Close()
}
Expand All @@ -187,9 +247,20 @@ func (c *WhatsAppNativeChannel) eventHandler(evt any) {
c.reconnectMu.Unlock()
return
}
// Check stopping while holding the lock so the check and wg.Add
// are atomic with respect to Stop() setting the flag + calling
// wg.Wait(). This prevents the TOCTOU race.
if c.stopping.Load() {
c.reconnectMu.Unlock()
return
}
c.reconnecting = true
c.wg.Add(1)
c.reconnectMu.Unlock()
go c.reconnectWithBackoff()
go func() {
defer c.wg.Done()
c.reconnectWithBackoff()
}()
}
}

Expand Down