Skip to content
118 changes: 118 additions & 0 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import threading
from collections import defaultdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from sentry_sdk._batcher import Batcher
from sentry_sdk.consts import SPANSTATUS
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp, serialize_attribute, safe_repr

if TYPE_CHECKING:
from typing import Any, Callable, Optional
from sentry_sdk.traces import StreamedSpan
from sentry_sdk._types import SerializedAttributeValue


class SpanBatcher(Batcher["StreamedSpan"]):
    """
    Batcher that buffers streamed spans and sends them out in envelopes.

    Spans are bucketed by trace ID and every bucket is flushed as its own
    envelope, because an envelope carries a single shared trace header.
    """

    # TODO[span-first]: size-based flushes
    # TODO[span-first]: adjust flush/drop defaults
    # Buffered span count at which add() requests a flush.
    MAX_BEFORE_FLUSH = 1000
    # Buffered span count at which add() starts dropping new spans.
    MAX_BEFORE_DROP = 5000
    # Not read in this class; presumably the wait time (in seconds) used by
    # the inherited flusher loop -- confirm against Batcher.
    FLUSH_WAIT_TIME = 5.0

    # Envelope item type and content type used for flushed span batches.
    TYPE = "span"
    CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        """
        Set up an empty span buffer.

        :param capture_func: Called with each envelope produced by a flush.
        :param record_lost_func: Called with ``reason``, ``data_category`` and
            ``quantity`` keyword arguments when a span is dropped.
        """
        # Spans from different traces cannot be emitted in the same envelope
        # since the envelope contains a shared trace header. That's why we bucket
        # by trace_id, so that we can then send the buckets each in its own
        # envelope.
        # trace_id -> span buffer
        self._span_buffer: "dict[str, list[StreamedSpan]]" = defaultdict(list)
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        # NOTE(review): _running and _flusher_pid are not read in this class;
        # presumably they are consumed by the inherited thread handling.
        self._running = True
        self._lock = threading.Lock()

        self._flush_event: "threading.Event" = threading.Event()

        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

    def get_size(self) -> int:
        """Return the number of buffered spans across all trace buckets."""
        # caller is responsible for locking before checking this
        return sum(len(buffer) for buffer in self._span_buffer.values())

    def add(self, span: "StreamedSpan") -> None:
        """
        Buffer ``span`` in the bucket of its trace.

        The span is discarded without being recorded as lost when no flusher
        thread is available. Once ``MAX_BEFORE_DROP`` spans are buffered, the
        span is reported through ``record_lost_func`` and dropped. Reaching
        ``MAX_BEFORE_FLUSH`` buffered spans sets the flush event.
        """
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            size = self.get_size()
            if size >= self.MAX_BEFORE_DROP:
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            if size + 1 >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()

    @staticmethod
    def _to_transport_format(item: "StreamedSpan") -> "Any":
        """Serialize a span for the envelope payload (currently a placeholder)."""
        # TODO[span-first]
        res: "dict[str, Any]" = {}
        return res

    def _flush(self) -> None:
        """
        Build one envelope per non-empty trace bucket and hand each to
        ``capture_func``.

        The buffer is drained while the lock is held; the envelopes are
        captured after the lock has been released.
        """
        with self._lock:
            if not self._span_buffer:
                return None

            envelopes = []
            for spans in self._span_buffer.values():
                if not spans:
                    continue

                # TODO[span-first]
                # dsc = spans[0].dynamic_sampling_context()
                dsc = None

                envelope = Envelope(
                    headers={
                        "sent_at": format_timestamp(datetime.now(timezone.utc)),
                        "trace": dsc,
                    }
                )

                envelope.add_item(
                    Item(
                        # Use the class constants so the item metadata cannot
                        # drift from TYPE / CONTENT_TYPE.
                        type=self.TYPE,
                        content_type=self.CONTENT_TYPE,
                        headers={
                            "item_count": len(spans),
                        },
                        payload=PayloadRef(
                            json={
                                "items": [
                                    self._to_transport_format(span)
                                    for span in spans
                                ]
                            }
                        ),
                    )
                )

                envelopes.append(envelope)

            self._span_buffer.clear()

        for envelope in envelopes:
            self._capture_func(envelope)
28 changes: 27 additions & 1 deletion sentry_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import sentry_sdk
from sentry_sdk._compat import PY37, check_uwsgi_thread_support
from sentry_sdk._metrics_batcher import MetricsBatcher
from sentry_sdk._span_batcher import SpanBatcher
from sentry_sdk.utils import (
AnnotatedValue,
ContextVar,
Expand All @@ -31,6 +32,7 @@
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.transport import BaseHttpTransport, make_transport
from sentry_sdk.consts import (
SPANDATA,
Expand Down Expand Up @@ -67,6 +69,7 @@
from sentry_sdk.scope import Scope
from sentry_sdk.session import Session
from sentry_sdk.spotlight import SpotlightClient
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.transport import Transport, Item
from sentry_sdk._log_batcher import LogBatcher
from sentry_sdk._metrics_batcher import MetricsBatcher
Expand Down Expand Up @@ -188,6 +191,7 @@ def __init__(self, options: "Optional[Dict[str, Any]]" = None) -> None:
self.monitor: "Optional[Monitor]" = None
self.log_batcher: "Optional[LogBatcher]" = None
self.metrics_batcher: "Optional[MetricsBatcher]" = None
self.span_batcher: "Optional[SpanBatcher]" = None
self.integrations: "dict[str, Integration]" = {}

def __getstate__(self, *args: "Any", **kwargs: "Any") -> "Any":
Expand Down Expand Up @@ -224,6 +228,9 @@ def _capture_log(self, log: "Log", scope: "Scope") -> None:
def _capture_metric(self, metric: "Metric", scope: "Scope") -> None:
pass

def _capture_span(self, span: "StreamedSpan", scope: "Scope") -> None:
pass

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing span_batcher in uWSGI thread support check

Low Severity

span_batcher spawns a background flusher thread via the inherited _ensure_thread method, but the condition checking whether to call check_uwsgi_thread_support() only includes self.log_batcher, not self.span_batcher. When running under uWSGI with threading disabled and only span streaming enabled (no logs, profiling, or monitor), users won't receive the warning about thread support issues, and spans will be silently dropped when thread creation fails.

Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix in a follow-up PR.

def capture_session(self, *args: "Any", **kwargs: "Any") -> None:
return None

Expand Down Expand Up @@ -399,6 +406,13 @@ def _record_lost_event(
record_lost_func=_record_lost_event,
)

self.span_batcher = None
if has_span_streaming_enabled(self.options):
self.span_batcher = SpanBatcher(
capture_func=_capture_envelope,
record_lost_func=_record_lost_event,
)

max_request_body_size = ("always", "never", "small", "medium")
if self.options["max_request_body_size"] not in max_request_body_size:
raise ValueError(
Expand Down Expand Up @@ -909,7 +923,10 @@ def capture_event(
return return_value

def _capture_telemetry(
self, telemetry: "Optional[Union[Log, Metric]]", ty: str, scope: "Scope"
self,
telemetry: "Optional[Union[Log, Metric, StreamedSpan]]",
ty: str,
scope: "Scope",
) -> None:
# Capture attributes-based telemetry (logs, metrics, spansV2)
if telemetry is None:
Expand All @@ -934,6 +951,8 @@ def _capture_telemetry(
batcher = self.log_batcher
elif ty == "metric":
batcher = self.metrics_batcher # type: ignore
elif ty == "span":
batcher = self.span_batcher # type: ignore

if batcher is not None:
batcher.add(telemetry) # type: ignore
Expand All @@ -944,6 +963,9 @@ def _capture_log(self, log: "Optional[Log]", scope: "Scope") -> None:
def _capture_metric(self, metric: "Optional[Metric]", scope: "Scope") -> None:
self._capture_telemetry(metric, "metric", scope)

def _capture_span(self, span: "Optional[StreamedSpan]", scope: "Scope") -> None:
self._capture_telemetry(span, "span", scope)

def capture_session(
self,
session: "Session",
Expand Down Expand Up @@ -993,6 +1015,8 @@ def close(
self.log_batcher.kill()
if self.metrics_batcher is not None:
self.metrics_batcher.kill()
if self.span_batcher is not None:
self.span_batcher.kill()
if self.monitor:
self.monitor.kill()
self.transport.kill()
Expand All @@ -1018,6 +1042,8 @@ def flush(
self.log_batcher.flush()
if self.metrics_batcher is not None:
self.metrics_batcher.flush()
if self.span_batcher is not None:
self.span_batcher.flush()
self.transport.flush(timeout=timeout, callback=callback)

def __enter__(self) -> "_Client":
Expand Down
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class CompressionAlgo(Enum):
"before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]],
"enable_metrics": Optional[bool],
"before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]],
"trace_lifecycle": Optional[Literal["static", "stream"]],
},
total=False,
)
Expand Down
51 changes: 38 additions & 13 deletions sentry_sdk/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
from sentry_sdk.tracing_utils import (
Baggage,
has_tracing_enabled,
has_span_streaming_enabled,
normalize_incoming_data,
PropagationContext,
)
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.tracing import (
BAGGAGE_HEADER_NAME,
SENTRY_TRACE_HEADER_NAME,
Expand Down Expand Up @@ -1278,6 +1280,17 @@ def _capture_metric(self, metric: "Optional[Metric]") -> None:

client._capture_metric(metric, scope=merged_scope)

def _capture_span(self, span: "Optional[StreamedSpan]") -> None:
    """
    Hand ``span`` to the current client together with the merged scope.

    Nothing happens when ``span`` is ``None`` or when span streaming is not
    enabled in the client's options.
    """
    if span is not None:
        client = self.get_client()
        if has_span_streaming_enabled(client.options):
            # Merge the scopes only once we know the span will be captured.
            merged_scope = self._merge_scopes()
            client._capture_span(span, scope=merged_scope)

def capture_message(
self,
message: str,
Expand Down Expand Up @@ -1522,16 +1535,25 @@ def _apply_flags_to_event(
)

def _apply_scope_attributes_to_telemetry(
self, telemetry: "Union[Log, Metric]"
self, telemetry: "Union[Log, Metric, StreamedSpan]"
) -> None:
# TODO: turn Logs, Metrics into actual classes
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to do this eventually, but it's a breaking change so it has to wait.

if isinstance(telemetry, dict):
attributes = telemetry["attributes"]
else:
attributes = telemetry.attributes

for attribute, value in self._attributes.items():
if attribute not in telemetry["attributes"]:
telemetry["attributes"][attribute] = value
if attribute not in attributes:
attributes[attribute] = value

def _apply_user_attributes_to_telemetry(
self, telemetry: "Union[Log, Metric]"
self, telemetry: "Union[Log, Metric, StreamedSpan]"
) -> None:
attributes = telemetry["attributes"]
if isinstance(telemetry, dict):
attributes = telemetry["attributes"]
else:
attributes = telemetry.attributes

if not should_send_default_pii() or self._user is None:
return
Expand Down Expand Up @@ -1651,16 +1673,19 @@ def apply_to_event(
return event

@_disable_capture
def apply_to_telemetry(self, telemetry: "Union[Log, Metric]") -> None:
def apply_to_telemetry(self, telemetry: "Union[Log, Metric, StreamedSpan]") -> None:
# Attributes-based events and telemetry go through here (logs, metrics,
# spansV2)
trace_context = self.get_trace_context()
trace_id = trace_context.get("trace_id")
if telemetry.get("trace_id") is None:
telemetry["trace_id"] = trace_id or "00000000-0000-0000-0000-000000000000"
span_id = trace_context.get("span_id")
if telemetry.get("span_id") is None and span_id:
telemetry["span_id"] = span_id
if not isinstance(telemetry, StreamedSpan):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metrics- and logs-specific handling inside this `if` block should be moved elsewhere, but that is out of scope for this PR.

trace_context = self.get_trace_context()
trace_id = trace_context.get("trace_id")
if telemetry.get("trace_id") is None:
telemetry["trace_id"] = (
trace_id or "00000000-0000-0000-0000-000000000000"
)
span_id = trace_context.get("span_id")
if telemetry.get("span_id") is None and span_id:
telemetry["span_id"] = span_id

self._apply_scope_attributes_to_telemetry(telemetry)
self._apply_user_attributes_to_telemetry(telemetry)
Expand Down
39 changes: 39 additions & 0 deletions sentry_sdk/traces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""
The API in this file is only meant to be used in span streaming mode.

You can enable span streaming mode via
sentry_sdk.init(_experiments={"trace_lifecycle": "stream"}).
"""

import uuid
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Optional


class StreamedSpan:
    """
    Timing information for a block of code.

    A span may have any number of child spans, so spans form a tree.

    This is the Span First span implementation. The original transaction-based
    span implementation lives in tracing.Span.
    """

    __slots__ = ("_trace_id",)

    def __init__(
        self,
        *,
        trace_id: "Optional[str]" = None,
    ) -> None:
        # May stay unset (or empty) here; the trace_id property fills it in
        # on first access.
        self._trace_id = trace_id

    @property
    def trace_id(self) -> str:
        """Return the trace ID, generating and storing a new one if unset."""
        existing = self._trace_id
        if existing:
            return existing

        generated = uuid.uuid4().hex
        self._trace_id = generated
        return generated
7 changes: 7 additions & 0 deletions sentry_sdk/tracing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ def has_tracing_enabled(options: "Optional[Dict[str, Any]]") -> bool:
)


def has_span_streaming_enabled(options: "Optional[dict[str, Any]]") -> bool:
    """
    Tell whether span streaming mode is switched on.

    Returns True only when ``options["_experiments"]["trace_lifecycle"]`` is
    ``"stream"``; missing options, a missing or empty ``_experiments`` entry,
    or any other value yield False.
    """
    if options is not None:
        experiments = options.get("_experiments") or {}
        return experiments.get("trace_lifecycle") == "stream"

    return False


@contextlib.contextmanager
def record_sql_queries(
cursor: "Any",
Expand Down
Loading