Skip to content

Integrate task scheduler into agent reminder system#408

Merged
mudler merged 15 commits intomainfrom
copilot/add-task-scheduler-support
Feb 8, 2026
Merged

Integrate task scheduler into agent reminder system#408
mudler merged 15 commits intomainfrom
copilot/add-task-scheduler-support

Conversation

Copy link
Contributor

Copilot AI commented Feb 8, 2026

The task scheduler was implemented as a standalone example. It's now integrated into the agent system, replacing the in-memory reminder implementation with persistent storage while maintaining API compatibility.

Changes

Agent Integration

  • Scheduler lifecycle managed by agent (New(), Run(), Stop())
  • Default storage: data/scheduled_tasks.json, configurable via WithSchedulerStorePath()
  • Poll interval reuses agent's periodicRuns setting

Reminder Action Updates

  • set_reminder, list_reminders, remove_reminder now use scheduler when available
  • Backward compatible: falls back to in-memory storage if scheduler unavailable
  • Added support for interval and once schedules (previously cron-only)

Executor Implementation

  • agentSchedulerExecutor implements scheduler.AgentExecutor
  • Scheduled tasks execute through agent's job queue
  • Observable integration for UI updates

Interface Bridge

  • Added TaskScheduler interface to AgentSharedState
  • schedulerWrapper adapts scheduler to interface
  • Actions access via sharedState.Scheduler

Usage

Agent-based (typical):

// Scheduler auto-initialized and managed
agent, _ := agent.New(
    agent.WithSchedulerStorePath("custom/tasks.json"),
)
agent.Run()  // Scheduler starts automatically

Action-based (user-facing):

User: "Remind me to check emails every 5 minutes"
Agent: Uses set_reminder action → creates persistent scheduler task

Removed

  • example/scheduler/ standalone implementation
Original prompt

Add Task Scheduler with Cron/Interval/Once Support

🎯 Overview

Add a comprehensive task scheduling system to LocalAGI, enabling agents to execute tasks on cron schedules, intervals, or one-time execution. This addresses the missing functionality compared to similar projects like NanoClaw, while maintaining LocalAGI's interface-based design philosophy.

📦 Requirements

Core Features

  • Task Scheduling: Support for cron expressions, interval-based (milliseconds), and one-time (ISO timestamp) tasks
  • JSON Storage: Simple file-based persistence with thread-safe operations
  • Task Management: Create, update, pause, resume, delete tasks
  • Execution Logging: Track task runs with duration, status, and results
  • Interface-Based Design: Easy to extend with different storage backends (SQLite, PostgreSQL, etc.)

Architecture

Create the following new files in core/scheduler/:

1. interfaces.go - Core Interfaces

package scheduler

import (
	"context"
	"time"
)

// TaskStore defines the interface for task persistence
type TaskStore interface {
	// Create adds a new task
	Create(task *Task) error
	
	// Get retrieves a task by ID
	Get(id string) (*Task, error)
	
	// GetAll retrieves all tasks
	GetAll() ([]*Task, error)
	
	// GetDue retrieves tasks that are due for execution
	GetDue() ([]*Task, error)
	
	// GetByAgent retrieves all tasks for a specific agent
	GetByAgent(agentName string) ([]*Task, error)
	
	// Update updates an existing task
	Update(task *Task) error
	
	// Delete removes a task
	Delete(id string) error
	
	// LogRun records a task execution
	LogRun(run *TaskRun) error
	
	// GetRuns retrieves execution history for a task
	GetRuns(taskID string, limit int) ([]*TaskRun, error)
	
	// Close releases resources
	Close() error
}

// AgentExecutor defines the interface for executing agent tasks
type AgentExecutor interface {
	Execute(ctx context.Context, agentName string, prompt string) (*JobResult, error)
}

// JobResult represents the result of an agent execution
type JobResult struct {
	Response string
	Error    error
}

2. task.go - Task Data Structures

package scheduler

import (
	"fmt"
	"strconv"
	"time"

	"github.com/google/uuid"
	"github.com/robfig/cron/v3"
)

type TaskStatus string

const (
	TaskStatusActive  TaskStatus = "active"
	TaskStatusPaused  TaskStatus = "paused"
	TaskStatusDeleted TaskStatus = "deleted"
)

type ScheduleType string

const (
	ScheduleTypeCron     ScheduleType = "cron"
	ScheduleTypeInterval ScheduleType = "interval"
	ScheduleTypeOnce     ScheduleType = "once"
)

// Task represents a scheduled task
type Task struct {
	ID            string                 `json:"id"`
	AgentName     string                 `json:"agent_name"`
	Prompt        string                 `json:"prompt"`
	ScheduleType  ScheduleType           `json:"schedule_type"`
	ScheduleValue string                 `json:"schedule_value"`
	Status        TaskStatus             `json:"status"`
	NextRun       time.Time              `json:"next_run"`
	LastRun       *time.Time             `json:"last_run,omitempty"`
	CreatedAt     time.Time              `json:"created_at"`
	UpdatedAt     time.Time              `json:"updated_at"`
	ContextMode   string                 `json:"context_mode"`
	Metadata      map[string]interface{} `json:"metadata,omitempty"`
}

// TaskRun represents a single execution of a task
type TaskRun struct {
	ID         string    `json:"id"`
	TaskID     string    `json:"task_id"`
	RunAt      time.Time `json:"run_at"`
	DurationMs int64     `json:"duration_ms"`
	Status     string    `json:"status"` // "success", "error", "timeout"
	Result     string    `json:"result,omitempty"`
	Error      string    `json:"error,omitempty"`
}

// NewTask creates a new task with the given parameters
func NewTask(agentName, prompt string, scheduleType ScheduleType, scheduleValue string) (*Task, error) {
	task := &Task{
		ID:            uuid.New().String(),
		AgentName:     agentName,
		Prompt:        prompt,
		ScheduleType:  scheduleType,
		ScheduleValue: scheduleValue,
		Status:        TaskStatusActive,
		CreatedAt:     time.Now(),
		UpdatedAt:     time.Now(),
		ContextMode:   "agent",
		Metadata:      make(map[string]interface{}),
	}

	if err := task.CalculateNextRun(); err != nil {
		return nil, err
	}

	return task, nil
}

// CalculateNextRun calculates the next run time based on schedule type
func (t *Task) CalculateNextRun() error {
	now := time.Now()

	switch t.ScheduleType {
	case ScheduleTypeCron:
		parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
		schedule, err := parser.Parse(t.ScheduleValue)
		if err != nil {
			return fmt.Errorf("invalid cron expression: %w", err)
		}
		t.NextRun = schedule.Next(now)

	case ScheduleTypeInterval:
		intervalMs, err := strconv.ParseInt(t.ScheduleValue, 10, 64)
		if err != nil {
			return fmt.Errorf("invalid interval: %w", err)
		}
		if t.LastRun != nil {
			t.Nex...

</details>



<!-- START COPILOT CODING AGENT SUFFIX -->

*This pull request was created from Copilot chat.*
>

<!-- START COPILOT CODING AGENT TIPS -->
---

💬 We'd love your input! Share your thoughts on Copilot coding agent in our [2 minute survey](https://gh.io/copilot-coding-agent-survey).

Copilot AI and others added 4 commits February 8, 2026 08:19
…tests

Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Copilot AI changed the title [WIP] Add task scheduler with cron, interval, and one-time support Add task scheduler with cron/interval/once support Feb 8, 2026
Copilot AI requested a review from mudler February 8, 2026 08:26
Copilot AI and others added 4 commits February 8, 2026 09:18
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Co-authored-by: mudler <2420543+mudler@users.noreply.github.com>
Copilot AI changed the title Add task scheduler with cron/interval/once support Integrate task scheduler into agent reminder system Feb 8, 2026
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
@mudler mudler force-pushed the copilot/add-task-scheduler-support branch from 2f1636a to 14447e0 Compare February 8, 2026 12:46
@mudler mudler marked this pull request as ready for review February 8, 2026 12:47
Copilot AI review requested due to automatic review settings February 8, 2026 12:47
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Integrates a new persistent task scheduler into the agent reminder system, replacing the previous in-memory reminder execution with a scheduler-backed implementation and adding JSON-based task storage.

Changes:

  • Added core/scheduler package (task model, JSON store, scheduler loop, tests, docs).
  • Wired scheduler lifecycle into agent creation/run/stop and exposed it via AgentSharedState.
  • Updated reminder actions (set_reminder, list_reminders, remove_reminder) to operate on scheduled tasks.

Reviewed changes

Copilot reviewed 15 out of 17 changed files in this pull request and generated 13 comments.

Show a summary per file
File Description
go.mod / go.sum Updates Go module dependencies used by scheduler integration.
core/types/state.go Adds TaskScheduler to shared state and stores agent name for task attribution.
core/state/pool.go Passes per-agent scheduler store path via agent options when starting agents from a pool.
core/scheduler/interfaces.go Defines scheduler storage and execution interfaces.
core/scheduler/task.go Implements task and schedule calculation logic (cron/interval/once).
core/scheduler/json_store.go Adds JSON file persistence for tasks and run history.
core/scheduler/scheduler.go Adds polling scheduler that executes due tasks and logs runs.
core/scheduler/*_test.go Adds Ginkgo test coverage for scheduler/store behavior.
core/scheduler/README.md Documents scheduler usage and integration.
core/agent/options.go Adds WithSchedulerStorePath option.
core/agent/agent.go Initializes/starts/stops scheduler and injects it into shared state.
core/agent/scheduler_executor.go Executes scheduled tasks through agent job queue and observer.
core/agent/scheduler_wrapper.go Adapts scheduler to TaskScheduler interface.
core/action/reminder.go Reworks reminder actions to create/list/delete scheduler tasks.
.gitignore Ignores scheduler example build artifacts.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 124 to 133
// Initialize task scheduler for reminders
schedulerPath := options.schedulerStorePath
if schedulerPath == "" {
schedulerPath = "data/scheduled_tasks.json"
}

store, err := scheduler.NewJSONStore(schedulerPath)
if err != nil {
return nil, fmt.Errorf("failed to create scheduler store: %v", err)
}
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scheduler store is initialized with a default path data/scheduled_tasks.json, but nothing ensures that data/ exists before calling scheduler.NewJSONStore. Unless the directory is always present in deployments, agent creation can fail on first run. Consider creating the directory before initializing the store (or have NewJSONStore handle it).

Copilot uses AI. Check for mistakes.
Comment on lines 106 to 123
for i, taskInterface := range tasksInterface {
task, ok := taskInterface.(*scheduler.Task)
if !ok {
continue
}

status := "one-time"
if reminder.IsRecurring {
if task.ScheduleType == scheduler.ScheduleTypeCron || task.ScheduleType == scheduler.ScheduleTypeInterval {
status = "recurring"
}
result.WriteString(fmt.Sprintf("%d. %s (Next run: %s, Status: %s)\n",

result.WriteString(fmt.Sprintf("%d. %s (Next run: %s, Status: %s, ID: %s)\n",
i+1,
reminder.Message,
reminder.NextRun.Format(time.RFC3339),
status))
task.Prompt,
task.NextRun.Format(time.RFC3339),
status,
task.ID))
}
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

list_reminders iterates over all tasks returned by GetAllTasks() without filtering out tasks marked TaskStatusDeleted (one-time tasks are set to deleted after running). This will cause already-fired one-time reminders to keep appearing in the list (and affect indexing for remove_reminder). Filter to active/paused tasks (or have the store/scheduler exclude deleted tasks for “list reminders”).

Copilot uses AI. Check for mistakes.
Comment on lines 12 to 17
func (w *schedulerWrapper) CreateTask(task interface{}) error {
t, ok := task.(*scheduler.Task)
if !ok {
return nil
}
return w.Scheduler.CreateTask(t)
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreateTask silently returns nil when the passed value isn’t a *scheduler.Task. This can hide programming errors and cause reminders to be dropped without any feedback. Return a descriptive error instead (or avoid interface{} here by making the shared interface use concrete scheduler types).

Copilot uses AI. Check for mistakes.
Comment on lines 148 to 158
// For one-time tasks, mark as deleted
if task.ScheduleType == ScheduleTypeOnce {
task.Status = TaskStatusDeleted
} else {
// Calculate next run
if err := task.CalculateNextRun(); err != nil {
xlog.Error("Failed to calculate next run", "task_id", task.ID, "error", err)
task.Status = TaskStatusPaused
}
}

Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For one-time tasks you set task.Status = TaskStatusDeleted but keep the task in the store. Downstream (e.g., reminder listing) will still show these unless it explicitly filters them, and indexes can shift unexpectedly. Either delete one-time tasks from the store after successful execution, or make GetAll/reminder listing exclude deleted tasks (while preserving run history separately if needed).

Suggested change
// For one-time tasks, mark as deleted
if task.ScheduleType == ScheduleTypeOnce {
task.Status = TaskStatusDeleted
} else {
// Calculate next run
if err := task.CalculateNextRun(); err != nil {
xlog.Error("Failed to calculate next run", "task_id", task.ID, "error", err)
task.Status = TaskStatusPaused
}
}
// For one-time tasks, remove them from the store after execution
if task.ScheduleType == ScheduleTypeOnce {
if delErr := s.store.Delete(task.ID); delErr != nil {
// If deletion fails, fall back to marking the task as deleted
xlog.Error("Failed to delete one-time task, falling back to soft delete", "task_id", task.ID, "error", delErr)
task.Status = TaskStatusDeleted
if err := s.store.Update(task); err != nil {
xlog.Error("Failed to update task after soft delete fallback", "task_id", task.ID, "error", err)
}
}
return
}
// For recurring tasks, calculate the next run and update
// Calculate next run
if err := task.CalculateNextRun(); err != nil {
xlog.Error("Failed to calculate next run", "task_id", task.ID, "error", err)
task.Status = TaskStatusPaused
}

Copilot uses AI. Check for mistakes.
Comment on lines +23 to +40
// NewJSONStore creates a new JSON-based task store
func NewJSONStore(filePath string) (*JSONStore, error) {
store := &JSONStore{
filePath: filePath,
data: &storeData{
Tasks: make([]*Task, 0),
TaskRuns: make([]*TaskRun, 0),
},
}

if err := store.load(); err != nil {
if !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to load store: %w", err)
}
// File doesn't exist, create it
if err := store.save(); err != nil {
return nil, fmt.Errorf("failed to create store file: %w", err)
}
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewJSONStore tries to create the store file via save(), but save() will fail if the parent directory doesn’t exist (e.g., default data/scheduled_tasks.json). This can prevent agents from starting on a fresh install. Ensure NewJSONStore creates the parent directory (e.g., os.MkdirAll(filepath.Dir(filePath), ...)) before the first save.

Copilot uses AI. Check for mistakes.
Comment on lines 17 to 59
func (e *agentSchedulerExecutor) Execute(ctx context.Context, agentName string, prompt string) (*scheduler.JobResult, error) {
// Create a job for the reminder
reminderJob := types.NewJob(
types.WithText(fmt.Sprintf("I have a reminder for you: %s", prompt)),
types.WithReasoningCallback(e.agent.options.reasoningCallback),
types.WithResultCallback(e.agent.options.resultCallback),
)

// Add metadata to indicate this is a reminder
reminderJob.Metadata = map[string]interface{}{
"message": prompt,
"is_reminder": true,
}

// Attach observable so UI can show reminder processing state
if e.agent.observer != nil {
obs := e.agent.observer.NewObservable()
obs.Name = "reminder"
obs.Icon = "bell"
e.agent.observer.Update(*obs)
reminderJob.Obs = obs
}

// Send the job to be processed
e.agent.jobQueue <- reminderJob

// Wait for the job to complete or context to be cancelled
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
result := reminderJob.Result.WaitResult()
if result.Error != nil {
return &scheduler.JobResult{
Response: "",
Error: result.Error,
}, result.Error
}
return &scheduler.JobResult{
Response: result.Response,
Error: nil,
}, nil
}
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context cancellation logic here is ineffective: the select uses a default branch and then blocks inside WaitResult(), so ctx.Done() won’t be observed once waiting begins. Also, the job isn’t created with types.WithContext(ctx), so canceling the scheduler task won’t propagate into the agent job. Consider attaching ctx to the job and waiting in a goroutine (or via a completion channel) so you can select between job completion and ctx.Done(), canceling the job if the context is canceled.

Copilot uses AI. Check for mistakes.
Comment on lines 294 to 305
func (a *Agent) Stop() {
a.Lock()
defer a.Unlock()
xlog.Debug("Stopping agent", "agent", a.Character.Name)

// Stop the scheduler
a.taskScheduler.Stop()
xlog.Info("Task scheduler stopped")

a.closeMCPServers()
a.context.Cancel()
}
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agent.Stop() holds the agent mutex while calling a.taskScheduler.Stop(). If a scheduled task is in-flight, Stop() waits for it to finish; but finishing requires the agent workers to process the queued job, and consumeJob takes the same agent mutex. This can deadlock shutdown. Release the agent lock before stopping the scheduler (or stop the scheduler asynchronously), and consider making scheduler stop non-blocking w.r.t. agent locks.

Copilot uses AI. Check for mistakes.
Comment on lines 38 to 43
// Start begins the scheduler's polling loop
func (s *Scheduler) Start() {
s.wg.Add(1)
go s.run()
xlog.Info("Task scheduler started", "poll_interval", s.pollInterval)
}
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Start() is not idempotent: calling it multiple times will spawn multiple polling goroutines and increment the waitgroup without any guard. Since agent.Run() calls Start() unconditionally (and AgentPool.Start() can call Run() on an already-started agent), this can lead to duplicated executions and shutdown issues. Add a running flag/sync.Once, or return an error/no-op on subsequent Start() calls.

Copilot uses AI. Check for mistakes.
Comment on lines 90 to 99
case ScheduleTypeInterval:
intervalMs, err := strconv.ParseInt(t.ScheduleValue, 10, 64)
if err != nil {
return fmt.Errorf("invalid interval: %w", err)
}
if t.LastRun != nil {
t.NextRun = t.LastRun.Add(time.Duration(intervalMs) * time.Millisecond)
} else {
t.NextRun = now.Add(time.Duration(intervalMs) * time.Millisecond)
}
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interval schedules accept any parsed integer, including 0 or negative values. That can produce immediate/retroactive NextRun values and cause tight execution loops. Validate that the interval is a positive duration and return an error otherwise.

Copilot uses AI. Check for mistakes.
Comment on lines 110 to 125
```go
task, err := sched.GetTask(taskID)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Task status: %s\n", task.Status)
fmt.Printf("Next run: %s\n", task.NextRun)
```go
task, err := scheduler.NewTask(
"agent-name",
"Check for new emails",
scheduler.ScheduleTypeCron,
"0 0 0 * * *", // Daily at midnight (6 fields: second minute hour day month day-of-week)
)
sched.CreateTask(task)
```
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Markdown code fences are broken here: the code block that starts at line 110 is never closed before starting a new go block at line 117, which will render incorrectly. Close the first fence with and then start a new fenced block.

Copilot uses AI. Check for mistakes.
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
@mudler mudler force-pushed the copilot/add-task-scheduler-support branch from ea8c237 to 942a26b Compare February 8, 2026 21:10
@mudler mudler merged commit 57023d6 into main Feb 8, 2026
0 of 2 checks passed
@mudler mudler deleted the copilot/add-task-scheduler-support branch February 8, 2026 21:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants