Feature Request: Long-Running Task Notification System
Overview
Add support for persistent webhook notifications for long-running tasks, enabling users to receive real-time updates about task progress, state changes, and artifact generation without polling.
Motivation
Currently, the notification system:
- Only supports in-memory webhook storage (lost on server restart)
- Requires per-task webhook configuration
- Only sends status-update events
For long-running tasks (e.g., data processing, model training, complex workflows), users need:
- Persistent webhook configurations that survive server restarts
- Global webhook defaults to avoid repetitive configuration
- Comprehensive event notifications (status, artifacts, progress)
- Explicit control over which tasks use persistent notifications
Requirements
1. Explicit Long-Running Flag
- Users can mark tasks as long-running via a long_running: bool flag in MessageSendConfiguration
- When true, the webhook configuration persists to the database
- Default: false (backward compatible)
2. Comprehensive Event Delivery
For long-running tasks, send all event types:
- Status updates: State transitions (submitted → working → completed, etc.)
- Artifact updates: When artifacts are generated or updated
- Heartbeats (optional): Periodic "still working" signals for tasks in the working state (a possible payload shape is sketched below)
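If heartbeats are adopted (they remain listed under Open Questions), the payload could reuse the existing event envelope. Everything below, including the heartbeat kind, is a hypothetical sketch rather than part of the current schema:

import uuid
from datetime import datetime, timezone

task_id, context_id = uuid.uuid4(), uuid.uuid4()  # placeholders for illustration

heartbeat_event = {
    "event_id": str(uuid.uuid4()),
    "sequence": 7,
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "kind": "heartbeat",  # hypothetical kind; not defined by the existing event schema
    "task_id": str(task_id),
    "context_id": str(context_id),
    "status": {"state": "working"},
}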
3. Persistent Storage
- Webhook configurations stored in database (PostgreSQL/Memory backend)
- Configurations survive server restarts
- Automatic restoration on TaskManager initialization
4. Global Webhook Configuration
- Agent-level default webhook configuration
- Fallback when task-specific webhook not provided
- Priority: Task-specific > Global default
5. Existing Event Schema
- Use existing status-update and artifact-update event formats
- No breaking changes to protocol types
Technical Design
1. Protocol Type Changes
File: bindu/common/protocol/types.py
Add long_running flag to MessageSendConfiguration:
@pydantic.with_config(ConfigDict(alias_generator=to_camel))
class MessageSendConfiguration(TypedDict):
"""Configuration for message sending."""
accepted_output_modes: Required[list[str]]
blocking: NotRequired[bool]
history_length: NotRequired[int]
push_notification_config: NotRequired[PushNotificationConfig]
    long_running: NotRequired[bool]  # NEW: Enable persistent notifications
2. Agent Manifest Changes
File: bindu/common/models.py
Add global webhook configuration to AgentManifest:
@dataclass
class AgentManifest:
# ... existing fields ...
# Global webhook configuration (NEW)
global_webhook_url: str | None = None
    global_webhook_token: str | None = None
3. Storage Interface Extension
File: bindu/server/storage/base.py
Add webhook persistence methods to Storage abstract class:
from abc import ABC, abstractmethod
from uuid import UUID
from bindu.common.protocol.types import PushNotificationConfig
class Storage(ABC):
# ... existing methods ...
@abstractmethod
async def save_webhook_config(
self, task_id: UUID, config: PushNotificationConfig
) -> None:
"""Persist webhook configuration for a task."""
pass
@abstractmethod
async def load_webhook_config(
self, task_id: UUID
) -> PushNotificationConfig | None:
"""Load webhook configuration for a task."""
pass
@abstractmethod
async def delete_webhook_config(self, task_id: UUID) -> None:
"""Delete webhook configuration for a task."""
pass
@abstractmethod
async def load_all_webhook_configs(
self,
) -> dict[UUID, PushNotificationConfig]:
"""Load all webhook configurations (for restoration on startup)."""
        pass
4. Database Schema
New Migration: alembic/versions/YYYYMMDD_HHMM_add_webhook_configs.py
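A minimal sketch of what this migration could contain, assuming standard Alembic/SQLAlchemy conventions; the revision identifiers are placeholders and would need to point at the repo's actual migration chain:

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

revision = "add_webhook_configs"  # placeholder
down_revision = None              # placeholder: set to the previous revision in this repo

def upgrade() -> None:
    # Create the per-task webhook configuration table.
    op.create_table(
        "webhook_configs",
        sa.Column("task_id", postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column("config_id", postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column("url", sa.Text(), nullable=False),
        sa.Column("token", sa.Text(), nullable=True),
        sa.Column("authentication", postgresql.JSONB(), nullable=True),
        sa.Column("created_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()")),
        sa.Column("updated_at", sa.TIMESTAMP(timezone=True), server_default=sa.text("NOW()")),
    )

def downgrade() -> None:
    op.drop_table("webhook_configs")

The table it creates: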
CREATE TABLE webhook_configs (
task_id UUID PRIMARY KEY,
config_id UUID NOT NULL,
url TEXT NOT NULL,
token TEXT,
authentication JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_webhook_configs_task_id ON webhook_configs(task_id);
5. Storage Implementation
File: bindu/server/storage/postgres.py
Implement webhook persistence for PostgreSQL:
async def save_webhook_config(
self, task_id: UUID, config: PushNotificationConfig
) -> None:
"""Save webhook configuration to database."""
async with self.pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO webhook_configs (task_id, config_id, url, token, authentication)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (task_id) DO UPDATE
SET config_id = $2, url = $3, token = $4, authentication = $5, updated_at = NOW()
""",
task_id,
config["id"],
config["url"],
config.get("token"),
json.dumps(config.get("authentication")) if config.get("authentication") else None,
)
async def load_webhook_config(
self, task_id: UUID
) -> PushNotificationConfig | None:
"""Load webhook configuration from database."""
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT config_id, url, token, authentication FROM webhook_configs WHERE task_id = $1",
task_id,
)
if not row:
return None
config: PushNotificationConfig = {
"id": row["config_id"],
"url": row["url"],
}
if row["token"]:
config["token"] = row["token"]
if row["authentication"]:
config["authentication"] = json.loads(row["authentication"])
return config
async def delete_webhook_config(self, task_id: UUID) -> None:
"""Delete webhook configuration from database."""
async with self.pool.acquire() as conn:
await conn.execute(
"DELETE FROM webhook_configs WHERE task_id = $1",
task_id,
)
async def load_all_webhook_configs(
self,
) -> dict[UUID, PushNotificationConfig]:
"""Load all webhook configurations."""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"SELECT task_id, config_id, url, token, authentication FROM webhook_configs"
)
configs = {}
for row in rows:
config: PushNotificationConfig = {
"id": row["config_id"],
"url": row["url"],
}
if row["token"]:
config["token"] = row["token"]
if row["authentication"]:
config["authentication"] = json.loads(row["authentication"])
configs[row["task_id"]] = config
        return configs
File: bindu/server/storage/memory.py
Implement webhook persistence for in-memory storage:
class MemoryStorage(Storage):
def __init__(self):
# ... existing fields ...
self._webhook_configs: dict[UUID, PushNotificationConfig] = {}
async def save_webhook_config(
self, task_id: UUID, config: PushNotificationConfig
) -> None:
self._webhook_configs[task_id] = config
async def load_webhook_config(
self, task_id: UUID
) -> PushNotificationConfig | None:
return self._webhook_configs.get(task_id)
async def delete_webhook_config(self, task_id: UUID) -> None:
self._webhook_configs.pop(task_id, None)
async def load_all_webhook_configs(
self,
) -> dict[UUID, PushNotificationConfig]:
        return dict(self._webhook_configs)
6. Push Notification Manager Updates
File: bindu/server/notifications/push_manager.py
Enhance PushNotificationManager with persistence and global webhook support:
@dataclass
class PushNotificationManager:
manifest: Any | None = None
storage: Storage | None = None # NEW: Storage for persistence
notification_service: NotificationService = field(default_factory=NotificationService)
_push_notification_configs: dict[uuid.UUID, PushNotificationConfig] = field(
default_factory=dict, init=False
)
_notification_sequences: dict[uuid.UUID, int] = field(
default_factory=dict, init=False
)
async def initialize(self) -> None:
"""Load persisted webhook configurations from storage."""
if self.storage:
configs = await self.storage.load_all_webhook_configs()
self._push_notification_configs.update(configs)
for task_id in configs:
self._notification_sequences.setdefault(task_id, 0)
logger.info(f"Loaded {len(configs)} webhook configurations from storage")
def get_global_webhook_config(self) -> PushNotificationConfig | None:
"""Get global webhook configuration from manifest."""
if not self.manifest:
return None
url = getattr(self.manifest, "global_webhook_url", None)
if not url:
return None
config: PushNotificationConfig = {
"id": uuid.uuid4(),
"url": url,
}
token = getattr(self.manifest, "global_webhook_token", None)
if token:
config["token"] = token
return config
async def register_push_config(
self, task_id: uuid.UUID, config: PushNotificationConfig, persist: bool = False
) -> None:
"""Register a push notification configuration for a task.
Args:
task_id: Task identifier
config: Webhook configuration
persist: If True, save to database for long-running tasks
"""
config_copy = self._sanitize_push_config(config)
self.notification_service.validate_config(config_copy)
self._push_notification_configs[task_id] = config_copy
self._notification_sequences.setdefault(task_id, 0)
# Persist to storage if requested (for long-running tasks)
if persist and self.storage:
await self.storage.save_webhook_config(task_id, config_copy)
logger.debug(f"Persisted webhook config for task {task_id}")
async def remove_push_config(
self, task_id: uuid.UUID, delete_from_storage: bool = False
) -> PushNotificationConfig | None:
"""Remove push notification configuration for a task.
Args:
task_id: Task identifier
delete_from_storage: If True, also delete from database
"""
self._notification_sequences.pop(task_id, None)
config = self._push_notification_configs.pop(task_id, None)
if delete_from_storage and self.storage:
await self.storage.delete_webhook_config(task_id)
logger.debug(f"Deleted webhook config from storage for task {task_id}")
return config
def get_effective_webhook_config(
self, task_id: uuid.UUID
) -> PushNotificationConfig | None:
"""Get effective webhook config (task-specific or global fallback)."""
# Priority 1: Task-specific config
task_config = self._push_notification_configs.get(task_id)
if task_config:
return task_config
# Priority 2: Global config
return self.get_global_webhook_config()
async def notify_artifact(
self, task_id: uuid.UUID, context_id: uuid.UUID, artifact: dict[str, Any]
) -> None:
"""Send an artifact notification for a task."""
if not self.is_push_supported():
return
config = self.get_effective_webhook_config(task_id)
if not config:
return
timestamp = datetime.now(timezone.utc).isoformat()
event = {
"event_id": str(uuid.uuid4()),
"sequence": self._next_sequence(task_id),
"timestamp": timestamp,
"kind": "artifact-update",
"task_id": str(task_id),
"context_id": str(context_id),
"artifact": artifact,
}
try:
await self.notification_service.send_event(config, event)
except NotificationDeliveryError as exc:
logger.warning(
"Artifact notification delivery failed",
task_id=str(task_id),
context_id=str(context_id),
status=exc.status,
message=str(exc),
)
except Exception as exc:
logger.error(
"Unexpected error delivering artifact notification",
task_id=str(task_id),
context_id=str(context_id),
error=str(exc),
)
async def notify_lifecycle(
self, task_id: uuid.UUID, context_id: uuid.UUID, state: str, final: bool
) -> None:
"""Send a lifecycle notification for a task (updated to use effective config)."""
if not self.is_push_supported():
return
config = self.get_effective_webhook_config(task_id) # Changed from direct lookup
if not config:
return
event = self.build_lifecycle_event(task_id, context_id, state, final)
try:
await self.notification_service.send_event(config, event)
except NotificationDeliveryError as exc:
logger.warning(
"Push notification delivery failed",
task_id=str(task_id),
context_id=str(context_id),
state=state,
status=exc.status,
message=str(exc),
)
except Exception as exc:
logger.error(
"Unexpected error delivering push notification",
task_id=str(task_id),
context_id=str(context_id),
state=state,
error=str(exc),
            )
7. Task Manager Integration
File: bindu/server/task_manager.py
Update TaskManager to initialize push manager with storage:
@dataclass
class TaskManager:
scheduler: Scheduler
storage: Storage[Any]
manifest: Any | None = None
_aexit_stack: AsyncExitStack | None = field(default=None, init=False)
_workers: list[ManifestWorker] = field(default_factory=list, init=False)
_push_manager: PushNotificationManager = field(init=False)
# ... other fields ...
def __post_init__(self) -> None:
"""Initialize push notification manager with storage."""
self._push_manager = PushNotificationManager(
manifest=self.manifest,
storage=self.storage # NEW: Pass storage for persistence
)
async def __aenter__(self) -> TaskManager:
"""Initialize the task manager and start all components."""
self._aexit_stack = AsyncExitStack()
await self._aexit_stack.__aenter__()
await self._aexit_stack.enter_async_context(self.scheduler)
# Initialize push manager (load persisted webhooks)
await self._push_manager.initialize() # NEW
        # ... rest of initialization ...
8. Message Handler Updates
File: bindu/server/handlers/message_handlers.py
Update message sending to handle long_running flag:
async def send_message(self, request: MessageSendRequest) -> MessageSendResponse:
# ... existing validation ...
params = request["params"]
configuration = params["configuration"]
message = params["message"]
# Check if long-running flag is set
is_long_running = configuration.get("long_running", False)
# Register webhook if provided
push_config = configuration.get("push_notification_config")
if push_config:
await self._push_manager.register_push_config(
task_id,
push_config,
persist=is_long_running # NEW: Persist if long-running
)
    # ... rest of message handling ...
9. Worker Updates
File: bindu/server/workers/manifest_worker.py
Add artifact notification calls:
async def _process_task(self, task_id: UUID) -> None:
# ... existing task processing ...
# When artifacts are generated
for artifact in generated_artifacts:
# Notify about artifact
if self.lifecycle_notifier:
await self.lifecycle_notifier(
task_id=task_id,
context_id=context_id,
artifact=artifact # NEW: Send artifact events
)
    # ... rest of processing ...
Note: This requires updating the lifecycle_notifier signature to support artifact events, or adding a separate artifact_notifier callback.
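If the separate-callback route is chosen, a minimal sketch of what that contract could look like (the ArtifactNotifier name is illustrative, not existing code):

from typing import Any, Protocol
from uuid import UUID

class ArtifactNotifier(Protocol):
    """Awaitable callback the worker invokes whenever an artifact is produced."""

    async def __call__(
        self, task_id: UUID, context_id: UUID, artifact: dict[str, Any]
    ) -> None: ...

The TaskManager could then wire such a callback to PushNotificationManager.notify_artifact shown above, keeping the existing lifecycle_notifier signature unchanged.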
10. Application Configuration
File: bindu/server/applications.py
Pass global webhook config from application to TaskManager:
class BinduApplication(Starlette):
def __init__(
self,
# ... existing params ...
global_webhook_url: str | None = None,
global_webhook_token: str | None = None,
):
# ... existing initialization ...
# Store global webhook config in manifest
if manifest and global_webhook_url:
manifest.global_webhook_url = global_webhook_url
            manifest.global_webhook_token = global_webhook_token
Event Schemas
Status Update Event (Existing)
{
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"sequence": 1,
"timestamp": "2025-12-17T09:30:00Z",
"kind": "status-update",
"task_id": "123e4567-e89b-12d3-a456-426614174000",
"context_id": "789e0123-e89b-12d3-a456-426614174000",
"status": {
"state": "working",
"timestamp": "2025-12-17T09:30:00Z"
},
"final": false
}
Artifact Update Event (New)
{
"event_id": "550e8400-e29b-41d4-a716-446655440001",
"sequence": 2,
"timestamp": "2025-12-17T09:35:00Z",
"kind": "artifact-update",
"task_id": "123e4567-e89b-12d3-a456-426614174000",
"context_id": "789e0123-e89b-12d3-a456-426614174000",
"artifact": {
"artifact_id": "456e7890-e89b-12d3-a456-426614174000",
"name": "results.json",
"parts": [
{
"kind": "data",
"data": {"status": "processing", "progress": 0.5}
}
]
}
}
Usage Examples
Example 1: Long-Running Task with Task-Specific Webhook
from bindu import Agent, MessageSendConfiguration, PushNotificationConfig
from uuid import uuid4
# Create agent
agent = Agent(
name="Data Processor",
# ... other config ...
)
# Send long-running task with webhook
response = await agent.send_message(
message={
"message_id": uuid4(),
"context_id": uuid4(),
"task_id": uuid4(),
"kind": "message",
"role": "user",
"parts": [{"kind": "text", "text": "Process large dataset"}]
},
configuration=MessageSendConfiguration(
accepted_output_modes=["application/json"],
long_running=True, # Enable persistence
push_notification_config=PushNotificationConfig(
id=uuid4(),
url="https://myapp.com/webhooks/task-updates",
token="secret_token_123"
)
)
)
Example 2: Long-Running Task with Global Webhook
from bindu import BinduApplication, AgentManifest
# Configure agent with global webhook
app = BinduApplication(
manifest=AgentManifest(
# ... manifest config ...
global_webhook_url="https://myapp.com/webhooks/global",
global_webhook_token="global_secret_token"
),
# ... other config ...
)
# Send long-running task (uses global webhook)
response = await agent.send_message(
message={...},
configuration=MessageSendConfiguration(
accepted_output_modes=["application/json"],
long_running=True # Will use global webhook
)
)
Example 3: Webhook Endpoint Implementation
from fastapi import FastAPI, Header, HTTPException, Request
from typing import Optional
app = FastAPI()
@app.post("/webhooks/task-updates")
async def handle_task_update(
request: Request,
authorization: Optional[str] = Header(None)
):
# Verify token
if authorization != "Bearer secret_token_123":
return {"error": "Unauthorized"}, 401
event = await request.json()
# Handle different event types
if event["kind"] == "status-update":
print(f"Task {event['task_id']} is now {event['status']['state']}")
if event["final"]:
print(f"Task {event['task_id']} completed!")
elif event["kind"] == "artifact-update":
print(f"Task {event['task_id']} generated artifact: {event['artifact']['name']}")
return {"status": "received"}Migration Path
Phase 1: Database Schema
- Create migration for the webhook_configs table
- Run migration on existing deployments
Phase 2: Storage Implementation
- Implement webhook methods in PostgreSQL storage
- Implement webhook methods in Memory storage
- Add tests for storage operations
Phase 3: Push Manager Enhancement
- Add storage parameter to PushNotificationManager
- Implement initialize() method to load configs
- Add global webhook fallback logic
- Add artifact notification support
Phase 4: Protocol Updates
- Add long_running flag to MessageSendConfiguration
- Add global webhook fields to AgentManifest
- Update documentation
Phase 5: Integration
- Update TaskManager to pass storage to push manager
- Update message handlers to handle the long_running flag
- Update workers to send artifact notifications
- Add integration tests
Backward Compatibility
- Existing behavior preserved: Tasks without long_running=True work exactly as before
- Optional global webhook: No breaking changes if not configured
- Storage methods: New methods don't affect existing storage operations
- Event schema: Uses existing event formats, no protocol changes
Testing Strategy
Unit Tests
- Storage webhook CRUD operations (a test sketch follows this list)
- Push manager initialization and config loading
- Global webhook fallback logic
- Event building and delivery
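A minimal sketch of the storage CRUD test, assuming pytest-asyncio is installed and that MemoryStorage can be constructed without arguments:

import pytest
from uuid import uuid4

from bindu.server.storage.memory import MemoryStorage

@pytest.mark.asyncio
async def test_webhook_config_roundtrip():
    storage = MemoryStorage()
    task_id = uuid4()
    config = {"id": uuid4(), "url": "https://example.com/webhook", "token": "secret"}

    # Save, then read back individually and in bulk.
    await storage.save_webhook_config(task_id, config)
    assert await storage.load_webhook_config(task_id) == config
    assert await storage.load_all_webhook_configs() == {task_id: config}

    # Delete and confirm it is gone.
    await storage.delete_webhook_config(task_id)
    assert await storage.load_webhook_config(task_id) is None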
Integration Tests
- End-to-end long-running task with webhook
- Server restart with persisted webhooks
- Global webhook fallback behavior
- Multiple concurrent long-running tasks
Performance Tests
- Webhook delivery latency
- Database query performance for webhook loading
- Memory usage with many persisted webhooks
Security Considerations
- Webhook URL Validation (a validation sketch follows this list)
  - Validate URL scheme (http/https only)
  - Prevent SSRF attacks (block internal IPs)
  - Rate limiting on webhook deliveries
- Token Storage
  - Encrypt tokens in database
  - Secure token transmission
  - Token rotation support
- Webhook Verification
  - Optional HMAC signature for webhook payloads
  - Timestamp validation to prevent replay attacks
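A minimal sketch of the URL validation described above (the helper name is illustrative; a production check would also resolve DNS and re-validate each resolved address before connecting):

import ipaddress
from urllib.parse import urlparse

def validate_webhook_url(url: str) -> None:
    """Reject non-HTTP(S) schemes and obvious internal or loopback targets."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Unsupported webhook scheme: {parsed.scheme!r}")
    if not parsed.hostname:
        raise ValueError("Webhook URL has no host")
    try:
        addr = ipaddress.ip_address(parsed.hostname)
    except ValueError:
        # Hostname is not an IP literal; DNS-level checks would happen elsewhere.
        return
    if addr.is_private or addr.is_loopback or addr.is_link_local:
        raise ValueError("Webhook URL points at an internal address")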
Open Questions
- Webhook Cleanup Policy
  - Delete webhook configs when task completes?
  - Keep for audit trail with TTL?
  - User-configurable retention?
- Heartbeat Configuration
  - Should we add periodic heartbeat events for long-running tasks?
  - If yes, what interval? (30s, 1min, configurable?)
- Event Filtering
  - Should users specify which event types they want?
  - Or always send all events for long-running tasks?
- Retry Strategy
  - Current: 2 retries with exponential backoff
  - Should long-running tasks have a different retry policy?
  - Dead letter queue for failed deliveries?
- Artifact Event Granularity
  - Send every artifact chunk during streaming?
  - Only send final/complete artifacts?
  - User-configurable?
Success Metrics
- Webhook delivery success rate > 99%
- Webhook delivery latency < 1s (p95)
- Zero data loss on server restart
- Backward compatibility maintained (0 breaking changes)
Documentation Requirements
- API documentation for new fields
- Migration guide for existing deployments
- Webhook endpoint implementation examples
- Security best practices guide
- Troubleshooting guide for webhook delivery issues
Timeline Estimate
- Phase 1-2 (Storage): 2-3 days
- Phase 3 (Push Manager): 2-3 days
- Phase 4 (Protocol): 1 day
- Phase 5 (Integration): 2-3 days
- Testing & Documentation: 2-3 days
Total: ~10-15 days for complete implementation
References
- A2A Protocol Specification: https://a2a-protocol.org/latest/specification/
- Current Push Notification Implementation: bindu/server/notifications/push_manager.py
- Storage Interface: bindu/server/storage/base.py
- Protocol Types: bindu/common/protocol/types.py