Skip to content
30 changes: 28 additions & 2 deletions pkg/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package agent
import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"strings"
Expand Down Expand Up @@ -574,11 +575,36 @@ func (al *AgentLoop) handleReasoning(ctx context.Context, reasoningContent, chan
return
}

al.bus.PublishOutbound(ctx, bus.OutboundMessage{
// Use a short timeout so the goroutine does not block indefinitely when
// the outbound bus is full. Reasoning output is best-effort; dropping it
// is acceptable to avoid goroutine accumulation.
pubCtx, pubCancel := context.WithTimeout(ctx, 5*time.Second)
defer pubCancel()

if err := al.bus.PublishOutbound(pubCtx, bus.OutboundMessage{
Channel: channelName,
ChatID: channelID,
Content: reasoningContent,
})
}); err != nil {
Comment on lines +578 to +588
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are tests for handleReasoning, but none cover the new “bus is full / publish times out” path. Add a test that fills the outbound buffer and uses a short-deadline parent context to assert handleReasoning returns promptly (and does not publish) when PublishOutbound can’t enqueue before the deadline.

Copilot uses AI. Check for mistakes.
// Treat context.DeadlineExceeded / context.Canceled as expected
// (bus full under load, or parent canceled). Check the error
// itself rather than ctx.Err(), because pubCtx may time out
// (5 s) while the parent ctx is still active.
// Also treat ErrBusClosed as expected — it occurs during normal
// shutdown when the bus is closed before all goroutines finish.
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) ||
errors.Is(err, bus.ErrBusClosed) {
logger.DebugCF("agent", "Reasoning publish skipped (timeout/cancel)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
} else {
logger.WarnCF("agent", "Failed to publish reasoning (best-effort)", map[string]any{
"channel": channelName,
"error": err.Error(),
})
}
Comment on lines 588 to 606
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error logging path checks ctx.Err() to decide whether the PublishOutbound error is expected, but the failure may come from pubCtx timing out (5s) while the parent ctx is still active. In that case ctx.Err() is nil and this will incorrectly warn for the expected "bus full" timeout. Consider basing the decision on err / pubCtx.Err() (e.g., treat context.Canceled/DeadlineExceeded as expected) instead of ctx.Err().

Copilot uses AI. Check for mistakes.
}
Comment on lines 588 to 607
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleReasoning treats reasoning as best-effort, but logs a warning on every publish failure/timeout. Under load (when the outbound bus is full), this can become very noisy and add its own performance/ops overhead. Consider lowering the log level, sampling/rate-limiting, or only warning on unexpected errors (e.g., excluding context deadline/cancel).

Copilot uses AI. Check for mistakes.
}

// runLLMIteration executes the LLM call loop with tool handling.
Expand Down
53 changes: 53 additions & 0 deletions pkg/agent/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,4 +797,57 @@ func TestHandleReasoning(t *testing.T) {
t.Fatalf("expected no outbound message, got %+v", msg)
}
})

// Subtest: with the outbound bus saturated, handleReasoning must give up
// once its context expires and must not enqueue the reasoning message.
t.Run("returns promptly when bus is full", func(t *testing.T) {
al, msgBus := newLoop(t)

// Fill the outbound bus buffer until a publish would block.
// Use a short timeout to detect when the buffer is full,
// rather than hardcoding the buffer size.
for i := 0; ; i++ {
// Each attempt gets its own 50ms deadline. The cancel func is called
// right after the publish (not deferred) so nothing accumulates
// across loop iterations.
fillCtx, fillCancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
err := msgBus.PublishOutbound(fillCtx, bus.OutboundMessage{
Channel: "filler",
ChatID: "filler",
Content: fmt.Sprintf("filler-%d", i),
})
fillCancel()
if err != nil {
// Any publish error ends the fill phase. The expected cause is the
// 50ms deadline expiring because the buffer is full.
break
}
}

// Use a short-deadline parent context to bound the test.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

// Measure wall-clock time spent inside handleReasoning.
start := time.Now()
al.handleReasoning(ctx, "should timeout", "slack", "channel-full")
elapsed := time.Since(start)

// handleReasoning uses a 5s internal timeout, but the parent ctx
// expires in 500ms. It should return within ~500ms, not 5s.
// The 2s threshold sits between those two values, so exceeding it means
// the parent deadline was not honored.
if elapsed > 2*time.Second {
t.Fatalf("handleReasoning blocked too long (%v); expected prompt return", elapsed)
}

// Drain the bus and verify the reasoning message was NOT published
// (it should have been dropped due to timeout).
// A single 100ms context bounds the whole drain loop.
drainCtx, drainCancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer drainCancel()
foundReasoning := false
for {
// NOTE(review): ok == false presumably means drainCtx expired (or the
// bus was closed) with nothing left to read; confirm against the bus
// implementation.
msg, ok := msgBus.SubscribeOutbound(drainCtx)
if !ok {
break
}
// Filler messages are ignored; only the reasoning payload matters.
if msg.Content == "should timeout" {
foundReasoning = true
}
}
if foundReasoning {
t.Fatal("expected reasoning message to be dropped when bus is full, but it was published")
}
})
}
145 changes: 126 additions & 19 deletions pkg/channels/whatsapp_native/whatsapp_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/mdp/qrterminal/v3"
Expand Down Expand Up @@ -56,6 +57,8 @@ type WhatsAppNativeChannel struct {
runCancel context.CancelFunc
reconnectMu sync.Mutex
reconnecting bool
stopping atomic.Bool // set once Stop begins; prevents new wg.Add calls
wg sync.WaitGroup // tracks background goroutines (QR handler, reconnect)
}

// NewWhatsAppNativeChannel creates a WhatsApp channel that uses whatsmeow for connection.
Expand All @@ -80,6 +83,14 @@ func NewWhatsAppNativeChannel(
func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
logger.InfoCF("whatsapp", "Starting WhatsApp native channel (whatsmeow)", map[string]any{"store": c.storePath})

// Reset lifecycle state from any previous Stop() so a restarted channel
// behaves correctly. Use reconnectMu to be consistent with eventHandler
// and Stop() which coordinate under the same lock.
c.reconnectMu.Lock()
c.stopping.Store(false)
c.reconnecting = false
c.reconnectMu.Unlock()

if err := os.MkdirAll(c.storePath, 0o700); err != nil {
return fmt.Errorf("create session store dir: %w", err)
}
Expand Down Expand Up @@ -112,63 +123,142 @@ func (c *WhatsAppNativeChannel) Start(ctx context.Context) error {
}

client := whatsmeow.NewClient(deviceStore, waLogger)

// Create runCtx/runCancel BEFORE registering event handler and starting
// goroutines so that Stop() can cancel them at any time, including during
// the QR-login flow.
c.runCtx, c.runCancel = context.WithCancel(ctx)

Comment on lines +127 to +131
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.stopping is set to true in Stop() but never reset in Start(). If this channel instance is ever restarted after a Stop(), the disconnect handler will permanently skip reconnection attempts because eventHandler returns early when stopping is true. Consider resetting stopping (and any related reconnect state) at the beginning of Start(), under the same lock used in Stop()/eventHandler, so lifecycle restarts behave correctly.

Copilot uses AI. Check for mistakes.
client.AddEventHandler(c.eventHandler)

c.mu.Lock()
c.container = container
c.client = client
c.mu.Unlock()

// cleanupOnError clears struct references and releases resources when
// Start() fails after fields are already assigned. This prevents
// Stop() from operating on stale references (double-close, disconnect
// of a partially-initialized client, or stray event handler callbacks).
startOK := false
defer func() {
if startOK {
return
}
c.runCancel()
client.Disconnect()
c.mu.Lock()
c.client = nil
c.container = nil
c.mu.Unlock()
_ = container.Close()
}()

if client.Store.ID == nil {
qrChan, err := client.GetQRChannel(ctx)
qrChan, err := client.GetQRChannel(c.runCtx)
if err != nil {
_ = container.Close()
return fmt.Errorf("get QR channel: %w", err)
}
if err := client.Connect(); err != nil {
_ = container.Close()
return fmt.Errorf("connect: %w", err)
}
for evt := range qrChan {
if evt.Event == "code" {
logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil)
qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{
Level: qrterminal.L,
Writer: os.Stdout,
HalfBlocks: true,
})
} else {
logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event})
}
// Handle QR events in a background goroutine so Start() returns
// promptly. The goroutine is tracked via c.wg and respects
// c.runCtx for cancellation.
// Guard wg.Add with reconnectMu + stopping check (same protocol
// as eventHandler) so a concurrent Stop() cannot enter wg.Wait()
// while we call wg.Add(1).
c.reconnectMu.Lock()
if c.stopping.Load() {
c.reconnectMu.Unlock()
return fmt.Errorf("channel stopped during QR setup")
}
c.wg.Add(1)
c.reconnectMu.Unlock()
go func() {
defer c.wg.Done()
Comment on lines 166 to 179
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.wg.Add(1) for the QR handler goroutine is not coordinated with Stop() calling c.wg.Wait(). If Stop() can run concurrently with Start() (the comments imply it can, e.g. during QR-login), there is a real sync.WaitGroup misuse risk: Stop may enter Wait() while Start subsequently calls Add(1), which can panic. To fix, ensure all wg.Add calls are prevented once Stop begins (e.g., guard wg.Add with the same mutex/flag protocol used in eventHandler, or introduce a lifecycle mutex that serializes Start/Stop).

Suggested change
// promptly. The goroutine is tracked via c.wg and respects
// c.runCtx for cancellation.
c.wg.Add(1)
go func() {
defer c.wg.Done()
// promptly. The goroutine respects c.runCtx for cancellation.
go func() {

Copilot uses AI. Check for mistakes.
for {
select {
case <-c.runCtx.Done():
return
case evt, ok := <-qrChan:
if !ok {
return
}
if evt.Event == "code" {
logger.InfoCF("whatsapp", "Scan this QR code with WhatsApp (Linked Devices):", nil)
qrterminal.GenerateWithConfig(evt.Code, qrterminal.Config{
Level: qrterminal.L,
Writer: os.Stdout,
HalfBlocks: true,
})
} else {
logger.InfoCF("whatsapp", "WhatsApp login event", map[string]any{"event": evt.Event})
}
}
}
}()
} else {
if err := client.Connect(); err != nil {
_ = container.Close()
return fmt.Errorf("connect: %w", err)
}
}

c.runCtx, c.runCancel = context.WithCancel(ctx)
startOK = true
c.SetRunning(true)
logger.InfoC("whatsapp", "WhatsApp native channel connected")
Comment on lines 202 to 209
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the QR-login path (Store.ID == nil), Start() now returns immediately but still calls SetRunning(true). That means the channel manager will start outbound workers while the client may not be paired yet, and Send() currently only checks IsConnected() (not Store.ID / login state). Consider delaying SetRunning(true) until pairing completes, or have Send() explicitly detect the unpaired state (e.g., Store.ID == nil) and return a clear temporary/not-running error to avoid futile retries/log noise during QR setup.

Copilot uses AI. Check for mistakes.
return nil
}

func (c *WhatsAppNativeChannel) Stop(ctx context.Context) error {
logger.InfoC("whatsapp", "Stopping WhatsApp native channel")

// Mark as stopping under reconnectMu so the flag is visible to
// eventHandler atomically with respect to its wg.Add(1) call.
// This closes the TOCTOU window where eventHandler could check
// stopping (false), then Stop sets it true + enters wg.Wait,
// then eventHandler calls wg.Add(1) — causing a panic.
c.reconnectMu.Lock()
c.stopping.Store(true)
c.reconnectMu.Unlock()

if c.runCancel != nil {
c.runCancel()
}

// Disconnect the client first so any blocking Connect()/reconnect loops
// can be interrupted before we wait on the goroutines.
c.mu.Lock()
client := c.client
container := c.container
c.client = nil
c.container = nil
c.mu.Unlock()

if client != nil {
client.Disconnect()
}

// Wait for background goroutines (QR handler, reconnect) to finish in a
// context-aware way so Stop can be bounded by ctx.
done := make(chan struct{})
go func() {
c.wg.Wait()
close(done)
}()

select {
case <-done:
// All goroutines have finished.
case <-ctx.Done():
// Context canceled or timed out; log and proceed with best-effort cleanup.
logger.WarnC("whatsapp", fmt.Sprintf("Stop context canceled before all goroutines finished: %v", ctx.Err()))
}

// Now it is safe to clear and close resources.
c.mu.Lock()
c.client = nil
c.container = nil
c.mu.Unlock()

if container != nil {
_ = container.Close()
}
Expand All @@ -187,9 +277,20 @@ func (c *WhatsAppNativeChannel) eventHandler(evt any) {
c.reconnectMu.Unlock()
return
}
// Check stopping while holding the lock so the check and wg.Add
// are atomic with respect to Stop() setting the flag + calling
// wg.Wait(). This prevents the TOCTOU race.
if c.stopping.Load() {
c.reconnectMu.Unlock()
return
}
c.reconnecting = true
c.wg.Add(1)
c.reconnectMu.Unlock()
go c.reconnectWithBackoff()
go func() {
defer c.wg.Done()
c.reconnectWithBackoff()
}()
}
}

Expand Down Expand Up @@ -313,6 +414,12 @@ func (c *WhatsAppNativeChannel) Send(ctx context.Context, msg bus.OutboundMessag
return fmt.Errorf("whatsapp connection not established: %w", channels.ErrTemporary)
}

// Detect unpaired state: the client is connected (to WhatsApp servers)
// but has not completed QR-login yet, so sending would fail.
if client.Store.ID == nil {
return fmt.Errorf("whatsapp not yet paired (QR login pending): %w", channels.ErrTemporary)
}

to, err := parseJID(msg.ChatID)
if err != nil {
return fmt.Errorf("invalid chat id %q: %w", msg.ChatID, err)
Expand Down