-
Notifications
You must be signed in to change notification settings - Fork 29
UN-3310 Faster server startup. #290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
41aa184
33cd8eb
9f59b09
c948b80
58ea203
7968711
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
|
|
||
| # Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. | ||
| # All Rights Reserved. | ||
| # Issued under the Academic Public License. | ||
| # | ||
| # You can be released from the terms, and requirements of the Academic Public | ||
| # License by purchasing a commercial license. | ||
| # Purchase of a commercial license is mandatory for any use of the | ||
| # neuro-san SDK Software in commercial settings. | ||
| # | ||
| # END COPYRIGHT | ||
|
|
||
| from typing import Any | ||
| from typing import Dict | ||
| from threading import Lock | ||
| import copy | ||
|
|
||
| from leaf_server_common.server.request_logger import RequestLogger | ||
|
|
||
| from neuro_san.service.generic.agent_service import AgentService | ||
| from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider | ||
| from neuro_san.service.generic.agent_server_logging import AgentServerLogging | ||
|
|
||
|
|
||
| class AgentServiceProvider: | ||
| """ | ||
| Class providing lazy construction of AgentService instance | ||
| with given constructor parameters. | ||
| """ | ||
|
|
||
| # pylint: disable=too-many-arguments,too-many-positional-arguments | ||
| def __init__(self, | ||
| request_logger: RequestLogger, | ||
| security_cfg: Dict[str, Any], | ||
| agent_name: str, | ||
| agent_network_provider: AgentNetworkProvider, | ||
| server_logging: AgentServerLogging): | ||
| """ | ||
| Constructor. | ||
| :param request_logger: The instance of the RequestLogger that helps | ||
| keep track of stats | ||
| :param security_cfg: A dictionary of parameters used to | ||
| secure the TLS and the authentication of the gRPC | ||
| connection. Supplying this implies use of a secure | ||
| GRPC Channel. If None, uses insecure channel. | ||
| :param agent_name: The agent name for the service | ||
| :param agent_network_provider: The AgentNetworkProvider to use for the session. | ||
| :param server_logging: An AgentServerLogging instance initialized so that | ||
| spawned asynchronous threads can also properly initialize | ||
| their logging. | ||
| """ | ||
| self.request_logger = request_logger | ||
| self.security_cfg = copy.deepcopy(security_cfg) | ||
| self.server_logging: AgentServerLogging = server_logging | ||
| self.agent_network_provider: AgentNetworkProvider = agent_network_provider | ||
| self.agent_name: str = agent_name | ||
| self.lock: Lock = Lock() | ||
| self.service_instance: AgentService = None | ||
|
|
||
| def get_service(self) -> AgentService: | ||
| """ | ||
| Get service instance. | ||
| Create it if it is not instantiated yet. | ||
| """ | ||
| if self.service_instance is None: | ||
| with self.lock: | ||
| if self.service_instance is None: | ||
| self.service_instance = AgentService( | ||
| self.request_logger, | ||
| self.security_cfg, | ||
| self.agent_name, | ||
| self.agent_network_provider, | ||
| self.server_logging) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Standard instance creation pattern for multi-threaded environment. |
||
| return self.service_instance | ||
|
|
||
| def service_created(self) -> bool: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Since this is a boolean, perhaps a better name would be is_service_created() |
||
| """ | ||
| Return True if service instance has already been instantiated; | ||
| False otherwise | ||
| """ | ||
| return self.service_instance is not None | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
|
|
||
| # Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. | ||
| # All Rights Reserved. | ||
| # Issued under the Academic Public License. | ||
| # | ||
| # You can be released from the terms, and requirements of the Academic Public | ||
| # License by purchasing a commercial license. | ||
| # Purchase of a commercial license is mandatory for any use of the | ||
| # neuro-san SDK Software in commercial settings. | ||
| # | ||
| # END COPYRIGHT | ||
|
|
||
| from typing import Any | ||
| from typing import Dict | ||
| from threading import Lock | ||
| import copy | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class more or less duplicates AgentServiceProvider logic, but for AsyncAgentService instances. |
||
| from neuro_san.service.generic.async_agent_service import AsyncAgentService | ||
| from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider | ||
| from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger | ||
| from neuro_san.service.generic.agent_server_logging import AgentServerLogging | ||
|
|
||
|
|
||
| class AsyncAgentServiceProvider: | ||
| """ | ||
| Class providing lazy construction of AsyncAgentService instance | ||
| with given constructor parameters. | ||
| """ | ||
|
|
||
| # pylint: disable=too-many-arguments,too-many-positional-arguments | ||
| def __init__(self, | ||
| request_logger: EventLoopLogger, | ||
| security_cfg: Dict[str, Any], | ||
| agent_name: str, | ||
| agent_network_provider: AgentNetworkProvider, | ||
| server_logging: AgentServerLogging): | ||
| """ | ||
| Constructor. | ||
| :param request_logger: The instance of the EventLoopLogger that helps | ||
| log information from running event loop | ||
| :param security_cfg: A dictionary of parameters used to | ||
| secure the TLS and the authentication of the gRPC | ||
| connection. Supplying this implies use of a secure | ||
| GRPC Channel. If None, uses insecure channel. | ||
| :param agent_name: The agent name for the service | ||
| :param agent_network_provider: The AgentNetworkProvider to use for the session. | ||
| :param server_logging: An AgentServerLogging instance initialized so that | ||
| spawned asynchronous threads can also properly initialize | ||
| their logging. | ||
| """ | ||
| self.request_logger = request_logger | ||
| self.security_cfg = copy.deepcopy(security_cfg) | ||
| self.server_logging: AgentServerLogging = server_logging | ||
| self.agent_network_provider: AgentNetworkProvider = agent_network_provider | ||
| self.agent_name: str = agent_name | ||
| self.lock: Lock = Lock() | ||
| self.service_instance: AsyncAgentService = None | ||
|
|
||
| def get_service(self) -> AsyncAgentService: | ||
| """ | ||
| Get service instance. | ||
| Create it if it is not instantiated yet. | ||
| """ | ||
| if self.service_instance is None: | ||
| with self.lock: | ||
| if self.service_instance is None: | ||
| self.service_instance = AsyncAgentService( | ||
| self.request_logger, | ||
| self.security_cfg, | ||
| self.agent_name, | ||
| self.agent_network_provider, | ||
| self.server_logging) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For next PR consideration: Since all of this is the same except for the type of service_instance, maybe there is a path to abstraction here. |
||
| return self.service_instance | ||
|
|
||
| def service_created(self) -> bool: | ||
| """ | ||
| Return True if service instance has already been instantiated; | ||
| False otherwise | ||
| """ | ||
| return self.service_instance is not None | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ | |
| from neuro_san.api.grpc import agent_pb2_grpc | ||
| from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider | ||
| from neuro_san.service.generic.agent_server_logging import AgentServerLogging | ||
| from neuro_san.service.generic.agent_service_provider import AgentServiceProvider | ||
| from neuro_san.service.generic.agent_service import AgentService | ||
|
|
||
|
|
||
|
|
@@ -60,18 +61,23 @@ def __init__(self, | |
| their logging. | ||
| """ | ||
| self.forwarder: GrpcMetadataForwarder = server_logging.get_forwarder() | ||
| self.service: AgentService =\ | ||
| AgentService(request_logger, | ||
| security_cfg, | ||
| agent_name, | ||
| agent_network_provider, | ||
| server_logging) | ||
| self.service_provider: AgentServiceProvider =\ | ||
| AgentServiceProvider( | ||
| request_logger, | ||
| security_cfg, | ||
| agent_name, | ||
| agent_network_provider, | ||
| server_logging) | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Create "lazy" service provider - instantiate actual AgentService only when needed. |
||
| def get_request_count(self) -> int: | ||
| """ | ||
| :return: The number of currently active requests | ||
| """ | ||
| return self.service.get_request_count() | ||
| if not self.service_provider.service_created(): | ||
| # Service is not yet instantiated - it has no requests | ||
| return 0 | ||
| service: AgentService = self.service_provider.get_service() | ||
| return service.get_request_count() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's perf optimization: if service instance has not been instantiated, then its request count is obviously 0. |
||
|
|
||
| # pylint: disable=no-member | ||
| def Function(self, request: service_messages.FunctionRequest, | ||
|
|
@@ -89,7 +95,9 @@ def Function(self, request: service_messages.FunctionRequest, | |
|
|
||
| # Get our args in order to pass to grpc-free session level | ||
| request_dict: Dict[str, Any] = MessageToDict(request) | ||
| response_dict: Dict[str, Any] = self.service.function(request_dict, request_metadata, context) | ||
| service: AgentService = self.service_provider.get_service() | ||
| response_dict: Dict[str, Any] =\ | ||
| service.function(request_dict, request_metadata, context) | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here and below - get AgentService from AgentServiceProvider for actual request processing. |
||
| # Convert the response dictionary to a grpc message | ||
| response_string = json.dumps(response_dict) | ||
|
|
@@ -113,7 +121,8 @@ def Connectivity(self, request: service_messages.ConnectivityRequest, | |
|
|
||
| # Get our args in order to pass to grpc-free session level | ||
| request_dict: Dict[str, Any] = MessageToDict(request) | ||
| response_dict: Dict[str, Any] = self.service.connectivity(request_dict, request_metadata, context) | ||
| service: AgentService = self.service_provider.get_service() | ||
| response_dict: Dict[str, Any] = service.connectivity(request_dict, request_metadata, context) | ||
|
|
||
| # Convert the response dictionary to a grpc message | ||
| response_string = json.dumps(response_dict) | ||
|
|
@@ -136,8 +145,9 @@ def StreamingChat(self, request: service_messages.ChatRequest, | |
|
|
||
| # Get our args in order to pass to grpc-free session level | ||
| request_dict: Dict[str, Any] = MessageToDict(request) | ||
| service: AgentService = self.service_provider.get_service() | ||
| response_dict_iterator: Iterator[Dict[str, Any]] =\ | ||
| self.service.streaming_chat(request_dict, request_metadata, context) | ||
| service.streaming_chat(request_dict, request_metadata, context) | ||
| for response_dict in response_dict_iterator: | ||
| # Convert the response dictionary to a grpc message | ||
| response_string = json.dumps(response_dict) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,8 @@ | |
| import tornado | ||
| from tornado.web import RequestHandler | ||
|
|
||
| from neuro_san.service.generic.async_agent_service import AsyncAgentService | ||
| from neuro_san.service.generic.async_agent_service_provider import AsyncAgentServiceProvider | ||
| from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage | ||
| from neuro_san.service.http.interfaces.agent_authorizer import AgentAuthorizer | ||
| from neuro_san.service.http.interfaces.agents_updater import AgentsUpdater | ||
|
|
@@ -97,6 +99,26 @@ def get_metadata(self) -> Dict[str, Any]: | |
| result[item_name] = "None" | ||
| return result | ||
|
|
||
| async def get_service(self, agent_name: str, metadata: Dict[str, Any]) -> AsyncAgentService: | ||
| """ | ||
| Get agent's AsyncAgentService for request execution | ||
| :param agent_name: agent name | ||
| :param metadata: metadata to be used for logging if necessary. | ||
| :return: instance of AsyncAgentService if it is defined for this agent | ||
| None otherwise | ||
| """ | ||
| update_done: bool = await self.update_agents(metadata=metadata) | ||
| if not update_done: | ||
| return None | ||
|
|
||
| service_provider: AsyncAgentServiceProvider = self.agent_policy.allow(agent_name) | ||
| if service_provider is None: | ||
| self.set_status(404) | ||
| self.logger.error(metadata, "error: Invalid request path %s", self.request.path) | ||
| self.do_finish() | ||
| return None | ||
| return service_provider.get_service() | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Factor out common logic for getting AsyncAgentService for request processing. |
||
| async def update_agents(self, metadata: Dict[str, Any]) -> bool: | ||
| """ | ||
| Update internal agents table. | ||
|
|
@@ -144,7 +166,7 @@ def prepare(self): | |
| self.do_finish() | ||
| return | ||
|
|
||
| self.logger.info(self.get_metadata(), f"[REQUEST RECEIVED] {self.request.method} {self.request.uri}") | ||
| self.logger.debug(self.get_metadata(), f"[REQUEST RECEIVED] {self.request.method} {self.request.uri}") | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Less chatty. |
||
| def do_finish(self): | ||
| """ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,15 +29,8 @@ async def get(self, agent_name: str): | |
| Implementation of GET request handler for "connectivity" API call. | ||
| """ | ||
| metadata: Dict[str, Any] = self.get_metadata() | ||
| update_done: bool = await self.update_agents(metadata=metadata) | ||
| if not update_done: | ||
| return | ||
|
|
||
| service: AsyncAgentService = self.agent_policy.allow(agent_name) | ||
| service: AsyncAgentService = await self.get_service(agent_name, metadata) | ||
| if service is None: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here and below - use common function from base_request_handler module.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice that this consolidates all that other status/error/do_finish business. |
||
| self.set_status(404) | ||
| self.logger.error({}, "error: Invalid request path %s", self.request.path) | ||
| self.do_finish() | ||
| return | ||
|
|
||
| self.application.start_client_request(metadata, f"{agent_name}/connectivity") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,15 +29,8 @@ async def get(self, agent_name: str): | |
| Implementation of GET request handler for "function" API call. | ||
| """ | ||
| metadata: Dict[str, Any] = self.get_metadata() | ||
| update_done: bool = await self.update_agents(metadata=metadata) | ||
| if not update_done: | ||
| return | ||
|
|
||
| service: AsyncAgentService = self.agent_policy.allow(agent_name) | ||
| service: AsyncAgentService = await self.get_service(agent_name, metadata) | ||
| if service is None: | ||
| self.set_status(404) | ||
| self.logger.error({}, "error: Invalid request path %s", self.request.path) | ||
| self.do_finish() | ||
| return | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use common function from base_request_handler module. |
||
|
|
||
| self.application.start_client_request(metadata, f"{agent_name}/function") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,15 +58,8 @@ async def post(self, agent_name: str): | |
| """ | ||
|
|
||
| metadata: Dict[str, Any] = self.get_metadata() | ||
| update_done: bool = await self.update_agents(metadata=metadata) | ||
| if not update_done: | ||
| return | ||
|
|
||
| service: AsyncAgentService = self.agent_policy.allow(agent_name) | ||
| service: AsyncAgentService = await self.get_service(agent_name, metadata) | ||
| if service is None: | ||
| self.set_status(404) | ||
| self.logger.error({}, "error: Invalid request path %s", self.request.path) | ||
| self.do_finish() | ||
| return | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use common function from base_request_handler module. |
||
| self.application.start_client_request(metadata, f"{agent_name}/streaming_chat") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,7 @@ | |
| from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage | ||
| from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider | ||
| from neuro_san.service.generic.agent_server_logging import AgentServerLogging | ||
| from neuro_san.service.generic.async_agent_service import AsyncAgentService | ||
| from neuro_san.service.generic.async_agent_service_provider import AsyncAgentServiceProvider | ||
| from neuro_san.service.http.handlers.health_check_handler import HealthCheckHandler | ||
| from neuro_san.service.http.handlers.connectivity_handler import ConnectivityHandler | ||
| from neuro_san.service.http.handlers.function_handler import FunctionHandler | ||
|
|
@@ -88,7 +88,7 @@ def __init__(self, start_event: threading.Event, | |
| self.logger = None | ||
| self.openapi_service_spec_path: str = openapi_service_spec_path | ||
| self.forwarded_request_metadata: List[str] = forwarded_request_metadata.split(" ") | ||
| self.allowed_agents: Dict[str, AsyncAgentService] = {} | ||
| self.allowed_agents: Dict[str, AsyncAgentServiceProvider] = {} | ||
| self.lock = None | ||
|
|
||
| def __call__(self, other_server: AgentServer): | ||
|
|
@@ -107,13 +107,13 @@ def __call__(self, other_server: AgentServer): | |
| "Timeout (%d sec) waiting for signal to HTTP server to start", | ||
| self.TIMEOUT_TO_START_SECONDS) | ||
|
|
||
| app.listen(self.http_port) | ||
| self.logger.info({}, "HTTP server is running on port %d", self.http_port) | ||
| self.logger.info({}, "HTTP server is shutting down after %d requests", self.requests_limit) | ||
| # Construct initial "allowed" list of agents: | ||
| # no metadata to use here yet. | ||
| self.update_agents(metadata={}) | ||
| self.logger.debug({}, "Serving agents: %s", repr(self.allowed_agents.keys())) | ||
| app.listen(self.http_port) | ||
| self.logger.info({}, "HTTP server is running on port %d", self.http_port) | ||
| self.logger.info({}, "HTTP server is shutting down after %d requests", self.requests_limit) | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Start actually listening after all preparations are done. |
||
| IOLoop.current().start() | ||
| self.logger.info({}, "Http server stopped.") | ||
|
|
@@ -142,7 +142,7 @@ def make_app(self, requests_limit: int, logger: EventLoopLogger): | |
|
|
||
| return HttpServerApp(handlers, requests_limit, logger) | ||
|
|
||
| def allow(self, agent_name) -> AsyncAgentService: | ||
| def allow(self, agent_name) -> AsyncAgentServiceProvider: | ||
| return self.allowed_agents.get(agent_name, None) | ||
|
|
||
| def update_agents(self, metadata: Dict[str, Any]): | ||
|
|
@@ -181,9 +181,14 @@ def add_agent(self, agent_name: str): | |
| request_metadata_str: str = " ".join(self.forwarded_request_metadata) | ||
| agent_server_logging: AgentServerLogging = \ | ||
| AgentServerLogging(self.server_name_for_logs, request_metadata_str) | ||
| agent_service: AsyncAgentService = \ | ||
| AsyncAgentService(self.logger, None, agent_name, agent_network_provider, agent_server_logging) | ||
| self.allowed_agents[agent_name] = agent_service | ||
| agent_service_provider: AsyncAgentServiceProvider = \ | ||
| AsyncAgentServiceProvider( | ||
| self.logger, | ||
| None, | ||
| agent_name, | ||
| agent_network_provider, | ||
| agent_server_logging) | ||
| self.allowed_agents[agent_name] = agent_service_provider | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of AsyncAgentService instances, "allowed_agents" map now contains AsyncAgentServiceProvider instances for lazy instantiation of AsyncAgentService. |
||
| def remove_agent(self, agent_name: str): | ||
| """ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Save constructor parameters for later - when we will need to instantiate AgentService object on the first use.