Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/apm-integrations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,17 @@ jobs:
with:
dd_api_key: ${{ secrets.DD_API_KEY }}

langgraph:
runs-on: ubuntu-latest
env:
PLUGINS: langgraph
steps:
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- uses: ./.github/actions/plugins/test
with:
dd_api_key: ${{ secrets.DD_API_KEY }}


ldapjs:
runs-on: 'ubuntu-latest'
env:
Expand Down
2 changes: 2 additions & 0 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ tracer.use('pg', {
<h5 id="knex"></h5>
<h5 id="koa"></h5>
<h5 id="langchain"></h5>
<h5 id="langgraph"></h5>
<h5 id="mariadb"></h5>
<h5 id="memcached"></h5>
<h5 id="microgateway-core"></h5>
Expand Down Expand Up @@ -145,6 +146,7 @@ tracer.use('pg', {
* [knex](./interfaces/export_.plugins.knex.html)
* [koa](./interfaces/export_.plugins.koa.html)
* [langchain](./interfaces/export_.plugins.langchain.html)
* [langgraph](./interfaces/export_.plugins.langgraph.html)
* [mariadb](./interfaces/export_.plugins.mariadb.html)
* [memcached](./interfaces/export_.plugins.memcached.html)
* [microgateway-core](./interfaces/export_.plugins.microgateway_core.html)
Expand Down
1 change: 1 addition & 0 deletions docs/add-redirects.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ declare -a plugins=(
"knex"
"koa"
"langchain"
"langgraph"
"ldapjs"
"mariadb"
"memcached"
Expand Down
1 change: 1 addition & 0 deletions docs/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ tracer.use('koa');
tracer.use('koa', httpServerOptions);
tracer.use('langchain');
tracer.use('mariadb', { service: () => `my-custom-mariadb` })
tracer.use('langgraph');
tracer.use('memcached');
tracer.use('microgateway-core');
tracer.use('microgateway-core', httpServerOptions);
Expand Down
7 changes: 7 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ interface Plugins {
"knex": tracer.plugins.knex;
"koa": tracer.plugins.koa;
"langchain": tracer.plugins.langchain;
"langgraph": tracer.plugins.langgraph;
"mariadb": tracer.plugins.mariadb;
"memcached": tracer.plugins.memcached;
"microgateway-core": tracer.plugins.microgateway_core;
Expand Down Expand Up @@ -2268,6 +2269,12 @@ declare namespace tracer {
* This plugin automatically instruments the
* [ldapjs](https://github.com/ldapjs/node-ldapjs/) module.
*/
/**
* This plugin automatically instruments the
* [langgraph](https://github.com/npmjs/package/langgraph) library.
*/
interface langgraph extends Instrumentation {}

interface ldapjs extends Instrumentation {}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/datadog-instrumentations/src/helpers/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module.exports = {
'@anthropic-ai/sdk': { esmFirst: true, fn: () => require('../anthropic') },
'@apollo/server': () => require('../apollo-server'),
'@apollo/gateway': () => require('../apollo'),
'@langchain/langgraph': { esmFirst: true, fn: () => require('../langgraph') },
'apollo-server-core': () => require('../apollo-server-core'),
'@aws-sdk/smithy-client': () => require('../aws-sdk'),
'@azure/event-hubs': () => require('../azure-event-hubs'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
module.exports = [
...require('./langchain'),
...require('./bullmq'),
...require('./langgraph'),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict'

module.exports = [
{
module: {
name: '@langchain/langgraph',
versionRange: '>=1.1.2',
filePath: 'dist/pregel/index.js',
},
functionQuery: {
methodName: 'invoke',
className: 'Pregel',
kind: 'Async',
},
channelName: 'Pregel_invoke',
},
{
module: {
name: '@langchain/langgraph',
versionRange: '>=1.1.2',
filePath: 'dist/pregel/index.cjs',
},
functionQuery: {
methodName: 'invoke',
className: 'Pregel',
kind: 'Async',
},
channelName: 'Pregel_invoke',
},
{
module: {
name: '@langchain/langgraph',
versionRange: '>=1.1.2',
filePath: 'dist/pregel/index.js',
},
functionQuery: {
methodName: 'stream',
className: 'Pregel',
kind: 'Async',
},
channelName: 'Pregel_stream',
},
{
module: {
name: '@langchain/langgraph',
versionRange: '>=1.1.2',
filePath: 'dist/pregel/index.cjs',
},
functionQuery: {
methodName: 'stream',
className: 'Pregel',
kind: 'Async',
},
channelName: 'Pregel_stream',
},
]
7 changes: 7 additions & 0 deletions packages/datadog-instrumentations/src/langgraph.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
'use strict'

const { addHook, getHooks } = require('./helpers/instrument')

for (const hook of getHooks('@langchain/langgraph')) {
addHook(hook, exports => exports)
}
24 changes: 24 additions & 0 deletions packages/datadog-plugin-langgraph/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict'

const CompositePlugin = require('../../dd-trace/src/plugins/composite')
const langgraphLLMObsPlugins = require('../../dd-trace/src/llmobs/plugins/langgraph')
const internalPlugin = require('./internal')

const plugins = {}

// CRITICAL: LLMObs plugins MUST come first
for (const Plugin of langgraphLLMObsPlugins) {
plugins[Plugin.id] = Plugin
}

// Tracing plugins second
for (const [id, Plugin] of Object.entries(internalPlugin)) {
plugins[id] = Plugin
}

class LanggraphPlugin extends CompositePlugin {
static id = 'langgraph'
static plugins = plugins
}

module.exports = LanggraphPlugin
99 changes: 99 additions & 0 deletions packages/datadog-plugin-langgraph/src/internal.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
'use strict'

const TracingPlugin = require('../../dd-trace/src/plugins/tracing')

class BaseLanggraphInternalPlugin extends TracingPlugin {
static id = 'langgraph'
static prefix = 'tracing:orchestrion:@langchain/langgraph:Pregel_invoke'

bindStart (ctx) {
const meta = this.getTags(ctx)

this.startSpan('langgraph.invoke', {
service: this.config.service,
meta,
}, ctx)

return ctx.currentStore
}

getTags (ctx) {
return {
component: 'langgraph',
'span.kind': 'internal',
}
}

asyncEnd (ctx) {
this.finish(ctx)
}
}

class PregelStreamPlugin extends TracingPlugin {
static prefix = 'tracing:orchestrion:@langchain/langgraph:Pregel_stream'

bindStart (ctx) {
this.startSpan('langgraph.stream', {
service: this.config.service,
component: 'langgraph',
'span.kind': 'internal',
}, ctx)

return ctx.currentStore
}

asyncStart (ctx) {
const span = ctx.currentStore?.span
if (!span) {
return
}

const asyncIterable = ctx.result

const originalAsyncIterator = asyncIterable[Symbol.asyncIterator].bind(asyncIterable)

asyncIterable[Symbol.asyncIterator] = function () {
Comment on lines +51 to +55
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we should try and do this in the langgraph.js instrumentation file maybe, and add a custom subscriber there and use shimmer.wrap for robustness. then we can emit a new event with the context that this PregelStreamPlugin can listen for

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are trying to avoid this so that we can continue to use Orchestrion, right now since we only have tracePromise we need to handle this on the subscriber.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, what i mean is we can do this out directly in the langgraph.js instead

const orchestrionStreamCh = channel('tracing:orchestrion:@langchain/langgraph:Pregel_stream')

orchestrionStreamCh.subscribe({
  asyncStart (ctx) {
    // shimmer.wrap ctx.result
  }
})

just so any/all custom wrapping logic happens in instrumentation and not in the plugins directly. lmk if this doesn't make any sense tho as idk if we do something like this currently.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a common pattern to mix publisher and subscriber logic in the instrumentation. I would think it would be cleaner to keep it on the subscriber/plugin side, but I can defer to @rochdev on this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok i don't have too strong a preference, just that we had done something like that previously for langchain before we had some more orchestrion capabilities (ref).

i think in either case we should use shimmer to patch instead of modifying properties directly (even if shimmer just does this directly under the hood)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we have to implement iterator support in orchestrion-js and @rochdev already has an open PR for that, so I guess that is related.

const originalIterator = originalAsyncIterator()

return {
async next (...args) {
try {
const result = await originalIterator.next(...args)

if (result.done) span.finish()

return result
} catch (error) {
span.setTag('error', error)
span.finish()
throw error
}
},

async return (...args) {
span.finish()

if (originalIterator.return) {
return await originalIterator.return(...args)
}
return { done: true, value: undefined }
},

async throw (error) {
span.setTag('error', error)
span.finish()

if (originalIterator.throw) {
return await originalIterator.throw(error)
}
throw error
},
}
}
}
}

module.exports = {
BaseLanggraphInternalPlugin,
PregelStreamPlugin,
}
Loading
Loading