Un-3217 Implement pooling for AsyncExecutors. #328
andreidenissov-cog wants to merge 8 commits into main
Conversation
```python
# No limit on number of concurrently running executors
MAX_CONCURRENT_EXECUTORS = 0
```
Why does this need to be defined twice?
Maybe this should move to AsyncioExecutorPool and be the default for the constructor?
Very interesting approach!
In a way there is a leak: if there is a burst of requests, there will be one executor per request,
and that number will never go down. But I can see how this addresses the "Event loop is closed" problem.
Maybe there is a way to slowly shut them down? (not for this PR)
- Single definition for MAX_CONCURRENT_EXECUTORS, please.
- Move AsyncioExecutorPool to internals.utils to avoid the tangles you've just created.
Also: is per-request logging metadata still working as expected?
I can see a potential problem where we set up logging for the event-loop thread only once, and the logging for the next reuse gets mangled.
```python
config: Dict[str, Any] = agent_network.get_config()
self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config)
self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config)
self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS)
```
Should this pool be owned by the server and not the service?
```python
from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer
from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider
from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool
```
Cannot import stuff from neuro_san.service in the client area. That is a huge reach-around tangle,
and it will be a problem should we decide to break the service code out into a separate repo.
I think what you want to do is put this in internals.utils for now.
It could easily make its way down to leaf-common.asyncio.
```python
from leaf_common.asyncio.asyncio_executor import AsyncioExecutor
from leaf_server_common.logging.logging_setup import setup_extra_logging_fields

from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool
```
Cannot include neuro_san.service here either.
Move it to internals.utils.
```python
                False, if requested executor instances are created new
                and shutdown on return (backward compatible mode)
        """
        self.max_concurrent = max_concurrent
```
Where is self.max_concurrent ever used?
Nowhere right now. Was thinking somewhat ahead. Maybe not worth it.
```python
# No limit on number of concurrently running executors
MAX_CONCURRENT_EXECUTORS = 0
```
For now, we don't limit the number of AsyncExecutors which can be created in the pool.
```python
self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config)
self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config)
self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS)
# Load once
```
Create an instance of AsyncioExecutorPool for this service.
```python
    self.llm_factory,
    self.toolbox_factory,
    metadata)
invocation_context.start()
```
SessionInvocationContext now takes AsyncioExecutorPool as a parameter.
```diff
  # Get an async executor to run all tasks for this session instance:
- self.asyncio_executor: AsyncioExecutor = AsyncioExecutor()
+ self.asyncio_executor: AsyncioExecutor = self.async_executors_pool.get_executor()
  self.origination: Origination = Origination()
```
Instead of creating an AsyncioExecutor instance directly here, request it from the pool.
```diff
  if self.asyncio_executor is not None:
-     self.asyncio_executor.shutdown()
+     self.async_executors_pool.return_executor(self.asyncio_executor)
      self.asyncio_executor = None
```
And when we are done, we return our AsyncioExecutor back to the pool.
```python
# No limit on number of concurrently running executors
MAX_CONCURRENT_EXECUTORS = 0
```
For now, we don't limit the number of AsyncExecutors which can be created in the pool.
```python
    self.llm_factory,
    self.toolbox_factory,
    metadata)
invocation_context.start()
```
We do the same thing as for sync AgentService above.
```python
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.info("AsyncioExecutorPool created: %s reuse: %s max concurrent: %d",
                 id(self), str(self.reuse_mode), self.max_concurrent)
```
Pretty simple-minded pool-of-AsyncioExecutors implementation.
Instances move through the self.pool sequence in queue mode: first in, first out.
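A minimal sketch of such a FIFO pool, assuming the `get_executor()`/`return_executor()` API and the `reuse_mode`/`max_concurrent` constructor parameters seen in the diffs above. The stub `AsyncioExecutor` and the internal details (a `queue.Queue`, unbounded growth) are assumptions for illustration, not the PR's actual implementation:

```python
import queue


class AsyncioExecutor:
    """Stub standing in for leaf_common.asyncio.asyncio_executor.AsyncioExecutor."""

    def __init__(self):
        self.running = False

    def start(self):
        self.running = True

    def shutdown(self):
        self.running = False


class AsyncioExecutorPool:
    """FIFO pool of AsyncioExecutor instances.

    reuse_mode=False reproduces the backward-compatible behavior:
    every get_executor() creates a fresh executor and
    return_executor() shuts it down.
    """

    def __init__(self, max_concurrent: int = 0, reuse_mode: bool = True):
        self.max_concurrent = max_concurrent  # 0 means "no limit"
        self.reuse_mode = reuse_mode
        self.pool: "queue.Queue[AsyncioExecutor]" = queue.Queue()

    def get_executor(self) -> AsyncioExecutor:
        if not self.reuse_mode:
            executor = AsyncioExecutor()
            executor.start()
            return executor
        try:
            # First in, first out: reuse the oldest idle executor.
            return self.pool.get_nowait()
        except queue.Empty:
            # Pool is empty: grow it by one live executor.
            executor = AsyncioExecutor()
            executor.start()
            return executor

    def return_executor(self, executor: AsyncioExecutor):
        if not self.reuse_mode:
            executor.shutdown()
            return
        # Keep the executor (and its event loop) alive for the next request.
        self.pool.put(executor)
```

The key property is that in reuse mode an executor's event-loop thread is never shut down between requests, so late callbacks (such as httpx closing a pooled transport connection) still find a running loop.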
This PR implements very simple pooling for the AsyncExecutors we use to run per-request service sessions.
The main goal is to compensate for the (unknown to us) httpx connection-pooling policy, which results in the transport connection being closed at some unpredictable moment, after our request has already been processed and the underlying AsyncExecutor has been shut down.
With pooling, we basically keep all our AsyncExecutors alive and running, with their event loops active and available for an incoming "close this connection" task. There is also some performance advantage in not creating an AsyncExecutor from scratch for each request, with thread creation/starting and all.
We can still specify a "backward compatibility" mode for the pool, with pass-thru creation and shutdown of each AsyncExecutor.
Tested: the usual stress tests for both OpenAI and AzureOpenAI models, with no exceptions observed.