Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions neuro_san/client/direct_agent_session_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from neuro_san.internals.graph.persistence.agent_network_restorer import AgentNetworkRestorer
from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer
from neuro_san.internals.interfaces.agent_network_provider import AgentNetworkProvider
from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage
from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename ServiceAgentNetworkStorage -> AgentNetworkStorage

from neuro_san.session.direct_agent_session import DirectAgentSession
from neuro_san.session.external_agent_session_factory import ExternalAgentSessionFactory
from neuro_san.session.missing_agent_check import MissingAgentCheck
Expand All @@ -35,7 +35,7 @@ class DirectAgentSessionFactory:
Sets up everything needed to use a DirectAgentSession more as a library.
This includes:
* Some reading of AgentNetworks
* Setting up ServiceAgentNetworkStorage with agent networks
* Setting up AgentNetworkStorage with agent networks
which were read in
* Initializing an LlmFactory
"""
Expand All @@ -46,9 +46,9 @@ def __init__(self):
"""
manifest_restorer = RegistryManifestRestorer()
self.manifest_networks: Dict[str, AgentNetwork] = manifest_restorer.restore()
network_storage: ServiceAgentNetworkStorage = ServiceAgentNetworkStorage.get_instance()
self.network_storage = AgentNetworkStorage()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the two places where an object instance originates.

for agent_name, agent_network in self.manifest_networks.items():
network_storage.add_agent_network(agent_name, agent_network)
self.network_storage.add_agent_network(agent_name, agent_network)

def create_session(self, agent_name: str, use_direct: bool = False,
metadata: Dict[str, str] = None, umbrella_timeout: Timeout = None) -> AgentSession:
Expand All @@ -73,7 +73,7 @@ def create_session(self, agent_name: str, use_direct: bool = False,
llm_factory.load()
toolbox_factory.load()

factory = ExternalAgentSessionFactory(use_direct=use_direct)
factory = ExternalAgentSessionFactory(use_direct=use_direct, network_storage=self.network_storage)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only place where we pass the AgentNetworkStorage into the ExternalAgentSessionFactory.
Other places are in the service and the always pass use_direct=False, whose code paths will never user the network storage.

invocation_context = SessionInvocationContext(factory, llm_factory, toolbox_factory, metadata)
invocation_context.start()
session: DirectAgentSession = DirectAgentSession(agent_network=agent_network,
Expand All @@ -100,10 +100,8 @@ def get_agent_network(self, agent_name: str) -> AgentNetwork:
agent_network = restorer.restore(file_reference=agent_name)
else:
# Use the standard stuff available via the manifest file.
network_storage: ServiceAgentNetworkStorage =\
ServiceAgentNetworkStorage.get_instance()
agent_network_provider: AgentNetworkProvider =\
network_storage.get_agent_network_provider(agent_name)
self.network_storage.get_agent_network_provider(agent_name)
agent_network = agent_network_provider.get_agent_network()

# Common place for nice error messages when networks are not found
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,20 @@
from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider


class ServiceAgentNetworkStorage:
class AgentNetworkStorage:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename.

"""
Service-wide storage for AgentNetworkProviders.
This class is a global singleton containing
Service-wide storage for AgentNetworkProviders containing
a table of currently active AgentNetworks for each agent registered to the service.
Note: a mapping from an agent to its AgentNetwork is dynamic,
as we can change agents definitions at service run-time.
as it is possible to change agents definitions at service run-time.
"""

instance = None
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need the instance variable any more.


def __init__(self):
self.agents_table: Dict[str, AgentNetwork] = {}
self.logger = logging.getLogger(self.__class__.__name__)
self.lock = threading.Lock()
self.listeners: List[AgentStateListener] = []

@classmethod
def get_instance(cls):
"""
Get a singleton instance of this class
"""
if not ServiceAgentNetworkStorage.instance:
ServiceAgentNetworkStorage.instance = ServiceAgentNetworkStorage()
return ServiceAgentNetworkStorage.instance
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need get_instance() any more.


def add_listener(self, listener: AgentStateListener):
"""
Add a state listener to be notified when status of service agents changes.
Expand Down
10 changes: 8 additions & 2 deletions neuro_san/service/grpc/concierge_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from neuro_san.api.grpc import concierge_pb2 as concierge_messages
from neuro_san.api.grpc import concierge_pb2_grpc

from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.session.direct_concierge_session import DirectConciergeSession

Expand All @@ -40,7 +41,8 @@ class ConciergeService(concierge_pb2_grpc.ConciergeServiceServicer):
def __init__(self,
request_logger: RequestLogger,
security_cfg: Dict[str, Any],
server_logging: AgentServerLogging):
server_logging: AgentServerLogging,
network_storage: AgentNetworkStorage):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typical plumbing addition.

"""
Set the gRPC interface up for health checking so that the service
will be opened to callers when the mesh sees it operational, if this
Expand All @@ -55,11 +57,14 @@ def __init__(self,
:param server_logging: An AgentServerLogging instance initialized so that
spawned asynchronous threads can also properly initialize
their logging.
:param network_storage: A AgentNetworkStorage instance which keeps all
the AgentNetwork instances.
"""
self.request_logger = request_logger
self.security_cfg = security_cfg
self.server_logging: AgentServerLogging = server_logging
self.forwarder: GrpcMetadataForwarder = self.server_logging.get_forwarder()
self.network_storage: AgentNetworkStorage = network_storage

# pylint: disable=no-member
def List(self, request: concierge_messages.ConciergeRequest,
Expand Down Expand Up @@ -90,7 +95,8 @@ def List(self, request: concierge_messages.ConciergeRequest,
request_dict: Dict[str, Any] = MessageToDict(request)

# Delegate to Direct*Session
session = DirectConciergeSession(metadata=metadata,
session = DirectConciergeSession(network_storage=self.network_storage,
metadata=metadata,
security_cfg=self.security_cfg)
response_dict = session.list(request_dict)

Expand Down
29 changes: 11 additions & 18 deletions neuro_san/service/grpc/grpc_agent_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from neuro_san.api.grpc import concierge_pb2_grpc

from neuro_san.internals.graph.registry.agent_network import AgentNetwork
from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage
from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.grpc.agent_servicer_to_server import AgentServicerToServer
from neuro_san.service.grpc.concierge_service import ConciergeService
Expand Down Expand Up @@ -51,6 +51,7 @@ class GrpcAgentServer(AgentServer):
# pylint: disable=too-many-arguments,too-many-positional-arguments
def __init__(self, port: int,
server_loop_callbacks: ServerLoopCallbacks,
network_storage: AgentNetworkStorage,
agent_networks: Dict[str, AgentNetwork],
server_name: str = DEFAULT_SERVER_NAME,
server_name_for_logs: str = DEFAULT_SERVER_NAME_FOR_LOGS,
Expand All @@ -64,6 +65,8 @@ def __init__(self, port: int,
:param server_loop_callbacks: The ServerLoopCallbacks instance for
break out methods in main serving loop.
:param agent_networks: A dictionary of agent name to AgentNetwork to use for the session.
:param network_storage: A AgentNetworkStorage instance which keeps all
the AgentNetwork instances.
:param server_name: The name of the service
:param server_name_for_logs: The name of the service for log files
:param max_concurrent_requests: The maximum number of requests to handle at a time.
Expand All @@ -81,6 +84,7 @@ def __init__(self, port: int,

self.logger = logging.getLogger(__name__)

self.network_storage: AgentNetworkStorage = network_storage
self.agent_networks: Dict[str, AgentNetwork] = agent_networks
self.server_name: str = server_name
self.server_name_for_logs: str = server_name_for_logs
Expand All @@ -107,25 +111,14 @@ def get_starting_event(self) -> threading.Event:
"""
return self.notify_started

def setup_agent_network_storage(self):
"""
Initialize ServiceAgentNetworkStorage with AgentNetworks
we have parsed in server manifest file.
"""
network_storage: ServiceAgentNetworkStorage =\
ServiceAgentNetworkStorage.get_instance()
network_storage.setup_agent_networks(self.agent_networks)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't really need this method. Just call the network storage directly (see below)


def agent_added(self, agent_name: str):
"""
Agent is being added to the service.
:param agent_name: name of an agent
"""
network_storage: ServiceAgentNetworkStorage =\
ServiceAgentNetworkStorage.get_instance()
service = GrpcAgentService(self.server_lifetime, self.security_cfg,
agent_name,
network_storage.get_agent_network_provider(agent_name),
self.network_storage.get_agent_network_provider(agent_name),
self.server_logging)
self.services.append(service)
servicer_to_server = AgentServicerToServer(service)
Expand Down Expand Up @@ -172,21 +165,21 @@ def serve(self):
# New-style service
self.security_cfg = None # ... yet

network_storage: ServiceAgentNetworkStorage = \
ServiceAgentNetworkStorage.get_instance()
# Add listener to handle adding per-agent gRPC services
# to our dynamic router:
network_storage.add_listener(self)
self.network_storage.add_listener(self)

self.setup_agent_network_storage()
# DEF - It's possible this could move outside this class.
self.network_storage.setup_agent_networks(self.agent_networks)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call the network storage directly instead of using the method.
Interesting note in the comment here. It's possible we might want to move this out somewhere else.
We'll see. Enough going on in this PR.


# Add DynamicAgentRouter instance as a generic RPC handler for our server:
server.add_generic_rpc_handlers((self.service_router,))

concierge_service: ConciergeService = \
ConciergeService(self.server_lifetime,
self.security_cfg,
self.server_logging)
self.server_logging,
self.network_storage)
concierge_pb2_grpc.add_ConciergeServiceServicer_to_server(
concierge_service,
server)
Expand Down
10 changes: 9 additions & 1 deletion neuro_san/service/http/handlers/base_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tornado
from tornado.web import RequestHandler

from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.service.http.interfaces.agent_authorizer import AgentAuthorizer
from neuro_san.service.http.interfaces.agents_updater import AgentsUpdater
from neuro_san.service.http.logging.http_logger import HttpLogger
Expand All @@ -38,11 +39,14 @@ class BaseRequestHandler(RequestHandler):
request_id: int = 0

# pylint: disable=attribute-defined-outside-init
# pylint: disable=too-many-arguments
# pylint: disable=too-many-positional-arguments
def initialize(self,
agent_policy: AgentAuthorizer,
agents_updater: AgentsUpdater,
forwarded_request_metadata: List[str],
openapi_service_spec_path: str):
openapi_service_spec_path: str,
network_storage: AgentNetworkStorage):
"""
This method is called by Tornado framework to allow
injecting service-specific data into local handler context.
Expand All @@ -51,13 +55,17 @@ def initialize(self,
collection of agents being served
:param forwarded_request_metadata: request metadata to forward.
:param openapi_service_spec_path: file path to OpenAPI service spec.
:param network_storage: A AgentNetworkStorage instance which keeps all
the AgentNetwork instances.
"""

self.agent_policy = agent_policy
self.agents_updater = agents_updater
self.forwarded_request_metadata: List[str] = forwarded_request_metadata
self.openapi_service_spec_path: str = openapi_service_spec_path
self.logger = HttpLogger(forwarded_request_metadata)
self.network_storage: AgentNetworkStorage = network_storage
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BaseRequestHandler itself doesn't really need the AgentNetworkStorage, but subclasses do and this is the closest thing the tornado infrastructure has to a constructor that is available to us.


# Set default request_id for this request handler in case we will need it:
BaseRequestHandler.request_id += 1

Expand Down
2 changes: 1 addition & 1 deletion neuro_san/service/http/handlers/concierge_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def get(self):
self.application.start_client_request(metadata, "/api/v1/list")
try:
data: Dict[str, Any] = {}
session: ConciergeSession = DirectConciergeSession(metadata=metadata)
session: ConciergeSession = DirectConciergeSession(self.network_storage, metadata=metadata)
result_dict: Dict[str, Any] = session.list(data)

# Return response to the HTTP client
Expand Down
14 changes: 10 additions & 4 deletions neuro_san/service/http/server/http_sidecar.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from tornado.ioloop import IOLoop

from neuro_san.interfaces.concierge_session import ConciergeSession
from neuro_san.internals.network_providers.service_agent_network_storage import ServiceAgentNetworkStorage
from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.internals.network_providers.single_agent_network_provider import SingleAgentNetworkProvider
from neuro_san.service.generic.agent_server_logging import AgentServerLogging
from neuro_san.service.generic.async_agent_service import AsyncAgentService
Expand Down Expand Up @@ -55,6 +55,7 @@ def __init__(self, start_event: threading.Event,
http_port: int,
openapi_service_spec_path: str,
requests_limit: int,
network_storage: AgentNetworkStorage,
forwarded_request_metadata: str = AgentServer.DEFAULT_FORWARDED_REQUEST_METADATA):
"""
Constructor:
Expand All @@ -64,12 +65,15 @@ def __init__(self, start_event: threading.Event,
:param request_limit: The number of requests to service before shutting down.
This is useful to be sure production environments can handle
a service occasionally going down.
:param network_storage: A AgentNetworkStorage instance which keeps all
the AgentNetwork instances.
:param forwarded_request_metadata: A space-delimited list of http metadata request keys
to forward to logs/other requests
"""
self.server_name_for_logs: str = "Http Server"
self.start_event: threading.Event = start_event
self.http_port = http_port
self.network_storage: AgentNetworkStorage = network_storage

# Randomize requests limit for this server instance.
# Lower and upper bounds for number of requests before shutting down
Expand Down Expand Up @@ -148,7 +152,8 @@ def update_agents(self, metadata: Dict[str, Any]):
:return: nothing
"""
data: Dict[str, Any] = {}
session: ConciergeSession = DirectConciergeSession(metadata=metadata)
# Why do we need the Concierge if we already have access to network_storage?
session: ConciergeSession = DirectConciergeSession(self.network_storage, metadata=metadata)
agents_dict: Dict[str, List[Dict[str, str]]] = session.list(data)
agents_list: List[Dict[str, str]] = agents_dict["agents"]
agents: List[str] = []
Expand All @@ -171,7 +176,7 @@ def add_agent(self, agent_name: str):
:param agent_name: name of an agent
"""
agent_network_provider: SingleAgentNetworkProvider = \
ServiceAgentNetworkStorage.get_instance().get_agent_network_provider(agent_name)
self.network_storage.get_agent_network_provider(agent_name)
# Convert back to a single string as required by constructor
request_metadata_str: str = " ".join(self.forwarded_request_metadata)
agent_server_logging: AgentServerLogging = \
Expand All @@ -196,5 +201,6 @@ def build_request_data(self) -> Dict[str, Any]:
"agent_policy": self,
"agents_updater": self,
"forwarded_request_metadata": self.forwarded_request_metadata,
"openapi_service_spec_path": self.openapi_service_spec_path
"openapi_service_spec_path": self.openapi_service_spec_path,
"network_storage": self.network_storage
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add network_storage to this "request data" dictionary. This is the mechanism by which args are passed to BaseRequestHandler.initialize()

}
6 changes: 5 additions & 1 deletion neuro_san/service/main_loop/server_main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from neuro_san.interfaces.agent_session import AgentSession
from neuro_san.internals.graph.persistence.registry_manifest_restorer import RegistryManifestRestorer
from neuro_san.internals.graph.registry.agent_network import AgentNetwork
from neuro_san.internals.network_providers.agent_network_storage import AgentNetworkStorage
from neuro_san.internals.utils.file_of_class import FileOfClass
from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME
from neuro_san.service.grpc.grpc_agent_server import DEFAULT_SERVER_NAME_FOR_LOGS
Expand Down Expand Up @@ -61,6 +62,7 @@ def __init__(self):
self.manifest_update_period_seconds: int = 0
self.server: GrpcAgentServer = None
self.manifest_files: List[str] = []
self.network_storage = AgentNetworkStorage()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the two places where an object instance originates.


def parse_args(self):
"""
Expand Down Expand Up @@ -154,6 +156,7 @@ def main_loop(self):

self.server = GrpcAgentServer(self.port,
server_loop_callbacks=self,
network_storage=self.network_storage,
agent_networks=self.agent_networks,
server_name=self.server_name,
server_name_for_logs=self.server_name_for_logs,
Expand All @@ -164,7 +167,7 @@ def main_loop(self):
if self.manifest_update_period_seconds > 0:
manifest_file: str = self.manifest_files[0]
updater: ManifestPeriodicUpdater =\
ManifestPeriodicUpdater(manifest_file, self.manifest_update_period_seconds)
ManifestPeriodicUpdater(self.network_storage, manifest_file, self.manifest_update_period_seconds)
updater.start()

# Start HTTP server side-car:
Expand All @@ -173,6 +176,7 @@ def main_loop(self):
self.http_port,
self.service_openapi_spec_file,
self.request_limit,
self.network_storage,
forwarded_request_metadata=metadata_str)
http_server_thread = threading.Thread(target=http_sidecar, args=(self.server,), daemon=True)
http_server_thread.start()
Expand Down
Loading