-
Notifications
You must be signed in to change notification settings - Fork 28
Remove ServiceAgentNetworkStorage singleton #289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9ef260d
acb6f66
cfe61ad
1c3b576
496b0de
6d61c67
9749a23
97bccbd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,7 @@ | |
| from neuro_san.internals.graph.persistence.agent_network_restorer import AgentNetworkRestorer | ||
| from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer | ||
| from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider | ||
| from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage | ||
| from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage | ||
| from neuro_san.session.direct_agent_session import DirectAgentSession | ||
| from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory | ||
| from neuro_san.session.missing_agent_check import MissingAgentCheck | ||
|
|
@@ -35,7 +35,7 @@ class DirectAgentSessionFactory: | |
| Sets up everything needed to use a DirectAgentSession more as a library. | ||
| This includes: | ||
| * Some reading of AgentNetworks | ||
| * Setting up ServiceAgentNetworkStorage with agent networks | ||
| * Setting up AgentNetworkStorage with agent networks | ||
| which were read in | ||
| * Initializing an LlmFactory | ||
| """ | ||
|
|
@@ -46,9 +46,9 @@ def __init__(self): | |
| """ | ||
| manifest_restorer = RegistryManifestRestorer() | ||
| self.manifest_networks: Dict[str, AgentNetwork] = manifest_restorer.restore() | ||
| network_storage: ServiceAgentNetworkStorage = ServiceAgentNetworkStorage.get_instance() | ||
| self.network_storage = AgentNetworkStorage() | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is one of the two places where an object instance originates. |
||
| for agent_name, agent_network in self.manifest_networks.items(): | ||
| network_storage.add_agent_network(agent_name, agent_network) | ||
| self.network_storage.add_agent_network(agent_name, agent_network) | ||
|
|
||
| def create_session(self, agent_name: str, use_direct: bool = False, | ||
| metadata: Dict[str, str] = None, umbrella_timeout: Timeout = None) -> AgentSession: | ||
|
|
@@ -73,7 +73,7 @@ def create_session(self, agent_name: str, use_direct: bool = False, | |
| llm_factory.load() | ||
| toolbox_factory.load() | ||
|
|
||
| factory = ExternalAgentSessionFactory(use_direct=use_direct) | ||
| factory = ExternalAgentSessionFactory(use_direct=use_direct, network_storage=self.network_storage) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the only place where we pass the AgentNetworkStorage into the ExternalAgentSessionFactory. |
||
| invocation_context = SessionInvocationContext(factory, llm_factory, toolbox_factory, metadata) | ||
| invocation_context.start() | ||
| session: DirectAgentSession = DirectAgentSession(agent_network=agent_network, | ||
|
|
@@ -100,10 +100,8 @@ def get_agent_network(self, agent_name: str) -> AgentNetwork: | |
| agent_network = restorer.restore(file_reference=agent_name) | ||
| else: | ||
| # Use the standard stuff available via the manifest file. | ||
| network_storage: ServiceAgentNetworkStorage =\ | ||
| ServiceAgentNetworkStorage.get_instance() | ||
| agent_network_provider: AgentNetworkProvider =\ | ||
| network_storage.get_agent_network_provider(agent_name) | ||
| self.network_storage.get_agent_network_provider(agent_name) | ||
| agent_network = agent_network_provider.get_agent_network() | ||
|
|
||
| # Common place for nice error messages when networks are not found | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,32 +20,20 @@ | |
| from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider | ||
|
|
||
|
|
||
| class ServiceAgentNetworkStorage: | ||
| class AgentNetworkStorage: | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rename. |
||
| """ | ||
| Service-wide storage for AgentNetworkProviders. | ||
| This class is a global singleton containing | ||
| Service-wide storage for AgentNetworkProviders containing | ||
| a table of currently active AgentNetworks for each agent registered to the service. | ||
| Note: a mapping from an agent to its AgentNetwork is dynamic, | ||
| as we can change agents definitions at service run-time. | ||
| as it is possible to change agents definitions at service run-time. | ||
| """ | ||
|
|
||
| instance = None | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't need the instance variable any more. |
||
|
|
||
| def __init__(self): | ||
| self.agents_table: Dict[str, AgentNetwork] = {} | ||
| self.logger = logging.getLogger(self.__class__.__name__) | ||
| self.lock = threading.Lock() | ||
| self.listeners: List[AgentStateListener] = [] | ||
|
|
||
| @classmethod | ||
| def get_instance(cls): | ||
| """ | ||
| Get a singleton instance of this class | ||
| """ | ||
| if not ServiceAgentNetworkStorage.instance: | ||
| ServiceAgentNetworkStorage.instance = ServiceAgentNetworkStorage() | ||
| return ServiceAgentNetworkStorage.instance | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't need get_instance() any more. |
||
|
|
||
| def add_listener(self, listener: AgentStateListener): | ||
| """ | ||
| Add a state listener to be notified when status of service agents changes. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |
| from neuro_san.api.grpc import concierge_pb2 as concierge_messages | ||
| from neuro_san.api.grpc import concierge_pb2_grpc | ||
|
|
||
| from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage | ||
| from neuro_san.service.generic.agent_server_logging import AgentServerLogging | ||
| from neuro_san.session.direct_concierge_session import DirectConciergeSession | ||
|
|
||
|
|
@@ -40,7 +41,8 @@ class ConciergeService(concierge_pb2_grpc.ConciergeServiceServicer): | |
| def __init__(self, | ||
| request_logger: RequestLogger, | ||
| security_cfg: Dict[str, Any], | ||
| server_logging: AgentServerLogging): | ||
| server_logging: AgentServerLogging, | ||
| network_storage: AgentNetworkStorage): | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typical plumbing addition. |
||
| """ | ||
| Set the gRPC interface up for health checking so that the service | ||
| will be opened to callers when the mesh sees it operational, if this | ||
|
|
@@ -55,11 +57,14 @@ def __init__(self, | |
| :param server_logging: An AgentServerLogging instance initialized so that | ||
| spawned asynchronous threads can also properly initialize | ||
| their logging. | ||
| :param network_storage: A AgentNetworkStorage instance which keeps all | ||
| the AgentNetwork instances. | ||
| """ | ||
| self.request_logger = request_logger | ||
| self.security_cfg = security_cfg | ||
| self.server_logging: AgentServerLogging = server_logging | ||
| self.forwarder: GrpcMetadataForwarder = self.server_logging.get_forwarder() | ||
| self.network_storage: AgentNetworkStorage = network_storage | ||
|
|
||
| # pylint: disable=no-member | ||
| def List(self, request: concierge_messages.ConciergeRequest, | ||
|
|
@@ -90,7 +95,8 @@ def List(self, request: concierge_messages.ConciergeRequest, | |
| request_dict: Dict[str, Any] = MessageToDict(request) | ||
|
|
||
| # Delegate to Direct*Session | ||
| session = DirectConciergeSession(metadata=metadata, | ||
| session = DirectConciergeSession(network_storage=self.network_storage, | ||
| metadata=metadata, | ||
| security_cfg=self.security_cfg) | ||
| response_dict = session.list(request_dict) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,7 @@ | |
| from neuro_san.api.grpc import concierge_pb2_grpc | ||
|
|
||
| from neuro_san.internals.graph.registry.agent_network import AgentNetwork | ||
| from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage | ||
| from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage | ||
| from neuro_san.service.generic.agent_server_logging import AgentServerLogging | ||
| from neuro_san.service.grpc.agent_servicer_to_server import AgentServicerToServer | ||
| from neuro_san.service.grpc.concierge_service import ConciergeService | ||
|
|
@@ -51,6 +51,7 @@ class GrpcAgentServer(AgentServer): | |
| # pylint: disable=too-many-arguments,too-many-positional-arguments | ||
| def __init__(self, port: int, | ||
| server_loop_callbacks: ServerLoopCallbacks, | ||
| network_storage: AgentNetworkStorage, | ||
| agent_networks: Dict[str, AgentNetwork], | ||
| server_name: str = DEFAULT_SERVER_NAME, | ||
| server_name_for_logs: str = DEFAULT_SERVER_NAME_FOR_LOGS, | ||
|
|
@@ -64,6 +65,8 @@ def __init__(self, port: int, | |
| :param server_loop_callbacks: The ServerLoopCallbacks instance for | ||
| break out methods in main serving loop. | ||
| :param agent_networks: A dictionary of agent name to AgentNetwork to use for the session. | ||
| :param network_storage: A AgentNetworkStorage instance which keeps all | ||
| the AgentNetwork instances. | ||
| :param server_name: The name of the service | ||
| :param server_name_for_logs: The name of the service for log files | ||
| :param max_concurrent_requests: The maximum number of requests to handle at a time. | ||
|
|
@@ -81,6 +84,7 @@ def __init__(self, port: int, | |
|
|
||
| self.logger = logging.getLogger(__name__) | ||
|
|
||
| self.network_storage: AgentNetworkStorage = network_storage | ||
| self.agent_networks: Dict[str, AgentNetwork] = agent_networks | ||
| self.server_name: str = server_name | ||
| self.server_name_for_logs: str = server_name_for_logs | ||
|
|
@@ -107,25 +111,14 @@ def get_starting_event(self) -> threading.Event: | |
| """ | ||
| return self.notify_started | ||
|
|
||
| def setup_agent_network_storage(self): | ||
| """ | ||
| Initialize ServiceAgentNetworkStorage with AgentNetworks | ||
| we have parsed in server manifest file. | ||
| """ | ||
| network_storage: ServiceAgentNetworkStorage =\ | ||
| ServiceAgentNetworkStorage.get_instance() | ||
| network_storage.setup_agent_networks(self.agent_networks) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't really need this method. Just call the network storage directly (see below) |
||
|
|
||
| def agent_added(self, agent_name: str): | ||
| """ | ||
| Agent is being added to the service. | ||
| :param agent_name: name of an agent | ||
| """ | ||
| network_storage: ServiceAgentNetworkStorage =\ | ||
| ServiceAgentNetworkStorage.get_instance() | ||
| service = GrpcAgentService(self.server_lifetime, self.security_cfg, | ||
| agent_name, | ||
| network_storage.get_agent_network_provider(agent_name), | ||
| self.network_storage.get_agent_network_provider(agent_name), | ||
| self.server_logging) | ||
| self.services.append(service) | ||
| servicer_to_server = AgentServicerToServer(service) | ||
|
|
@@ -172,21 +165,21 @@ def serve(self): | |
| # New-style service | ||
| self.security_cfg = None # ... yet | ||
|
|
||
| network_storage: ServiceAgentNetworkStorage = \ | ||
| ServiceAgentNetworkStorage.get_instance() | ||
| # Add listener to handle adding per-agent gRPC services | ||
| # to our dynamic router: | ||
| network_storage.add_listener(self) | ||
| self.network_storage.add_listener(self) | ||
|
|
||
| self.setup_agent_network_storage() | ||
| # DEF - It's possible this could move outside this class. | ||
| self.network_storage.setup_agent_networks(self.agent_networks) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Call the network storage directly instead of using the method. |
||
|
|
||
| # Add DynamicAgentRouter instance as a generic RPC handler for our server: | ||
| server.add_generic_rpc_handlers((self.service_router,)) | ||
|
|
||
| concierge_service: ConciergeService = \ | ||
| ConciergeService(self.server_lifetime, | ||
| self.security_cfg, | ||
| self.server_logging) | ||
| self.server_logging, | ||
| self.network_storage) | ||
| concierge_pb2_grpc.add_ConciergeServiceServicer_to_server( | ||
| concierge_service, | ||
| server) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| import tornado | ||
| from tornado.web import RequestHandler | ||
|
|
||
| from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage | ||
| from neuro_san.service.http.interfaces.agent_authorizer import AgentAuthorizer | ||
| from neuro_san.service.http.interfaces.agents_updater import AgentsUpdater | ||
| from neuro_san.service.http.logging.http_logger import HttpLogger | ||
|
|
@@ -38,11 +39,14 @@ class BaseRequestHandler(RequestHandler): | |
| request_id: int = 0 | ||
|
|
||
| # pylint: disable=attribute-defined-outside-init | ||
| # pylint: disable=too-many-arguments | ||
| # pylint: disable=too-many-positional-arguments | ||
| def initialize(self, | ||
| agent_policy: AgentAuthorizer, | ||
| agents_updater: AgentsUpdater, | ||
| forwarded_request_metadata: List[str], | ||
| openapi_service_spec_path: str): | ||
| openapi_service_spec_path: str, | ||
| network_storage: AgentNetworkStorage): | ||
| """ | ||
| This method is called by Tornado framework to allow | ||
| injecting service-specific data into local handler context. | ||
|
|
@@ -51,13 +55,17 @@ def initialize(self, | |
| collection of agents being served | ||
| :param forwarded_request_metadata: request metadata to forward. | ||
| :param openapi_service_spec_path: file path to OpenAPI service spec. | ||
| :param network_storage: A AgentNetworkStorage instance which keeps all | ||
| the AgentNetwork instances. | ||
| """ | ||
|
|
||
| self.agent_policy = agent_policy | ||
| self.agents_updater = agents_updater | ||
| self.forwarded_request_metadata: List[str] = forwarded_request_metadata | ||
| self.openapi_service_spec_path: str = openapi_service_spec_path | ||
| self.logger = HttpLogger(forwarded_request_metadata) | ||
| self.network_storage: AgentNetworkStorage = network_storage | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BaseRequestHandler itself doesn't really need the AgentNetworkStorage, but subclasses do and this is the closest thing the tornado infrastructure has to a constructor that is available to us. |
||
|
|
||
| # Set default request_id for this request handler in case we will need it: | ||
| BaseRequestHandler.request_id += 1 | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,7 @@ | |
| from tornado.ioloop import IOLoop | ||
|
|
||
| from neuro_san.interfaces.concierge_session import ConciergeSession | ||
| from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage | ||
| from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage | ||
| from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider | ||
| from neuro_san.service.generic.agent_server_logging import AgentServerLogging | ||
| from neuro_san.service.generic.async_agent_service import AsyncAgentService | ||
|
|
@@ -55,6 +55,7 @@ def __init__(self, start_event: threading.Event, | |
| http_port: int, | ||
| openapi_service_spec_path: str, | ||
| requests_limit: int, | ||
| network_storage: AgentNetworkStorage, | ||
| forwarded_request_metadata: str = AgentServer.DEFAULT_FORWARDED_REQUEST_METADATA): | ||
| """ | ||
| Constructor: | ||
|
|
@@ -64,12 +65,15 @@ def __init__(self, start_event: threading.Event, | |
| :param request_limit: The number of requests to service before shutting down. | ||
| This is useful to be sure production environments can handle | ||
| a service occasionally going down. | ||
| :param network_storage: A AgentNetworkStorage instance which keeps all | ||
| the AgentNetwork instances. | ||
| :param forwarded_request_metadata: A space-delimited list of http metadata request keys | ||
| to forward to logs/other requests | ||
| """ | ||
| self.server_name_for_logs: str = "Http Server" | ||
| self.start_event: threading.Event = start_event | ||
| self.http_port = http_port | ||
| self.network_storage: AgentNetworkStorage = network_storage | ||
|
|
||
| # Randomize requests limit for this server instance. | ||
| # Lower and upper bounds for number of requests before shutting down | ||
|
|
@@ -148,7 +152,8 @@ def update_agents(self, metadata: Dict[str, Any]): | |
| :return: nothing | ||
| """ | ||
| data: Dict[str, Any] = {} | ||
| session: ConciergeSession = DirectConciergeSession(metadata=metadata) | ||
| # Why do we need the Concierge if we already have access to network_storage? | ||
| session: ConciergeSession = DirectConciergeSession(self.network_storage, metadata=metadata) | ||
| agents_dict: Dict[str, List[Dict[str, str]]] = session.list(data) | ||
| agents_list: List[Dict[str, str]] = agents_dict["agents"] | ||
| agents: List[str] = [] | ||
|
|
@@ -171,7 +176,7 @@ def add_agent(self, agent_name: str): | |
| :param agent_name: name of an agent | ||
| """ | ||
| agent_network_provider: SingleAgentNetworkProvider = \ | ||
| ServiceAgentNetworkStorage.get_instance().get_agent_network_provider(agent_name) | ||
| self.network_storage.get_agent_network_provider(agent_name) | ||
| # Convert back to a single string as required by constructor | ||
| request_metadata_str: str = " ".join(self.forwarded_request_metadata) | ||
| agent_server_logging: AgentServerLogging = \ | ||
|
|
@@ -196,5 +201,6 @@ def build_request_data(self) -> Dict[str, Any]: | |
| "agent_policy": self, | ||
| "agents_updater": self, | ||
| "forwarded_request_metadata": self.forwarded_request_metadata, | ||
| "openapi_service_spec_path": self.openapi_service_spec_path | ||
| "openapi_service_spec_path": self.openapi_service_spec_path, | ||
| "network_storage": self.network_storage | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add network_storage to this "request data" dictionary. This is the mechanism by which args are passed to BaseRequestHandler.initialize() |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| from neuro_san.interfaces.agent_session import AgentSession | ||
| from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer | ||
| from neuro_san.internals.graph.registry.agent_network import AgentNetwork | ||
| from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage | ||
| from neuro_san.internals.utils.file_of_class import FileOfClass | ||
| from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME | ||
| from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME_FOR_LOGS | ||
|
|
@@ -61,6 +62,7 @@ def __init__(self): | |
| self.manifest_update_period_seconds: int = 0 | ||
| self.server: GrpcAgentServer = None | ||
| self.manifest_files: List[str] = [] | ||
| self.network_storage = AgentNetworkStorage() | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is one of the two places where an object instance originates. |
||
|
|
||
| def parse_args(self): | ||
| """ | ||
|
|
@@ -154,6 +156,7 @@ def main_loop(self): | |
|
|
||
| self.server = GrpcAgentServer(self.port, | ||
| server_loop_callbacks=self, | ||
| network_storage=self.network_storage, | ||
| agent_networks=self.agent_networks, | ||
| server_name=self.server_name, | ||
| server_name_for_logs=self.server_name_for_logs, | ||
|
|
@@ -164,7 +167,7 @@ def main_loop(self): | |
| if self.manifest_update_period_seconds > 0: | ||
| manifest_file: str = self.manifest_files[0] | ||
| updater: ManifestPeriodicUpdater =\ | ||
| ManifestPeriodicUpdater(manifest_file, self.manifest_update_period_seconds) | ||
| ManifestPeriodicUpdater(self.network_storage, manifest_file, self.manifest_update_period_seconds) | ||
| updater.start() | ||
|
|
||
| # Start HTTP server side-car: | ||
|
|
@@ -173,6 +176,7 @@ def main_loop(self): | |
| self.http_port, | ||
| self.service_openapi_spec_file, | ||
| self.request_limit, | ||
| self.network_storage, | ||
| forwarded_request_metadata=metadata_str) | ||
| http_server_thread = threading.Thread(target=http_sidecar, args=(self.server,), daemon=True) | ||
| http_server_thread.start() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename ServiceAgentNetworkStorage -> AgentNetworkStorage