Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion neuro_san/client/direct_agent_session_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Dict

from leaf_common.time.timeout import Timeout
from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool

from neuro_san.interfaces.agent_session import AgentSession
from neuro_san.internals.interfaces.context_type_toolbox_factory import ContextTypeToolboxFactory
Expand Down Expand Up @@ -74,7 +75,8 @@ def create_session(self, agent_name: str, use_direct: bool = False,
toolbox_factory.load()

factory = ExternalAgentSessionFactory(use_direct=use_direct, network_storage=self.network_storage)
invocation_context = SessionInvocationContext(factory, llm_factory, toolbox_factory, metadata)
executors_pool: AsyncioExecutorPool = AsyncioExecutorPool()
invocation_context = SessionInvocationContext(factory, executors_pool, llm_factory, toolbox_factory, metadata)
invocation_context.start()
session: DirectAgentSession = DirectAgentSession(agent_network=agent_network,
invocation_context=invocation_context,
Expand Down
32 changes: 32 additions & 0 deletions neuro_san/internals/utils/asyncio_executor_pool_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@

# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI.
# All Rights Reserved.
# Issued under the Academic Public License.
#
# You can be released from the terms, and requirements of the Academic Public
# License by purchasing a commercial license.
# Purchase of a commercial license is mandatory for any use of the
# neuro-san SDK Software in commercial settings.
#
# END COPYRIGHT
"""
See class comments
"""
from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool


class AsyncioExecutorPoolProvider:
    """
    Class providing server-wide instance of AsyncioExecutorPool
    for use by specific agent services.

    The pool is kept as class-level state, so everything that goes
    through this provider shares the same instance.
    """

    # The shared pool. Stays None until set_executors_pool() is called
    # or get_executors_pool() creates one on first use.
    _executors_pool: AsyncioExecutorPool = None

    @classmethod
    def set_executors_pool(cls, reuse_mode: bool = True) -> None:
        """
        Create a new AsyncioExecutorPool and install it as the shared instance,
        replacing any pool that was installed earlier.

        :param reuse_mode: Passed through as the reuse_mode argument
                    of the AsyncioExecutorPool constructor. Default is True.
        """
        cls._executors_pool = AsyncioExecutorPool(reuse_mode=reuse_mode)

    @classmethod
    def get_executors_pool(cls) -> AsyncioExecutorPool:
        """
        :return: The shared AsyncioExecutorPool instance.
                 If no pool has been installed yet, one is created with
                 the default reuse_mode, so callers never receive None.
        """
        # Callers keep the returned pool and call methods on it without
        # checking for None, so create it on first use instead of handing
        # back an unset value.
        if cls._executors_pool is None:
            cls.set_executors_pool()
        return cls._executors_pool
10 changes: 9 additions & 1 deletion neuro_san/service/generic/agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import uuid

from leaf_common.asyncio.asyncio_executor import AsyncioExecutor
from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool

from leaf_server_common.server.atomic_counter import AtomicCounter
from leaf_server_common.server.request_logger import RequestLogger
Expand All @@ -29,6 +30,7 @@
from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory
from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory
from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory
from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.generic.chat_message_converter import ChatMessageConverter
from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory
Expand Down Expand Up @@ -86,6 +88,7 @@ def __init__(self,
config: Dict[str, Any] = agent_network.get_config()
self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config)
self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config)
self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool()
# Load once
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Obtain the server-wide AsyncioExecutorPool instance from AsyncioExecutorPoolProvider for this service.

self.llm_factory.load()
self.toolbox_factory.load()
Expand Down Expand Up @@ -214,7 +217,12 @@ def streaming_chat(self, request_dict: Dict[str, Any],

# Prepare
factory = ExternalAgentSessionFactory(use_direct=False)
invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata)
invocation_context = SessionInvocationContext(
factory,
self.async_executors_pool,
self.llm_factory,
self.toolbox_factory,
metadata)
invocation_context.start()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SessionInvocationContext now takes AsyncioExecutorPool as a parameter.


# Set up logging inside async thread
Expand Down
10 changes: 9 additions & 1 deletion neuro_san/service/generic/async_agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import uuid

from leaf_common.asyncio.asyncio_executor import AsyncioExecutor
from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool

from leaf_server_common.server.atomic_counter import AtomicCounter

Expand All @@ -27,6 +28,7 @@
from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory
from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory
from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory
from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.generic.chat_message_converter import ChatMessageConverter
from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger
Expand Down Expand Up @@ -81,6 +83,7 @@ def __init__(self,
config: Dict[str, Any] = agent_network.get_config()
self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config)
self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config)
self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool()
# Load once.
self.llm_factory.load()
self.toolbox_factory.load()
Expand Down Expand Up @@ -208,7 +211,12 @@ async def streaming_chat(self, request_dict: Dict[str, Any],

# Prepare
factory = ExternalAgentSessionFactory(use_direct=False)
invocation_context = SessionInvocationContext(factory, self.llm_factory, self.toolbox_factory, metadata)
invocation_context = SessionInvocationContext(
factory,
self.async_executors_pool,
self.llm_factory,
self.toolbox_factory,
metadata)
invocation_context.start()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do the same thing as for sync AgentService above.


# Set up logging inside async thread
Expand Down
3 changes: 3 additions & 0 deletions neuro_san/service/main_loop/server_main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from neuro_san.internals.graph.registry.agent_network import AgentNetwork
from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.internals.utils.file_of_class import FileOfClass
from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider
from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME
from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME_FOR_LOGS
from neuro_san.service.grpc.grpc_agent_server import DEFAULT_MAX_CONCURRENT_REQUESTS
Expand Down Expand Up @@ -177,6 +178,8 @@ def main_loop(self):
metadata_set = metadata_set | set(self.usage_logger_metadata.split())
metadata_str: str = " ".join(sorted(metadata_set))

AsyncioExecutorPoolProvider.set_executors_pool(reuse_mode=True)

if self.server_status.grpc_service.is_requested():
self.grpc_server = GrpcAgentServer(
self.grpc_port,
Expand Down
12 changes: 10 additions & 2 deletions neuro_san/session/session_invocation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from asyncio import Future

from leaf_common.asyncio.asyncio_executor import AsyncioExecutor
from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool
from leaf_server_common.logging.logging_setup import setup_extra_logging_fields

from neuro_san.internals.chat.async_collating_queue import AsyncCollatingQueue
Expand All @@ -35,7 +36,10 @@ class SessionInvocationContext(InvocationContext):
service call or library call.
"""

# pylint: disable=too-many-arguments
# pylint: disable=too-many-positional-arguments
def __init__(self, async_session_factory: AsyncAgentSessionFactory,
async_executors_pool: AsyncioExecutorPool,
llm_factory: ContextTypeLlmFactory,
toolbox_factory: ContextTypeToolboxFactory = None,
metadata: Dict[str, str] = None):
Expand All @@ -44,15 +48,18 @@ def __init__(self, async_session_factory: AsyncAgentSessionFactory,

:param async_session_factory: The AsyncAgentSessionFactory to use
when connecting with external agents.
:param async_executors_pool: pool of AsyncioExecutors to use for obtaining
an executor instance to use for this context;
:param llm_factory: The ContextTypeLlmFactory instance
:param metadata: A grpc metadata of key/value pairs to be inserted into
the header. Default is None. Preferred format is a
dictionary of string keys to string values.
"""

self.async_session_factory: AsyncAgentSessionFactory = async_session_factory
self.async_executors_pool: AsyncioExecutorPool = async_executors_pool
# Get an async executor to run all tasks for this session instance:
self.asyncio_executor: AsyncioExecutor = AsyncioExecutor()
self.asyncio_executor: AsyncioExecutor = self.async_executors_pool.get_executor()
self.origination: Origination = Origination()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating an AsyncioExecutor instance directly here, request it from the pool.

self.queue: AsyncCollatingQueue = AsyncCollatingQueue()
self.journal: Journal = MessageJournal(self.queue)
Expand All @@ -66,6 +73,7 @@ def start(self):
Starts the active components of this invocation context.
Do this separately from constructor for more control.
Currently, we only start internal AsyncioExecutor.
It could be already running, but starting it twice is allowed.
"""
self.asyncio_executor.start()

Expand Down Expand Up @@ -119,7 +127,7 @@ def close(self):
Release resources owned by this context
"""
if self.asyncio_executor is not None:
self.asyncio_executor.shutdown()
self.async_executors_pool.return_executor(self.asyncio_executor)
self.asyncio_executor = None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And when we are done, we return our AsyncioExecutor to the pool.

if self.queue is not None:
self.queue.close()
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# requirements-build.txt

# In-house dependencies
leaf-common>=1.2.23
leaf-common>=1.2.24
leaf-server-common>=0.1.20

# These are needed for generating code from .proto files
Expand Down
Loading