diff --git a/neuro_san/client/direct_agent_session_factory.py b/neuro_san/client/direct_agent_session_factory.py index f60c96b09..881dd161a 100644 --- a/neuro_san/client/direct_agent_session_factory.py +++ b/neuro_san/client/direct_agent_session_factory.py @@ -23,7 +23,7 @@ from neuro_san.internals.graph.persistence.agent_network_restorer import AgentNetworkRestorer from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider -from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.session.direct_agent_session import DirectAgentSession from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory from neuro_san.session.missing_agent_check import MissingAgentCheck @@ -35,7 +35,7 @@ class DirectAgentSessionFactory: Sets up everything needed to use a DirectAgentSession more as a library. This includes: * Some reading of AgentNetworks - * Setting up ServiceAgentNetworkStorage with agent networks + * Setting up AgentNetworkStorage with agent networks which were read in * Initializing an LlmFactory """ @@ -46,9 +46,9 @@ def __init__(self): """ manifest_restorer = RegistryManifestRestorer() self.manifest_networks: Dict[str, AgentNetwork] = manifest_restorer.restore() - network_storage: ServiceAgentNetworkStorage = ServiceAgentNetworkStorage.get_instance() + self.network_storage = AgentNetworkStorage() for agent_name, agent_network in self.manifest_networks.items(): - network_storage.add_agent_network(agent_name, agent_network) + self.network_storage.add_agent_network(agent_name, agent_network) def create_session(self, agent_name: str, use_direct: bool = False, metadata: Dict[str, str] = None, umbrella_timeout: Timeout = None) -> AgentSession: @@ -73,7 +73,7 @@ def create_session(self, agent_name: str, use_direct: bool = False, llm_factory.load() toolbox_factory.load() - factory = ExternalAgentSessionFactory(use_direct=use_direct) + factory = ExternalAgentSessionFactory(use_direct=use_direct, network_storage=self.network_storage) invocation_context = SessionInvocationContext(factory, llm_factory, toolbox_factory, metadata) invocation_context.start() session: DirectAgentSession = DirectAgentSession(agent_network=agent_network, @@ -100,10 +100,8 @@ def get_agent_network(self, agent_name: str) -> AgentNetwork: agent_network = restorer.restore(file_reference=agent_name) else: # Use the standard stuff available via the manifest file. - network_storage: ServiceAgentNetworkStorage =\ - ServiceAgentNetworkStorage.get_instance() agent_network_provider: AgentNetworkProvider =\ - network_storage.get_agent_network_provider(agent_name) + self.network_storage.get_agent_network_provider(agent_name) agent_network = agent_network_provider.get_agent_network() # Common place for nice error messages when networks are not found diff --git a/neuro_san/internals/network_providers/service_agent_network_storage.py b/neuro_san/internals/network_providers/agent_network_storage.py similarity index 89% rename from neuro_san/internals/network_providers/service_agent_network_storage.py rename to neuro_san/internals/network_providers/agent_network_storage.py index bed527ac4..7e797f31d 100644 --- a/neuro_san/internals/network_providers/service_agent_network_storage.py +++ b/neuro_san/internals/network_providers/agent_network_storage.py @@ -20,32 +20,20 @@ from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider -class ServiceAgentNetworkStorage: +class AgentNetworkStorage: """ - Service-wide storage for AgentNetworkProviders. - This class is a global singleton containing + Service-wide storage for AgentNetworkProviders containing a table of currently active AgentNetworks for each agent registered to the service. Note: a mapping from an agent to its AgentNetwork is dynamic, - as we can change agents definitions at service run-time. + as it is possible to change agents definitions at service run-time. """ - instance = None - def __init__(self): self.agents_table: Dict[str, AgentNetwork] = {} self.logger = logging.getLogger(self.__class__.__name__) self.lock = threading.Lock() self.listeners: List[AgentStateListener] = [] - @classmethod - def get_instance(cls): - """ - Get a singleton instance of this class - """ - if not ServiceAgentNetworkStorage.instance: - ServiceAgentNetworkStorage.instance = ServiceAgentNetworkStorage() - return ServiceAgentNetworkStorage.instance - def add_listener(self, listener: AgentStateListener): """ Add a state listener to be notified when status of service agents changes. diff --git a/neuro_san/service/grpc/concierge_service.py b/neuro_san/service/grpc/concierge_service.py index 9e83328dd..c1ef58a66 100644 --- a/neuro_san/service/grpc/concierge_service.py +++ b/neuro_san/service/grpc/concierge_service.py @@ -28,6 +28,7 @@ from neuro_san.api.grpc import concierge_pb2 as concierge_messages from neuro_san.api.grpc import concierge_pb2_grpc +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.session.direct_concierge_session import DirectConciergeSession @@ -40,7 +41,8 @@ class ConciergeService(concierge_pb2_grpc.ConciergeServiceServicer): def __init__(self, request_logger: RequestLogger, security_cfg: Dict[str, Any], - server_logging: AgentServerLogging): + server_logging: AgentServerLogging, + network_storage: AgentNetworkStorage): """ Set the gRPC interface up for health checking so that the service will be opened to callers when the mesh sees it operational, if this @@ -55,11 +57,14 @@ def __init__(self, :param server_logging: An AgentServerLogging instance initialized so that spawned asynchronous threads can also properly initialize their logging. + :param network_storage: A AgentNetworkStorage instance which keeps all + the AgentNetwork instances. """ self.request_logger = request_logger self.security_cfg = security_cfg self.server_logging: AgentServerLogging = server_logging self.forwarder: GrpcMetadataForwarder = self.server_logging.get_forwarder() + self.network_storage: AgentNetworkStorage = network_storage # pylint: disable=no-member def List(self, request: concierge_messages.ConciergeRequest, @@ -90,7 +95,8 @@ def List(self, request: concierge_messages.ConciergeRequest, request_dict: Dict[str, Any] = MessageToDict(request) # Delegate to Direct*Session - session = DirectConciergeSession(metadata=metadata, + session = DirectConciergeSession(network_storage=self.network_storage, + metadata=metadata, security_cfg=self.security_cfg) response_dict = session.list(request_dict) diff --git a/neuro_san/service/grpc/grpc_agent_server.py b/neuro_san/service/grpc/grpc_agent_server.py index af1d5ef47..a0608d281 100644 --- a/neuro_san/service/grpc/grpc_agent_server.py +++ b/neuro_san/service/grpc/grpc_agent_server.py @@ -23,7 +23,7 @@ from neuro_san.api.grpc import concierge_pb2_grpc from neuro_san.internals.graph.registry.agent_network import AgentNetwork -from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.grpc.agent_servicer_to_server import AgentServicerToServer from neuro_san.service.grpc.concierge_service import ConciergeService @@ -51,6 +51,7 @@ class GrpcAgentServer(AgentServer): # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__(self, port: int, server_loop_callbacks: ServerLoopCallbacks, + network_storage: AgentNetworkStorage, agent_networks: Dict[str, AgentNetwork], server_name: str = DEFAULT_SERVER_NAME, server_name_for_logs: str = DEFAULT_SERVER_NAME_FOR_LOGS, @@ -64,6 +65,8 @@ def __init__(self, port: int, :param server_loop_callbacks: The ServerLoopCallbacks instance for break out methods in main serving loop. :param agent_networks: A dictionary of agent name to AgentNetwork to use for the session. + :param network_storage: A AgentNetworkStorage instance which keeps all + the AgentNetwork instances. :param server_name: The name of the service :param server_name_for_logs: The name of the service for log files :param max_concurrent_requests: The maximum number of requests to handle at a time. @@ -81,6 +84,7 @@ def __init__(self, port: int, self.logger = logging.getLogger(__name__) + self.network_storage: AgentNetworkStorage = network_storage self.agent_networks: Dict[str, AgentNetwork] = agent_networks self.server_name: str = server_name self.server_name_for_logs: str = server_name_for_logs @@ -107,25 +111,14 @@ def get_starting_event(self) -> threading.Event: """ return self.notify_started - def setup_agent_network_storage(self): - """ - Initialize ServiceAgentNetworkStorage with AgentNetworks - we have parsed in server manifest file. - """ - network_storage: ServiceAgentNetworkStorage =\ - ServiceAgentNetworkStorage.get_instance() - network_storage.setup_agent_networks(self.agent_networks) - def agent_added(self, agent_name: str): """ Agent is being added to the service. :param agent_name: name of an agent """ - network_storage: ServiceAgentNetworkStorage =\ - ServiceAgentNetworkStorage.get_instance() service = GrpcAgentService(self.server_lifetime, self.security_cfg, agent_name, - network_storage.get_agent_network_provider(agent_name), + self.network_storage.get_agent_network_provider(agent_name), self.server_logging) self.services.append(service) servicer_to_server = AgentServicerToServer(service) @@ -172,13 +165,12 @@ def serve(self): # New-style service self.security_cfg = None # ... yet - network_storage: ServiceAgentNetworkStorage = \ - ServiceAgentNetworkStorage.get_instance() # Add listener to handle adding per-agent gRPC services # to our dynamic router: - network_storage.add_listener(self) + self.network_storage.add_listener(self) - self.setup_agent_network_storage() + # DEF - It's possible this could move outside this class. + self.network_storage.setup_agent_networks(self.agent_networks) # Add DynamicAgentRouter instance as a generic RPC handler for our server: server.add_generic_rpc_handlers((self.service_router,)) @@ -186,7 +178,8 @@ def serve(self): concierge_service: ConciergeService = \ ConciergeService(self.server_lifetime, self.security_cfg, - self.server_logging) + self.server_logging, + self.network_storage) concierge_pb2_grpc.add_ConciergeServiceServicer_to_server( concierge_service, server) diff --git a/neuro_san/service/http/handlers/base_request_handler.py b/neuro_san/service/http/handlers/base_request_handler.py index fcb6414b1..86b6fdfdd 100644 --- a/neuro_san/service/http/handlers/base_request_handler.py +++ b/neuro_san/service/http/handlers/base_request_handler.py @@ -24,6 +24,7 @@ import tornado from tornado.web import RequestHandler +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.service.http.interfaces.agent_authorizer import AgentAuthorizer from neuro_san.service.http.interfaces.agents_updater import AgentsUpdater from neuro_san.service.http.logging.http_logger import HttpLogger @@ -38,11 +39,14 @@ class BaseRequestHandler(RequestHandler): request_id: int = 0 # pylint: disable=attribute-defined-outside-init + # pylint: disable=too-many-arguments + # pylint: disable=too-many-positional-arguments def initialize(self, agent_policy: AgentAuthorizer, agents_updater: AgentsUpdater, forwarded_request_metadata: List[str], - openapi_service_spec_path: str): + openapi_service_spec_path: str, + network_storage: AgentNetworkStorage): """ This method is called by Tornado framework to allow injecting service-specific data into local handler context. @@ -51,6 +55,8 @@ def initialize(self, collection of agents being served :param forwarded_request_metadata: request metadata to forward. :param openapi_service_spec_path: file path to OpenAPI service spec. + :param network_storage: A AgentNetworkStorage instance which keeps all + the AgentNetwork instances. """ self.agent_policy = agent_policy @@ -58,6 +64,8 @@ def initialize(self, self.forwarded_request_metadata: List[str] = forwarded_request_metadata self.openapi_service_spec_path: str = openapi_service_spec_path self.logger = HttpLogger(forwarded_request_metadata) + self.network_storage: AgentNetworkStorage = network_storage + # Set default request_id for this request handler in case we will need it: BaseRequestHandler.request_id += 1 diff --git a/neuro_san/service/http/handlers/concierge_handler.py b/neuro_san/service/http/handlers/concierge_handler.py index fd964986c..d52ad3fea 100644 --- a/neuro_san/service/http/handlers/concierge_handler.py +++ b/neuro_san/service/http/handlers/concierge_handler.py @@ -33,7 +33,7 @@ def get(self): self.application.start_client_request(metadata, "/api/v1/list") try: data: Dict[str, Any] = {} - session: ConciergeSession = DirectConciergeSession(metadata=metadata) + session: ConciergeSession = DirectConciergeSession(self.network_storage, metadata=metadata) result_dict: Dict[str, Any] = session.list(data) # Return response to the HTTP client diff --git a/neuro_san/service/http/server/http_sidecar.py b/neuro_san/service/http/server/http_sidecar.py index 040082620..dbc310deb 100644 --- a/neuro_san/service/http/server/http_sidecar.py +++ b/neuro_san/service/http/server/http_sidecar.py @@ -23,7 +23,7 @@ from tornado.ioloop import IOLoop from neuro_san.interfaces.concierge_session import ConciergeSession -from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider from neuro_san.service.generic.agent_server_logging import AgentServerLogging from neuro_san.service.generic.async_agent_service import AsyncAgentService @@ -55,6 +55,7 @@ def __init__(self, start_event: threading.Event, http_port: int, openapi_service_spec_path: str, requests_limit: int, + network_storage: AgentNetworkStorage, forwarded_request_metadata: str = AgentServer.DEFAULT_FORWARDED_REQUEST_METADATA): """ Constructor: @@ -64,12 +65,15 @@ def __init__(self, start_event: threading.Event, :param request_limit: The number of requests to service before shutting down. This is useful to be sure production environments can handle a service occasionally going down. + :param network_storage: A AgentNetworkStorage instance which keeps all + the AgentNetwork instances. :param forwarded_request_metadata: A space-delimited list of http metadata request keys to forward to logs/other requests """ self.server_name_for_logs: str = "Http Server" self.start_event: threading.Event = start_event self.http_port = http_port + self.network_storage: AgentNetworkStorage = network_storage # Randomize requests limit for this server instance. # Lower and upper bounds for number of requests before shutting down @@ -148,7 +152,8 @@ def update_agents(self, metadata: Dict[str, Any]): :return: nothing """ data: Dict[str, Any] = {} - session: ConciergeSession = DirectConciergeSession(metadata=metadata) + # Why do we need the Concierge if we already have access to network_storage? + session: ConciergeSession = DirectConciergeSession(self.network_storage, metadata=metadata) agents_dict: Dict[str, List[Dict[str, str]]] = session.list(data) agents_list: List[Dict[str, str]] = agents_dict["agents"] agents: List[str] = [] @@ -171,7 +176,7 @@ def add_agent(self, agent_name: str): :param agent_name: name of an agent """ agent_network_provider: SingleAgentNetworkProvider = \ - ServiceAgentNetworkStorage.get_instance().get_agent_network_provider(agent_name) + self.network_storage.get_agent_network_provider(agent_name) # Convert back to a single string as required by constructor request_metadata_str: str = " ".join(self.forwarded_request_metadata) agent_server_logging: AgentServerLogging = \ @@ -196,5 +201,6 @@ def build_request_data(self) -> Dict[str, Any]: "agent_policy": self, "agents_updater": self, "forwarded_request_metadata": self.forwarded_request_metadata, - "openapi_service_spec_path": self.openapi_service_spec_path + "openapi_service_spec_path": self.openapi_service_spec_path, + "network_storage": self.network_storage } diff --git a/neuro_san/service/main_loop/server_main_loop.py b/neuro_san/service/main_loop/server_main_loop.py index 546f6624b..df0e6d6c6 100644 --- a/neuro_san/service/main_loop/server_main_loop.py +++ b/neuro_san/service/main_loop/server_main_loop.py @@ -25,6 +25,7 @@ from neuro_san.interfaces.agent_session import AgentSession from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer from neuro_san.internals.graph.registry.agent_network import AgentNetwork +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.internals.utils.file_of_class import FileOfClass from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME_FOR_LOGS @@ -61,6 +62,7 @@ def __init__(self): self.manifest_update_period_seconds: int = 0 self.server: GrpcAgentServer = None self.manifest_files: List[str] = [] + self.network_storage = AgentNetworkStorage() def parse_args(self): """ @@ -154,6 +156,7 @@ def main_loop(self): self.server = GrpcAgentServer(self.port, server_loop_callbacks=self, + network_storage=self.network_storage, agent_networks=self.agent_networks, server_name=self.server_name, server_name_for_logs=self.server_name_for_logs, @@ -164,7 +167,7 @@ def main_loop(self): if self.manifest_update_period_seconds > 0: manifest_file: str = self.manifest_files[0] updater: ManifestPeriodicUpdater =\ - ManifestPeriodicUpdater(manifest_file, self.manifest_update_period_seconds) + ManifestPeriodicUpdater(self.network_storage, manifest_file, self.manifest_update_period_seconds) updater.start() # Start HTTP server side-car: @@ -173,6 +176,7 @@ def main_loop(self): self.http_port, self.service_openapi_spec_file, self.request_limit, + self.network_storage, forwarded_request_metadata=metadata_str) http_server_thread = threading.Thread(target=http_sidecar, args=(self.server,), daemon=True) http_server_thread.start() diff --git a/neuro_san/service/registries_watcher/periodic_updater/manifest_periodic_updater.py b/neuro_san/service/registries_watcher/periodic_updater/manifest_periodic_updater.py index a96f0db2a..cf8d2eb68 100644 --- a/neuro_san/service/registries_watcher/periodic_updater/manifest_periodic_updater.py +++ b/neuro_san/service/registries_watcher/periodic_updater/manifest_periodic_updater.py @@ -16,7 +16,7 @@ from neuro_san.internals.graph.registry.agent_network import AgentNetwork from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer -from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.service.registries_watcher.periodic_updater.registry_event_observer import RegistryEventObserver from neuro_san.service.registries_watcher.periodic_updater.registry_polling_observer import RegistryPollingObserver @@ -28,12 +28,19 @@ class ManifestPeriodicUpdater: """ use_polling: bool = True - def __init__(self, manifest_path: str, update_period_seconds: int): + def __init__(self, + network_storage: AgentNetworkStorage, + manifest_path: str, + update_period_seconds: int): """ Constructor. + + :param network_storage: A AgentNetworkStorage instance which keeps all + the AgentNetwork instances. :param manifest_path: file path to server manifest file :param update_period_seconds: update period in seconds """ + self.network_storage: AgentNetworkStorage = network_storage self.manifest_path: str = manifest_path self.update_period_seconds: int = update_period_seconds self.logger = logging.getLogger(self.__class__.__name__) @@ -43,7 +50,6 @@ def __init__(self, manifest_path: str, update_period_seconds: int): self.observer = RegistryPollingObserver(self.manifest_path, poll_interval) else: self.observer = RegistryEventObserver(self.manifest_path) - self.network_storage: ServiceAgentNetworkStorage = ServiceAgentNetworkStorage.get_instance() self.go_run: bool = True def _run(self): diff --git a/neuro_san/session/direct_concierge_session.py b/neuro_san/session/direct_concierge_session.py index a0432fc12..5ea33070b 100644 --- a/neuro_san/session/direct_concierge_session.py +++ b/neuro_san/session/direct_concierge_session.py @@ -15,7 +15,7 @@ from typing import List from neuro_san.interfaces.concierge_session import ConciergeSession -from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage class DirectConciergeSession(ConciergeSession): @@ -26,11 +26,14 @@ class DirectConciergeSession(ConciergeSession): # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__(self, + network_storage: AgentNetworkStorage, metadata: Dict[str, Any] = None, security_cfg: Dict[str, Any] = None): """ Constructor + :param network_storage: A AgentNetworkStorage instance which keeps all + the AgentNetwork instances. :param metadata: A dictionary of request metadata to be forwarded to subsequent yet-to-be-made requests. :param security_cfg: A dictionary of parameters used to @@ -38,6 +41,7 @@ def __init__(self, connection. Supplying this implies use of a secure GRPC Channel. If None, uses insecure channel. """ + self.network_storage: AgentNetworkStorage = network_storage # These aren't used yet self._metadata: Dict[str, Any] = metadata self._security_cfg: Dict[str, Any] = security_cfg @@ -51,8 +55,7 @@ def list(self, request_dict: Dict[str, Any]) -> Dict[str, Any]: protobuf structure. Has the following keys: "agents" - the sequence of dictionaries describing available agents """ - network_storage: ServiceAgentNetworkStorage = ServiceAgentNetworkStorage.get_instance() - agents_names: List[str] = network_storage.get_agent_names() + agents_names: List[str] = self.network_storage.get_agent_names() agents_list: List[Dict[str, Any]] = [] for agent_name in agents_names: agents_list.append({"agent_name": agent_name, "description": ""}) diff --git a/neuro_san/session/external_agent_session_factory.py b/neuro_san/session/external_agent_session_factory.py index 7419fa7f1..8960ac333 100644 --- a/neuro_san/session/external_agent_session_factory.py +++ b/neuro_san/session/external_agent_session_factory.py @@ -19,7 +19,7 @@ from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider from neuro_san.internals.interfaces.invocation_context import InvocationContext from neuro_san.internals.run_context.utils.external_agent_parsing import ExternalAgentParsing -from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage +from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage from neuro_san.session.async_direct_agent_session import AsyncDirectAgentSession from neuro_san.session.async_http_service_agent_session import AsyncHttpServiceAgentSession @@ -29,13 +29,17 @@ class ExternalAgentSessionFactory(AsyncAgentSessionFactory): Creates AgentSessions for external agents. """ - def __init__(self, use_direct: bool = False): + def __init__(self, use_direct: bool = False, + network_storage: AgentNetworkStorage = None): """ Constructor :param use_direct: When True, will use a Direct session for external agents that would reside on the same server. + :param network_storage: A AgentNetworkStorage instance which keeps all + the AgentNetwork instances. Only used with use_direct=True. """ + self.network_storage: AgentNetworkStorage = network_storage self.use_direct: bool = use_direct def create_session(self, agent_url: str, @@ -80,9 +84,8 @@ def create_session_from_location_dict(self, agent_location: Dict[str, str], # Optimization: We want to create a different kind of session to minimize socket usage # and potentially relieve the direct user of the burden of having to start a server - network_storage: ServiceAgentNetworkStorage = ServiceAgentNetworkStorage.get_instance() agent_network_provider: AgentNetworkProvider = \ - network_storage.get_agent_network_provider(agent_name) + self.network_storage.get_agent_network_provider(agent_name) agent_network: AgentNetwork = agent_network_provider.get_agent_network() session = AsyncDirectAgentSession(agent_network, invocation_context, metadata=metadata)