diff --git a/neuro_san/internals/utils/asyncio_executor_pool_provider.py b/neuro_san/internals/utils/asyncio_executor_pool_provider.py deleted file mode 100644 index cb59a9d9d..000000000 --- a/neuro_san/internals/utils/asyncio_executor_pool_provider.py +++ /dev/null @@ -1,38 +0,0 @@ - -# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. -# All Rights Reserved. -# Issued under the Academic Public License. -# -# You can be released from the terms, and requirements of the Academic Public -# License by purchasing a commercial license. -# Purchase of a commercial license is mandatory for any use of the -# neuro-san SDK Software in commercial settings. -# -# END COPYRIGHT -""" -See class comments -""" -from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool - - -class AsyncioExecutorPoolProvider: - """ - Class providing server-wide instance of AsyncioExecutorPool - for use by specific agent services. - """ - - _executors_pool: AsyncioExecutorPool = None - - @classmethod - def set_executors_pool(cls, reuse_mode: bool = True): - """ - Set AsyncioExecutorPool instance to use. - """ - cls._executors_pool = AsyncioExecutorPool(reuse_mode=reuse_mode) - - @classmethod - def get_executors_pool(cls): - """ - Get AsyncioExecutorPool instance - """ - return cls._executors_pool diff --git a/neuro_san/service/generic/agent_service.py b/neuro_san/service/generic/agent_service.py index 17abfc818..b96e88aca 100644 --- a/neuro_san/service/generic/agent_service.py +++ b/neuro_san/service/generic/agent_service.py @@ -30,11 +30,11 @@ from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory -from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.chat_message_converter import ChatMessageConverter from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger +from neuro_san.service.utils.server_context import ServerContext from neuro_san.session.direct_agent_session import DirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.session_invocation_context import SessionInvocationContext @@ -58,7 +58,8 @@ def __init__(self, security_cfg: Dict[str, Any], agent_name: str, agent_network_provider: AgentNetworkProvider, - server_logging: AgentServerLogging): + server_logging: AgentServerLogging, + server_context: ServerContext): """ Set the gRPC interface up for health checking so that the service will be opened to callers when the mesh sees it operational, if this @@ -75,6 +76,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asyncrhonous threads can also properly initialize their logging. + :param server_context: The ServerContext containing global-ish state """ self.request_logger = request_logger self.security_cfg = security_cfg @@ -88,7 +90,7 @@ def __init__(self, config: Dict[str, Any] = agent_network.get_config() self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) - self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool() + self.async_executor_pool: AsyncioExecutorPool = server_context.get_executor_pool() # Load once self.llm_factory.load() self.toolbox_factory.load() @@ -219,7 +221,7 @@ def streaming_chat(self, request_dict: Dict[str, Any], factory = ExternalAgentSessionFactory(use_direct=False) invocation_context = SessionInvocationContext( factory, - self.async_executors_pool, + self.async_executor_pool, self.llm_factory, self.toolbox_factory, metadata) diff --git a/neuro_san/service/generic/agent_service_provider.py b/neuro_san/service/generic/agent_service_provider.py index 7ded28e86..92a266a76 100644 --- a/neuro_san/service/generic/agent_service_provider.py +++ b/neuro_san/service/generic/agent_service_provider.py @@ -17,11 +17,13 @@ from leaf_server_common.server.request_logger import RequestLogger -from neuro_san.service.generic.agent_service import AgentService from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider +from neuro_san.service.generic.agent_service import AgentService from neuro_san.service.generic.agent_server_logging import AgentServerLogging +from neuro_san.service.utils.server_context import ServerContext +# pylint: disable=too-many-instance-attributes class AgentServiceProvider: """ Class providing lazy construction of AgentService instance @@ -34,7 +36,8 @@ def __init__(self, security_cfg: Dict[str, Any], agent_name: str, agent_network_provider: AgentNetworkProvider, - server_logging: AgentServerLogging): + server_logging: AgentServerLogging, + server_context: ServerContext): """ Constructor. :param request_logger: The instance of the RequestLogger that helps @@ -48,6 +51,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asynchronous threads can also properly initialize their logging. + :param server_context: The ServerContext object with global-ish state """ self.request_logger = request_logger self.security_cfg = copy.deepcopy(security_cfg) @@ -55,6 +59,7 @@ def __init__(self, self.agent_network_provider: AgentNetworkProvider = agent_network_provider self.agent_name: str = agent_name self.lock: Lock = Lock() + self.server_context: ServerContext = server_context self.service_instance: AgentService = None def get_service(self) -> AgentService: @@ -70,7 +75,8 @@ def get_service(self) -> AgentService: self.security_cfg, self.agent_name, self.agent_network_provider, - self.server_logging) + self.server_logging, + self.server_context) return self.service_instance def service_created(self) -> bool: diff --git a/neuro_san/service/generic/async_agent_service.py b/neuro_san/service/generic/async_agent_service.py index 61df30be2..0cd398838 100644 --- a/neuro_san/service/generic/async_agent_service.py +++ b/neuro_san/service/generic/async_agent_service.py @@ -28,12 +28,12 @@ from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory from neuro_san.internals.run_context.factory.master_toolbox_factory import MasterToolboxFactory from neuro_san.internals.run_context.factory.master_llm_factory import MasterLlmFactory -from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.chat_message_converter import ChatMessageConverter from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger from neuro_san.service.usage.usage_logger_factory import UsageLoggerFactory from neuro_san.service.usage.wrapped_usage_logger import WrappedUsageLogger +from neuro_san.service.utils.server_context import ServerContext from neuro_san.session.async_direct_agent_session import AsyncDirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.session_invocation_context import SessionInvocationContext @@ -57,7 +57,8 @@ def __init__(self, security_cfg: Dict[str, Any], agent_name: str, agent_network_provider: AgentNetworkProvider, - server_logging: AgentServerLogging): + server_logging: AgentServerLogging, + server_context: ServerContext): """ :param request_logger: The instance of the EventLoopLogger that helps log information from running event loop @@ -70,6 +71,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asynchronous threads can also properly initialize their logging. + :param server_context: The ServerContext holding global-ish state """ self.request_logger = request_logger self.security_cfg = security_cfg @@ -83,7 +85,7 @@ def __init__(self, config: Dict[str, Any] = agent_network.get_config() self.llm_factory: ContextTypeLlmFactory = MasterLlmFactory.create_llm_factory(config) self.toolbox_factory: ContextTypeToolboxFactory = MasterToolboxFactory.create_toolbox_factory(config) - self.async_executors_pool: AsyncioExecutorPool = AsyncioExecutorPoolProvider.get_executors_pool() + self.async_executor_pool: AsyncioExecutorPool = server_context.get_executor_pool() # Load once. self.llm_factory.load() self.toolbox_factory.load() @@ -213,7 +215,7 @@ async def streaming_chat(self, request_dict: Dict[str, Any], factory = ExternalAgentSessionFactory(use_direct=False) invocation_context = SessionInvocationContext( factory, - self.async_executors_pool, + self.async_executor_pool, self.llm_factory, self.toolbox_factory, metadata) diff --git a/neuro_san/service/generic/async_agent_service_provider.py b/neuro_san/service/generic/async_agent_service_provider.py index 58374d54c..97e538ea1 100644 --- a/neuro_san/service/generic/async_agent_service_provider.py +++ b/neuro_san/service/generic/async_agent_service_provider.py @@ -15,12 +15,14 @@ from threading import Lock import copy -from neuro_san.service.generic.async_agent_service import AsyncAgentService from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger +from neuro_san.service.generic.async_agent_service import AsyncAgentService from neuro_san.service.generic.agent_server_logging import AgentServerLogging +from neuro_san.service.utils.server_context import ServerContext +# pylint: disable=too-many-instance-attributes class AsyncAgentServiceProvider: """ Class providing lazy construction of AsyncAgentService instance @@ -33,7 +35,8 @@ def __init__(self, security_cfg: Dict[str, Any], agent_name: str, agent_network_provider: AgentNetworkProvider, - server_logging: AgentServerLogging): + server_logging: AgentServerLogging, + server_context: ServerContext): """ Constructor. :param request_logger: The instance of the EventLoopLogger that helps @@ -47,6 +50,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asynchronous threads can also properly initialize their logging. + :param server_context: The ServerContext object containing global-ish state """ self.request_logger = request_logger self.security_cfg = copy.deepcopy(security_cfg) @@ -54,6 +58,7 @@ def __init__(self, self.agent_network_provider: AgentNetworkProvider = agent_network_provider self.agent_name: str = agent_name self.lock: Lock = Lock() + self.server_context: ServerContext = server_context self.service_instance: AsyncAgentService = None def get_service(self) -> AsyncAgentService: @@ -69,7 +74,8 @@ def get_service(self) -> AsyncAgentService: self.security_cfg, self.agent_name, self.agent_network_provider, - self.server_logging) + self.server_logging, + self.server_context) return self.service_instance def service_created(self) -> bool: diff --git a/neuro_san/service/grpc/grpc_agent_server.py b/neuro_san/service/grpc/grpc_agent_server.py index 3e4b35a3e..028586ef1 100644 --- a/neuro_san/service/grpc/grpc_agent_server.py +++ b/neuro_san/service/grpc/grpc_agent_server.py @@ -24,13 +24,14 @@ from neuro_san.internals.interfaces.agent_state_listener import AgentStateListener from neuro_san.internals.interfaces.agent_storage_source import AgentStorageSource from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage -from neuro_san.service.main_loop.server_status import ServerStatus from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.grpc.agent_servicer_to_server import AgentServicerToServer from neuro_san.service.grpc.concierge_service import ConciergeService from neuro_san.service.grpc.dynamic_agent_router import DynamicAgentRouter from neuro_san.service.grpc.grpc_agent_service import GrpcAgentService from neuro_san.service.interfaces.agent_server import AgentServer +from neuro_san.service.utils.server_context import ServerContext +from neuro_san.service.utils.server_status import ServerStatus from neuro_san.session.agent_service_stub import AgentServiceStub DEFAULT_SERVER_NAME: str = 'neuro-san.Agent' @@ -52,8 +53,7 @@ class GrpcAgentServer(AgentServer, AgentStateListener): # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__(self, port: int, server_loop_callbacks: ServerLoopCallbacks, - network_storage_dict: Dict[str, AgentNetworkStorage], - server_status: ServerStatus, + server_context: ServerContext, server_name: str = DEFAULT_SERVER_NAME, server_name_for_logs: str = DEFAULT_SERVER_NAME_FOR_LOGS, max_concurrent_requests: int = DEFAULT_MAX_CONCURRENT_REQUESTS, @@ -65,10 +65,7 @@ def __init__(self, port: int, :param port: The integer port number for the service to listen on :param server_loop_callbacks: The ServerLoopCallbacks instance for break out methods in main serving loop. - :param network_storage_dict: A dictionary of string (descripting scope) to - AgentNetworkStorage instance which keeps all the AgentNetwork instances - of a particular grouping. - :param server_status: server status to register the state of gRPC server + :param server_context: ServerContext carrying global-ish state :param server_name: The name of the service :param server_name_for_logs: The name of the service for log files :param max_concurrent_requests: The maximum number of requests to handle at a time. @@ -86,13 +83,12 @@ def __init__(self, port: int, self.logger = logging.getLogger(__name__) - self.network_storage_dict: Dict[str, AgentNetworkStorage] = network_storage_dict # Below is now odd. self.server_name: str = server_name self.server_name_for_logs: str = server_name_for_logs self.max_concurrent_requests: int = max_concurrent_requests self.request_limit: int = request_limit - self.server_status: ServerStatus = server_status + self.server_context: ServerContext = server_context self.server_lifetime = None self.security_cfg = None @@ -114,7 +110,8 @@ def agent_added(self, agent_name: str, source: AgentStorageSource): service = GrpcAgentService(self.server_lifetime, self.security_cfg, agent_name, source.get_agent_network_provider(agent_name), - self.server_logging) + self.server_logging, + self.server_context) self.services.append(service) servicer_to_server = AgentServicerToServer(service) agent_rpc_handlers = servicer_to_server.build_rpc_handlers() @@ -164,8 +161,9 @@ def prepare_for_serving(self): # Add listener to handle adding per-agent gRPC services # to our dynamic router: - public_storage: AgentNetworkStorage = self.network_storage_dict.get("public") - for network_storage in self.network_storage_dict.values(): + network_storage_dict: Dict[str, AgentNetworkStorage] = self.server_context.get_network_storage_dict() + public_storage: AgentNetworkStorage = network_storage_dict.get("public") + for network_storage in network_storage_dict.values(): network_storage.add_listener(self) # Add DynamicAgentRouter instance as a generic RPC handler for our server: @@ -184,13 +182,15 @@ def serve(self): """ Start serving gRPC requests """ - self.server_status.grpc_service.set_status(True) + server_status: ServerStatus = self.server_context.get_server_status() + server_status.grpc_service.set_status(True) self.server_lifetime.run() def stop(self): """ Stop the server. """ - self.server_status.set_grpc_status(False) + server_status: ServerStatus = self.server_context.get_server_status() + server_status.grpc_service.set_status(False) # pylint: disable=protected-access self.server_lifetime._stop_serving() diff --git a/neuro_san/service/grpc/grpc_agent_service.py b/neuro_san/service/grpc/grpc_agent_service.py index 9d26ae730..0f675c833 100644 --- a/neuro_san/service/grpc/grpc_agent_service.py +++ b/neuro_san/service/grpc/grpc_agent_service.py @@ -29,6 +29,7 @@ from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.agent_service_provider import AgentServiceProvider from neuro_san.service.generic.agent_service import AgentService +from neuro_san.service.utils.server_context import ServerContext class GrpcAgentService(agent_pb2_grpc.AgentServiceServicer): @@ -42,7 +43,8 @@ def __init__(self, security_cfg: Dict[str, Any], agent_name: str, agent_network_provider: AgentNetworkProvider, - server_logging: AgentServerLogging): + server_logging: AgentServerLogging, + server_context: ServerContext): """ Set the gRPC interface up for health checking so that the service will be opened to callers when the mesh sees it operational, if this @@ -59,6 +61,7 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asyncrhonous threads can also properly initialize their logging. + :param server_context: The ServerContext object containing global-ish state """ self.forwarder: GrpcMetadataForwarder = server_logging.get_forwarder() self.service_provider: AgentServiceProvider =\ @@ -67,7 +70,8 @@ def __init__(self, security_cfg, agent_name, agent_network_provider, - server_logging) + server_logging, + server_context) def get_request_count(self) -> int: """ diff --git a/neuro_san/service/http/handlers/health_check_handler.py b/neuro_san/service/http/handlers/health_check_handler.py index 30726559c..ff9947c07 100644 --- a/neuro_san/service/http/handlers/health_check_handler.py +++ b/neuro_san/service/http/handlers/health_check_handler.py @@ -22,8 +22,8 @@ from importlib.metadata import PackageNotFoundError from tornado.web import RequestHandler -from neuro_san.service.main_loop.server_status import ServerStatus from neuro_san.service.http.logging.http_logger import HttpLogger +from neuro_san.service.utils.server_status import ServerStatus class HealthCheckHandler(RequestHandler): diff --git a/neuro_san/service/http/server/http_server.py b/neuro_san/service/http/server/http_server.py index ba88dd0cd..53e8dd4f4 100644 --- a/neuro_san/service/http/server/http_server.py +++ b/neuro_san/service/http/server/http_server.py @@ -27,7 +27,6 @@ from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging -from neuro_san.service.main_loop.server_status import ServerStatus from neuro_san.service.generic.async_agent_service_provider import AsyncAgentServiceProvider from neuro_san.service.http.handlers.health_check_handler import HealthCheckHandler from neuro_san.service.http.handlers.connectivity_handler import ConnectivityHandler @@ -40,6 +39,8 @@ from neuro_san.service.http.server.http_server_app import HttpServerApp from neuro_san.service.interfaces.agent_server import AgentServer from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger +from neuro_san.service.utils.server_status import ServerStatus +from neuro_san.service.utils.server_context import ServerContext class HttpServer(AgentAuthorizer, AgentStateListener): @@ -52,30 +53,25 @@ class HttpServer(AgentAuthorizer, AgentStateListener): TIMEOUT_TO_START_SECONDS: int = 10 def __init__(self, - server_status: ServerStatus, + server_context: ServerContext, http_port: int, openapi_service_spec_path: str, requests_limit: int, - network_storage_dict: Dict[str, AgentNetworkStorage], forwarded_request_metadata: str = AgentServer.DEFAULT_FORWARDED_REQUEST_METADATA): """ Constructor: - :param server_status: server status to register the state of http server + :param server_context: ServerContext with global-ish state :param http_port: port for http neuro-san service; :param openapi_service_spec_path: path to a file with OpenAPI service specification; :param request_limit: The number of requests to service before shutting down. This is useful to be sure production environments can handle a service occasionally going down. - :param network_storage_dict: A dictionary of string (descripting scope) to - AgentNetworkStorage instance which keeps all the AgentNetwork instances - of a particular grouping. :param forwarded_request_metadata: A space-delimited list of http metadata request keys to forward to logs/other requests """ self.server_name_for_logs: str = "Http Server" self.http_port = http_port - self.network_storage_dict: Dict[str, AgentNetworkStorage] = network_storage_dict - self.server_status: ServerStatus = server_status + self.server_context: ServerContext = server_context # Randomize requests limit for this server instance. # Lower and upper bounds for number of requests before shutting down @@ -94,7 +90,8 @@ def __init__(self, self.lock = threading.Lock() # Add listener to handle adding per-agent http service # (services map is defined by self.allowed_agents dictionary) - for network_storage in self.network_storage_dict.values(): + network_storage_dict: Dict[str, AgentNetworkStorage] = self.server_context.get_network_storage_dict() + for network_storage in network_storage_dict.values(): network_storage.add_listener(self) def __call__(self, other_server: AgentServer): @@ -106,7 +103,8 @@ def __call__(self, other_server: AgentServer): self.logger.debug({}, "Serving agents: %s", repr(self.allowed_agents.keys())) app.listen(self.http_port) - self.server_status.http_service.set_status(True) + server_status: ServerStatus = self.server_context.get_server_status() + server_status.http_service.set_status(True) self.logger.info({}, "HTTP server is running on port %d", self.http_port) self.logger.info({}, "HTTP server is shutting down after %d requests", self.requests_limit) @@ -122,12 +120,12 @@ def make_app(self, requests_limit: int, logger: EventLoopLogger): request_initialize_data: Dict[str, Any] = self.build_request_data() live_request_initialize_data: Dict[str, Any] = { "forwarded_request_metadata": self.forwarded_request_metadata, - "server_status": self.server_status, + "server_status": self.server_context.get_server_status(), "op": "live" } ready_request_initialize_data: Dict[str, Any] = { "forwarded_request_metadata": self.forwarded_request_metadata, - "server_status": self.server_status, + "server_status": self.server_context.get_server_status(), "op": "ready" } handlers = [] @@ -167,7 +165,8 @@ def agent_added(self, agent_name: str, source: AgentStorageSource): None, agent_name, agent_network_provider, - agent_server_logging) + agent_server_logging, + self.server_context) self.allowed_agents[agent_name] = agent_service_provider self.logger.info({}, "Added agent %s to allowed http service list", agent_name) @@ -199,5 +198,5 @@ def build_request_data(self) -> Dict[str, Any]: "agent_policy": self, "forwarded_request_metadata": self.forwarded_request_metadata, "openapi_service_spec_path": self.openapi_service_spec_path, - "network_storage_dict": self.network_storage_dict + "network_storage_dict": self.server_context.get_network_storage_dict() } diff --git a/neuro_san/service/main_loop/server_main_loop.py b/neuro_san/service/main_loop/server_main_loop.py index d7ae189ec..615f94b22 100644 --- a/neuro_san/service/main_loop/server_main_loop.py +++ b/neuro_san/service/main_loop/server_main_loop.py @@ -28,7 +28,6 @@ from neuro_san.internals.graph.registry.agent_network import AgentNetwork from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.internals.utils.file_of_class import FileOfClass -from neuro_san.internals.utils.asyncio_executor_pool_provider import AsyncioExecutorPoolProvider from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME_FOR_LOGS from neuro_san.service.grpc.grpc_agent_server import DEFAULT_MAX_CONCURRENT_REQUESTS @@ -36,8 +35,9 @@ from neuro_san.service.grpc.grpc_agent_server import GrpcAgentServer from neuro_san.service.grpc.grpc_agent_service import GrpcAgentService from neuro_san.service.http.server.http_server import HttpServer -from neuro_san.service.main_loop.server_status import ServerStatus from neuro_san.service.watcher.main_loop.storage_watcher import StorageWatcher +from neuro_san.service.utils.server_status import ServerStatus +from neuro_san.service.utils.server_context import ServerContext # pylint: disable=too-many-instance-attributes @@ -66,11 +66,7 @@ def __init__(self): self.grpc_server: GrpcAgentServer = None self.http_server: HttpServer = None self.manifest_files: List[str] = [] - # Dictionary is string key (describing scope) to AgentNetworkStorage grouping. - self.network_storage_dict: Dict[str, AgentNetworkStorage] = { - "public": AgentNetworkStorage() - } - self.server_status: ServerStatus = None + self.server_context = ServerContext() def parse_args(self): """ @@ -122,14 +118,15 @@ def parse_args(self): # See destination below ~ line 139, 154 for explanation. args = arg_parser.parse_args() self.server_name = args.server_name - self.server_status = ServerStatus(self.server_name) + server_status = ServerStatus(self.server_name) + self.server_context.set_server_status(server_status) self.grpc_port = args.port if self.grpc_port == 0: - self.server_status.grpc_service.set_requested(False) + server_status.grpc_service.set_requested(False) self.http_port = args.http_port if self.http_port == 0: - self.server_status.http_service.set_requested(False) + server_status.http_service.set_requested(False) self.server_name_for_logs = args.server_name_for_logs self.max_concurrent_requests = args.max_concurrent_requests @@ -144,7 +141,7 @@ def parse_args(self): self.manifest_update_period_seconds = args.manifest_update_period_seconds if self.manifest_update_period_seconds <= 0: # StorageWatcher is disabled: - self.server_status.updater.set_requested(False) + server_status.updater.set_requested(False) manifest_restorer = RegistryManifestRestorer() manifest_agent_networks: Dict[str, AgentNetwork] = manifest_restorer.restore() @@ -178,14 +175,13 @@ def main_loop(self): metadata_set = metadata_set | set(self.usage_logger_metadata.split()) metadata_str: str = " ".join(sorted(metadata_set)) - AsyncioExecutorPoolProvider.set_executors_pool(reuse_mode=True) + server_status: ServerStatus = self.server_context.get_server_status() - if self.server_status.grpc_service.is_requested(): + if server_status.grpc_service.is_requested(): self.grpc_server = GrpcAgentServer( self.grpc_port, server_loop_callbacks=self, - network_storage_dict=self.network_storage_dict, - server_status=self.server_status, + server_context=self.server_context, server_name=self.server_name, server_name_for_logs=self.server_name_for_logs, max_concurrent_requests=self.max_concurrent_requests, @@ -193,43 +189,42 @@ def main_loop(self): forwarded_request_metadata=metadata_str) self.grpc_server.prepare_for_serving() - if self.server_status.updater.is_requested(): - if not self.server_status.grpc_service.is_requested(): + if server_status.updater.is_requested(): + if not server_status.grpc_service.is_requested(): current_dir: str = os.path.dirname(os.path.abspath(__file__)) - setup_logging(self.server_status.updater.get_service_name(), + setup_logging(server_status.updater.get_service_name(), current_dir, 'AGENT_SERVICE_LOG_JSON', 'AGENT_SERVICE_LOG_LEVEL') manifest_file: str = self.manifest_files[0] watcher = StorageWatcher( - self.network_storage_dict, manifest_file, self.manifest_update_period_seconds, - self.server_status) + self.server_context) watcher.start() - if self.server_status.http_service.is_requested(): + if server_status.http_service.is_requested(): # Create HTTP server; self.http_server = HttpServer( - self.server_status, + self.server_context, self.http_port, self.service_openapi_spec_file, self.request_limit, - self.network_storage_dict, forwarded_request_metadata=metadata_str) # Now - our servers (gRPC and http) are created and listen to updates of network_storage # Perform the initial setup - public_storage: AgentNetworkStorage = self.network_storage_dict.get("public") + network_storage_dict: Dict[str, AgentNetworkStorage] = self.server_context.get_network_storage_dict() + public_storage: AgentNetworkStorage = network_storage_dict.get("public") public_storage.setup_agent_networks(self.agent_networks) # Start all services: http_server_thread = None - if self.server_status.http_service.is_requested(): + if server_status.http_service.is_requested(): http_server_thread = threading.Thread(target=self.http_server, args=(self.grpc_server,), daemon=True) http_server_thread.start() - if self.server_status.grpc_service.is_requested(): + if server_status.grpc_service.is_requested(): self.grpc_server.serve() if http_server_thread is not None: diff --git a/neuro_san/service/utils/__init__.py b/neuro_san/service/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neuro_san/service/utils/server_context.py b/neuro_san/service/utils/server_context.py new file mode 100644 index 000000000..114c24f40 --- /dev/null +++ b/neuro_san/service/utils/server_context.py @@ -0,0 +1,60 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. +# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. +# +# END COPYRIGHT + +from typing import Dict + +from leaf_common.asyncio.asyncio_executor_pool import AsyncioExecutorPool + +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage +from neuro_san.service.utils.server_status import ServerStatus + + +class ServerContext: + """ + Class that contains global-ish state for each instance of a server. + """ + + def __init__(self): + """ + Constructor. + """ + self.server_status: ServerStatus = None + self.executor_pool = AsyncioExecutorPool(reuse_mode=True) + + # Dictionary is string key (describing scope) to AgentNetworkStorage grouping. + self.network_storage_dict: Dict[str, AgentNetworkStorage] = { + "public": AgentNetworkStorage() + } + + def get_executor_pool(self) -> AsyncioExecutorPool: + """ + :return: The AsyncioExecutorPool + """ + return self.executor_pool + + def set_server_status(self, server_status: ServerStatus): + """ + Sets the server status + """ + self.server_status = server_status + + def get_server_status(self) -> ServerStatus: + """ + :return: The ServerStatus + """ + return self.server_status + + def get_network_storage_dict(self) -> Dict[str, AgentNetworkStorage]: + """ + :return: The Network Storage dictionary + """ + return self.network_storage_dict diff --git a/neuro_san/service/main_loop/server_status.py b/neuro_san/service/utils/server_status.py similarity index 95% rename from neuro_san/service/main_loop/server_status.py rename to neuro_san/service/utils/server_status.py index e68b23b83..504582997 100644 --- a/neuro_san/service/main_loop/server_status.py +++ b/neuro_san/service/utils/server_status.py @@ -10,7 +10,7 @@ # # END COPYRIGHT -from neuro_san.service.main_loop.service_status import ServiceStatus +from neuro_san.service.utils.service_status import ServiceStatus class ServerStatus: diff --git a/neuro_san/service/main_loop/service_status.py b/neuro_san/service/utils/service_status.py similarity index 100% rename from neuro_san/service/main_loop/service_status.py rename to neuro_san/service/utils/service_status.py diff --git a/neuro_san/service/watcher/main_loop/storage_watcher.py b/neuro_san/service/watcher/main_loop/storage_watcher.py index 6a4c21fe7..bd5e7777a 100644 --- a/neuro_san/service/watcher/main_loop/storage_watcher.py +++ b/neuro_san/service/watcher/main_loop/storage_watcher.py @@ -8,7 +8,6 @@ # neuro-san SDK Software in commercial settings. # # END COPYRIGHT -from typing import Dict from typing import List from logging import getLogger @@ -18,11 +17,11 @@ from time import sleep from time import time -from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage -from neuro_san.service.main_loop.server_status import ServerStatus from neuro_san.service.watcher.interfaces.startable import Startable from neuro_san.service.watcher.interfaces.storage_updater import StorageUpdater from neuro_san.service.watcher.registries.registry_storage_updater import RegistryStorageUpdater +from neuro_san.service.utils.server_context import ServerContext +from neuro_san.service.utils.server_status import ServerStatus # pylint: disable=too-many-instance-attributes @@ -33,32 +32,29 @@ class StorageWatcher(Startable): and other changes to AgentNetworkStorage instances. """ def __init__(self, - network_storage_dict: Dict[str, AgentNetworkStorage], manifest_path: str, manifest_update_period_in_seconds: int, - server_status: ServerStatus): + server_context: ServerContext): """ Constructor. - :param network_storage_dict: A dictionary of string (descripting scope) to - AgentNetworkStorage instance which keeps all the AgentNetwork instances - of a particular grouping. :param manifest_path: file path to server manifest file :param manifest_update_period_in_seconds: update period in seconds - :param server_status: server status to register the state of updater + :param server_context: ServerContext for global-ish state """ - self.network_storage_dict: Dict[str, AgentNetworkStorage] = network_storage_dict self.logger: Logger = getLogger(self.__class__.__name__) self.updater_thread = Thread(target=self._run, daemon=True) + self.server_context: ServerContext = server_context self.storage_updaters: List[StorageUpdater] = [ - RegistryStorageUpdater(network_storage_dict, manifest_update_period_in_seconds, manifest_path), + RegistryStorageUpdater(self.server_context.get_network_storage_dict(), + manifest_update_period_in_seconds, + manifest_path), # We will eventually have more here ] self.update_period_in_seconds: int = self.compute_update_period_in_seconds(self.storage_updaters) - self.server_status: ServerStatus = server_status self.keep_running: bool = True @staticmethod @@ -106,7 +102,8 @@ def _run(self): sleep_for_seconds: float = self.update_period_in_seconds while self.keep_running: - self.server_status.updater.set_status(True) + server_status: ServerStatus = self.server_context.get_server_status() + server_status.updater.set_status(True) sleep(sleep_for_seconds)