diff --git a/neuro_san/service/generic/agent_service_provider.py b/neuro_san/service/generic/agent_service_provider.py new file mode 100644 index 000000000..7ded28e86 --- /dev/null +++ b/neuro_san/service/generic/agent_service_provider.py @@ -0,0 +1,81 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. +# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. +# +# END COPYRIGHT + +from typing import Any +from typing import Dict +from threading import Lock +import copy + +from leaf_server_common.server.request_logger import RequestLogger + +from neuro_san.service.generic.agent_service import AgentService +from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider +from neuro_san.service.generic.agent_server_logging import AgentServerLogging + + +class AgentServiceProvider: + """ + Class providing lazy construction of AgentService instance + with given constructor parameters. + """ + + # pylint: disable=too-many-arguments,too-many-positional-arguments + def __init__(self, + request_logger: RequestLogger, + security_cfg: Dict[str, Any], + agent_name: str, + agent_network_provider: AgentNetworkProvider, + server_logging: AgentServerLogging): + """ + Constructor. + :param request_logger: The instance of the RequestLogger that helps + keep track of stats + :param security_cfg: A dictionary of parameters used to + secure the TLS and the authentication of the gRPC + connection. Supplying this implies use of a secure + GRPC Channel. If None, uses insecure channel. + :param agent_name: The agent name for the service + :param agent_network_provider: The AgentNetworkProvider to use for the session. + :param server_logging: An AgentServerLogging instance initialized so that + spawned asynchronous threads can also properly initialize + their logging. + """ + self.request_logger = request_logger + self.security_cfg = copy.deepcopy(security_cfg) + self.server_logging: AgentServerLogging = server_logging + self.agent_network_provider: AgentNetworkProvider = agent_network_provider + self.agent_name: str = agent_name + self.lock: Lock = Lock() + self.service_instance: AgentService = None + + def get_service(self) -> AgentService: + """ + Get service instance. + Create it if it is not instantiated yet. + """ + if self.service_instance is None: + with self.lock: + if self.service_instance is None: + self.service_instance = AgentService( + self.request_logger, + self.security_cfg, + self.agent_name, + self.agent_network_provider, + self.server_logging) + return self.service_instance + + def service_created(self) -> bool: + """ + Return True if service instance has already been instantiated; + False otherwise + """ + return self.service_instance is not None diff --git a/neuro_san/service/generic/async_agent_service_provider.py b/neuro_san/service/generic/async_agent_service_provider.py new file mode 100644 index 000000000..58374d54c --- /dev/null +++ b/neuro_san/service/generic/async_agent_service_provider.py @@ -0,0 +1,80 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. +# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. +# +# END COPYRIGHT + +from typing import Any +from typing import Dict +from threading import Lock +import copy + +from neuro_san.service.generic.async_agent_service import AsyncAgentService +from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider +from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger +from neuro_san.service.generic.agent_server_logging import AgentServerLogging + + +class AsyncAgentServiceProvider: + """ + Class providing lazy construction of AsyncAgentService instance + with given constructor parameters. + """ + + # pylint: disable=too-many-arguments,too-many-positional-arguments + def __init__(self, + request_logger: EventLoopLogger, + security_cfg: Dict[str, Any], + agent_name: str, + agent_network_provider: AgentNetworkProvider, + server_logging: AgentServerLogging): + """ + Constructor. + :param request_logger: The instance of the EventLoopLogger that helps + log information from running event loop + :param security_cfg: A dictionary of parameters used to + secure the TLS and the authentication of the gRPC + connection. Supplying this implies use of a secure + GRPC Channel. If None, uses insecure channel. + :param agent_name: The agent name for the service + :param agent_network_provider: The AgentNetworkProvider to use for the session. + :param server_logging: An AgentServerLogging instance initialized so that + spawned asynchronous threads can also properly initialize + their logging. + """ + self.request_logger = request_logger + self.security_cfg = copy.deepcopy(security_cfg) + self.server_logging: AgentServerLogging = server_logging + self.agent_network_provider: AgentNetworkProvider = agent_network_provider + self.agent_name: str = agent_name + self.lock: Lock = Lock() + self.service_instance: AsyncAgentService = None + + def get_service(self) -> AsyncAgentService: + """ + Get service instance. + Create it if it is not instantiated yet. + """ + if self.service_instance is None: + with self.lock: + if self.service_instance is None: + self.service_instance = AsyncAgentService( + self.request_logger, + self.security_cfg, + self.agent_name, + self.agent_network_provider, + self.server_logging) + return self.service_instance + + def service_created(self) -> bool: + """ + Return True if service instance has already been instantiated; + False otherwise + """ + return self.service_instance is not None diff --git a/neuro_san/service/grpc/grpc_agent_service.py b/neuro_san/service/grpc/grpc_agent_service.py index 23b4da8fc..9d26ae730 100644 --- a/neuro_san/service/grpc/grpc_agent_service.py +++ b/neuro_san/service/grpc/grpc_agent_service.py @@ -27,6 +27,7 @@ from neuro_san.api.grpc import agent_pb2_grpc from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging +from neuro_san.service.generic.agent_service_provider import AgentServiceProvider from neuro_san.service.generic.agent_service import AgentService @@ -60,18 +61,23 @@ def __init__(self, their logging. """ self.forwarder: GrpcMetadataForwarder = server_logging.get_forwarder() - self.service: AgentService =\ - AgentService(request_logger, - security_cfg, - agent_name, - agent_network_provider, - server_logging) + self.service_provider: AgentServiceProvider =\ + AgentServiceProvider( + request_logger, + security_cfg, + agent_name, + agent_network_provider, + server_logging) def get_request_count(self) -> int: """ :return: The number of currently active requests """ - return self.service.get_request_count() + if not self.service_provider.service_created(): + # Service is not yet instantiated - it has no requests + return 0 + service: AgentService = self.service_provider.get_service() + return service.get_request_count() # pylint: disable=no-member def Function(self, request: service_messages.FunctionRequest, @@ -89,7 +95,9 @@ def Function(self, request: service_messages.FunctionRequest, # Get our args in order to pass to grpc-free session level request_dict: Dict[str, Any] = MessageToDict(request) - response_dict: Dict[str, Any] = self.service.function(request_dict, request_metadata, context) + service: AgentService = self.service_provider.get_service() + response_dict: Dict[str, Any] =\ + service.function(request_dict, request_metadata, context) # Convert the response dictionary to a grpc message response_string = json.dumps(response_dict) @@ -113,7 +121,8 @@ def Connectivity(self, request: service_messages.ConnectivityRequest, # Get our args in order to pass to grpc-free session level request_dict: Dict[str, Any] = MessageToDict(request) - response_dict: Dict[str, Any] = self.service.connectivity(request_dict, request_metadata, context) + service: AgentService = self.service_provider.get_service() + response_dict: Dict[str, Any] = service.connectivity(request_dict, request_metadata, context) # Convert the response dictionary to a grpc message response_string = json.dumps(response_dict) @@ -136,8 +145,9 @@ def StreamingChat(self, request: service_messages.ChatRequest, # Get our args in order to pass to grpc-free session level request_dict: Dict[str, Any] = MessageToDict(request) + service: AgentService = self.service_provider.get_service() response_dict_iterator: Iterator[Dict[str, Any]] =\ - self.service.streaming_chat(request_dict, request_metadata, context) + service.streaming_chat(request_dict, request_metadata, context) for response_dict in response_dict_iterator: # Convert the response dictionary to a grpc message response_string = json.dumps(response_dict) diff --git a/neuro_san/service/http/handlers/base_request_handler.py b/neuro_san/service/http/handlers/base_request_handler.py index 86b6fdfdd..8806a7d66 100644 --- a/neuro_san/service/http/handlers/base_request_handler.py +++ b/neuro_san/service/http/handlers/base_request_handler.py @@ -24,6 +24,8 @@ import tornado from tornado.web import RequestHandler +from neuro_san.service.generic.async_agent_service import AsyncAgentService +from neuro_san.service.generic.async_agent_service_provider import AsyncAgentServiceProvider from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.service.http.interfaces.agent_authorizer import AgentAuthorizer from neuro_san.service.http.interfaces.agents_updater import AgentsUpdater @@ -97,6 +99,26 @@ def get_metadata(self) -> Dict[str, Any]: result[item_name] = "None" return result + async def get_service(self, agent_name: str, metadata: Dict[str, Any]) -> AsyncAgentService: + """ + Get agent's AsyncAgentService for request execution + :param agent_name: agent name + :param metadata: metadata to be used for logging if necessary. + :return: instance of AsyncAgentService if it is defined for this agent + None otherwise + """ + update_done: bool = await self.update_agents(metadata=metadata) + if not update_done: + return None + + service_provider: AsyncAgentServiceProvider = self.agent_policy.allow(agent_name) + if service_provider is None: + self.set_status(404) + self.logger.error(metadata, "error: Invalid request path %s", self.request.path) + self.do_finish() + return None + return service_provider.get_service() + async def update_agents(self, metadata: Dict[str, Any]) -> bool: """ Update internal agents table. @@ -144,7 +166,7 @@ def prepare(self): self.do_finish() return - self.logger.info(self.get_metadata(), f"[REQUEST RECEIVED] {self.request.method} {self.request.uri}") + self.logger.debug(self.get_metadata(), f"[REQUEST RECEIVED] {self.request.method} {self.request.uri}") def do_finish(self): """ diff --git a/neuro_san/service/http/handlers/connectivity_handler.py b/neuro_san/service/http/handlers/connectivity_handler.py index e86266662..cc277f2d2 100644 --- a/neuro_san/service/http/handlers/connectivity_handler.py +++ b/neuro_san/service/http/handlers/connectivity_handler.py @@ -29,15 +29,8 @@ async def get(self, agent_name: str): Implementation of GET request handler for "connectivity" API call. """ metadata: Dict[str, Any] = self.get_metadata() - update_done: bool = await self.update_agents(metadata=metadata) - if not update_done: - return - - service: AsyncAgentService = self.agent_policy.allow(agent_name) + service: AsyncAgentService = await self.get_service(agent_name, metadata) if service is None: - self.set_status(404) - self.logger.error({}, "error: Invalid request path %s", self.request.path) - self.do_finish() return self.application.start_client_request(metadata, f"{agent_name}/connectivity") diff --git a/neuro_san/service/http/handlers/function_handler.py b/neuro_san/service/http/handlers/function_handler.py index b4c763c9a..eb4102f44 100644 --- a/neuro_san/service/http/handlers/function_handler.py +++ b/neuro_san/service/http/handlers/function_handler.py @@ -29,15 +29,8 @@ async def get(self, agent_name: str): Implementation of GET request handler for "function" API call. """ metadata: Dict[str, Any] = self.get_metadata() - update_done: bool = await self.update_agents(metadata=metadata) - if not update_done: - return - - service: AsyncAgentService = self.agent_policy.allow(agent_name) + service: AsyncAgentService = await self.get_service(agent_name, metadata) if service is None: - self.set_status(404) - self.logger.error({}, "error: Invalid request path %s", self.request.path) - self.do_finish() return self.application.start_client_request(metadata, f"{agent_name}/function") diff --git a/neuro_san/service/http/handlers/streaming_chat_handler.py b/neuro_san/service/http/handlers/streaming_chat_handler.py index f41bf3cbb..d1636c12c 100644 --- a/neuro_san/service/http/handlers/streaming_chat_handler.py +++ b/neuro_san/service/http/handlers/streaming_chat_handler.py @@ -58,15 +58,8 @@ async def post(self, agent_name: str): """ metadata: Dict[str, Any] = self.get_metadata() - update_done: bool = await self.update_agents(metadata=metadata) - if not update_done: - return - - service: AsyncAgentService = self.agent_policy.allow(agent_name) + service: AsyncAgentService = await self.get_service(agent_name, metadata) if service is None: - self.set_status(404) - self.logger.error({}, "error: Invalid request path %s", self.request.path) - self.do_finish() return self.application.start_client_request(metadata, f"{agent_name}/streaming_chat") diff --git a/neuro_san/service/http/server/http_sidecar.py b/neuro_san/service/http/server/http_sidecar.py index dbc310deb..d6fba8cff 100644 --- a/neuro_san/service/http/server/http_sidecar.py +++ b/neuro_san/service/http/server/http_sidecar.py @@ -26,7 +26,7 @@ from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging -from neuro_san.service.generic.async_agent_service import AsyncAgentService +from neuro_san.service.generic.async_agent_service_provider import AsyncAgentServiceProvider from neuro_san.service.http.handlers.health_check_handler import HealthCheckHandler from neuro_san.service.http.handlers.connectivity_handler import ConnectivityHandler from neuro_san.service.http.handlers.function_handler import FunctionHandler @@ -88,7 +88,7 @@ def __init__(self, start_event: threading.Event, self.logger = None self.openapi_service_spec_path: str = openapi_service_spec_path self.forwarded_request_metadata: List[str] = forwarded_request_metadata.split(" ") - self.allowed_agents: Dict[str, AsyncAgentService] = {} + self.allowed_agents: Dict[str, AsyncAgentServiceProvider] = {} self.lock = None def __call__(self, other_server: AgentServer): @@ -107,13 +107,13 @@ def __call__(self, other_server: AgentServer): "Timeout (%d sec) waiting for signal to HTTP server to start", self.TIMEOUT_TO_START_SECONDS) - app.listen(self.http_port) - self.logger.info({}, "HTTP server is running on port %d", self.http_port) - self.logger.info({}, "HTTP server is shutting down after %d requests", self.requests_limit) # Construct initial "allowed" list of agents: # no metadata to use here yet. self.update_agents(metadata={}) self.logger.debug({}, "Serving agents: %s", repr(self.allowed_agents.keys())) + app.listen(self.http_port) + self.logger.info({}, "HTTP server is running on port %d", self.http_port) + self.logger.info({}, "HTTP server is shutting down after %d requests", self.requests_limit) IOLoop.current().start() self.logger.info({}, "Http server stopped.") @@ -142,7 +142,7 @@ def make_app(self, requests_limit: int, logger: EventLoopLogger): return HttpServerApp(handlers, requests_limit, logger) - def allow(self, agent_name) -> AsyncAgentService: + def allow(self, agent_name) -> AsyncAgentServiceProvider: return self.allowed_agents.get(agent_name, None) def update_agents(self, metadata: Dict[str, Any]): @@ -181,9 +181,14 @@ def add_agent(self, agent_name: str): request_metadata_str: str = " ".join(self.forwarded_request_metadata) agent_server_logging: AgentServerLogging = \ AgentServerLogging(self.server_name_for_logs, request_metadata_str) - agent_service: AsyncAgentService = \ - AsyncAgentService(self.logger, None, agent_name, agent_network_provider, agent_server_logging) - self.allowed_agents[agent_name] = agent_service + agent_service_provider: AsyncAgentServiceProvider = \ + AsyncAgentServiceProvider( + self.logger, + None, + agent_name, + agent_network_provider, + agent_server_logging) + self.allowed_agents[agent_name] = agent_service_provider def remove_agent(self, agent_name: str): """