-
Notifications
You must be signed in to change notification settings - Fork 28
Un-3217 Implement pooling for AsyncExecutors. #328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2a3d6b5
05b68b1
44394f6
156654f
774ac98
d5bf66c
2f4d7db
3494bf3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
|
|
||
| # Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. | ||
| # All Rights Reserved. | ||
| # Issued under the Academic Public License. | ||
| # | ||
| # You can be released from the terms, and requirements of the Academic Public | ||
| # License by purchasing a commercial license. | ||
| # Purchase of a commercial license is mandatory for any use of the | ||
| # neuro-san SDK Software in commercial settings. | ||
| # | ||
| # END COPYRIGHT | ||
| """ | ||
| See class comments | ||
| """ | ||
| from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool | ||
|
|
||
|
|
||
| class AsyncioExecutorPoolProvider: | ||
| """ | ||
| Class providing server-wide instance of AsyncioExecutorPool | ||
| for use by specific agent services. | ||
| """ | ||
|
|
||
| _executors_pool: AsyncioExecutorPool = None | ||
|
|
||
| @classmethod | ||
| def set_executors_pool(cls, reuse_mode: bool = True): | ||
| cls._executors_pool = AsyncioExecutorPool(reuse_mode=reuse_mode) | ||
|
|
||
| @classmethod | ||
| def get_executors_pool(cls): | ||
| return cls._executors_pool |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
| import uuid | ||
|
|
||
| from leaf_common.asyncio.asyncio_executor import AsyncioExecutor | ||
| from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool | ||
|
|
||
| from leaf_server_common.server.atomic_counter import AtomicCounter | ||
| from leaf_server_common.server.request_logger import RequestLogger | ||
|
|
@@ -29,6 +30,7 @@ | |
| from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory | ||
| from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory | ||
| from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory | ||
| from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider | ||
| from neuro_san.service.generic.agent_server_logging import AgentServerLogging | ||
| from neuro_san.service.generic.chat_message_converter import ChatMessageConverter | ||
| from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory | ||
|
|
@@ -86,6 +88,7 @@ def __init__(self, | |
| config: Dict[str, Any] = agent_network.get_config() | ||
| self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) | ||
| self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) | ||
| self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool() | ||
| # Load once | ||
| self.llm_factory.load() | ||
| self.toolbox_factory.load() | ||
|
|
@@ -214,7 +217,12 @@ def streaming_chat(self, request_dict: Dict[str, Any], | |
|
|
||
| # Prepare | ||
| factory = ExternalAgentSessionFactory(use_direct=False) | ||
| invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata) | ||
| invocation_context = SessionInvocationContext( | ||
| factory, | ||
| self.async_executors_pool, | ||
| self.llm_factory, | ||
| self.toolbox_factory, | ||
| metadata) | ||
| invocation_context.start() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. SessionInvocationContext now takes an AsyncioExecutorPool as a parameter. |
||
|
|
||
| # Set up logging inside async thread | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| import uuid | ||
|
|
||
| from leaf_common.asyncio.asyncio_executor import AsyncioExecutor | ||
| from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool | ||
|
|
||
| from leaf_server_common.server.atomic_counter import AtomicCounter | ||
|
|
||
|
|
@@ -27,6 +28,7 @@ | |
| from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory | ||
| from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory | ||
| from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory | ||
| from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider | ||
| from neuro_san.service.generic.agent_server_logging import AgentServerLogging | ||
| from neuro_san.service.generic.chat_message_converter import ChatMessageConverter | ||
| from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger | ||
|
|
@@ -81,6 +83,7 @@ def __init__(self, | |
| config: Dict[str, Any] = agent_network.get_config() | ||
| self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) | ||
| self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) | ||
| self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool() | ||
| # Load once. | ||
| self.llm_factory.load() | ||
| self.toolbox_factory.load() | ||
|
|
@@ -208,7 +211,12 @@ async def streaming_chat(self, request_dict: Dict[str, Any], | |
|
|
||
| # Prepare | ||
| factory = ExternalAgentSessionFactory(use_direct=False) | ||
| invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata) | ||
| invocation_context = SessionInvocationContext( | ||
| factory, | ||
| self.async_executors_pool, | ||
| self.llm_factory, | ||
| self.toolbox_factory, | ||
| metadata) | ||
| invocation_context.start() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We do the same thing here as for the sync AgentService above. |
||
|
|
||
| # Set up logging inside async thread | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| from asyncio import Future | ||
|
|
||
| from leaf_common.asyncio.asyncio_executor import AsyncioExecutor | ||
| from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool | ||
| from leaf_server_common.logging.logging_setup import setup_extra_logging_fields | ||
|
|
||
| from neuro_san.internals.chat.async_collating_queue import AsyncCollatingQueue | ||
|
|
@@ -35,7 +36,10 @@ class SessionInvocationContext(InvocationContext): | |
| service call or library call. | ||
| """ | ||
|
|
||
| # pylint: disable=too-many-arguments | ||
| # pylint: disable=too-many-positional-arguments | ||
| def __init__(self, async_session_factory: AsyncAgentSessionFactory, | ||
| async_executors_pool: AsyncioExecutorPool, | ||
| llm_factory: ContextTypeLlmFactory, | ||
| toolbox_factory: ContextTypeToolboxFactory = None, | ||
| metadata: Dict[str, str] = None): | ||
|
|
@@ -44,15 +48,18 @@ def __init__(self, async_session_factory: AsyncAgentSessionFactory, | |
|
|
||
| :param async_session_factory: The AsyncAgentSessionFactory to use | ||
| when connecting with external agents. | ||
| :param async_executors_pool: pool of AsyncioExecutors to use for obtaining | ||
| an executor instance to use for this context; | ||
| :param llm_factory: The ContextTypeLlmFactory instance | ||
| :param metadata: A grpc metadata of key/value pairs to be inserted into | ||
| the header. Default is None. Preferred format is a | ||
| dictionary of string keys to string values. | ||
| """ | ||
|
|
||
| self.async_session_factory: AsyncAgentSessionFactory = async_session_factory | ||
| self.async_executors_pool: AsyncioExecutorPool = async_executors_pool | ||
| # Get an async executor to run all tasks for this session instance: | ||
| self.asyncio_executor: AsyncioExecutor = AsyncioExecutor() | ||
| self.asyncio_executor: AsyncioExecutor = self.async_executors_pool.get_executor() | ||
| self.origination: Origination = Origination() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Instead of creating an AsyncioExecutor instance directly here, request it from the pool. |
||
| self.queue: AsyncCollatingQueue = AsyncCollatingQueue() | ||
| self.journal: Journal = MessageJournal(self.queue) | ||
|
|
@@ -66,6 +73,7 @@ def start(self): | |
| Starts the active components of this invocation context. | ||
| Do this separately from constructor for more control. | ||
| Currently, we only start internal AsyncioExecutor. | ||
| It could be already running, but starting it twice is allowed. | ||
| """ | ||
| self.asyncio_executor.start() | ||
|
|
||
|
|
@@ -119,7 +127,7 @@ def close(self): | |
| Release resources owned by this context | ||
| """ | ||
| if self.asyncio_executor is not None: | ||
| self.asyncio_executor.shutdown() | ||
| self.async_executors_pool.return_executor(self.asyncio_executor) | ||
| self.asyncio_executor = None | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. And when we are done, we return our AsyncioExecutor to the pool. |
||
| if self.queue is not None: | ||
| self.queue.close() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create an instance of AsyncioExecutorPool for this service.