Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions neuro_san/service/generic/agent_service_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@

# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI.
# All Rights Reserved.
# Issued under the Academic Public License.
#
# You can be released from the terms, and requirements of the Academic Public
# License by purchasing a commercial license.
# Purchase of a commercial license is mandatory for any use of the
# neuro-san SDK Software in commercial settings.
#
# END COPYRIGHT

from typing import Any
from typing import Dict
from threading import Lock
import copy

from leaf_server_common.server.request_logger import RequestLogger

from neuro_san.service.generic.agent_service import AgentService
from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider
from neuro_san.service.generic.agent_server_logging import AgentServerLogging


class AgentServiceProvider:
"""
Class providing lazy construction of AgentService instance
with given constructor parameters.
"""

# pylint: disable=too-many-arguments,too-many-positional-arguments
def __init__(self,
request_logger: RequestLogger,
security_cfg: Dict[str, Any],
agent_name: str,
agent_network_provider: AgentNetworkProvider,
server_logging: AgentServerLogging):
"""
Constructor.
:param request_logger: The instance of the RequestLogger that helps
keep track of stats
:param security_cfg: A dictionary of parameters used to
secure the TLS and the authentication of the gRPC
connection. Supplying this implies use of a secure
GRPC Channel. If None, uses insecure channel.
:param agent_name: The agent name for the service
:param agent_network_provider: The AgentNetworkProvider to use for the session.
:param server_logging: An AgentServerLogging instance initialized so that
spawned asynchronous threads can also properly initialize
their logging.
"""
self.request_logger = request_logger
self.security_cfg = copy.deepcopy(security_cfg)
self.server_logging: AgentServerLogging = server_logging
self.agent_network_provider: AgentNetworkProvider = agent_network_provider
self.agent_name: str = agent_name
self.lock: Lock = Lock()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Save constructor parameters for later - when we will need to instantiate AgentService object on the first use.

self.service_instance: AgentService = None

def get_service(self) -> AgentService:
"""
Get service instance.
Create it if it is not instantiated yet.
"""
if self.service_instance is None:
with self.lock:
if self.service_instance is None:
self.service_instance = AgentService(
self.request_logger,
self.security_cfg,
self.agent_name,
self.agent_network_provider,
self.server_logging)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Standard instance creation pattern for multi-threaded environment.

return self.service_instance

def service_created(self) -> bool:
Copy link
Collaborator

@d1donlydfink d1donlydfink Jul 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Since this is a boolean, perhaps a better name would be is_service_created()
Here and below in the async version.

"""
Return True if service instance has already been instantiated;
False otherwise
"""
return self.service_instance is not None
80 changes: 80 additions & 0 deletions neuro_san/service/generic/async_agent_service_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@

# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI.
# All Rights Reserved.
# Issued under the Academic Public License.
#
# You can be released from the terms, and requirements of the Academic Public
# License by purchasing a commercial license.
# Purchase of a commercial license is mandatory for any use of the
# neuro-san SDK Software in commercial settings.
#
# END COPYRIGHT

from typing import Any
from typing import Dict
from threading import Lock
import copy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class more or less duplicates AgentServiceProvider logic, but for AsyncAgentService instances.
Maybe we could cook up something more generic and elegant Pythonic style for both classes,
but not in this PR.

from neuro_san.service.generic.async_agent_service import AsyncAgentService
from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider
from neuro_san.service.interfaces.event_loop_logger import EventLoopLogger
from neuro_san.service.generic.agent_server_logging import AgentServerLogging


class AsyncAgentServiceProvider:
"""
Class providing lazy construction of AsyncAgentService instance
with given constructor parameters.
"""

# pylint: disable=too-many-arguments,too-many-positional-arguments
def __init__(self,
request_logger: EventLoopLogger,
security_cfg: Dict[str, Any],
agent_name: str,
agent_network_provider: AgentNetworkProvider,
server_logging: AgentServerLogging):
"""
Constructor.
:param request_logger: The instance of the EventLoopLogger that helps
log information from running event loop
:param security_cfg: A dictionary of parameters used to
secure the TLS and the authentication of the gRPC
connection. Supplying this implies use of a secure
GRPC Channel. If None, uses insecure channel.
:param agent_name: The agent name for the service
:param agent_network_provider: The AgentNetworkProvider to use for the session.
:param server_logging: An AgentServerLogging instance initialized so that
spawned asynchronous threads can also properly initialize
their logging.
"""
self.request_logger = request_logger
self.security_cfg = copy.deepcopy(security_cfg)
self.server_logging: AgentServerLogging = server_logging
self.agent_network_provider: AgentNetworkProvider = agent_network_provider
self.agent_name: str = agent_name
self.lock: Lock = Lock()
self.service_instance: AsyncAgentService = None

def get_service(self) -> AsyncAgentService:
"""
Get service instance.
Create it if it is not instantiated yet.
"""
if self.service_instance is None:
with self.lock:
if self.service_instance is None:
self.service_instance = AsyncAgentService(
self.request_logger,
self.security_cfg,
self.agent_name,
self.agent_network_provider,
self.server_logging)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For next PR consideration: Since all of this is the same except for the type of service_instance, maybe there is a path to abstraction here.
However well functioning all the async stuff is, someday Python will clean up this copy-paste cancer on the language.

return self.service_instance

def service_created(self) -> bool:
"""
Return True if service instance has already been instantiated;
False otherwise
"""
return self.service_instance is not None
30 changes: 20 additions & 10 deletions neuro_san/service/grpc/grpc_agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from neuro_san.api.grpc import agent_pb2_grpc
from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.generic.agent_service_provider import AgentServiceProvider
from neuro_san.service.generic.agent_service import AgentService


Expand Down Expand Up @@ -60,18 +61,23 @@ def __init__(self,
their logging.
"""
self.forwarder: GrpcMetadataForwarder = server_logging.get_forwarder()
self.service: AgentService =\
AgentService(request_logger,
security_cfg,
agent_name,
agent_network_provider,
server_logging)
self.service_provider: AgentServiceProvider =\
AgentServiceProvider(
request_logger,
security_cfg,
agent_name,
agent_network_provider,
server_logging)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create "lazy" service provider - instantiate actual AgentService only when needed.

def get_request_count(self) -> int:
"""
:return: The number of currently active requests
"""
return self.service.get_request_count()
if not self.service_provider.service_created():
# Service is not yet instantiated - it has no requests
return 0
service: AgentService = self.service_provider.get_service()
return service.get_request_count()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's perf optimization: if service instance has not been instantiated, then its request count is obviously 0.
Otherwise with current gRPC server logic, we'll always instantiate all agent services,
even when not using gRPC service at all.


# pylint: disable=no-member
def Function(self, request: service_messages.FunctionRequest,
Expand All @@ -89,7 +95,9 @@ def Function(self, request: service_messages.FunctionRequest,

# Get our args in order to pass to grpc-free session level
request_dict: Dict[str, Any] = MessageToDict(request)
response_dict: Dict[str, Any] = self.service.function(request_dict, request_metadata, context)
service: AgentService = self.service_provider.get_service()
response_dict: Dict[str, Any] =\
service.function(request_dict, request_metadata, context)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and below - get AgentService from AgentServiceProvider for actual request processing.

# Convert the response dictionary to a grpc message
response_string = json.dumps(response_dict)
Expand All @@ -113,7 +121,8 @@ def Connectivity(self, request: service_messages.ConnectivityRequest,

# Get our args in order to pass to grpc-free session level
request_dict: Dict[str, Any] = MessageToDict(request)
response_dict: Dict[str, Any] = self.service.connectivity(request_dict, request_metadata, context)
service: AgentService = self.service_provider.get_service()
response_dict: Dict[str, Any] = service.connectivity(request_dict, request_metadata, context)

# Convert the response dictionary to a grpc message
response_string = json.dumps(response_dict)
Expand All @@ -136,8 +145,9 @@ def StreamingChat(self, request: service_messages.ChatRequest,

# Get our args in order to pass to grpc-free session level
request_dict: Dict[str, Any] = MessageToDict(request)
service: AgentService = self.service_provider.get_service()
response_dict_iterator: Iterator[Dict[str, Any]] =\
self.service.streaming_chat(request_dict, request_metadata, context)
service.streaming_chat(request_dict, request_metadata, context)
for response_dict in response_dict_iterator:
# Convert the response dictionary to a grpc message
response_string = json.dumps(response_dict)
Expand Down
24 changes: 23 additions & 1 deletion neuro_san/service/http/handlers/base_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import tornado
from tornado.web import RequestHandler

from neuro_san.service.generic.async_agent_service import AsyncAgentService
from neuro_san.service.generic.async_agent_service_provider import AsyncAgentServiceProvider
from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.service.http.interfaces.agent_authorizer import AgentAuthorizer
from neuro_san.service.http.interfaces.agents_updater import AgentsUpdater
Expand Down Expand Up @@ -97,6 +99,26 @@ def get_metadata(self) -> Dict[str, Any]:
result[item_name] = "None"
return result

async def get_service(self, agent_name: str, metadata: Dict[str, Any]) -> AsyncAgentService:
"""
Get agent's AsyncAgentService for request execution
:param agent_name: agent name
:param metadata: metadata to be used for logging if necessary.
:return: instance of AsyncAgentService if it is defined for this agent
None otherwise
"""
update_done: bool = await self.update_agents(metadata=metadata)
if not update_done:
return None

service_provider: AsyncAgentServiceProvider = self.agent_policy.allow(agent_name)
if service_provider is None:
self.set_status(404)
self.logger.error(metadata, "error: Invalid request path %s", self.request.path)
self.do_finish()
return None
return service_provider.get_service()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Factor out common logic for getting AsyncAgentService for request processing.
Will be used in all (3) neuro-san API requests.

async def update_agents(self, metadata: Dict[str, Any]) -> bool:
"""
Update internal agents table.
Expand Down Expand Up @@ -144,7 +166,7 @@ def prepare(self):
self.do_finish()
return

self.logger.info(self.get_metadata(), f"[REQUEST RECEIVED] {self.request.method} {self.request.uri}")
self.logger.debug(self.get_metadata(), f"[REQUEST RECEIVED] {self.request.method} {self.request.uri}")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Less chatty.

def do_finish(self):
"""
Expand Down
9 changes: 1 addition & 8 deletions neuro_san/service/http/handlers/connectivity_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,8 @@ async def get(self, agent_name: str):
Implementation of GET request handler for "connectivity" API call.
"""
metadata: Dict[str, Any] = self.get_metadata()
update_done: bool = await self.update_agents(metadata=metadata)
if not update_done:
return

service: AsyncAgentService = self.agent_policy.allow(agent_name)
service: AsyncAgentService = await self.get_service(agent_name, metadata)
if service is None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and below - use common function from base_request_handler module.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice that this consolidates all that other status/error/do_finish business.

self.set_status(404)
self.logger.error({}, "error: Invalid request path %s", self.request.path)
self.do_finish()
return

self.application.start_client_request(metadata, f"{agent_name}/connectivity")
Expand Down
9 changes: 1 addition & 8 deletions neuro_san/service/http/handlers/function_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,8 @@ async def get(self, agent_name: str):
Implementation of GET request handler for "function" API call.
"""
metadata: Dict[str, Any] = self.get_metadata()
update_done: bool = await self.update_agents(metadata=metadata)
if not update_done:
return

service: AsyncAgentService = self.agent_policy.allow(agent_name)
service: AsyncAgentService = await self.get_service(agent_name, metadata)
if service is None:
self.set_status(404)
self.logger.error({}, "error: Invalid request path %s", self.request.path)
self.do_finish()
return
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use common function from base_request_handler module.


self.application.start_client_request(metadata, f"{agent_name}/function")
Expand Down
9 changes: 1 addition & 8 deletions neuro_san/service/http/handlers/streaming_chat_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,8 @@ async def post(self, agent_name: str):
"""

metadata: Dict[str, Any] = self.get_metadata()
update_done: bool = await self.update_agents(metadata=metadata)
if not update_done:
return

service: AsyncAgentService = self.agent_policy.allow(agent_name)
service: AsyncAgentService = await self.get_service(agent_name, metadata)
if service is None:
self.set_status(404)
self.logger.error({}, "error: Invalid request path %s", self.request.path)
self.do_finish()
return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use common function from base_request_handler module.

self.application.start_client_request(metadata, f"{agent_name}/streaming_chat")
Expand Down
23 changes: 14 additions & 9 deletions neuro_san/service/http/server/http_sidecar.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.generic.async_agent_service import AsyncAgentService
from neuro_san.service.generic.async_agent_service_provider import AsyncAgentServiceProvider
from neuro_san.service.http.handlers.health_check_handler import HealthCheckHandler
from neuro_san.service.http.handlers.connectivity_handler import ConnectivityHandler
from neuro_san.service.http.handlers.function_handler import FunctionHandler
Expand Down Expand Up @@ -88,7 +88,7 @@ def __init__(self, start_event: threading.Event,
self.logger = None
self.openapi_service_spec_path: str = openapi_service_spec_path
self.forwarded_request_metadata: List[str] = forwarded_request_metadata.split(" ")
self.allowed_agents: Dict[str, AsyncAgentService] = {}
self.allowed_agents: Dict[str, AsyncAgentServiceProvider] = {}
self.lock = None

def __call__(self, other_server: AgentServer):
Expand All @@ -107,13 +107,13 @@ def __call__(self, other_server: AgentServer):
"Timeout (%d sec) waiting for signal to HTTP server to start",
self.TIMEOUT_TO_START_SECONDS)

app.listen(self.http_port)
self.logger.info({}, "HTTP server is running on port %d", self.http_port)
self.logger.info({}, "HTTP server is shutting down after %d requests", self.requests_limit)
# Construct initial "allowed" list of agents:
# no metadata to use here yet.
self.update_agents(metadata={})
self.logger.debug({}, "Serving agents: %s", repr(self.allowed_agents.keys()))
app.listen(self.http_port)
self.logger.info({}, "HTTP server is running on port %d", self.http_port)
self.logger.info({}, "HTTP server is shutting down after %d requests", self.requests_limit)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Start actually listening after all preparations are done.
Log server start at this point.

IOLoop.current().start()
self.logger.info({}, "Http server stopped.")
Expand Down Expand Up @@ -142,7 +142,7 @@ def make_app(self, requests_limit: int, logger: EventLoopLogger):

return HttpServerApp(handlers, requests_limit, logger)

def allow(self, agent_name) -> AsyncAgentService:
def allow(self, agent_name) -> AsyncAgentServiceProvider:
return self.allowed_agents.get(agent_name, None)

def update_agents(self, metadata: Dict[str, Any]):
Expand Down Expand Up @@ -181,9 +181,14 @@ def add_agent(self, agent_name: str):
request_metadata_str: str = " ".join(self.forwarded_request_metadata)
agent_server_logging: AgentServerLogging = \
AgentServerLogging(self.server_name_for_logs, request_metadata_str)
agent_service: AsyncAgentService = \
AsyncAgentService(self.logger, None, agent_name, agent_network_provider, agent_server_logging)
self.allowed_agents[agent_name] = agent_service
agent_service_provider: AsyncAgentServiceProvider = \
AsyncAgentServiceProvider(
self.logger,
None,
agent_name,
agent_network_provider,
agent_server_logging)
self.allowed_agents[agent_name] = agent_service_provider

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of AsyncAgentService instances, "allowed_agents" map now contains AsyncAgentServiceProvider instances for lazy instantiation of AsyncAgentService.

def remove_agent(self, agent_name: str):
"""
Expand Down