-
Notifications
You must be signed in to change notification settings - Fork 28
UN-3352 ServerContext refactor #336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a42d4d8
bec90ef
193b249
28f8a98
e7a0c7f
5894599
8effc6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,12 +28,12 @@ | |
| from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory | ||
| from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory | ||
| from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory | ||
| from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider | ||
| from neuro_san.service.generic.agent_server_logging import AgentServerLogging | ||
| from neuro_san.service.generic.chat_message_converter import ChatMessageConverter | ||
| from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger | ||
| from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory | ||
| from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger | ||
| from neuro_san.service.utils.server_context import ServerContext | ||
| from neuro_san.session.async_direct_agent_session import AsyncDirectAgentSession | ||
| from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory | ||
| from neuro_san.session.session_invocation_context import SessionInvocationContext | ||
|
|
@@ -57,7 +57,8 @@ def __init__(self, | |
| security_cfg: Dict[str, Any], | ||
| agent_name: str, | ||
| agent_network_provider: AgentNetworkProvider, | ||
| server_logging: AgentServerLogging): | ||
| server_logging: AgentServerLogging, | ||
| server_context: ServerContext): | ||
| """ | ||
| :param request_logger: The instance of the EventLoopLogger that helps | ||
| log information from running event loop | ||
|
|
@@ -70,6 +71,7 @@ def __init__(self, | |
| :param server_logging: An AgentServerLogging instance initialized so that | ||
| spawned asynchronous threads can also properly initialize | ||
| their logging. | ||
| :param server_context: The ServerContext holding global-ish state | ||
| """ | ||
| self.request_logger = request_logger | ||
| self.security_cfg = security_cfg | ||
|
|
@@ -83,7 +85,7 @@ def __init__(self, | |
| config: Dict[str, Any] = agent_network.get_config() | ||
| self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) | ||
| self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) | ||
| self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool() | ||
| self.async_executor_pool: AsyncioExecutorPool = server_context.get_executor_pool() | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change variable to singular to match the existing class name. This led to typos. Seems like maybe we should consolidate the sync/asynchronous versions into an abstract class at some point. |
||
| # Load once. | ||
| self.llm_factory.load() | ||
| self.toolbox_factory.load() | ||
|
|
@@ -213,7 +215,7 @@ async def streaming_chat(self, request_dict: Dict[str, Any], | |
| factory = ExternalAgentSessionFactory(use_direct=False) | ||
| invocation_context = SessionInvocationContext( | ||
| factory, | ||
| self.async_executors_pool, | ||
| self.async_executor_pool, | ||
| self.llm_factory, | ||
| self.toolbox_factory, | ||
| metadata) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,13 +24,14 @@ | |
| from neuro_san.internals.interfaces.agent_state_listener import AgentStateListener | ||
| from neuro_san.internals.interfaces.agent_storage_source import AgentStorageSource | ||
| from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage | ||
| from neuro_san.service.main_loop.server_status import ServerStatus | ||
| from neuro_san.service.generic.agent_server_logging import AgentServerLogging | ||
| from neuro_san.service.grpc.agent_servicer_to_server import AgentServicerToServer | ||
| from neuro_san.service.grpc.concierge_service import ConciergeService | ||
| from neuro_san.service.grpc.dynamic_agent_router import DynamicAgentRouter | ||
| from neuro_san.service.grpc.grpc_agent_service import GrpcAgentService | ||
| from neuro_san.service.interfaces.agent_server import AgentServer | ||
| from neuro_san.service.utils.server_context import ServerContext | ||
| from neuro_san.service.utils.server_status import ServerStatus | ||
| from neuro_san.session.agent_service_stub import AgentServiceStub | ||
|
|
||
| DEFAULT_SERVER_NAME: str = 'neuro-san.Agent' | ||
|
|
@@ -52,8 +53,7 @@ class GrpcAgentServer(AgentServer, AgentStateListener): | |
| # pylint: disable=too-many-arguments,too-many-positional-arguments | ||
| def __init__(self, port: int, | ||
| server_loop_callbacks: ServerLoopCallbacks, | ||
| network_storage_dict: Dict[str, AgentNetworkStorage], | ||
| server_status: ServerStatus, | ||
| server_context: ServerContext, | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only plumb the one object that contains both network_storage_dict and server_status. |
||
| server_name: str = DEFAULT_SERVER_NAME, | ||
| server_name_for_logs: str = DEFAULT_SERVER_NAME_FOR_LOGS, | ||
| max_concurrent_requests: int = DEFAULT_MAX_CONCURRENT_REQUESTS, | ||
|
|
@@ -65,10 +65,7 @@ def __init__(self, port: int, | |
| :param port: The integer port number for the service to listen on | ||
| :param server_loop_callbacks: The ServerLoopCallbacks instance for | ||
| break out methods in main serving loop. | ||
| :param network_storage_dict: A dictionary of string (descripting scope) to | ||
| AgentNetworkStorage instance which keeps all the AgentNetwork instances | ||
| of a particular grouping. | ||
| :param server_status: server status to register the state of gRPC server | ||
| :param server_context: ServerContext carrying global-ish state | ||
| :param server_name: The name of the service | ||
| :param server_name_for_logs: The name of the service for log files | ||
| :param max_concurrent_requests: The maximum number of requests to handle at a time. | ||
|
|
@@ -86,13 +83,12 @@ def __init__(self, port: int, | |
|
|
||
| self.logger = logging.getLogger(__name__) | ||
|
|
||
| self.network_storage_dict: Dict[str, AgentNetworkStorage] = network_storage_dict | ||
| # Below is now odd. | ||
| self.server_name: str = server_name | ||
| self.server_name_for_logs: str = server_name_for_logs | ||
| self.max_concurrent_requests: int = max_concurrent_requests | ||
| self.request_limit: int = request_limit | ||
| self.server_status: ServerStatus = server_status | ||
| self.server_context: ServerContext = server_context | ||
|
|
||
| self.server_lifetime = None | ||
| self.security_cfg = None | ||
|
|
@@ -114,7 +110,8 @@ def agent_added(self, agent_name: str, source: AgentStorageSource): | |
| service = GrpcAgentService(self.server_lifetime, self.security_cfg, | ||
| agent_name, | ||
| source.get_agent_network_provider(agent_name), | ||
| self.server_logging) | ||
| self.server_logging, | ||
| self.server_context) | ||
| self.services.append(service) | ||
| servicer_to_server = AgentServicerToServer(service) | ||
| agent_rpc_handlers = servicer_to_server.build_rpc_handlers() | ||
|
|
@@ -164,8 +161,9 @@ def prepare_for_serving(self): | |
|
|
||
| # Add listener to handle adding per-agent gRPC services | ||
| # to our dynamic router: | ||
| public_storage: AgentNetworkStorage = self.network_storage_dict.get("public") | ||
| for network_storage in self.network_storage_dict.values(): | ||
| network_storage_dict: Dict[str, AgentNetworkStorage] = self.server_context.get_network_storage_dict() | ||
| public_storage: AgentNetworkStorage = network_storage_dict.get("public") | ||
| for network_storage in network_storage_dict.values(): | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use the objects on the ServerContext now. |
||
| network_storage.add_listener(self) | ||
|
|
||
| # Add DynamicAgentRouter instance as a generic RPC handler for our server: | ||
|
|
@@ -184,13 +182,15 @@ def serve(self): | |
| """ | ||
| Start serving gRPC requests | ||
| """ | ||
| self.server_status.grpc_service.set_status(True) | ||
| server_status: ServerStatus = self.server_context.get_server_status() | ||
| server_status.grpc_service.set_status(True) | ||
| self.server_lifetime.run() | ||
|
|
||
| def stop(self): | ||
| """ | ||
| Stop the server. | ||
| """ | ||
| self.server_status.set_grpc_status(False) | ||
| server_status: ServerStatus = self.server_context.get_server_status() | ||
| server_status.grpc_service.set_status(False) | ||
| # pylint: disable=protected-access | ||
| self.server_lifetime._stop_serving() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was about to plumb the AsyncioExecutorPool all over the place but then realized that the pain of this plumbing is one of the things that lead to the singleton-ness of the PoolProvider solution in the first place.
So now we pass around a ServerContext object which contains global-sh server state ultimately owned by ServerMainLoop. This should alleviate the plumbing burden. Much of what you will see below is (final?) plumbing for this.
Future: It would also make sense to package up the other aspects of this contstructor into a ServiceContext object and plumb that for similar reasons. Enough going on in here already.