
Commit 595e84c

feat(integrations): openai-agents streaming support (#5291)
### Description

Adds instrumentation for `Runner.run_streamed`.

#### Issues

Closes https://linear.app/getsentry/issue/TET-1705/openai-agents-streaming-support
1 parent 5d21d1d commit 595e84c
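
For context, a minimal sketch of the call path this commit instruments — how an application would start a streamed run that the new wrapper around `Runner.run_streamed` picks up. The DSN, agent name, and prompt are placeholders; the streaming loop follows the openai-agents `stream_events()` API.

```python
import asyncio

import sentry_sdk
from agents import Agent, Runner

sentry_sdk.init(dsn="...", traces_sample_rate=1.0)  # placeholder DSN


async def main() -> None:
    agent = Agent(name="Assistant", instructions="Be concise.")  # hypothetical agent
    # Runner.run_streamed returns a streaming result; with this commit the call is
    # wrapped so a root agent-workflow span is created around the whole streamed run.
    result = Runner.run_streamed(agent, input="Tell me a joke")
    async for _event in result.stream_events():
        pass  # consume the stream; spans are finalized as the run completes


asyncio.run(main())
```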


8 files changed: +420, -116 lines changed


sentry_sdk/integrations/openai_agents/__init__.py

Lines changed: 7 additions & 2 deletions
@@ -4,6 +4,7 @@
     _create_get_model_wrapper,
     _create_get_all_tools_wrapper,
     _create_run_wrapper,
+    _create_run_streamed_wrapper,
     _patch_agent_run,
     _patch_error_tracing,
 )
@@ -25,12 +26,16 @@ def _patch_runner() -> None:
     # Create the root span for one full agent run (including eventual handoffs)
     # Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around
     # agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately.
-    # TODO-anton: Also patch streaming runner: agents.Runner.run_streamed
     agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper(
         agents.run.DEFAULT_AGENT_RUNNER.run
     )
 
-    # Creating the actual spans for each agent run.
+    # Patch streaming runner
+    agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper(
+        agents.run.DEFAULT_AGENT_RUNNER.run_streamed
+    )
+
+    # Creating the actual spans for each agent run (works for both streaming and non-streaming).
     _patch_agent_run()
 
 

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 from .models import _create_get_model_wrapper  # noqa: F401
 from .tools import _create_get_all_tools_wrapper  # noqa: F401
-from .runner import _create_run_wrapper  # noqa: F401
+from .runner import _create_run_wrapper, _create_run_streamed_wrapper  # noqa: F401
 from .agent_run import _patch_agent_run  # noqa: F401
 from .error_tracing import _patch_error_tracing  # noqa: F401

sentry_sdk/integrations/openai_agents/patches/agent_run.py

Lines changed: 122 additions & 33 deletions
@@ -1,8 +1,9 @@
 import sys
 from functools import wraps
 
+from sentry_sdk.consts import SPANDATA
 from sentry_sdk.integrations import DidNotEnable
-from sentry_sdk.utils import reraise
+from sentry_sdk.utils import capture_internal_exceptions, reraise
 from ..spans import (
     invoke_agent_span,
     end_invoke_agent_span,
@@ -31,22 +32,10 @@ def _patch_agent_run() -> None:
 
     # Store original methods
     original_run_single_turn = agents.run.AgentRunner._run_single_turn
+    original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
     original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
     original_execute_final_output = agents._run_impl.RunImpl.execute_final_output
 
-    def _start_invoke_agent_span(
-        context_wrapper: "agents.RunContextWrapper",
-        agent: "agents.Agent",
-        kwargs: "dict[str, Any]",
-    ) -> "Span":
-        """Start an agent invocation span"""
-        # Store the agent on the context wrapper so we can access it later
-        context_wrapper._sentry_current_agent = agent
-        span = invoke_agent_span(context_wrapper, agent, kwargs)
-        context_wrapper._sentry_agent_span = span
-
-        return span
-
     def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool:
         """Check if there's an active agent span for this context"""
         return getattr(context_wrapper, "_sentry_current_agent", None) is not None
@@ -57,6 +46,46 @@ def _get_current_agent(
         """Get the current agent from context wrapper"""
         return getattr(context_wrapper, "_sentry_current_agent", None)
 
+    def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None:
+        """Close the workflow span for streaming executions if it exists."""
+        if agent and hasattr(agent, "_sentry_workflow_span"):
+            workflow_span = agent._sentry_workflow_span
+            workflow_span.__exit__(*sys.exc_info())
+            delattr(agent, "_sentry_workflow_span")
+
+    def _maybe_start_agent_span(
+        context_wrapper: "agents.RunContextWrapper",
+        agent: "agents.Agent",
+        should_run_agent_start_hooks: bool,
+        span_kwargs: "dict[str, Any]",
+        is_streaming: bool = False,
+    ) -> "Optional[Span]":
+        """
+        Start an agent invocation span if conditions are met.
+        Handles ending any existing span for a different agent.
+
+        Returns the new span if started, or the existing span if conditions aren't met.
+        """
+        if not (should_run_agent_start_hooks and agent and context_wrapper):
+            return getattr(context_wrapper, "_sentry_agent_span", None)
+
+        # End any existing span for a different agent
+        if _has_active_agent_span(context_wrapper):
+            current_agent = _get_current_agent(context_wrapper)
+            if current_agent and current_agent != agent:
+                end_invoke_agent_span(context_wrapper, current_agent)
+
+        # Store the agent on the context wrapper so we can access it later
+        context_wrapper._sentry_current_agent = agent
+        span = invoke_agent_span(context_wrapper, agent, span_kwargs)
+        context_wrapper._sentry_agent_span = span
+        agent._sentry_agent_span = span
+
+        if is_streaming:
+            span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
+
+        return span
+
     @wraps(
         original_run_single_turn.__func__
         if hasattr(original_run_single_turn, "__func__")
@@ -68,28 +97,18 @@ async def patched_run_single_turn(
         """Patched _run_single_turn that creates agent invocation spans"""
         agent = kwargs.get("agent")
         context_wrapper = kwargs.get("context_wrapper")
-        should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks")
-
-        span = getattr(context_wrapper, "_sentry_agent_span", None)
-        # Start agent span when agent starts (but only once per agent)
-        if should_run_agent_start_hooks and agent and context_wrapper:
-            # End any existing span for a different agent
-            if _has_active_agent_span(context_wrapper):
-                current_agent = _get_current_agent(context_wrapper)
-                if current_agent and current_agent != agent:
-                    end_invoke_agent_span(context_wrapper, current_agent)
+        should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False)
 
-            span = _start_invoke_agent_span(context_wrapper, agent, kwargs)
-            agent._sentry_agent_span = span
+        span = _maybe_start_agent_span(
+            context_wrapper, agent, should_run_agent_start_hooks, kwargs
+        )
 
-        # Call original method with all the correct parameters
         try:
             result = await original_run_single_turn(*args, **kwargs)
         except Exception as exc:
             if span is not None and span.timestamp is None:
                 _record_exception_on_span(span, exc)
                 end_invoke_agent_span(context_wrapper, agent)
-
             reraise(*sys.exc_info())
 
         return result
@@ -117,7 +136,11 @@ async def patched_execute_handoffs(
         # Call original method with all parameters
         try:
             result = await original_execute_handoffs(*args, **kwargs)
-
+        except Exception:
+            exc_info = sys.exc_info()
+            with capture_internal_exceptions():
+                _close_streaming_workflow_span(agent)
+            reraise(*exc_info)
         finally:
             # End span for current agent after handoff processing is complete
             if agent and context_wrapper and _has_active_agent_span(context_wrapper):
@@ -139,18 +162,84 @@ async def patched_execute_final_output(
         context_wrapper = kwargs.get("context_wrapper")
         final_output = kwargs.get("final_output")
 
-        # Call original method with all parameters
         try:
             result = await original_execute_final_output(*args, **kwargs)
         finally:
-            # End span for current agent after final output processing is complete
-            if agent and context_wrapper and _has_active_agent_span(context_wrapper):
-                end_invoke_agent_span(context_wrapper, agent, final_output)
+            with capture_internal_exceptions():
+                if (
+                    agent
+                    and context_wrapper
+                    and _has_active_agent_span(context_wrapper)
+                ):
+                    end_invoke_agent_span(context_wrapper, agent, final_output)
+                # For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper)
+                _close_streaming_workflow_span(agent)
+
+        return result
+
+    @wraps(
+        original_run_single_turn_streamed.__func__
+        if hasattr(original_run_single_turn_streamed, "__func__")
+        else original_run_single_turn_streamed
+    )
+    async def patched_run_single_turn_streamed(
+        cls: "agents.Runner", *args: "Any", **kwargs: "Any"
+    ) -> "Any":
+        """Patched _run_single_turn_streamed that creates agent invocation spans for streaming.
+
+        Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
+        _run_single_turn_streamed uses positional arguments. The call signature is:
+        _run_single_turn_streamed(
+            streamed_result,               # args[0]
+            agent,                         # args[1]
+            hooks,                         # args[2]
+            context_wrapper,               # args[3]
+            run_config,                    # args[4]
+            should_run_agent_start_hooks,  # args[5]
+            tool_use_tracker,              # args[6]
+            all_tools,                     # args[7]
+            server_conversation_tracker,   # args[8] (optional)
+        )
+        """
+        streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
+        agent = args[1] if len(args) > 1 else kwargs.get("agent")
+        context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
+        should_run_agent_start_hooks = bool(
+            args[5]
+            if len(args) > 5
+            else kwargs.get("should_run_agent_start_hooks", False)
+        )
+
+        span_kwargs: "dict[str, Any]" = {}
+        if streamed_result and hasattr(streamed_result, "input"):
+            span_kwargs["original_input"] = streamed_result.input
+
+        span = _maybe_start_agent_span(
+            context_wrapper,
+            agent,
+            should_run_agent_start_hooks,
+            span_kwargs,
+            is_streaming=True,
+        )
+
+        try:
+            result = await original_run_single_turn_streamed(*args, **kwargs)
+        except Exception as exc:
+            exc_info = sys.exc_info()
+            with capture_internal_exceptions():
+                if span is not None and span.timestamp is None:
+                    _record_exception_on_span(span, exc)
+                    end_invoke_agent_span(context_wrapper, agent)
+                _close_streaming_workflow_span(agent)
+            reraise(*exc_info)
 
         return result
 
     # Apply patches
     agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn)
+    agents.run.AgentRunner._run_single_turn_streamed = classmethod(
+        patched_run_single_turn_streamed
+    )
     agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs)
     agents._run_impl.RunImpl.execute_final_output = classmethod(
         patched_execute_final_output
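
As an aside, the classmethod-patching pattern applied here — unwrap `__func__` for `@wraps`, await the already-bound original, then re-wrap the replacement with `classmethod()` — can be illustrated with a small standalone sketch. The class and method below are hypothetical, not part of the SDK:

```python
import asyncio
from functools import wraps


class Runner:
    @classmethod
    async def _run_single_turn(cls, *, agent: str) -> str:
        return f"ran {agent}"


original = Runner._run_single_turn  # bound classmethod; the plain function lives on __func__


@wraps(original.__func__ if hasattr(original, "__func__") else original)
async def patched(cls, *args, **kwargs):
    # ... a span could be started here ...
    result = await original(*args, **kwargs)  # original is already bound to the class
    # ... and finished here ...
    return result


# Re-wrap in classmethod() so the attribute behaves like the original descriptor.
Runner._run_single_turn = classmethod(patched)

print(asyncio.run(Runner._run_single_turn(agent="Assistant")))  # -> "ran Assistant"
```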

sentry_sdk/integrations/openai_agents/patches/models.py

Lines changed: 62 additions & 13 deletions
@@ -1,4 +1,5 @@
 import copy
+import sys
 from functools import wraps
 
 from sentry_sdk.integrations import DidNotEnable
@@ -17,7 +18,7 @@
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
-    from typing import Any, Callable
+    from typing import Any, Callable, Optional
     from sentry_sdk.tracing import Span
 
 try:
@@ -27,6 +28,16 @@
     raise DidNotEnable("OpenAI Agents not installed")
 
 
+def _set_response_model_on_agent_span(
+    agent: "agents.Agent", response_model: "Optional[str]"
+) -> None:
+    """Set the response model on the agent's invoke_agent span if available."""
+    if response_model:
+        agent_span = getattr(agent, "_sentry_agent_span", None)
+        if agent_span:
+            agent_span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model)
+
+
 def _inject_trace_propagation_headers(
     hosted_tool: "HostedMCPTool", span: "Span"
 ) -> None:
@@ -74,15 +85,19 @@ def wrapped_get_model(
         # because we only patch its direct methods, all underlying data can remain unchanged.
         model = copy.copy(original_get_model(agent, run_config))
 
-        # Wrap _fetch_response if it exists (for OpenAI models) to capture raw response model
+        # Capture the request model name for spans (agent.model can be None when using defaults)
+        request_model_name = model.model if hasattr(model, "model") else str(model)
+        agent._sentry_request_model = request_model_name
+
+        # Wrap _fetch_response if it exists (for OpenAI models) to capture response model
         if hasattr(model, "_fetch_response"):
             original_fetch_response = model._fetch_response
 
             @wraps(original_fetch_response)
             async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any":
                 response = await original_fetch_response(*args, **kwargs)
-                if hasattr(response, "model"):
-                    agent._sentry_raw_response_model = str(response.model)
+                if hasattr(response, "model") and response.model:
+                    agent._sentry_response_model = str(response.model)
                 return response
 
             model._fetch_response = wrapped_fetch_response
@@ -104,22 +119,56 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":
 
                 result = await original_get_response(*args, **kwargs)
 
-                response_model = getattr(agent, "_sentry_raw_response_model", None)
+                # Get response model captured from _fetch_response and clean up
+                response_model = getattr(agent, "_sentry_response_model", None)
                 if response_model:
-                    agent_span = getattr(agent, "_sentry_agent_span", None)
-                    if agent_span:
-                        agent_span.set_data(
-                            SPANDATA.GEN_AI_RESPONSE_MODEL, response_model
-                        )
+                    delattr(agent, "_sentry_response_model")
 
-                    delattr(agent, "_sentry_raw_response_model")
-
-                update_ai_client_span(span, agent, kwargs, result, response_model)
+                _set_response_model_on_agent_span(agent, response_model)
+                update_ai_client_span(span, result, response_model)
 
             return result
 
         model.get_response = wrapped_get_response
 
+        # Also wrap stream_response for streaming support
+        if hasattr(model, "stream_response"):
+            original_stream_response = model.stream_response
+
+            @wraps(original_stream_response)
+            async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
+                # Uses explicit try/finally instead of context manager to ensure cleanup
+                # even if the consumer abandons the stream (GeneratorExit).
+                span_kwargs = dict(kwargs)
+                if len(args) > 0:
+                    span_kwargs["system_instructions"] = args[0]
+                if len(args) > 1:
+                    span_kwargs["input"] = args[1]
+
+                with ai_client_span(agent, span_kwargs) as span:
+                    span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
+
+                    streaming_response = None
+
+                    async for event in original_stream_response(*args, **kwargs):
+                        # Capture the full response from ResponseCompletedEvent
+                        if hasattr(event, "response"):
+                            streaming_response = event.response
+                        yield event
+
+                    # Update span with response data (usage, output, model)
+                    if streaming_response:
+                        response_model = (
+                            str(streaming_response.model)
+                            if hasattr(streaming_response, "model")
+                            and streaming_response.model
+                            else None
+                        )
+                        _set_response_model_on_agent_span(agent, response_model)
+                        update_ai_client_span(span, streaming_response)
+
+            model.stream_response = wrapped_stream_response
+
         return model
 
     return wrapped_get_model
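
The `stream_response` wrapper above follows a general pattern worth calling out: wrap an async generator, re-yield every event untouched, and remember the completion event so the surrounding span can be enriched once the stream is drained. A minimal standalone sketch of that pattern, with a hypothetical event type and producer standing in for the model stream:

```python
import asyncio
from dataclasses import dataclass
from functools import wraps
from typing import Any, AsyncIterator


@dataclass
class CompletedEvent:  # hypothetical stand-in for a "response completed" event
    response: str


async def stream_response(prompt: str) -> AsyncIterator[Any]:
    # hypothetical model stream: a few text deltas, then a completion event
    for chunk in ("Hel", "lo"):
        yield chunk
    yield CompletedEvent(response="Hello")


@wraps(stream_response)
async def wrapped_stream_response(*args: Any, **kwargs: Any) -> AsyncIterator[Any]:
    final = None
    async for event in stream_response(*args, **kwargs):
        if hasattr(event, "response"):
            final = event.response  # remember the completed response
        yield event  # pass everything through untouched
    # once the stream is drained, the captured response can enrich a span
    print(f"stream finished, final response: {final!r}")


async def main() -> None:
    async for _event in wrapped_stream_response("hi"):
        pass


asyncio.run(main())
```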
