Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/datadog-plugin-amqplib/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const { TEXT_MAP } = require('../../../ext/formats')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
const { getResourceName } = require('./util')

class AmqplibConsumerPlugin extends ConsumerPlugin {
Expand Down Expand Up @@ -38,6 +39,7 @@ class AmqplibConsumerPlugin extends ConsumerPlugin {
this.tracer.decodeDataStreamsContext(message.properties.headers)
this.tracer
.setCheckpoint(['direction:in', `topic:${queueName}`, 'type:rabbitmq'], span, payloadSize)
DataStreamsContext.syncToStore(ctx)
}

return ctx.currentStore
Expand Down
2 changes: 2 additions & 0 deletions packages/datadog-plugin-amqplib/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const { TEXT_MAP } = require('../../../ext/formats')
const { CLIENT_PORT_KEY } = require('../../dd-trace/src/constants')
const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec, getAmqpMessageSize } = require('../../dd-trace/src/datastreams')
const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
const { getResourceName } = require('./util')

class AmqplibProducerPlugin extends ProducerPlugin {
Expand Down Expand Up @@ -50,6 +51,7 @@ class AmqplibProducerPlugin extends ProducerPlugin {
['direction:out', exchangeOrTopicTag, `has_routing_key:${hasRoutingKey}`, 'type:rabbitmq']
, span, payloadSize)
DsmPathwayCodec.encode(dataStreamsContext, fields.headers)
DataStreamsContext.syncToStore(ctx)
}

return ctx.currentStore
Expand Down
38 changes: 38 additions & 0 deletions packages/datadog-plugin-amqplib/test/dsm.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ const assert = require('node:assert/strict')
const { Buffer } = require('node:buffer')

const { afterEach, beforeEach, describe, it } = require('mocha')
const sinon = require('sinon')

const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
const id = require('../../dd-trace/src/id')
Expand Down Expand Up @@ -275,6 +277,42 @@ describe('Plugin', () => {
})
})
})

describe('syncToStore', () => {
let syncToStoreSpy

beforeEach(() => {
syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore')
})

afterEach(() => {
syncToStoreSpy.restore()
})

it('Should call syncToStore after producing', (done) => {
channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('syncToStore test'))
assert.ok(syncToStoreSpy.called, 'syncToStore should be called on produce')
done()
})
})

it('Should call syncToStore after consuming', (done) => {
channel.assertQueue(queue, {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('syncToStore test'))
channel.consume(ok.queue, () => {
assert.ok(syncToStoreSpy.called, 'syncToStore should be called on consume')
done()
}, {}, (err) => {
if (err) done(err)
})
})
})
})
})
})
})
Expand Down
7 changes: 7 additions & 0 deletions packages/datadog-plugin-aws-sdk/src/services/kinesis.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'
const { DsmPathwayCodec, getSizeOrZero } = require('../../../dd-trace/src/datastreams')
const DataStreamsContext = require('../../../dd-trace/src/datastreams/context')
const log = require('../../../dd-trace/src/log')
const BaseAwsSdkPlugin = require('../base')

Expand Down Expand Up @@ -51,6 +52,12 @@ class Kinesis extends BaseAwsSdkPlugin {
this.responseExtractDSMContext(
request.operation, request.params, response, span || null, { streamName }
)

if (this.config.dsmEnabled) {
const storeCtx = { currentStore: store }
DataStreamsContext.syncToStore(storeCtx)
store = storeCtx.currentStore
}
}

return store
Expand Down
7 changes: 7 additions & 0 deletions packages/datadog-plugin-aws-sdk/src/services/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const log = require('../../../dd-trace/src/log')
const BaseAwsSdkPlugin = require('../base')
const { DsmPathwayCodec, getHeadersSize } = require('../../../dd-trace/src/datastreams')
const DataStreamsContext = require('../../../dd-trace/src/datastreams/context')
const { extractQueueMetadata } = require('../util')

class Sqs extends BaseAwsSdkPlugin {
Expand Down Expand Up @@ -44,6 +45,12 @@ class Sqs extends BaseAwsSdkPlugin {
request.operation, request.params, response, span || null, { parsedAttributes: parsedMessageAttributes }
)

if (this.config.dsmEnabled) {
const storeCtx = { currentStore: store }
DataStreamsContext.syncToStore(storeCtx)
store = storeCtx.currentStore
}

return store
})

Expand Down
25 changes: 25 additions & 0 deletions packages/datadog-plugin-aws-sdk/test/kinesis.dsm.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const assert = require('node:assert/strict')
const { afterEach, beforeEach, describe, it } = require('mocha')
const sinon = require('sinon')

const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
const { assertObjectContains } = require('../../../integration-tests/helpers')
const { withVersions } = require('../../dd-trace/test/setup/mocha')
const agent = require('../../dd-trace/test/plugins/agent')
Expand Down Expand Up @@ -237,6 +238,30 @@ describe('Kinesis', function () {
// Swallow the error as it doesn't matter for this test.
})
})

describe('syncToStore', () => {
let syncToStoreSpy

beforeEach(() => {
syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore')
})

afterEach(() => {
syncToStoreSpy.restore()
})

it('Should call syncToStore after getRecord', done => {
helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => {
if (err) return done(err)

helpers.getTestData(kinesis, streamNameDSM, data, (err) => {
if (err) return done(err)
assert.ok(syncToStoreSpy.called, 'syncToStore should be called on getRecord')
done()
})
})
})
})
})
})
})
31 changes: 31 additions & 0 deletions packages/datadog-plugin-aws-sdk/test/sqs.dsm.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const { after, afterEach, before, beforeEach, describe, it } = require('mocha')
const semver = require('semver')
const sinon = require('sinon')

const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
const { ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
const agent = require('../../dd-trace/test/plugins/agent')
Expand Down Expand Up @@ -301,6 +302,36 @@ describe('Plugin', () => {
nowStub.restore()
})
})

describe('syncToStore', () => {
let syncToStoreSpy

beforeEach(() => {
syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore')
})

afterEach(() => {
syncToStoreSpy.restore()
})

it('Should call syncToStore after receiving a message', done => {
sqs.sendMessage({
MessageBody: 'syncToStore test',
QueueUrl: QueueUrlDsm,
}, (err) => {
if (err) return done(err)

sqs.receiveMessage({
QueueUrl: QueueUrlDsm,
MessageAttributeNames: ['.*'],
}, (err) => {
if (err) return done(err)
assert.ok(syncToStoreSpy.called, 'syncToStore should be called on receive')
done()
})
})
})
})
})
})
})
Expand Down
2 changes: 2 additions & 0 deletions packages/datadog-plugin-bullmq/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { getMessageSize } = require('../../dd-trace/src/datastreams')
const DataStreamsContext = require('../../dd-trace/src/datastreams/context')

class BullmqConsumerPlugin extends ConsumerPlugin {
static id = 'bullmq'
Expand Down Expand Up @@ -35,6 +36,7 @@ class BullmqConsumerPlugin extends ConsumerPlugin {

if (this.config.dsmEnabled) {
this.setConsumerCheckpoint(span, ctx)
DataStreamsContext.syncToStore(ctx)
}

return ctx.currentStore
Expand Down
2 changes: 2 additions & 0 deletions packages/datadog-plugin-bullmq/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec, getMessageSize } = require('../../dd-trace/src/datastreams')
const DataStreamsContext = require('../../dd-trace/src/datastreams/context')

class BaseBullmqProducerPlugin extends ProducerPlugin {
static id = 'bullmq'
Expand All @@ -27,6 +28,7 @@ class BaseBullmqProducerPlugin extends ProducerPlugin {

if (this.config.dsmEnabled) {
this.setProducerCheckpoint(span, ctx)
DataStreamsContext.syncToStore(ctx)
}

return ctx.currentStore
Expand Down
14 changes: 14 additions & 0 deletions packages/datadog-plugin-bullmq/test/dsm.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ createIntegrationTestSuite('bullmq', 'bullmq', {

describe('checkpoints', () => {
let setDataStreamsContextSpy
let syncToStoreSpy

beforeEach(() => {
setDataStreamsContextSpy = sinon.spy(DataStreamsContext, 'setDataStreamsContext')
syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore')
})

afterEach(() => {
setDataStreamsContextSpy.restore()
syncToStoreSpy.restore()
})

it('should set a checkpoint on produce (Queue.add)', async () => {
Expand All @@ -76,6 +79,17 @@ createIntegrationTestSuite('bullmq', 'bullmq', {
})
assert.ok(consumerCall, 'Consumer checkpoint call not found')
})

it('should call syncToStore after producing', async () => {
await testSetup.queueAdd()
assert.ok(syncToStoreSpy.called, 'syncToStore should be called on produce')
})

it('should call syncToStore after consuming', async function () {
this.timeout(10000)
await testSetup.workerProcessJob()
assert.ok(syncToStoreSpy.called, 'syncToStore should be called on consume')
})
})

describe('payload size', () => {
Expand Down
2 changes: 2 additions & 0 deletions packages/datadog-plugin-google-cloud-pubsub/src/consumer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const { getMessageSize } = require('../../dd-trace/src/datastreams')
const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const SpanContext = require('../../dd-trace/src/opentracing/span_context')
const id = require('../../dd-trace/src/id')
Expand Down Expand Up @@ -189,6 +190,7 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
this.tracer.decodeDataStreamsContext(message.attributes)
this.tracer
.setCheckpoint(['direction:in', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize)
DataStreamsContext.syncToStore(ctx)
}

return ctx.currentStore
Expand Down
5 changes: 5 additions & 0 deletions packages/datadog-plugin-google-cloud-pubsub/src/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec, getHeadersSize } = require('../../dd-trace/src/datastreams')
const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
const id = require('../../dd-trace/src/id')

class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
Expand Down Expand Up @@ -138,6 +139,10 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
}
})

if (this.config.dsmEnabled) {
DataStreamsContext.syncToStore(ctx)
}

ctx.batchSpan = batchSpan
return ctx.currentStore
}
Expand Down
25 changes: 25 additions & 0 deletions packages/datadog-plugin-google-cloud-pubsub/test/dsm.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const assert = require('node:assert/strict')
const { after, before, beforeEach, describe, it } = require('mocha')
const sinon = require('sinon')

const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')
const id = require('../../dd-trace/src/id')
Expand Down Expand Up @@ -148,6 +149,30 @@ describe('Plugin', () => {
})
})
})

describe('syncToStore', () => {
let syncToStoreSpy

beforeEach(() => {
syncToStoreSpy = sinon.spy(DataStreamsContext, 'syncToStore')
})

afterEach(() => {
syncToStoreSpy.restore()
})

it('should call syncToStore after producing', async () => {
await publish(dsmTopic, { data: Buffer.from('syncToStore produce test') })
assert.ok(syncToStoreSpy.called, 'syncToStore should be called on produce')
})

it('should call syncToStore after consuming', async () => {
await publish(dsmTopic, { data: Buffer.from('syncToStore consume test') })
await consume(async () => {
assert.ok(syncToStoreSpy.called, 'syncToStore should be called on consume')
})
})
})
})
})
})
Expand Down
2 changes: 2 additions & 0 deletions packages/datadog-plugin-kafkajs/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const dc = require('dc-polyfill')
const { getMessageSize } = require('../../dd-trace/src/datastreams')
const DataStreamsContext = require('../../dd-trace/src/datastreams/context')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')
const { convertToTextMap } = require('./utils')
const afterStartCh = dc.channel('dd-trace:kafkajs:consumer:afterStart')
Expand Down Expand Up @@ -97,6 +98,7 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
edgeTags.push(`kafka_cluster_id:${clusterId}`)
}
this.tracer.setCheckpoint(edgeTags, span, payloadSize)
DataStreamsContext.syncToStore(ctx)
}

if (afterStartCh.hasSubscribers) {
Expand Down
Loading
Loading