Skip to content

Commit 5a26c3b

Browse files
merge master
2 parents 6b81f86 + 54207a2 commit 5a26c3b

File tree

5 files changed

+326
-14
lines changed

5 files changed

+326
-14
lines changed

sentry_sdk/_span_batcher.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import threading
2+
from collections import defaultdict
3+
from datetime import datetime, timezone
4+
from typing import TYPE_CHECKING
5+
6+
from sentry_sdk._batcher import Batcher
7+
from sentry_sdk.consts import SPANSTATUS
8+
from sentry_sdk.envelope import Envelope, Item, PayloadRef
9+
from sentry_sdk.utils import format_timestamp, serialize_attribute, safe_repr
10+
11+
if TYPE_CHECKING:
12+
from typing import Any, Callable, Optional
13+
from sentry_sdk.traces import StreamedSpan
14+
from sentry_sdk._types import SerializedAttributeValue
15+
16+
17+
class SpanBatcher(Batcher["StreamedSpan"]):
18+
# TODO[span-first]: size-based flushes
19+
# TODO[span-first]: adjust flush/drop defaults
20+
MAX_BEFORE_FLUSH = 1000
21+
MAX_BEFORE_DROP = 5000
22+
FLUSH_WAIT_TIME = 5.0
23+
24+
TYPE = "span"
25+
CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"
26+
27+
def __init__(
28+
self,
29+
capture_func: "Callable[[Envelope], None]",
30+
record_lost_func: "Callable[..., None]",
31+
) -> None:
32+
# Spans from different traces cannot be emitted in the same envelope
33+
# since the envelope contains a shared trace header. That's why we bucket
34+
# by trace_id, so that we can then send the buckets each in its own
35+
# envelope.
36+
# trace_id -> span buffer
37+
self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
38+
self._capture_func = capture_func
39+
self._record_lost_func = record_lost_func
40+
self._running = True
41+
self._lock = threading.Lock()
42+
43+
self._flush_event: "threading.Event" = threading.Event()
44+
45+
self._flusher: "Optional[threading.Thread]" = None
46+
self._flusher_pid: "Optional[int]" = None
47+
48+
def get_size(self) -> int:
49+
# caller is responsible for locking before checking this
50+
return sum(len(buffer) for buffer in self._span_buffer.values())
51+
52+
def add(self, span: "StreamedSpan") -> None:
53+
if not self._ensure_thread() or self._flusher is None:
54+
return None
55+
56+
with self._lock:
57+
size = self.get_size()
58+
if size >= self.MAX_BEFORE_DROP:
59+
self._record_lost_func(
60+
reason="queue_overflow",
61+
data_category="span",
62+
quantity=1,
63+
)
64+
return None
65+
66+
self._span_buffer[span.trace_id].append(span)
67+
if size + 1 >= self.MAX_BEFORE_FLUSH:
68+
self._flush_event.set()
69+
70+
@staticmethod
71+
def _to_transport_format(item: "StreamedSpan") -> "Any":
72+
# TODO[span-first]
73+
res: "dict[str, Any]" = {
74+
"name": item.name,
75+
}
76+
77+
if item._attributes:
78+
res["attributes"] = {
79+
k: serialize_attribute(v) for (k, v) in item._attributes.items()
80+
}
81+
82+
return res
83+
84+
def _flush(self) -> None:
85+
with self._lock:
86+
if len(self._span_buffer) == 0:
87+
return None
88+
89+
envelopes = []
90+
for trace_id, spans in self._span_buffer.items():
91+
if spans:
92+
# TODO[span-first]
93+
# dsc = spans[0].dynamic_sampling_context()
94+
dsc = None
95+
96+
envelope = Envelope(
97+
headers={
98+
"sent_at": format_timestamp(datetime.now(timezone.utc)),
99+
"trace": dsc,
100+
}
101+
)
102+
103+
envelope.add_item(
104+
Item(
105+
type="span",
106+
content_type="application/vnd.sentry.items.span.v2+json",
107+
headers={
108+
"item_count": len(spans),
109+
},
110+
payload=PayloadRef(
111+
json={
112+
"items": [
113+
self._to_transport_format(span)
114+
for span in spans
115+
]
116+
}
117+
),
118+
)
119+
)
120+
121+
envelopes.append(envelope)
122+
123+
self._span_buffer.clear()
124+
125+
for envelope in envelopes:
126+
self._capture_func(envelope)

sentry_sdk/client.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import sentry_sdk
1212
from sentry_sdk._compat import PY37, check_uwsgi_thread_support
1313
from sentry_sdk._metrics_batcher import MetricsBatcher
14+
from sentry_sdk._span_batcher import SpanBatcher
1415
from sentry_sdk.utils import (
1516
AnnotatedValue,
1617
ContextVar,
@@ -31,6 +32,7 @@
3132
)
3233
from sentry_sdk.serializer import serialize
3334
from sentry_sdk.tracing import trace
35+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
3436
from sentry_sdk.transport import BaseHttpTransport, make_transport
3537
from sentry_sdk.consts import (
3638
SPANDATA,
@@ -67,6 +69,7 @@
6769
from sentry_sdk.scope import Scope
6870
from sentry_sdk.session import Session
6971
from sentry_sdk.spotlight import SpotlightClient
72+
from sentry_sdk.traces import StreamedSpan
7073
from sentry_sdk.transport import Transport, Item
7174
from sentry_sdk._log_batcher import LogBatcher
7275
from sentry_sdk._metrics_batcher import MetricsBatcher
@@ -188,6 +191,7 @@ def __init__(self, options: "Optional[Dict[str, Any]]" = None) -> None:
188191
self.monitor: "Optional[Monitor]" = None
189192
self.log_batcher: "Optional[LogBatcher]" = None
190193
self.metrics_batcher: "Optional[MetricsBatcher]" = None
194+
self.span_batcher: "Optional[SpanBatcher]" = None
191195
self.integrations: "dict[str, Integration]" = {}
192196

193197
def __getstate__(self, *args: "Any", **kwargs: "Any") -> "Any":
@@ -224,6 +228,9 @@ def _capture_log(self, log: "Log", scope: "Scope") -> None:
224228
def _capture_metric(self, metric: "Metric", scope: "Scope") -> None:
225229
pass
226230

231+
def _capture_span(self, span: "StreamedSpan", scope: "Scope") -> None:
232+
pass
233+
227234
def capture_session(self, *args: "Any", **kwargs: "Any") -> None:
228235
return None
229236

@@ -399,6 +406,13 @@ def _record_lost_event(
399406
record_lost_func=_record_lost_event,
400407
)
401408

409+
self.span_batcher = None
410+
if has_span_streaming_enabled(self.options):
411+
self.span_batcher = SpanBatcher(
412+
capture_func=_capture_envelope,
413+
record_lost_func=_record_lost_event,
414+
)
415+
402416
max_request_body_size = ("always", "never", "small", "medium")
403417
if self.options["max_request_body_size"] not in max_request_body_size:
404418
raise ValueError(
@@ -909,7 +923,10 @@ def capture_event(
909923
return return_value
910924

911925
def _capture_telemetry(
912-
self, telemetry: "Optional[Union[Log, Metric]]", ty: str, scope: "Scope"
926+
self,
927+
telemetry: "Optional[Union[Log, Metric, StreamedSpan]]",
928+
ty: str,
929+
scope: "Scope",
913930
) -> None:
914931
# Capture attributes-based telemetry (logs, metrics, spansV2)
915932
if telemetry is None:
@@ -934,6 +951,8 @@ def _capture_telemetry(
934951
batcher = self.log_batcher
935952
elif ty == "metric":
936953
batcher = self.metrics_batcher # type: ignore
954+
elif ty == "span":
955+
batcher = self.span_batcher # type: ignore
937956

938957
if batcher is not None:
939958
batcher.add(telemetry) # type: ignore
@@ -944,6 +963,9 @@ def _capture_log(self, log: "Optional[Log]", scope: "Scope") -> None:
944963
def _capture_metric(self, metric: "Optional[Metric]", scope: "Scope") -> None:
945964
self._capture_telemetry(metric, "metric", scope)
946965

966+
def _capture_span(self, span: "Optional[StreamedSpan]", scope: "Scope") -> None:
967+
self._capture_telemetry(span, "span", scope)
968+
947969
def capture_session(
948970
self,
949971
session: "Session",
@@ -993,6 +1015,8 @@ def close(
9931015
self.log_batcher.kill()
9941016
if self.metrics_batcher is not None:
9951017
self.metrics_batcher.kill()
1018+
if self.span_batcher is not None:
1019+
self.span_batcher.kill()
9961020
if self.monitor:
9971021
self.monitor.kill()
9981022
self.transport.kill()
@@ -1018,6 +1042,8 @@ def flush(
10181042
self.log_batcher.flush()
10191043
if self.metrics_batcher is not None:
10201044
self.metrics_batcher.flush()
1045+
if self.span_batcher is not None:
1046+
self.span_batcher.flush()
10211047
self.transport.flush(timeout=timeout, callback=callback)
10221048

10231049
def __enter__(self) -> "_Client":

sentry_sdk/scope.py

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
from sentry_sdk.tracing_utils import (
3030
Baggage,
3131
has_tracing_enabled,
32+
has_span_streaming_enabled,
3233
normalize_incoming_data,
3334
PropagationContext,
3435
)
36+
from sentry_sdk.traces import StreamedSpan
3537
from sentry_sdk.tracing import (
3638
BAGGAGE_HEADER_NAME,
3739
SENTRY_TRACE_HEADER_NAME,
@@ -1278,6 +1280,17 @@ def _capture_metric(self, metric: "Optional[Metric]") -> None:
12781280

12791281
client._capture_metric(metric, scope=merged_scope)
12801282

1283+
def _capture_span(self, span: "Optional[StreamedSpan]") -> None:
1284+
if span is None:
1285+
return
1286+
1287+
client = self.get_client()
1288+
if not has_span_streaming_enabled(client.options):
1289+
return
1290+
1291+
merged_scope = self._merge_scopes()
1292+
client._capture_span(span, scope=merged_scope)
1293+
12811294
def capture_message(
12821295
self,
12831296
message: str,
@@ -1522,16 +1535,25 @@ def _apply_flags_to_event(
15221535
)
15231536

15241537
def _apply_scope_attributes_to_telemetry(
1525-
self, telemetry: "Union[Log, Metric]"
1538+
self, telemetry: "Union[Log, Metric, StreamedSpan]"
15261539
) -> None:
1540+
# TODO: turn Logs, Metrics into actual classes
1541+
if isinstance(telemetry, dict):
1542+
attributes = telemetry["attributes"]
1543+
else:
1544+
attributes = telemetry._attributes
1545+
15271546
for attribute, value in self._attributes.items():
1528-
if attribute not in telemetry["attributes"]:
1529-
telemetry["attributes"][attribute] = value
1547+
if attribute not in attributes:
1548+
attributes[attribute] = value
15301549

15311550
def _apply_user_attributes_to_telemetry(
1532-
self, telemetry: "Union[Log, Metric]"
1551+
self, telemetry: "Union[Log, Metric, StreamedSpan]"
15331552
) -> None:
1534-
attributes = telemetry["attributes"]
1553+
if isinstance(telemetry, dict):
1554+
attributes = telemetry["attributes"]
1555+
else:
1556+
attributes = telemetry._attributes
15351557

15361558
if not should_send_default_pii() or self._user is None:
15371559
return
@@ -1651,16 +1673,19 @@ def apply_to_event(
16511673
return event
16521674

16531675
@_disable_capture
1654-
def apply_to_telemetry(self, telemetry: "Union[Log, Metric]") -> None:
1676+
def apply_to_telemetry(self, telemetry: "Union[Log, Metric, StreamedSpan]") -> None:
16551677
# Attributes-based events and telemetry go through here (logs, metrics,
16561678
# spansV2)
1657-
trace_context = self.get_trace_context()
1658-
trace_id = trace_context.get("trace_id")
1659-
if telemetry.get("trace_id") is None:
1660-
telemetry["trace_id"] = trace_id or "00000000-0000-0000-0000-000000000000"
1661-
span_id = trace_context.get("span_id")
1662-
if telemetry.get("span_id") is None and span_id:
1663-
telemetry["span_id"] = span_id
1679+
if not isinstance(telemetry, StreamedSpan):
1680+
trace_context = self.get_trace_context()
1681+
trace_id = trace_context.get("trace_id")
1682+
if telemetry.get("trace_id") is None:
1683+
telemetry["trace_id"] = (
1684+
trace_id or "00000000-0000-0000-0000-000000000000"
1685+
)
1686+
span_id = trace_context.get("span_id")
1687+
if telemetry.get("span_id") is None and span_id:
1688+
telemetry["span_id"] = span_id
16641689

16651690
self._apply_scope_attributes_to_telemetry(telemetry)
16661691
self._apply_user_attributes_to_telemetry(telemetry)

sentry_sdk/traces.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""
2+
The API in this file is only meant to be used in span streaming mode.
3+
4+
You can enable span streaming mode via
5+
sentry_sdk.init(_experiments={"trace_lifecycle": "stream"}).
6+
"""
7+
8+
import uuid
9+
from typing import TYPE_CHECKING
10+
11+
from sentry_sdk.utils import format_attribute
12+
13+
if TYPE_CHECKING:
14+
from typing import Optional
15+
from sentry_sdk._types import Attributes, AttributeValue
16+
17+
18+
class StreamedSpan:
19+
"""
20+
A span holds timing information of a block of code.
21+
22+
Spans can have multiple child spans thus forming a span tree.
23+
24+
This is the Span First span implementation. The original transaction-based
25+
span implementation lives in tracing.Span.
26+
"""
27+
28+
__slots__ = (
29+
"name",
30+
"_attributes",
31+
"_trace_id",
32+
)
33+
34+
def __init__(
35+
self,
36+
*,
37+
name: str,
38+
attributes: "Optional[Attributes]" = None,
39+
trace_id: "Optional[str]" = None,
40+
):
41+
self.name: str = name
42+
self._attributes: "Attributes" = attributes or {}
43+
44+
self._trace_id = trace_id
45+
46+
def get_attributes(self) -> "Attributes":
47+
return self._attributes
48+
49+
def set_attribute(self, key: str, value: "AttributeValue") -> None:
50+
self._attributes[key] = format_attribute(value)
51+
52+
def set_attributes(self, attributes: "Attributes") -> None:
53+
for key, value in attributes.items():
54+
self.set_attribute(key, value)
55+
56+
def remove_attribute(self, key: str) -> None:
57+
try:
58+
del self._attributes[key]
59+
except KeyError:
60+
pass
61+
62+
@property
63+
def trace_id(self) -> str:
64+
if not self._trace_id:
65+
self._trace_id = uuid.uuid4().hex
66+
67+
return self._trace_id

0 commit comments

Comments
 (0)