From f1a1ee3f2fecf31f9788a3ea44acf430bb28a599 Mon Sep 17 00:00:00 2001 From: shikihane Date: Sat, 28 Feb 2026 19:28:27 +0800 Subject: [PATCH] feat(feishu): add tool progress feedback and user abort support - Send "Thinking..." placeholder on message receive - Report each tool call as a separate progress message (tool name + args) - Report tool results summary (exec output, errors) - Add /stop command to cancel in-progress agent processing - Add IsProgress flag to OutboundMessage for progress vs final distinction - Add abort registry to MessageBus (RegisterAbort/ClearAbort/TriggerAbort) - Add Bus() accessor to BaseChannel for abort support in channel code --- pkg/agent/loop.go | 109 ++++++++++++++++++++++++++++++- pkg/bus/bus.go | 18 +++++ pkg/bus/types.go | 7 +- pkg/channels/base.go | 3 + pkg/channels/feishu/feishu_64.go | 44 ++++++++++++- 5 files changed, 174 insertions(+), 7 deletions(-) diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index b803187b1..01a529d21 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -252,6 +252,10 @@ func (al *AgentLoop) Run(ctx context.Context) error { continue } + // Create cancellable context for this message, register abort + msgCtx, msgCancel := context.WithCancel(ctx) + al.bus.RegisterAbort(msg.ChatID, msgCancel) + // Process message func() { // TODO: Re-enable media cleanup after inbound media is properly consumed by the agent. @@ -267,9 +271,17 @@ func (al *AgentLoop) Run(ctx context.Context) error { // } // }() - response, err := al.processMessage(ctx, msg) + response, err := al.processMessage(msgCtx, msg) + + al.bus.ClearAbort(msg.ChatID) + msgCancel() // cleanup + if err != nil { - response = fmt.Sprintf("Error processing message: %v", err) + if msgCtx.Err() != nil { + response = "Stopped." + } else { + response = fmt.Sprintf("Error processing message: %v", err) + } } if response != "" { @@ -975,6 +987,16 @@ func (al *AgentLoop) runLLMIteration( "iteration": iteration, }) + // Send progress update to user + if !constants.IsInternalChannel(opts.Channel) { + al.bus.PublishOutbound(ctx, bus.OutboundMessage{ + Channel: opts.Channel, + ChatID: opts.ChatID, + Content: formatToolProgress(tc.Name, tc.Arguments), + IsProgress: true, + }) + } + // Create async callback for tools that implement AsyncTool // NOTE: Following openclaw's design, async tools do NOT send results directly to users. // Instead, they notify the agent via PublishInbound, and the agent decides @@ -1014,6 +1036,18 @@ func (al *AgentLoop) runLLMIteration( }) } + // Report tool result to user via progress update + if !constants.IsInternalChannel(opts.Channel) { + if resultMsg := formatToolResult(tc.Name, tc.Arguments, toolResult); resultMsg != "" { + al.bus.PublishOutbound(ctx, bus.OutboundMessage{ + Channel: opts.Channel, + ChatID: opts.ChatID, + Content: resultMsg, + IsProgress: true, + }) + } + } + // If tool returned media refs, publish them as outbound media if len(toolResult.Media) > 0 && opts.SendResponse { parts := make([]bus.MediaPart, 0, len(toolResult.Media)) @@ -1483,3 +1517,74 @@ func extractParentPeer(msg bus.InboundMessage) *routing.RoutePeer { } return &routing.RoutePeer{Kind: parentKind, ID: parentID} } + +// formatToolProgress formats a human-readable progress string for a tool call. +func formatToolProgress(toolName string, args map[string]any) string { + argStr := func(key string) string { + if v, ok := args[key]; ok { + if s, ok := v.(string); ok { + return s + } + } + return "" + } + + switch toolName { + case "write_file": + if path := argStr("path"); path != "" { + return fmt.Sprintf("Writing %s", path) + } + case "read_file": + if path := argStr("path"); path != "" { + return fmt.Sprintf("Reading %s", path) + } + case "edit_file": + if path := argStr("path"); path != "" { + return fmt.Sprintf("Editing %s", path) + } + case "exec": + if cmd := argStr("command"); cmd != "" { + return fmt.Sprintf("Executing %s", utils.Truncate(cmd, 80)) + } + case "web_search": + if query := argStr("query"); query != "" { + return fmt.Sprintf("Searching %s", utils.Truncate(query, 60)) + } + case "web_fetch": + if u := argStr("url"); u != "" { + return fmt.Sprintf("Fetching %s", utils.Truncate(u, 80)) + } + case "spawn": + if label := argStr("label"); label != "" { + return fmt.Sprintf("Subtask %s", label) + } + case "message": + return "Sending message" + case "cron": + if action := argStr("action"); action != "" { + return fmt.Sprintf("Cron %s", action) + } + } + + return toolName +} + +// formatToolResult formats a result summary for a completed tool call. +// Returns empty string if no result message is needed. +func formatToolResult(toolName string, args map[string]any, result *tools.ToolResult) string { + desc := formatToolProgress(toolName, args) + + // Error: always report + if result.Err != nil { + errPreview := utils.Truncate(result.Err.Error(), 100) + return fmt.Sprintf("[%s] Failed: %s", desc, errPreview) + } + + // exec: show output summary + if toolName == "exec" && result.ForLLM != "" { + output := utils.Truncate(result.ForLLM, 120) + return fmt.Sprintf("[%s] Done:\n%s", desc, output) + } + + return "" +} diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index f5ff9587d..8f47a2fe3 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -3,6 +3,7 @@ package bus import ( "context" "errors" + "sync" "sync/atomic" "github.com/sipeed/picoclaw/pkg/logger" @@ -19,6 +20,7 @@ type MessageBus struct { outboundMedia chan OutboundMediaMessage done chan struct{} closed atomic.Bool + aborts sync.Map // chatID -> context.CancelFunc } func NewMessageBus() *MessageBus { @@ -114,6 +116,22 @@ func (mb *MessageBus) SubscribeOutboundMedia(ctx context.Context) (OutboundMedia } } +func (mb *MessageBus) RegisterAbort(chatID string, cancel context.CancelFunc) { + mb.aborts.Store(chatID, cancel) +} + +func (mb *MessageBus) ClearAbort(chatID string) { + mb.aborts.Delete(chatID) +} + +func (mb *MessageBus) TriggerAbort(chatID string) bool { + if v, ok := mb.aborts.LoadAndDelete(chatID); ok { + v.(context.CancelFunc)() + return true + } + return false +} + func (mb *MessageBus) Close() { if mb.closed.CompareAndSwap(false, true) { close(mb.done) diff --git a/pkg/bus/types.go b/pkg/bus/types.go index 7ad8f0417..df0e31ded 100644 --- a/pkg/bus/types.go +++ b/pkg/bus/types.go @@ -30,9 +30,10 @@ type InboundMessage struct { } type OutboundMessage struct { - Channel string `json:"channel"` - ChatID string `json:"chat_id"` - Content string `json:"content"` + Channel string `json:"channel"` + ChatID string `json:"chat_id"` + Content string `json:"content"` + IsProgress bool `json:"is_progress,omitempty"` // true = intermediate progress update (edit placeholder, keep it) } // MediaPart describes a single media attachment to send. diff --git a/pkg/channels/base.go b/pkg/channels/base.go index 063a66523..5e4b98f79 100644 --- a/pkg/channels/base.go +++ b/pkg/channels/base.go @@ -311,6 +311,9 @@ func (c *BaseChannel) SetMediaStore(s media.MediaStore) { c.mediaStore = s } // GetMediaStore returns the injected MediaStore (may be nil). func (c *BaseChannel) GetMediaStore() media.MediaStore { return c.mediaStore } +// Bus returns the underlying MessageBus. +func (c *BaseChannel) Bus() *bus.MessageBus { return c.bus } + // SetPlaceholderRecorder injects a PlaceholderRecorder into the channel. func (c *BaseChannel) SetPlaceholderRecorder(r PlaceholderRecorder) { c.placeholderRecorder = r diff --git a/pkg/channels/feishu/feishu_64.go b/pkg/channels/feishu/feishu_64.go index 00f73064d..ec808857f 100644 --- a/pkg/channels/feishu/feishu_64.go +++ b/pkg/channels/feishu/feishu_64.go @@ -10,6 +10,8 @@ import ( "net/http" "os" "path/filepath" + "strings" + "time" "sync" "sync/atomic" @@ -36,8 +38,9 @@ type FeishuChannel struct { botOpenID atomic.Value // stores string; populated lazily for @mention detection - mu sync.Mutex - cancel context.CancelFunc + mu sync.Mutex + cancel context.CancelFunc + placeholders sync.Map // chatID -> messageID (string) } func NewFeishuChannel(cfg config.FeishuConfig, bus *bus.MessageBus) (*FeishuChannel, error) { @@ -120,6 +123,11 @@ func (c *FeishuChannel) Send(ctx context.Context, msg bus.OutboundMessage) error return fmt.Errorf("chat ID is empty: %w", channels.ErrSendFailed) } + // Final response: consume placeholder (stop progress updates), then send as new message + if !msg.IsProgress { + c.placeholders.Delete(msg.ChatID) + } + // Build interactive card with markdown content cardContent, err := buildMarkdownCard(msg.Content) if err != nil { @@ -366,6 +374,19 @@ func (c *FeishuChannel) handleMessageReceive(ctx context.Context, event *larkim. content = "[empty message]" } + // Intercept /stop command: cancel in-progress processing, don't forward to agent + if strings.HasPrefix(content, "/stop") { + if c.Bus().TriggerAbort(chatID) { + // Clear placeholder so the "Stopped" response creates a new message + c.placeholders.Delete(chatID) + logger.InfoCF("feishu", "User triggered abort", map[string]any{ + "chat_id": chatID, + "sender_id": senderID, + }) + } + return nil + } + metadata := map[string]string{} if messageID != "" { metadata["message_id"] = messageID @@ -410,6 +431,25 @@ func (c *FeishuChannel) handleMessageReceive(ctx context.Context, event *larkim. "preview": utils.Truncate(content, 80), }) + // Send thinking placeholder before dispatching to agent + placeholderCtx, placeholderCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer placeholderCancel() + + placeholderPayload, _ := json.Marshal(map[string]string{"text": "Thinking..."}) + placeholderReq := larkim.NewCreateMessageReqBuilder(). + ReceiveIdType(larkim.ReceiveIdTypeChatId). + Body(larkim.NewCreateMessageReqBodyBuilder(). + ReceiveId(chatID). + MsgType(larkim.MsgTypeText). + Content(string(placeholderPayload)). + Build()). + Build() + + placeholderResp, placeholderErr := c.client.Im.V1.Message.Create(placeholderCtx, placeholderReq) + if placeholderErr == nil && placeholderResp.Success() && placeholderResp.Data.MessageId != nil { + c.placeholders.Store(chatID, *placeholderResp.Data.MessageId) + } + c.HandleMessage(ctx, peer, messageID, senderID, chatID, content, mediaRefs, metadata, senderInfo) return nil }