Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion neuro_san/client/direct_agent_session_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer
from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider
from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool
Copy link
Collaborator

@d1donlydfink d1donlydfink Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot import stuff from neuro_san.service in the client area. Huge reach-around tangle
and will be a problem should we decide to break service stuff out to separate repo.

I think what you want to do is put this in internals.utils for now.
It could easily make its way down to leaf-common.asyncio.

from neuro_san.session.direct_agent_session import DirectAgentSession
from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory
from neuro_san.session.missing_agent_check import MissingAgentCheck
Expand Down Expand Up @@ -74,7 +75,8 @@ def create_session(self, agent_name: str, use_direct: bool = False,
toolbox_factory.load()

factory = ExternalAgentSessionFactory(use_direct=use_direct, network_storage=self.network_storage)
invocation_context = SessionInvocationContext(factory, llm_factory, toolbox_factory, metadata)
executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(0)
invocation_context = SessionInvocationContext(factory, executors_pool, llm_factory, toolbox_factory, metadata)
invocation_context.start()
session: DirectAgentSession = DirectAgentSession(agent_network=agent_network,
invocation_context=invocation_context,
Expand Down
12 changes: 11 additions & 1 deletion neuro_san/service/generic/agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from neuro_san.service.generic.chat_message_converter import ChatMessageConverter
from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory
from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger
from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool
from neuro_san.session.direct_agent_session import DirectAgentSession
from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory
from neuro_san.session.session_invocation_context import SessionInvocationContext
Expand All @@ -42,6 +43,9 @@
DO_NOT_LOG_REQUESTS = [
]

# No limit on number of concurrently running executors
MAX_CONCURRENT_EXECUTORS = 0
Copy link
Collaborator

@d1donlydfink d1donlydfink Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be defined twice?

Maybe this should move to AsyncioExecutorPool and be the default for the constructor?


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we don't limit the number of AsyncioExecutors that can be created in the pool.


# pylint: disable=too-many-instance-attributes
class AgentService:
Expand Down Expand Up @@ -86,6 +90,7 @@ def __init__(self,
config: Dict[str, Any] = agent_network.get_config()
self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config)
self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config)
self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this pool be owned by the server and not the service?

# Load once
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create an instance of AsyncioExecutorPool for this service.

self.llm_factory.load()
self.toolbox_factory.load()
Expand Down Expand Up @@ -214,7 +219,12 @@ def streaming_chat(self, request_dict: Dict[str, Any],

# Prepare
factory = ExternalAgentSessionFactory(use_direct=False)
invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata)
invocation_context = SessionInvocationContext(
factory,
self.async_executors_pool,
self.llm_factory,
self.toolbox_factory,
metadata)
invocation_context.start()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SessionInvocationContext now takes AsyncioExecutorPool as a parameter.


# Set up logging inside async thread
Expand Down
12 changes: 11 additions & 1 deletion neuro_san/service/generic/async_agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger
from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory
from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger
from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool
from neuro_san.session.async_direct_agent_session import AsyncDirectAgentSession
from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory
from neuro_san.session.session_invocation_context import SessionInvocationContext
Expand All @@ -41,6 +42,9 @@
DO_NOT_LOG_REQUESTS = [
]

# No limit on number of concurrently running executors
MAX_CONCURRENT_EXECUTORS = 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be defined twice?


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we don't limit the number of AsyncioExecutors that can be created in the pool.


# pylint: disable=too-many-instance-attributes
class AsyncAgentService:
Expand Down Expand Up @@ -81,6 +85,7 @@ def __init__(self,
config: Dict[str, Any] = agent_network.get_config()
self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config)
self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config)
self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this pool be owned by the server and not the service?

# Load once.
self.llm_factory.load()
self.toolbox_factory.load()
Expand Down Expand Up @@ -208,7 +213,12 @@ async def streaming_chat(self, request_dict: Dict[str, Any],

# Prepare
factory = ExternalAgentSessionFactory(use_direct=False)
invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata)
invocation_context = SessionInvocationContext(
factory,
self.async_executors_pool,
self.llm_factory,
self.toolbox_factory,
metadata)
invocation_context.start()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do the same thing as for sync AgentService above.


# Set up logging inside async thread
Expand Down
67 changes: 67 additions & 0 deletions neuro_san/service/generic/asyncio_executor_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@

# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI.
# All Rights Reserved.
# Issued under the Academic Public License.
#
# You can be released from the terms, and requirements of the Academic Public
# License by purchasing a commercial license.
# Purchase of a commercial license is mandatory for any use of the
# neuro-san SDK Software in commercial settings.
#
# END COPYRIGHT

import logging
import threading
from leaf_common.asyncio.asyncio_executor import AsyncioExecutor


class AsyncioExecutorPool:
"""
Class maintaining a dynamic set of reusable AsyncioExecutor instances.
"""

def __init__(self, max_concurrent: int, reuse_mode: bool = True):
"""
Constructor.
:param max_concurrent: maximum allowed number of concurrently running AsyncioExecutors.
:param reuse_mode: True, if requested executor instances
are taken from pool of available ones (pool mode);
False, if requested executor instances are created new
and shutdown on return (backward compatible mode)
"""
self.max_concurrent = max_concurrent
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is self.max_concurrent ever used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nowhere right now. Was thinking somewhat ahead. Maybe not worth it.

self.reuse_mode: bool = reuse_mode
self.pool = []
self.lock: threading.Lock = threading.Lock()
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.info("AsyncioExecutorPool created: %s reuse: %s max concurrent: %d",
id(self), str(self.reuse_mode), self.max_concurrent)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty simple-minded pool of AsyncioExecutors implementation.
Instances move through the self.pool sequence in queue mode: first in, first out.

def get_executor(self) -> AsyncioExecutor:
    """
    Get an executor, reusing a pooled one when possible.

    In reuse mode, the executor at the front of the pool is taken
    (first in, first out). When reuse mode is off, or the pool is empty,
    a new AsyncioExecutor is created and started instead.

    :return: AsyncioExecutor instance
    """
    result: AsyncioExecutor = None

    # Hold the lock only while touching the shared pool list, so that
    # creating and starting a new executor below does not block other
    # threads that are getting or returning executors.
    with self.lock:
        if self.reuse_mode and self.pool:
            result = self.pool.pop(0)

    if result is not None:
        self.logger.info("Reusing AsyncExecutor %s", id(result))
        return result

    # Nothing available to reuse: create a fresh executor and start it
    # before handing it out.
    result = AsyncioExecutor()
    result.start()
    self.logger.info("Creating AsyncExecutor %s", id(result))
    return result

def return_executor(self, executor: AsyncioExecutor):
    """
    Hand an AsyncioExecutor back once the caller is done with it.

    In reuse mode the executor is appended to the pool of available
    instances; otherwise it is shut down.

    :param executor: AsyncioExecutor to return. None is ignored.
    """
    if executor is None:
        # A None entry in the pool would later be handed out by
        # get_executor() as if it were a usable executor, and in
        # non-reuse mode calling shutdown() on it would raise.
        self.logger.warning("Ignoring attempt to return None as AsyncExecutor")
        return

    if not self.reuse_mode:
        self.logger.info("Shutting down: AsyncExecutor %s", id(executor))
        executor.shutdown()
        return

    # Only the pool mutation needs the lock; capture the size while
    # holding it so the logged value matches this append.
    with self.lock:
        self.pool.append(executor)
        pool_size: int = len(self.pool)
    self.logger.info("Returned to pool: AsyncExecutor %s pool size: %d", id(executor), pool_size)
12 changes: 10 additions & 2 deletions neuro_san/session/session_invocation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from leaf_common.asyncio.asyncio_executor import AsyncioExecutor
from leaf_server_common.logging.logging_setup import setup_extra_logging_fields

from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot include neuro_san.service here either.

Move it to internals.utils.

from neuro_san.internals.chat.async_collating_queue import AsyncCollatingQueue
from neuro_san.internals.interfaces.async_agent_session_factory import AsyncAgentSessionFactory
from neuro_san.internals.interfaces.context_type_toolbox_factory import ContextTypeToolboxFactory
Expand All @@ -35,7 +36,10 @@ class SessionInvocationContext(InvocationContext):
service call or library call.
"""

# pylint: disable=too-many-arguments
# pylint: disable=too-many-positional-arguments
def __init__(self, async_session_factory: AsyncAgentSessionFactory,
async_executors_pool: AsyncioExecutorPool,
llm_factory: ContextTypeLlmFactory,
toolbox_factory: ContextTypeToolboxFactory = None,
metadata: Dict[str, str] = None):
Expand All @@ -44,15 +48,18 @@ def __init__(self, async_session_factory: AsyncAgentSessionFactory,

:param async_session_factory: The AsyncAgentSessionFactory to use
when connecting with external agents.
:param async_executors_pool: pool of AsyncioExecutors to use for obtaining
an executor instance to use for this context;
:param llm_factory: The ContextTypeLlmFactory instance
:param metadata: A grpc metadata of key/value pairs to be inserted into
the header. Default is None. Preferred format is a
dictionary of string keys to string values.
"""

self.async_session_factory: AsyncAgentSessionFactory = async_session_factory
self.async_executors_pool: AsyncioExecutorPool = async_executors_pool
# Get an async executor to run all tasks for this session instance:
self.asyncio_executor: AsyncioExecutor = AsyncioExecutor()
self.asyncio_executor: AsyncioExecutor = self.async_executors_pool.get_executor()
self.origination: Origination = Origination()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating an AsyncioExecutor instance directly here, we request it from the pool.

self.queue: AsyncCollatingQueue = AsyncCollatingQueue()
self.journal: Journal = MessageJournal(self.queue)
Expand All @@ -66,6 +73,7 @@ def start(self):
Starts the active components of this invocation context.
Do this separately from constructor for more control.
Currently, we only start internal AsyncioExecutor.
It could be already running, but starting it twice is allowed.
"""
self.asyncio_executor.start()

Expand Down Expand Up @@ -119,7 +127,7 @@ def close(self):
Release resources owned by this context
"""
if self.asyncio_executor is not None:
self.asyncio_executor.shutdown()
self.async_executors_pool.return_executor(self.asyncio_executor)
self.asyncio_executor = None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And when we are done, we return our AsyncioExecutor back to the pool.

if self.queue is not None:
self.queue.close()
Expand Down