
Un-3217 Implement pooling for AsyncExecutors. #328

Closed
andreidenissov-cog wants to merge 8 commits into main from ASD-UN-3217-async-executor-pool01

Conversation

Contributor
@andreidenissov-cog commented Jul 24, 2025

This PR implements very simple pooling for the AsyncExecutors we use to run per-request service sessions.
The main goal is to compensate for the (unknown to us) httpx connection-pooling policy, which results in a transport connection being closed at some unpredictable moment, after our request has already been processed and the underlying AsyncExecutor has been shut down.
With pooling, we basically keep all our AsyncExecutors alive and running, with their event loops active and available for an incoming "close this connection" task. There is also a performance advantage in not creating an AsyncExecutor from scratch for each request, with thread creation/starting and all.
We can still specify a "backward compatibility" mode for the pool, with pass-through creation and shutdown of each AsyncExecutor.

Tested: usual stress tests for both OpenAI and AzureOpenAI models with no exceptions observed.
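To make the idea concrete, here is a minimal sketch of such a pool. All names here (`Executor`, `ExecutorPool`, `reuse_mode`) are assumed stand-ins, not the PR's actual API: idle executors are kept alive in a FIFO queue, and a pass-through mode reproduces the old create/shutdown-per-request behavior.

```python
# Hedged sketch of the pooling idea; names are assumed, not the PR's real API.
import threading
from collections import deque


class Executor:
    """Stand-in for AsyncioExecutor: a thread running an event loop."""

    def start(self):
        pass  # would start the event-loop thread

    def shutdown(self):
        pass  # would stop the event-loop thread


class ExecutorPool:
    def __init__(self, reuse_mode: bool = True):
        self.reuse_mode = reuse_mode
        self.pool = deque()           # idle executors, FIFO order
        self.lock = threading.Lock()

    def get_executor(self) -> Executor:
        if self.reuse_mode:
            with self.lock:
                if self.pool:
                    return self.pool.popleft()  # reuse the oldest idle executor
        executor = Executor()
        executor.start()
        return executor

    def return_executor(self, executor: Executor):
        if self.reuse_mode:
            with self.lock:
                self.pool.append(executor)      # keep alive for the next request
        else:
            executor.shutdown()                 # backward-compatible mode
```

In reuse mode the event loop stays running between requests, so a late "close this connection" task from httpx still has a live loop to run on.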


# No limit on number of concurrently running executors
MAX_CONCURRENT_EXECUTORS = 0
Collaborator
@d1donlydfink commented Jul 24, 2025
Why does this need to be defined twice?

Maybe this should move to AsyncioExecutorPool and be the default for the constructor?
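The reviewer's suggestion could look like the following sketch (names assumed): define the constant once, next to `AsyncioExecutorPool`, and use it as the constructor's default.

```python
# Sketch of a single definition serving as the constructor default.
# The class body here is assumed, not the PR's actual implementation.
MAX_CONCURRENT_EXECUTORS = 0  # 0 means "no limit"


class AsyncioExecutorPool:
    def __init__(self, max_concurrent: int = MAX_CONCURRENT_EXECUTORS):
        self.max_concurrent = max_concurrent
```

Callers that want the default then just write `AsyncioExecutorPool()`, and the constant exists in exactly one place.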


Collaborator
@d1donlydfink left a comment

Very interesting approach!

In a way there is a leak: if there is a burst of requests, there will be one executor per request,
and that number will never go down. But I can see how this addresses the "Event loop is closed" problem.
Maybe there is a way to slowly shut them down? (not for this PR)

  • Single definition for MAX_CONCURRENT_EXECUTORS please
  • Move AsyncioExecutorPool to internals.utils to avoid the tangles you've just created.

Also: is per-request logging metadata still working as expected?
I can see a potential problem where we only set up the logging for the event-loop thread once, and then the logging for the next re-use gets messed up.
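The staleness the reviewer is worried about, and one possible fix, can be sketched as follows. This is illustrative only, with assumed names; the real code uses `setup_extra_logging_fields` from leaf_server_common, whose behavior may differ: if metadata is attached only when an executor is created, a reused executor keeps serving the first request's fields, so the pool would need to re-apply them on every checkout.

```python
# Illustrative sketch (assumed names): re-apply per-request logging fields
# on EVERY checkout, not only at executor creation, so a reused executor
# never logs the previous request's metadata.
class PooledExecutor:
    def __init__(self):
        self.log_fields: dict = {}


class Pool:
    def __init__(self):
        self.idle = []

    def get_executor(self, metadata: dict) -> PooledExecutor:
        executor = self.idle.pop(0) if self.idle else PooledExecutor()
        # Refresh the fields here, at checkout time.
        executor.log_fields = dict(metadata)
        return executor

    def return_executor(self, executor: PooledExecutor):
        self.idle.append(executor)
```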

config: Dict[str, Any] = agent_network.get_config()
self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config)
self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config)
self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS)
Collaborator

Should this pool be owned by the server and not the service?


from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer
from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider
from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool
Collaborator
@d1donlydfink commented Jul 24, 2025

We cannot import stuff from neuro_san.service in the client area. It's a huge reach-around tangle,
and it will be a problem should we decide to break the service stuff out into a separate repo.

I think what you want to do is put this in internals.utils for now.
It could easily make its way down to leaf-common.asyncio.

from leaf_common.asyncio.asyncio_executor import AsyncioExecutor
from leaf_server_common.logging.logging_setup import setup_extra_logging_fields

from neuro_san.service.generic.asyncio_executor_pool import AsyncioExecutorPool
Collaborator

Cannot include neuro_san.service here either.

Move it to internals.utils.

False, if requested executor instances are created new
and shutdown on return (backward compatible mode)
"""
self.max_concurrent = max_concurrent
Collaborator

Where is self.max_concurrent ever used?

Contributor Author

Nowhere right now. Was thinking somewhat ahead. Maybe not worth it.
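If `self.max_concurrent` were ever put to use, one natural mechanism is a semaphore that blocks `get_executor()` once the cap is reached, with 0 keeping today's unlimited behavior. The sketch below is a hypothetical illustration with assumed names, not the PR's code:

```python
# Hedged sketch: one way max_concurrent could actually be enforced.
# All names here are assumed for illustration.
import threading


class BoundedPool:
    def __init__(self, max_concurrent: int = 0):
        self.max_concurrent = max_concurrent
        # A semaphore only when a positive cap is given; 0 means unlimited.
        self.sem = threading.Semaphore(max_concurrent) if max_concurrent > 0 else None
        self.outstanding = 0
        self.lock = threading.Lock()

    def get_executor(self):
        if self.sem is not None:
            self.sem.acquire()  # blocks once max_concurrent executors are out
        with self.lock:
            self.outstanding += 1
        return object()         # stand-in for an AsyncioExecutor

    def return_executor(self, executor):
        with self.lock:
            self.outstanding -= 1
        if self.sem is not None:
            self.sem.release()
```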


# No limit on number of concurrently running executors
MAX_CONCURRENT_EXECUTORS = 0

Contributor Author

For now, we don't limit the number of AsyncExecutors which can be created in the pool.

self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config)
self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config)
self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPool(MAX_CONCURRENT_EXECUTORS)
# Load once
Contributor Author

Create an instance of AsyncioExecutorPool for this service.

self.llm_factory,
self.toolbox_factory,
metadata)
invocation_context.start()
Contributor Author

SessionInvocationContext now takes AsyncioExecutorPool as a parameter.

# Get an async executor to run all tasks for this session instance:
self.asyncio_executor: AsyncioExecutor = AsyncioExecutor()
self.asyncio_executor: AsyncioExecutor = self.async_executors_pool.get_executor()
self.origination: Origination = Origination()
Contributor Author

Instead of creating an AsyncioExecutor instance directly here, request it from the pool.

if self.asyncio_executor is not None:
self.asyncio_executor.shutdown()
self.async_executors_pool.return_executor(self.asyncio_executor)
self.asyncio_executor = None
Contributor Author

And when we are done, return our AsyncioExecutor back to the pool.
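Since executors are now returned rather than shut down, the checkout/return pair is worth guarding with try/finally so an executor goes back to the pool even when request handling raises. A sketch with an assumed pool API (`get_executor`/`return_executor` match the diff above; `TinyPool` and `handle_request` are hypothetical):

```python
# Hedged sketch: guard the checkout/return pair so the pool never
# leaks a live event-loop thread on errors. TinyPool is a stand-in.
class TinyPool:
    def __init__(self):
        self.idle = []

    def get_executor(self):
        return self.idle.pop() if self.idle else object()

    def return_executor(self, executor):
        self.idle.append(executor)


def handle_request(pool, work):
    executor = pool.get_executor()
    try:
        return work(executor)
    finally:
        # Always return the executor, success or failure.
        pool.return_executor(executor)
```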


# No limit on number of concurrently running executors
MAX_CONCURRENT_EXECUTORS = 0

Contributor Author

For now, we don't limit the number of AsyncExecutors which can be created in the pool.

self.llm_factory,
self.toolbox_factory,
metadata)
invocation_context.start()
Contributor Author

We do the same thing as for sync AgentService above.

self.logger = logging.getLogger(self.__class__.__name__)
self.logger.info("AsyncioExecutorPool created: %s reuse: %s max concurrent: %d",
id(self), str(self.reuse_mode), self.max_concurrent)

Contributor Author

A pretty simple-minded implementation of a pool of AsyncioExecutors.
Instances move through the self.pool sequence in queue mode: first in, first out.

@andreidenissov-cog andreidenissov-cog deleted the ASD-UN-3217-async-executor-pool01 branch October 2, 2025 20:23