From 2a3d6b5dec588ba3ba8e6c2ed2b43afde7e87531 Mon Sep 17 00:00:00 2001 From: Andrei Denissov Date: Wed, 23 Jul 2025 17:21:26 -0700 Subject: [PATCH 1/8] First implementation for AsyncExecutorsPool. --- neuro_san/service/generic/agent_service.py | 12 ++++- .../service/generic/async_agent_service.py | 12 ++++- .../service/generic/asyncio_executor_pool.py | 50 +++++++++++++++++++ .../session/session_invocation_context.py | 10 +++- 4 files changed, 80 insertions(+), 4 deletions(-) create mode 100644 neuro_san/service/generic/asyncio_executor_pool.py diff --git a/neuro_san/service/generic/agent_service.py b/neuro_san/service/generic/agent_service.py index 1884ead6e..c48491f48 100644 --- a/neuro_san/service/generic/agent_service.py +++ b/neuro_san/service/generic/agent_service.py @@ -33,6 +33,7 @@ from neuro_san.service.generic.chat_message_converter import ChatMessageConverter from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger +from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool from neuro_san.session.direct_agent_session import DirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.session_invocation_context import SessionInvocationContext @@ -42,6 +43,9 @@ DO_NOT_LOG_REQUESTS = [ ] +# No limit on number of concurrently running executors +MAX_CONCURRENT_EXECUTORS = 0 + # pylint: disable=too-many-instance-attributes class AgentService: @@ -86,6 +90,7 @@ def __init__(self, config: Dict[str, Any] = agent_network.get_config() self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) + self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS) # Load once self.llm_factory.load() self.toolbox_factory.load() @@ -214,7 +219,12 @@ def streaming_chat(self, request_dict: Dict[str, Any], # Prepare factory = ExternalAgentSessionFactory(use_direct=False) - invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata) + invocation_context = SessionInvocationContext( + factory, + self.async_executors_pool, + self.llm_factory, + self.toolbox_factory, + metadata) invocation_context.start() # Set up logging inside async thread diff --git a/neuro_san/service/generic/async_agent_service.py b/neuro_san/service/generic/async_agent_service.py index 2d6ef9ca3..11b0cd581 100644 --- a/neuro_san/service/generic/async_agent_service.py +++ b/neuro_san/service/generic/async_agent_service.py @@ -32,6 +32,7 @@ from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger +from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool from neuro_san.session.async_direct_agent_session import AsyncDirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.session_invocation_context import SessionInvocationContext @@ -41,6 +42,9 @@ DO_NOT_LOG_REQUESTS = [ ] +# No limit on number of concurrently running executors +MAX_CONCURRENT_EXECUTORS = 0 + # pylint: disable=too-many-instance-attributes class AsyncAgentService: @@ -81,6 +85,7 @@ def __init__(self, config: Dict[str, Any] = agent_network.get_config() self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) + self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS) # Load once. self.llm_factory.load() self.toolbox_factory.load() @@ -208,7 +213,12 @@ async def streaming_chat(self, request_dict: Dict[str, Any], # Prepare factory = ExternalAgentSessionFactory(use_direct=False) - invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata) + invocation_context = SessionInvocationContext( + factory, + self.async_executors_pool, + self.llm_factory, + self.toolbox_factory, + metadata) invocation_context.start() # Set up logging inside async thread diff --git a/neuro_san/service/generic/asyncio_executor_pool.py b/neuro_san/service/generic/asyncio_executor_pool.py new file mode 100644 index 000000000..7c4936684 --- /dev/null +++ b/neuro_san/service/generic/asyncio_executor_pool.py @@ -0,0 +1,50 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. +# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. +# +# END COPYRIGHT + +import threading +from leaf_common.asyncio.asyncio_executor import AsyncioExecutor + +class AsyncioExecutorPool: + """ + Class maintaining a dynamic set of reusable AsyncioExecutor instances. + """ + + def __init__(self, max_concurrent: int): + """ + Constructor. + :param max_concurrent: maximum allowed number of concurrently running AsyncioExecutors. + """ + self.max_concurrent = max_concurrent + self.pool = [] + self.lock: threading.Lock = threading.Lock() + + def get_executor(self) -> AsyncioExecutor: + """ + Get active (running) executor from the pool + :return: AsyncioExecutor instance + """ + with self.lock: + result: AsyncioExecutor = None + if len(self.pool) > 0: + result = self.pool.pop(0) + else: + result = AsyncioExecutor() + result.start() + return result + + def return_executor(self, executor: AsyncioExecutor): + """ + Return AsyncioExecutor instance back to the pool of available instances. + :param executor: AsyncioExecutor to return. + """ + with self.lock: + self.pool.append(executor) diff --git a/neuro_san/session/session_invocation_context.py b/neuro_san/session/session_invocation_context.py index 01e98332a..16cf7975c 100644 --- a/neuro_san/session/session_invocation_context.py +++ b/neuro_san/session/session_invocation_context.py @@ -17,6 +17,7 @@ from leaf_common.asyncio.asyncio_executor import AsyncioExecutor from leaf_server_common.logging.logging_setup import setup_extra_logging_fields +from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool from neuro_san.internals.chat.async_collating_queue import AsyncCollatingQueue from neuro_san.internals.interfaces.async_agent_session_factory import AsyncAgentSessionFactory from neuro_san.internals.interfaces.context_type_toolbox_factory import ContextTypeToolboxFactory @@ -36,6 +37,7 @@ class SessionInvocationContext(InvocationContext): """ def __init__(self, async_session_factory: AsyncAgentSessionFactory, + async_executors_pool: AsyncioExecutorPool, llm_factory: ContextTypeLlmFactory, toolbox_factory: ContextTypeToolboxFactory = None, metadata: Dict[str, str] = None): @@ -44,6 +46,8 @@ def __init__(self, async_session_factory: AsyncAgentSessionFactory, :param async_session_factory: The AsyncAgentSessionFactory to use when connecting with external agents. + :param async_executors_pool: pool of AsyncioExecutors to use for obtaining + an executor instance to use for this context; :param llm_factory: The ContextTypeLlmFactory instance :param metadata: A grpc metadata of key/value pairs to be inserted into the header. Default is None. Preferred format is a @@ -51,8 +55,9 @@ def __init__(self, async_session_factory: AsyncAgentSessionFactory, """ self.async_session_factory: AsyncAgentSessionFactory = async_session_factory + self.async_executors_pool: AsyncioExecutorPool = async_executors_pool # Get an async executor to run all tasks for this session instance: - self.asyncio_executor: AsyncioExecutor = AsyncioExecutor() + self.asyncio_executor: AsyncioExecutor = self.async_executors_pool.get_executor() self.origination: Origination = Origination() self.queue: AsyncCollatingQueue = AsyncCollatingQueue() self.journal: Journal = MessageJournal(self.queue) @@ -66,6 +71,7 @@ def start(self): Starts the active components of this invocation context. Do this separately from constructor for more control. Currently, we only start internal AsyncioExecutor. + It could be already running, but starting it twice is allowed. """ self.asyncio_executor.start() @@ -119,7 +125,7 @@ def close(self): Release resources owned by this context """ if self.asyncio_executor is not None: - self.asyncio_executor.shutdown() + self.async_executors_pool.return_executor(self.asyncio_executor) self.asyncio_executor = None if self.queue is not None: self.queue.close() From 05b68b15abe365542232699800352b1354059f7a Mon Sep 17 00:00:00 2001 From: Andrei Denissov Date: Thu, 24 Jul 2025 08:29:36 -0700 Subject: [PATCH 2/8] Add reuse_mode parameter to constructor. --- .../service/generic/asyncio_executor_pool.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/neuro_san/service/generic/asyncio_executor_pool.py b/neuro_san/service/generic/asyncio_executor_pool.py index 7c4936684..8e51cb4fb 100644 --- a/neuro_san/service/generic/asyncio_executor_pool.py +++ b/neuro_san/service/generic/asyncio_executor_pool.py @@ -18,12 +18,17 @@ class AsyncioExecutorPool: Class maintaining a dynamic set of reusable AsyncioExecutor instances. """ - def __init__(self, max_concurrent: int): + def __init__(self, max_concurrent: int, reuse_mode: bool = True): """ Constructor. :param max_concurrent: maximum allowed number of concurrently running AsyncioExecutors. + :param reuse_mode: True, if requested executor instances + are taken from pool of available ones (pool mode); + False, if requested executor instances are created new + and shutdown on return (backward compatible mode) """ self.max_concurrent = max_concurrent + self.reuse_mode: bool = reuse_mode self.pool = [] self.lock: threading.Lock = threading.Lock() @@ -34,7 +39,7 @@ def get_executor(self) -> AsyncioExecutor: """ with self.lock: result: AsyncioExecutor = None - if len(self.pool) > 0: + if self.reuse_mode and len(self.pool) > 0: result = self.pool.pop(0) else: result = AsyncioExecutor() @@ -46,5 +51,8 @@ def return_executor(self, executor: AsyncioExecutor): Return AsyncioExecutor instance back to the pool of available instances. :param executor: AsyncioExecutor to return. """ - with self.lock: - self.pool.append(executor) + if self.reuse_mode: + with self.lock: + self.pool.append(executor) + else: + executor.shutdown() From 44394f6dbaf802846970d94c272ed11a5346322f Mon Sep 17 00:00:00 2001 From: Andrei Denissov Date: Thu, 24 Jul 2025 09:00:51 -0700 Subject: [PATCH 3/8] More logging. --- neuro_san/service/generic/asyncio_executor_pool.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/neuro_san/service/generic/asyncio_executor_pool.py b/neuro_san/service/generic/asyncio_executor_pool.py index 8e51cb4fb..58c6bae6a 100644 --- a/neuro_san/service/generic/asyncio_executor_pool.py +++ b/neuro_san/service/generic/asyncio_executor_pool.py @@ -10,6 +10,7 @@ # # END COPYRIGHT +import logging import threading from leaf_common.asyncio.asyncio_executor import AsyncioExecutor @@ -31,6 +32,9 @@ def __init__(self, max_concurrent: int, reuse_mode: bool = True): self.reuse_mode: bool = reuse_mode self.pool = [] self.lock: threading.Lock = threading.Lock() + self.logger = logging.getLogger(self.__class__.__name__) + self.logger.info("AsyncioExecutorPool created: %s reuse: %s max concurrent: %d", + id(self), str(self.reuse_mode), self.max_concurrent) def get_executor(self) -> AsyncioExecutor: """ @@ -41,9 +45,11 @@ def get_executor(self) -> AsyncioExecutor: result: AsyncioExecutor = None if self.reuse_mode and len(self.pool) > 0: result = self.pool.pop(0) + self.logger.info("Reusing AsyncExecutor %s", id(result)) else: result = AsyncioExecutor() result.start() + self.logger.info("Creating AsyncExecutor %s", id(result)) return result def return_executor(self, executor: AsyncioExecutor): @@ -54,5 +60,7 @@ def return_executor(self, executor: AsyncioExecutor): if self.reuse_mode: with self.lock: self.pool.append(executor) + self.logger.info("Returned to pool: AsyncExecutor %s pool size: %d" , id(executor), len(self.pool)) else: + self.logger.info("Shutting down: AsyncExecutor %s", id(executor)) executor.shutdown() From 156654f530bce34259263040bcf57377731c5ebb Mon Sep 17 00:00:00 2001 From: Andrei Denissov Date: Thu, 24 Jul 2025 09:28:16 -0700 Subject: [PATCH 4/8] Pylint+flake8 fixes. --- neuro_san/service/generic/asyncio_executor_pool.py | 3 ++- neuro_san/session/session_invocation_context.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/neuro_san/service/generic/asyncio_executor_pool.py b/neuro_san/service/generic/asyncio_executor_pool.py index 58c6bae6a..65d3fbe42 100644 --- a/neuro_san/service/generic/asyncio_executor_pool.py +++ b/neuro_san/service/generic/asyncio_executor_pool.py @@ -14,6 +14,7 @@ import threading from leaf_common.asyncio.asyncio_executor import AsyncioExecutor + class AsyncioExecutorPool: """ Class maintaining a dynamic set of reusable AsyncioExecutor instances. @@ -60,7 +61,7 @@ def return_executor(self, executor: AsyncioExecutor): if self.reuse_mode: with self.lock: self.pool.append(executor) - self.logger.info("Returned to pool: AsyncExecutor %s pool size: %d" , id(executor), len(self.pool)) + self.logger.info("Returned to pool: AsyncExecutor %s pool size: %d", id(executor), len(self.pool)) else: self.logger.info("Shutting down: AsyncExecutor %s", id(executor)) executor.shutdown() diff --git a/neuro_san/session/session_invocation_context.py b/neuro_san/session/session_invocation_context.py index 16cf7975c..b61492970 100644 --- a/neuro_san/session/session_invocation_context.py +++ b/neuro_san/session/session_invocation_context.py @@ -36,6 +36,8 @@ class SessionInvocationContext(InvocationContext): service call or library call. """ + # pylint: disable=too-many-arguments + # pylint: disable=too-many-positional-arguments def __init__(self, async_session_factory: AsyncAgentSessionFactory, async_executors_pool: AsyncioExecutorPool, llm_factory: ContextTypeLlmFactory, From 774ac985b619e5a3d035d21a0125ae6ef486e3e1 Mon Sep 17 00:00:00 2001 From: Andrei Denissov Date: Thu, 24 Jul 2025 10:09:00 -0700 Subject: [PATCH 5/8] Add AsyncioExecutorPool() to direct client session. --- neuro_san/client/direct_agent_session_factory.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/neuro_san/client/direct_agent_session_factory.py b/neuro_san/client/direct_agent_session_factory.py index 881dd161a..bbb2aad49 100644 --- a/neuro_san/client/direct_agent_session_factory.py +++ b/neuro_san/client/direct_agent_session_factory.py @@ -24,6 +24,7 @@ from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage +from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool from neuro_san.session.direct_agent_session import DirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.missing_agent_check import MissingAgentCheck @@ -74,7 +75,8 @@ def create_session(self, agent_name: str, use_direct: bool = False, toolbox_factory.load() factory = ExternalAgentSessionFactory(use_direct=use_direct, network_storage=self.network_storage) - invocation_context = SessionInvocationContext(factory, llm_factory, toolbox_factory, metadata) + executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(0) + invocation_context = SessionInvocationContext(factory, executors_pool, allm_factory, toolbox_factory, metadata) invocation_context.start() session: DirectAgentSession = DirectAgentSession(agent_network=agent_network, invocation_context=invocation_context, From d5bf66c4afae028df506a3a2e3e017a63f827c6f Mon Sep 17 00:00:00 2001 From: Andrei Denissov Date: Thu, 24 Jul 2025 10:11:33 -0700 Subject: [PATCH 6/8] Typo. --- neuro_san/client/direct_agent_session_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neuro_san/client/direct_agent_session_factory.py b/neuro_san/client/direct_agent_session_factory.py index bbb2aad49..844900ff3 100644 --- a/neuro_san/client/direct_agent_session_factory.py +++ b/neuro_san/client/direct_agent_session_factory.py @@ -76,7 +76,7 @@ def create_session(self, agent_name: str, use_direct: bool = False, factory = ExternalAgentSessionFactory(use_direct=use_direct, network_storage=self.network_storage) executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(0) - invocation_context = SessionInvocationContext(factory, executors_pool, allm_factory, toolbox_factory, metadata) + invocation_context = SessionInvocationContext(factory, executors_pool, llm_factory, toolbox_factory, metadata) invocation_context.start() session: DirectAgentSession = DirectAgentSession(agent_network=agent_network, invocation_context=invocation_context, From 2f4d7db52cbcee7e049fa979ba4f142545593a3d Mon Sep 17 00:00:00 2001 From: Andrei Denissov Date: Thu, 24 Jul 2025 13:31:01 -0700 Subject: [PATCH 7/8] Better handling of AsyncioExecutor instances. --- .../service/generic/asyncio_executor_pool.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/neuro_san/service/generic/asyncio_executor_pool.py b/neuro_san/service/generic/asyncio_executor_pool.py index 65d3fbe42..483a5d9fd 100644 --- a/neuro_san/service/generic/asyncio_executor_pool.py +++ b/neuro_san/service/generic/asyncio_executor_pool.py @@ -42,16 +42,18 @@ def get_executor(self) -> AsyncioExecutor: Get active (running) executor from the pool :return: AsyncioExecutor instance """ - with self.lock: - result: AsyncioExecutor = None - if self.reuse_mode and len(self.pool) > 0: - result = self.pool.pop(0) - self.logger.info("Reusing AsyncExecutor %s", id(result)) - else: - result = AsyncioExecutor() - result.start() - self.logger.info("Creating AsyncExecutor %s", id(result)) - return result + if self.reuse_mode: + with self.lock: + if len(self.pool) > 0: + result = self.pool.pop(0) + self.logger.info("Reusing AsyncExecutor %s", id(result)) + return result + # Create AsyncioExecutor outside of lock + # to avoid potentially longer locked periods + result = AsyncioExecutor() + result.start() + self.logger.info("Creating AsyncExecutor %s", id(result)) + return result def return_executor(self, executor: AsyncioExecutor): """ @@ -63,5 +65,7 @@ def return_executor(self, executor: AsyncioExecutor): self.pool.append(executor) self.logger.info("Returned to pool: AsyncExecutor %s pool size: %d", id(executor), len(self.pool)) else: + # Shutdown AsyncioExecutor outside of lock + # to avoid potentially longer locked periods self.logger.info("Shutting down: AsyncExecutor %s", id(executor)) executor.shutdown() From 3494bf36d09948db1ca63435e097caeab0d3a0d8 Mon Sep 17 00:00:00 2001 From: Andrei Denissov Date: Thu, 24 Jul 2025 19:44:09 -0700 Subject: [PATCH 8/8] Switch to using AsyncioExecutorPool from leaf-common. --- .../client/direct_agent_session_factory.py | 4 +- .../utils/asyncio_executor_pool_provider.py | 32 +++++++++ neuro_san/service/generic/agent_service.py | 8 +-- .../service/generic/async_agent_service.py | 8 +-- .../service/generic/asyncio_executor_pool.py | 71 ------------------- .../service/main_loop/server_main_loop.py | 3 + .../session/session_invocation_context.py | 2 +- requirements.txt | 2 +- 8 files changed, 45 insertions(+), 85 deletions(-) create mode 100644 neuro_san/internals/utils/asyncio_executor_pool_provider.py delete mode 100644 neuro_san/service/generic/asyncio_executor_pool.py diff --git a/neuro_san/client/direct_agent_session_factory.py b/neuro_san/client/direct_agent_session_factory.py index 844900ff3..1400e36ed 100644 --- a/neuro_san/client/direct_agent_session_factory.py +++ b/neuro_san/client/direct_agent_session_factory.py @@ -13,6 +13,7 @@ from typing import Dict from leaf_common.time.timeout import Timeout +from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool from neuro_san.interfaces.agent_session import AgentSession from neuro_san.internals.interfaces.context_type_toolbox_factory import ContextTypeToolboxFactory @@ -24,7 +25,6 @@ from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage -from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool from neuro_san.session.direct_agent_session import DirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.missing_agent_check import MissingAgentCheck @@ -75,7 +75,7 @@ def create_session(self, agent_name: str, use_direct: bool = False, toolbox_factory.load() factory = ExternalAgentSessionFactory(use_direct=use_direct, network_storage=self.network_storage) - executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(0) + executors_pool: AsyncioExecutorPool = AsyncioExecutorPool() invocation_context = SessionInvocationContext(factory, executors_pool, llm_factory, toolbox_factory, metadata) invocation_context.start() session: DirectAgentSession = DirectAgentSession(agent_network=agent_network, diff --git a/neuro_san/internals/utils/asyncio_executor_pool_provider.py b/neuro_san/internals/utils/asyncio_executor_pool_provider.py new file mode 100644 index 000000000..115450717 --- /dev/null +++ b/neuro_san/internals/utils/asyncio_executor_pool_provider.py @@ -0,0 +1,32 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. +# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. +# +# END COPYRIGHT +""" +See class comments +""" +from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool + + +class AsyncioExecutorPoolProvider: + """ + Class providing server-wide instance of AsyncioExecutorPool + for use by specific agent services. + """ + + _executors_pool: AsyncioExecutorPool = None + + @classmethod + def set_executors_pool(cls, reuse_mode: bool = True): + cls._executors_pool = AsyncioExecutorPool(reuse_mode=reuse_mode) + + @classmethod + def get_executors_pool(cls): + return cls._executors_pool diff --git a/neuro_san/service/generic/agent_service.py b/neuro_san/service/generic/agent_service.py index c48491f48..17abfc818 100644 --- a/neuro_san/service/generic/agent_service.py +++ b/neuro_san/service/generic/agent_service.py @@ -19,6 +19,7 @@ import uuid from leaf_common.asyncio.asyncio_executor import AsyncioExecutor +from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool from leaf_server_common.server.atomic_counter import AtomicCounter from leaf_server_common.server.request_logger import RequestLogger @@ -29,11 +30,11 @@ from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory +from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.chat_message_converter import ChatMessageConverter from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger -from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool from neuro_san.session.direct_agent_session import DirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.session_invocation_context import SessionInvocationContext @@ -43,9 +44,6 @@ DO_NOT_LOG_REQUESTS = [ ] -# No limit on number of concurrently running executors -MAX_CONCURRENT_EXECUTORS = 0 - # pylint: disable=too-many-instance-attributes class AgentService: @@ -90,7 +88,7 @@ def __init__(self, config: Dict[str, Any] = agent_network.get_config() self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) - self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS) + self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool() # Load once self.llm_factory.load() self.toolbox_factory.load() diff --git a/neuro_san/service/generic/async_agent_service.py b/neuro_san/service/generic/async_agent_service.py index 11b0cd581..61df30be2 100644 --- a/neuro_san/service/generic/async_agent_service.py +++ b/neuro_san/service/generic/async_agent_service.py @@ -18,6 +18,7 @@ import uuid from leaf_common.asyncio.asyncio_executor import AsyncioExecutor +from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool from leaf_server_common.server.atomic_counter import AtomicCounter @@ -27,12 +28,12 @@ from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory +from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.chat_message_converter import ChatMessageConverter from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger -from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool from neuro_san.session.async_direct_agent_session import AsyncDirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.session_invocation_context import SessionInvocationContext @@ -42,9 +43,6 @@ DO_NOT_LOG_REQUESTS = [ ] -# No limit on number of concurrently running executors -MAX_CONCURRENT_EXECUTORS = 0 - # pylint: disable=too-many-instance-attributes class AsyncAgentService: @@ -85,7 +83,7 @@ def __init__(self, config: Dict[str, Any] = agent_network.get_config() self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) - self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS) + self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool() # Load once. self.llm_factory.load() self.toolbox_factory.load() diff --git a/neuro_san/service/generic/asyncio_executor_pool.py b/neuro_san/service/generic/asyncio_executor_pool.py deleted file mode 100644 index 483a5d9fd..000000000 --- a/neuro_san/service/generic/asyncio_executor_pool.py +++ /dev/null @@ -1,71 +0,0 @@ - -# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. -# All Rights Reserved. -# Issued under the Academic Public License. -# -# You can be released from the terms, and requirements of the Academic Public -# License by purchasing a commercial license. -# Purchase of a commercial license is mandatory for any use of the -# neuro-san SDK Software in commercial settings. -# -# END COPYRIGHT - -import logging -import threading -from leaf_common.asyncio.asyncio_executor import AsyncioExecutor - - -class AsyncioExecutorPool: - """ - Class maintaining a dynamic set of reusable AsyncioExecutor instances. - """ - - def __init__(self, max_concurrent: int, reuse_mode: bool = True): - """ - Constructor. - :param max_concurrent: maximum allowed number of concurrently running AsyncioExecutors. - :param reuse_mode: True, if requested executor instances - are taken from pool of available ones (pool mode); - False, if requested executor instances are created new - and shutdown on return (backward compatible mode) - """ - self.max_concurrent = max_concurrent - self.reuse_mode: bool = reuse_mode - self.pool = [] - self.lock: threading.Lock = threading.Lock() - self.logger = logging.getLogger(self.__class__.__name__) - self.logger.info("AsyncioExecutorPool created: %s reuse: %s max concurrent: %d", - id(self), str(self.reuse_mode), self.max_concurrent) - - def get_executor(self) -> AsyncioExecutor: - """ - Get active (running) executor from the pool - :return: AsyncioExecutor instance - """ - if self.reuse_mode: - with self.lock: - if len(self.pool) > 0: - result = self.pool.pop(0) - self.logger.info("Reusing AsyncExecutor %s", id(result)) - return result - # Create AsyncioExecutor outside of lock - # to avoid potentially longer locked periods - result = AsyncioExecutor() - result.start() - self.logger.info("Creating AsyncExecutor %s", id(result)) - return result - - def return_executor(self, executor: AsyncioExecutor): - """ - Return AsyncioExecutor instance back to the pool of available instances. - :param executor: AsyncioExecutor to return. - """ - if self.reuse_mode: - with self.lock: - self.pool.append(executor) - self.logger.info("Returned to pool: AsyncExecutor %s pool size: %d", id(executor), len(self.pool)) - else: - # Shutdown AsyncioExecutor outside of lock - # to avoid potentially longer locked periods - self.logger.info("Shutting down: AsyncExecutor %s", id(executor)) - executor.shutdown() diff --git a/neuro_san/service/main_loop/server_main_loop.py b/neuro_san/service/main_loop/server_main_loop.py index 512eddb32..d7ae189ec 100644 --- a/neuro_san/service/main_loop/server_main_loop.py +++ b/neuro_san/service/main_loop/server_main_loop.py @@ -28,6 +28,7 @@ from neuro_san.internals.graph.registry.agent_network import AgentNetwork from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.internals.utils.file_of_class import FileOfClass +from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME_FOR_LOGS from neuro_san.service.grpc.grpc_agent_server import DEFAULT_MAX_CONCURRENT_REQUESTS @@ -177,6 +178,8 @@ def main_loop(self): metadata_set = metadata_set | set(self.usage_logger_metadata.split()) metadata_str: str = " ".join(sorted(metadata_set)) + AsyncioExecutorPoolProvider.set_executors_pool(reuse_mode=True) + if self.server_status.grpc_service.is_requested(): self.grpc_server = GrpcAgentServer( self.grpc_port, diff --git a/neuro_san/session/session_invocation_context.py b/neuro_san/session/session_invocation_context.py index b61492970..d3bb85451 100644 --- a/neuro_san/session/session_invocation_context.py +++ b/neuro_san/session/session_invocation_context.py @@ -15,9 +15,9 @@ from asyncio import Future from leaf_common.asyncio.asyncio_executor import AsyncioExecutor +from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool from leaf_server_common.logging.logging_setup import setup_extra_logging_fields -from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool from neuro_san.internals.chat.async_collating_queue import AsyncCollatingQueue from neuro_san.internals.interfaces.async_agent_session_factory import AsyncAgentSessionFactory from neuro_san.internals.interfaces.context_type_toolbox_factory import ContextTypeToolboxFactory diff --git a/requirements.txt b/requirements.txt index 211e40345..0499367a1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ # requirements-build.txt # In-house dependencies -leaf-common>=1.2.23 +leaf-common>=1.2.24 leaf-server-common>=0.1.20 # These are needed for generating code from .proto files