Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
return isGeneratedMethod ? fullMethodName : "other";
}

private static Context otelContextWithBaggage() {
Baggage baggage = BAGGAGE_KEY.get();
private static Context otelContextWithBaggage(@Nullable Baggage baggage) {
if (baggage == null) {
return Context.current();
}
Expand Down Expand Up @@ -206,16 +205,19 @@ private static final class ClientTracer extends ClientStreamTracer {
volatile String backendService;
long attemptNanos;
Code statusCode;
final Baggage baggage;

ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module,
StreamInfo info, String target, String fullMethodName,
List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins) {
List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins,
Baggage baggage) {
this.attemptsState = attemptsState;
this.module = module;
this.info = info;
this.target = target;
this.fullMethodName = fullMethodName;
this.streamPlugins = streamPlugins;
this.baggage = baggage;
this.stopwatch = module.stopwatchSupplier.get().start();
}

Expand Down Expand Up @@ -282,7 +284,7 @@ public void streamClosed(Status status) {
}

void recordFinishedAttempt() {
Context otelContext = otelContextWithBaggage();
Context otelContext = otelContextWithBaggage(baggage);
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target)
Expand All @@ -301,6 +303,10 @@ void recordFinishedAttempt() {
}
builder.put(BACKEND_SERVICE_KEY, savedBackendService);
}
for (java.util.Map.Entry<String, io.opentelemetry.api.baggage.BaggageEntry> entry :
baggage.asMap().entrySet()) {
builder.put(entry.getKey(), entry.getValue().getValue());
}
for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
plugin.addLabels(builder);
}
Expand Down Expand Up @@ -342,6 +348,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
private int activeStreams;
@GuardedBy("lock")
private boolean finishedCallToBeRecorded;
private final Baggage baggage;

CallAttemptsTracerFactory(
OpenTelemetryMetricsModule module,
Expand All @@ -354,6 +361,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
this.callPlugins = checkNotNull(callPlugins, "callPlugins");
this.attemptDelayStopwatch = module.stopwatchSupplier.get();
this.callStopWatch = module.stopwatchSupplier.get().start();
this.baggage = Baggage.fromContext(Context.current());

io.opentelemetry.api.common.Attributes attribute = io.opentelemetry.api.common.Attributes.of(
METHOD_KEY, fullMethodName,
Expand Down Expand Up @@ -407,7 +415,8 @@ private ClientTracer newClientTracer(StreamInfo info) {
}
streamPlugins = Collections.unmodifiableList(streamPlugins);
}
return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins);
return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins,
baggage);
}

// Called whenever each attempt is ended.
Expand Down Expand Up @@ -448,7 +457,7 @@ void callEnded(Status status) {
}

void recordFinishedCall() {
Context otelContext = otelContextWithBaggage();
Context otelContext = otelContextWithBaggage(baggage);
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = newClientTracer(null);
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
Expand All @@ -463,6 +472,12 @@ void recordFinishedCall() {
METHOD_KEY, fullMethodName,
TARGET_KEY, target
);
AttributesBuilder baseAttributesBuilder = baseAttributes.toBuilder();
for (java.util.Map.Entry<String, io.opentelemetry.api.baggage.BaggageEntry> entry
: baggage.asMap().entrySet()) {
baseAttributesBuilder.put(entry.getKey(), entry.getValue().getValue());
}
baseAttributes = baseAttributesBuilder.build();

// Duration
if (module.resource.clientCallDurationCounter() != null) {
Expand Down Expand Up @@ -606,7 +621,8 @@ public void inboundWireSize(long bytes) {
*/
@Override
public void streamClosed(Status status) {
Context otelContext = otelContextWithBaggage();
Baggage baggage = BAGGAGE_KEY.get();
Context otelContext = otelContextWithBaggage(baggage);
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
Expand All @@ -622,6 +638,12 @@ public void streamClosed(Status status) {
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
.put(STATUS_KEY, status.getCode().toString());
if (baggage != null) {
for (java.util.Map.Entry<String, io.opentelemetry.api.baggage.BaggageEntry> entry
: baggage.asMap().entrySet()) {
builder.put(entry.getKey(), entry.getValue().getValue());
}
}
for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
plugin.addLabels(builder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
Expand Down Expand Up @@ -1629,6 +1630,55 @@ public void serverBasicMetrics() {

}

@Test
public void clientBaggagePropagationToMetrics() {
// Create module and tracer factory
// We use a custom resource with a mock counter to check the Context passed to
// record()
DoubleHistogram mockClientAttemptDurationCounter = org.mockito.Mockito
.mock(DoubleHistogram.class);
OpenTelemetryMetricsResource customResource = OpenTelemetryMetricsResource.builder()
.clientAttemptDurationCounter(mockClientAttemptDurationCounter)
.build();

OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
fakeClock.getStopwatchSupplier(), customResource, emptyList(), emptyList());

// Define the test baggage and create a Context with it
Baggage testBaggage = Baggage.builder()
.put("user-id", "42")
.build();
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context
.current().with(testBaggage);

// Create Tracer Factory within the Scope of the Context (simulating
// application thread)
CallAttemptsTracerFactory tracerFactory;
try (Scope scope = otelContext.makeCurrent()) {
tracerFactory = new CallAttemptsTracerFactory(
module, "target", method.getFullMethodName(), emptyList());
}

// 4. Create a stream tracer (simulating an attempt)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Fix the numbering order.

ClientStreamTracer.StreamInfo streamInfo = ClientStreamTracer.StreamInfo.newBuilder().build();
ClientStreamTracer tracer = tracerFactory.newClientStreamTracer(streamInfo, new Metadata());

// 5. Trigger metric recording
tracer.streamClosed(Status.OK);

// Verify the record call and capture the OTel Context
verify(mockClientAttemptDurationCounter).record(
anyDouble(),
any(io.opentelemetry.api.common.Attributes.class),
contextCaptor.capture());

// Assert on the captured OTel Context
io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue();
Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext);

assertEquals("42", capturedBaggage.getEntryValue("user-id"));
}

@Test
public void serverBaggagePropagationToMetrics() {
// 1. Create module and tracer factory using the mock resource
Expand Down