Integrate task scheduler into agent reminder system#408
Conversation
…tests Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2f1636a to
14447e0
Compare
There was a problem hiding this comment.
Pull request overview
Integrates a new persistent task scheduler into the agent reminder system, replacing the previous in-memory reminder execution with a scheduler-backed implementation and adding JSON-based task storage.
Changes:
- Added
core/schedulerpackage (task model, JSON store, scheduler loop, tests, docs). - Wired scheduler lifecycle into agent creation/run/stop and exposed it via
AgentSharedState. - Updated reminder actions (
set_reminder,list_reminders,remove_reminder) to operate on scheduled tasks.
Reviewed changes
Copilot reviewed 15 out of 17 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| go.mod / go.sum | Updates Go module dependencies used by scheduler integration. |
| core/types/state.go | Adds TaskScheduler to shared state and stores agent name for task attribution. |
| core/state/pool.go | Passes per-agent scheduler store path via agent options when starting agents from a pool. |
| core/scheduler/interfaces.go | Defines scheduler storage and execution interfaces. |
| core/scheduler/task.go | Implements task and schedule calculation logic (cron/interval/once). |
| core/scheduler/json_store.go | Adds JSON file persistence for tasks and run history. |
| core/scheduler/scheduler.go | Adds polling scheduler that executes due tasks and logs runs. |
| core/scheduler/*_test.go | Adds Ginkgo test coverage for scheduler/store behavior. |
| core/scheduler/README.md | Documents scheduler usage and integration. |
| core/agent/options.go | Adds WithSchedulerStorePath option. |
| core/agent/agent.go | Initializes/starts/stops scheduler and injects it into shared state. |
| core/agent/scheduler_executor.go | Executes scheduled tasks through agent job queue and observer. |
| core/agent/scheduler_wrapper.go | Adapts scheduler to TaskScheduler interface. |
| core/action/reminder.go | Reworks reminder actions to create/list/delete scheduler tasks. |
| .gitignore | Ignores scheduler example build artifacts. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Initialize task scheduler for reminders | ||
| schedulerPath := options.schedulerStorePath | ||
| if schedulerPath == "" { | ||
| schedulerPath = "data/scheduled_tasks.json" | ||
| } | ||
|
|
||
| store, err := scheduler.NewJSONStore(schedulerPath) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to create scheduler store: %v", err) | ||
| } |
There was a problem hiding this comment.
The scheduler store is initialized with a default path data/scheduled_tasks.json, but nothing ensures that data/ exists before calling scheduler.NewJSONStore. Unless the directory is always present in deployments, agent creation can fail on first run. Consider creating the directory before initializing the store (or have NewJSONStore handle it).
core/action/reminder.go
Outdated
| for i, taskInterface := range tasksInterface { | ||
| task, ok := taskInterface.(*scheduler.Task) | ||
| if !ok { | ||
| continue | ||
| } | ||
|
|
||
| status := "one-time" | ||
| if reminder.IsRecurring { | ||
| if task.ScheduleType == scheduler.ScheduleTypeCron || task.ScheduleType == scheduler.ScheduleTypeInterval { | ||
| status = "recurring" | ||
| } | ||
| result.WriteString(fmt.Sprintf("%d. %s (Next run: %s, Status: %s)\n", | ||
|
|
||
| result.WriteString(fmt.Sprintf("%d. %s (Next run: %s, Status: %s, ID: %s)\n", | ||
| i+1, | ||
| reminder.Message, | ||
| reminder.NextRun.Format(time.RFC3339), | ||
| status)) | ||
| task.Prompt, | ||
| task.NextRun.Format(time.RFC3339), | ||
| status, | ||
| task.ID)) | ||
| } |
There was a problem hiding this comment.
list_reminders iterates over all tasks returned by GetAllTasks() without filtering out tasks marked TaskStatusDeleted (one-time tasks are set to deleted after running). This will cause already-fired one-time reminders to keep appearing in the list (and affect indexing for remove_reminder). Filter to active/paused tasks (or have the store/scheduler exclude deleted tasks for “list reminders”).
core/agent/scheduler_wrapper.go
Outdated
| func (w *schedulerWrapper) CreateTask(task interface{}) error { | ||
| t, ok := task.(*scheduler.Task) | ||
| if !ok { | ||
| return nil | ||
| } | ||
| return w.Scheduler.CreateTask(t) |
There was a problem hiding this comment.
CreateTask silently returns nil when the passed value isn’t a *scheduler.Task. This can hide programming errors and cause reminders to be dropped without any feedback. Return a descriptive error instead (or avoid interface{} here by making the shared interface use concrete scheduler types).
| // For one-time tasks, mark as deleted | ||
| if task.ScheduleType == ScheduleTypeOnce { | ||
| task.Status = TaskStatusDeleted | ||
| } else { | ||
| // Calculate next run | ||
| if err := task.CalculateNextRun(); err != nil { | ||
| xlog.Error("Failed to calculate next run", "task_id", task.ID, "error", err) | ||
| task.Status = TaskStatusPaused | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
For one-time tasks you set task.Status = TaskStatusDeleted but keep the task in the store. Downstream (e.g., reminder listing) will still show these unless it explicitly filters them, and indexes can shift unexpectedly. Either delete one-time tasks from the store after successful execution, or make GetAll/reminder listing exclude deleted tasks (while preserving run history separately if needed).
| // For one-time tasks, mark as deleted | |
| if task.ScheduleType == ScheduleTypeOnce { | |
| task.Status = TaskStatusDeleted | |
| } else { | |
| // Calculate next run | |
| if err := task.CalculateNextRun(); err != nil { | |
| xlog.Error("Failed to calculate next run", "task_id", task.ID, "error", err) | |
| task.Status = TaskStatusPaused | |
| } | |
| } | |
| // For one-time tasks, remove them from the store after execution | |
| if task.ScheduleType == ScheduleTypeOnce { | |
| if delErr := s.store.Delete(task.ID); delErr != nil { | |
| // If deletion fails, fall back to marking the task as deleted | |
| xlog.Error("Failed to delete one-time task, falling back to soft delete", "task_id", task.ID, "error", delErr) | |
| task.Status = TaskStatusDeleted | |
| if err := s.store.Update(task); err != nil { | |
| xlog.Error("Failed to update task after soft delete fallback", "task_id", task.ID, "error", err) | |
| } | |
| } | |
| return | |
| } | |
| // For recurring tasks, calculate the next run and update | |
| // Calculate next run | |
| if err := task.CalculateNextRun(); err != nil { | |
| xlog.Error("Failed to calculate next run", "task_id", task.ID, "error", err) | |
| task.Status = TaskStatusPaused | |
| } |
| // NewJSONStore creates a new JSON-based task store | ||
| func NewJSONStore(filePath string) (*JSONStore, error) { | ||
| store := &JSONStore{ | ||
| filePath: filePath, | ||
| data: &storeData{ | ||
| Tasks: make([]*Task, 0), | ||
| TaskRuns: make([]*TaskRun, 0), | ||
| }, | ||
| } | ||
|
|
||
| if err := store.load(); err != nil { | ||
| if !os.IsNotExist(err) { | ||
| return nil, fmt.Errorf("failed to load store: %w", err) | ||
| } | ||
| // File doesn't exist, create it | ||
| if err := store.save(); err != nil { | ||
| return nil, fmt.Errorf("failed to create store file: %w", err) | ||
| } |
There was a problem hiding this comment.
NewJSONStore tries to create the store file via save(), but save() will fail if the parent directory doesn’t exist (e.g., default data/scheduled_tasks.json). This can prevent agents from starting on a fresh install. Ensure NewJSONStore creates the parent directory (e.g., os.MkdirAll(filepath.Dir(filePath), ...)) before the first save.
| func (e *agentSchedulerExecutor) Execute(ctx context.Context, agentName string, prompt string) (*scheduler.JobResult, error) { | ||
| // Create a job for the reminder | ||
| reminderJob := types.NewJob( | ||
| types.WithText(fmt.Sprintf("I have a reminder for you: %s", prompt)), | ||
| types.WithReasoningCallback(e.agent.options.reasoningCallback), | ||
| types.WithResultCallback(e.agent.options.resultCallback), | ||
| ) | ||
|
|
||
| // Add metadata to indicate this is a reminder | ||
| reminderJob.Metadata = map[string]interface{}{ | ||
| "message": prompt, | ||
| "is_reminder": true, | ||
| } | ||
|
|
||
| // Attach observable so UI can show reminder processing state | ||
| if e.agent.observer != nil { | ||
| obs := e.agent.observer.NewObservable() | ||
| obs.Name = "reminder" | ||
| obs.Icon = "bell" | ||
| e.agent.observer.Update(*obs) | ||
| reminderJob.Obs = obs | ||
| } | ||
|
|
||
| // Send the job to be processed | ||
| e.agent.jobQueue <- reminderJob | ||
|
|
||
| // Wait for the job to complete or context to be cancelled | ||
| select { | ||
| case <-ctx.Done(): | ||
| return nil, ctx.Err() | ||
| default: | ||
| result := reminderJob.Result.WaitResult() | ||
| if result.Error != nil { | ||
| return &scheduler.JobResult{ | ||
| Response: "", | ||
| Error: result.Error, | ||
| }, result.Error | ||
| } | ||
| return &scheduler.JobResult{ | ||
| Response: result.Response, | ||
| Error: nil, | ||
| }, nil | ||
| } |
There was a problem hiding this comment.
The context cancellation logic here is ineffective: the select uses a default branch and then blocks inside WaitResult(), so ctx.Done() won’t be observed once waiting begins. Also, the job isn’t created with types.WithContext(ctx), so canceling the scheduler task won’t propagate into the agent job. Consider attaching ctx to the job and waiting in a goroutine (or via a completion channel) so you can select between job completion and ctx.Done(), canceling the job if the context is canceled.
| func (a *Agent) Stop() { | ||
| a.Lock() | ||
| defer a.Unlock() | ||
| xlog.Debug("Stopping agent", "agent", a.Character.Name) | ||
|
|
||
| // Stop the scheduler | ||
| a.taskScheduler.Stop() | ||
| xlog.Info("Task scheduler stopped") | ||
|
|
||
| a.closeMCPServers() | ||
| a.context.Cancel() | ||
| } |
There was a problem hiding this comment.
Agent.Stop() holds the agent mutex while calling a.taskScheduler.Stop(). If a scheduled task is in-flight, Stop() waits for it to finish; but finishing requires the agent workers to process the queued job, and consumeJob takes the same agent mutex. This can deadlock shutdown. Release the agent lock before stopping the scheduler (or stop the scheduler asynchronously), and consider making scheduler stop non-blocking w.r.t. agent locks.
| // Start begins the scheduler's polling loop | ||
| func (s *Scheduler) Start() { | ||
| s.wg.Add(1) | ||
| go s.run() | ||
| xlog.Info("Task scheduler started", "poll_interval", s.pollInterval) | ||
| } |
There was a problem hiding this comment.
Start() is not idempotent: calling it multiple times will spawn multiple polling goroutines and increment the waitgroup without any guard. Since agent.Run() calls Start() unconditionally (and AgentPool.Start() can call Run() on an already-started agent), this can lead to duplicated executions and shutdown issues. Add a running flag/sync.Once, or return an error/no-op on subsequent Start() calls.
| case ScheduleTypeInterval: | ||
| intervalMs, err := strconv.ParseInt(t.ScheduleValue, 10, 64) | ||
| if err != nil { | ||
| return fmt.Errorf("invalid interval: %w", err) | ||
| } | ||
| if t.LastRun != nil { | ||
| t.NextRun = t.LastRun.Add(time.Duration(intervalMs) * time.Millisecond) | ||
| } else { | ||
| t.NextRun = now.Add(time.Duration(intervalMs) * time.Millisecond) | ||
| } |
There was a problem hiding this comment.
Interval schedules accept any parsed integer, including 0 or negative values. That can produce immediate/retroactive NextRun values and cause tight execution loops. Validate that the interval is a positive duration and return an error otherwise.
core/scheduler/README.md
Outdated
| ```go | ||
| task, err := sched.GetTask(taskID) | ||
| if err != nil { | ||
| log.Fatal(err) | ||
| } | ||
| fmt.Printf("Task status: %s\n", task.Status) | ||
| fmt.Printf("Next run: %s\n", task.NextRun) | ||
| ```go | ||
| task, err := scheduler.NewTask( | ||
| "agent-name", | ||
| "Check for new emails", | ||
| scheduler.ScheduleTypeCron, | ||
| "0 0 0 * * *", // Daily at midnight (6 fields: second minute hour day month day-of-week) | ||
| ) | ||
| sched.CreateTask(task) | ||
| ``` |
There was a problem hiding this comment.
The Markdown code fences are broken here: the code block that starts at line 110 is never closed before starting a new go block at line 117, which will render incorrectly. Close the first fence with and then start a new fenced block.
ea8c237 to
942a26b
Compare
The task scheduler was implemented as a standalone example. It's now integrated into the agent system, replacing the in-memory reminder implementation with persistent storage while maintaining API compatibility.
Changes
Agent Integration
New(),Run(),Stop())data/scheduled_tasks.json, configurable viaWithSchedulerStorePath()periodicRunssettingReminder Action Updates
set_reminder,list_reminders,remove_remindernow use scheduler when availableExecutor Implementation
agentSchedulerExecutorimplementsscheduler.AgentExecutorInterface Bridge
TaskSchedulerinterface toAgentSharedStateschedulerWrapperadapts scheduler to interfacesharedState.SchedulerUsage
Agent-based (typical):
Action-based (user-facing):
Removed
example/scheduler/standalone implementationOriginal prompt
Add Task Scheduler with Cron/Interval/Once Support
🎯 Overview
Add a comprehensive task scheduling system to LocalAGI, enabling agents to execute tasks on cron schedules, intervals, or one-time execution. This addresses the missing functionality compared to similar projects like NanoClaw, while maintaining LocalAGI's interface-based design philosophy.
📦 Requirements
Core Features
Architecture
Create the following new files in
core/scheduler/:1.
interfaces.go- Core Interfaces2.
task.go- Task Data Structures