AGUIAdapter.streaming_response doesn't preserve context vars #4264

@hudyweas

Description

Hello, I'm trying to attach additional information (baggage) to the pydantic_ai agent run, but it goes missing when I use the configuration below:

# Standard-library / FastAPI / pydantic imports implied by the snippet.
# (pydantic_ai, logfire and project-specific imports such as AGUIAdapter,
# set_baggage, SSE_CONTENT_TYPE, StateDeps, Container, HistoryManager,
# gigenius_agent and GigeniusDependencies are omitted, as their exact
# paths depend on the project.)
import json
import uuid
from http import HTTPStatus
from uuid import UUID

from fastapi import Depends, Request, Response
from pydantic import ValidationError


@router.post("/agui", name="gigenius_canvas_agui")
@inject
async def gigenius_canvas_agui(
    request: Request,
    history_manager: HistoryManager = Depends(AsyncProvide[Container.history_manager]),  # ty:ignore[invalid-type-form]
) -> Response:
    accept = request.headers.get("accept", SSE_CONTENT_TYPE)

    try:
        run_input = AGUIAdapter.build_run_input(await request.body())
    except ValidationError as e:
        return Response(
            content=e.json(),  # e.json() is already a JSON string; json.dumps() would double-encode it
            media_type="application/json",
            status_code=HTTPStatus.UNPROCESSABLE_ENTITY,
        )

    with set_baggage(project_id=run_input.thread_id):
        run_input.tools = []

        messages = history_manager.get_ag_ui_history(UUID(run_input.thread_id))

        run_input.messages = messages + [run_input.messages[-1]]

        history_manager.add_message(
            id=run_input.messages[-1].id,
            thread_id=UUID(run_input.thread_id),
            role="user",
            message=str(run_input.messages[-1].content),
        )

        adapter = AGUIAdapter(agent=gigenius_agent, run_input=run_input, accept=accept)

        original_stream = adapter.run_stream(deps=StateDeps(GigeniusDependencies()))

        async def stream_and_capture():
            accumulated_text = []

            async for event in original_stream:
                if hasattr(event, "type") and hasattr(event, "delta"):
                    if event.type == "TEXT_MESSAGE_CONTENT":
                        accumulated_text.append(event.delta)

                yield event

            full_response = "".join(accumulated_text)

            if full_response:
                history_manager.add_message(
                    id=uuid.uuid4(),
                    thread_id=UUID(run_input.thread_id),
                    role="assistant",
                    message=full_response,
                )

        return adapter.streaming_response(stream_and_capture())
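
As a workaround sketch (my assumption about what is going on, not a confirmed diagnosis): the set_baggage() context manager exits as soon as the handler returns, but the generator passed to streaming_response is only consumed afterwards, so the baggage context var has already been reset by the time the agent run actually streams. Re-entering the baggage context inside the generator keeps it set while the stream is produced:

# Variant of stream_and_capture() that re-enters the baggage context, so it is
# active when the framework consumes the stream (the handler's
# `with set_baggage(...)` block has already exited by then).
async def stream_and_capture():
    accumulated_text = []

    with set_baggage(project_id=run_input.thread_id):
        async for event in original_stream:
            if hasattr(event, "type") and hasattr(event, "delta"):
                if event.type == "TEXT_MESSAGE_CONTENT":
                    accumulated_text.append(event.delta)

            yield event

    full_response = "".join(accumulated_text)

    if full_response:
        history_manager.add_message(
            id=uuid.uuid4(),
            thread_id=UUID(run_input.thread_id),
            role="assistant",
            message=full_response,
        )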
[Image attached]
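
For a minimal illustration of what I believe is the underlying mechanism (an assumption on my part, independent of pydantic_ai and logfire internals): a context var set and then reset inside the endpoint, mimicking the entry and exit of the set_baggage() context manager, is no longer visible when the streaming body is iterated, because iteration only happens after the handler has returned:

import contextvars

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

# Hypothetical context var standing in for the baggage set by set_baggage().
current_project: contextvars.ContextVar[str | None] = contextvars.ContextVar(
    "current_project", default=None
)

app = FastAPI()


@app.get("/demo")
async def demo() -> StreamingResponse:
    async def body():
        # Runs only while the response is streamed, i.e. after the handler
        # below has already returned and reset the context var.
        yield f"project={current_project.get()}\n".encode()

    token = current_project.set("p-123")
    try:
        return StreamingResponse(body(), media_type="text/plain")
    finally:
        # Mirrors the exit of the set_baggage() block when the handler returns.
        current_project.reset(token)

Requesting GET /demo streams "project=None": the value set in the handler is no longer visible to the streaming body.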

Python, Logfire & OS Versions, related packages (not required)

logfire="4.22.0"
platform="macOS-26.2-arm64-arm-64bit-Mach-O"
python="3.13.2 (main, Mar 17 2025, 21:26:38) [Clang 20.1.0 ]"
[related_packages]
requests="2.32.5"
pydantic="2.12.5"
fastapi="0.128.2"
openai="2.17.0"
protobuf="6.33.5"
rich="14.3.2"
executing="2.2.1"
opentelemetry-api="1.39.1"
opentelemetry-exporter-otlp="1.39.1"
opentelemetry-exporter-otlp-proto-common="1.39.1"
opentelemetry-exporter-otlp-proto-grpc="1.39.1"
opentelemetry-exporter-otlp-proto-http="1.39.1"
opentelemetry-instrumentation="0.60b1"
opentelemetry-instrumentation-asgi="0.60b1"
opentelemetry-instrumentation-dbapi="0.60b1"
opentelemetry-instrumentation-fastapi="0.60b1"
opentelemetry-instrumentation-httpx="0.60b1"
opentelemetry-instrumentation-psycopg2="0.60b1"
opentelemetry-instrumentation-system-metrics="0.60b1"
opentelemetry-proto="1.39.1"
opentelemetry-sdk="1.39.1"
opentelemetry-semantic-conventions="0.60b1"
opentelemetry-util-http="0.60b1"
