Improving A2A Agent Integration for tau2-bench #111

@wuTims

Description

Problem/Goal

Problem

The tau2-bench framework needed a method to evaluate conversational agents via the A2A (Agent-to-Agent) protocol. The existing experiments/agentify_tau_bench implementation had several limitations:

  • Parsing rigidity: Strict XML tag requirements (<json>...</json>) with no fallback mechanisms
  • Limited integration: Interfaces directly with the gym environment, bypassing tau2's orchestrator, registry, and CLI infrastructure
  • No observability: Collects only total time and a success boolean; no per-request latency, token counts, or HTTP status codes

Goal

An A2A integration that:

  • Enables tau2-bench to evaluate remote A2A-compliant agents using the existing orchestrator infrastructure
  • Exposes tau2-bench evaluation capabilities as an A2A service via Google's ADK framework
  • Provides Docker deployment support

Proposed Solution

Initial implementation: https://github.com/wuTims/tau2-bench-agent

This proposed enhancement implements a bidirectional A2A integration: tau2-bench can evaluate remote A2A agents (A2AAgent), and tau2-bench itself can be exposed as an A2A service (tau2_agent), along with supporting tooling:

1. A2AAgent - Remote Agent Evaluation

File: src/tau2/agent/a2a_agent.py

A new agent type that integrates with tau2's existing orchestrator to evaluate remote A2A agents:

```python
# Registration alongside existing agent types
registry.register_agent(LLMAgent, "llm_agent")
registry.register_agent(LLMGTAgent, "llm_agent_gt")
registry.register_agent(LLMSoloAgent, "llm_agent_solo")
registry.register_agent(A2AAgent, "a2a_agent")  # NEW
```

Example usage:

```python
# Configure evaluation of a remote A2A agent
config = RunConfig(
    domain="airline",
    task_split_name="base",
    num_tasks=5,
    agent="a2a_agent",
    llm_agent="http://localhost:8001/a2a/my_agent",  # A2A endpoint URL
    llm_args_agent={},  # Can include auth_token, timeout
    user="user_simulator",
    llm_user="gpt-4o",
    llm_args_user={},
    num_trials=1,
    max_steps=50,
)

results = run_domain(config)
```

2. Message Translation Layer

File: src/tau2/a2a/translation.py

JSON-based parsing for tool calls:

```python
def parse_a2a_tool_calls(content: str) -> list[ToolCall] | None:
    """
    Parse tool calls from A2A agent response content.

    Looks for JSON-formatted tool calls:
    - Single: {"tool_call": {"name": "...", "arguments": {...}}}
    - Multiple: {"tool_calls": [{...}, {...}]}

    Returns None if content is not parseable as tool calls.
    """
```

The A2A client handles 5 different response format variants to support different A2A implementations:

  1. Google ADK style: result.artifacts[].parts[].text
  2. Direct Message: result.parts[].text
  3. TaskStatusUpdateEvent: result.status.message.parts[].text
  4. Legacy wrapper: result.message.parts[].text
  5. History-based: Last agent message in result.history

3. ADK Integration

Directory: tau2_agent/

The tau2_agent exposes evaluation capabilities via Google's Agent Development Kit:

```python
root_agent = LlmAgent(
    name="tau2_agent",
    model=create_model(),
    instruction=INSTRUCTION,
    description="Agent evaluation service using tau2-bench framework",
    after_model_callback=parse_text_tool_call,  # Handles {"tool_call": {...}}
    tools=[
        RunTau2Evaluation(name="run_tau2_evaluation", description="..."),
        ListDomains(name="list_domains", description="..."),
        GetEvaluationResults(name="get_evaluation_results", description="..."),
    ],
)
```

Tool Declaration Example (run_tau2_evaluation.py):

```python
def _get_declaration(self) -> types.FunctionDeclaration | None:
    return types.FunctionDeclaration(
        name=self.name,
        description=self.description,
        parameters=types.Schema(
            type=types.Type.OBJECT,
            properties={
                "domain": types.Schema(
                    type=types.Type.STRING,
                    description="Evaluation domain: airline, retail, telecom, or mock",
                ),
                "agent_endpoint": types.Schema(
                    type=types.Type.STRING,
                    description="A2A endpoint URL of the agent to evaluate",
                ),
                # ... additional parameter definitions
            },
            required=["domain", "agent_endpoint"],
        ),
    )
```

4. CLI Integration

Full tau2 CLI support for A2A agents:

```bash
uv run tau2 run \
  --domain airline \
  --agent a2a_agent \
  --agent-a2a-endpoint http://localhost:8001/a2a/my_agent \
  --agent-a2a-timeout 300 \
  --a2a-debug  # TRACE logging for protocol debugging
```

This provides access to all tau2 features:

  • Multiple domains (airline, retail, telecom, mock, vacation_rental)
  • Task splits and specific task IDs
  • Concurrent evaluation with --max-concurrency
  • Results saving and resume from checkpoint
  • All user simulator configurations

5. Testing Coverage

| Test Category | Files | Coverage |
|---|---|---|
| A2A Client | tests/test_a2a_client/ (8 files) | Agent discovery, message translation, metrics, performance |
| ADK Server | tests/test_adk_server/ (3 test files) | A2A endpoint, agent card, tool validation |
| E2E Tests | tests/test_a2a_e2e/ (2 test files) | Full evaluation flow, client-to-server communication |
| Backward Compatibility (not strictly necessary) | tests/test_backward_compatibility/ (4 files) | LLM agent, CLI, regression tests |

Example test (test_a2a_agent.py):

```python
def test_a2a_agent_generate_message(mock_a2a_client, sample_domain_tools):
    """Test A2AAgent can generate messages via A2A protocol."""
    config = A2AConfig(endpoint="http://test-agent.example.com")
    agent = A2AAgent(
        config=config,
        tools=sample_domain_tools,
        domain_policy="Airline customer service",
        http_client=mock_a2a_client,
    )

    state = agent.get_init_state()
    user_msg = UserMessage(
        role="user",
        content="I need to book a flight from SFO to JFK.",
    )

    assistant_msg, new_state = agent.generate_next_message(user_msg, state)

    assert isinstance(assistant_msg, AssistantMessage)
    assert new_state.request_count == 1
    assert new_state.context_id is not None
```

6. Docker Support

Docker deployment with multi-stage build:

```dockerfile
# Stage 1: Build wheels
FROM python:3.11-slim AS builder
# ... build dependencies

# Stage 2: Runtime
FROM python:3.11-slim AS runtime
# Non-root user, health checks, volume mounts
CMD ["adk", "api_server", "--a2a", ".", "--port", "8001", "--host", "0.0.0.0"]
```

docker-compose.yml:

```yaml
services:
  tau2-agent:
    image: tau2-agent:latest
    ports: ["8001:8001"]
    volumes:
      - tau2-evals:/app/data/tau2/evals  # Persistent results
    deploy:
      resources:
        limits:
          memory: 2G
```

7. ADK Web GUI

The ADK framework provides a built-in web interface for interactive testing:

```bash
# Start with web interface
adk web .

# Access at http://localhost:8000
```

Note: Work is in progress to improve the ADK web view with full trace visibility and enhanced logging output.

8. ADK State Management

ADK provides built-in hierarchical state management with four scopes:

| Scope | Prefix | Persistence | Visibility | Use Case |
|---|---|---|---|---|
| App State | app: | Across all sessions | All users | Global configuration, shared settings |
| User State | user: | Across sessions | Single user | User preferences, authentication |
| Session State | (none) | Single session | Single session | Conversation context, current evaluation |
| Temp State | temp: | Never persisted | Current invocation | Intermediate calculations |

State is accessible within tools via tool_context.state:

```python
async def run_async(self, *, args: dict[str, Any], tool_context: ToolContext) -> Any:
    # Store the evaluation ID in session state for later reference
    evaluation_id = run_evaluation(...)
    tool_context.state["last_evaluation_id"] = evaluation_id

    # Store in user state for cross-session persistence
    tool_context.state["user:preferred_domain"] = args.get("domain")

    # Retrieve previous evaluation if user says "show me the last results"
    previous_id = tool_context.state.get("last_evaluation_id")
```

Potential tau2_agent state usage:

  • Session state: Track current evaluation ID so users can say "show me the results" without specifying the ID
  • User state: Remember preferred domains, default num_tasks, or agent endpoints across sessions
  • App state: Store global rate limits or shared configuration
  • Temp state: Track in-progress evaluation status during long-running operations
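The prefix convention behind these scopes can be illustrated with a simple resolution sketch (not ADK's actual implementation; it only mirrors the table above):

```python
def state_scope(key: str) -> str:
    """Map an ADK state key to its scope by prefix (illustrative)."""
    if key.startswith("app:"):
        return "app"       # shared across all users and sessions
    if key.startswith("user:"):
        return "user"      # persists across sessions for one user
    if key.startswith("temp:"):
        return "temp"      # discarded after the current invocation
    return "session"       # default: scoped to the current session
```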

ADK supports multiple persistence backends:

  • InMemorySessionService - Development/testing
  • DatabaseSessionService - Production with SQLite/PostgreSQL/MySQL
  • VertexAiSessionService - Google Cloud managed persistence

Impact

Components affected

  1. src/tau2/agent/ - New A2AAgent implementation
  2. src/tau2/a2a/ - New A2A protocol module (client, translation, models, metrics, exceptions)
  3. src/tau2/registry.py - A2AAgent registration
  4. src/tau2/cli.py - New CLI arguments for A2A configuration
  5. src/tau2/run.py - A2AAgent construction logic
  6. tau2_agent/ - New ADK agent package
  7. tests/ - Test additions
  8. Add a global config for registering custom LLMs (mainly to facilitate price and token tracking for nebius/ models, since these are the models the competition provided)

Key architectural points

| Area | Description |
|---|---|
| Agent Extensibility | Registry-based agent system allows adding new agent types without modifying the orchestrator |
| Protocol Flexibility | Multiple response-format support accommodates different A2A implementations |
| Error Handling | Typed exceptions (A2ADiscoveryError, A2AAuthError, etc.) with structured logging |
| Observability | Protocol metrics collection (latency, tokens, status codes) with export support |
| Deployment | Docker containerization with health checks and resource limits |

Future Considerations

  • Gym evaluation mode: Add gym-based evaluation support to tau2_agent, allowing it to run evaluations using the GymAgent interface in addition to A2A agents
  • Async evaluation: Leverage the async-native A2A client for non-blocking evaluation runs
  • Evaluation store: Persist and query evaluation results via the GetEvaluationResults tool interface

Files Changed Summary

```
tau2_agent/                          # NEW - ADK agent package
├── agent.py                         # Main ADK agent
├── tools/
│   ├── run_tau2_evaluation.py       # Evaluation tool
│   ├── list_domains.py              # Domain discovery
│   └── get_evaluation_results.py    # Results retrieval
└── docker_setup/
    ├── Dockerfile                   # Container build
    └── docker-compose.yml           # Service orchestration

src/tau2/
├── a2a/                             # NEW - A2A protocol module
│   ├── client.py                    # HTTP client
│   ├── translation.py               # Message conversion
│   ├── models.py                    # Data structures
│   ├── metrics.py                   # Protocol metrics
│   └── exceptions.py                # Error types
├── agent/
│   └── a2a_agent.py                 # NEW - A2A agent implementation
├── registry.py                      # MODIFIED - A2AAgent registration
├── cli.py                           # MODIFIED - A2A CLI arguments
└── run.py                           # MODIFIED - A2AAgent construction

tests/
├── test_a2a_client/                 # NEW - Client tests
├── test_adk_server/                 # NEW - Server tests
├── test_a2a_e2e/                    # NEW - E2E tests
└── test_backward_compatibility/     # NEW - Regression tests
```

Timeline

  • MVP features have been implemented
  • Proposed changes and features have been tested with the test suite, the ADK web UI, the CLI, and the Docker configs
  • Awaiting feedback

Dependencies

  • Features were implemented without a fully deployed AgentBeats platform; changes may be needed to integrate with that platform
