
fix(dsm): sync context to currentStore to prevent leaking between concurrent handlers#7395

Open
robcarlan-datadog wants to merge 10 commits into master from rob.carlan/dsm-context-prop-race-conditions

Conversation

@robcarlan-datadog
Contributor

@robcarlan-datadog robcarlan-datadog commented Jan 30, 2026

Summary

Fixes DSM context leaking between concurrent message handlers by adding a centralized syncToStore helper that ensures DSM context is properly scoped to each handler's async continuations.

Problem

When processing concurrent messages, DSM context was being set via enterWith() on AsyncLocalStorage but not synced to ctx.currentStore. Since ctx.currentStore is what gets returned from bindStart and bound to async continuations via store.run(), DSM context would leak between concurrent handlers.

Root cause: setDataStreamsContext uses enterWith() which modifies AsyncLocalStorage directly, but store.run() (called by the diagnostic channel's runStores) overwrites this with the value returned from bindStart. Without syncing, the DSM context set via enterWith was immediately lost.
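The interaction described above can be reproduced with plain AsyncLocalStorage. The following is a minimal sketch, not dd-trace code: `handle`, the store shapes, and the string used as a DSM context are hypothetical stand-ins, and `storage.run()` stands in for what runStores is described as doing with the value returned from bindStart.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks')

// Stand-in for the tracer's storage (assumption for illustration only).
const storage = new AsyncLocalStorage()

// Hypothetical handler: sets DSM context via enterWith(), then runs the
// continuation with ctx.currentStore, without syncing the two.
function handle (ctx, dsmContext) {
  storage.enterWith({ dataStreamsContext: dsmContext })
  return storage.run(ctx.currentStore, () => storage.getStore().dataStreamsContext)
}

// What the continuation bound by run() can see.
const seenInsideRun = handle({ currentStore: { span: 'span-a' } }, 'dsm-a')

// What is still sitting in AsyncLocalStorage after the handler returns.
const leftInStorage = storage.getStore().dataStreamsContext
```

In this sketch `seenInsideRun` is `undefined`, because `run()` replaces the store for the duration of the callback, while `leftInStorage` still holds `'dsm-a'` for whatever code runs next in the same execution context.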

Solution

Added syncToStore(ctx) helper in packages/dd-trace/src/datastreams/context.js:

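// Copies the DSM context currently held in AsyncLocalStorage onto ctx.currentStore,
// the store that bindStart returns and that gets bound to async continuations.
function syncToStore (ctx) {
  const dsmContext = getDataStreamsContext()
  // Only sync when both a DSM context and a current store exist.
  if (dsmContext && ctx?.currentStore) {
    // Replace the store with a shallow copy rather than mutating it in place.
    ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: dsmContext }
  }
  return ctx?.currentStore
}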

All DSM-enabled plugins now call syncToStore(ctx) after their DSM operations complete, ensuring the context is properly bound for async continuations.
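As a rough illustration of the call pattern, the sketch below wires the helper into a consumer-style bindStart. Only `syncToStore` follows the PR description; the `storage` instance, the simplified `getDataStreamsContext` and `setDataStreamsContext`, the `bindStart` body, and the `messageId` field are assumptions made so the example runs on its own.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks')

// Stand-ins for the tracer internals (assumptions, not the real modules).
const storage = new AsyncLocalStorage()

function getDataStreamsContext () {
  return storage.getStore()?.dataStreamsContext
}

function setDataStreamsContext (dsmContext) {
  storage.enterWith({ ...storage.getStore(), dataStreamsContext: dsmContext })
}

// Helper as described in this PR.
function syncToStore (ctx) {
  const dsmContext = getDataStreamsContext()
  if (dsmContext && ctx?.currentStore) {
    ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: dsmContext }
  }
  return ctx?.currentStore
}

// Hypothetical consumer hook: start a span, set a checkpoint, then sync.
function bindStart (ctx) {
  ctx.currentStore = { span: `span-for-${ctx.messageId}` }
  setDataStreamsContext({ hash: ctx.messageId })
  return syncToStore(ctx)
}

const boundStore = bindStart({ messageId: 'm1' })

// Edge cases: a missing ctx or missing currentStore yields undefined.
const noCtx = syncToStore(undefined)
const noStore = syncToStore({})
```

The store returned from `bindStart` now carries `dataStreamsContext` alongside the span, so a later `run()` with that store keeps the two together.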

Changes

Core:

  • packages/dd-trace/src/datastreams/context.js - Added syncToStore helper

Plugins updated:

  • kafkajs (consumer, producer)
  • amqplib (consumer, producer)
  • bullmq (consumer, producer)
  • rhea (consumer)
  • google-cloud-pubsub (consumer, producer)
  • aws-sdk (sqs, kinesis)

Tests

Unit tests (packages/dd-trace/test/datastreams/context.spec.js):

  • Tests syncToStore helper syncs DSM context to ctx.currentStore
  • Tests edge cases (null ctx, missing currentStore, no DSM context)
  • Tests integration with setDataStreamsContext

Integration spy tests (6 plugin test files):

  • kafkajs, amqplib, bullmq, google-cloud-pubsub, aws-sdk/sqs, aws-sdk/kinesis
  • Verifies syncToStore is called after produce and consume operations

Test plan

  • Unit tests for syncToStore helper (6 tests)
  • Integration spy tests for all updated plugins (12 tests)
  • Manual testing with NestJS + KafkaJS application
  • Verify DSM latency calculations are correct for concurrent message processing

Not included (different patterns)

  • rhea/producer.js - DSM in encode subscription after bindStart returns
  • kafkajs/batch-consumer.js - Uses start subscription, not bindStart
  • aws-sdk/sns.js - Producer only, no consumer-side DSM

🤖 Generated with Claude Code

@github-actions
Contributor

github-actions bot commented Jan 30, 2026

Overall package size

Self size: 4.5 MB
Deduped: 5.34 MB
No deduping: 5.34 MB

Dependency sizes

| name | version | self size | total size |
|------|---------|-----------|------------|
| import-in-the-middle | 2.0.3 | 76.87 kB | 808.03 kB |
| dc-polyfill | 0.1.10 | 26.73 kB | 26.73 kB |

🤖 This report was automatically generated by heaviest-objects-in-the-universe

@pr-commenter

pr-commenter bot commented Jan 30, 2026

Benchmarks

Benchmark execution time: 2026-02-02 20:13:23

Comparing candidate commit 2493ae6 in PR branch rob.carlan/dsm-context-prop-race-conditions with baseline commit 2e337bd in branch master.

Found 0 performance improvements and 0 performance regressions! Performance is the same for 228 metrics, 32 unstable metrics.

@codecov

codecov bot commented Jan 30, 2026

Codecov Report

❌ Patch coverage is 41.66667% with 21 lines in your changes missing coverage. Please review.
✅ Project coverage is 61.32%. Comparing base (2e337bd) to head (2493ae6).

| Files with missing lines | Patch % | Lines |
|--------------------------|---------|-------|
| ...ges/datadog-plugin-aws-sdk/src/services/kinesis.js | 20.00% | 4 Missing ⚠️ |
| ...ackages/datadog-plugin-aws-sdk/src/services/sqs.js | 20.00% | 4 Missing ⚠️ |
| packages/datadog-plugin-kafkajs/src/producer.js | 20.00% | 4 Missing ⚠️ |
| ...datadog-plugin-google-cloud-pubsub/src/producer.js | 33.33% | 2 Missing ⚠️ |
| packages/datadog-plugin-amqplib/src/consumer.js | 50.00% | 1 Missing ⚠️ |
| packages/datadog-plugin-amqplib/src/producer.js | 50.00% | 1 Missing ⚠️ |
| packages/datadog-plugin-bullmq/src/consumer.js | 50.00% | 1 Missing ⚠️ |
| packages/datadog-plugin-bullmq/src/producer.js | 50.00% | 1 Missing ⚠️ |
| ...datadog-plugin-google-cloud-pubsub/src/consumer.js | 50.00% | 1 Missing ⚠️ |
| packages/datadog-plugin-kafkajs/src/consumer.js | 50.00% | 1 Missing ⚠️ |

... and 1 more
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #7395       +/-   ##
===========================================
- Coverage   80.39%   61.32%   -19.07%     
===========================================
  Files         732      622      -110     
  Lines       31027    26040     -4987     
===========================================
- Hits        24945    15970     -8975     
- Misses       6082    10070     +3988     
Flag Coverage Δ
aiguard-macos 39.06% <ø> (-0.11%) ⬇️
aiguard-ubuntu ?
aiguard-windows ?
apm-capabilities-tracing-macos 48.89% <41.66%> (-0.02%) ⬇️
apm-capabilities-tracing-ubuntu ?
apm-capabilities-tracing-windows ?
apm-integrations-child-process ?
apm-integrations-couchbase-18 ?
apm-integrations-couchbase-eol ?
apm-integrations-oracledb ?
appsec-express ?
appsec-fastify ?
appsec-graphql ?
appsec-kafka ?
appsec-ldapjs ?
appsec-lodash ?
appsec-macos 58.45% <ø> (-0.07%) ⬇️
appsec-mongodb-core ?
appsec-mongoose ?
appsec-mysql ?
appsec-node-serialize ?
appsec-passport ?
appsec-postgres ?
appsec-sourcing ?
appsec-template ?
appsec-ubuntu ?
appsec-windows ?
instrumentations-instrumentation-bluebird ?
instrumentations-instrumentation-body-parser ?
instrumentations-instrumentation-child_process ?
instrumentations-instrumentation-cookie-parser ?
instrumentations-instrumentation-express ?
instrumentations-instrumentation-express-mongo-sanitize ?
instrumentations-instrumentation-express-session ?
instrumentations-instrumentation-fs ?
instrumentations-instrumentation-generic-pool ?
instrumentations-instrumentation-http ?
instrumentations-instrumentation-knex ?
instrumentations-instrumentation-mongoose ?
instrumentations-instrumentation-multer ?
instrumentations-instrumentation-mysql2 ?
instrumentations-instrumentation-passport ?
instrumentations-instrumentation-passport-http ?
instrumentations-instrumentation-passport-local ?
instrumentations-instrumentation-pg ?
instrumentations-instrumentation-promise ?
instrumentations-instrumentation-promise-js ?
instrumentations-instrumentation-q ?
instrumentations-instrumentation-url ?
instrumentations-instrumentation-when ?
llmobs-ai ?
llmobs-anthropic ?
llmobs-bedrock ?
llmobs-google-genai ?
llmobs-langchain ?
llmobs-openai ?
llmobs-vertex-ai ?
platform-core ?
platform-esbuild ?
platform-instrumentations-misc ?
platform-shimmer ?
platform-unit-guardrails ?
plugins-azure-event-hubs ?
plugins-azure-service-bus ?
plugins-bullmq ?
plugins-cassandra ?
plugins-cookie ?
plugins-cookie-parser ?
plugins-crypto ?
plugins-dd-trace-api ?
plugins-express-mongo-sanitize ?
plugins-express-session ?
plugins-fastify ?
plugins-fetch ?
plugins-fs ?
plugins-generic-pool ?
plugins-google-cloud-pubsub ?
plugins-grpc ?
plugins-handlebars ?
plugins-hapi ?
plugins-hono ?
plugins-ioredis ?
plugins-knex ?
plugins-ldapjs ?
plugins-limitd-client ?
plugins-lodash ?
plugins-mariadb ?
plugins-memcached ?
plugins-microgateway-core ?
plugins-moleculer ?
plugins-mongodb ?
plugins-mongodb-core ?
plugins-mongoose ?
plugins-multer ?
plugins-mysql ?
plugins-mysql2 ?
plugins-node-serialize ?
plugins-opensearch ?
plugins-postgres ?
plugins-process ?
plugins-pug ?
plugins-redis ?
plugins-router ?
plugins-sequelize ?
plugins-test-and-upstream-amqp10 ?
plugins-test-and-upstream-amqplib ?
plugins-test-and-upstream-apollo ?
plugins-test-and-upstream-avsc ?
plugins-test-and-upstream-bunyan ?
plugins-test-and-upstream-connect ?
plugins-test-and-upstream-graphql ?
plugins-test-and-upstream-koa ?
plugins-test-and-upstream-protobufjs ?
plugins-test-and-upstream-rhea ?
plugins-undici ?
plugins-url ?
plugins-valkey ?
plugins-vm ?
plugins-winston ?
plugins-ws ?
profiling-macos 40.01% <ø> (-0.10%) ⬇️
profiling-ubuntu ?
profiling-windows ?
serverless-azure-functions-client ?
serverless-azure-functions-eventhubs ?
serverless-azure-functions-servicebus ?


@watson
Collaborator

watson commented Feb 1, 2026

FYI: Due to changes in master, you need to rebase and run the linter. There are changes in this PR that will break if they land as-is, even though the tests currently pass here.

robcarlan-datadog and others added 3 commits February 2, 2026 12:06
…king

When processing concurrent Kafka messages, the DSM context was being set
via enterWith() on AsyncLocalStorage but not synced to ctx.currentStore.
Since ctx.currentStore is what gets returned from bindStart and bound to
async continuations via runStores, this caused DSM context to leak between
concurrent message handlers.

The fix syncs the DSM context to ctx.currentStore after DSM operations
complete, ensuring each handler's async continuations maintain the correct
DSM context.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add tests that verify DSM context is properly scoped to each handler's
async continuations when using diagnostic channels with runStores.

The tests verify that:
1. ctx.currentStore has dataStreamsContext after setDataStreamsContext
2. Concurrent handlers maintain isolated DSM contexts
3. DSM context persists through multiple async boundaries

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add syncToStore helper in DSM context module that syncs DSM context
from AsyncLocalStorage to ctx.currentStore after DSM operations.

This fixes a race condition where DSM context was being set via
enterWith() but not synced to ctx.currentStore, which is what gets
bound to async continuations via store.run(). Without syncing, DSM
context would leak between concurrent message handlers.

Updated plugins:
- kafkajs (consumer, producer)
- amqplib (consumer, producer)
- bullmq (consumer, producer)
- rhea (consumer)
- google-cloud-pubsub (consumer, producer)
- aws-sdk (sqs, kinesis)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@robcarlan-datadog robcarlan-datadog force-pushed the rob.carlan/dsm-context-prop-race-conditions branch from 50700ad to df9f01e Compare February 2, 2026 17:11
@robcarlan-datadog robcarlan-datadog added bug Something isn't working semver-patch labels Feb 2, 2026
@robcarlan-datadog robcarlan-datadog changed the title fix(kafkajs): sync DSM context to currentStore to prevent context leaking fix(DSM): sync DSM context to currentStore to prevent context leaks Feb 2, 2026
@robcarlan-datadog robcarlan-datadog changed the title fix(DSM): sync DSM context to currentStore to prevent context leaks fix(dsm): sync context to currentStore to prevent leaking between concurrent handlers Feb 2, 2026
The unit test was useful for validating the fix during development
but is contrived and doesn't add value as a permanent test.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@robcarlan-datadog robcarlan-datadog force-pushed the rob.carlan/dsm-context-prop-race-conditions branch from 08689a7 to 20b2e55 Compare February 2, 2026 17:48
Add comprehensive tests for the new syncToStore helper:
- Unit tests for syncToStore in context.spec.js covering normal
  operation, edge cases, and integration with setDataStreamsContext
- Spy tests in 6 integration test files to verify syncToStore is
  called after produce and consume operations

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@datadog-datadog-prod-us1

datadog-datadog-prod-us1 bot commented Feb 2, 2026

✅ Tests

🎉 All green!

❄️ No new flaky tests detected
🧪 All tests passed

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: 2493ae6

robcarlan-datadog and others added 4 commits February 2, 2026 13:45
Change all plugins to call DataStreamsContext.syncToStore(ctx)
instead of destructuring syncToStore at import time. This allows
sinon spies to intercept calls during testing.

When functions are destructured at require-time, they bind to the
original function reference. Spies set up later on the module
object don't affect these bindings. Calling via the module object
ensures spies work correctly.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
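The binding behaviour this commit message describes can be shown without sinon. The sketch below is illustrative only: `contextModule` is a hypothetical stand-in for the DSM context module, and the hand-rolled wrapper plays the role a sinon spy would play in the real tests.

```javascript
// Hypothetical stand-in for the DSM context module's exports.
const contextModule = {
  syncToStore (ctx) {
    return ctx?.currentStore
  }
}

// Destructuring captures the original function reference at this point.
const { syncToStore } = contextModule

// Hand-rolled spy: replace the property on the module object and count calls.
let spyCalls = 0
const original = contextModule.syncToStore
contextModule.syncToStore = function (...args) {
  spyCalls++
  return original.apply(this, args)
}

// The destructured reference still points at the original, so the spy is bypassed.
syncToStore({ currentStore: {} })
const callsAfterDestructured = spyCalls

// Calling through the module object goes through the replaced property.
contextModule.syncToStore({ currentStore: {} })
const callsAfterModuleCall = spyCalls
```

Only the call made through `contextModule` is counted, which matches the reasoning given for switching plugins to `DataStreamsContext.syncToStore(ctx)`.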
The syncToStore fix only applies to the consumer path where bindStart
is used. AWS SDK plugins (SQS, Kinesis) use requestInject for producers,
which doesn't need context synchronization since:

1. requestInject is called before the request is sent
2. DSM context is encoded directly into the message
3. There's no async continuation where context leaking would occur

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@tlhunter
Member

tlhunter commented Feb 2, 2026

Maybe we would be better off having some sort of bindStart static helper in the producer / consumer base classes? Something to signal that it should always happen at the end of bindStart? That would at least reduce the number of requires and hopefully ensure that the author of the next messaging plugin remembers to call the method.
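One possible shape for this suggestion is sketched below. It uses a template-method variant rather than a static helper, and every name in it (`MessagingPluginBase`, `startWithDsm`, `ExampleConsumer`, the simplified storage functions) is hypothetical; it is not taken from the dd-trace base classes.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks')

// Stand-ins for the tracer internals (assumptions for illustration).
const storage = new AsyncLocalStorage()
const getDataStreamsContext = () => storage.getStore()?.dataStreamsContext
const setDataStreamsContext = (dsmContext) =>
  storage.enterWith({ ...storage.getStore(), dataStreamsContext: dsmContext })

function syncToStore (ctx) {
  const dsmContext = getDataStreamsContext()
  if (dsmContext && ctx?.currentStore) {
    ctx.currentStore = { ...ctx.currentStore, dataStreamsContext: dsmContext }
  }
  return ctx?.currentStore
}

// Hypothetical base class: bindStart always ends with the sync step,
// so subclasses cannot forget it.
class MessagingPluginBase {
  bindStart (ctx) {
    this.startWithDsm(ctx)
    return syncToStore(ctx)
  }

  startWithDsm (ctx) {
    throw new Error('startWithDsm must be implemented by the plugin')
  }
}

// Hypothetical plugin: only implements its own span and checkpoint logic.
class ExampleConsumer extends MessagingPluginBase {
  startWithDsm (ctx) {
    ctx.currentStore = { span: 'consume' }
    setDataStreamsContext({ hash: ctx.messageId })
  }
}

const bound = new ExampleConsumer().bindStart({ messageId: 'm42' })
```

With this layout a new messaging plugin overrides `startWithDsm` only, and the sync happens in one place instead of in each plugin file.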

Member

@rochdev rochdev left a comment


Such a helper should not be needed if we use Diagnostics Channel properly. There should either be a completely separate store or the syncing should happen implicitly.

4 participants