Skip to content

Commit 14b968a

Browse files
gtukmachev and claude committed
Fix Netty HTTP span lifecycle for chunked/streaming responses
The original HttpServerResponseTracingHandler had a single dispatch branch that always closed the span when the HttpResponse (headers) was written. LastHttpContent — the actual end of a chunked stream — was silently passed through with no span involvement, so APM reported near-zero latency for any streaming/chunked response.

Two bugs fixed:

Bug 1 — the span was always closed on HttpResponse, with no LastHttpContent handling. Added explicit routing for all four Netty message types:
- FullHttpResponse → finish span immediately (checked FIRST: extends both LastHttpContent and HttpResponse)
- HttpResponse → chunked headers only; save context, don't finish span
- LastHttpContent → finish span here to capture full streaming duration
- HttpContent → intermediate chunk; pass through

Bug 2 — keep-alive race condition. Under HTTP keep-alive, channelRead for the next request can overwrite CONTEXT_ATTRIBUTE_KEY before the pending LastHttpContent write task runs, causing handleLastHttpContent to finish the wrong span (~1-chunk duration). Fix: STREAMING_CONTEXT_KEY, a separate channel attribute set at chunked header time and consumed (getAndRemove) by LastHttpContent — immune to overwrite by the next request's span.

Verified: concurrent load test (48 requests, 8 threads): streaming/slow → ~5020ms, streaming/fast → ~61ms in DataDog APM. No outliers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 20abae4 commit 14b968a

File tree

2 files changed

+145
-7
lines changed

2 files changed

+145
-7
lines changed

dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/HttpServerResponseTracingHandler.java

Lines changed: 135 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge.spanFromContext;
44
import static datadog.trace.instrumentation.netty41.AttributeKeys.CONTEXT_ATTRIBUTE_KEY;
5+
import static datadog.trace.instrumentation.netty41.AttributeKeys.STREAMING_CONTEXT_KEY;
56
import static datadog.trace.instrumentation.netty41.AttributeKeys.WEBSOCKET_SENDER_HANDLER_CONTEXT;
67
import static datadog.trace.instrumentation.netty41.server.NettyHttpServerDecorator.DECORATE;
78

@@ -13,9 +14,11 @@
1314
import io.netty.channel.ChannelHandlerContext;
1415
import io.netty.channel.ChannelOutboundHandlerAdapter;
1516
import io.netty.channel.ChannelPromise;
17+
import io.netty.handler.codec.http.FullHttpResponse;
1618
import io.netty.handler.codec.http.HttpHeaderNames;
1719
import io.netty.handler.codec.http.HttpResponse;
1820
import io.netty.handler.codec.http.HttpResponseStatus;
21+
import io.netty.handler.codec.http.LastHttpContent;
1922

2023
@ChannelHandler.Sharable
2124
public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdapter {
@@ -26,36 +29,161 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
2629
final Context storedContext = ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get();
2730
final AgentSpan span = spanFromContext(storedContext);
2831

29-
if (span == null || !(msg instanceof HttpResponse)) {
30-
ctx.write(msg, prm);
32+
// FullHttpResponse must be checked BEFORE LastHttpContent and HttpResponse,
33+
// because FullHttpResponse extends both LastHttpContent and HttpResponse.
34+
if (msg instanceof FullHttpResponse) {
35+
handleFullHttpResponse(ctx, storedContext, span, (FullHttpResponse) msg, prm);
3136
return;
3237
}
3338

34-
try (final ContextScope scope = storedContext.attach()) {
35-
final HttpResponse response = (HttpResponse) msg;
39+
// Handle HttpResponse (headers only — start of chunked/streaming response).
40+
// Must be checked BEFORE LastHttpContent/HttpContent.
41+
if (msg instanceof HttpResponse) {
42+
handleHttpResponse(ctx, storedContext, span, (HttpResponse) msg, prm);
43+
return;
44+
}
45+
46+
// Handle LastHttpContent (end of chunked/streaming response).
47+
// Must be checked BEFORE HttpContent (LastHttpContent extends HttpContent).
48+
// IMPORTANT: Use STREAMING_CONTEXT_KEY to avoid keep-alive race condition where
49+
// channelRead for the next request may overwrite CONTEXT_ATTRIBUTE_KEY before
50+
// this LastHttpContent write task runs on the EventLoop.
51+
if (msg instanceof LastHttpContent) {
52+
Context streamingContext = ctx.channel().attr(STREAMING_CONTEXT_KEY).getAndRemove();
53+
Context contextForLastContent = streamingContext != null ? streamingContext : storedContext;
54+
AgentSpan spanForLastContent =
55+
streamingContext != null ? spanFromContext(streamingContext) : span;
56+
handleLastHttpContent(
57+
ctx, contextForLastContent, spanForLastContent, (LastHttpContent) msg, prm);
58+
return;
59+
}
60+
61+
// Intermediate HttpContent chunks — pass through without touching the span.
62+
ctx.write(msg, prm);
63+
}
3664

65+
/** Complete response in a single message (non-streaming). Finish span immediately. */
66+
private void handleFullHttpResponse(
67+
final ChannelHandlerContext ctx,
68+
final Context storedContext,
69+
final AgentSpan span,
70+
final FullHttpResponse response,
71+
final ChannelPromise prm) {
72+
73+
if (span == null) {
74+
ctx.write(response, prm);
75+
return;
76+
}
77+
78+
try (final ContextScope scope = storedContext.attach()) {
3779
try {
38-
ctx.write(msg, prm);
80+
ctx.write(response, prm);
3981
} catch (final Throwable throwable) {
4082
DECORATE.onError(span, throwable);
4183
span.setHttpStatusCode(500);
42-
span.finish(); // Finish the span manually since finishSpanOnClose was false
84+
span.finish();
4385
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
4486
throw throwable;
4587
}
88+
4689
final boolean isWebsocketUpgrade =
4790
response.status() == HttpResponseStatus.SWITCHING_PROTOCOLS
4891
&& "websocket".equals(response.headers().get(HttpHeaderNames.UPGRADE));
92+
4993
if (isWebsocketUpgrade) {
5094
ctx.channel()
5195
.attr(WEBSOCKET_SENDER_HANDLER_CONTEXT)
5296
.set(new HandlerContext.Sender(span, ctx.channel().id().asShortText()));
5397
}
98+
5499
if (response.status() != HttpResponseStatus.CONTINUE
55100
&& (response.status() != HttpResponseStatus.SWITCHING_PROTOCOLS || isWebsocketUpgrade)) {
56101
DECORATE.onResponse(span, response);
57102
DECORATE.beforeFinish(scope.context());
58-
span.finish(); // Finish the span manually since finishSpanOnClose was false
103+
span.finish();
104+
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
105+
}
106+
}
107+
}
108+
109+
/**
110+
* Chunked response headers — record status but do NOT finish the span yet. The span will be
111+
* finished when the corresponding LastHttpContent is written. Context is saved to
112+
* STREAMING_CONTEXT_KEY so that a keep-alive channelRead for the next request cannot overwrite it
113+
* before LastHttpContent arrives.
114+
*/
115+
private void handleHttpResponse(
116+
final ChannelHandlerContext ctx,
117+
final Context storedContext,
118+
final AgentSpan span,
119+
final HttpResponse response,
120+
final ChannelPromise prm) {
121+
122+
if (span == null) {
123+
ctx.write(response, prm);
124+
return;
125+
}
126+
127+
try (final ContextScope scope = storedContext.attach()) {
128+
try {
129+
ctx.write(response, prm);
130+
} catch (final Throwable throwable) {
131+
DECORATE.onError(span, throwable);
132+
span.setHttpStatusCode(500);
133+
span.finish();
134+
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
135+
throw throwable;
136+
}
137+
138+
final boolean isWebsocketUpgrade =
139+
response.status() == HttpResponseStatus.SWITCHING_PROTOCOLS
140+
&& "websocket".equals(response.headers().get(HttpHeaderNames.UPGRADE));
141+
142+
if (isWebsocketUpgrade) {
143+
ctx.channel()
144+
.attr(WEBSOCKET_SENDER_HANDLER_CONTEXT)
145+
.set(new HandlerContext.Sender(span, ctx.channel().id().asShortText()));
146+
}
147+
148+
if (response.status() != HttpResponseStatus.CONTINUE
149+
&& (response.status() != HttpResponseStatus.SWITCHING_PROTOCOLS || isWebsocketUpgrade)) {
150+
DECORATE.onResponse(span, response);
151+
ctx.channel().attr(STREAMING_CONTEXT_KEY).set(storedContext);
152+
// Span finish is deferred to handleLastHttpContent.
153+
}
154+
}
155+
}
156+
157+
/** End of chunked/streaming response — finish the span now that the full duration is known. */
158+
private void handleLastHttpContent(
159+
final ChannelHandlerContext ctx,
160+
final Context storedContext,
161+
final AgentSpan span,
162+
final LastHttpContent msg,
163+
final ChannelPromise prm) {
164+
165+
if (span == null) {
166+
ctx.write(msg, prm);
167+
return;
168+
}
169+
170+
try (final ContextScope scope = storedContext.attach()) {
171+
try {
172+
ctx.write(msg, prm);
173+
} catch (final Throwable throwable) {
174+
DECORATE.onError(span, throwable);
175+
span.setHttpStatusCode(500);
176+
span.finish();
177+
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
178+
throw throwable;
179+
}
180+
181+
DECORATE.beforeFinish(scope.context());
182+
span.finish();
183+
// Only remove CONTEXT_ATTRIBUTE_KEY if it still holds our context.
184+
// Under keep-alive a new request's channelRead may have already replaced it.
185+
// All channel ops run on the same EventLoop thread so this check is race-free.
186+
if (ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).get() == storedContext) {
59187
ctx.channel().attr(CONTEXT_ATTRIBUTE_KEY).remove();
60188
}
61189
}

dd-java-agent/instrumentation/netty/netty-common/src/main/java/datadog/trace/instrumentation/netty41/AttributeKeys.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@ public final class AttributeKeys {
2020
public static final AttributeKey<Context> CONTEXT_ATTRIBUTE_KEY =
2121
attributeKey(DD_CONTEXT_ATTRIBUTE);
2222

23+
/**
24+
* Stores the context of the currently-streaming (chunked) response. Set when the HTTP response
25+
* headers are sent, cleared when LastHttpContent is processed. Using a separate key (instead of
26+
* CONTEXT_ATTRIBUTE_KEY) avoids a keep-alive race: Netty can process the next request's
27+
* channelRead before the current response's LastHttpContent write task runs, overwriting
28+
* CONTEXT_ATTRIBUTE_KEY with the new request's span.
29+
*/
30+
public static final AttributeKey<Context> STREAMING_CONTEXT_KEY =
31+
attributeKey("datadog.server.streaming.context");
32+
2333
public static final AttributeKey<AgentSpan> CLIENT_PARENT_ATTRIBUTE_KEY =
2434
attributeKey("datadog.client.parent.span");
2535

0 commit comments

Comments
 (0)