Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Releases prior to 7.0 has been removed from this file to declutter search result

- cli: Added `init --no-base` option to skip creating the base template.
- env: Added `DIPDUP_NO_BASE` environment variable to skip creating the base template.
- mcp: Added `ctx.api` datasource and `ctx.call_api` helper to server context.

### Fixed

Expand Down
36 changes: 15 additions & 21 deletions docs/5.advanced/7.mcp.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ This tool will:

### Using application context

You can use the application context the same way as in handlers and hooks. Use the `mcp.get_ctx()` function to get the context object.
You can use the application context the same way as in handlers and hooks with a few differences. There is a single context object per server process and it's not passed as a first argument to callbacks.

Use the `mcp.get_ctx()` function to get the context object.

```python [mcp/tools.py]
from dipdup import mcp
Expand Down Expand Up @@ -180,34 +182,26 @@ For a low-level access you can use `dipdup.mcp.server` singleton to interact wit

### Interacting with running indexer

DipDup provides [management API](../7.references/4.api.md) to interact with the running indexer. For example you can use it to add indexes in runtime. First, add running indexer as a HTTP datasource:
DipDup provides [management API](../7.references/4.api.md) to interact with the running indexer. For example you can use it to add indexes in runtime.

```yaml [dipdup.yaml]
datasources:
indexer:
kind: http
# NOTE: Default for Compose stack
url: http://api:46339
```

Then, call this datasource from your MCP tool:
You can use `ctx.call_api` to get nice plaintext output or `ctx.api` to access the datasource directly.

```python
from dipdup import mcp

@mcp.tool(...)
async def tool():
ctx = mcp.get_ctx()
datasource = ctx.get_http_datasource('indexer')
response = await datasource.post(
'/add_index',
params={
'name': 'my_index',
'template': 'my_template',
'values': {'param': 'value'},
'first_level': 0,
'last_level': 1000,
}
response = await ctx.call_api(
method='post',
path='/add_index',
params={
'name': 'my_index',
'template': 'my_template',
'values': {'param': 'value'},
'first_level': 0,
'last_level': 1000,
}
)
```

Expand Down
17 changes: 17 additions & 0 deletions src/dipdup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,11 @@ async def mcp_run(ctx: click.Context) -> None:

from dipdup import mcp
from dipdup.config import DipDupConfig
from dipdup.config import HttpConfig
from dipdup.config import McpConfig
from dipdup.config.http import HttpDatasourceConfig
from dipdup.context import McpContext
from dipdup.datasources.http import HttpDatasource
from dipdup.dipdup import DipDup

config: DipDupConfig = ctx.obj.config
Expand All @@ -578,10 +581,20 @@ async def mcp_run(ctx: click.Context) -> None:
config.mcp = McpConfig()
mcp_config = config.mcp

api_datasource_config = HttpDatasourceConfig(
url=mcp_config.default_api_url,
http=HttpConfig(
retry_count=0,
),
)
api_datasource_config._name = 'api'
api_datasource = HttpDatasource(api_datasource_config)

mcp_ctx = McpContext._wrap(
ctx=dipdup._ctx,
logger=mcp._logger,
server=mcp.server,
api=api_datasource,
)
mcp.set_ctx(mcp_ctx)

Expand Down Expand Up @@ -622,9 +635,13 @@ async def handle_sse(request: Any) -> None:
logging.getLogger('mcp').setLevel(logging.INFO)

async with AsyncExitStack() as stack:
# NOTE: Create, but doesn't initialize (no WS loop)
await dipdup._create_datasources()
await dipdup._set_up_database(stack)

# NOTE: Not available in `ctx.datasources`, but directly as `ctx.api`
await stack.enter_async_context(api_datasource)

# NOTE: Run MCP in a separate thread to avoid blocking the DB connection
portal = stack.enter_context(from_thread.start_blocking_portal())
portal.call(server.serve)
Expand Down
23 changes: 23 additions & 0 deletions src/dipdup/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ class McpContext(DipDupContext):
:param transactions: Transaction manager (low-level interface)
:param logger: Context-aware logger instance
:param server: Running MCP server instance
:param api: DipDup API datasource
"""

def __init__(
Expand All @@ -935,6 +936,7 @@ def __init__(
transactions: TransactionManager,
logger: Logger,
server: McpServer[Any],
api: HttpDatasource,
) -> None:
super().__init__(
config=config,
Expand All @@ -944,13 +946,15 @@ def __init__(
)
self.logger = logger
self.server = server
self.api = api

@classmethod
def _wrap(
cls,
ctx: DipDupContext,
logger: Logger,
server: Any,
api: Any,
) -> Self:
new_ctx = cls(
config=ctx.config,
Expand All @@ -959,6 +963,25 @@ def _wrap(
transactions=ctx.transactions,
logger=logger,
server=server,
api=api,
)
ctx._link(new_ctx)
return new_ctx

async def call_api(
self,
method: str,
path: str,
params: dict[str, Any] | None = None,
) -> str:
_logger.info('Calling API: %s %s', method, path)

res = await self.api.request(
method=method,
url=path.lstrip('/'),
json={k: v for k, v in (params or {}).items() if v is not None},
raw=True,
)
if res.status != 200:
return f'ERROR: {res.status} {res.reason}'
return await res.text() # type: ignore[no-any-return]
55 changes: 7 additions & 48 deletions src/dipdup/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
T = TypeVar('T', bound=Callable[..., Awaitable[Any]])

from dipdup import models
from dipdup.context import McpContext
from dipdup.context import McpContext as McpContext
from dipdup.exceptions import FrameworkException
from dipdup.utils import json_dumps

Expand Down Expand Up @@ -73,16 +73,9 @@ async def _resource_indexes() -> list[dict[str, Any]]:
return res


def _get_indexer_url() -> str:
from dipdup.config import McpConfig

ctx = get_ctx()
return (ctx.config.mcp or McpConfig()).default_api_url


async def _tool_api_config() -> str:
return await api_call(
url=_get_indexer_url(),
ctx = get_ctx()
return await ctx.call_api(
method='get',
path='/config',
)
Expand All @@ -95,8 +88,8 @@ async def _tool_api_add_contract(
typename: str | None = None,
code_hash: str | int | None = None,
) -> str:
await api_call(
url=_get_indexer_url(),
ctx = get_ctx()
await ctx.call_api(
method='post',
path='/add_contract',
params={
Expand All @@ -117,8 +110,8 @@ async def _tool_api_add_index(
first_level: int | None = None,
last_level: int | None = None,
) -> str:
await api_call(
url=_get_indexer_url(),
ctx = get_ctx()
await ctx.call_api(
method='post',
path='/add_index',
params={
Expand Down Expand Up @@ -187,40 +180,6 @@ def set_ctx(ctx: McpContext) -> None:
_ctx = ctx


async def api_call(
url: str,
method: str,
path: str,
params: dict[str, Any] | None = None,
) -> str:
from dipdup.config import HttpConfig
from dipdup.config.http import HttpDatasourceConfig
from dipdup.datasources.http import HttpDatasource

_logger.info('Calling API: %s %s', method, url + path)

config = HttpDatasourceConfig(
kind='http',
url=url,
http=HttpConfig(
retry_count=0,
),
)
config._name = 'dipdup_api'

datasource = HttpDatasource(config)
async with datasource:
res = await datasource.request(
method=method,
url=path.lstrip('/'),
json={k: v for k, v in (params or {}).items() if v is not None},
raw=True,
)
if res.status != 200:
return f'ERROR: {res.status} {res.reason}'
return await res.text() # type: ignore[no-any-return]


# TODO: Add instructions
server: mcp.server.Server[Any] = mcp.server.Server(name='DipDup')
_user_tools: dict[str, types.Tool] = {}
Expand Down