Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 107 additions & 2 deletions pkg/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func (al *AgentLoop) Run(ctx context.Context) error {
continue
}

// Create cancellable context for this message, register abort
msgCtx, msgCancel := context.WithCancel(ctx)
al.bus.RegisterAbort(msg.ChatID, msgCancel)

// Process message
func() {
// TODO: Re-enable media cleanup after inbound media is properly consumed by the agent.
Expand All @@ -187,9 +191,17 @@ func (al *AgentLoop) Run(ctx context.Context) error {
// }
// }()

response, err := al.processMessage(ctx, msg)
response, err := al.processMessage(msgCtx, msg)

al.bus.ClearAbort(msg.ChatID)
msgCancel() // cleanup

if err != nil {
response = fmt.Sprintf("Error processing message: %v", err)
if msgCtx.Err() != nil {
response = "Stopped."
} else {
response = fmt.Sprintf("Error processing message: %v", err)
}
}

if response != "" {
Expand Down Expand Up @@ -844,6 +856,16 @@ func (al *AgentLoop) runLLMIteration(
"iteration": iteration,
})

// Send progress update to user
if !constants.IsInternalChannel(opts.Channel) {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: formatToolProgress(tc.Name, tc.Arguments),
IsProgress: true,
})
}

// Create async callback for tools that implement AsyncTool
// NOTE: Following openclaw's design, async tools do NOT send results directly to users.
// Instead, they notify the agent via PublishInbound, and the agent decides
Expand Down Expand Up @@ -883,6 +905,18 @@ func (al *AgentLoop) runLLMIteration(
})
}

// Report tool result to user via progress update
if !constants.IsInternalChannel(opts.Channel) {
if resultMsg := formatToolResult(tc.Name, tc.Arguments, toolResult); resultMsg != "" {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: resultMsg,
IsProgress: true,
})
}
}

// If tool returned media refs, publish them as outbound media
if len(toolResult.Media) > 0 && opts.SendResponse {
parts := make([]bus.MediaPart, 0, len(toolResult.Media))
Expand Down Expand Up @@ -1342,3 +1376,74 @@ func extractParentPeer(msg bus.InboundMessage) *routing.RoutePeer {
}
return &routing.RoutePeer{Kind: parentKind, ID: parentID}
}

// formatToolProgress formats a human-readable progress string for a tool call.
func formatToolProgress(toolName string, args map[string]any) string {
argStr := func(key string) string {
if v, ok := args[key]; ok {
if s, ok := v.(string); ok {
return s
}
}
return ""
}

switch toolName {
case "write_file":
if path := argStr("path"); path != "" {
return fmt.Sprintf("Writing %s", path)
}
case "read_file":
if path := argStr("path"); path != "" {
return fmt.Sprintf("Reading %s", path)
}
case "edit_file":
if path := argStr("path"); path != "" {
return fmt.Sprintf("Editing %s", path)
}
case "exec":
if cmd := argStr("command"); cmd != "" {
return fmt.Sprintf("Executing %s", utils.Truncate(cmd, 80))
}
case "web_search":
if query := argStr("query"); query != "" {
return fmt.Sprintf("Searching %s", utils.Truncate(query, 60))
}
case "web_fetch":
if u := argStr("url"); u != "" {
return fmt.Sprintf("Fetching %s", utils.Truncate(u, 80))
}
case "spawn":
if label := argStr("label"); label != "" {
return fmt.Sprintf("Subtask %s", label)
}
case "message":
return "Sending message"
case "cron":
if action := argStr("action"); action != "" {
return fmt.Sprintf("Cron %s", action)
}
}

return toolName
}

// formatToolResult formats a result summary for a completed tool call.
// Returns empty string if no result message is needed.
func formatToolResult(toolName string, args map[string]any, result *tools.ToolResult) string {
desc := formatToolProgress(toolName, args)

// Error: always report
if result.Err != nil {
errPreview := utils.Truncate(result.Err.Error(), 100)
return fmt.Sprintf("[%s] Failed: %s", desc, errPreview)
}

// exec: show output summary
if toolName == "exec" && result.ForLLM != "" {
output := utils.Truncate(result.ForLLM, 120)
return fmt.Sprintf("[%s] Done:\n%s", desc, output)
}

return ""
}
18 changes: 18 additions & 0 deletions pkg/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bus
import (
"context"
"errors"
"sync"
"sync/atomic"

"github.com/sipeed/picoclaw/pkg/logger"
Expand All @@ -19,6 +20,7 @@ type MessageBus struct {
outboundMedia chan OutboundMediaMessage
done chan struct{}
closed atomic.Bool
aborts sync.Map // chatID -> context.CancelFunc
}

func NewMessageBus() *MessageBus {
Expand Down Expand Up @@ -114,6 +116,22 @@ func (mb *MessageBus) SubscribeOutboundMedia(ctx context.Context) (OutboundMedia
}
}

func (mb *MessageBus) RegisterAbort(chatID string, cancel context.CancelFunc) {
mb.aborts.Store(chatID, cancel)
}

func (mb *MessageBus) ClearAbort(chatID string) {
mb.aborts.Delete(chatID)
}

func (mb *MessageBus) TriggerAbort(chatID string) bool {
if v, ok := mb.aborts.LoadAndDelete(chatID); ok {
v.(context.CancelFunc)()
return true
}
return false
}

func (mb *MessageBus) Close() {
if mb.closed.CompareAndSwap(false, true) {
close(mb.done)
Expand Down
7 changes: 4 additions & 3 deletions pkg/bus/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ type InboundMessage struct {
}

type OutboundMessage struct {
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
Channel string `json:"channel"`
ChatID string `json:"chat_id"`
Content string `json:"content"`
IsProgress bool `json:"is_progress,omitempty"` // true = intermediate progress update (edit placeholder, keep it)
}

// MediaPart describes a single media attachment to send.
Expand Down
3 changes: 3 additions & 0 deletions pkg/channels/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ func (c *BaseChannel) SetMediaStore(s media.MediaStore) { c.mediaStore = s }
// GetMediaStore returns the injected MediaStore (may be nil).
func (c *BaseChannel) GetMediaStore() media.MediaStore { return c.mediaStore }

// Bus returns the underlying MessageBus.
func (c *BaseChannel) Bus() *bus.MessageBus { return c.bus }

// SetPlaceholderRecorder injects a PlaceholderRecorder into the channel.
func (c *BaseChannel) SetPlaceholderRecorder(r PlaceholderRecorder) {
c.placeholderRecorder = r
Expand Down
45 changes: 43 additions & 2 deletions pkg/channels/feishu/feishu_64.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -28,8 +29,9 @@ type FeishuChannel struct {
client *lark.Client
wsClient *larkws.Client

mu sync.Mutex
cancel context.CancelFunc
mu sync.Mutex
cancel context.CancelFunc
placeholders sync.Map // chatID -> messageID (string)
}

func NewFeishuChannel(cfg config.FeishuConfig, bus *bus.MessageBus) (*FeishuChannel, error) {
Expand Down Expand Up @@ -107,6 +109,13 @@ func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error
return fmt.Errorf("failed to marshal feishu content: %w", err)
}

// Final response: consume placeholder (stop progress updates), then send as new message
if !msg.IsProgress {
c.placeholders.Delete(msg.ChatID)
}

// All messages (progress and final): create a new message

req := larkim.NewCreateMessageReqBuilder().
ReceiveIdType(larkim.ReceiveIdTypeChatId).
Body(larkim.NewCreateMessageReqBodyBuilder().
Expand Down Expand Up @@ -156,6 +165,19 @@ func (c *FeishuChannel) handleMessageReceive(ctx context.Context, event *larkim.
content = "[empty message]"
}

// Intercept /stop command: cancel in-progress processing, don't forward to agent
if strings.HasPrefix(content, "/stop") {
if c.Bus().TriggerAbort(chatID) {
// Clear placeholder so the "Stopped" response creates a new message
c.placeholders.Delete(chatID)
logger.InfoCF("feishu", "User triggered abort", map[string]any{
"chat_id": chatID,
"sender_id": senderID,
})
}
return nil
}

metadata := map[string]string{}
messageID := ""
if mid := stringValue(message.MessageId); mid != "" {
Expand Down Expand Up @@ -201,6 +223,25 @@ func (c *FeishuChannel) handleMessageReceive(ctx context.Context, event *larkim.
return nil
}

// Send thinking placeholder before dispatching to agent
placeholderCtx, placeholderCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer placeholderCancel()

placeholderPayload, _ := json.Marshal(map[string]string{"text": "Thinking..."})
placeholderReq := larkim.NewCreateMessageReqBuilder().
ReceiveIdType(larkim.ReceiveIdTypeChatId).
Body(larkim.NewCreateMessageReqBodyBuilder().
ReceiveId(chatID).
MsgType(larkim.MsgTypeText).
Content(string(placeholderPayload)).
Build()).
Build()

placeholderResp, placeholderErr := c.client.Im.V1.Message.Create(placeholderCtx, placeholderReq)
if placeholderErr == nil && placeholderResp.Success() && placeholderResp.Data.MessageId != nil {
c.placeholders.Store(chatID, *placeholderResp.Data.MessageId)
}

c.HandleMessage(ctx, peer, messageID, senderID, chatID, content, nil, metadata, senderInfo)
return nil
}
Expand Down