Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 0 additions & 38 deletions neuro_san/internals/utils/asyncio_executor_pool_provider.py

This file was deleted.

10 changes: 6 additions & 4 deletions neuro_san/service/generic/agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory
from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory
from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory
from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.generic.chat_message_converter import ChatMessageConverter
from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory
from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger
from neuro_san.service.utils.server_context import ServerContext
from neuro_san.session.direct_agent_session import DirectAgentSession
from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory
from neuro_san.session.session_invocation_context import SessionInvocationContext
Expand All @@ -58,7 +58,8 @@ def __init__(self,
security_cfg: Dict[str, Any],
agent_name: str,
agent_network_provider: AgentNetworkProvider,
server_logging: AgentServerLogging):
server_logging: AgentServerLogging,
server_context: ServerContext):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was about to plumb the AsyncioExecutorPool all over the place but then realized that the pain of this plumbing is one of the things that lead to the singleton-ness of the PoolProvider solution in the first place.

So now we pass around a ServerContext object which contains global-sh server state ultimately owned by ServerMainLoop. This should alleviate the plumbing burden. Much of what you will see below is (final?) plumbing for this.

Future: It would also make sense to package up the other aspects of this contstructor into a ServiceContext object and plumb that for similar reasons. Enough going on in here already.

"""
Set the gRPC interface up for health checking so that the service
will be opened to callers when the mesh sees it operational, if this
Expand All @@ -75,6 +76,7 @@ def __init__(self,
:param server_logging: An AgentServerLogging instance initialized so that
spawned asyncrhonous threads can also properly initialize
their logging.
:param server_context: The ServerContext containing global-ish state
"""
self.request_logger = request_logger
self.security_cfg = security_cfg
Expand All @@ -88,7 +90,7 @@ def __init__(self,
config: Dict[str, Any] = agent_network.get_config()
self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config)
self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config)
self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool()
self.async_executor_pool: AsyncioExecutorPool = server_context.get_executor_pool()
# Load once
self.llm_factory.load()
self.toolbox_factory.load()
Expand Down Expand Up @@ -219,7 +221,7 @@ def streaming_chat(self, request_dict: Dict[str, Any],
factory = ExternalAgentSessionFactory(use_direct=False)
invocation_context = SessionInvocationContext(
factory,
self.async_executors_pool,
self.async_executor_pool,
self.llm_factory,
self.toolbox_factory,
metadata)
Expand Down
12 changes: 9 additions & 3 deletions neuro_san/service/generic/agent_service_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

from leaf_server_common.server.request_logger import RequestLogger

from neuro_san.service.generic.agent_service import AgentService
from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider
from neuro_san.service.generic.agent_service import AgentService
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.utils.server_context import ServerContext


# pylint: disable=too-many-instance-attributes
class AgentServiceProvider:
"""
Class providing lazy construction of AgentService instance
Expand All @@ -34,7 +36,8 @@ def __init__(self,
security_cfg: Dict[str, Any],
agent_name: str,
agent_network_provider: AgentNetworkProvider,
server_logging: AgentServerLogging):
server_logging: AgentServerLogging,
server_context: ServerContext):
"""
Constructor.
:param request_logger: The instance of the RequestLogger that helps
Expand All @@ -48,13 +51,15 @@ def __init__(self,
:param server_logging: An AgentServerLogging instance initialized so that
spawned asynchronous threads can also properly initialize
their logging.
:param server_context: The ServerContext object with global-ish state
"""
self.request_logger = request_logger
self.security_cfg = copy.deepcopy(security_cfg)
self.server_logging: AgentServerLogging = server_logging
self.agent_network_provider: AgentNetworkProvider = agent_network_provider
self.agent_name: str = agent_name
self.lock: Lock = Lock()
self.server_context: ServerContext = server_context
self.service_instance: AgentService = None

def get_service(self) -> AgentService:
Expand All @@ -70,7 +75,8 @@ def get_service(self) -> AgentService:
self.security_cfg,
self.agent_name,
self.agent_network_provider,
self.server_logging)
self.server_logging,
self.server_context)
return self.service_instance

def service_created(self) -> bool:
Expand Down
10 changes: 6 additions & 4 deletions neuro_san/service/generic/async_agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory
from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory
from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory
from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.generic.chat_message_converter import ChatMessageConverter
from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger
from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory
from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger
from neuro_san.service.utils.server_context import ServerContext
from neuro_san.session.async_direct_agent_session import AsyncDirectAgentSession
from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory
from neuro_san.session.session_invocation_context import SessionInvocationContext
Expand All @@ -57,7 +57,8 @@ def __init__(self,
security_cfg: Dict[str, Any],
agent_name: str,
agent_network_provider: AgentNetworkProvider,
server_logging: AgentServerLogging):
server_logging: AgentServerLogging,
server_context: ServerContext):
"""
:param request_logger: The instance of the EventLoopLogger that helps
log information from running event loop
Expand All @@ -70,6 +71,7 @@ def __init__(self,
:param server_logging: An AgentServerLogging instance initialized so that
spawned asynchronous threads can also properly initialize
their logging.
:param server_context: The ServerContext holding global-ish state
"""
self.request_logger = request_logger
self.security_cfg = security_cfg
Expand All @@ -83,7 +85,7 @@ def __init__(self,
config: Dict[str, Any] = agent_network.get_config()
self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config)
self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config)
self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool()
self.async_executor_pool: AsyncioExecutorPool = server_context.get_executor_pool()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change variable to singular to match the existing class name. This led to typos.

Seems like maybe we should consolidate the sync/asynchronous versions into an abstract class at some point.

# Load once.
self.llm_factory.load()
self.toolbox_factory.load()
Expand Down Expand Up @@ -213,7 +215,7 @@ async def streaming_chat(self, request_dict: Dict[str, Any],
factory = ExternalAgentSessionFactory(use_direct=False)
invocation_context = SessionInvocationContext(
factory,
self.async_executors_pool,
self.async_executor_pool,
self.llm_factory,
self.toolbox_factory,
metadata)
Expand Down
12 changes: 9 additions & 3 deletions neuro_san/service/generic/async_agent_service_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
from threading import Lock
import copy

from neuro_san.service.generic.async_agent_service import AsyncAgentService
from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider
from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger
from neuro_san.service.generic.async_agent_service import AsyncAgentService
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.utils.server_context import ServerContext


# pylint: disable=too-many-instance-attributes
class AsyncAgentServiceProvider:
"""
Class providing lazy construction of AsyncAgentService instance
Expand All @@ -33,7 +35,8 @@ def __init__(self,
security_cfg: Dict[str, Any],
agent_name: str,
agent_network_provider: AgentNetworkProvider,
server_logging: AgentServerLogging):
server_logging: AgentServerLogging,
server_context: ServerContext):
"""
Constructor.
:param request_logger: The instance of the EventLoopLogger that helps
Expand All @@ -47,13 +50,15 @@ def __init__(self,
:param server_logging: An AgentServerLogging instance initialized so that
spawned asynchronous threads can also properly initialize
their logging.
:param server_context: The ServerContext object containing global-ish state
"""
self.request_logger = request_logger
self.security_cfg = copy.deepcopy(security_cfg)
self.server_logging: AgentServerLogging = server_logging
self.agent_network_provider: AgentNetworkProvider = agent_network_provider
self.agent_name: str = agent_name
self.lock: Lock = Lock()
self.server_context: ServerContext = server_context
self.service_instance: AsyncAgentService = None

def get_service(self) -> AsyncAgentService:
Expand All @@ -69,7 +74,8 @@ def get_service(self) -> AsyncAgentService:
self.security_cfg,
self.agent_name,
self.agent_network_provider,
self.server_logging)
self.server_logging,
self.server_context)
return self.service_instance

def service_created(self) -> bool:
Expand Down
28 changes: 14 additions & 14 deletions neuro_san/service/grpc/grpc_agent_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
from neuro_san.internals.interfaces.agent_state_listener import AgentStateListener
from neuro_san.internals.interfaces.agent_storage_source import AgentStorageSource
from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.service.main_loop.server_status import ServerStatus
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.grpc.agent_servicer_to_server import AgentServicerToServer
from neuro_san.service.grpc.concierge_service import ConciergeService
from neuro_san.service.grpc.dynamic_agent_router import DynamicAgentRouter
from neuro_san.service.grpc.grpc_agent_service import GrpcAgentService
from neuro_san.service.interfaces.agent_server import AgentServer
from neuro_san.service.utils.server_context import ServerContext
from neuro_san.service.utils.server_status import ServerStatus
from neuro_san.session.agent_service_stub import AgentServiceStub

DEFAULT_SERVER_NAME: str = 'neuro-san.Agent'
Expand All @@ -52,8 +53,7 @@ class GrpcAgentServer(AgentServer, AgentStateListener):
# pylint: disable=too-many-arguments,too-many-positional-arguments
def __init__(self, port: int,
server_loop_callbacks: ServerLoopCallbacks,
network_storage_dict: Dict[str, AgentNetworkStorage],
server_status: ServerStatus,
server_context: ServerContext,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only plumb the one object that contains both network_storage_dict and server_status.

server_name: str = DEFAULT_SERVER_NAME,
server_name_for_logs: str = DEFAULT_SERVER_NAME_FOR_LOGS,
max_concurrent_requests: int = DEFAULT_MAX_CONCURRENT_REQUESTS,
Expand All @@ -65,10 +65,7 @@ def __init__(self, port: int,
:param port: The integer port number for the service to listen on
:param server_loop_callbacks: The ServerLoopCallbacks instance for
break out methods in main serving loop.
:param network_storage_dict: A dictionary of string (descripting scope) to
AgentNetworkStorage instance which keeps all the AgentNetwork instances
of a particular grouping.
:param server_status: server status to register the state of gRPC server
:param server_context: ServerContext carrying global-ish state
:param server_name: The name of the service
:param server_name_for_logs: The name of the service for log files
:param max_concurrent_requests: The maximum number of requests to handle at a time.
Expand All @@ -86,13 +83,12 @@ def __init__(self, port: int,

self.logger = logging.getLogger(__name__)

self.network_storage_dict: Dict[str, AgentNetworkStorage] = network_storage_dict
# Below is now odd.
self.server_name: str = server_name
self.server_name_for_logs: str = server_name_for_logs
self.max_concurrent_requests: int = max_concurrent_requests
self.request_limit: int = request_limit
self.server_status: ServerStatus = server_status
self.server_context: ServerContext = server_context

self.server_lifetime = None
self.security_cfg = None
Expand All @@ -114,7 +110,8 @@ def agent_added(self, agent_name: str, source: AgentStorageSource):
service = GrpcAgentService(self.server_lifetime, self.security_cfg,
agent_name,
source.get_agent_network_provider(agent_name),
self.server_logging)
self.server_logging,
self.server_context)
self.services.append(service)
servicer_to_server = AgentServicerToServer(service)
agent_rpc_handlers = servicer_to_server.build_rpc_handlers()
Expand Down Expand Up @@ -164,8 +161,9 @@ def prepare_for_serving(self):

# Add listener to handle adding per-agent gRPC services
# to our dynamic router:
public_storage: AgentNetworkStorage = self.network_storage_dict.get("public")
for network_storage in self.network_storage_dict.values():
network_storage_dict: Dict[str, AgentNetworkStorage] = self.server_context.get_network_storage_dict()
public_storage: AgentNetworkStorage = network_storage_dict.get("public")
for network_storage in network_storage_dict.values():
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the objects on the ServerContext now.

network_storage.add_listener(self)

# Add DynamicAgentRouter instance as a generic RPC handler for our server:
Expand All @@ -184,13 +182,15 @@ def serve(self):
"""
Start serving gRPC requests
"""
self.server_status.grpc_service.set_status(True)
server_status: ServerStatus = self.server_context.get_server_status()
server_status.grpc_service.set_status(True)
self.server_lifetime.run()

def stop(self):
"""
Stop the server.
"""
self.server_status.set_grpc_status(False)
server_status: ServerStatus = self.server_context.get_server_status()
server_status.grpc_service.set_status(False)
# pylint: disable=protected-access
self.server_lifetime._stop_serving()
8 changes: 6 additions & 2 deletions neuro_san/service/grpc/grpc_agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.generic.agent_service_provider import AgentServiceProvider
from neuro_san.service.generic.agent_service import AgentService
from neuro_san.service.utils.server_context import ServerContext


class GrpcAgentService(agent_pb2_grpc.AgentServiceServicer):
Expand All @@ -42,7 +43,8 @@ def __init__(self,
security_cfg: Dict[str, Any],
agent_name: str,
agent_network_provider: AgentNetworkProvider,
server_logging: AgentServerLogging):
server_logging: AgentServerLogging,
server_context: ServerContext):
"""
Set the gRPC interface up for health checking so that the service
will be opened to callers when the mesh sees it operational, if this
Expand All @@ -59,6 +61,7 @@ def __init__(self,
:param server_logging: An AgentServerLogging instance initialized so that
spawned asyncrhonous threads can also properly initialize
their logging.
:param server_context: The ServerContext object containing global-ish state
"""
self.forwarder: GrpcMetadataForwarder = server_logging.get_forwarder()
self.service_provider: AgentServiceProvider =\
Expand All @@ -67,7 +70,8 @@ def __init__(self,
security_cfg,
agent_name,
agent_network_provider,
server_logging)
server_logging,
server_context)

def get_request_count(self) -> int:
"""
Expand Down
2 changes: 1 addition & 1 deletion neuro_san/service/http/handlers/health_check_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from importlib.metadata import PackageNotFoundError

from tornado.web import RequestHandler
from neuro_san.service.main_loop.server_status import ServerStatus
from neuro_san.service.http.logging.http_logger import HttpLogger
from neuro_san.service.utils.server_status import ServerStatus


class HealthCheckHandler(RequestHandler):
Expand Down
Loading