-
Notifications
You must be signed in to change notification settings - Fork 28
Un-3217 Implement pooling for AsyncExecutors. #328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
2a3d6b5
05b68b1
44394f6
156654f
774ac98
d5bf66c
2f4d7db
3494bf3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ | |
| from neuro_san.service.generic.chat_message_converter import ChatMessageConverter | ||
| from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory | ||
| from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger | ||
| from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool | ||
| from neuro_san.session.direct_agent_session import DirectAgentSession | ||
| from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory | ||
| from neuro_san.session.session_invocation_context import SessionInvocationContext | ||
|
|
@@ -42,6 +43,9 @@ | |
| DO_NOT_LOG_REQUESTS = [ | ||
| ] | ||
|
|
||
| # No limit on number of concurrently running executors | ||
| MAX_CONCURRENT_EXECUTORS = 0 | ||
|
||
|
|
||
|
||
|
|
||
| # pylint: disable=too-many-instance-attributes | ||
| class AgentService: | ||
|
|
@@ -86,6 +90,7 @@ def __init__(self, | |
| config: Dict[str, Any] = agent_network.get_config() | ||
| self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) | ||
| self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) | ||
| self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS) | ||
|
||
| # Load once | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Create an instance of AsyncioExecutorPool for this service. |
||
| self.llm_factory.load() | ||
| self.toolbox_factory.load() | ||
|
|
@@ -214,7 +219,12 @@ def streaming_chat(self, request_dict: Dict[str, Any], | |
|
|
||
| # Prepare | ||
| factory = ExternalAgentSessionFactory(use_direct=False) | ||
| invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata) | ||
| invocation_context = SessionInvocationContext( | ||
| factory, | ||
| self.async_executors_pool, | ||
| self.llm_factory, | ||
| self.toolbox_factory, | ||
| metadata) | ||
| invocation_context.start() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SessionInvocationContext now takes AsyncioExecutorPool as a parameter. |
||
|
|
||
| # Set up logging inside async thread | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger | ||
| from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory | ||
| from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger | ||
| from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool | ||
| from neuro_san.session.async_direct_agent_session import AsyncDirectAgentSession | ||
| from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory | ||
| from neuro_san.session.session_invocation_context import SessionInvocationContext | ||
|
|
@@ -41,6 +42,9 @@ | |
| DO_NOT_LOG_REQUESTS = [ | ||
| ] | ||
|
|
||
| # No limit on number of concurrently running executors | ||
| MAX_CONCURRENT_EXECUTORS = 0 | ||
|
||
|
|
||
|
||
|
|
||
| # pylint: disable=too-many-instance-attributes | ||
| class AsyncAgentService: | ||
|
|
@@ -81,6 +85,7 @@ def __init__(self, | |
| config: Dict[str, Any] = agent_network.get_config() | ||
| self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) | ||
| self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) | ||
| self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS) | ||
|
||
| # Load once. | ||
| self.llm_factory.load() | ||
| self.toolbox_factory.load() | ||
|
|
@@ -208,7 +213,12 @@ async def streaming_chat(self, request_dict: Dict[str, Any], | |
|
|
||
| # Prepare | ||
| factory = ExternalAgentSessionFactory(use_direct=False) | ||
| invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata) | ||
| invocation_context = SessionInvocationContext( | ||
| factory, | ||
| self.async_executors_pool, | ||
| self.llm_factory, | ||
| self.toolbox_factory, | ||
| metadata) | ||
| invocation_context.start() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do the same thing as for sync AgentService above. |
||
|
|
||
| # Set up logging inside async thread | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
|
|
||
| # Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. | ||
| # All Rights Reserved. | ||
| # Issued under the Academic Public License. | ||
| # | ||
| # You can be released from the terms, and requirements of the Academic Public | ||
| # License by purchasing a commercial license. | ||
| # Purchase of a commercial license is mandatory for any use of the | ||
| # neuro-san SDK Software in commercial settings. | ||
| # | ||
| # END COPYRIGHT | ||
|
|
||
| import logging | ||
| import threading | ||
| from leaf_common.asyncio.asyncio_executor import AsyncioExecutor | ||
|
|
||
|
|
||
class AsyncioExecutorPool:
    """
    Class maintaining a dynamic set of reusable AsyncioExecutor instances.

    Executors handed out by get_executor() have already been started.
    When a caller is done, it gives the instance back via return_executor(),
    which either keeps it for later reuse (pool mode) or shuts it down
    (backward compatible mode).
    """

    def __init__(self, max_concurrent: int, reuse_mode: bool = True):
        """
        Constructor.
        :param max_concurrent: maximum allowed number of concurrently running AsyncioExecutors.
                    NOTE: this value is currently only recorded and logged;
                    nothing in this class enforces it yet.
        :param reuse_mode: True, if requested executor instances
                    are taken from pool of available ones (pool mode);
                    False, if requested executor instances are created new
                    and shutdown on return (backward compatible mode)
        """
        self.max_concurrent: int = max_concurrent
        self.reuse_mode: bool = reuse_mode
        # Idle executors available for reuse, oldest first.
        self.pool: list = []
        # Guards every access to self.pool.
        self.lock: threading.Lock = threading.Lock()
        self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self.logger.info("AsyncioExecutorPool created: %s reuse: %s max concurrent: %d",
                         id(self), str(self.reuse_mode), self.max_concurrent)

    def get_executor(self) -> "AsyncioExecutor":
        """
        Get active (running) executor from the pool
        :return: AsyncioExecutor instance; either a previously returned one
                 (pool mode, when one is available) or a newly created
                 and started one.
        """
        result: "AsyncioExecutor" = None
        if self.reuse_mode:
            # Hold the lock only for the pool manipulation itself.
            with self.lock:
                if self.pool:
                    result = self.pool.pop(0)

        if result is not None:
            self.logger.info("Reusing AsyncExecutor %s", id(result))
            return result

        # Nothing to reuse: create and start a fresh executor outside the lock
        # so that other callers are not serialized behind executor startup.
        result = AsyncioExecutor()
        result.start()
        self.logger.info("Creating AsyncExecutor %s", id(result))
        return result

    def return_executor(self, executor: "AsyncioExecutor"):
        """
        Return AsyncioExecutor instance back to the pool of available instances.
        In pool mode the instance is kept for reuse; otherwise it is shut down.
        :param executor: AsyncioExecutor to return. None is ignored.
        """
        if executor is None:
            # Never pool a None: get_executor() would later hand it out
            # as if it were a running executor.
            self.logger.warning("Ignoring attempt to return a None AsyncExecutor")
            return

        if self.reuse_mode:
            with self.lock:
                self.pool.append(executor)
                # Read the size under the lock so the logged value is consistent.
                pool_size: int = len(self.pool)
            self.logger.info("Returned to pool: AsyncExecutor %s pool size: %d", id(executor), pool_size)
        else:
            self.logger.info("Shutting down: AsyncExecutor %s", id(executor))
            executor.shutdown()
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
| from leaf_common.asyncio.asyncio_executor import AsyncioExecutor | ||
| from leaf_server_common.logging.logging_setup import setup_extra_logging_fields | ||
|
|
||
| from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool | ||
|
||
| from neuro_san.internals.chat.async_collating_queue import AsyncCollatingQueue | ||
| from neuro_san.internals.interfaces.async_agent_session_factory import AsyncAgentSessionFactory | ||
| from neuro_san.internals.interfaces.context_type_toolbox_factory import ContextTypeToolboxFactory | ||
|
|
@@ -35,7 +36,10 @@ class SessionInvocationContext(InvocationContext): | |
| service call or library call. | ||
| """ | ||
|
|
||
| # pylint: disable=too-many-arguments | ||
| # pylint: disable=too-many-positional-arguments | ||
| def __init__(self, async_session_factory: AsyncAgentSessionFactory, | ||
| async_executors_pool: AsyncioExecutorPool, | ||
| llm_factory: ContextTypeLlmFactory, | ||
| toolbox_factory: ContextTypeToolboxFactory = None, | ||
| metadata: Dict[str, str] = None): | ||
|
|
@@ -44,15 +48,18 @@ def __init__(self, async_session_factory: AsyncAgentSessionFactory, | |
|
|
||
| :param async_session_factory: The AsyncAgentSessionFactory to use | ||
| when connecting with external agents. | ||
| :param async_executors_pool: pool of AsyncioExecutors to use for obtaining | ||
| an executor instance to use for this context; | ||
| :param llm_factory: The ContextTypeLlmFactory instance | ||
| :param metadata: A grpc metadata of key/value pairs to be inserted into | ||
| the header. Default is None. Preferred format is a | ||
| dictionary of string keys to string values. | ||
| """ | ||
|
|
||
| self.async_session_factory: AsyncAgentSessionFactory = async_session_factory | ||
| self.async_executors_pool: AsyncioExecutorPool = async_executors_pool | ||
| # Get an async executor to run all tasks for this session instance: | ||
| self.asyncio_executor: AsyncioExecutor = AsyncioExecutor() | ||
| self.asyncio_executor: AsyncioExecutor = self.async_executors_pool.get_executor() | ||
| self.origination: Origination = Origination() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Instead of creating an AsyncioExecutor instance directly here, request it from the pool. |
||
| self.queue: AsyncCollatingQueue = AsyncCollatingQueue() | ||
| self.journal: Journal = MessageJournal(self.queue) | ||
|
|
@@ -66,6 +73,7 @@ def start(self): | |
| Starts the active components of this invocation context. | ||
| Do this separately from constructor for more control. | ||
| Currently, we only start internal AsyncioExecutor. | ||
| It could be already running, but starting it twice is allowed. | ||
| """ | ||
| self.asyncio_executor.start() | ||
|
|
||
|
|
@@ -119,7 +127,7 @@ def close(self): | |
| Release resources owned by this context | ||
| """ | ||
| if self.asyncio_executor is not None: | ||
| self.asyncio_executor.shutdown() | ||
| self.async_executors_pool.return_executor(self.asyncio_executor) | ||
| self.asyncio_executor = None | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. And when we are done, we return our AsyncioExecutor back to the pool. |
||
| if self.queue is not None: | ||
| self.queue.close() | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cannot import stuff from neuro_san.service in the client area. Huge reach-around tangle
and will be a problem should we decide to break service stuff out to separate repo.
I think what you want to do is put this in internals.utils for now.
It could easily make its way down to leaf-common.asyncio.