diff --git a/packages/datadog-plugin-amqplib/src/consumer.js b/packages/datadog-plugin-amqplib/src/consumer.js index fe0cbf89f44..a2880b17b98 100644 --- a/packages/datadog-plugin-amqplib/src/consumer.js +++ b/packages/datadog-plugin-amqplib/src/consumer.js @@ -3,6 +3,7 @@ const { TEXT_MAP } = require('../../../ext/formats') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') const { getResourceName } = require('./util') class AmqplibConsumerPlugin extends ConsumerPlugin { @@ -38,6 +39,7 @@ class AmqplibConsumerPlugin extends ConsumerPlugin { this.tracer.decodeDataStreamsContext(message.properties.headers) this.tracer .setCheckpoint(['direction:in', `topic:${queueName}`, 'type:rabbitmq'], span, payloadSize) + DataStreamsContext.syncToStore(ctx) } return ctx.currentStore diff --git a/packages/datadog-plugin-amqplib/src/producer.js b/packages/datadog-plugin-amqplib/src/producer.js index d23cc2bf9cc..47eed959e3f 100644 --- a/packages/datadog-plugin-amqplib/src/producer.js +++ b/packages/datadog-plugin-amqplib/src/producer.js @@ -4,6 +4,7 @@ const { TEXT_MAP } = require('../../../ext/formats') const { CLIENT_PORT_KEY } = require('../../dd-trace/src/constants') const ProducerPlugin = require('../../dd-trace/src/plugins/producer') const { DsmPathwayCodec, getAmqpMessageSize } = require('../../dd-trace/src/datastreams') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') const { getResourceName } = require('./util') class AmqplibProducerPlugin extends ProducerPlugin { @@ -50,6 +51,7 @@ class AmqplibProducerPlugin extends ProducerPlugin { ['direction:out', exchangeOrTopicTag, `has_routing_key:${hasRoutingKey}`, 'type:rabbitmq'] , span, payloadSize) DsmPathwayCodec.encode(dataStreamsContext, fields.headers) + DataStreamsContext.syncToStore(ctx) } return ctx.currentStore diff --git 
a/packages/datadog-plugin-amqplib/test/dsm.spec.js b/packages/datadog-plugin-amqplib/test/dsm.spec.js index 9865453f5be..5772c3dde1e 100644 --- a/packages/datadog-plugin-amqplib/test/dsm.spec.js +++ b/packages/datadog-plugin-amqplib/test/dsm.spec.js @@ -4,7 +4,9 @@ const assert = require('node:assert/strict') const { Buffer } = require('node:buffer') const { afterEach, beforeEach, describe, it } = require('mocha') +const sinon = require('sinon') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor') const id = require('../../dd-trace/src/id') @@ -275,6 +277,42 @@ describe('Plugin', () => { }) }) }) + + describe('syncToStore', () => { + let syncToStoreSpy + + beforeEach(() => { + syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore') + }) + + afterEach(() => { + syncToStoreSpy.restore() + }) + + it('Should call syncToStore after producing', (done) => { + channel.assertQueue(queue, {}, (err, ok) => { + if (err) return done(err) + + channel.sendToQueue(ok.queue, Buffer.from('syncToStore test')) + assert.ok(syncToStoreSpy.called, 'syncToStore should be called on produce') + done() + }) + }) + + it('Should call syncToStore after consuming', (done) => { + channel.assertQueue(queue, {}, (err, ok) => { + if (err) return done(err) + + channel.sendToQueue(ok.queue, Buffer.from('syncToStore test')) + channel.consume(ok.queue, () => { + assert.ok(syncToStoreSpy.called, 'syncToStore should be called on consume') + done() + }, {}, (err) => { + if (err) done(err) + }) + }) + }) + }) }) }) }) diff --git a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js index b866182529f..3d45a1c6def 100644 --- a/packages/datadog-plugin-aws-sdk/src/services/kinesis.js +++ b/packages/datadog-plugin-aws-sdk/src/services/kinesis.js @@ -1,5 +1,6 @@ 
'use strict' const { DsmPathwayCodec, getSizeOrZero } = require('../../../dd-trace/src/datastreams') +const DataStreamsContext = require('../../../dd-trace/src/datastreams/context') const log = require('../../../dd-trace/src/log') const BaseAwsSdkPlugin = require('../base') @@ -51,6 +52,12 @@ class Kinesis extends BaseAwsSdkPlugin { this.responseExtractDSMContext( request.operation, request.params, response, span || null, { streamName } ) + + if (this.config.dsmEnabled) { + const storeCtx = { currentStore: store } + DataStreamsContext.syncToStore(storeCtx) + store = storeCtx.currentStore + } } return store diff --git a/packages/datadog-plugin-aws-sdk/src/services/sqs.js b/packages/datadog-plugin-aws-sdk/src/services/sqs.js index e8985d463b1..1f1a6782e3e 100644 --- a/packages/datadog-plugin-aws-sdk/src/services/sqs.js +++ b/packages/datadog-plugin-aws-sdk/src/services/sqs.js @@ -3,6 +3,7 @@ const log = require('../../../dd-trace/src/log') const BaseAwsSdkPlugin = require('../base') const { DsmPathwayCodec, getHeadersSize } = require('../../../dd-trace/src/datastreams') +const DataStreamsContext = require('../../../dd-trace/src/datastreams/context') const { extractQueueMetadata } = require('../util') class Sqs extends BaseAwsSdkPlugin { @@ -44,6 +45,12 @@ class Sqs extends BaseAwsSdkPlugin { request.operation, request.params, response, span || null, { parsedAttributes: parsedMessageAttributes } ) + if (this.config.dsmEnabled) { + const storeCtx = { currentStore: store } + DataStreamsContext.syncToStore(storeCtx) + store = storeCtx.currentStore + } + return store }) diff --git a/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js index e26cf84f9ff..4bedaa0be4f 100644 --- a/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js +++ b/packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js @@ -5,6 +5,7 @@ const assert = require('node:assert/strict') const { afterEach, beforeEach, describe, it } = 
require('mocha') const sinon = require('sinon') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') const { assertObjectContains } = require('../../../integration-tests/helpers') const { withVersions } = require('../../dd-trace/test/setup/mocha') const agent = require('../../dd-trace/test/plugins/agent') @@ -237,6 +238,30 @@ describe('Kinesis', function () { // Swallow the error as it doesn't matter for this test. }) }) + + describe('syncToStore', () => { + let syncToStoreSpy + + beforeEach(() => { + syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore') + }) + + afterEach(() => { + syncToStoreSpy.restore() + }) + + it('Should call syncToStore after getRecord', done => { + helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => { + if (err) return done(err) + + helpers.getTestData(kinesis, streamNameDSM, data, (err) => { + if (err) return done(err) + assert.ok(syncToStoreSpy.called, 'syncToStore should be called on getRecord') + done() + }) + }) + }) + }) }) }) }) diff --git a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js index f7c7e7c57ce..8eb152e3550 100644 --- a/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js +++ b/packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js @@ -7,6 +7,7 @@ const { after, afterEach, before, beforeEach, describe, it } = require('mocha') const semver = require('semver') const sinon = require('sinon') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor') const agent = require('../../dd-trace/test/plugins/agent') @@ -301,6 +302,36 @@ describe('Plugin', () => { nowStub.restore() }) }) + + describe('syncToStore', () => { + let syncToStoreSpy + + beforeEach(() => { + syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore') + }) + + 
afterEach(() => { + syncToStoreSpy.restore() + }) + + it('Should call syncToStore after receiving a message', done => { + sqs.sendMessage({ + MessageBody: 'syncToStore test', + QueueUrl: QueueUrlDsm, + }, (err) => { + if (err) return done(err) + + sqs.receiveMessage({ + QueueUrl: QueueUrlDsm, + MessageAttributeNames: ['.*'], + }, (err) => { + if (err) return done(err) + assert.ok(syncToStoreSpy.called, 'syncToStore should be called on receive') + done() + }) + }) + }) + }) }) }) }) diff --git a/packages/datadog-plugin-bullmq/src/consumer.js b/packages/datadog-plugin-bullmq/src/consumer.js index e0db4a3ccc2..c75af6459c5 100644 --- a/packages/datadog-plugin-bullmq/src/consumer.js +++ b/packages/datadog-plugin-bullmq/src/consumer.js @@ -2,6 +2,7 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const { getMessageSize } = require('../../dd-trace/src/datastreams') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') class BullmqConsumerPlugin extends ConsumerPlugin { static id = 'bullmq' @@ -35,6 +36,7 @@ class BullmqConsumerPlugin extends ConsumerPlugin { if (this.config.dsmEnabled) { this.setConsumerCheckpoint(span, ctx) + DataStreamsContext.syncToStore(ctx) } return ctx.currentStore diff --git a/packages/datadog-plugin-bullmq/src/producer.js b/packages/datadog-plugin-bullmq/src/producer.js index 7fcb87641df..d3d7365aad4 100644 --- a/packages/datadog-plugin-bullmq/src/producer.js +++ b/packages/datadog-plugin-bullmq/src/producer.js @@ -2,6 +2,7 @@ const ProducerPlugin = require('../../dd-trace/src/plugins/producer') const { DsmPathwayCodec, getMessageSize } = require('../../dd-trace/src/datastreams') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') class BaseBullmqProducerPlugin extends ProducerPlugin { static id = 'bullmq' @@ -27,6 +28,7 @@ class BaseBullmqProducerPlugin extends ProducerPlugin { if (this.config.dsmEnabled) { this.setProducerCheckpoint(span, ctx) + 
DataStreamsContext.syncToStore(ctx) } return ctx.currentStore diff --git a/packages/datadog-plugin-bullmq/test/dsm.spec.js b/packages/datadog-plugin-bullmq/test/dsm.spec.js index 966244b0dbe..2b3427cfa20 100644 --- a/packages/datadog-plugin-bullmq/test/dsm.spec.js +++ b/packages/datadog-plugin-bullmq/test/dsm.spec.js @@ -45,13 +45,16 @@ createIntegrationTestSuite('bullmq', 'bullmq', { describe('checkpoints', () => { let setDataStreamsContextSpy + let syncToStoreSpy beforeEach(() => { setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext') + syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore') }) afterEach(() => { setDataStreamsContextSpy.restore() + syncToStoreSpy.restore() }) it('should set a checkpoint on produce (Queue.add)', async () => { @@ -76,6 +79,17 @@ createIntegrationTestSuite('bullmq', 'bullmq', { }) assert.ok(consumerCall, 'Consumer checkpoint call not found') }) + + it('should call syncToStore after producing', async () => { + await testSetup.queueAdd() + assert.ok(syncToStoreSpy.called, 'syncToStore should be called on produce') + }) + + it('should call syncToStore after consuming', async function () { + this.timeout(10000) + await testSetup.workerProcessJob() + assert.ok(syncToStoreSpy.called, 'syncToStore should be called on consume') + }) }) describe('payload size', () => { diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js index 2cea664d483..990de85bd7f 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/consumer.js @@ -1,6 +1,7 @@ 'use strict' const { getMessageSize } = require('../../dd-trace/src/datastreams') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const SpanContext = require('../../dd-trace/src/opentracing/span_context') const id = 
require('../../dd-trace/src/id') @@ -189,6 +190,7 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin { this.tracer.decodeDataStreamsContext(message.attributes) this.tracer .setCheckpoint(['direction:in', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize) + DataStreamsContext.syncToStore(ctx) } return ctx.currentStore diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index 09f50c55193..f3ee14d70f7 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -2,6 +2,7 @@ const ProducerPlugin = require('../../dd-trace/src/plugins/producer') const { DsmPathwayCodec, getHeadersSize } = require('../../dd-trace/src/datastreams') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') const id = require('../../dd-trace/src/id') class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { @@ -138,6 +139,10 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { } }) + if (this.config.dsmEnabled) { + DataStreamsContext.syncToStore(ctx) + } + ctx.batchSpan = batchSpan return ctx.currentStore } diff --git a/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js index 8572e24ca16..f73297e1344 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js +++ b/packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js @@ -5,6 +5,7 @@ const assert = require('node:assert/strict') const { after, before, beforeEach, describe, it } = require('mocha') const sinon = require('sinon') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway') const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor') const id = require('../../dd-trace/src/id') @@ 
-148,6 +149,30 @@ describe('Plugin', () => { }) }) }) + + describe('syncToStore', () => { + let syncToStoreSpy + + beforeEach(() => { + syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore') + }) + + afterEach(() => { + syncToStoreSpy.restore() + }) + + it('should call syncToStore after producing', async () => { + await publish(dsmTopic, { data: Buffer.from('syncToStore produce test') }) + assert.ok(syncToStoreSpy.called, 'syncToStore should be called on produce') + }) + + it('should call syncToStore after consuming', async () => { + await publish(dsmTopic, { data: Buffer.from('syncToStore consume test') }) + await consume(async () => { + assert.ok(syncToStoreSpy.called, 'syncToStore should be called on consume') + }) + }) + }) }) }) }) diff --git a/packages/datadog-plugin-kafkajs/src/consumer.js b/packages/datadog-plugin-kafkajs/src/consumer.js index 4638ab0ca8f..c511a8feb75 100644 --- a/packages/datadog-plugin-kafkajs/src/consumer.js +++ b/packages/datadog-plugin-kafkajs/src/consumer.js @@ -2,6 +2,7 @@ const dc = require('dc-polyfill') const { getMessageSize } = require('../../dd-trace/src/datastreams') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const { convertToTextMap } = require('./utils') const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart') @@ -97,6 +98,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin { edgeTags.push(`kafka_cluster_id:${clusterId}`) } this.tracer.setCheckpoint(edgeTags, span, payloadSize) + DataStreamsContext.syncToStore(ctx) } if (afterStartCh.hasSubscribers) { diff --git a/packages/datadog-plugin-kafkajs/src/producer.js b/packages/datadog-plugin-kafkajs/src/producer.js index 70d0a9bfdd0..4994864c1fb 100644 --- a/packages/datadog-plugin-kafkajs/src/producer.js +++ b/packages/datadog-plugin-kafkajs/src/producer.js @@ -2,6 +2,7 @@ const ProducerPlugin = require('../../dd-trace/src/plugins/producer') const { 
DsmPathwayCodec, getMessageSize } = require('../../dd-trace/src/datastreams') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') const BOOTSTRAP_SERVERS_KEY = 'messaging.kafka.bootstrap.servers' const MESSAGING_DESTINATION_KEY = 'messaging.destination.name' @@ -88,6 +89,7 @@ class KafkajsProducerPlugin extends ProducerPlugin { if (bootstrapServers) { span.setTag(BOOTSTRAP_SERVERS_KEY, bootstrapServers) } + let hasDsmContext = false for (const message of messages) { if (message !== null && typeof message === 'object') { // message headers are not supported for kafka broker versions <0.11 @@ -96,6 +98,7 @@ class KafkajsProducerPlugin extends ProducerPlugin { this.tracer.inject(span, 'text_map', message.headers) } if (this.config.dsmEnabled) { + hasDsmContext = true const payloadSize = getMessageSize(message) const edgeTags = ['direction:out', `topic:${topic}`, 'type:kafka'] @@ -111,6 +114,10 @@ class KafkajsProducerPlugin extends ProducerPlugin { } } + if (hasDsmContext) { + DataStreamsContext.syncToStore(ctx) + } + return ctx.currentStore } } diff --git a/packages/datadog-plugin-kafkajs/test/dsm.spec.js b/packages/datadog-plugin-kafkajs/test/dsm.spec.js index 685fb2f8498..cf46fecdce2 100644 --- a/packages/datadog-plugin-kafkajs/test/dsm.spec.js +++ b/packages/datadog-plugin-kafkajs/test/dsm.spec.js @@ -80,6 +80,7 @@ describe('Plugin', () => { describe('checkpoints', () => { let consumer let setDataStreamsContextSpy + let syncToStoreSpy beforeEach(async () => { tracer.init() @@ -88,10 +89,12 @@ describe('Plugin', () => { await consumer.connect() await consumer.subscribe({ topic: testTopic }) setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext') + syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore') }) afterEach(async () => { setDataStreamsContextSpy.restore() + syncToStoreSpy.restore() await consumer.disconnect() }) @@ -154,6 +157,21 @@ describe('Plugin', () => { }, }) }) + + it('Should call syncToStore 
after producing', async () => { + const messages = [{ key: 'syncTest1', value: 'test' }] + await sendMessages(kafka, testTopic, messages) + assert.ok(syncToStoreSpy.called, 'syncToStore should be called on produce') + }) + + it('Should call syncToStore after consuming', async () => { + await consumer.run({ + eachMessage: async () => {}, + }) + await sendMessages(kafka, testTopic, [{ key: 'syncTest2', value: 'test' }]) + await consumer.disconnect() + assert.ok(syncToStoreSpy.called, 'syncToStore should be called on consume') + }) }) describe('backlogs', () => { diff --git a/packages/datadog-plugin-rhea/src/consumer.js b/packages/datadog-plugin-rhea/src/consumer.js index 53c194c245d..bfad4176363 100644 --- a/packages/datadog-plugin-rhea/src/consumer.js +++ b/packages/datadog-plugin-rhea/src/consumer.js @@ -2,6 +2,7 @@ const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer') const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams') +const DataStreamsContext = require('../../dd-trace/src/datastreams/context') class RheaConsumerPlugin extends ConsumerPlugin { static id = 'rhea' @@ -41,6 +42,7 @@ class RheaConsumerPlugin extends ConsumerPlugin { this.tracer.decodeDataStreamsContext(msgObj.message.delivery_annotations) this.tracer .setCheckpoint(['direction:in', `topic:${name}`, 'type:rabbitmq'], span, payloadSize) + DataStreamsContext.syncToStore(ctx) } return ctx.currentStore diff --git a/packages/dd-trace/src/datastreams/context.js b/packages/dd-trace/src/datastreams/context.js index be4eb35f721..89da06e447e 100644 --- a/packages/dd-trace/src/datastreams/context.js +++ b/packages/dd-trace/src/datastreams/context.js @@ -14,7 +14,16 @@ function setDataStreamsContext (dataStreamsContext) { if (dataStreamsContext) storage('legacy').enterWith({ ...(storage('legacy').getStore()), dataStreamsContext }) } +function syncToStore (ctx) { + const dsmContext = getDataStreamsContext() + if (dsmContext && ctx?.currentStore) { + ctx.currentStore = { ...ctx.currentStore, 
dataStreamsContext: dsmContext } + } + return ctx?.currentStore +} + module.exports = { getDataStreamsContext, setDataStreamsContext, + syncToStore, } diff --git a/packages/dd-trace/test/datastreams/context.spec.js b/packages/dd-trace/test/datastreams/context.spec.js new file mode 100644 index 00000000000..b30025218ef --- /dev/null +++ b/packages/dd-trace/test/datastreams/context.spec.js @@ -0,0 +1,111 @@ +'use strict' + +const assert = require('node:assert/strict') + +const { describe, it, beforeEach, afterEach } = require('mocha') + +const { storage } = require('../../../datadog-core') +const { + getDataStreamsContext, + setDataStreamsContext, + syncToStore, +} = require('../../src/datastreams/context') + +describe('DSM Context', () => { + let originalStore + + beforeEach(() => { + originalStore = storage('legacy').getStore() + }) + + afterEach(() => { + storage('legacy').enterWith(originalStore) + }) + + describe('syncToStore', () => { + it('should sync DSM context from AsyncLocalStorage to ctx.currentStore', () => { + const dsmContext = { + hash: Buffer.from('testhash'), + pathwayStartNs: 1000, + edgeStartNs: 2000, + } + + // Set DSM context via enterWith (simulating what setDataStreamsContext does) + storage('legacy').enterWith({ dataStreamsContext: dsmContext }) + + // ctx.currentStore doesn't have DSM context yet + const ctx = { currentStore: { span: { name: 'test-span' } } } + + // syncToStore should copy DSM context to ctx.currentStore + syncToStore(ctx) + + assert.deepStrictEqual(ctx.currentStore.dataStreamsContext, dsmContext) + assert.strictEqual(ctx.currentStore.span.name, 'test-span') + }) + + it('should not modify ctx.currentStore if no DSM context exists', () => { + storage('legacy').enterWith({}) + + const ctx = { currentStore: { span: { name: 'test-span' } } } + syncToStore(ctx) + + assert.strictEqual(ctx.currentStore.dataStreamsContext, undefined) + assert.strictEqual(ctx.currentStore.span.name, 'test-span') + }) + + it('should handle missing 
ctx.currentStore gracefully', () => { + const dsmContext = { hash: Buffer.from('test') } + storage('legacy').enterWith({ dataStreamsContext: dsmContext }) + + const ctx = {} + const result = syncToStore(ctx) + + assert.strictEqual(result, undefined) + }) + + it('should handle null ctx gracefully', () => { + const dsmContext = { hash: Buffer.from('test') } + storage('legacy').enterWith({ dataStreamsContext: dsmContext }) + + const result = syncToStore(null) + + assert.strictEqual(result, undefined) + }) + + it('should return the updated currentStore', () => { + const dsmContext = { hash: Buffer.from('test') } + storage('legacy').enterWith({ dataStreamsContext: dsmContext }) + + const ctx = { currentStore: { span: {} } } + const result = syncToStore(ctx) + + assert.strictEqual(result, ctx.currentStore) + assert.deepStrictEqual(result.dataStreamsContext, dsmContext) + }) + + it('should work with setDataStreamsContext', () => { + const dsmContext = { + hash: Buffer.from('realcontext'), + pathwayStartNs: 5000, + edgeStartNs: 6000, + } + + // Initialize store + storage('legacy').enterWith({ span: {} }) + + // This is what plugins do: set DSM context via the API + setDataStreamsContext(dsmContext) + + // Verify it's in AsyncLocalStorage + assert.deepStrictEqual(getDataStreamsContext(), dsmContext) + + // But ctx.currentStore (captured earlier) doesn't have it + const ctx = { currentStore: { span: { name: 'handler-span' } } } + + // syncToStore fixes this + syncToStore(ctx) + + assert.deepStrictEqual(ctx.currentStore.dataStreamsContext, dsmContext) + }) + }) +})