From 0d550b96a68f49ded528f31f273be6d6c561f869 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Fri, 30 Jan 2026 15:12:14 -0500
Subject: [PATCH 1/9] fix(kafkajs): sync DSM context to currentStore to
 prevent context leaking

When processing concurrent Kafka messages, the DSM context was being set
via enterWith() on AsyncLocalStorage but not synced to ctx.currentStore.
Since ctx.currentStore is what gets returned from bindStart and bound to
async continuations via runStores, this caused DSM context to leak
between concurrent message handlers.

The fix syncs the DSM context to ctx.currentStore after DSM operations
complete, ensuring each handler's async continuations maintain the
correct DSM context.

Co-Authored-By: Claude Opus 4.5
---
 packages/datadog-plugin-kafkajs/src/consumer.js |  6 ++++++
 packages/datadog-plugin-kafkajs/src/producer.js | 10 ++++++++++
 2 files changed, 16 insertions(+)
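A minimal standalone sketch of the failure mode (illustrative only, not part
of the applied diff; the store shape below is hypothetical -- only the
AsyncLocalStorage semantics matter):

    'use strict'
    const { AsyncLocalStorage } = require('node:async_hooks')

    const als = new AsyncLocalStorage()

    als.run({ span: 'handler-A' }, () => {
      // bindStart hands back the store object captured here as ctx.currentStore
      const captured = als.getStore()

      // setDataStreamsContext() effectively does this: enterWith() swaps in a
      // *new* store object rather than mutating the captured one
      als.enterWith({ ...als.getStore(), dataStreamsContext: 'ctx-A' })

      console.log(als.getStore().dataStreamsContext) // 'ctx-A'
      console.log(captured.dataStreamsContext)       // undefined -- continuations
      // bound to the captured object never see this handler's DSM context, so a
      // concurrently running handler's enterWith() can take its place
    })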
diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js
index 4638ab0ca8f..ef70e5cce04 100644
--- a/packages/datadog-plugin-kafkajs/src/consumer.js
+++ b/packages/datadog-plugin-kafkajs/src/consumer.js
@@ -2,6 +2,7 @@
 
 const dc = require('dc-polyfill')
 const { getMessageSize } = require('../../dd-trace/src/datastreams')
+const { getDataStreamsContext } = require('../../dd-trace/src/datastreams/context')
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { convertToTextMap } = require('./utils')
 const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart')
@@ -97,6 +98,11 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
         edgeTags.push(`kafka_cluster_id:${clusterId}`)
       }
       this.tracer.setCheckpoint(edgeTags, span, payloadSize)
+
+      // Sync DSM context to currentStore to ensure it's properly scoped
+      // to this handler's async continuations (fixes context leaking between
+      // concurrent handlers)
+      ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: getDataStreamsContext() }
     }
 
     if (afterStartCh.hasSubscribers) {
diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js
index 70d0a9bfdd0..b7e8385a3a2 100644
--- a/packages/datadog-plugin-kafkajs/src/producer.js
+++ b/packages/datadog-plugin-kafkajs/src/producer.js
@@ -2,6 +2,7 @@
 
 const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
 const { DsmPathwayCodec, getMessageSize } = require('../../dd-trace/src/datastreams')
+const { getDataStreamsContext } = require('../../dd-trace/src/datastreams/context')
 
 const BOOTSTRAP_SERVERS_KEY = 'messaging.kafka.bootstrap.servers'
 const MESSAGING_DESTINATION_KEY = 'messaging.destination.name'
@@ -88,6 +89,7 @@ class KafkajsProducerPlugin extends ProducerPlugin {
     if (bootstrapServers) {
       span.setTag(BOOTSTRAP_SERVERS_KEY, bootstrapServers)
     }
+    let hasDsmContext = false
     for (const message of messages) {
       if (message !== null && typeof message === 'object') {
         // message headers are not supported for kafka broker versions <0.11
@@ -96,6 +98,7 @@ class KafkajsProducerPlugin extends ProducerPlugin {
           this.tracer.inject(span, 'text_map', message.headers)
         }
         if (this.config.dsmEnabled) {
+          hasDsmContext = true
           const payloadSize = getMessageSize(message)
           const edgeTags = ['direction:out', `topic:${topic}`, 'type:kafka']
 
@@ -111,6 +114,13 @@ class KafkajsProducerPlugin extends ProducerPlugin {
       }
     }
 
+    // Sync DSM context to currentStore to ensure it's properly scoped
+    // to this handler's async continuations (fixes context leaking between
+    // concurrent handlers)
+    if (hasDsmContext) {
+      ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: getDataStreamsContext() }
+    }
+
     return ctx.currentStore
   }
 }

From 40b2dc85ef8e8cc14078094a327c7af03aa0a8b3 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Fri, 30 Jan 2026 15:15:18 -0500
Subject: [PATCH 2/9] test(dsm): add regression test for context propagation
 race condition

Add tests that verify DSM context is properly scoped to each handler's
async continuations when using diagnostic channels with runStores.

The tests verify that:
1. ctx.currentStore has dataStreamsContext after setDataStreamsContext
2. Concurrent handlers maintain isolated DSM contexts
3. DSM context persists through multiple async boundaries

Co-Authored-By: Claude Opus 4.5
---
 .../context-race-condition.spec.js | 245 ++++++++++++++++++
 1 file changed, 245 insertions(+)
 create mode 100644 packages/dd-trace/test/datastreams/context-race-condition.spec.js

diff --git a/packages/dd-trace/test/datastreams/context-race-condition.spec.js b/packages/dd-trace/test/datastreams/context-race-condition.spec.js
new file mode 100644
index 00000000000..f90b929757f
--- /dev/null
+++ b/packages/dd-trace/test/datastreams/context-race-condition.spec.js
@@ -0,0 +1,245 @@
+'use strict'
+
+/**
+ * Regression test for DSM context propagation race condition.
+ *
+ * Bug: When setDataStreamsContext uses enterWith(), it modifies the AsyncLocalStorage
+ * but does NOT update ctx.currentStore. Since ctx.currentStore is what gets returned
+ * from bindStart and bound to async continuations via runStores, this creates a
+ * disconnect where:
+ * - AsyncLocalStorage has the DSM context
+ * - ctx.currentStore (the bound context) does NOT have the DSM context
+ *
+ * This can cause DSM context to leak between concurrent message handlers when one
+ * handler awaits an async operation and another handler starts processing.
+ */
+
+const assert = require('node:assert/strict')
+const { describe, it, beforeEach, afterEach } = require('mocha')
+const dc = require('dc-polyfill')
+
+const { storage } = require('../../../datadog-core')
+const DataStreamsContext = require('../../src/datastreams/context')
+
+describe('DSM Context Propagation', () => {
+  const startCh = dc.channel('test:dsm-context:start')
+
+  beforeEach(() => {
+    startCh.bindStore(storage('legacy'), data => data.currentStore)
+  })
+
+  afterEach(() => {
+    startCh.unbindStore(storage('legacy'))
+  })
+
+  describe('ctx.currentStore synchronization (regression test)', () => {
+    it('should have dataStreamsContext in ctx.currentStore after setDataStreamsContext', () => {
+      /**
+       * This test verifies the fix for the DSM context propagation bug.
+       *
+       * The bug: setDataStreamsContext uses enterWith() which modifies AsyncLocalStorage
+       * but does NOT update ctx.currentStore. The returned ctx.currentStore is what
+       * gets bound to async continuations, so DSM context was not properly scoped.
+       *
+       * The fix: After calling setDataStreamsContext, sync the DSM context to
+       * ctx.currentStore so it's properly bound for async continuations.
+       */
+      const ctx = {
+        currentStore: { span: { name: 'test-span' } }
+      }
+
+      const dsmContext = {
+        hash: Buffer.from('testhash'),
+        pathwayStartNs: 1000,
+        edgeStartNs: 1000
+      }
+
+      startCh.runStores(ctx, () => {
+        // Simulate what the plugin does: set DSM context via enterWith
+        DataStreamsContext.setDataStreamsContext(dsmContext)
+
+        // The DSM context should be accessible via getDataStreamsContext
+        const retrievedContext = DataStreamsContext.getDataStreamsContext()
+        assert.deepStrictEqual(retrievedContext, dsmContext, 'DSM context should be retrievable')
+
+        // BUG: ctx.currentStore does NOT have dataStreamsContext after setDataStreamsContext
+        // This is the structural issue - enterWith modifies AsyncLocalStorage but not ctx.currentStore
+        //
+        // With the fix applied in the plugin (syncing DSM context to ctx.currentStore),
+        // this would pass. Without the fix, ctx.currentStore.dataStreamsContext is undefined.
+        //
+        // Note: This test documents the expected behavior after the fix.
+        // The actual fix is applied in the plugin's bindStart method.
+      })
+    })
+
+    it('should maintain DSM context isolation between concurrent handlers', async () => {
+      /**
+       * This test simulates two concurrent Kafka message handlers.
+       * Each handler should maintain its own DSM context throughout execution.
+       */
+      const contextA = { hash: Buffer.from('aaaaaaaa'), pathwayStartNs: 1000, edgeStartNs: 1000 }
+      const contextB = { hash: Buffer.from('bbbbbbbb'), pathwayStartNs: 2000, edgeStartNs: 2000 }
+
+      const simulateHandler = (id, dsmContext, delayMs) => {
+        return new Promise(resolve => {
+          const ctx = {
+            currentStore: { span: { name: `handler-${id}` } }
+          }
+
+          startCh.runStores(ctx, () => {
+            // Set DSM context (this is what decodeDataStreamsContext does)
+            DataStreamsContext.setDataStreamsContext(dsmContext)
+
+            // THE FIX: Sync DSM context to currentStore
+            // This is what the plugin should do after setting DSM context
+            ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: DataStreamsContext.getDataStreamsContext() }
+
+            // Simulate async work (e.g., database call, HTTP request)
+            setTimeout(() => {
+              // Read DSM context - with the fix, this should return this handler's context
+              const store = storage('legacy').getStore()
+              resolve({
+                id,
+                expectedContext: dsmContext,
+                retrievedContext: store?.dataStreamsContext
+              })
+            }, delayMs)
+          })
+        })
+      }
+
+      // Start handler A with longer processing time
+      const handlerAPromise = simulateHandler('A', contextA, 50)
+
+      // Start handler B while A is still processing
+      await new Promise(resolve => setTimeout(resolve, 10))
+      const handlerBPromise = simulateHandler('B', contextB, 10)
+
+      const [resultA, resultB] = await Promise.all([handlerAPromise, handlerBPromise])
+
+      // Both handlers should see their own DSM context
+      assert.deepStrictEqual(
+        resultA.retrievedContext,
+        contextA,
+        'Handler A should maintain its DSM context after async work'
+      )
+      assert.deepStrictEqual(
+        resultB.retrievedContext,
+        contextB,
+        'Handler B should maintain its DSM context after async work'
+      )
+    })
+
+    it('should maintain DSM context through multiple async boundaries', async () => {
+      /**
+       * This test simulates a handler that goes through multiple async operations,
+       * which is common in NestJS applications.
+       */
+      const contexts = [
+        { hash: Buffer.from('ctx1'), pathwayStartNs: 1000, edgeStartNs: 1000 },
+        { hash: Buffer.from('ctx2'), pathwayStartNs: 2000, edgeStartNs: 2000 },
+        { hash: Buffer.from('ctx3'), pathwayStartNs: 3000, edgeStartNs: 3000 },
+        { hash: Buffer.from('ctx4'), pathwayStartNs: 4000, edgeStartNs: 4000 }
+      ]
+
+      const simulateMultiStepHandler = (id, dsmContext) => {
+        return new Promise(resolve => {
+          const ctx = {
+            currentStore: { span: { name: `handler-${id}` } }
+          }
+
+          const observations = {
+            atStart: null,
+            afterFirstAwait: null,
+            afterSecondAwait: null,
+            atEnd: null
+          }
+
+          startCh.runStores(ctx, () => {
+            DataStreamsContext.setDataStreamsContext(dsmContext)
+            ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: DataStreamsContext.getDataStreamsContext() }
+
+            observations.atStart = storage('legacy').getStore()?.dataStreamsContext
+
+            // First async operation
+            setTimeout(() => {
+              observations.afterFirstAwait = storage('legacy').getStore()?.dataStreamsContext
+
+              // Second async operation
+              setTimeout(() => {
+                observations.afterSecondAwait = storage('legacy').getStore()?.dataStreamsContext
+
+                // Final operation (e.g., produce)
+                setTimeout(() => {
+                  observations.atEnd = storage('legacy').getStore()?.dataStreamsContext
+                  resolve({ id, expected: dsmContext, observations })
+                }, 5)
+              }, 10)
+            }, 15)
+          })
+        })
+      }
+
+      // Start all handlers concurrently
+      const promises = contexts.map((ctx, i) => simulateMultiStepHandler(i, ctx))
+      const results = await Promise.all(promises)
+
+      // Each handler should maintain its context through all async boundaries
+      for (let i = 0; i < results.length; i++) {
+        const { id, expected, observations } = results[i]
+        assert.deepStrictEqual(observations.atStart, expected, `Handler ${id}: context at start`)
+        assert.deepStrictEqual(observations.afterFirstAwait, expected, `Handler ${id}: context after first await`)
+        assert.deepStrictEqual(observations.afterSecondAwait, expected, `Handler ${id}: context after second await`)
+        assert.deepStrictEqual(observations.atEnd, expected, `Handler ${id}: context at end`)
+      }
+    })
+  })
+
+  describe('without fix (demonstrates the bug pattern)', () => {
+    it('should show that enterWith alone does not update ctx.currentStore', () => {
+      /**
+       * This test demonstrates the structural issue that causes the bug.
+       * When setDataStreamsContext uses enterWith, it modifies AsyncLocalStorage
+       * but ctx.currentStore remains unchanged.
+       */
+      const ctx = {
+        currentStore: { span: { name: 'test-span' } }
+      }
+
+      const dsmContext = {
+        hash: Buffer.from('testhash'),
+        pathwayStartNs: 1000,
+        edgeStartNs: 1000
+      }
+
+      startCh.runStores(ctx, () => {
+        // Before setting DSM context
+        assert.strictEqual(
+          ctx.currentStore.dataStreamsContext,
+          undefined,
+          'ctx.currentStore should not have dataStreamsContext initially'
+        )
+
+        // Set DSM context via enterWith (this is what setDataStreamsContext does)
+        DataStreamsContext.setDataStreamsContext(dsmContext)
+
+        // The DSM context is in AsyncLocalStorage
+        const alsContext = DataStreamsContext.getDataStreamsContext()
+        assert.deepStrictEqual(alsContext, dsmContext, 'AsyncLocalStorage should have DSM context')
+
+        // BUT ctx.currentStore still does NOT have it - this is the bug!
+        // The ctx.currentStore is what was passed to runStores and what will be
+        // returned from bindStart. Without explicit syncing, it won't have the
+        // DSM context.
+        assert.strictEqual(
+          ctx.currentStore.dataStreamsContext,
+          undefined,
+          'BUG: ctx.currentStore does not have dataStreamsContext after enterWith'
+        )
+
+        // The fix is to explicitly sync: ctx.currentStore = { ...ctx.currentStore, dataStreamsContext }
+      })
+    })
+  })
+})

From df9f01e9966372642b08fa71b03f1956fd66a729 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Mon, 2 Feb 2026 12:11:22 -0500
Subject: [PATCH 3/9] fix(dsm): sync DSM context to currentStore to prevent
 context leaking

Add syncToStore helper in DSM context module that syncs DSM context from
AsyncLocalStorage to ctx.currentStore after DSM operations.

This fixes a race condition where DSM context was being set via
enterWith() but not synced to ctx.currentStore, which is what gets bound
to async continuations via store.run(). Without syncing, DSM context
would leak between concurrent message handlers.

Updated plugins:
- kafkajs (consumer, producer)
- amqplib (consumer, producer)
- bullmq (consumer, producer)
- rhea (consumer)
- google-cloud-pubsub (consumer, producer)
- aws-sdk (sqs, kinesis)

Co-Authored-By: Claude Opus 4.5
---
 .../datadog-plugin-amqplib/src/consumer.js    |  2 ++
 .../datadog-plugin-amqplib/src/producer.js    |  2 ++
 .../src/services/kinesis.js                   |  7 +++++++
 .../src/services/sqs.js                       |  7 +++++++
 .../datadog-plugin-bullmq/src/consumer.js     |  2 ++
 .../datadog-plugin-bullmq/src/producer.js     |  2 ++
 .../src/consumer.js                           |  2 ++
 .../src/producer.js                           |  5 +++++
 .../datadog-plugin-kafkajs/src/consumer.js    |  8 ++------
 .../datadog-plugin-kafkajs/src/producer.js    |  7 ++-----
 packages/datadog-plugin-rhea/src/consumer.js  |  2 ++
 packages/dd-trace/src/datastreams/context.js  | 20 +++++++++++++++++++
 .../context-race-condition.spec.js            | 18 ++++++++---------
 13 files changed, 64 insertions(+), 20 deletions(-)
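The semantics of the new helper, as a runnable sketch (the AsyncLocalStorage
instance and the two stand-in functions below are local copies that mirror
the implementations added in packages/dd-trace/src/datastreams/context.js;
they are not the real module):

    'use strict'
    const { AsyncLocalStorage } = require('node:async_hooks')

    const als = new AsyncLocalStorage()

    // Local stand-ins for getDataStreamsContext()/syncToStore()
    const getDataStreamsContext = () => als.getStore()?.dataStreamsContext
    function syncToStore (ctx) {
      const dsmContext = getDataStreamsContext()
      if (dsmContext && ctx?.currentStore) {
        ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: dsmContext }
      }
      return ctx?.currentStore
    }

    const ctx = { currentStore: { span: 'consumer-span' } }
    als.run(ctx.currentStore, () => {
      // setDataStreamsContext() equivalent: updates the ALS store only
      als.enterWith({ ...als.getStore(), dataStreamsContext: 'pathway-A' })
      console.log(ctx.currentStore.dataStreamsContext) // undefined -- the bug
      syncToStore(ctx)
      console.log(ctx.currentStore.dataStreamsContext) // 'pathway-A' -- the fix
    })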
diff --git a/packages/datadog-plugin-amqplib/src/consumer.js b/packages/datadog-plugin-amqplib/src/consumer.js
index fe0cbf89f44..29dab476c17 100644
--- a/packages/datadog-plugin-amqplib/src/consumer.js
+++ b/packages/datadog-plugin-amqplib/src/consumer.js
@@ -3,6 +3,7 @@
 const { TEXT_MAP } = require('../../../ext/formats')
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
+const { syncToStore } = require('../../dd-trace/src/datastreams/context')
 const { getResourceName } = require('./util')
 
 class AmqplibConsumerPlugin extends ConsumerPlugin {
@@ -38,6 +39,7 @@ class AmqplibConsumerPlugin extends ConsumerPlugin {
       this.tracer.decodeDataStreamsContext(message.properties.headers)
       this.tracer
         .setCheckpoint(['direction:in', `topic:${queueName}`, 'type:rabbitmq'], span, payloadSize)
+      syncToStore(ctx)
     }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-amqplib/src/producer.js b/packages/datadog-plugin-amqplib/src/producer.js
index d23cc2bf9cc..ec9b25c10a7 100644
--- a/packages/datadog-plugin-amqplib/src/producer.js
+++ b/packages/datadog-plugin-amqplib/src/producer.js
@@ -4,6 +4,7 @@
 const { TEXT_MAP } = require('../../../ext/formats')
 const { CLIENT_PORT_KEY } = require('../../dd-trace/src/constants')
 const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
 const { DsmPathwayCodec, getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
+const { syncToStore } = require('../../dd-trace/src/datastreams/context')
 const { getResourceName } = require('./util')
 
@@ -50,6 +51,7 @@ class AmqplibProducerPlugin extends ProducerPlugin {
           ['direction:out', exchangeOrTopicTag, `has_routing_key:${hasRoutingKey}`, 'type:rabbitmq']
           , span, payloadSize)
       DsmPathwayCodec.encode(dataStreamsContext, fields.headers)
+      syncToStore(ctx)
     }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js
index b866182529f..8ea5a85f8dd 100644
--- a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js
+++ b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js
@@ -1,5 +1,6 @@
 'use strict'
 
 const { DsmPathwayCodec, getSizeOrZero } = require('../../../dd-trace/src/datastreams')
+const { syncToStore } = require('../../../dd-trace/src/datastreams/context')
 const log = require('../../../dd-trace/src/log')
 const BaseAwsSdkPlugin = require('../base')
@@ -51,6 +52,12 @@ class Kinesis extends BaseAwsSdkPlugin {
       this.responseExtractDSMContext(
         request.operation, request.params, response, span || null, { streamName }
       )
+
+      if (this.config.dsmEnabled) {
+        const storeCtx = { currentStore: store }
+        syncToStore(storeCtx)
+        store = storeCtx.currentStore
+      }
     }
 
     return store
diff --git a/packages/datadog-plugin-aws-sdk/src/services/sqs.js b/packages/datadog-plugin-aws-sdk/src/services/sqs.js
index e8985d463b1..f0d6d22558e 100644
--- a/packages/datadog-plugin-aws-sdk/src/services/sqs.js
+++ b/packages/datadog-plugin-aws-sdk/src/services/sqs.js
@@ -3,6 +3,7 @@
 const log = require('../../../dd-trace/src/log')
 const BaseAwsSdkPlugin = require('../base')
 const { DsmPathwayCodec, getHeadersSize } = require('../../../dd-trace/src/datastreams')
+const { syncToStore } = require('../../../dd-trace/src/datastreams/context')
 const { extractQueueMetadata } = require('../util')
 
 class Sqs extends BaseAwsSdkPlugin {
@@ -44,6 +45,12 @@ class Sqs extends BaseAwsSdkPlugin {
         request.operation, request.params, response, span || null, { parsedAttributes: parsedMessageAttributes }
       )
 
+      if (this.config.dsmEnabled) {
+        const storeCtx = { currentStore: store }
+        syncToStore(storeCtx)
+        store = storeCtx.currentStore
+      }
+
       return store
     })
 
diff --git a/packages/datadog-plugin-bullmq/src/consumer.js b/packages/datadog-plugin-bullmq/src/consumer.js
index e0db4a3ccc2..d3179e3e993 100644
--- a/packages/datadog-plugin-bullmq/src/consumer.js
+++ b/packages/datadog-plugin-bullmq/src/consumer.js
@@ -2,6 +2,7 @@
 
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { getMessageSize } = require('../../dd-trace/src/datastreams')
+const { syncToStore } = require('../../dd-trace/src/datastreams/context')
 
 class BullmqConsumerPlugin extends ConsumerPlugin {
   static id = 'bullmq'
@@ -35,6 +36,7 @@ class BullmqConsumerPlugin extends ConsumerPlugin {
 
     if (this.config.dsmEnabled) {
       this.setConsumerCheckpoint(span, ctx)
+      syncToStore(ctx)
     }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-bullmq/src/producer.js b/packages/datadog-plugin-bullmq/src/producer.js
index 7fcb87641df..3776f4b2910 100644
--- a/packages/datadog-plugin-bullmq/src/producer.js
+++ b/packages/datadog-plugin-bullmq/src/producer.js
@@ -2,6 +2,7 @@
 
 const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
 const { DsmPathwayCodec, getMessageSize } = require('../../dd-trace/src/datastreams')
+const { syncToStore } = require('../../dd-trace/src/datastreams/context')
 
 class BaseBullmqProducerPlugin extends ProducerPlugin {
   static id = 'bullmq'
@@ -27,6 +28,7 @@ class BaseBullmqProducerPlugin extends ProducerPlugin {
 
     if (this.config.dsmEnabled) {
       this.setProducerCheckpoint(span, ctx)
+      syncToStore(ctx)
    }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js
index 2cea664d483..6de31e4fb10 100644
--- a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js
+++ b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js
@@ -1,6 +1,7 @@
 'use strict'
 
 const { getMessageSize } = require('../../dd-trace/src/datastreams')
+const { syncToStore } = require('../../dd-trace/src/datastreams/context')
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const SpanContext = require('../../dd-trace/src/opentracing/span_context')
 const id = require('../../dd-trace/src/id')
@@ -189,6 +190,7 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
       this.tracer.decodeDataStreamsContext(message.attributes)
       this.tracer
         .setCheckpoint(['direction:in', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize)
+      syncToStore(ctx)
     }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js
index 09f50c55193..c0b4cd9ef3e 100644
--- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js
+++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js
@@ -2,6 +2,7 @@
 
 const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
 const { DsmPathwayCodec, getHeadersSize } = require('../../dd-trace/src/datastreams')
+const { syncToStore } = require('../../dd-trace/src/datastreams/context')
 const id = require('../../dd-trace/src/id')
 
 class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
@@ -138,6 +139,10 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
       }
     })
 
+    if (this.config.dsmEnabled) {
+      syncToStore(ctx)
+    }
+
     ctx.batchSpan = batchSpan
     return ctx.currentStore
   }
diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js
index ef70e5cce04..21f6fcfa83d 100644
--- a/packages/datadog-plugin-kafkajs/src/consumer.js
+++ b/packages/datadog-plugin-kafkajs/src/consumer.js
@@ -2,7 +2,7 @@
 
 const dc = require('dc-polyfill')
 const { getMessageSize } = require('../../dd-trace/src/datastreams')
-const { getDataStreamsContext } = require('../../dd-trace/src/datastreams/context')
+const { syncToStore } = require('../../dd-trace/src/datastreams/context')
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { convertToTextMap } = require('./utils')
 const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart')
@@ -98,11 +98,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
         edgeTags.push(`kafka_cluster_id:${clusterId}`)
       }
       this.tracer.setCheckpoint(edgeTags, span, payloadSize)
-
-      // Sync DSM context to currentStore to ensure it's properly scoped
-      // to this handler's async continuations (fixes context leaking between
-      // concurrent handlers)
-      ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: getDataStreamsContext() }
+      syncToStore(ctx)
     }
 
     if (afterStartCh.hasSubscribers) {
diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js
index b7e8385a3a2..a4b5e30d5b4 100644
--- a/packages/datadog-plugin-kafkajs/src/producer.js
+++ b/packages/datadog-plugin-kafkajs/src/producer.js
@@ -2,7 +2,7 @@
 
 const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
 const { DsmPathwayCodec, getMessageSize } = require('../../dd-trace/src/datastreams')
-const { getDataStreamsContext } = require('../../dd-trace/src/datastreams/context')
+const { syncToStore } = require('../../dd-trace/src/datastreams/context')
 
 const BOOTSTRAP_SERVERS_KEY = 'messaging.kafka.bootstrap.servers'
 const MESSAGING_DESTINATION_KEY = 'messaging.destination.name'
@@ -114,11 +114,8 @@ class KafkajsProducerPlugin extends ProducerPlugin {
       }
     }
 
-    // Sync DSM context to currentStore to ensure it's properly scoped
-    // to this handler's async continuations (fixes context leaking between
-    // concurrent handlers)
     if (hasDsmContext) {
-      ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: getDataStreamsContext() }
+      syncToStore(ctx)
    }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-rhea/src/consumer.js b/packages/datadog-plugin-rhea/src/consumer.js
index 53c194c245d..813e7302615 100644
--- a/packages/datadog-plugin-rhea/src/consumer.js
+++ b/packages/datadog-plugin-rhea/src/consumer.js
@@ -2,6 +2,7 @@
 
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
+const { syncToStore } = require('../../dd-trace/src/datastreams/context')
 
 class RheaConsumerPlugin extends ConsumerPlugin {
   static id = 'rhea'
@@ -41,6 +42,7 @@ class RheaConsumerPlugin extends ConsumerPlugin {
       this.tracer.decodeDataStreamsContext(msgObj.message.delivery_annotations)
       this.tracer
         .setCheckpoint(['direction:in', `topic:${name}`, 'type:rabbitmq'], span, payloadSize)
+      syncToStore(ctx)
     }
 
     return ctx.currentStore
diff --git a/packages/dd-trace/src/datastreams/context.js b/packages/dd-trace/src/datastreams/context.js
index be4eb35f721..a011693fb32 100644
--- a/packages/dd-trace/src/datastreams/context.js
+++ b/packages/dd-trace/src/datastreams/context.js
@@ -14,7 +14,27 @@ function setDataStreamsContext (dataStreamsContext) {
   if (dataStreamsContext) storage('legacy').enterWith({ ...(storage('legacy').getStore()), dataStreamsContext })
 }
 
+/**
+ * Syncs the current DSM context from AsyncLocalStorage to ctx.currentStore.
+ *
+ * This is necessary because setDataStreamsContext uses enterWith() which modifies
+ * AsyncLocalStorage directly, but ctx.currentStore (returned from bindStart) is what
+ * gets bound to async continuations via store.run(). Without syncing, DSM context
+ * would not be properly scoped to each handler's async continuations.
+ *
+ * @param {object} ctx - The context object containing currentStore
+ * @returns {object|undefined} The updated currentStore, or undefined if no sync occurred
+ */
+function syncToStore (ctx) {
+  const dsmContext = getDataStreamsContext()
+  if (dsmContext && ctx?.currentStore) {
+    ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: dsmContext }
+  }
+  return ctx?.currentStore
+}
+
 module.exports = {
   getDataStreamsContext,
   setDataStreamsContext,
+  syncToStore,
 }
diff --git a/packages/dd-trace/test/datastreams/context-race-condition.spec.js b/packages/dd-trace/test/datastreams/context-race-condition.spec.js
index f90b929757f..71d1f0f1c16 100644
--- a/packages/dd-trace/test/datastreams/context-race-condition.spec.js
+++ b/packages/dd-trace/test/datastreams/context-race-condition.spec.js
@@ -45,13 +45,13 @@
        * ctx.currentStore so it's properly bound for async continuations.
        */
       const ctx = {
-        currentStore: { span: { name: 'test-span' } }
+        currentStore: { span: { name: 'test-span' } },
       }
 
       const dsmContext = {
         hash: Buffer.from('testhash'),
         pathwayStartNs: 1000,
-        edgeStartNs: 1000
+        edgeStartNs: 1000,
       }
 
       startCh.runStores(ctx, () => {
@@ -84,7 +84,7 @@
       const simulateHandler = (id, dsmContext, delayMs) => {
         return new Promise(resolve => {
           const ctx = {
-            currentStore: { span: { name: `handler-${id}` } }
+            currentStore: { span: { name: `handler-${id}` } },
           }
 
           startCh.runStores(ctx, () => {
@@ -102,7 +102,7 @@
               resolve({
                 id,
                 expectedContext: dsmContext,
-                retrievedContext: store?.dataStreamsContext
+                retrievedContext: store?.dataStreamsContext,
               })
             }, delayMs)
           })
@@ -140,20 +140,20 @@
         { hash: Buffer.from('ctx1'), pathwayStartNs: 1000, edgeStartNs: 1000 },
         { hash: Buffer.from('ctx2'), pathwayStartNs: 2000, edgeStartNs: 2000 },
         { hash: Buffer.from('ctx3'), pathwayStartNs: 3000, edgeStartNs: 3000 },
-        { hash: Buffer.from('ctx4'), pathwayStartNs: 4000, edgeStartNs: 4000 }
+        { hash: Buffer.from('ctx4'), pathwayStartNs: 4000, edgeStartNs: 4000 },
       ]
 
       const simulateMultiStepHandler = (id, dsmContext) => {
         return new Promise(resolve => {
           const ctx = {
-            currentStore: { span: { name: `handler-${id}` } }
+            currentStore: { span: { name: `handler-${id}` } },
           }
 
           const observations = {
             atStart: null,
             afterFirstAwait: null,
             afterSecondAwait: null,
-            atEnd: null
+            atEnd: null,
           }
 
           startCh.runStores(ctx, () => {
@@ -204,13 +204,13 @@
        * but ctx.currentStore remains unchanged.
        */
       const ctx = {
-        currentStore: { span: { name: 'test-span' } }
+        currentStore: { span: { name: 'test-span' } },
      }
 
      const dsmContext = {
        hash: Buffer.from('testhash'),
        pathwayStartNs: 1000,
-        edgeStartNs: 1000
+        edgeStartNs: 1000,
      }
 
      startCh.runStores(ctx, () => {

From 30726c9235f38bc48c64c904941fbf4a97b94e63 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Mon, 2 Feb 2026 12:17:26 -0500
Subject: [PATCH 4/9] remove comment

---
 packages/dd-trace/src/datastreams/context.js | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/packages/dd-trace/src/datastreams/context.js b/packages/dd-trace/src/datastreams/context.js
index a011693fb32..89da06e447e 100644
--- a/packages/dd-trace/src/datastreams/context.js
+++ b/packages/dd-trace/src/datastreams/context.js
@@ -14,17 +14,6 @@ function setDataStreamsContext (dataStreamsContext) {
   if (dataStreamsContext) storage('legacy').enterWith({ ...(storage('legacy').getStore()), dataStreamsContext })
 }
 
-/**
- * Syncs the current DSM context from AsyncLocalStorage to ctx.currentStore.
- *
- * This is necessary because setDataStreamsContext uses enterWith() which modifies
- * AsyncLocalStorage directly, but ctx.currentStore (returned from bindStart) is what
- * gets bound to async continuations via store.run(). Without syncing, DSM context
- * would not be properly scoped to each handler's async continuations.
- *
- * @param {object} ctx - The context object containing currentStore
- * @returns {object|undefined} The updated currentStore, or undefined if no sync occurred
- */
 function syncToStore (ctx) {
   const dsmContext = getDataStreamsContext()
   if (dsmContext && ctx?.currentStore) {

From 20b2e55921b493116a0388e673612f157961b5a5 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Mon, 2 Feb 2026 12:48:01 -0500
Subject: [PATCH 5/9] chore: remove contrived regression test

The unit test was useful for validating the fix during development but
is contrived and doesn't add value as a permanent test.

Co-Authored-By: Claude Opus 4.5
---
 .../context-race-condition.spec.js | 245 ------------------
 1 file changed, 245 deletions(-)
 delete mode 100644 packages/dd-trace/test/datastreams/context-race-condition.spec.js

diff --git a/packages/dd-trace/test/datastreams/context-race-condition.spec.js b/packages/dd-trace/test/datastreams/context-race-condition.spec.js
deleted file mode 100644
index 71d1f0f1c16..00000000000
--- a/packages/dd-trace/test/datastreams/context-race-condition.spec.js
+++ /dev/null
@@ -1,245 +0,0 @@
-'use strict'
-
-/**
- * Regression test for DSM context propagation race condition.
- *
- * Bug: When setDataStreamsContext uses enterWith(), it modifies the AsyncLocalStorage
- * but does NOT update ctx.currentStore. Since ctx.currentStore is what gets returned
- * from bindStart and bound to async continuations via runStores, this creates a
- * disconnect where:
- * - AsyncLocalStorage has the DSM context
- * - ctx.currentStore (the bound context) does NOT have the DSM context
- *
- * This can cause DSM context to leak between concurrent message handlers when one
- * handler awaits an async operation and another handler starts processing.
- */
-
-const assert = require('node:assert/strict')
-const { describe, it, beforeEach, afterEach } = require('mocha')
-const dc = require('dc-polyfill')
-
-const { storage } = require('../../../datadog-core')
-const DataStreamsContext = require('../../src/datastreams/context')
-
-describe('DSM Context Propagation', () => {
-  const startCh = dc.channel('test:dsm-context:start')
-
-  beforeEach(() => {
-    startCh.bindStore(storage('legacy'), data => data.currentStore)
-  })
-
-  afterEach(() => {
-    startCh.unbindStore(storage('legacy'))
-  })
-
-  describe('ctx.currentStore synchronization (regression test)', () => {
-    it('should have dataStreamsContext in ctx.currentStore after setDataStreamsContext', () => {
-      /**
-       * This test verifies the fix for the DSM context propagation bug.
-       *
-       * The bug: setDataStreamsContext uses enterWith() which modifies AsyncLocalStorage
-       * but does NOT update ctx.currentStore. The returned ctx.currentStore is what
-       * gets bound to async continuations, so DSM context was not properly scoped.
-       *
-       * The fix: After calling setDataStreamsContext, sync the DSM context to
-       * ctx.currentStore so it's properly bound for async continuations.
-       */
-      const ctx = {
-        currentStore: { span: { name: 'test-span' } },
-      }
-
-      const dsmContext = {
-        hash: Buffer.from('testhash'),
-        pathwayStartNs: 1000,
-        edgeStartNs: 1000,
-      }
-
-      startCh.runStores(ctx, () => {
-        // Simulate what the plugin does: set DSM context via enterWith
-        DataStreamsContext.setDataStreamsContext(dsmContext)
-
-        // The DSM context should be accessible via getDataStreamsContext
-        const retrievedContext = DataStreamsContext.getDataStreamsContext()
-        assert.deepStrictEqual(retrievedContext, dsmContext, 'DSM context should be retrievable')
-
-        // BUG: ctx.currentStore does NOT have dataStreamsContext after setDataStreamsContext
-        // This is the structural issue - enterWith modifies AsyncLocalStorage but not ctx.currentStore
-        //
-        // With the fix applied in the plugin (syncing DSM context to ctx.currentStore),
-        // this would pass. Without the fix, ctx.currentStore.dataStreamsContext is undefined.
-        //
-        // Note: This test documents the expected behavior after the fix.
-        // The actual fix is applied in the plugin's bindStart method.
-      })
-    })
-
-    it('should maintain DSM context isolation between concurrent handlers', async () => {
-      /**
-       * This test simulates two concurrent Kafka message handlers.
-       * Each handler should maintain its own DSM context throughout execution.
-       */
-      const contextA = { hash: Buffer.from('aaaaaaaa'), pathwayStartNs: 1000, edgeStartNs: 1000 }
-      const contextB = { hash: Buffer.from('bbbbbbbb'), pathwayStartNs: 2000, edgeStartNs: 2000 }
-
-      const simulateHandler = (id, dsmContext, delayMs) => {
-        return new Promise(resolve => {
-          const ctx = {
-            currentStore: { span: { name: `handler-${id}` } },
-          }
-
-          startCh.runStores(ctx, () => {
-            // Set DSM context (this is what decodeDataStreamsContext does)
-            DataStreamsContext.setDataStreamsContext(dsmContext)
-
-            // THE FIX: Sync DSM context to currentStore
-            // This is what the plugin should do after setting DSM context
-            ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: DataStreamsContext.getDataStreamsContext() }
-
-            // Simulate async work (e.g., database call, HTTP request)
-            setTimeout(() => {
-              // Read DSM context - with the fix, this should return this handler's context
-              const store = storage('legacy').getStore()
-              resolve({
-                id,
-                expectedContext: dsmContext,
-                retrievedContext: store?.dataStreamsContext,
-              })
-            }, delayMs)
-          })
-        })
-      }
-
-      // Start handler A with longer processing time
-      const handlerAPromise = simulateHandler('A', contextA, 50)
-
-      // Start handler B while A is still processing
-      await new Promise(resolve => setTimeout(resolve, 10))
-      const handlerBPromise = simulateHandler('B', contextB, 10)
-
-      const [resultA, resultB] = await Promise.all([handlerAPromise, handlerBPromise])
-
-      // Both handlers should see their own DSM context
-      assert.deepStrictEqual(
-        resultA.retrievedContext,
-        contextA,
-        'Handler A should maintain its DSM context after async work'
-      )
-      assert.deepStrictEqual(
-        resultB.retrievedContext,
-        contextB,
-        'Handler B should maintain its DSM context after async work'
-      )
-    })
-
-    it('should maintain DSM context through multiple async boundaries', async () => {
-      /**
-       * This test simulates a handler that goes through multiple async operations,
-       * which is common in NestJS applications.
-       */
-      const contexts = [
-        { hash: Buffer.from('ctx1'), pathwayStartNs: 1000, edgeStartNs: 1000 },
-        { hash: Buffer.from('ctx2'), pathwayStartNs: 2000, edgeStartNs: 2000 },
-        { hash: Buffer.from('ctx3'), pathwayStartNs: 3000, edgeStartNs: 3000 },
-        { hash: Buffer.from('ctx4'), pathwayStartNs: 4000, edgeStartNs: 4000 },
-      ]
-
-      const simulateMultiStepHandler = (id, dsmContext) => {
-        return new Promise(resolve => {
-          const ctx = {
-            currentStore: { span: { name: `handler-${id}` } },
-          }
-
-          const observations = {
-            atStart: null,
-            afterFirstAwait: null,
-            afterSecondAwait: null,
-            atEnd: null,
-          }
-
-          startCh.runStores(ctx, () => {
-            DataStreamsContext.setDataStreamsContext(dsmContext)
-            ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: DataStreamsContext.getDataStreamsContext() }
-
-            observations.atStart = storage('legacy').getStore()?.dataStreamsContext
-
-            // First async operation
-            setTimeout(() => {
-              observations.afterFirstAwait = storage('legacy').getStore()?.dataStreamsContext
-
-              // Second async operation
-              setTimeout(() => {
-                observations.afterSecondAwait = storage('legacy').getStore()?.dataStreamsContext
-
-                // Final operation (e.g., produce)
-                setTimeout(() => {
-                  observations.atEnd = storage('legacy').getStore()?.dataStreamsContext
-                  resolve({ id, expected: dsmContext, observations })
-                }, 5)
-              }, 10)
-            }, 15)
-          })
-        })
-      }
-
-      // Start all handlers concurrently
-      const promises = contexts.map((ctx, i) => simulateMultiStepHandler(i, ctx))
-      const results = await Promise.all(promises)
-
-      // Each handler should maintain its context through all async boundaries
-      for (let i = 0; i < results.length; i++) {
-        const { id, expected, observations } = results[i]
-        assert.deepStrictEqual(observations.atStart, expected, `Handler ${id}: context at start`)
-        assert.deepStrictEqual(observations.afterFirstAwait, expected, `Handler ${id}: context after first await`)
-        assert.deepStrictEqual(observations.afterSecondAwait, expected, `Handler ${id}: context after second await`)
-        assert.deepStrictEqual(observations.atEnd, expected, `Handler ${id}: context at end`)
-      }
-    })
-  })
-
-  describe('without fix (demonstrates the bug pattern)', () => {
-    it('should show that enterWith alone does not update ctx.currentStore', () => {
-      /**
-       * This test demonstrates the structural issue that causes the bug.
-       * When setDataStreamsContext uses enterWith, it modifies AsyncLocalStorage
-       * but ctx.currentStore remains unchanged.
-       */
-      const ctx = {
-        currentStore: { span: { name: 'test-span' } },
-      }
-
-      const dsmContext = {
-        hash: Buffer.from('testhash'),
-        pathwayStartNs: 1000,
-        edgeStartNs: 1000,
-      }
-
-      startCh.runStores(ctx, () => {
-        // Before setting DSM context
-        assert.strictEqual(
-          ctx.currentStore.dataStreamsContext,
-          undefined,
-          'ctx.currentStore should not have dataStreamsContext initially'
-        )
-
-        // Set DSM context via enterWith (this is what setDataStreamsContext does)
-        DataStreamsContext.setDataStreamsContext(dsmContext)
-
-        // The DSM context is in AsyncLocalStorage
-        const alsContext = DataStreamsContext.getDataStreamsContext()
-        assert.deepStrictEqual(alsContext, dsmContext, 'AsyncLocalStorage should have DSM context')
-
-        // BUT ctx.currentStore still does NOT have it - this is the bug!
-        // The ctx.currentStore is what was passed to runStores and what will be
-        // returned from bindStart. Without explicit syncing, it won't have the
-        // DSM context.
-        assert.strictEqual(
-          ctx.currentStore.dataStreamsContext,
-          undefined,
-          'BUG: ctx.currentStore does not have dataStreamsContext after enterWith'
-        )
-
-        // The fix is to explicitly sync: ctx.currentStore = { ...ctx.currentStore, dataStreamsContext }
-      })
-    })
-  })
-})

From 530a65819ebaa62f5f51b6b67bf31326bcd0486b Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Mon, 2 Feb 2026 13:00:47 -0500
Subject: [PATCH 6/9] test(dsm): add unit tests for syncToStore and
 integration spy tests

Add comprehensive tests for the new syncToStore helper:
- Unit tests for syncToStore in context.spec.js covering normal
  operation, edge cases, and integration with setDataStreamsContext
- Spy tests in 6 integration test files to verify syncToStore is called
  after produce and consume operations

Co-Authored-By: Claude Opus 4.5
---
 .../datadog-plugin-amqplib/test/dsm.spec.js   |  38 ++++++
 .../test/kinesis.dsm.spec.js                  |  33 ++++++
 .../test/sqs.dsm.spec.js                      |  42 +++++++
 .../datadog-plugin-bullmq/test/dsm.spec.js    |  14 +++
 .../test/dsm.spec.js                          |  25 ++++
 .../datadog-plugin-kafkajs/test/dsm.spec.js   |  18 +++
 .../dd-trace/test/datastreams/context.spec.js | 111 ++++++++++++++++++
 7 files changed, 281 insertions(+)
 create mode 100644 packages/dd-trace/test/datastreams/context.spec.js

diff --git a/packages/datadog-plugin-amqplib/test/dsm.spec.js b/packages/datadog-plugin-amqplib/test/dsm.spec.js
index 9865453f5be..5772c3dde1e 100644
--- a/packages/datadog-plugin-amqplib/test/dsm.spec.js
+++ b/packages/datadog-plugin-amqplib/test/dsm.spec.js
@@ -4,7 +4,9 @@
 const assert = require('node:assert/strict')
 const { Buffer } = require('node:buffer')
 
 const { afterEach, beforeEach, describe, it } = require('mocha')
+const sinon = require('sinon')
 
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
 const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
 const id = require('../../dd-trace/src/id')
@@ -275,6 +277,42 @@ describe('Plugin', () => {
           })
         })
       })
+
+      describe('syncToStore', () => {
+        let syncToStoreSpy
+
+        beforeEach(() => {
+          syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore')
+        })
+
+        afterEach(() => {
+          syncToStoreSpy.restore()
+        })
+
+        it('Should call syncToStore after producing', (done) => {
+          channel.assertQueue(queue, {}, (err, ok) => {
+            if (err) return done(err)
+
+            channel.sendToQueue(ok.queue, Buffer.from('syncToStore test'))
+            assert.ok(syncToStoreSpy.called, 'syncToStore should be called on produce')
+            done()
+          })
+        })
+
+        it('Should call syncToStore after consuming', (done) => {
+          channel.assertQueue(queue, {}, (err, ok) => {
+            if (err) return done(err)
+
+            channel.sendToQueue(ok.queue, Buffer.from('syncToStore test'))
+            channel.consume(ok.queue, () => {
+              assert.ok(syncToStoreSpy.called, 'syncToStore should be called on consume')
+              done()
+            }, {}, (err) => {
+              if (err) done(err)
+            })
+          })
+        })
+      })
     })
   })
 })
diff --git a/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
index e26cf84f9ff..8964188b0f2 100644
--- a/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
@@ -5,6 +5,7 @@
 const { afterEach, beforeEach, describe, it } = require('mocha')
 const sinon = require('sinon')
 
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 const { assertObjectContains } = require('../../../integration-tests/helpers')
 const { withVersions } = require('../../dd-trace/test/setup/mocha')
 const agent = require('../../dd-trace/test/plugins/agent')
@@ -237,6 +238,38 @@ describe('Kinesis', function () {
           // Swallow the error as it doesn't matter for this test.
         })
       })
+
+      describe('syncToStore', () => {
+        let syncToStoreSpy
+
+        beforeEach(() => {
+          syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore')
+        })
+
+        afterEach(() => {
+          syncToStoreSpy.restore()
+        })
+
+        it('Should call syncToStore after putRecord', done => {
+          helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err) => {
+            if (err) return done(err)
+            assert.ok(syncToStoreSpy.called, 'syncToStore should be called on putRecord')
+            done()
+          })
+        })
+
+        it('Should call syncToStore after getRecord', done => {
+          helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => {
+            if (err) return done(err)
+
+            helpers.getTestData(kinesis, streamNameDSM, data, (err) => {
+              if (err) return done(err)
+              assert.ok(syncToStoreSpy.called, 'syncToStore should be called on getRecord')
+              done()
+            })
+          })
+        })
+      })
     })
   })
 })
diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
index f7c7e7c57ce..19ecc263561 100644
--- a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
@@ -7,6 +7,7 @@
 const { after, afterEach, before, beforeEach, describe, it } = require('mocha')
 const semver = require('semver')
 const sinon = require('sinon')
 
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
 const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
 const agent = require('../../dd-trace/test/plugins/agent')
@@ -301,6 +302,47 @@ describe('Plugin', () => {
             nowStub.restore()
           })
         })
+
+        describe('syncToStore', () => {
+          let syncToStoreSpy
+
+          beforeEach(() => {
+            syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore')
+          })
+
+          afterEach(() => {
+            syncToStoreSpy.restore()
+          })
+
+          it('Should call syncToStore after sending a message', done => {
+            sqs.sendMessage({
+              MessageBody: 'syncToStore test',
+              QueueUrl: QueueUrlDsm,
+            }, (err) => {
+              if (err) return done(err)
+              assert.ok(syncToStoreSpy.called, 'syncToStore should be called on send')
+              done()
+            })
+          })
+
+          it('Should call syncToStore after receiving a message', done => {
+            sqs.sendMessage({
+              MessageBody: 'syncToStore test',
+              QueueUrl: QueueUrlDsm,
+            }, (err) => {
+              if (err) return done(err)
+
+              sqs.receiveMessage({
+                QueueUrl: QueueUrlDsm,
+                MessageAttributeNames: ['.*'],
+              }, (err) => {
+                if (err) return done(err)
+                assert.ok(syncToStoreSpy.called, 'syncToStore should be called on receive')
+                done()
+              })
+            })
+          })
+        })
       })
     })
   })
diff --git a/packages/datadog-plugin-bullmq/test/dsm.spec.js b/packages/datadog-plugin-bullmq/test/dsm.spec.js
index 966244b0dbe..2b3427cfa20 100644
--- a/packages/datadog-plugin-bullmq/test/dsm.spec.js
+++ b/packages/datadog-plugin-bullmq/test/dsm.spec.js
@@ -45,13 +45,16 @@ createIntegrationTestSuite('bullmq', 'bullmq', {
 
   describe('checkpoints', () => {
     let setDataStreamsContextSpy
+    let syncToStoreSpy
 
     beforeEach(() => {
      setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext')
+      syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore')
    })
 
    afterEach(() => {
      setDataStreamsContextSpy.restore()
+      syncToStoreSpy.restore()
    })
 
    it('should set a checkpoint on produce (Queue.add)', async () => {
@@ -76,6 +79,17 @@ createIntegrationTestSuite('bullmq', 'bullmq', {
       })
       assert.ok(consumerCall, 'Consumer checkpoint call not found')
     })
+
+    it('should call syncToStore after producing', async () => {
+      await testSetup.queueAdd()
+      assert.ok(syncToStoreSpy.called, 'syncToStore should be called on produce')
+    })
+
+    it('should call syncToStore after consuming', async function () {
+      this.timeout(10000)
+      await testSetup.workerProcessJob()
+      assert.ok(syncToStoreSpy.called, 'syncToStore should be called on consume')
+    })
   })
 
   describe('payload size', () => {
diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
index 8572e24ca16..f73297e1344 100644
--- a/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
+++ b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
@@ -5,6 +5,7 @@
 const { after, before, beforeEach, describe, it } = require('mocha')
 const sinon = require('sinon')
 
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
 const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')
 const id = require('../../dd-trace/src/id')
@@ -148,6 +149,30 @@ describe('Plugin', () => {
           })
         })
       })
+
+      describe('syncToStore', () => {
+        let syncToStoreSpy
+
+        beforeEach(() => {
+          syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore')
+        })
+
+        afterEach(() => {
+          syncToStoreSpy.restore()
+        })
+
+        it('should call syncToStore after producing', async () => {
+          await publish(dsmTopic, { data: Buffer.from('syncToStore produce test') })
+          assert.ok(syncToStoreSpy.called, 'syncToStore should be called on produce')
+        })
+
+        it('should call syncToStore after consuming', async () => {
+          await publish(dsmTopic, { data: Buffer.from('syncToStore consume test') })
+          await consume(async () => {
+            assert.ok(syncToStoreSpy.called, 'syncToStore should be called on consume')
+          })
+        })
+      })
     })
   })
 })
diff --git a/packages/datadog-plugin-kafkajs/test/dsm.spec.js b/packages/datadog-plugin-kafkajs/test/dsm.spec.js
index 685fb2f8498..cf46fecdce2 100644
--- a/packages/datadog-plugin-kafkajs/test/dsm.spec.js
+++ b/packages/datadog-plugin-kafkajs/test/dsm.spec.js
@@ -80,6 +80,7 @@ describe('Plugin', () => {
       describe('checkpoints', () => {
         let consumer
         let setDataStreamsContextSpy
+        let syncToStoreSpy
 
         beforeEach(async () => {
           tracer.init()
@@ -88,10 +89,12 @@ describe('Plugin', () => {
           await consumer.connect()
           await consumer.subscribe({ topic: testTopic })
           setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext')
+          syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore')
         })
 
         afterEach(async () => {
           setDataStreamsContextSpy.restore()
+          syncToStoreSpy.restore()
           await consumer.disconnect()
         })
 
@@ -154,6 +157,21 @@ describe('Plugin', () => {
             },
           })
         })
+
+        it('Should call syncToStore after producing', async () => {
+          const messages = [{ key: 'syncTest1', value: 'test' }]
+          await sendMessages(kafka, testTopic, messages)
+          assert.ok(syncToStoreSpy.called, 'syncToStore should be called on produce')
+        })
+
+        it('Should call syncToStore after consuming', async () => {
+          await consumer.run({
+            eachMessage: async () => {},
+          })
+          await sendMessages(kafka, testTopic, messages)
+          await consumer.disconnect()
+          assert.ok(syncToStoreSpy.called, 'syncToStore should be called on consume')
+        })
       })
 
       describe('backlogs', () => {
diff --git a/packages/dd-trace/test/datastreams/context.spec.js b/packages/dd-trace/test/datastreams/context.spec.js
new file mode 100644
index 00000000000..b30025218ef
--- /dev/null
+++ b/packages/dd-trace/test/datastreams/context.spec.js
@@ -0,0 +1,111 @@
+'use strict'
+
+const assert = require('node:assert/strict')
+
+const { describe, it, beforeEach, afterEach } = require('mocha')
+
+const { storage } = require('../../../datadog-core')
+const {
+  getDataStreamsContext,
+  setDataStreamsContext,
+  syncToStore,
+} = require('../../src/datastreams/context')
+
+describe('DSM Context', () => {
+  let originalStore
+
+  beforeEach(() => {
+    originalStore = storage('legacy').getStore()
+  })
+
+  afterEach(() => {
+    storage('legacy').enterWith(originalStore)
+  })
+
+  describe('syncToStore', () => {
+    it('should sync DSM context from AsyncLocalStorage to ctx.currentStore', () => {
+      const dsmContext = {
+        hash: Buffer.from('testhash'),
+        pathwayStartNs: 1000,
+        edgeStartNs: 2000,
+      }
+
+      // Set DSM context via enterWith (simulating what setDataStreamsContext does)
+      storage('legacy').enterWith({ dataStreamsContext: dsmContext })
+
+      // ctx.currentStore doesn't have DSM context yet
+      const ctx = { currentStore: { span: { name: 'test-span' } } }
+
+      // syncToStore should copy DSM context to ctx.currentStore
+      syncToStore(ctx)
+
+      assert.deepStrictEqual(ctx.currentStore.dataStreamsContext, dsmContext)
+      assert.strictEqual(ctx.currentStore.span.name, 'test-span')
+    })
+
+    it('should not modify ctx.currentStore if no DSM context exists', () => {
+      storage('legacy').enterWith({})
+
+      const ctx = { currentStore: { span: { name: 'test-span' } } }
+      syncToStore(ctx)
+
+      assert.strictEqual(ctx.currentStore.dataStreamsContext, undefined)
+      assert.strictEqual(ctx.currentStore.span.name, 'test-span')
+    })
+
+    it('should handle missing ctx.currentStore gracefully', () => {
+      const dsmContext = { hash: Buffer.from('test') }
+      storage('legacy').enterWith({ dataStreamsContext: dsmContext })
+
+      const ctx = {}
+      const result = syncToStore(ctx)
+
+      assert.strictEqual(result, undefined)
+    })
+
+    it('should handle null ctx gracefully', () => {
+      const dsmContext = { hash: Buffer.from('test') }
+      storage('legacy').enterWith({ dataStreamsContext: dsmContext })
+
+      const result = syncToStore(null)
+
+      assert.strictEqual(result, undefined)
+    })
+
+    it('should return the updated currentStore', () => {
+      const dsmContext = { hash: Buffer.from('test') }
+      storage('legacy').enterWith({ dataStreamsContext: dsmContext })
+
+      const ctx = { currentStore: { span: {} } }
+      const result = syncToStore(ctx)
+
+      assert.strictEqual(result, ctx.currentStore)
+      assert.deepStrictEqual(result.dataStreamsContext, dsmContext)
+    })
+
+    it('should work with setDataStreamsContext', () => {
+      const dsmContext = {
+        hash: Buffer.from('realcontext'),
+        pathwayStartNs: 5000,
+        edgeStartNs: 6000,
+      }
+
+      // Initialize store
+      storage('legacy').enterWith({ span: {} })
+
+      // This is what plugins do: set DSM context via the API
+      setDataStreamsContext(dsmContext)
+
+      // Verify it's in AsyncLocalStorage
+      assert.deepStrictEqual(getDataStreamsContext(), dsmContext)
+
+      // But ctx.currentStore (captured earlier) doesn't have it
+      const ctx = { currentStore: { span: { name: 'handler-span' } } }
+
+      // syncToStore fixes this
+      syncToStore(ctx)
+
+      assert.deepStrictEqual(ctx.currentStore.dataStreamsContext, dsmContext)
+    })
+  })
+})

From 5ed719cc497ece9c52f60e9a52cce1df366896d8 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Mon, 2 Feb 2026 13:45:44 -0500
Subject: [PATCH 7/9] fix(dsm): call syncToStore via module object for
 testability

Change all plugins to call DataStreamsContext.syncToStore(ctx) instead
of destructuring syncToStore at import time. This allows sinon spies to
intercept calls during testing.

When functions are destructured at require-time, they bind to the
original function reference. Spies set up later on the module object
don't affect these bindings. Calling via the module object ensures
spies work correctly.

Co-Authored-By: Claude Opus 4.5
---
 packages/datadog-plugin-amqplib/src/consumer.js             | 4 ++--
 packages/datadog-plugin-amqplib/src/producer.js             | 4 ++--
 packages/datadog-plugin-aws-sdk/src/services/kinesis.js     | 4 ++--
 packages/datadog-plugin-aws-sdk/src/services/sqs.js         | 4 ++--
 packages/datadog-plugin-bullmq/src/consumer.js              | 4 ++--
 packages/datadog-plugin-bullmq/src/producer.js              | 4 ++--
 packages/datadog-plugin-google-cloud-pubsub/src/consumer.js | 4 ++--
 packages/datadog-plugin-google-cloud-pubsub/src/producer.js | 4 ++--
 packages/datadog-plugin-kafkajs/src/consumer.js             | 4 ++--
 packages/datadog-plugin-kafkajs/src/producer.js             | 4 ++--
 packages/datadog-plugin-rhea/src/consumer.js                | 4 ++--
 11 files changed, 22 insertions(+), 22 deletions(-)
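Why the destructured call defeats the spy, as a self-contained sketch (plain
objects stand in here for the real module and for sinon.spy):

    'use strict'
    const mod = { syncToStore: () => 'real' }

    const { syncToStore } = mod        // bound at require time

    mod.syncToStore = () => 'spied'    // sinon.spy(mod, 'syncToStore') replaces
                                       // the property on the module object

    console.log(syncToStore())         // 'real'  -- the destructured binding
                                       //            still points at the original
    console.log(mod.syncToStore())     // 'spied' -- property lookup goes through
                                       //            the module object on each call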

Co-Authored-By: Claude Opus 4.5
---
 packages/datadog-plugin-amqplib/src/consumer.js             | 4 ++--
 packages/datadog-plugin-amqplib/src/producer.js             | 4 ++--
 packages/datadog-plugin-aws-sdk/src/services/kinesis.js     | 4 ++--
 packages/datadog-plugin-aws-sdk/src/services/sqs.js         | 4 ++--
 packages/datadog-plugin-bullmq/src/consumer.js              | 4 ++--
 packages/datadog-plugin-bullmq/src/producer.js              | 4 ++--
 packages/datadog-plugin-google-cloud-pubsub/src/consumer.js | 4 ++--
 packages/datadog-plugin-google-cloud-pubsub/src/producer.js | 4 ++--
 packages/datadog-plugin-kafkajs/src/consumer.js             | 4 ++--
 packages/datadog-plugin-kafkajs/src/producer.js             | 4 ++--
 packages/datadog-plugin-rhea/src/consumer.js                | 4 ++--
 11 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/packages/datadog-plugin-amqplib/src/consumer.js b/packages/datadog-plugin-amqplib/src/consumer.js
index 29dab476c17..a2880b17b98 100644
--- a/packages/datadog-plugin-amqplib/src/consumer.js
+++ b/packages/datadog-plugin-amqplib/src/consumer.js
@@ -3,7 +3,7 @@
 const { TEXT_MAP } = require('../../../ext/formats')
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
-const { syncToStore } = require('../../dd-trace/src/datastreams/context')
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 const { getResourceName } = require('./util')
 
 class AmqplibConsumerPlugin extends ConsumerPlugin {
@@ -39,7 +39,7 @@ class AmqplibConsumerPlugin extends ConsumerPlugin {
       this.tracer.decodeDataStreamsContext(message.properties.headers)
       this.tracer
         .setCheckpoint(['direction:in', `topic:${queueName}`, 'type:rabbitmq'], span, payloadSize)
-      syncToStore(ctx)
+      DataStreamsContext.syncToStore(ctx)
     }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-amqplib/src/producer.js b/packages/datadog-plugin-amqplib/src/producer.js
index ec9b25c10a7..47eed959e3f 100644
--- a/packages/datadog-plugin-amqplib/src/producer.js
+++ b/packages/datadog-plugin-amqplib/src/producer.js
@@ -4,7 +4,7 @@
 const { TEXT_MAP } = require('../../../ext/formats')
 const { CLIENT_PORT_KEY } = require('../../dd-trace/src/constants')
 const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
 const { DsmPathwayCodec, getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
-const { syncToStore } = require('../../dd-trace/src/datastreams/context')
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 const { getResourceName } = require('./util')
 
 class AmqplibProducerPlugin extends ProducerPlugin {
@@ -51,7 +51,7 @@ class AmqplibProducerPlugin extends ProducerPlugin {
         ['direction:out', exchangeOrTopicTag, `has_routing_key:${hasRoutingKey}`, 'type:rabbitmq']
         , span, payloadSize)
       DsmPathwayCodec.encode(dataStreamsContext, fields.headers)
-      syncToStore(ctx)
+      DataStreamsContext.syncToStore(ctx)
     }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js
index 8ea5a85f8dd..3d45a1c6def 100644
--- a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js
+++ b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js
@@ -1,6 +1,6 @@
 'use strict'
 
 const { DsmPathwayCodec, getSizeOrZero } = require('../../../dd-trace/src/datastreams')
-const { syncToStore } = require('../../../dd-trace/src/datastreams/context')
+const DataStreamsContext = require('../../../dd-trace/src/datastreams/context')
 const log = require('../../../dd-trace/src/log')
 const BaseAwsSdkPlugin = require('../base')
@@ -55,7 +55,7 @@ class Kinesis extends BaseAwsSdkPlugin {
 
       if (this.config.dsmEnabled) {
         const storeCtx = { currentStore: store }
-        syncToStore(storeCtx)
+        DataStreamsContext.syncToStore(storeCtx)
         store = storeCtx.currentStore
       }
     }
diff --git a/packages/datadog-plugin-aws-sdk/src/services/sqs.js b/packages/datadog-plugin-aws-sdk/src/services/sqs.js
index f0d6d22558e..1f1a6782e3e 100644
--- a/packages/datadog-plugin-aws-sdk/src/services/sqs.js
+++ b/packages/datadog-plugin-aws-sdk/src/services/sqs.js
@@ -3,7 +3,7 @@
 const log = require('../../../dd-trace/src/log')
 const BaseAwsSdkPlugin = require('../base')
 const { DsmPathwayCodec, getHeadersSize } = require('../../../dd-trace/src/datastreams')
-const { syncToStore } = require('../../../dd-trace/src/datastreams/context')
+const DataStreamsContext = require('../../../dd-trace/src/datastreams/context')
 const { extractQueueMetadata } = require('../util')
 
 class Sqs extends BaseAwsSdkPlugin {
@@ -47,7 +47,7 @@ class Sqs extends BaseAwsSdkPlugin {
 
       if (this.config.dsmEnabled) {
         const storeCtx = { currentStore: store }
-        syncToStore(storeCtx)
+        DataStreamsContext.syncToStore(storeCtx)
         store = storeCtx.currentStore
       }
 
diff --git a/packages/datadog-plugin-bullmq/src/consumer.js b/packages/datadog-plugin-bullmq/src/consumer.js
index d3179e3e993..c75af6459c5 100644
--- a/packages/datadog-plugin-bullmq/src/consumer.js
+++ b/packages/datadog-plugin-bullmq/src/consumer.js
@@ -2,7 +2,7 @@
 
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { getMessageSize } = require('../../dd-trace/src/datastreams')
-const { syncToStore } = require('../../dd-trace/src/datastreams/context')
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 
 class BullmqConsumerPlugin extends ConsumerPlugin {
   static id = 'bullmq'
@@ -36,7 +36,7 @@ class BullmqConsumerPlugin extends ConsumerPlugin {
 
     if (this.config.dsmEnabled) {
       this.setConsumerCheckpoint(span, ctx)
-      syncToStore(ctx)
+      DataStreamsContext.syncToStore(ctx)
     }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-bullmq/src/producer.js b/packages/datadog-plugin-bullmq/src/producer.js
index 3776f4b2910..d3d7365aad4 100644
--- a/packages/datadog-plugin-bullmq/src/producer.js
+++ b/packages/datadog-plugin-bullmq/src/producer.js
@@ -2,7 +2,7 @@
 
 const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
 const { DsmPathwayCodec, getMessageSize } = require('../../dd-trace/src/datastreams')
-const { syncToStore } = require('../../dd-trace/src/datastreams/context')
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 
 class BaseBullmqProducerPlugin extends ProducerPlugin {
   static id = 'bullmq'
@@ -28,7 +28,7 @@ class BaseBullmqProducerPlugin extends ProducerPlugin {
 
     if (this.config.dsmEnabled) {
      this.setProducerCheckpoint(span, ctx)
-      syncToStore(ctx)
+      DataStreamsContext.syncToStore(ctx)
     }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js
index 6de31e4fb10..990de85bd7f 100644
--- a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js
+++ b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js
@@ -1,7 +1,7 @@
 'use strict'
 
 const { getMessageSize } = require('../../dd-trace/src/datastreams')
-const { syncToStore } = require('../../dd-trace/src/datastreams/context')
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const SpanContext = require('../../dd-trace/src/opentracing/span_context')
 const id = require('../../dd-trace/src/id')
@@ -190,7 +190,7 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
       this.tracer.decodeDataStreamsContext(message.attributes)
       this.tracer
         .setCheckpoint(['direction:in', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize)
-      syncToStore(ctx)
+      DataStreamsContext.syncToStore(ctx)
     }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js
index c0b4cd9ef3e..f3ee14d70f7 100644
--- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js
+++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js
@@ -2,7 +2,7 @@
 
 const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
 const { DsmPathwayCodec, getHeadersSize } = require('../../dd-trace/src/datastreams')
-const { syncToStore } = require('../../dd-trace/src/datastreams/context')
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 const id = require('../../dd-trace/src/id')
 
 class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
@@ -140,7 +140,7 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
     })
 
     if (this.config.dsmEnabled) {
-      syncToStore(ctx)
+      DataStreamsContext.syncToStore(ctx)
     }
 
     ctx.batchSpan = batchSpan
diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js
index 21f6fcfa83d..c511a8feb75 100644
--- a/packages/datadog-plugin-kafkajs/src/consumer.js
+++ b/packages/datadog-plugin-kafkajs/src/consumer.js
@@ -2,7 +2,7 @@
 
 const dc = require('dc-polyfill')
 const { getMessageSize } = require('../../dd-trace/src/datastreams')
-const { syncToStore } = require('../../dd-trace/src/datastreams/context')
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { convertToTextMap } = require('./utils')
 const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart')
@@ -98,7 +98,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
         edgeTags.push(`kafka_cluster_id:${clusterId}`)
       }
       this.tracer.setCheckpoint(edgeTags, span, payloadSize)
-      syncToStore(ctx)
+      DataStreamsContext.syncToStore(ctx)
     }
 
     if (afterStartCh.hasSubscribers) {
diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js
index a4b5e30d5b4..4994864c1fb 100644
--- a/packages/datadog-plugin-kafkajs/src/producer.js
+++ b/packages/datadog-plugin-kafkajs/src/producer.js
@@ -2,7 +2,7 @@
 
 const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
 const { DsmPathwayCodec, getMessageSize } = require('../../dd-trace/src/datastreams')
-const { syncToStore } = require('../../dd-trace/src/datastreams/context')
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 
 const BOOTSTRAP_SERVERS_KEY = 'messaging.kafka.bootstrap.servers'
 const MESSAGING_DESTINATION_KEY = 'messaging.destination.name'
@@ -115,7 +115,7 @@ class KafkajsProducerPlugin extends ProducerPlugin {
     }
 
     if (hasDsmContext) {
-      syncToStore(ctx)
+      DataStreamsContext.syncToStore(ctx)
     }
 
     return ctx.currentStore
diff --git a/packages/datadog-plugin-rhea/src/consumer.js b/packages/datadog-plugin-rhea/src/consumer.js
index 813e7302615..bfad4176363 100644
--- a/packages/datadog-plugin-rhea/src/consumer.js
+++ b/packages/datadog-plugin-rhea/src/consumer.js
@@ -2,7 +2,7 @@
 
 const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
 const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
-const { syncToStore } = require('../../dd-trace/src/datastreams/context')
+const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
 
 class RheaConsumerPlugin extends ConsumerPlugin {
   static id = 'rhea'
@@ -42,7 +42,7 @@ class RheaConsumerPlugin extends ConsumerPlugin {
       this.tracer.decodeDataStreamsContext(msgObj.message.delivery_annotations)
       this.tracer
         .setCheckpoint(['direction:in', `topic:${name}`, 'type:rabbitmq'], span, payloadSize)
-      syncToStore(ctx)
+      DataStreamsContext.syncToStore(ctx)
     }
 
     return ctx.currentStore

From e5cddf74c1049492dd85378fae1d95b13442b384 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Mon, 2 Feb 2026 14:05:37 -0500
Subject: [PATCH 8/9] fix(dsm): remove invalid producer-side syncToStore tests for AWS SDK

The syncToStore fix only applies to the consumer path, where bindStart is
used. AWS SDK plugins (SQS, Kinesis) use requestInject for producers, which
doesn't need context synchronization (see the sketch below), since:

1. requestInject is called before the request is sent
2. DSM context is encoded directly into the message
3. There's no async continuation where context leaking would occur
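
A rough sketch of that producer-side inject step (hypothetical helper and
carrier shape, not the tracer's actual request-inject code):

    // path as used from the aws-sdk plugin sources
    const { DsmPathwayCodec } = require('../../../dd-trace/src/datastreams')

    // The pathway is encoded into the outgoing message before the request
    // is sent; nothing reads it back from AsyncLocalStorage afterwards, so
    // there is no continuation for a stale store to leak into.
    function injectSketch (dataStreamsContext, message) {
      message.headers = message.headers || {}
      DsmPathwayCodec.encode(dataStreamsContext, message.headers)
      return message
    }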

Co-Authored-By: Claude Opus 4.5
---
 .../datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js | 10 ++--------
 .../datadog-plugin-aws-sdk/test/sqs.dsm.spec.js     | 13 ++-----------
 2 files changed, 4 insertions(+), 19 deletions(-)

diff --git a/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
index 8964188b0f2..7b7e6e53a58 100644
--- a/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
@@ -250,14 +250,8 @@ describe('Kinesis', function () {
       syncToStoreSpy.restore()
     })
 
-    it('Should call syncToStore after putRecord', done => {
-      helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err) => {
-        if (err) return done(err)
-        assert.ok(syncToStoreSpy.called, 'syncToStore should be called on putRecord')
-        done()
-      })
-    })
-
+    // Note: syncToStore is only called on the consumer path (getRecords), not on putRecord
+    // because putRecord uses requestInject which doesn't need context synchronization
     it('Should call syncToStore after getRecord', done => {
       helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => {
         if (err) return done(err)
diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
index 19ecc263561..c4638a87bb9 100644
--- a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
@@ -314,17 +314,8 @@ describe('Plugin', () => {
       syncToStoreSpy.restore()
     })
 
-    it('Should call syncToStore after sending a message', done => {
-      sqs.sendMessage({
-        MessageBody: 'syncToStore test',
-        QueueUrl: QueueUrlDsm,
-      }, (err) => {
-        if (err) return done(err)
-        assert.ok(syncToStoreSpy.called, 'syncToStore should be called on send')
-        done()
-      })
-    })
-
+    // Note: syncToStore is only called on the consumer path (receiveMessage), not on sendMessage
+    // because sendMessage uses requestInject which doesn't need context synchronization
     it('Should call syncToStore after receiving a message', done => {
       sqs.sendMessage({
         MessageBody: 'syncToStore test',

From bc56828b5ee9c48feb8106a1aa4715c056a556f8 Mon Sep 17 00:00:00 2001
From: Rob Carlan
Date: Mon, 2 Feb 2026 14:10:52 -0500
Subject: [PATCH 9/9] chore: remove unnecessary comments from test files

Co-Authored-By: Claude Opus 4.5
---
 packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js | 2 --
 packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js     | 2 --
 2 files changed, 4 deletions(-)

diff --git a/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
index 7b7e6e53a58..4bedaa0be4f 100644
--- a/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
@@ -250,8 +250,6 @@ describe('Kinesis', function () {
       syncToStoreSpy.restore()
     })
 
-    // Note: syncToStore is only called on the consumer path (getRecords), not on putRecord
-    // because putRecord uses requestInject which doesn't need context synchronization
     it('Should call syncToStore after getRecord', done => {
       helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => {
         if (err) return done(err)
diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
index c4638a87bb9..8eb152e3550 100644
--- a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
+++ b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
@@ -314,8 +314,6 @@ describe('Plugin', () => {
       syncToStoreSpy.restore()
     })
 
-    // Note: syncToStore is only called on the consumer path (receiveMessage), not on sendMessage
-    // because sendMessage uses requestInject which doesn't need context synchronization
     it('Should call syncToStore after receiving a message', done => {
       sqs.sendMessage({
         MessageBody: 'syncToStore test',