[Feature]: Notification #69

@raahulrahl

Feature Request: Long-Running Task Notification System

Overview

Add support for persistent webhook notifications for long-running tasks, enabling users to receive real-time updates about task progress, state changes, and artifact generation without polling.

Motivation

Currently, the notification system:

  • Only supports in-memory webhook storage (lost on server restart)
  • Requires per-task webhook configuration
  • Only sends status-update events

For long-running tasks (e.g., data processing, model training, complex workflows), users need:

  • Persistent webhook configurations that survive server restarts
  • Global webhook defaults to avoid repetitive configuration
  • Comprehensive event notifications (status, artifacts, progress)
  • Explicit control over which tasks use persistent notifications

Requirements

1. Explicit Long-Running Flag

  • Users can mark tasks as long-running via a long_running: bool flag in MessageSendConfiguration
  • When true, webhook configuration persists to database
  • Default: false (backward compatible)

2. Comprehensive Event Delivery

For long-running tasks, send all event types:

  • Status updates: State transitions (submitted → working → completed, etc.)
  • Artifact updates: When artifacts are generated or updated
  • Heartbeats (optional): Periodic "still working" signals for tasks in working state
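One way the optional heartbeat could work without a new event kind is to re-send a non-final status-update with state working on a fixed interval. The following is a minimal asyncio sketch under that assumption. The names heartbeat_loop and SendEvent, the stop event, and the 30-second default are hypothetical and not part of Bindu. The sequence field is left out here on the assumption that the push manager would assign it on delivery.

```python
import asyncio
import uuid
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable

# Hypothetical callback type: delivers one event dict to the webhook.
SendEvent = Callable[[dict[str, Any]], Awaitable[None]]


async def heartbeat_loop(
    task_id: uuid.UUID,
    context_id: uuid.UUID,
    send: SendEvent,
    stop: asyncio.Event,
    interval: float = 30.0,
) -> int:
    """Re-send a non-final 'working' status-update until `stop` is set.

    Returns the number of heartbeats sent.
    """
    sent = 0
    while not stop.is_set():
        try:
            # Sleep for one interval, but wake early if `stop` is set.
            await asyncio.wait_for(stop.wait(), timeout=interval)
            break
        except asyncio.TimeoutError:
            pass  # interval elapsed and the task is still working
        now = datetime.now(timezone.utc).isoformat()
        await send(
            {
                "event_id": str(uuid.uuid4()),
                "timestamp": now,
                "kind": "status-update",
                "task_id": str(task_id),
                "context_id": str(context_id),
                "status": {"state": "working", "timestamp": now},
                "final": False,
            }
        )
        sent += 1
    return sent
```

The caller would set stop when the task leaves the working state, so no heartbeat is sent after a final status-update.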

3. Persistent Storage

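  • Webhook configurations stored through the storage backend (PostgreSQL for durability; the Memory backend implements the same interface but keeps data only for the lifetime of the process)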
  • Configurations survive server restarts
  • Automatic restoration on TaskManager initialization

4. Global Webhook Configuration

  • Agent-level default webhook configuration
  • Fallback when task-specific webhook not provided
  • Priority: Task-specific > Global default

5. Existing Event Schema

  • Use existing status-update and artifact-update event formats
  • No breaking changes to protocol types

Technical Design

1. Protocol Type Changes

File: bindu/common/protocol/types.py

Add long_running flag to MessageSendConfiguration:

@pydantic.with_config(ConfigDict(alias_generator=to_camel))
class MessageSendConfiguration(TypedDict):
    """Configuration for message sending."""
    
    accepted_output_modes: Required[list[str]]
    blocking: NotRequired[bool]
    history_length: NotRequired[int]
    push_notification_config: NotRequired[PushNotificationConfig]
    long_running: NotRequired[bool]  # NEW: Enable persistent notifications

2. Agent Manifest Changes

File: bindu/common/models.py

Add global webhook configuration to AgentManifest:

@dataclass
class AgentManifest:
    # ... existing fields ...
    
    # Global webhook configuration (NEW)
    global_webhook_url: str | None = None
    global_webhook_token: str | None = None

3. Storage Interface Extension

File: bindu/server/storage/base.py

Add webhook persistence methods to Storage abstract class:

from abc import ABC, abstractmethod
from uuid import UUID
from bindu.common.protocol.types import PushNotificationConfig

class Storage(ABC):
    # ... existing methods ...
    
    @abstractmethod
    async def save_webhook_config(
        self, task_id: UUID, config: PushNotificationConfig
    ) -> None:
        """Persist webhook configuration for a task."""
        pass
    
    @abstractmethod
    async def load_webhook_config(
        self, task_id: UUID
    ) -> PushNotificationConfig | None:
        """Load webhook configuration for a task."""
        pass
    
    @abstractmethod
    async def delete_webhook_config(self, task_id: UUID) -> None:
        """Delete webhook configuration for a task."""
        pass
    
    @abstractmethod
    async def load_all_webhook_configs(
        self,
    ) -> dict[UUID, PushNotificationConfig]:
        """Load all webhook configurations (for restoration on startup)."""
        pass

4. Database Schema

New Migration: alembic/versions/YYYYMMDD_HHMM_add_webhook_configs.py

CREATE TABLE webhook_configs (
    task_id UUID PRIMARY KEY,
    config_id UUID NOT NULL,
    url TEXT NOT NULL,
    token TEXT,
    authentication JSONB,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- No separate index on task_id is needed: the PRIMARY KEY constraint already creates one.

5. Storage Implementation

File: bindu/server/storage/postgres.py

Implement webhook persistence for PostgreSQL:

import json  # module-level import; used to (de)serialize the JSONB authentication column

async def save_webhook_config(
    self, task_id: UUID, config: PushNotificationConfig
) -> None:
    """Save webhook configuration to database."""
    async with self.pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO webhook_configs (task_id, config_id, url, token, authentication)
            VALUES ($1, $2, $3, $4, $5)
            ON CONFLICT (task_id) DO UPDATE
            SET config_id = $2, url = $3, token = $4, authentication = $5, updated_at = NOW()
            """,
            task_id,
            config["id"],
            config["url"],
            config.get("token"),
            json.dumps(config.get("authentication")) if config.get("authentication") else None,
        )

async def load_webhook_config(
    self, task_id: UUID
) -> PushNotificationConfig | None:
    """Load webhook configuration from database."""
    async with self.pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT config_id, url, token, authentication FROM webhook_configs WHERE task_id = $1",
            task_id,
        )
        if not row:
            return None
        
        config: PushNotificationConfig = {
            "id": row["config_id"],
            "url": row["url"],
        }
        if row["token"]:
            config["token"] = row["token"]
        if row["authentication"]:
            config["authentication"] = json.loads(row["authentication"])
        return config

async def delete_webhook_config(self, task_id: UUID) -> None:
    """Delete webhook configuration from database."""
    async with self.pool.acquire() as conn:
        await conn.execute(
            "DELETE FROM webhook_configs WHERE task_id = $1",
            task_id,
        )

async def load_all_webhook_configs(
    self,
) -> dict[UUID, PushNotificationConfig]:
    """Load all webhook configurations."""
    async with self.pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT task_id, config_id, url, token, authentication FROM webhook_configs"
        )
        
        configs = {}
        for row in rows:
            config: PushNotificationConfig = {
                "id": row["config_id"],
                "url": row["url"],
            }
            if row["token"]:
                config["token"] = row["token"]
            if row["authentication"]:
                config["authentication"] = json.loads(row["authentication"])
            configs[row["task_id"]] = config
        
        return configs
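load_webhook_config and load_all_webhook_configs repeat the same row-to-config mapping. A shared helper could remove that duplication. This is a sketch only: row_to_push_config is a hypothetical name, and plain dicts stand in for database rows, on the assumption that rows support row["column"] access as in the code above.

```python
import json
from typing import Any, Mapping


def row_to_push_config(row: Mapping[str, Any]) -> dict[str, Any]:
    """Build a PushNotificationConfig-shaped dict from a webhook_configs row.

    Optional columns are copied only when they hold a value, mirroring the
    load methods above. `authentication` is decoded from its JSON text form.
    """
    config: dict[str, Any] = {"id": row["config_id"], "url": row["url"]}
    if row["token"]:
        config["token"] = row["token"]
    if row["authentication"]:
        config["authentication"] = json.loads(row["authentication"])
    return config


# Usage with a dict standing in for a database row:
example_row = {
    "config_id": "cfg-1",
    "url": "https://example.com/hook",
    "token": None,
    "authentication": '{"schemes": ["Bearer"]}',
}
example_config = row_to_push_config(example_row)
```

Both load methods could then call the helper for each row instead of rebuilding the dict inline.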

File: bindu/server/storage/memory.py

Implement webhook persistence for in-memory storage:

class MemoryStorage(Storage):
    def __init__(self):
        # ... existing fields ...
        self._webhook_configs: dict[UUID, PushNotificationConfig] = {}
    
    async def save_webhook_config(
        self, task_id: UUID, config: PushNotificationConfig
    ) -> None:
        self._webhook_configs[task_id] = config
    
    async def load_webhook_config(
        self, task_id: UUID
    ) -> PushNotificationConfig | None:
        return self._webhook_configs.get(task_id)
    
    async def delete_webhook_config(self, task_id: UUID) -> None:
        self._webhook_configs.pop(task_id, None)
    
    async def load_all_webhook_configs(
        self,
    ) -> dict[UUID, PushNotificationConfig]:
        return dict(self._webhook_configs)

6. Push Notification Manager Updates

File: bindu/server/notifications/push_manager.py

Enhance PushNotificationManager with persistence and global webhook support:

@dataclass
class PushNotificationManager:
    manifest: Any | None = None
    storage: Storage | None = None  # NEW: Storage for persistence
    notification_service: NotificationService = field(default_factory=NotificationService)
    _push_notification_configs: dict[uuid.UUID, PushNotificationConfig] = field(
        default_factory=dict, init=False
    )
    _notification_sequences: dict[uuid.UUID, int] = field(
        default_factory=dict, init=False
    )
    
    async def initialize(self) -> None:
        """Load persisted webhook configurations from storage."""
        if self.storage:
            configs = await self.storage.load_all_webhook_configs()
            self._push_notification_configs.update(configs)
            for task_id in configs:
                self._notification_sequences.setdefault(task_id, 0)
            logger.info(f"Loaded {len(configs)} webhook configurations from storage")
    
    def get_global_webhook_config(self) -> PushNotificationConfig | None:
        """Get global webhook configuration from manifest."""
        if not self.manifest:
            return None
        
        url = getattr(self.manifest, "global_webhook_url", None)
        if not url:
            return None
        
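        config: PushNotificationConfig = {
            # Derive a stable ID from the URL so the global config keeps the same ID across calls
            "id": uuid.uuid5(uuid.NAMESPACE_URL, url),
            "url": url,
        }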
        
        token = getattr(self.manifest, "global_webhook_token", None)
        if token:
            config["token"] = token
        
        return config
    
    async def register_push_config(
        self, task_id: uuid.UUID, config: PushNotificationConfig, persist: bool = False
    ) -> None:
        """Register a push notification configuration for a task.
        
        Args:
            task_id: Task identifier
            config: Webhook configuration
            persist: If True, save to database for long-running tasks
        """
        config_copy = self._sanitize_push_config(config)
        self.notification_service.validate_config(config_copy)
        self._push_notification_configs[task_id] = config_copy
        self._notification_sequences.setdefault(task_id, 0)
        
        # Persist to storage if requested (for long-running tasks)
        if persist and self.storage:
            await self.storage.save_webhook_config(task_id, config_copy)
            logger.debug(f"Persisted webhook config for task {task_id}")
    
    async def remove_push_config(
        self, task_id: uuid.UUID, delete_from_storage: bool = False
    ) -> PushNotificationConfig | None:
        """Remove push notification configuration for a task.
        
        Args:
            task_id: Task identifier
            delete_from_storage: If True, also delete from database
        """
        self._notification_sequences.pop(task_id, None)
        config = self._push_notification_configs.pop(task_id, None)
        
        if delete_from_storage and self.storage:
            await self.storage.delete_webhook_config(task_id)
            logger.debug(f"Deleted webhook config from storage for task {task_id}")
        
        return config
    
    def get_effective_webhook_config(
        self, task_id: uuid.UUID
    ) -> PushNotificationConfig | None:
        """Get effective webhook config (task-specific or global fallback)."""
        # Priority 1: Task-specific config
        task_config = self._push_notification_configs.get(task_id)
        if task_config:
            return task_config
        
        # Priority 2: Global config
        return self.get_global_webhook_config()
    
    async def notify_artifact(
        self, task_id: uuid.UUID, context_id: uuid.UUID, artifact: dict[str, Any]
    ) -> None:
        """Send an artifact notification for a task."""
        if not self.is_push_supported():
            return
        
        config = self.get_effective_webhook_config(task_id)
        if not config:
            return
        
        timestamp = datetime.now(timezone.utc).isoformat()
        event = {
            "event_id": str(uuid.uuid4()),
            "sequence": self._next_sequence(task_id),
            "timestamp": timestamp,
            "kind": "artifact-update",
            "task_id": str(task_id),
            "context_id": str(context_id),
            "artifact": artifact,
        }
        
        try:
            await self.notification_service.send_event(config, event)
        except NotificationDeliveryError as exc:
            logger.warning(
                "Artifact notification delivery failed",
                task_id=str(task_id),
                context_id=str(context_id),
                status=exc.status,
                message=str(exc),
            )
        except Exception as exc:
            logger.error(
                "Unexpected error delivering artifact notification",
                task_id=str(task_id),
                context_id=str(context_id),
                error=str(exc),
            )
    
    async def notify_lifecycle(
        self, task_id: uuid.UUID, context_id: uuid.UUID, state: str, final: bool
    ) -> None:
        """Send a lifecycle notification for a task (updated to use effective config)."""
        if not self.is_push_supported():
            return
        
        config = self.get_effective_webhook_config(task_id)  # Changed from direct lookup
        if not config:
            return
        
        event = self.build_lifecycle_event(task_id, context_id, state, final)
        try:
            await self.notification_service.send_event(config, event)
        except NotificationDeliveryError as exc:
            logger.warning(
                "Push notification delivery failed",
                task_id=str(task_id),
                context_id=str(context_id),
                state=state,
                status=exc.status,
                message=str(exc),
            )
        except Exception as exc:
            logger.error(
                "Unexpected error delivering push notification",
                task_id=str(task_id),
                context_id=str(context_id),
                state=state,
                error=str(exc),
            )

7. Task Manager Integration

File: bindu/server/task_manager.py

Update TaskManager to initialize push manager with storage:

@dataclass
class TaskManager:
    scheduler: Scheduler
    storage: Storage[Any]
    manifest: Any | None = None
    
    _aexit_stack: AsyncExitStack | None = field(default=None, init=False)
    _workers: list[ManifestWorker] = field(default_factory=list, init=False)
    _push_manager: PushNotificationManager = field(init=False)
    # ... other fields ...
    
    def __post_init__(self) -> None:
        """Initialize push notification manager with storage."""
        self._push_manager = PushNotificationManager(
            manifest=self.manifest,
            storage=self.storage  # NEW: Pass storage for persistence
        )
    
    async def __aenter__(self) -> TaskManager:
        """Initialize the task manager and start all components."""
        self._aexit_stack = AsyncExitStack()
        await self._aexit_stack.__aenter__()
        await self._aexit_stack.enter_async_context(self.scheduler)
        
        # Initialize push manager (load persisted webhooks)
        await self._push_manager.initialize()  # NEW
        
        # ... rest of initialization ...

8. Message Handler Updates

File: bindu/server/handlers/message_handlers.py

Update message sending to handle long_running flag:

async def send_message(self, request: MessageSendRequest) -> MessageSendResponse:
    # ... existing validation ...
    
    params = request["params"]
    configuration = params["configuration"]
    message = params["message"]
    
    # Check if long-running flag is set
    is_long_running = configuration.get("long_running", False)
    
    # Register webhook if provided (task_id is assumed to be resolved in the elided code above)
    push_config = configuration.get("push_notification_config")
    if push_config:
        await self._push_manager.register_push_config(
            task_id,
            push_config,
            persist=is_long_running  # NEW: Persist if long-running
        )
    
    # ... rest of message handling ...

9. Worker Updates

File: bindu/server/workers/manifest_worker.py

Add artifact notification calls:

async def _process_task(self, task_id: UUID) -> None:
    # ... existing task processing ...
    
    # When artifacts are generated
    for artifact in generated_artifacts:
        # Notify about artifact
        if self.lifecycle_notifier:
            await self.lifecycle_notifier(
                task_id=task_id,
                context_id=context_id,
                artifact=artifact  # NEW: Send artifact events
            )
    
    # ... rest of processing ...

Note: This requires updating the lifecycle_notifier signature to support artifact events, or adding a separate artifact_notifier callback.
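The second option in the note, a separate artifact_notifier callback, could look roughly like this. The type aliases and the publish_artifacts helper are hypothetical, and the real lifecycle_notifier signature may differ from the one assumed here.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable, Optional

# Hypothetical callback signatures, modelled on notify_lifecycle and
# notify_artifact in the push manager above.
LifecycleNotifier = Callable[[uuid.UUID, uuid.UUID, str, bool], Awaitable[None]]
ArtifactNotifier = Callable[[uuid.UUID, uuid.UUID, dict[str, Any]], Awaitable[None]]


async def publish_artifacts(
    task_id: uuid.UUID,
    context_id: uuid.UUID,
    artifacts: list[dict[str, Any]],
    artifact_notifier: Optional[ArtifactNotifier],
) -> int:
    """Notify once per artifact; a missing notifier is a no-op.

    Returns the number of notifications sent.
    """
    if artifact_notifier is None:
        return 0
    for artifact in artifacts:
        await artifact_notifier(task_id, context_id, artifact)
    return len(artifacts)
```

Since PushNotificationManager.notify_artifact takes (task_id, context_id, artifact), it could be passed as artifact_notifier directly, leaving the existing lifecycle_notifier untouched.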

10. Application Configuration

File: bindu/server/applications.py

Pass global webhook config from application to TaskManager:

class BinduApplication(Starlette):
    def __init__(
        self,
        # ... existing params ...
        global_webhook_url: str | None = None,
        global_webhook_token: str | None = None,
    ):
        # ... existing initialization ...
        
        # Store global webhook config in manifest
        if manifest and global_webhook_url:
            manifest.global_webhook_url = global_webhook_url
            manifest.global_webhook_token = global_webhook_token

Event Schemas

Status Update Event (Existing)

{
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "sequence": 1,
  "timestamp": "2025-12-17T09:30:00Z",
  "kind": "status-update",
  "task_id": "123e4567-e89b-12d3-a456-426614174000",
  "context_id": "789e0123-e89b-12d3-a456-426614174000",
  "status": {
    "state": "working",
    "timestamp": "2025-12-17T09:30:00Z"
  },
  "final": false
}

Artifact Update Event (existing format, newly delivered to webhooks)

{
  "event_id": "550e8400-e29b-41d4-a716-446655440001",
  "sequence": 2,
  "timestamp": "2025-12-17T09:35:00Z",
  "kind": "artifact-update",
  "task_id": "123e4567-e89b-12d3-a456-426614174000",
  "context_id": "789e0123-e89b-12d3-a456-426614174000",
  "artifact": {
    "artifact_id": "456e7890-e89b-12d3-a456-426614174000",
    "name": "results.json",
    "parts": [
      {
        "kind": "data",
        "data": {"status": "processing", "progress": 0.5}
      }
    ]
  }
}
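Both schemas carry event_id and sequence, which a receiver can use to drop redelivered events and to notice missing ones. The tracker below is a receiver-side sketch. It assumes sequence starts at 1 and increases by 1 per task, as the two examples above suggest. TaskEventTracker is a hypothetical helper, not part of Bindu.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TaskEventTracker:
    """Tracks delivered events on the receiver side (hypothetical helper)."""

    seen_event_ids: set[str] = field(default_factory=set)
    last_sequence: dict[str, int] = field(default_factory=dict)

    def accept(self, event: dict[str, Any]) -> str:
        """Classify an event as 'duplicate', 'stale', 'gap', or 'ok'."""
        if event["event_id"] in self.seen_event_ids:
            return "duplicate"  # same event delivered again, e.g. after a retry
        self.seen_event_ids.add(event["event_id"])

        task_id = event["task_id"]
        previous = self.last_sequence.get(task_id, 0)
        sequence = event["sequence"]
        if sequence <= previous:
            return "stale"  # arrived after a newer event for the same task
        self.last_sequence[task_id] = sequence
        # A jump of more than one means at least one event was missed.
        return "gap" if sequence > previous + 1 else "ok"
```

A receiver might process only the ok and gap results, and fetch the current task state when it sees a gap.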

Usage Examples

Example 1: Long-Running Task with Task-Specific Webhook

from bindu import Agent, MessageSendConfiguration, PushNotificationConfig
from uuid import uuid4

# Create agent
agent = Agent(
    name="Data Processor",
    # ... other config ...
)

# Send long-running task with webhook
response = await agent.send_message(
    message={
        "message_id": uuid4(),
        "context_id": uuid4(),
        "task_id": uuid4(),
        "kind": "message",
        "role": "user",
        "parts": [{"kind": "text", "text": "Process large dataset"}]
    },
    configuration=MessageSendConfiguration(
        accepted_output_modes=["application/json"],
        long_running=True,  # Enable persistence
        push_notification_config=PushNotificationConfig(
            id=uuid4(),
            url="https://myapp.com/webhooks/task-updates",
            token="secret_token_123"
        )
    )
)

Example 2: Long-Running Task with Global Webhook

from bindu import BinduApplication, AgentManifest

# Configure agent with global webhook
app = BinduApplication(
    manifest=AgentManifest(
        # ... manifest config ...
        global_webhook_url="https://myapp.com/webhooks/global",
        global_webhook_token="global_secret_token"
    ),
    # ... other config ...
)

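# Send long-running task without push_notification_config, so the global webhook is used.
# `agent` is assumed to be the Agent served by this application (created as in Example 1).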
response = await agent.send_message(
    message={...},
    configuration=MessageSendConfiguration(
        accepted_output_modes=["application/json"],
        long_running=True  # Will use global webhook
    )
)

Example 3: Webhook Endpoint Implementation

from fastapi import FastAPI, Header, HTTPException, Request
from typing import Optional

app = FastAPI()

@app.post("/webhooks/task-updates")
async def handle_task_update(
    request: Request,
    authorization: Optional[str] = Header(None)
):
    # Verify token. FastAPI needs an HTTPException here: a returned
    # (body, status) tuple would be serialized as a 200 response.
    if authorization != "Bearer secret_token_123":
        raise HTTPException(status_code=401, detail="Unauthorized")
    
    event = await request.json()
    
    # Handle different event types
    if event["kind"] == "status-update":
        print(f"Task {event['task_id']} is now {event['status']['state']}")
        
        if event["final"]:
            print(f"Task {event['task_id']} completed!")
    
    elif event["kind"] == "artifact-update":
        print(f"Task {event['task_id']} generated artifact: {event['artifact']['name']}")
    
    return {"status": "received"}
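If the optional HMAC signature listed under Security Considerations were adopted, the endpoint above could verify it before trusting the payload. The scheme sketched here (HMAC-SHA256 over "timestamp.body", hex encoded, with a 300-second tolerance) is an assumption for illustration. The issue does not define a signing format or header names.

```python
import hashlib
import hmac
import time
from typing import Optional


def sign_payload(secret: bytes, timestamp: int, body: bytes) -> str:
    """Return the hex HMAC-SHA256 of '<timestamp>.<body>'."""
    message = str(timestamp).encode() + b"." + body
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def verify_signature(
    secret: bytes,
    timestamp: int,
    body: bytes,
    signature: str,
    *,
    now: Optional[float] = None,
    tolerance: int = 300,
) -> bool:
    """Check freshness first, then compare signatures in constant time."""
    current = time.time() if now is None else now
    if abs(current - timestamp) > tolerance:
        return False  # too old or too far in the future: possible replay
    expected = sign_payload(secret, timestamp, body)
    return hmac.compare_digest(expected, signature)
```

Signing the timestamp together with the body means a captured request cannot be replayed later with a fresh timestamp.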

Migration Path

Phase 1: Database Schema

  1. Create migration for webhook_configs table
  2. Run migration on existing deployments

Phase 2: Storage Implementation

  1. Implement webhook methods in PostgreSQL storage
  2. Implement webhook methods in Memory storage
  3. Add tests for storage operations

Phase 3: Push Manager Enhancement

  1. Add storage parameter to PushNotificationManager
  2. Implement initialize() method to load configs
  3. Add global webhook fallback logic
  4. Add artifact notification support

Phase 4: Protocol Updates

  1. Add long_running flag to MessageSendConfiguration
  2. Add global webhook fields to AgentManifest
  3. Update documentation

Phase 5: Integration

  1. Update TaskManager to pass storage to push manager
  2. Update message handlers to handle long_running flag
  3. Update workers to send artifact notifications
  4. Add integration tests

Backward Compatibility

  • Existing behavior preserved: Tasks without long_running=True work exactly as before
  • Optional global webhook: No breaking changes if not configured
  • Storage methods: New methods don't affect existing storage operations
  • Event schema: Uses existing event formats, no protocol changes

Testing Strategy

Unit Tests

  • Storage webhook CRUD operations
  • Push manager initialization and config loading
  • Global webhook fallback logic
  • Event building and delivery
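The storage CRUD tests could share one contract check that runs against any backend. The sketch below uses a stand-in store with the four webhook methods from the Storage interface. InMemoryWebhookStore and check_webhook_crud are hypothetical names, and the config values are placeholders.

```python
import asyncio
import uuid
from typing import Any, Optional

Config = dict[str, Any]


class InMemoryWebhookStore:
    """Stand-in exposing the four webhook methods of the Storage interface."""

    def __init__(self) -> None:
        self._configs: dict[uuid.UUID, Config] = {}

    async def save_webhook_config(self, task_id: uuid.UUID, config: Config) -> None:
        self._configs[task_id] = config

    async def load_webhook_config(self, task_id: uuid.UUID) -> Optional[Config]:
        return self._configs.get(task_id)

    async def delete_webhook_config(self, task_id: uuid.UUID) -> None:
        self._configs.pop(task_id, None)

    async def load_all_webhook_configs(self) -> dict[uuid.UUID, Config]:
        return dict(self._configs)


async def check_webhook_crud(store: Any) -> None:
    """Contract check intended to hold for every storage backend."""
    task_id = uuid.uuid4()
    config = {"id": uuid.uuid4(), "url": "https://example.com/hook"}

    assert await store.load_webhook_config(task_id) is None
    await store.save_webhook_config(task_id, config)
    assert await store.load_webhook_config(task_id) == config

    # Saving again for the same task replaces the previous config (upsert).
    updated = {**config, "token": "t"}
    await store.save_webhook_config(task_id, updated)
    assert await store.load_all_webhook_configs() == {task_id: updated}

    await store.delete_webhook_config(task_id)
    await store.delete_webhook_config(task_id)  # deleting twice is a no-op
    assert await store.load_all_webhook_configs() == {}


asyncio.run(check_webhook_crud(InMemoryWebhookStore()))
```

Running the same check against the PostgreSQL backend would exercise the ON CONFLICT upsert path as well.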

Integration Tests

  • End-to-end long-running task with webhook
  • Server restart with persisted webhooks
  • Global webhook fallback behavior
  • Multiple concurrent long-running tasks
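The restart scenario can be modelled without a real server by creating a second manager over the same storage. This is a reduced sketch: FakeDurableStorage and MiniPushManager are hypothetical stand-ins that keep only the persistence behaviour described in the Technical Design.

```python
import asyncio
import uuid
from typing import Any

Config = dict[str, Any]


class FakeDurableStorage:
    """Stands in for a database: its contents outlive any one manager."""

    def __init__(self) -> None:
        self.rows: dict[uuid.UUID, Config] = {}

    async def save_webhook_config(self, task_id: uuid.UUID, config: Config) -> None:
        self.rows[task_id] = config

    async def load_all_webhook_configs(self) -> dict[uuid.UUID, Config]:
        return dict(self.rows)


class MiniPushManager:
    """Reduced model of the push manager: an in-memory cache over storage."""

    def __init__(self, storage: FakeDurableStorage) -> None:
        self.storage = storage
        self.configs: dict[uuid.UUID, Config] = {}

    async def initialize(self) -> None:
        self.configs.update(await self.storage.load_all_webhook_configs())

    async def register(
        self, task_id: uuid.UUID, config: Config, persist: bool = False
    ) -> None:
        self.configs[task_id] = config
        if persist:
            await self.storage.save_webhook_config(task_id, config)


async def simulate_restart() -> tuple[bool, bool]:
    """Return (long-running config restored, ordinary config restored)."""
    storage = FakeDurableStorage()
    before = MiniPushManager(storage)
    long_task, short_task = uuid.uuid4(), uuid.uuid4()
    config = {"id": uuid.uuid4(), "url": "https://example.com/hook"}
    await before.register(long_task, config, persist=True)
    await before.register(short_task, config, persist=False)

    after = MiniPushManager(storage)  # "restart": new manager, same storage
    await after.initialize()
    return long_task in after.configs, short_task in after.configs
```

Only the config registered with persist=True should be present after the simulated restart.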

Performance Tests

  • Webhook delivery latency
  • Database query performance for webhook loading
  • Memory usage with many persisted webhooks

Security Considerations

  1. Webhook URL Validation

    • Validate URL scheme (http/https only)
    • Prevent SSRF attacks (block internal IPs)
    • Rate limiting on webhook deliveries
  2. Token Storage

    • Encrypt tokens in database
    • Secure token transmission
    • Token rotation support
  3. Webhook Verification

    • Optional HMAC signature for webhook payloads
    • Timestamp validation to prevent replay attacks
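The URL checks in item 1 could be sketched as follows, using only the standard library. validate_webhook_url and is_public_ip are hypothetical names, and the resolver is injectable so the check can be tested without network access. A check at registration time does not by itself protect against DNS answers changing before delivery, so the same check may need to run again when sending.

```python
import ipaddress
import socket
from typing import Callable
from urllib.parse import urlsplit


def is_public_ip(address: str) -> bool:
    """True when the address is not loopback, private, link-local, etc."""
    ip = ipaddress.ip_address(address.split("%", 1)[0])  # drop any IPv6 zone id
    return not (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    )


def validate_webhook_url(url: str, resolve: Callable = socket.getaddrinfo) -> None:
    """Raise ValueError unless url is http(s) and resolves only to public IPs."""
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    if not parts.hostname:
        raise ValueError("missing host")
    port = parts.port or (443 if parts.scheme == "https" else 80)
    # getaddrinfo-style results: the socket address is the fifth tuple item.
    for info in resolve(parts.hostname, port):
        address = info[4][0]
        if not is_public_ip(address):
            raise ValueError(
                f"{parts.hostname} resolves to non-public address {address}"
            )
```

Every resolved address is checked, so a host that maps to both a public and an internal IP is rejected.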

Open Questions

  1. Webhook Cleanup Policy

    • Delete webhook configs when task completes?
    • Keep for audit trail with TTL?
    • User-configurable retention?
  2. Heartbeat Configuration

    • Should we add periodic heartbeat events for long-running tasks?
    • If yes, what interval? (30s, 1min, configurable?)
  3. Event Filtering

    • Should users specify which event types they want?
    • Or always send all events for long-running tasks?
  4. Retry Strategy

    • Current: 2 retries with exponential backoff
    • Should long-running tasks have different retry policy?
    • Dead letter queue for failed deliveries?
  5. Artifact Event Granularity

    • Send every artifact chunk during streaming?
    • Only send final/complete artifacts?
    • User-configurable?
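For the retry question, the current policy of 2 retries with exponential backoff can be written as a small helper whose parameters a long-running task could override. The base delay and factor below are assumptions, since the issue does not state the current values. deliver_with_retry and backoff_delays are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable


def backoff_delays(retries: int, base: float = 0.5, factor: float = 2.0) -> list[float]:
    """Delay before each retry: base, base*factor, base*factor**2, ..."""
    return [base * factor**attempt for attempt in range(retries)]


async def deliver_with_retry(
    send: Callable[[], Awaitable[None]],
    retries: int = 2,
    base: float = 0.5,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Try once, then retry up to `retries` times with exponential backoff.

    Returns the number of attempts used; re-raises the last error if all fail.
    """
    delays = backoff_delays(retries, base)
    for attempt in range(retries + 1):
        try:
            await send()
            return attempt + 1
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the failure to the caller
            await sleep(delays[attempt])
    raise AssertionError("unreachable")
```

A dead letter queue would hook in where the final error is re-raised, recording the event instead of dropping it.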

Success Metrics

  • Webhook delivery success rate > 99%
  • Webhook delivery latency < 1s (p95)
  • Zero persisted webhook configurations lost on server restart
  • Backward compatibility maintained (0 breaking changes)
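The first two metrics could be computed from delivery logs along these lines. This is a sketch only: delivery_metrics is a hypothetical helper, and it assumes latencies are recorded in seconds for successful deliveries while failures are counted separately.

```python
import statistics


def delivery_metrics(latencies_s: list[float], failures: int) -> dict[str, float]:
    """Compute success rate and p95 latency from delivery records.

    Needs at least two successful deliveries for the percentile estimate.
    """
    total = len(latencies_s) + failures
    # quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile.
    p95 = statistics.quantiles(latencies_s, n=100)[94]
    return {
        "success_rate": len(latencies_s) / total,
        "p95_latency_s": p95,
    }


def meets_targets(metrics: dict[str, float]) -> bool:
    """Targets from the list above: success rate > 99% and p95 latency < 1s."""
    return metrics["success_rate"] > 0.99 and metrics["p95_latency_s"] < 1.0
```

Measuring p95 over successful deliveries only keeps timeouts from being counted in both metrics.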

Documentation Requirements

  1. API documentation for new fields
  2. Migration guide for existing deployments
  3. Webhook endpoint implementation examples
  4. Security best practices guide
  5. Troubleshooting guide for webhook delivery issues

Timeline Estimate

  • Phase 1-2 (Storage): 2-3 days
  • Phase 3 (Push Manager): 2-3 days
  • Phase 4 (Protocol): 1 day
  • Phase 5 (Integration): 2-3 days
  • Testing & Documentation: 2-3 days

Total: ~9-13 days for complete implementation (sum of the estimates above)


References

  • A2A Protocol Specification: https://a2a-protocol.org/latest/specification/
  • Current Push Notification Implementation: bindu/server/notifications/push_manager.py
  • Storage Interface: bindu/server/storage/base.py
  • Protocol Types: bindu/common/protocol/types.py

Metadata

  • Labels: enhancement
  • Status: Done
  • Milestone: No milestone