diff --git a/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/__init__.py b/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/__init__.py
index 5d96fad8b3..1aec6fa85c 100644
--- a/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/__init__.py
+++ b/packages/opentelemetry-instrumentation-vertexai/opentelemetry/instrumentation/vertexai/__init__.py
@@ -32,6 +32,7 @@
 )
 from opentelemetry.trace import SpanKind, get_tracer
 from opentelemetry.trace.status import Status, StatusCode
+from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
 from wrapt import wrap_function_wrapper
 
 logger = logging.getLogger(__name__)
@@ -148,13 +149,20 @@ def handle_streaming_response(span, event_logger, llm_model, response, token_usa
 def _build_from_streaming_response(span, event_logger, response, llm_model):
     complete_response = ""
     token_usage = None
-    for item in response:
-        item_to_yield = item
-        complete_response += str(item.text)
-        if item.usage_metadata:
-            token_usage = item.usage_metadata
-
-        yield item_to_yield
+    try:
+        for item in response:
+            item_to_yield = item
+            complete_response += str(item.text)
+            if item.usage_metadata:
+                token_usage = item.usage_metadata
+
+            yield item_to_yield
+    except Exception as e:
+        span.set_attribute(ERROR_TYPE, e.__class__.__name__)
+        span.record_exception(e)
+        span.set_status(Status(StatusCode.ERROR, str(e)))
+        span.end()
+        raise
 
     handle_streaming_response(
         span, event_logger, llm_model, complete_response, token_usage
@@ -167,15 +175,22 @@ def _build_from_streaming_response(span, event_logger, response, llm_model):
 async def _abuild_from_streaming_response(span, event_logger, response, llm_model):
     complete_response = ""
     token_usage = None
-    async for item in response:
-        item_to_yield = item
-        complete_response += str(item.text)
-        if item.usage_metadata:
-            token_usage = item.usage_metadata
-
-        yield item_to_yield
-
-    handle_streaming_response(span, event_logger, llm_model, response, token_usage)
+    try:
+        async for item in response:
+            item_to_yield = item
+            complete_response += str(item.text)
+            if item.usage_metadata:
+                token_usage = item.usage_metadata
+
+            yield item_to_yield
+    except Exception as e:
+        span.set_attribute(ERROR_TYPE, e.__class__.__name__)
+        span.record_exception(e)
+        span.set_status(Status(StatusCode.ERROR, str(e)))
+        span.end()
+        raise
+
+    handle_streaming_response(span, event_logger, llm_model, complete_response, token_usage)
 
     span.set_status(Status(StatusCode.OK))
     span.end()
@@ -245,7 +260,14 @@ async def _awrap(tracer, event_logger, to_wrap, wrapped, instance, args, kwargs)
 
     await _handle_request(span, event_logger, args, kwargs, llm_model)
 
-    response = await wrapped(*args, **kwargs)
+    try:
+        response = await wrapped(*args, **kwargs)
+    except Exception as e:
+        span.set_attribute(ERROR_TYPE, e.__class__.__name__)
+        span.record_exception(e)
+        span.set_status(Status(StatusCode.ERROR, str(e)))
+        span.end()
+        raise
 
     if response:
         if is_streaming_response(response):
@@ -299,7 +321,14 @@ def _wrap(tracer, event_logger, to_wrap, wrapped, instance, args, kwargs):
     else:
         set_input_attributes_sync(span, args)
 
-    response = wrapped(*args, **kwargs)
+    try:
+        response = wrapped(*args, **kwargs)
+    except Exception as e:
+        span.set_attribute(ERROR_TYPE, e.__class__.__name__)
+        span.record_exception(e)
+        span.set_status(Status(StatusCode.ERROR, str(e)))
+        span.end()
+        raise
 
     if response:
         if is_streaming_response(response):
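
A minimal sketch (not part of the patch) of how the error path introduced above can be exercised with the OpenTelemetry SDK's in-memory exporter. The names fake_stream and build_from_streaming_response are hypothetical stand-ins that copy the try/except pattern added to _build_from_streaming_response; they are not the instrumentation's private helpers.

# Sketch only: fake_stream and build_from_streaming_response are hypothetical
# stand-ins mirroring the error handling added by this patch.
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCode

exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
tracer = provider.get_tracer(__name__)


def fake_stream():
    # Hypothetical stream that fails mid-iteration, standing in for a
    # Vertex AI streaming response.
    yield type("Chunk", (), {"text": "partial", "usage_metadata": None})()
    raise RuntimeError("stream interrupted")


def build_from_streaming_response(span, response):
    # Mirrors the error handling added to _build_from_streaming_response:
    # tag the span with error.type, record the exception, mark the span as
    # errored, end it, then re-raise so the caller still sees the failure.
    try:
        for item in response:
            yield item
    except Exception as e:
        span.set_attribute(ERROR_TYPE, e.__class__.__name__)
        span.record_exception(e)
        span.set_status(Status(StatusCode.ERROR, str(e)))
        span.end()
        raise


span = tracer.start_span("vertexai.generate_content", kind=SpanKind.CLIENT)
try:
    for _ in build_from_streaming_response(span, fake_stream()):
        pass
except RuntimeError:
    pass

finished = exporter.get_finished_spans()[0]
assert finished.status.status_code == StatusCode.ERROR
assert finished.attributes[ERROR_TYPE] == "RuntimeError"
assert finished.events[0].name == "exception"

The same assertions should hold for the _awrap/_wrap paths, since they apply the identical pattern around the wrapped call before re-raising.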