From a42d4d8b8a8700929d3863b6011e53af3aff85a6 Mon Sep 17 00:00:00 2001 From: Dan Fink Date: Mon, 28 Jul 2025 16:45:39 -0700 Subject: [PATCH 1/4] Move ServerStatus to util package to remove tangle --- neuro_san/service/grpc/grpc_agent_server.py | 2 +- neuro_san/service/http/handlers/health_check_handler.py | 2 +- neuro_san/service/http/server/http_server.py | 2 +- neuro_san/service/main_loop/server_main_loop.py | 2 +- neuro_san/service/util/__init__.py | 0 neuro_san/service/{main_loop => util}/server_status.py | 2 +- neuro_san/service/{main_loop => util}/service_status.py | 0 neuro_san/service/watcher/main_loop/storage_watcher.py | 2 +- 8 files changed, 6 insertions(+), 6 deletions(-) create mode 100644 neuro_san/service/util/__init__.py rename neuro_san/service/{main_loop => util}/server_status.py (95%) rename neuro_san/service/{main_loop => util}/service_status.py (100%) diff --git a/neuro_san/service/grpc/grpc_agent_server.py b/neuro_san/service/grpc/grpc_agent_server.py index 3e4b35a3e..498887bb6 100644 --- a/neuro_san/service/grpc/grpc_agent_server.py +++ b/neuro_san/service/grpc/grpc_agent_server.py @@ -24,13 +24,13 @@ from neuro_san.internals.interfaces.agent_state_listener import AgentStateListener from neuro_san.internals.interfaces.agent_storage_source import AgentStorageSource from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage -from neuro_san.service.main_loop.server_status import ServerStatus from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.grpc.agent_servicer_to_server import AgentServicerToServer from neuro_san.service.grpc.concierge_service import ConciergeService from neuro_san.service.grpc.dynamic_agent_router import DynamicAgentRouter from neuro_san.service.grpc.grpc_agent_service import GrpcAgentService from neuro_san.service.interfaces.agent_server import AgentServer +from neuro_san.service.util.server_status import ServerStatus from neuro_san.session.agent_service_stub import AgentServiceStub DEFAULT_SERVER_NAME: str = 'neuro-san.Agent' diff --git a/neuro_san/service/http/handlers/health_check_handler.py b/neuro_san/service/http/handlers/health_check_handler.py index 30726559c..e65b94830 100644 --- a/neuro_san/service/http/handlers/health_check_handler.py +++ b/neuro_san/service/http/handlers/health_check_handler.py @@ -22,8 +22,8 @@ from importlib.metadata import PackageNotFoundError from tornado.web import RequestHandler -from neuro_san.service.main_loop.server_status import ServerStatus from neuro_san.service.http.logging.http_logger import HttpLogger +from neuro_san.service.util.server_status import ServerStatus class HealthCheckHandler(RequestHandler): diff --git a/neuro_san/service/http/server/http_server.py b/neuro_san/service/http/server/http_server.py index ba88dd0cd..2c1018646 100644 --- a/neuro_san/service/http/server/http_server.py +++ b/neuro_san/service/http/server/http_server.py @@ -27,7 +27,6 @@ from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging -from neuro_san.service.main_loop.server_status import ServerStatus from neuro_san.service.generic.async_agent_service_provider import AsyncAgentServiceProvider from neuro_san.service.http.handlers.health_check_handler import HealthCheckHandler from neuro_san.service.http.handlers.connectivity_handler import ConnectivityHandler @@ -40,6 +39,7 @@ from neuro_san.service.http.server.http_server_app import HttpServerApp from neuro_san.service.interfaces.agent_server import AgentServer from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger +from neuro_san.service.util.server_status import ServerStatus class HttpServer(AgentAuthorizer, AgentStateListener): diff --git a/neuro_san/service/main_loop/server_main_loop.py b/neuro_san/service/main_loop/server_main_loop.py index d7ae189ec..54afe21f0 100644 --- a/neuro_san/service/main_loop/server_main_loop.py +++ b/neuro_san/service/main_loop/server_main_loop.py @@ -36,8 +36,8 @@ from neuro_san.service.grpc.grpc_agent_server import GrpcAgentServer from neuro_san.service.grpc.grpc_agent_service import GrpcAgentService from neuro_san.service.http.server.http_server import HttpServer -from neuro_san.service.main_loop.server_status import ServerStatus from neuro_san.service.watcher.main_loop.storage_watcher import StorageWatcher +from neuro_san.service.util.server_status import ServerStatus # pylint: disable=too-many-instance-attributes diff --git a/neuro_san/service/util/__init__.py b/neuro_san/service/util/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neuro_san/service/main_loop/server_status.py b/neuro_san/service/util/server_status.py similarity index 95% rename from neuro_san/service/main_loop/server_status.py rename to neuro_san/service/util/server_status.py index e68b23b83..3023c02ed 100644 --- a/neuro_san/service/main_loop/server_status.py +++ b/neuro_san/service/util/server_status.py @@ -10,7 +10,7 @@ # # END COPYRIGHT -from neuro_san.service.main_loop.service_status import ServiceStatus +from neuro_san.service.util.service_status import ServiceStatus class ServerStatus: diff --git a/neuro_san/service/main_loop/service_status.py b/neuro_san/service/util/service_status.py similarity index 100% rename from neuro_san/service/main_loop/service_status.py rename to neuro_san/service/util/service_status.py diff --git a/neuro_san/service/watcher/main_loop/storage_watcher.py b/neuro_san/service/watcher/main_loop/storage_watcher.py index 6a4c21fe7..af735a933 100644 --- a/neuro_san/service/watcher/main_loop/storage_watcher.py +++ b/neuro_san/service/watcher/main_loop/storage_watcher.py @@ -19,10 +19,10 @@ from time import time from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage -from neuro_san.service.main_loop.server_status import ServerStatus from neuro_san.service.watcher.interfaces.startable import Startable from neuro_san.service.watcher.interfaces.storage_updater import StorageUpdater from neuro_san.service.watcher.registries.registry_storage_updater import RegistryStorageUpdater +from neuro_san.service.util.server_status import ServerStatus # pylint: disable=too-many-instance-attributes From bec90ef914650fff5c8a4660a1c5a1d2f56ba943 Mon Sep 17 00:00:00 2001 From: Dan Fink Date: Mon, 28 Jul 2025 17:43:50 -0700 Subject: [PATCH 2/4] Plumb a ServiceContext --- .../utils/asyncio_executor_pool_provider.py | 38 ------------ neuro_san/service/generic/agent_service.py | 10 ++-- .../service/generic/agent_service_provider.py | 12 +++- .../service/generic/async_agent_service.py | 10 ++-- .../generic/async_agent_service_provider.py | 12 +++- neuro_san/service/grpc/grpc_agent_server.py | 25 ++++---- neuro_san/service/grpc/grpc_agent_service.py | 8 ++- .../http/handlers/health_check_handler.py | 2 +- neuro_san/service/http/server/http_server.py | 29 ++++----- .../service/main_loop/server_main_loop.py | 46 +++++++------- neuro_san/service/{util => utils}/__init__.py | 0 .../service/{util => utils}/server_status.py | 2 +- neuro_san/service/utils/service_context.py | 60 +++++++++++++++++++ .../service/{util => utils}/service_status.py | 0 .../watcher/main_loop/storage_watcher.py | 21 +++---- 15 files changed, 150 insertions(+), 125 deletions(-) delete mode 100644 neuro_san/internals/utils/asyncio_executor_pool_provider.py rename neuro_san/service/{util => utils}/__init__.py (100%) rename neuro_san/service/{util => utils}/server_status.py (96%) create mode 100644 neuro_san/service/utils/service_context.py rename neuro_san/service/{util => utils}/service_status.py (100%) diff --git a/neuro_san/internals/utils/asyncio_executor_pool_provider.py b/neuro_san/internals/utils/asyncio_executor_pool_provider.py deleted file mode 100644 index cb59a9d9d..000000000 --- a/neuro_san/internals/utils/asyncio_executor_pool_provider.py +++ /dev/null @@ -1,38 +0,0 @@ - -# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. -# All Rights Reserved. -# Issued under the Academic Public License. -# -# You can be released from the terms, and requirements of the Academic Public -# License by purchasing a commercial license. -# Purchase of a commercial license is mandatory for any use of the -# neuro-san SDK Software in commercial settings. -# -# END COPYRIGHT -""" -See class comments -""" -from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool - - -class AsyncioExecutorPoolProvider: - """ - Class providing server-wide instance of AsyncioExecutorPool - for use by specific agent services. - """ - - _executors_pool: AsyncioExecutorPool = None - - @classmethod - def set_executors_pool(cls, reuse_mode: bool = True): - """ - Set AsyncioExecutorPool instance to use. - """ - cls._executors_pool = AsyncioExecutorPool(reuse_mode=reuse_mode) - - @classmethod - def get_executors_pool(cls): - """ - Get AsyncioExecutorPool instance - """ - return cls._executors_pool diff --git a/neuro_san/service/generic/agent_service.py b/neuro_san/service/generic/agent_service.py index 17abfc818..12d54c591 100644 --- a/neuro_san/service/generic/agent_service.py +++ b/neuro_san/service/generic/agent_service.py @@ -30,11 +30,11 @@ from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory -from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.chat_message_converter import ChatMessageConverter from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger +from neuro_san.service.utils.service_context import ServiceContext from neuro_san.session.direct_agent_session import DirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.session_invocation_context import SessionInvocationContext @@ -58,7 +58,8 @@ def __init__(self, security_cfg: Dict[str, Any], agent_name: str, agent_network_provider: AgentNetworkProvider, - server_logging: AgentServerLogging): + server_logging: AgentServerLogging, + service_context: ServiceContext): """ Set the gRPC interface up for health checking so that the service will be opened to callers when the mesh sees it operational, if this @@ -75,6 +76,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asyncrhonous threads can also properly initialize their logging. + :param service_context: The ServiceContext containing global-ish state """ self.request_logger = request_logger self.security_cfg = security_cfg @@ -88,7 +90,7 @@ def __init__(self, config: Dict[str, Any] = agent_network.get_config() self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) - self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool() + self.async_executor_pool: AsyncioExecutorPool = service_context.get_executor_pool() # Load once self.llm_factory.load() self.toolbox_factory.load() @@ -219,7 +221,7 @@ def streaming_chat(self, request_dict: Dict[str, Any], factory = ExternalAgentSessionFactory(use_direct=False) invocation_context = SessionInvocationContext( factory, - self.async_executors_pool, + self.async_executor_pool, self.llm_factory, self.toolbox_factory, metadata) diff --git a/neuro_san/service/generic/agent_service_provider.py b/neuro_san/service/generic/agent_service_provider.py index 7ded28e86..3f843daf1 100644 --- a/neuro_san/service/generic/agent_service_provider.py +++ b/neuro_san/service/generic/agent_service_provider.py @@ -17,11 +17,13 @@ from leaf_server_common.server.request_logger import RequestLogger -from neuro_san.service.generic.agent_service import AgentService from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider +from neuro_san.service.generic.agent_service import AgentService from neuro_san.service.generic.agent_server_logging import AgentServerLogging +from neuro_san.service.utils.service_context import ServiceContext +# pylint: disable=too-many-instance-attributes class AgentServiceProvider: """ Class providing lazy construction of AgentService instance @@ -34,7 +36,8 @@ def __init__(self, security_cfg: Dict[str, Any], agent_name: str, agent_network_provider: AgentNetworkProvider, - server_logging: AgentServerLogging): + server_logging: AgentServerLogging, + service_context: ServiceContext): """ Constructor. :param request_logger: The instance of the RequestLogger that helps @@ -48,6 +51,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asynchronous threads can also properly initialize their logging. + :param service_context: The ServiceContext object with global-ish state """ self.request_logger = request_logger self.security_cfg = copy.deepcopy(security_cfg) @@ -55,6 +59,7 @@ def __init__(self, self.agent_network_provider: AgentNetworkProvider = agent_network_provider self.agent_name: str = agent_name self.lock: Lock = Lock() + self.service_context: ServiceContext = service_context self.service_instance: AgentService = None def get_service(self) -> AgentService: @@ -70,7 +75,8 @@ def get_service(self) -> AgentService: self.security_cfg, self.agent_name, self.agent_network_provider, - self.server_logging) + self.server_logging, + self.service_context) return self.service_instance def service_created(self) -> bool: diff --git a/neuro_san/service/generic/async_agent_service.py b/neuro_san/service/generic/async_agent_service.py index 61df30be2..0d3190108 100644 --- a/neuro_san/service/generic/async_agent_service.py +++ b/neuro_san/service/generic/async_agent_service.py @@ -28,12 +28,12 @@ from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory -from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.chat_message_converter import ChatMessageConverter from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger +from neuro_san.service.utils.service_context import ServiceContext from neuro_san.session.async_direct_agent_session import AsyncDirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.session_invocation_context import SessionInvocationContext @@ -57,7 +57,8 @@ def __init__(self, security_cfg: Dict[str, Any], agent_name: str, agent_network_provider: AgentNetworkProvider, - server_logging: AgentServerLogging): + server_logging: AgentServerLogging, + service_context: ServiceContext): """ :param request_logger: The instance of the EventLoopLogger that helps log information from running event loop @@ -70,6 +71,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asynchronous threads can also properly initialize their logging. + :param service_context: The ServiceContext holding global-ish state """ self.request_logger = request_logger self.security_cfg = security_cfg @@ -83,7 +85,7 @@ def __init__(self, config: Dict[str, Any] = agent_network.get_config() self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) - self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool() + self.async_executor_pool: AsyncioExecutorPool = service_context.get_executor_pool() # Load once. self.llm_factory.load() self.toolbox_factory.load() @@ -213,7 +215,7 @@ async def streaming_chat(self, request_dict: Dict[str, Any], factory = ExternalAgentSessionFactory(use_direct=False) invocation_context = SessionInvocationContext( factory, - self.async_executors_pool, + self.async_executor_pool, self.llm_factory, self.toolbox_factory, metadata) diff --git a/neuro_san/service/generic/async_agent_service_provider.py b/neuro_san/service/generic/async_agent_service_provider.py index 58374d54c..58e6dc380 100644 --- a/neuro_san/service/generic/async_agent_service_provider.py +++ b/neuro_san/service/generic/async_agent_service_provider.py @@ -15,12 +15,14 @@ from threading import Lock import copy -from neuro_san.service.generic.async_agent_service import AsyncAgentService from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger +from neuro_san.service.generic.async_agent_service import AsyncAgentService from neuro_san.service.generic.agent_server_logging import AgentServerLogging +from neuro_san.service.utils.service_context import ServiceContext +# pylint: disable=too-many-instance-attributes class AsyncAgentServiceProvider: """ Class providing lazy construction of AsyncAgentService instance @@ -33,7 +35,8 @@ def __init__(self, security_cfg: Dict[str, Any], agent_name: str, agent_network_provider: AgentNetworkProvider, - server_logging: AgentServerLogging): + server_logging: AgentServerLogging, + service_context: ServiceContext): """ Constructor. :param request_logger: The instance of the EventLoopLogger that helps @@ -47,6 +50,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asynchronous threads can also properly initialize their logging. + :param service_context: The ServiceContext object containing global-ish state """ self.request_logger = request_logger self.security_cfg = copy.deepcopy(security_cfg) @@ -54,6 +58,7 @@ def __init__(self, self.agent_network_provider: AgentNetworkProvider = agent_network_provider self.agent_name: str = agent_name self.lock: Lock = Lock() + self.service_context: ServiceContext = service_context self.service_instance: AsyncAgentService = None def get_service(self) -> AsyncAgentService: @@ -69,7 +74,8 @@ def get_service(self) -> AsyncAgentService: self.security_cfg, self.agent_name, self.agent_network_provider, - self.server_logging) + self.server_logging, + self.service_context) return self.service_instance def service_created(self) -> bool: diff --git a/neuro_san/service/grpc/grpc_agent_server.py b/neuro_san/service/grpc/grpc_agent_server.py index 498887bb6..bb00f36ec 100644 --- a/neuro_san/service/grpc/grpc_agent_server.py +++ b/neuro_san/service/grpc/grpc_agent_server.py @@ -30,7 +30,7 @@ from neuro_san.service.grpc.dynamic_agent_router import DynamicAgentRouter from neuro_san.service.grpc.grpc_agent_service import GrpcAgentService from neuro_san.service.interfaces.agent_server import AgentServer -from neuro_san.service.util.server_status import ServerStatus +from neuro_san.service.utils.service_context import ServiceContext from neuro_san.session.agent_service_stub import AgentServiceStub DEFAULT_SERVER_NAME: str = 'neuro-san.Agent' @@ -52,8 +52,7 @@ class GrpcAgentServer(AgentServer, AgentStateListener): # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__(self, port: int, server_loop_callbacks: ServerLoopCallbacks, - network_storage_dict: Dict[str, AgentNetworkStorage], - server_status: ServerStatus, + service_context: ServiceContext, server_name: str = DEFAULT_SERVER_NAME, server_name_for_logs: str = DEFAULT_SERVER_NAME_FOR_LOGS, max_concurrent_requests: int = DEFAULT_MAX_CONCURRENT_REQUESTS, @@ -65,10 +64,7 @@ def __init__(self, port: int, :param port: The integer port number for the service to listen on :param server_loop_callbacks: The ServerLoopCallbacks instance for break out methods in main serving loop. - :param network_storage_dict: A dictionary of string (descripting scope) to - AgentNetworkStorage instance which keeps all the AgentNetwork instances - of a particular grouping. - :param server_status: server status to register the state of gRPC server + :param service_context: ServiceContext carrying global-ish state :param server_name: The name of the service :param server_name_for_logs: The name of the service for log files :param max_concurrent_requests: The maximum number of requests to handle at a time. @@ -86,13 +82,12 @@ def __init__(self, port: int, self.logger = logging.getLogger(__name__) - self.network_storage_dict: Dict[str, AgentNetworkStorage] = network_storage_dict # Below is now odd. self.server_name: str = server_name self.server_name_for_logs: str = server_name_for_logs self.max_concurrent_requests: int = max_concurrent_requests self.request_limit: int = request_limit - self.server_status: ServerStatus = server_status + self.service_context: ServiceContext = service_context self.server_lifetime = None self.security_cfg = None @@ -114,7 +109,8 @@ def agent_added(self, agent_name: str, source: AgentStorageSource): service = GrpcAgentService(self.server_lifetime, self.security_cfg, agent_name, source.get_agent_network_provider(agent_name), - self.server_logging) + self.server_logging, + self.service_context) self.services.append(service) servicer_to_server = AgentServicerToServer(service) agent_rpc_handlers = servicer_to_server.build_rpc_handlers() @@ -164,8 +160,9 @@ def prepare_for_serving(self): # Add listener to handle adding per-agent gRPC services # to our dynamic router: - public_storage: AgentNetworkStorage = self.network_storage_dict.get("public") - for network_storage in self.network_storage_dict.values(): + network_storage_dict: Dict[str, AgentNetworkStorage] = self.service_context.get_network_storage_dict() + public_storage: AgentNetworkStorage = network_storage_dict.get("public") + for network_storage in network_storage_dict.values(): network_storage.add_listener(self) # Add DynamicAgentRouter instance as a generic RPC handler for our server: @@ -184,13 +181,13 @@ def serve(self): """ Start serving gRPC requests """ - self.server_status.grpc_service.set_status(True) + self.service_context.get_server_status().grpc_service.set_status(True) self.server_lifetime.run() def stop(self): """ Stop the server. """ - self.server_status.set_grpc_status(False) + self.service_context.get_server_status().set_grpc_status(False) # pylint: disable=protected-access self.server_lifetime._stop_serving() diff --git a/neuro_san/service/grpc/grpc_agent_service.py b/neuro_san/service/grpc/grpc_agent_service.py index 9d26ae730..45347cae2 100644 --- a/neuro_san/service/grpc/grpc_agent_service.py +++ b/neuro_san/service/grpc/grpc_agent_service.py @@ -29,6 +29,7 @@ from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.agent_service_provider import AgentServiceProvider from neuro_san.service.generic.agent_service import AgentService +from neuro_san.service.utils.service_context import ServiceContext class GrpcAgentService(agent_pb2_grpc.AgentServiceServicer): @@ -42,7 +43,8 @@ def __init__(self, security_cfg: Dict[str, Any], agent_name: str, agent_network_provider: AgentNetworkProvider, - server_logging: AgentServerLogging): + server_logging: AgentServerLogging, + service_context: ServiceContext): """ Set the gRPC interface up for health checking so that the service will be opened to callers when the mesh sees it operational, if this @@ -59,6 +61,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asyncrhonous threads can also properly initialize their logging. + :param service_context: The ServiceContext object containing global-ish state """ self.forwarder: GrpcMetadataForwarder = server_logging.get_forwarder() self.service_provider: AgentServiceProvider =\ @@ -67,7 +70,8 @@ def __init__(self, security_cfg, agent_name, agent_network_provider, - server_logging) + server_logging, + service_context) def get_request_count(self) -> int: """ diff --git a/neuro_san/service/http/handlers/health_check_handler.py b/neuro_san/service/http/handlers/health_check_handler.py index e65b94830..ff9947c07 100644 --- a/neuro_san/service/http/handlers/health_check_handler.py +++ b/neuro_san/service/http/handlers/health_check_handler.py @@ -23,7 +23,7 @@ from tornado.web import RequestHandler from neuro_san.service.http.logging.http_logger import HttpLogger -from neuro_san.service.util.server_status import ServerStatus +from neuro_san.service.utils.server_status import ServerStatus class HealthCheckHandler(RequestHandler): diff --git a/neuro_san/service/http/server/http_server.py b/neuro_san/service/http/server/http_server.py index 2c1018646..cc7655ee2 100644 --- a/neuro_san/service/http/server/http_server.py +++ b/neuro_san/service/http/server/http_server.py @@ -24,7 +24,6 @@ from neuro_san.internals.interfaces.agent_state_listener import AgentStateListener from neuro_san.internals.interfaces.agent_storage_source import AgentStorageSource -from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.async_agent_service_provider import AsyncAgentServiceProvider @@ -39,7 +38,8 @@ from neuro_san.service.http.server.http_server_app import HttpServerApp from neuro_san.service.interfaces.agent_server import AgentServer from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger -from neuro_san.service.util.server_status import ServerStatus +from neuro_san.service.utils.server_status import ServerStatus +from neuro_san.service.utils.service_context import ServiceContext class HttpServer(AgentAuthorizer, AgentStateListener): @@ -52,30 +52,25 @@ class HttpServer(AgentAuthorizer, AgentStateListener): TIMEOUT_TO_START_SECONDS: int = 10 def __init__(self, - server_status: ServerStatus, + service_context: ServiceContext, http_port: int, openapi_service_spec_path: str, requests_limit: int, - network_storage_dict: Dict[str, AgentNetworkStorage], forwarded_request_metadata: str = AgentServer.DEFAULT_FORWARDED_REQUEST_METADATA): """ Constructor: - :param server_status: server status to register the state of http server + :param service_context: ServiceContext with global-ish state :param http_port: port for http neuro-san service; :param openapi_service_spec_path: path to a file with OpenAPI service specification; :param request_limit: The number of requests to service before shutting down. This is useful to be sure production environments can handle a service occasionally going down. - :param network_storage_dict: A dictionary of string (descripting scope) to - AgentNetworkStorage instance which keeps all the AgentNetwork instances - of a particular grouping. :param forwarded_request_metadata: A space-delimited list of http metadata request keys to forward to logs/other requests """ self.server_name_for_logs: str = "Http Server" self.http_port = http_port - self.network_storage_dict: Dict[str, AgentNetworkStorage] = network_storage_dict - self.server_status: ServerStatus = server_status + self.service_context: ServiceContext = service_context # Randomize requests limit for this server instance. # Lower and upper bounds for number of requests before shutting down @@ -94,7 +89,7 @@ def __init__(self, self.lock = threading.Lock() # Add listener to handle adding per-agent http service # (services map is defined by self.allowed_agents dictionary) - for network_storage in self.network_storage_dict.values(): + for network_storage in self.service_context.get_network_storage_dict().values(): network_storage.add_listener(self) def __call__(self, other_server: AgentServer): @@ -106,7 +101,8 @@ def __call__(self, other_server: AgentServer): self.logger.debug({}, "Serving agents: %s", repr(self.allowed_agents.keys())) app.listen(self.http_port) - self.server_status.http_service.set_status(True) + server_status: ServerStatus = self.service_context.get_server_status() + server_status.http_service.set_status(True) self.logger.info({}, "HTTP server is running on port %d", self.http_port) self.logger.info({}, "HTTP server is shutting down after %d requests", self.requests_limit) @@ -122,12 +118,12 @@ def make_app(self, requests_limit: int, logger: EventLoopLogger): request_initialize_data: Dict[str, Any] = self.build_request_data() live_request_initialize_data: Dict[str, Any] = { "forwarded_request_metadata": self.forwarded_request_metadata, - "server_status": self.server_status, + "server_status": self.service_context.get_server_status(), "op": "live" } ready_request_initialize_data: Dict[str, Any] = { "forwarded_request_metadata": self.forwarded_request_metadata, - "server_status": self.server_status, + "server_status": self.service_context.get_server_status(), "op": "ready" } handlers = [] @@ -167,7 +163,8 @@ def agent_added(self, agent_name: str, source: AgentStorageSource): None, agent_name, agent_network_provider, - agent_server_logging) + agent_server_logging, + self.service_context) self.allowed_agents[agent_name] = agent_service_provider self.logger.info({}, "Added agent %s to allowed http service list", agent_name) @@ -199,5 +196,5 @@ def build_request_data(self) -> Dict[str, Any]: "agent_policy": self, "forwarded_request_metadata": self.forwarded_request_metadata, "openapi_service_spec_path": self.openapi_service_spec_path, - "network_storage_dict": self.network_storage_dict + "network_storage_dict": self.service_context.get_network_storage_dict() } diff --git a/neuro_san/service/main_loop/server_main_loop.py b/neuro_san/service/main_loop/server_main_loop.py index 54afe21f0..d7fddcdd9 100644 --- a/neuro_san/service/main_loop/server_main_loop.py +++ b/neuro_san/service/main_loop/server_main_loop.py @@ -28,7 +28,6 @@ from neuro_san.internals.graph.registry.agent_network import AgentNetwork from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.internals.utils.file_of_class import FileOfClass -from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME_FOR_LOGS from neuro_san.service.grpc.grpc_agent_server import DEFAULT_MAX_CONCURRENT_REQUESTS @@ -37,7 +36,8 @@ from neuro_san.service.grpc.grpc_agent_service import GrpcAgentService from neuro_san.service.http.server.http_server import HttpServer from neuro_san.service.watcher.main_loop.storage_watcher import StorageWatcher -from neuro_san.service.util.server_status import ServerStatus +from neuro_san.service.utils.server_status import ServerStatus +from neuro_san.service.utils.service_context import ServiceContext # pylint: disable=too-many-instance-attributes @@ -66,11 +66,7 @@ def __init__(self): self.grpc_server: GrpcAgentServer = None self.http_server: HttpServer = None self.manifest_files: List[str] = [] - # Dictionary is string key (describing scope) to AgentNetworkStorage grouping. - self.network_storage_dict: Dict[str, AgentNetworkStorage] = { - "public": AgentNetworkStorage() - } - self.server_status: ServerStatus = None + self.service_context = ServiceContext() def parse_args(self): """ @@ -122,14 +118,15 @@ def parse_args(self): # See destination below ~ line 139, 154 for explanation. args = arg_parser.parse_args() self.server_name = args.server_name - self.server_status = ServerStatus(self.server_name) + server_status = ServerStatus(self.server_name) + self.service_context.set_server_status(server_status) self.grpc_port = args.port if self.grpc_port == 0: - self.server_status.grpc_service.set_requested(False) + server_status.grpc_service.set_requested(False) self.http_port = args.http_port if self.http_port == 0: - self.server_status.http_service.set_requested(False) + server_status.http_service.set_requested(False) self.server_name_for_logs = args.server_name_for_logs self.max_concurrent_requests = args.max_concurrent_requests @@ -144,7 +141,7 @@ def parse_args(self): self.manifest_update_period_seconds = args.manifest_update_period_seconds if self.manifest_update_period_seconds <= 0: # StorageWatcher is disabled: - self.server_status.updater.set_requested(False) + server_status.updater.set_requested(False) manifest_restorer = RegistryManifestRestorer() manifest_agent_networks: Dict[str, AgentNetwork] = manifest_restorer.restore() @@ -178,14 +175,13 @@ def main_loop(self): metadata_set = metadata_set | set(self.usage_logger_metadata.split()) metadata_str: str = " ".join(sorted(metadata_set)) - AsyncioExecutorPoolProvider.set_executors_pool(reuse_mode=True) + server_status: ServerStatus = self.service_context.get_server_status() - if self.server_status.grpc_service.is_requested(): + if server_status.grpc_service.is_requested(): self.grpc_server = GrpcAgentServer( self.grpc_port, server_loop_callbacks=self, - network_storage_dict=self.network_storage_dict, - server_status=self.server_status, + service_context=self.service_context, server_name=self.server_name, server_name_for_logs=self.server_name_for_logs, max_concurrent_requests=self.max_concurrent_requests, @@ -193,43 +189,41 @@ def main_loop(self): forwarded_request_metadata=metadata_str) self.grpc_server.prepare_for_serving() - if self.server_status.updater.is_requested(): - if not self.server_status.grpc_service.is_requested(): + if server_status.updater.is_requested(): + if not server_status.grpc_service.is_requested(): current_dir: str = os.path.dirname(os.path.abspath(__file__)) - setup_logging(self.server_status.updater.get_service_name(), + setup_logging(server_status.updater.get_service_name(), current_dir, 'AGENT_SERVICE_LOG_JSON', 'AGENT_SERVICE_LOG_LEVEL') manifest_file: str = self.manifest_files[0] watcher = StorageWatcher( - self.network_storage_dict, manifest_file, self.manifest_update_period_seconds, - self.server_status) + self.service_context) watcher.start() - if self.server_status.http_service.is_requested(): + if server_status.http_service.is_requested(): # Create HTTP server; self.http_server = HttpServer( - self.server_status, + self.service_context, self.http_port, self.service_openapi_spec_file, self.request_limit, - self.network_storage_dict, forwarded_request_metadata=metadata_str) # Now - our servers (gRPC and http) are created and listen to updates of network_storage # Perform the initial setup - public_storage: AgentNetworkStorage = self.network_storage_dict.get("public") + public_storage: AgentNetworkStorage = self.service_context.get_network_storage_dict().get("public") public_storage.setup_agent_networks(self.agent_networks) # Start all services: http_server_thread = None - if self.server_status.http_service.is_requested(): + if server_status.http_service.is_requested(): http_server_thread = threading.Thread(target=self.http_server, args=(self.grpc_server,), daemon=True) http_server_thread.start() - if self.server_status.grpc_service.is_requested(): + if server_status.grpc_service.is_requested(): self.grpc_server.serve() if http_server_thread is not None: diff --git a/neuro_san/service/util/__init__.py b/neuro_san/service/utils/__init__.py similarity index 100% rename from neuro_san/service/util/__init__.py rename to neuro_san/service/utils/__init__.py diff --git a/neuro_san/service/util/server_status.py b/neuro_san/service/utils/server_status.py similarity index 96% rename from neuro_san/service/util/server_status.py rename to neuro_san/service/utils/server_status.py index 3023c02ed..504582997 100644 --- a/neuro_san/service/util/server_status.py +++ b/neuro_san/service/utils/server_status.py @@ -10,7 +10,7 @@ # # END COPYRIGHT -from neuro_san.service.util.service_status import ServiceStatus +from neuro_san.service.utils.service_status import ServiceStatus class ServerStatus: diff --git a/neuro_san/service/utils/service_context.py b/neuro_san/service/utils/service_context.py new file mode 100644 index 000000000..7d69edff9 --- /dev/null +++ b/neuro_san/service/utils/service_context.py @@ -0,0 +1,60 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. +# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. +# +# END COPYRIGHT + +from typing import Dict + +from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool + +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage +from neuro_san.service.utils.server_status import ServerStatus + + +class ServiceContext: + """ + Class that contains global-ish state for each instance of a service. + """ + + def __init__(self): + """ + Constructor. + """ + self.server_status: ServerStatus = None + self.executor_pool = AsyncioExecutorPool(reuse_mode=True) + + # Dictionary is string key (describing scope) to AgentNetworkStorage grouping. + self.network_storage_dict: Dict[str, AgentNetworkStorage] = { + "public": AgentNetworkStorage() + } + + def get_executor_pool(self) -> AsyncioExecutorPool: + """ + :return: The AsyncioExecutorPool + """ + return self.executor_pool + + def set_server_status(self, server_status: ServerStatus): + """ + Sets the server status + """ + self.server_status = server_status + + def get_server_status(self) -> ServerStatus: + """ + :return: The ServerStatus + """ + return self.server_status + + def get_network_storage_dict(self) -> Dict[str, AgentNetworkStorage]: + """ + :return: The Network Storage dictionary + """ + return self.network_storage_dict diff --git a/neuro_san/service/util/service_status.py b/neuro_san/service/utils/service_status.py similarity index 100% rename from neuro_san/service/util/service_status.py rename to neuro_san/service/utils/service_status.py diff --git a/neuro_san/service/watcher/main_loop/storage_watcher.py b/neuro_san/service/watcher/main_loop/storage_watcher.py index af735a933..be30ecc60 100644 --- a/neuro_san/service/watcher/main_loop/storage_watcher.py +++ b/neuro_san/service/watcher/main_loop/storage_watcher.py @@ -8,7 +8,6 @@ # neuro-san SDK Software in commercial settings. # # END COPYRIGHT -from typing import Dict from typing import List from logging import getLogger @@ -18,11 +17,10 @@ from time import sleep from time import time -from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.service.watcher.interfaces.startable import Startable from neuro_san.service.watcher.interfaces.storage_updater import StorageUpdater from neuro_san.service.watcher.registries.registry_storage_updater import RegistryStorageUpdater -from neuro_san.service.util.server_status import ServerStatus +from neuro_san.service.utils.service_context import ServiceContext # pylint: disable=too-many-instance-attributes @@ -33,32 +31,29 @@ class StorageWatcher(Startable): and other changes to AgentNetworkStorage instances. """ def __init__(self, - network_storage_dict: Dict[str, AgentNetworkStorage], manifest_path: str, manifest_update_period_in_seconds: int, - server_status: ServerStatus): + service_context: ServiceContext): """ Constructor. - :param network_storage_dict: A dictionary of string (descripting scope) to - AgentNetworkStorage instance which keeps all the AgentNetwork instances - of a particular grouping. :param manifest_path: file path to server manifest file :param manifest_update_period_in_seconds: update period in seconds - :param server_status: server status to register the state of updater + :param service_context: ServiceContext for global-ish state """ - self.network_storage_dict: Dict[str, AgentNetworkStorage] = network_storage_dict self.logger: Logger = getLogger(self.__class__.__name__) self.updater_thread = Thread(target=self._run, daemon=True) + self.service_context: ServiceContext = service_context self.storage_updaters: List[StorageUpdater] = [ - RegistryStorageUpdater(network_storage_dict, manifest_update_period_in_seconds, manifest_path), + RegistryStorageUpdater(self.service_context.get_network_storage_dict(), + manifest_update_period_in_seconds, + manifest_path), # We will eventually have more here ] self.update_period_in_seconds: int = self.compute_update_period_in_seconds(self.storage_updaters) - self.server_status: ServerStatus = server_status self.keep_running: bool = True @staticmethod @@ -106,7 +101,7 @@ def _run(self): sleep_for_seconds: float = self.update_period_in_seconds while self.keep_running: - self.server_status.updater.set_status(True) + self.service_context.get_server_status().updater.set_status(True) sleep(sleep_for_seconds) From 193b249bce83eec49d65b0466854e49ce3f985ab Mon Sep 17 00:00:00 2001 From: Dan Fink Date: Mon, 28 Jul 2025 17:51:56 -0700 Subject: [PATCH 3/4] Rename to ServerContext --- neuro_san/service/generic/agent_service.py | 8 ++++---- .../service/generic/agent_service_provider.py | 10 +++++----- .../service/generic/async_agent_service.py | 8 ++++---- .../generic/async_agent_service_provider.py | 10 +++++----- neuro_san/service/grpc/grpc_agent_server.py | 16 +++++++-------- neuro_san/service/grpc/grpc_agent_service.py | 8 ++++---- neuro_san/service/http/server/http_server.py | 20 +++++++++---------- .../service/main_loop/server_main_loop.py | 16 +++++++-------- .../{service_context.py => server_context.py} | 4 ++-- .../watcher/main_loop/storage_watcher.py | 12 +++++------ 10 files changed, 56 insertions(+), 56 deletions(-) rename neuro_san/service/utils/{service_context.py => server_context.py} (98%) diff --git a/neuro_san/service/generic/agent_service.py b/neuro_san/service/generic/agent_service.py index 12d54c591..b96e88aca 100644 --- a/neuro_san/service/generic/agent_service.py +++ b/neuro_san/service/generic/agent_service.py @@ -34,7 +34,7 @@ from neuro_san.service.generic.chat_message_converter import ChatMessageConverter from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger -from neuro_san.service.utils.service_context import ServiceContext +from neuro_san.service.utils.server_context import ServerContext from neuro_san.session.direct_agent_session import DirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.session_invocation_context import SessionInvocationContext @@ -59,7 +59,7 @@ def __init__(self, agent_name: str, agent_network_provider: AgentNetworkProvider, server_logging: AgentServerLogging, - service_context: ServiceContext): + server_context: ServerContext): """ Set the gRPC interface up for health checking so that the service will be opened to callers when the mesh sees it operational, if this @@ -76,7 +76,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asyncrhonous threads can also properly initialize their logging. - :param service_context: The ServiceContext containing global-ish state + :param server_context: The ServerContext containing global-ish state """ self.request_logger = request_logger self.security_cfg = security_cfg @@ -90,7 +90,7 @@ def __init__(self, config: Dict[str, Any] = agent_network.get_config() self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) - self.async_executor_pool: AsyncioExecutorPool = service_context.get_executor_pool() + self.async_executor_pool: AsyncioExecutorPool = server_context.get_executor_pool() # Load once self.llm_factory.load() self.toolbox_factory.load() diff --git a/neuro_san/service/generic/agent_service_provider.py b/neuro_san/service/generic/agent_service_provider.py index 3f843daf1..92a266a76 100644 --- a/neuro_san/service/generic/agent_service_provider.py +++ b/neuro_san/service/generic/agent_service_provider.py @@ -20,7 +20,7 @@ from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider from neuro_san.service.generic.agent_service import AgentService from neuro_san.service.generic.agent_server_logging import AgentServerLogging -from neuro_san.service.utils.service_context import ServiceContext +from neuro_san.service.utils.server_context import ServerContext # pylint: disable=too-many-instance-attributes @@ -37,7 +37,7 @@ def __init__(self, agent_name: str, agent_network_provider: AgentNetworkProvider, server_logging: AgentServerLogging, - service_context: ServiceContext): + server_context: ServerContext): """ Constructor. :param request_logger: The instance of the RequestLogger that helps @@ -51,7 +51,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asynchronous threads can also properly initialize their logging. - :param service_context: The ServiceContext object with global-ish state + :param server_context: The ServerContext object with global-ish state """ self.request_logger = request_logger self.security_cfg = copy.deepcopy(security_cfg) @@ -59,7 +59,7 @@ def __init__(self, self.agent_network_provider: AgentNetworkProvider = agent_network_provider self.agent_name: str = agent_name self.lock: Lock = Lock() - self.service_context: ServiceContext = service_context + self.server_context: ServerContext = server_context self.service_instance: AgentService = None def get_service(self) -> AgentService: @@ -76,7 +76,7 @@ def get_service(self) -> AgentService: self.agent_name, self.agent_network_provider, self.server_logging, - self.service_context) + self.server_context) return self.service_instance def service_created(self) -> bool: diff --git a/neuro_san/service/generic/async_agent_service.py b/neuro_san/service/generic/async_agent_service.py index 0d3190108..0cd398838 100644 --- a/neuro_san/service/generic/async_agent_service.py +++ b/neuro_san/service/generic/async_agent_service.py @@ -33,7 +33,7 @@ from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger -from neuro_san.service.utils.service_context import ServiceContext +from neuro_san.service.utils.server_context import ServerContext from neuro_san.session.async_direct_agent_session import AsyncDirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.session_invocation_context import SessionInvocationContext @@ -58,7 +58,7 @@ def __init__(self, agent_name: str, agent_network_provider: AgentNetworkProvider, server_logging: AgentServerLogging, - service_context: ServiceContext): + server_context: ServerContext): """ :param request_logger: The instance of the EventLoopLogger that helps log information from running event loop @@ -71,7 +71,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asynchronous threads can also properly initialize their logging. - :param service_context: The ServiceContext holding global-ish state + :param server_context: The ServerContext holding global-ish state """ self.request_logger = request_logger self.security_cfg = security_cfg @@ -85,7 +85,7 @@ def __init__(self, config: Dict[str, Any] = agent_network.get_config() self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) - self.async_executor_pool: AsyncioExecutorPool = service_context.get_executor_pool() + self.async_executor_pool: AsyncioExecutorPool = server_context.get_executor_pool() # Load once. self.llm_factory.load() self.toolbox_factory.load() diff --git a/neuro_san/service/generic/async_agent_service_provider.py b/neuro_san/service/generic/async_agent_service_provider.py index 58e6dc380..97e538ea1 100644 --- a/neuro_san/service/generic/async_agent_service_provider.py +++ b/neuro_san/service/generic/async_agent_service_provider.py @@ -19,7 +19,7 @@ from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger from neuro_san.service.generic.async_agent_service import AsyncAgentService from neuro_san.service.generic.agent_server_logging import AgentServerLogging -from neuro_san.service.utils.service_context import ServiceContext +from neuro_san.service.utils.server_context import ServerContext # pylint: disable=too-many-instance-attributes @@ -36,7 +36,7 @@ def __init__(self, agent_name: str, agent_network_provider: AgentNetworkProvider, server_logging: AgentServerLogging, - service_context: ServiceContext): + server_context: ServerContext): """ Constructor. :param request_logger: The instance of the EventLoopLogger that helps @@ -50,7 +50,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asynchronous threads can also properly initialize their logging. - :param service_context: The ServiceContext object containing global-ish state + :param server_context: The ServerContext object containing global-ish state """ self.request_logger = request_logger self.security_cfg = copy.deepcopy(security_cfg) @@ -58,7 +58,7 @@ def __init__(self, self.agent_network_provider: AgentNetworkProvider = agent_network_provider self.agent_name: str = agent_name self.lock: Lock = Lock() - self.service_context: ServiceContext = service_context + self.server_context: ServerContext = server_context self.service_instance: AsyncAgentService = None def get_service(self) -> AsyncAgentService: @@ -75,7 +75,7 @@ def get_service(self) -> AsyncAgentService: self.agent_name, self.agent_network_provider, self.server_logging, - self.service_context) + self.server_context) return self.service_instance def service_created(self) -> bool: diff --git a/neuro_san/service/grpc/grpc_agent_server.py b/neuro_san/service/grpc/grpc_agent_server.py index bb00f36ec..6c8dd6bdf 100644 --- a/neuro_san/service/grpc/grpc_agent_server.py +++ b/neuro_san/service/grpc/grpc_agent_server.py @@ -30,7 +30,7 @@ from neuro_san.service.grpc.dynamic_agent_router import DynamicAgentRouter from neuro_san.service.grpc.grpc_agent_service import GrpcAgentService from neuro_san.service.interfaces.agent_server import AgentServer -from neuro_san.service.utils.service_context import ServiceContext +from neuro_san.service.utils.server_context import ServerContext from neuro_san.session.agent_service_stub import AgentServiceStub DEFAULT_SERVER_NAME: str = 'neuro-san.Agent' @@ -52,7 +52,7 @@ class GrpcAgentServer(AgentServer, AgentStateListener): # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__(self, port: int, server_loop_callbacks: ServerLoopCallbacks, - service_context: ServiceContext, + server_context: ServerContext, server_name: str = DEFAULT_SERVER_NAME, server_name_for_logs: str = DEFAULT_SERVER_NAME_FOR_LOGS, max_concurrent_requests: int = DEFAULT_MAX_CONCURRENT_REQUESTS, @@ -64,7 +64,7 @@ def __init__(self, port: int, :param port: The integer port number for the service to listen on :param server_loop_callbacks: The ServerLoopCallbacks instance for break out methods in main serving loop. - :param service_context: ServiceContext carrying global-ish state + :param server_context: ServerContext carrying global-ish state :param server_name: The name of the service :param server_name_for_logs: The name of the service for log files :param max_concurrent_requests: The maximum number of requests to handle at a time. @@ -87,7 +87,7 @@ def __init__(self, port: int, self.server_name_for_logs: str = server_name_for_logs self.max_concurrent_requests: int = max_concurrent_requests self.request_limit: int = request_limit - self.service_context: ServiceContext = service_context + self.server_context: ServerContext = server_context self.server_lifetime = None self.security_cfg = None @@ -110,7 +110,7 @@ def agent_added(self, agent_name: str, source: AgentStorageSource): agent_name, source.get_agent_network_provider(agent_name), self.server_logging, - self.service_context) + self.server_context) self.services.append(service) servicer_to_server = AgentServicerToServer(service) agent_rpc_handlers = servicer_to_server.build_rpc_handlers() @@ -160,7 +160,7 @@ def prepare_for_serving(self): # Add listener to handle adding per-agent gRPC services # to our dynamic router: - network_storage_dict: Dict[str, AgentNetworkStorage] = self.service_context.get_network_storage_dict() + network_storage_dict: Dict[str, AgentNetworkStorage] = self.server_context.get_network_storage_dict() public_storage: AgentNetworkStorage = network_storage_dict.get("public") for network_storage in network_storage_dict.values(): network_storage.add_listener(self) @@ -181,13 +181,13 @@ def serve(self): """ Start serving gRPC requests """ - self.service_context.get_server_status().grpc_service.set_status(True) + self.server_context.get_server_status().grpc_service.set_status(True) self.server_lifetime.run() def stop(self): """ Stop the server. """ - self.service_context.get_server_status().set_grpc_status(False) + self.server_context.get_server_status().set_grpc_status(False) # pylint: disable=protected-access self.server_lifetime._stop_serving() diff --git a/neuro_san/service/grpc/grpc_agent_service.py b/neuro_san/service/grpc/grpc_agent_service.py index 45347cae2..0f675c833 100644 --- a/neuro_san/service/grpc/grpc_agent_service.py +++ b/neuro_san/service/grpc/grpc_agent_service.py @@ -29,7 +29,7 @@ from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.agent_service_provider import AgentServiceProvider from neuro_san.service.generic.agent_service import AgentService -from neuro_san.service.utils.service_context import ServiceContext +from neuro_san.service.utils.server_context import ServerContext class GrpcAgentService(agent_pb2_grpc.AgentServiceServicer): @@ -44,7 +44,7 @@ def __init__(self, agent_name: str, agent_network_provider: AgentNetworkProvider, server_logging: AgentServerLogging, - service_context: ServiceContext): + server_context: ServerContext): """ Set the gRPC interface up for health checking so that the service will be opened to callers when the mesh sees it operational, if this @@ -61,7 +61,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asyncrhonous threads can also properly initialize their logging. - :param service_context: The ServiceContext object containing global-ish state + :param server_context: The ServerContext object containing global-ish state """ self.forwarder: GrpcMetadataForwarder = server_logging.get_forwarder() self.service_provider: AgentServiceProvider =\ @@ -71,7 +71,7 @@ def __init__(self, agent_name, agent_network_provider, server_logging, - service_context) + server_context) def get_request_count(self) -> int: """ diff --git a/neuro_san/service/http/server/http_server.py b/neuro_san/service/http/server/http_server.py index cc7655ee2..63964940d 100644 --- a/neuro_san/service/http/server/http_server.py +++ b/neuro_san/service/http/server/http_server.py @@ -39,7 +39,7 @@ from neuro_san.service.interfaces.agent_server import AgentServer from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger from neuro_san.service.utils.server_status import ServerStatus -from neuro_san.service.utils.service_context import ServiceContext +from neuro_san.service.utils.server_context import ServerContext class HttpServer(AgentAuthorizer, AgentStateListener): @@ -52,14 +52,14 @@ class HttpServer(AgentAuthorizer, AgentStateListener): TIMEOUT_TO_START_SECONDS: int = 10 def __init__(self, - service_context: ServiceContext, + server_context: ServerContext, http_port: int, openapi_service_spec_path: str, requests_limit: int, forwarded_request_metadata: str = AgentServer.DEFAULT_FORWARDED_REQUEST_METADATA): """ Constructor: - :param service_context: ServiceContext with global-ish state + :param server_context: ServerContext with global-ish state :param http_port: port for http neuro-san service; :param openapi_service_spec_path: path to a file with OpenAPI service specification; :param request_limit: The number of requests to service before shutting down. @@ -70,7 +70,7 @@ def __init__(self, """ self.server_name_for_logs: str = "Http Server" self.http_port = http_port - self.service_context: ServiceContext = service_context + self.server_context: ServerContext = server_context # Randomize requests limit for this server instance. # Lower and upper bounds for number of requests before shutting down @@ -89,7 +89,7 @@ def __init__(self, self.lock = threading.Lock() # Add listener to handle adding per-agent http service # (services map is defined by self.allowed_agents dictionary) - for network_storage in self.service_context.get_network_storage_dict().values(): + for network_storage in self.server_context.get_network_storage_dict().values(): network_storage.add_listener(self) def __call__(self, other_server: AgentServer): @@ -101,7 +101,7 @@ def __call__(self, other_server: AgentServer): self.logger.debug({}, "Serving agents: %s", repr(self.allowed_agents.keys())) app.listen(self.http_port) - server_status: ServerStatus = self.service_context.get_server_status() + server_status: ServerStatus = self.server_context.get_server_status() server_status.http_service.set_status(True) self.logger.info({}, "HTTP server is running on port %d", self.http_port) self.logger.info({}, "HTTP server is shutting down after %d requests", self.requests_limit) @@ -118,12 +118,12 @@ def make_app(self, requests_limit: int, logger: EventLoopLogger): request_initialize_data: Dict[str, Any] = self.build_request_data() live_request_initialize_data: Dict[str, Any] = { "forwarded_request_metadata": self.forwarded_request_metadata, - "server_status": self.service_context.get_server_status(), + "server_status": self.server_context.get_server_status(), "op": "live" } ready_request_initialize_data: Dict[str, Any] = { "forwarded_request_metadata": self.forwarded_request_metadata, - "server_status": self.service_context.get_server_status(), + "server_status": self.server_context.get_server_status(), "op": "ready" } handlers = [] @@ -164,7 +164,7 @@ def agent_added(self, agent_name: str, source: AgentStorageSource): agent_name, agent_network_provider, agent_server_logging, - self.service_context) + self.server_context) self.allowed_agents[agent_name] = agent_service_provider self.logger.info({}, "Added agent %s to allowed http service list", agent_name) @@ -196,5 +196,5 @@ def build_request_data(self) -> Dict[str, Any]: "agent_policy": self, "forwarded_request_metadata": self.forwarded_request_metadata, "openapi_service_spec_path": self.openapi_service_spec_path, - "network_storage_dict": self.service_context.get_network_storage_dict() + "network_storage_dict": self.server_context.get_network_storage_dict() } diff --git a/neuro_san/service/main_loop/server_main_loop.py b/neuro_san/service/main_loop/server_main_loop.py index d7fddcdd9..3dce1f0b7 100644 --- a/neuro_san/service/main_loop/server_main_loop.py +++ b/neuro_san/service/main_loop/server_main_loop.py @@ -37,7 +37,7 @@ from neuro_san.service.http.server.http_server import HttpServer from neuro_san.service.watcher.main_loop.storage_watcher import StorageWatcher from neuro_san.service.utils.server_status import ServerStatus -from neuro_san.service.utils.service_context import ServiceContext +from neuro_san.service.utils.server_context import ServerContext # pylint: disable=too-many-instance-attributes @@ -66,7 +66,7 @@ def __init__(self): self.grpc_server: GrpcAgentServer = None self.http_server: HttpServer = None self.manifest_files: List[str] = [] - self.service_context = ServiceContext() + self.server_context = ServerContext() def parse_args(self): """ @@ -119,7 +119,7 @@ def parse_args(self): args = arg_parser.parse_args() self.server_name = args.server_name server_status = ServerStatus(self.server_name) - self.service_context.set_server_status(server_status) + self.server_context.set_server_status(server_status) self.grpc_port = args.port if self.grpc_port == 0: @@ -175,13 +175,13 @@ def main_loop(self): metadata_set = metadata_set | set(self.usage_logger_metadata.split()) metadata_str: str = " ".join(sorted(metadata_set)) - server_status: ServerStatus = self.service_context.get_server_status() + server_status: ServerStatus = self.server_context.get_server_status() if server_status.grpc_service.is_requested(): self.grpc_server = GrpcAgentServer( self.grpc_port, server_loop_callbacks=self, - service_context=self.service_context, + server_context=self.server_context, server_name=self.server_name, server_name_for_logs=self.server_name_for_logs, max_concurrent_requests=self.max_concurrent_requests, @@ -200,13 +200,13 @@ def main_loop(self): watcher = StorageWatcher( manifest_file, self.manifest_update_period_seconds, - self.service_context) + self.server_context) watcher.start() if server_status.http_service.is_requested(): # Create HTTP server; self.http_server = HttpServer( - self.service_context, + self.server_context, self.http_port, self.service_openapi_spec_file, self.request_limit, @@ -214,7 +214,7 @@ def main_loop(self): # Now - our servers (gRPC and http) are created and listen to updates of network_storage # Perform the initial setup - public_storage: AgentNetworkStorage = self.service_context.get_network_storage_dict().get("public") + public_storage: AgentNetworkStorage = self.server_context.get_network_storage_dict().get("public") public_storage.setup_agent_networks(self.agent_networks) # Start all services: diff --git a/neuro_san/service/utils/service_context.py b/neuro_san/service/utils/server_context.py similarity index 98% rename from neuro_san/service/utils/service_context.py rename to neuro_san/service/utils/server_context.py index 7d69edff9..114c24f40 100644 --- a/neuro_san/service/utils/service_context.py +++ b/neuro_san/service/utils/server_context.py @@ -18,9 +18,9 @@ from neuro_san.service.utils.server_status import ServerStatus -class ServiceContext: +class ServerContext: """ - Class that contains global-ish state for each instance of a service. + Class that contains global-ish state for each instance of a server. """ def __init__(self): diff --git a/neuro_san/service/watcher/main_loop/storage_watcher.py b/neuro_san/service/watcher/main_loop/storage_watcher.py index be30ecc60..71cda1b83 100644 --- a/neuro_san/service/watcher/main_loop/storage_watcher.py +++ b/neuro_san/service/watcher/main_loop/storage_watcher.py @@ -20,7 +20,7 @@ from neuro_san.service.watcher.interfaces.startable import Startable from neuro_san.service.watcher.interfaces.storage_updater import StorageUpdater from neuro_san.service.watcher.registries.registry_storage_updater import RegistryStorageUpdater -from neuro_san.service.utils.service_context import ServiceContext +from neuro_san.service.utils.server_context import ServerContext # pylint: disable=too-many-instance-attributes @@ -33,20 +33,20 @@ class StorageWatcher(Startable): def __init__(self, manifest_path: str, manifest_update_period_in_seconds: int, - service_context: ServiceContext): + server_context: ServerContext): """ Constructor. :param manifest_path: file path to server manifest file :param manifest_update_period_in_seconds: update period in seconds - :param service_context: ServiceContext for global-ish state + :param server_context: ServerContext for global-ish state """ self.logger: Logger = getLogger(self.__class__.__name__) self.updater_thread = Thread(target=self._run, daemon=True) - self.service_context: ServiceContext = service_context + self.server_context: ServerContext = server_context self.storage_updaters: List[StorageUpdater] = [ - RegistryStorageUpdater(self.service_context.get_network_storage_dict(), + RegistryStorageUpdater(self.server_context.get_network_storage_dict(), manifest_update_period_in_seconds, manifest_path), # We will eventually have more here @@ -101,7 +101,7 @@ def _run(self): sleep_for_seconds: float = self.update_period_in_seconds while self.keep_running: - self.service_context.get_server_status().updater.set_status(True) + self.server_context.get_server_status().updater.set_status(True) sleep(sleep_for_seconds) From 28f8a9834be426e836fde3fefb354ef3a219ef98 Mon Sep 17 00:00:00 2001 From: Dan Fink Date: Tue, 29 Jul 2025 17:29:57 -0700 Subject: [PATCH 4/4] Expand some lines too long --- neuro_san/service/grpc/grpc_agent_server.py | 7 +++++-- neuro_san/service/http/server/http_server.py | 4 +++- neuro_san/service/main_loop/server_main_loop.py | 3 ++- neuro_san/service/watcher/main_loop/storage_watcher.py | 4 +++- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/neuro_san/service/grpc/grpc_agent_server.py b/neuro_san/service/grpc/grpc_agent_server.py index 6c8dd6bdf..028586ef1 100644 --- a/neuro_san/service/grpc/grpc_agent_server.py +++ b/neuro_san/service/grpc/grpc_agent_server.py @@ -31,6 +31,7 @@ from neuro_san.service.grpc.grpc_agent_service import GrpcAgentService from neuro_san.service.interfaces.agent_server import AgentServer from neuro_san.service.utils.server_context import ServerContext +from neuro_san.service.utils.server_status import ServerStatus from neuro_san.session.agent_service_stub import AgentServiceStub DEFAULT_SERVER_NAME: str = 'neuro-san.Agent' @@ -181,13 +182,15 @@ def serve(self): """ Start serving gRPC requests """ - self.server_context.get_server_status().grpc_service.set_status(True) + server_status: ServerStatus = self.server_context.get_server_status() + server_status.grpc_service.set_status(True) self.server_lifetime.run() def stop(self): """ Stop the server. """ - self.server_context.get_server_status().set_grpc_status(False) + server_status: ServerStatus = self.server_context.get_server_status() + server_status.grpc_service.set_status(False) # pylint: disable=protected-access self.server_lifetime._stop_serving() diff --git a/neuro_san/service/http/server/http_server.py b/neuro_san/service/http/server/http_server.py index 63964940d..53e8dd4f4 100644 --- a/neuro_san/service/http/server/http_server.py +++ b/neuro_san/service/http/server/http_server.py @@ -24,6 +24,7 @@ from neuro_san.internals.interfaces.agent_state_listener import AgentStateListener from neuro_san.internals.interfaces.agent_storage_source import AgentStorageSource +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.async_agent_service_provider import AsyncAgentServiceProvider @@ -89,7 +90,8 @@ def __init__(self, self.lock = threading.Lock() # Add listener to handle adding per-agent http service # (services map is defined by self.allowed_agents dictionary) - for network_storage in self.server_context.get_network_storage_dict().values(): + network_storage_dict: Dict[str, AgentNetworkStorage] = self.server_context.get_network_storage_dict() + for network_storage in network_storage_dict.values(): network_storage.add_listener(self) def __call__(self, other_server: AgentServer): diff --git a/neuro_san/service/main_loop/server_main_loop.py b/neuro_san/service/main_loop/server_main_loop.py index 3dce1f0b7..615f94b22 100644 --- a/neuro_san/service/main_loop/server_main_loop.py +++ b/neuro_san/service/main_loop/server_main_loop.py @@ -214,7 +214,8 @@ def main_loop(self): # Now - our servers (gRPC and http) are created and listen to updates of network_storage # Perform the initial setup - public_storage: AgentNetworkStorage = self.server_context.get_network_storage_dict().get("public") + network_storage_dict: Dict[str, AgentNetworkStorage] = self.server_context.get_network_storage_dict() + public_storage: AgentNetworkStorage = network_storage_dict.get("public") public_storage.setup_agent_networks(self.agent_networks) # Start all services: diff --git a/neuro_san/service/watcher/main_loop/storage_watcher.py b/neuro_san/service/watcher/main_loop/storage_watcher.py index 71cda1b83..bd5e7777a 100644 --- a/neuro_san/service/watcher/main_loop/storage_watcher.py +++ b/neuro_san/service/watcher/main_loop/storage_watcher.py @@ -21,6 +21,7 @@ from neuro_san.service.watcher.interfaces.storage_updater import StorageUpdater from neuro_san.service.watcher.registries.registry_storage_updater import RegistryStorageUpdater from neuro_san.service.utils.server_context import ServerContext +from neuro_san.service.utils.server_status import ServerStatus # pylint: disable=too-many-instance-attributes @@ -101,7 +102,8 @@ def _run(self): sleep_for_seconds: float = self.update_period_in_seconds while self.keep_running: - self.server_context.get_server_status().updater.set_status(True) + server_status: ServerStatus = self.server_context.get_server_status() + server_status.updater.set_status(True) sleep(sleep_for_seconds)