-
Notifications
You must be signed in to change notification settings - Fork 581
feat(span-streaming): Add spans to telemetry pipeline, add span name and attributes (3) #5399
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
bab4359
37c990e
e720dec
6738f30
3fd25de
8c52a9c
65e667c
ad47675
d9f5e02
7bf4b7f
b819059
43072fe
faec56d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| import threading | ||
| from collections import defaultdict | ||
| from datetime import datetime, timezone | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| from sentry_sdk._batcher import Batcher | ||
| from sentry_sdk.consts import SPANSTATUS | ||
| from sentry_sdk.envelope import Envelope, Item, PayloadRef | ||
| from sentry_sdk.utils import format_timestamp, serialize_attribute, safe_repr | ||
|
|
||
| if TYPE_CHECKING: | ||
| from typing import Any, Callable, Optional | ||
| from sentry_sdk.traces import StreamedSpan | ||
| from sentry_sdk._types import SerializedAttributeValue | ||
|
|
||
|
|
||
| class SpanBatcher(Batcher["StreamedSpan"]): | ||
| # TODO[span-first]: size-based flushes | ||
| # TODO[span-first]: adjust flush/drop defaults | ||
| MAX_BEFORE_FLUSH = 1000 | ||
| MAX_BEFORE_DROP = 5000 | ||
| FLUSH_WAIT_TIME = 5.0 | ||
|
|
||
| TYPE = "span" | ||
| CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json" | ||
|
|
||
| def __init__( | ||
| self, | ||
| capture_func: "Callable[[Envelope], None]", | ||
| record_lost_func: "Callable[..., None]", | ||
| ) -> None: | ||
| # Spans from different traces cannot be emitted in the same envelope | ||
| # since the envelope contains a shared trace header. That's why we bucket | ||
| # by trace_id, so that we can then send the buckets each in its own | ||
| # envelope. | ||
| # trace_id -> span buffer | ||
| self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list) | ||
| self._capture_func = capture_func | ||
| self._record_lost_func = record_lost_func | ||
| self._running = True | ||
| self._lock = threading.Lock() | ||
|
|
||
| self._flush_event: "threading.Event" = threading.Event() | ||
|
|
||
| self._flusher: "Optional[threading.Thread]" = None | ||
| self._flusher_pid: "Optional[int]" = None | ||
|
|
||
| def get_size(self) -> int: | ||
| # caller is responsible for locking before checking this | ||
| return sum(len(buffer) for buffer in self._span_buffer.values()) | ||
|
|
||
| def add(self, span: "StreamedSpan") -> None: | ||
| if not self._ensure_thread() or self._flusher is None: | ||
| return None | ||
|
|
||
| with self._lock: | ||
| size = self.get_size() | ||
| if size >= self.MAX_BEFORE_DROP: | ||
| self._record_lost_func( | ||
| reason="queue_overflow", | ||
| data_category="span", | ||
| quantity=1, | ||
| ) | ||
| return None | ||
|
|
||
| self._span_buffer[span.trace_id].append(span) | ||
| if size + 1 >= self.MAX_BEFORE_FLUSH: | ||
| self._flush_event.set() | ||
|
|
||
| @staticmethod | ||
| def _to_transport_format(item: "StreamedSpan") -> "Any": | ||
| # TODO[span-first] | ||
| res: "dict[str, Any]" = { | ||
| "name": item.name, | ||
| } | ||
|
|
||
| if item._attributes: | ||
| res["attributes"] = { | ||
| k: serialize_attribute(v) for (k, v) in item._attributes.items() | ||
| } | ||
|
|
||
| return res | ||
|
|
||
| def _flush(self) -> None: | ||
| with self._lock: | ||
| if len(self._span_buffer) == 0: | ||
| return None | ||
|
|
||
| envelopes = [] | ||
| for trace_id, spans in self._span_buffer.items(): | ||
| if spans: | ||
| # TODO[span-first] | ||
| # dsc = spans[0].dynamic_sampling_context() | ||
| dsc = None | ||
|
|
||
| envelope = Envelope( | ||
| headers={ | ||
| "sent_at": format_timestamp(datetime.now(timezone.utc)), | ||
| "trace": dsc, | ||
| } | ||
| ) | ||
|
|
||
| envelope.add_item( | ||
| Item( | ||
| type="span", | ||
| content_type="application/vnd.sentry.items.span.v2+json", | ||
| headers={ | ||
| "item_count": len(spans), | ||
| }, | ||
| payload=PayloadRef( | ||
| json={ | ||
| "items": [ | ||
| self._to_transport_format(span) | ||
| for span in spans | ||
| ] | ||
| } | ||
| ), | ||
| ) | ||
| ) | ||
|
|
||
| envelopes.append(envelope) | ||
|
|
||
| self._span_buffer.clear() | ||
|
|
||
| for envelope in envelopes: | ||
| self._capture_func(envelope) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,9 +29,11 @@ | |
| from sentry_sdk.tracing_utils import ( | ||
| Baggage, | ||
| has_tracing_enabled, | ||
| has_span_streaming_enabled, | ||
| normalize_incoming_data, | ||
| PropagationContext, | ||
| ) | ||
| from sentry_sdk.traces import StreamedSpan | ||
| from sentry_sdk.tracing import ( | ||
| BAGGAGE_HEADER_NAME, | ||
| SENTRY_TRACE_HEADER_NAME, | ||
|
|
@@ -1278,6 +1280,17 @@ def _capture_metric(self, metric: "Optional[Metric]") -> None: | |
|
|
||
| client._capture_metric(metric, scope=merged_scope) | ||
|
|
||
| def _capture_span(self, span: "Optional[StreamedSpan]") -> None: | ||
| if span is None: | ||
| return | ||
|
|
||
| client = self.get_client() | ||
| if not has_span_streaming_enabled(client.options): | ||
| return | ||
|
|
||
| merged_scope = self._merge_scopes() | ||
| client._capture_span(span, scope=merged_scope) | ||
|
|
||
| def capture_message( | ||
| self, | ||
| message: str, | ||
|
|
@@ -1522,16 +1535,25 @@ def _apply_flags_to_event( | |
| ) | ||
|
|
||
| def _apply_scope_attributes_to_telemetry( | ||
| self, telemetry: "Union[Log, Metric]" | ||
| self, telemetry: "Union[Log, Metric, StreamedSpan]" | ||
| ) -> None: | ||
| # TODO: turn Logs, Metrics into actual classes | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Want to do this eventually, but it's a breaking change so it has to wait. |
||
| if isinstance(telemetry, dict): | ||
| attributes = telemetry["attributes"] | ||
| else: | ||
| attributes = telemetry._attributes | ||
|
|
||
| for attribute, value in self._attributes.items(): | ||
| if attribute not in telemetry["attributes"]: | ||
| telemetry["attributes"][attribute] = value | ||
| if attribute not in attributes: | ||
| attributes[attribute] = value | ||
|
|
||
| def _apply_user_attributes_to_telemetry( | ||
| self, telemetry: "Union[Log, Metric]" | ||
| self, telemetry: "Union[Log, Metric, StreamedSpan]" | ||
| ) -> None: | ||
| attributes = telemetry["attributes"] | ||
| if isinstance(telemetry, dict): | ||
| attributes = telemetry["attributes"] | ||
| else: | ||
| attributes = telemetry._attributes | ||
|
|
||
| if not should_send_default_pii() or self._user is None: | ||
| return | ||
|
|
@@ -1651,16 +1673,19 @@ def apply_to_event( | |
| return event | ||
|
|
||
| @_disable_capture | ||
| def apply_to_telemetry(self, telemetry: "Union[Log, Metric]") -> None: | ||
| def apply_to_telemetry(self, telemetry: "Union[Log, Metric, StreamedSpan]") -> None: | ||
| # Attributes-based events and telemetry go through here (logs, metrics, | ||
| # spansV2) | ||
| trace_context = self.get_trace_context() | ||
| trace_id = trace_context.get("trace_id") | ||
| if telemetry.get("trace_id") is None: | ||
| telemetry["trace_id"] = trace_id or "00000000-0000-0000-0000-000000000000" | ||
| span_id = trace_context.get("span_id") | ||
| if telemetry.get("span_id") is None and span_id: | ||
| telemetry["span_id"] = span_id | ||
| if not isinstance(telemetry, StreamedSpan): | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The stuff for metrics and logs in this |
||
| trace_context = self.get_trace_context() | ||
| trace_id = trace_context.get("trace_id") | ||
| if telemetry.get("trace_id") is None: | ||
| telemetry["trace_id"] = ( | ||
| trace_id or "00000000-0000-0000-0000-000000000000" | ||
| ) | ||
| span_id = trace_context.get("span_id") | ||
| if telemetry.get("span_id") is None and span_id: | ||
| telemetry["span_id"] = span_id | ||
|
|
||
| self._apply_scope_attributes_to_telemetry(telemetry) | ||
| self._apply_user_attributes_to_telemetry(telemetry) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| """ | ||
| The API in this file is only meant to be used in span streaming mode. | ||
|
|
||
| You can enable span streaming mode via | ||
| sentry_sdk.init(_experiments={"trace_lifecycle": "stream"}). | ||
| """ | ||
|
|
||
| import uuid | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| from sentry_sdk.utils import format_attribute | ||
|
|
||
| if TYPE_CHECKING: | ||
| from typing import Optional | ||
| from sentry_sdk._types import Attributes, AttributeValue | ||
|
|
||
|
|
||
| class StreamedSpan: | ||
| """ | ||
| A span holds timing information of a block of code. | ||
|
|
||
| Spans can have multiple child spans thus forming a span tree. | ||
|
|
||
| This is the Span First span implementation. The original transaction-based | ||
| span implementation lives in tracing.Span. | ||
| """ | ||
|
|
||
| __slots__ = ( | ||
| "name", | ||
| "_attributes", | ||
| "_trace_id", | ||
| ) | ||
|
|
||
| def __init__( | ||
| self, | ||
| *, | ||
| name: str, | ||
| attributes: "Optional[Attributes]" = None, | ||
| trace_id: "Optional[str]" = None, | ||
| ): | ||
| self.name: str = name | ||
| self._attributes: "Attributes" = attributes or {} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Initial attributes in StreamedSpan not formatted. Medium Severity
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is actually correct, will fix |
||
|
|
||
| self._trace_id = trace_id | ||
|
|
||
| def get_attributes(self) -> "Attributes": | ||
| return self._attributes | ||
|
|
||
| def set_attribute(self, key: str, value: "AttributeValue") -> None: | ||
| self._attributes[key] = format_attribute(value) | ||
|
|
||
| def set_attributes(self, attributes: "Attributes") -> None: | ||
| for key, value in attributes.items(): | ||
| self.set_attribute(key, value) | ||
|
|
||
| def remove_attribute(self, key: str) -> None: | ||
| try: | ||
| del self._attributes[key] | ||
| except KeyError: | ||
| pass | ||
|
|
||
| @property | ||
| def trace_id(self) -> str: | ||
| if not self._trace_id: | ||
| self._trace_id = uuid.uuid4().hex | ||
|
|
||
| return self._trace_id | ||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing span_batcher in uWSGI thread support check
Low Severity
`span_batcher` spawns a background flusher thread via the inherited `_ensure_thread` method, but the condition checking whether to call `check_uwsgi_thread_support()` only includes `self.log_batcher`, not `self.span_batcher`. When running under uWSGI with threading disabled and only span streaming enabled (no logs, profiling, or monitor), users won't receive the warning about thread support issues, and spans will be silently dropped when thread creation fails. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix in follow up PR