diff --git a/neuro_san/client/direct_agent_session_factory.py b/neuro_san/client/direct_agent_session_factory.py index 881dd161a..1400e36ed 100644 --- a/neuro_san/client/direct_agent_session_factory.py +++ b/neuro_san/client/direct_agent_session_factory.py @@ -13,6 +13,7 @@ from typing import Dict from leaf_common.time.timeout import Timeout +from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool from neuro_san.interfaces.agent_session import AgentSession from neuro_san.internals.interfaces.context_type_toolbox_factory import ContextTypeToolboxFactory @@ -74,7 +75,8 @@ def create_session(self, agent_name: str, use_direct: bool = False, toolbox_factory.load() factory = ExternalAgentSessionFactory(use_direct=use_direct, network_storage=self.network_storage) - invocation_context = SessionInvocationContext(factory, llm_factory, toolbox_factory, metadata) + executors_pool: AsyncioExecutorPool = AsyncioExecutorPool() + invocation_context = SessionInvocationContext(factory, executors_pool, llm_factory, toolbox_factory, metadata) invocation_context.start() session: DirectAgentSession = DirectAgentSession(agent_network=agent_network, invocation_context=invocation_context, diff --git a/neuro_san/internals/utils/asyncio_executor_pool_provider.py b/neuro_san/internals/utils/asyncio_executor_pool_provider.py new file mode 100644 index 000000000..115450717 --- /dev/null +++ b/neuro_san/internals/utils/asyncio_executor_pool_provider.py @@ -0,0 +1,32 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. +# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. 
class AsyncioExecutorPoolProvider:
    """
    Class providing a server-wide instance of AsyncioExecutorPool
    for use by specific agent services.

    The pool is held as class-level state, so every caller that goes
    through this class receives the same instance.
    """

    # The shared pool. Stays None until a pool is installed or first requested.
    _executors_pool: AsyncioExecutorPool = None

    @classmethod
    def set_executors_pool(cls, reuse_mode: bool = True) -> None:
        """
        Creates a new AsyncioExecutorPool and installs it as the shared
        instance, replacing any pool that was installed before.

        :param reuse_mode: Passed as-is to the AsyncioExecutorPool constructor.
                Default is True.
        """
        # NOTE(review): a previously installed pool is only dropped here,
        # nothing is called on it - confirm that is the intended hand-over.
        cls._executors_pool = AsyncioExecutorPool(reuse_mode=reuse_mode)

    @classmethod
    def get_executors_pool(cls) -> AsyncioExecutorPool:
        """
        :return: The shared AsyncioExecutorPool instance.
                If no pool has been installed yet, one is created first
                using the defaults of set_executors_pool(), so callers
                never receive None.
        """
        if cls._executors_pool is None:
            # A caller asking before set_executors_pool() has run would
            # otherwise get None and only fail later, when it first tries
            # to obtain an executor from the pool.
            cls.set_executors_pool()
        return cls._executors_pool
self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool() # Load once self.llm_factory.load() self.toolbox_factory.load() @@ -214,7 +217,12 @@ def streaming_chat(self, request_dict: Dict[str, Any], # Prepare factory = ExternalAgentSessionFactory(use_direct=False) - invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata) + invocation_context = SessionInvocationContext( + factory, + self.async_executors_pool, + self.llm_factory, + self.toolbox_factory, + metadata) invocation_context.start() # Set up logging inside async thread diff --git a/neuro_san/service/generic/async_agent_service.py b/neuro_san/service/generic/async_agent_service.py index 2d6ef9ca3..61df30be2 100644 --- a/neuro_san/service/generic/async_agent_service.py +++ b/neuro_san/service/generic/async_agent_service.py @@ -18,6 +18,7 @@ import uuid from leaf_common.asyncio.asyncio_executor import AsyncioExecutor +from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool from leaf_server_common.server.atomic_counter import AtomicCounter @@ -27,6 +28,7 @@ from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory +from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.chat_message_converter import ChatMessageConverter from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger @@ -81,6 +83,7 @@ def __init__(self, config: Dict[str, Any] = agent_network.get_config() self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) self.toolbox_factory: ContextTypeToolboxFactory = 
MasterToolboxFactory.create_toolbox_factory(config) + self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool() # Load once. self.llm_factory.load() self.toolbox_factory.load() @@ -208,7 +211,12 @@ async def streaming_chat(self, request_dict: Dict[str, Any], # Prepare factory = ExternalAgentSessionFactory(use_direct=False) - invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata) + invocation_context = SessionInvocationContext( + factory, + self.async_executors_pool, + self.llm_factory, + self.toolbox_factory, + metadata) invocation_context.start() # Set up logging inside async thread diff --git a/neuro_san/service/main_loop/server_main_loop.py b/neuro_san/service/main_loop/server_main_loop.py index 512eddb32..d7ae189ec 100644 --- a/neuro_san/service/main_loop/server_main_loop.py +++ b/neuro_san/service/main_loop/server_main_loop.py @@ -28,6 +28,7 @@ from neuro_san.internals.graph.registry.agent_network import AgentNetwork from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.internals.utils.file_of_class import FileOfClass +from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME_FOR_LOGS from neuro_san.service.grpc.grpc_agent_server import DEFAULT_MAX_CONCURRENT_REQUESTS @@ -177,6 +178,8 @@ def main_loop(self): metadata_set = metadata_set | set(self.usage_logger_metadata.split()) metadata_str: str = " ".join(sorted(metadata_set)) + AsyncioExecutorPoolProvider.set_executors_pool(reuse_mode=True) + if self.server_status.grpc_service.is_requested(): self.grpc_server = GrpcAgentServer( self.grpc_port, diff --git a/neuro_san/session/session_invocation_context.py b/neuro_san/session/session_invocation_context.py index 01e98332a..d3bb85451 100644 
--- a/neuro_san/session/session_invocation_context.py +++ b/neuro_san/session/session_invocation_context.py @@ -15,6 +15,7 @@ from asyncio import Future from leaf_common.asyncio.asyncio_executor import AsyncioExecutor +from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool from leaf_server_common.logging.logging_setup import setup_extra_logging_fields from neuro_san.internals.chat.async_collating_queue import AsyncCollatingQueue @@ -35,7 +36,10 @@ class SessionInvocationContext(InvocationContext): service call or library call. """ + # pylint: disable=too-many-arguments + # pylint: disable=too-many-positional-arguments def __init__(self, async_session_factory: AsyncAgentSessionFactory, + async_executors_pool: AsyncioExecutorPool, llm_factory: ContextTypeLlmFactory, toolbox_factory: ContextTypeToolboxFactory = None, metadata: Dict[str, str] = None): @@ -44,6 +48,8 @@ def __init__(self, async_session_factory: AsyncAgentSessionFactory, :param async_session_factory: The AsyncAgentSessionFactory to use when connecting with external agents. + :param async_executors_pool: pool of AsyncioExecutors to use for obtaining + an executor instance to use for this context; :param llm_factory: The ContextTypeLlmFactory instance :param metadata: A grpc metadata of key/value pairs to be inserted into the header. Default is None. 
Preferred format is a @@ -51,8 +57,9 @@ def __init__(self, async_session_factory: AsyncAgentSessionFactory, """ self.async_session_factory: AsyncAgentSessionFactory = async_session_factory + self.async_executors_pool: AsyncioExecutorPool = async_executors_pool # Get an async executor to run all tasks for this session instance: - self.asyncio_executor: AsyncioExecutor = AsyncioExecutor() + self.asyncio_executor: AsyncioExecutor = self.async_executors_pool.get_executor() self.origination: Origination = Origination() self.queue: AsyncCollatingQueue = AsyncCollatingQueue() self.journal: Journal = MessageJournal(self.queue) @@ -66,6 +73,7 @@ def start(self): Starts the active components of this invocation context. Do this separately from constructor for more control. Currently, we only start internal AsyncioExecutor. + It could be already running, but starting it twice is allowed. """ self.asyncio_executor.start() @@ -119,7 +127,7 @@ def close(self): Release resources owned by this context """ if self.asyncio_executor is not None: - self.asyncio_executor.shutdown() + self.async_executors_pool.return_executor(self.asyncio_executor) self.asyncio_executor = None if self.queue is not None: self.queue.close() diff --git a/requirements.txt b/requirements.txt index 211e40345..0499367a1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ # requirements-build.txt # In-house dependencies -leaf-common>=1.2.23 +leaf-common>=1.2.24 leaf-server-common>=0.1.20 # These are needed for generating code from .proto files