diff --git a/neuro_san/internals/run_context/langchain/llms/langchain_llm_client_factory.py b/neuro_san/internals/interfaces/environment_configuration.py similarity index 51% rename from neuro_san/internals/run_context/langchain/llms/langchain_llm_client_factory.py rename to neuro_san/internals/interfaces/environment_configuration.py index 1c7bf8617..acf53e17a 100644 --- a/neuro_san/internals/run_context/langchain/llms/langchain_llm_client_factory.py +++ b/neuro_san/internals/interfaces/environment_configuration.py @@ -9,44 +9,38 @@ # neuro-san SDK Software in commercial settings. # # END COPYRIGHT - from typing import Any from typing import Dict import os -from neuro_san.internals.run_context.langchain.llms.langchain_llm_client import LangChainLlmClient - -class LangChainLlmClientFactory: +class EnvironmentConfiguration: """ - Interface for Factory classes creating LLM client connections for LangChain. + Easy policy add on for the get_value_or_env() method for various classes that + are affected by configuration via dictionary/hocon and/or environment variables. """ - def create_llm_client(self, config: Dict[str, Any]) -> LangChainLlmClient: - """ - Create a LangChainLlmClient instance from the fully-specified llm config. - :param config: The fully specified llm config - :return: A LangChainLlmClient instance containing run-time resources necessary - for model usage by the service. - - Can raise a ValueError if the config's class or model_name value is - unknown to this method. 
- """ - raise NotImplementedError - - def get_value_or_env(self, config: Dict[str, Any], key: str, env_key: str) -> Any: + @staticmethod + def get_value_or_env(config: Dict[str, Any], key: str, env_key: str, + none_obj: Any = None) -> Any: """ :param config: The config dictionary to search :param key: The key for the config to look for :param env_key: The os.environ key whose value should be gotten if either the key does not exist or the value for the key is None + :param none_obj: An optional object instance to test. + If present this method will return None, implying + that some other external object/mechanism is supplying the values. """ + if none_obj is not None: + return None + value = None if config is not None: value = config.get(key) - if value is None: + if value is None and env_key is not None: value = os.getenv(env_key) return value diff --git a/neuro_san/internals/run_context/langchain/llms/anthropic_client_policy.py b/neuro_san/internals/run_context/langchain/llms/anthropic_client_policy.py new file mode 100644 index 000000000..570b24571 --- /dev/null +++ b/neuro_san/internals/run_context/langchain/llms/anthropic_client_policy.py @@ -0,0 +1,48 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. +# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. +# +# END COPYRIGHT + +from typing import Any + +from contextlib import suppress + +from neuro_san.internals.run_context.langchain.llms.client_policy import ClientPolicy + + +class AnthropicClientPolicy(ClientPolicy): + """ + Implementation of the ClientPolicy for Anthtropic chat models. + + Anthropic chat models do not allow for passing in an externally managed + async web client. 
+ """ + + async def delete_resources(self): + """ + Release the run-time resources used by the model + """ + if self.llm is None: + return + + # Do the necessary reach-ins to successfully shut down the web client + + # This is really an anthropic.AsyncClient, but we don't really want to do the Resolver here. + # Note we don't want to do this in the constructor, as AnthropicChat lazily + # creates these as needed via a cached_property that needs to be done in its own time + # via Anthropic infrastructure. By the time we get here, it's already been created. + anthropic_async_client: Any = self.llm._async_client # pylint:disable=protected-access + + if anthropic_async_client is not None: + with suppress(Exception): + await anthropic_async_client.aclose() + + # Let's not do this again, shall we? + self.llm = None diff --git a/neuro_san/internals/run_context/langchain/llms/azure_client_policy.py b/neuro_san/internals/run_context/langchain/llms/azure_client_policy.py new file mode 100644 index 000000000..950b04fe5 --- /dev/null +++ b/neuro_san/internals/run_context/langchain/llms/azure_client_policy.py @@ -0,0 +1,98 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. +# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. +# +# END COPYRIGHT +from typing import Any +from typing import Dict + +from leaf_common.config.resolver import Resolver + +from neuro_san.internals.run_context.langchain.llms.openai_client_policy import OpenAIClientPolicy + + +class AzureClientPolicy(OpenAIClientPolicy): + """ + ClientPolicy implementation for OpenAI via Azure. 
+ + OpenAI's BaseLanguageModel implementations do allow us to pass in a web client + as an argument, so this implementation takes advantage of the create_client() + method to do that. Worth noting that where many other implementations might care about + the llm reference, because of our create_client() implementation, we do not. + """ + + def create_client(self, config: Dict[str, Any]) -> Any: + """ + Creates the web client to be used by a BaseLanguageModel to be + constructed in the future. Neuro SAN infrastructure prefers that this + be an asynchronous client, however we realize some BaseLanguageModels + do not support that (even though they should!). + + Implementations should retain any references to state that needs to be cleaned up + in the delete_resources() method. + + :param config: The fully specified llm config + :return: The web client that accesses the LLM. + By default this is None, as many BaseLanguageModels + do not allow a web client to be passed in as an arg. + """ + # OpenAI is the one chat class that we do not require any extra installs. + # This is what we want to work out of the box. + # Nevertheless, have it go through the same lazy-loading resolver rigamarole as the others. + + # Set up a resolver to use to resolve lazy imports of classes from + # langchain_* packages to prevent installing the world. 
resolver = Resolver() + + # pylint: disable=invalid-name + AsyncAzureOpenAI = resolver.resolve_class_in_module("AsyncAzureOpenAI", + module_name="openai", + install_if_missing="langchain-openai") + + self.create_http_client(config) + + # Prepare some more complex args + openai_api_key: str = self.get_value_or_env(config, "openai_api_key", "AZURE_OPENAI_API_KEY") + if openai_api_key is None: + openai_api_key = self.get_value_or_env(config, "openai_api_key", "OPENAI_API_KEY") + + # From langchain_openai.chat_models.azure.py + default_headers: Dict[str, str] = {} + default_headers = config.get("default_headers", default_headers) + default_headers.update({ + "User-Agent": "langchain-partner-python-azure-openai", + }) + + self.async_openai_client = AsyncAzureOpenAI( + azure_endpoint=self.get_value_or_env(config, "azure_endpoint", + "AZURE_OPENAI_ENDPOINT"), + azure_deployment=self.get_value_or_env(config, "deployment_name", + "AZURE_OPENAI_DEPLOYMENT_NAME"), + api_version=self.get_value_or_env(config, "openai_api_version", + "OPENAI_API_VERSION"), + api_key=openai_api_key, + # AD here means "ActiveDirectory" + azure_ad_token=self.get_value_or_env(config, "azure_ad_token", + "AZURE_OPENAI_AD_TOKEN"), + # azure_ad_token_provider is a complex object, and we can't set that through config + + organization=self.get_value_or_env(config, "openai_organization", "OPENAI_ORG_ID"), + # project - not set in langchain_openai + # webhook_secret - not set in langchain_openai + base_url=self.get_value_or_env(config, "openai_api_base", "OPENAI_API_BASE"), + timeout=config.get("request_timeout"), + max_retries=config.get("max_retries"), + default_headers=default_headers, + # default_query - don't understand enough to set, but set in langchain_openai + http_client=self.http_client + ) + + # We retain the async_openai_client reference, but we hand back this reach-in + # to pass to the BaseLanguageModel constructor. 
+ return self.async_openai_client.chat.completions diff --git a/neuro_san/internals/run_context/langchain/llms/bedrock_client_policy.py b/neuro_san/internals/run_context/langchain/llms/bedrock_client_policy.py new file mode 100644 index 000000000..b6131db19 --- /dev/null +++ b/neuro_san/internals/run_context/langchain/llms/bedrock_client_policy.py @@ -0,0 +1,42 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. +# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. +# +# END COPYRIGHT + +from neuro_san.internals.run_context.langchain.llms.client_policy import ClientPolicy + + +class BedrockClientPolicy(ClientPolicy): + """ + ClientPolicy implementation for Bedrock. + + Bedrock does not allow for passing in async web clients. + As a matter of fact, all of its clients are synchronous, + which is not the best for an async service. + """ + + async def delete_resources(self): + """ + Release the run-time resources used by the model + """ + if self.llm is None: + return + + # Do the necessary reach-ins to successfully shut down the web client + if self.llm.client is not None: + # This is a boto3 client + self.llm.client.close() + + if self.llm.bedrock_client is not None: + # This is a boto3 client + self.llm.bedrock_client.close() + + # Let's not do this again, shall we? + self.llm = None diff --git a/neuro_san/internals/run_context/langchain/llms/client_policy.py b/neuro_san/internals/run_context/langchain/llms/client_policy.py new file mode 100644 index 000000000..5f75cec61 --- /dev/null +++ b/neuro_san/internals/run_context/langchain/llms/client_policy.py @@ -0,0 +1,73 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. 
+# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. +# +# END COPYRIGHT +from typing import Any +from typing import Dict + +from langchain_core.language_models.base import BaseLanguageModel + +from neuro_san.internals.interfaces.environment_configuration import EnvironmentConfiguration + + +class ClientPolicy(EnvironmentConfiguration): + """ + Policy interface to manage the lifecycles of web clients that talk to LLM services. + This inherits from EnvironmentConfiguration in order to support easy access to the + get_value_or_env() method. + + There are really two styles of implementation encompassed by this one interface. + + 1) When BaseLanguageModels can have web clients passed into their constructor, + implementations should use the create_client() method to retain any references necessary + to help them clean up nicely in the delete_resources() method. + + 2) When BaseLanguageModels cannot have web clients passed into their constructor, + implementations should pass the already created llm into their implementation's + constructor. Later delete_resources() implementations will need to do a reach-in + to the llm instance to clean up any references related to the web client. + """ + + def __init__(self, llm: BaseLanguageModel = None): + """ + Constructor. + + :param llm: BaseLanguageModel + """ + self.llm: BaseLanguageModel = llm + + # pylint: disable=useless-return + def create_client(self, config: Dict[str, Any]) -> Any: + """ + Creates the web client to be used by a BaseLanguageModel to be + constructed in the future. Neuro SAN infrastructure prefers that this + be an asynchronous client, however we realize some BaseLanguageModels + do not support that (even though they should!). 
+ + Implementations should retain any references to state that needs to be cleaned up + in the delete_resources() method. + + :param config: The fully specified llm config + :return: The web client that accesses the LLM. + By default this is None, as many BaseLanguageModels + do not allow a web client to be passed in as an arg. + """ + _ = config + return None + + async def delete_resources(self): + """ + Release the run-time resources used by the instance. + + Unfortunately for many BaseLanguageModels, this tends to involve + a reach-in to its private internals in order to shutting down + any web client references in there. + """ + raise NotImplementedError diff --git a/neuro_san/internals/run_context/langchain/llms/default_llm_factory.py b/neuro_san/internals/run_context/langchain/llms/default_llm_factory.py index 395a28b9c..45b94a850 100644 --- a/neuro_san/internals/run_context/langchain/llms/default_llm_factory.py +++ b/neuro_san/internals/run_context/langchain/llms/default_llm_factory.py @@ -26,13 +26,9 @@ from leaf_common.parsers.dictionary_extractor import DictionaryExtractor from neuro_san.internals.interfaces.context_type_llm_factory import ContextTypeLlmFactory -from neuro_san.internals.run_context.langchain.llms.langchain_llm_client import LangChainLlmClient -from neuro_san.internals.run_context.langchain.llms.langchain_llm_client_factory import LangChainLlmClientFactory from neuro_san.internals.run_context.langchain.llms.langchain_llm_factory import LangChainLlmFactory from neuro_san.internals.run_context.langchain.llms.langchain_llm_resources import LangChainLlmResources from neuro_san.internals.run_context.langchain.llms.llm_info_restorer import LlmInfoRestorer -from neuro_san.internals.run_context.langchain.llms.standard_langchain_llm_client_factory \ - import StandardLangChainLlmClientFactory from neuro_san.internals.run_context.langchain.llms.standard_langchain_llm_factory import StandardLangChainLlmFactory from 
neuro_san.internals.run_context.langchain.util.api_key_error_check import ApiKeyErrorCheck from neuro_san.internals.run_context.langchain.util.argument_validator import ArgumentValidator @@ -86,9 +82,6 @@ def __init__(self, config: Dict[str, Any] = None): self.llm_factories: List[LangChainLlmFactory] = [ StandardLangChainLlmFactory() ] - self.llm_client_factories: List[LangChainLlmClientFactory] = [ - StandardLangChainLlmClientFactory() - ] # Get user LLM info file path with the following priority: # 1. "agent_llm_info_file" from agent network hocon @@ -178,8 +171,7 @@ def create_llm( unknown to this method. """ full_config: Dict[str, Any] = self.create_full_llm_config(config) - llm_client: LangChainLlmClient = self.create_llm_client(full_config) - llm_resources: LangChainLlmResources = self.create_llm_resources_with_client(full_config, llm_client) + llm_resources: LangChainLlmResources = self.create_llm_resources(full_config) return llm_resources def create_full_llm_config(self, config: Dict[str, Any]) -> Dict[str, Any]: @@ -279,52 +271,6 @@ def get_chat_class_args(self, chat_class_name: str, use_model_name: str = None) return args - def create_llm_client(self, config: Dict[str, Any]) -> LangChainLlmClient: - """ - Create a LangChainLlmClient from the fully-specified llm config either from - StandardLlmClientFactory or a user-defined LlmClientFactory. - :param config: The fully specified llm config which is a product of - _create_full_llm_config() above. - :return: A LangChainLlmClient instance to use when making llm requests. - This can be None if client connection management is not required. - Can raise a ValueError if the config's class or model_name value is - unknown to this method. - """ - llm_client: LangChainLlmClient = None - - # Loop through the loaded factories in order until we can find one - # that can create the llm. 
- found_exception: Exception = None - for llm_client_factory in self.llm_client_factories: - try: - llm_client = llm_client_factory.create_llm_client(config) - if llm_client is not None and isinstance(llm_client, LangChainLlmClient): - # We found what we were looking for - found_exception = None - break - - # Catch some common wrong or missing API key errors in a single place - # with some verbose error messaging. - except API_KEY_ERRORS as exception: - # Will re-raise but with the right exception text it will - # also provide some more helpful failure text. - message: str = ApiKeyErrorCheck.check_for_api_key_exception(exception) - if message is not None: - raise ValueError(message) from exception - found_exception = exception - - except ValueError as exception: - # Let the next model have a crack - found_exception = exception - - # DEF - Might eventually want to resolve a specific class like the end of - # create_llm_resources_with_client() does. - - if found_exception is not None: - raise found_exception - - return llm_client - def create_base_chat_model(self, config: Dict[str, Any]) -> BaseLanguageModel: """ Create a BaseLanguageModel from the fully-specified llm config. @@ -336,18 +282,12 @@ def create_base_chat_model(self, config: Dict[str, Any]) -> BaseLanguageModel: """ raise NotImplementedError - def create_llm_resources_with_client(self, config: Dict[str, Any], - llm_client: LangChainLlmClient = None) -> LangChainLlmResources: + def create_llm_resources(self, config: Dict[str, Any]) -> LangChainLlmResources: """ Create a BaseLanguageModel from the fully-specified llm config either from standard LLM factory, user-defined LLM factory, or user-specified langchain model class. :param config: The fully specified llm config which is a product of _create_full_llm_config() above. - :param llm_client: A LangChainLlmClient instance, which by default is None, - implying that create_base_chat_model() needs to create its own client. 
- Note, however that a None value can lead to connection leaks and requests - that continue to run after the request connection is dropped in a server - environment. :return: A LangChainLlmResources instance containing a BaseLanguageModel (can be Chat or LLM) and all related resources necessary for managing the model run-time lifecycle. @@ -361,7 +301,7 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], found_exception: Exception = None for llm_factory in self.llm_factories: try: - llm_resources = llm_factory.create_llm_resources_with_client(config, llm_client) + llm_resources = llm_factory.create_llm_resources(config) if llm_resources is not None and isinstance(llm_resources, LangChainLlmResources): # We found what we were looking for found_exception = None @@ -371,7 +311,6 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], found_exception = None except NotImplementedError: - # Try ignoring the llm_client this factory # This allows for backwards compatibility with older LangChainLlmFactories llm: BaseLanguageModel = llm_factory.create_base_chat_model(config) if llm is not None: diff --git a/neuro_san/internals/run_context/langchain/llms/httpx_langchain_llm_client.py b/neuro_san/internals/run_context/langchain/llms/httpx_langchain_llm_client.py deleted file mode 100644 index 53bcd34fa..000000000 --- a/neuro_san/internals/run_context/langchain/llms/httpx_langchain_llm_client.py +++ /dev/null @@ -1,54 +0,0 @@ - -# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. -# All Rights Reserved. -# Issued under the Academic Public License. -# -# You can be released from the terms, and requirements of the Academic Public -# License by purchasing a commercial license. -# Purchase of a commercial license is mandatory for any use of the -# neuro-san SDK Software in commercial settings. 
-# -# END COPYRIGHT - -from typing import Any - -from contextlib import suppress -from httpx import AsyncClient - -from neuro_san.internals.run_context.langchain.llms.langchain_llm_client import LangChainLlmClient - - -class HttpxLangChainLlmClient(LangChainLlmClient): - """ - Implementation of the LangChainLlmClient for httpx clients. - """ - - def __init__(self, http_client: AsyncClient, async_llm_client: Any = None): - """ - Constructor. - - :param http_client: httpx.AsyncClient used for model connections to LLM host. - :param async_llm_client: optional async_llm_client used for model connections to LLM host. - When used, this is most often ChatModel-specific. - """ - self.http_client: AsyncClient = http_client - self.async_llm_client: Any = async_llm_client - - def get_client(self) -> Any: - """ - Get the async llm client used by the model - """ - return self.async_llm_client - - async def delete_resources(self): - """ - Release the run-time resources used by the model - """ - self.async_llm_client = None - - if self.http_client is None: - return - - with suppress(Exception): - await self.http_client.aclose() - self.http_client = None diff --git a/neuro_san/internals/run_context/langchain/llms/langchain_llm_client.py b/neuro_san/internals/run_context/langchain/llms/langchain_llm_client.py deleted file mode 100644 index 616afedf9..000000000 --- a/neuro_san/internals/run_context/langchain/llms/langchain_llm_client.py +++ /dev/null @@ -1,30 +0,0 @@ - -# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. -# All Rights Reserved. -# Issued under the Academic Public License. -# -# You can be released from the terms, and requirements of the Academic Public -# License by purchasing a commercial license. -# Purchase of a commercial license is mandatory for any use of the -# neuro-san SDK Software in commercial settings. 
-# -# END COPYRIGHT -from typing import Any - - -class LangChainLlmClient: - """ - Interface for representing a client that connects to a LangChain model - """ - - def get_client(self) -> Any: - """ - Get the client used by the model - """ - raise NotImplementedError - - async def delete_resources(self): - """ - Release the run-time resources used by the model - """ - raise NotImplementedError diff --git a/neuro_san/internals/run_context/langchain/llms/langchain_llm_factory.py b/neuro_san/internals/run_context/langchain/llms/langchain_llm_factory.py index cd6d3d3f3..4bc7b0578 100644 --- a/neuro_san/internals/run_context/langchain/llms/langchain_llm_factory.py +++ b/neuro_san/internals/run_context/langchain/llms/langchain_llm_factory.py @@ -13,17 +13,17 @@ from typing import Any from typing import Dict -import os - from langchain_core.language_models.base import BaseLanguageModel -from neuro_san.internals.run_context.langchain.llms.langchain_llm_client import LangChainLlmClient +from neuro_san.internals.interfaces.environment_configuration import EnvironmentConfiguration from neuro_san.internals.run_context.langchain.llms.langchain_llm_resources import LangChainLlmResources -class LangChainLlmFactory: +class LangChainLlmFactory(EnvironmentConfiguration): """ - Interface for Factory classes creating LLM BaseLanguageModels + Interface for Factory classes creating LLM BaseLanguageModels. + This derives from EnvironmentConfiguration in order to support easy access to + the get_value_or_env() method. Most methods take a config dictionary which consists of the following keys: @@ -53,7 +53,7 @@ def create_base_chat_model(self, config: Dict[str, Any]) -> BaseLanguageModel: Create a LangChainLlmResources instance from the fully-specified llm config. This method is provided for backwards compatibility. 
- Prefer create_llm_resources_with_client() instead, + Prefer create_llm_resources() instead, as this allows server infrastructure to better account for outstanding connections to LLM providers when connections drop. @@ -65,54 +65,17 @@ def create_base_chat_model(self, config: Dict[str, Any]) -> BaseLanguageModel: """ raise NotImplementedError - def create_llm_resources_with_client(self, config: Dict[str, Any], - llm_client: LangChainLlmClient = None) -> LangChainLlmResources: + def create_llm_resources(self, config: Dict[str, Any]) -> LangChainLlmResources: """ Create a LangChainLlmResources instance from the fully-specified llm config. :param config: The fully specified llm config which is a product of _create_full_llm_config() above. - :param llm_client: A LangChainLlmClient instance, which by default is None, - implying that create_base_chat_model() needs to create its own client. - Note, however that a None value can lead to connection leaks and requests - that continue to run after the request connection is dropped in a server - environment. :return: A LangChainLlmResources instance containing - a BaseLanguageModel (can be Chat or LLM) and all related resources + a BaseLanguageModel (can be Chat or LLM) and a ClientPolicy + object that contains all related resources necessary for managing the model run-time lifecycle. Can raise a ValueError if the config's class or model_name value is unknown to this method. """ raise NotImplementedError - - def get_value_or_env(self, config: Dict[str, Any], key: str, env_key: str, - llm_client: Any = None) -> Any: - """ - :param config: The config dictionary to search - :param key: The key for the config to look for - :param env_key: The os.environ key whose value should be gotten if either - the key does not exist or the value for the key is None - :param llm_client: An optional client instance. - If present this method will return None. 
- - Most BaseLanguageModels will take some kind of pre-made - client as part of their constructor args, but they will - also take enough args to constructor a client for themselves - under the hood when explicitly not given that client. - - Note that this does *not* actually provide any values from - the given client, rather this arg allows those constructor - variables associated with creating that under-the-covers - client to remain None when there is a client already made. - """ - if llm_client is not None: - return None - - value = None - if config is not None: - value = config.get(key) - - if value is None and env_key is not None: - value = os.getenv(env_key) - - return value diff --git a/neuro_san/internals/run_context/langchain/llms/langchain_llm_resources.py b/neuro_san/internals/run_context/langchain/llms/langchain_llm_resources.py index 0622a2394..254a3ba82 100644 --- a/neuro_san/internals/run_context/langchain/llms/langchain_llm_resources.py +++ b/neuro_san/internals/run_context/langchain/llms/langchain_llm_resources.py @@ -12,39 +12,39 @@ from langchain_core.language_models.base import BaseLanguageModel -from neuro_san.internals.run_context.langchain.llms.langchain_llm_client import LangChainLlmClient +from neuro_san.internals.run_context.langchain.llms.client_policy import ClientPolicy class LangChainLlmResources: """ Class for representing a LangChain model - together with run-time resources necessary for model usage by the service. + together with run-time policy necessary for model usage by the service. """ - def __init__(self, model: BaseLanguageModel, llm_client: LangChainLlmClient = None): + def __init__(self, model: BaseLanguageModel, client_policy: ClientPolicy = None): """ Constructor. :param model: Language model used. - :param http_client: optional httpx.AsyncClient used for model connections to LLM host. + :param client_policy: optional ClientPolicy object to manage connections to LLM host. 
""" self.model: BaseLanguageModel = model - self.llm_client: LangChainLlmClient = llm_client + self.client_policy: ClientPolicy = client_policy def get_model(self) -> BaseLanguageModel: """ - Get the language model + :return: the BaseLanguageModel """ return self.model - def get_llm_client(self) -> LangChainLlmClient: + def get_client_policy(self) -> ClientPolicy: """ - Get the client used by the model + :return: the ClientPolicy used by the model """ - return self.llm_client + return self.client_policy async def delete_resources(self): """ Release the run-time resources used by the model """ - if self.llm_client: - await self.llm_client.delete_resources() + if self.client_policy is not None: + await self.client_policy.delete_resources() diff --git a/neuro_san/internals/run_context/langchain/llms/openai_client_policy.py b/neuro_san/internals/run_context/langchain/llms/openai_client_policy.py new file mode 100644 index 000000000..51fbd9cc8 --- /dev/null +++ b/neuro_san/internals/run_context/langchain/llms/openai_client_policy.py @@ -0,0 +1,111 @@ + +# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. +# All Rights Reserved. +# Issued under the Academic Public License. +# +# You can be released from the terms, and requirements of the Academic Public +# License by purchasing a commercial license. +# Purchase of a commercial license is mandatory for any use of the +# neuro-san SDK Software in commercial settings. +# +# END COPYRIGHT +from typing import Any +from typing import Dict + +from contextlib import suppress +from httpx import AsyncClient + +from langchain_core.language_models.base import BaseLanguageModel + +from leaf_common.config.resolver import Resolver + +from neuro_san.internals.run_context.langchain.llms.client_policy import ClientPolicy + + +class OpenAIClientPolicy(ClientPolicy): + """ + ClientPolicy implementation for OpenAI. 
+ + OpenAI's BaseLanguageModel implementations do allow us to pass in a web client + as an argument, so this implementation takes advantage of the create_client() + method to do that. Worth noting that where many other implementations might care about + the llm reference, because of our create_client() implementation, we do not. + """ + + def __init__(self, llm: BaseLanguageModel = None): + """ + Constructor. + """ + super().__init__(llm) + + self.http_client: AsyncClient = None + + # Not doing lazy type resolution here just for type hints. + # Save that for create_client(), where it's meatier. + self.async_openai_client: Any = None + + def create_client(self, config: Dict[str, Any]) -> Any: + """ + Creates the web client to be used by a BaseLanguageModel to be + constructed in the future. Neuro SAN infrastructure prefers that this + be an asynchronous client, however we realize some BaseLanguageModels + do not support that (even though they should!). + + Implementations should retain any references to state that needs to be cleaned up + in the delete_resources() method. + + :param config: The fully specified llm config + :return: The web client that accesses the LLM. + By default this is None, as many BaseLanguageModels + do not allow a web client to be passed in as an arg. + """ + # OpenAI is the one chat class that we do not require any extra installs. + # This is what we want to work out of the box. + # Nevertheless, have it go through the same lazy-loading resolver rigamarole as the others. + + # Set up a resolver to use to resolve lazy imports of classes from + # langchain_* packages to prevent installing the world. 
+ resolver = Resolver() + + # pylint: disable=invalid-name + AsyncOpenAI = resolver.resolve_class_in_module("AsyncOpenAI", + module_name="openai", + install_if_missing="langchain-openai") + + self.create_http_client(config) + + self.async_openai_client = AsyncOpenAI( + api_key=self.get_value_or_env(config, "openai_api_key", "OPENAI_API_KEY"), + base_url=self.get_value_or_env(config, "openai_api_base", "OPENAI_API_BASE"), + organization=self.get_value_or_env(config, "openai_organization", "OPENAI_ORG_ID"), + timeout=config.get("request_timeout"), + max_retries=config.get("max_retries"), + http_client=self.http_client + ) + + # We retain the async_openai_client reference, but we hand back this reach-in + # to pass to the BaseLanguageModel constructor. + return self.async_openai_client.chat.completions + + def create_http_client(self, config: Dict[str, Any]): + """ + Creates the http client from the given config. + + :param config: The fully specified llm config + """ + # Our run-time model resource here is httpx client which we need to control directly: + openai_proxy: str = self.get_value_or_env(config, "openai_proxy", "OPENAI_PROXY") + request_timeout: int = config.get("request_timeout") + self.http_client = AsyncClient(proxy=openai_proxy, timeout=request_timeout) + + async def delete_resources(self): + """ + Release the run-time resources used by the instance. + """ + self.async_openai_client = None + + if self.http_client is not None: + with suppress(Exception): + await self.http_client.aclose() + + self.http_client = None diff --git a/neuro_san/internals/run_context/langchain/llms/standard_langchain_llm_client_factory.py b/neuro_san/internals/run_context/langchain/llms/standard_langchain_llm_client_factory.py deleted file mode 100644 index 6e6264c37..000000000 --- a/neuro_san/internals/run_context/langchain/llms/standard_langchain_llm_client_factory.py +++ /dev/null @@ -1,116 +0,0 @@ -# Copyright (C) 2023-2025 Cognizant Digital Business, Evolutionary AI. 
-# All Rights Reserved. -# Issued under the Academic Public License. -# -# You can be released from the terms, and requirements of the Academic Public -# License by purchasing a commercial license. -# Purchase of a commercial license is mandatory for any use of the -# neuro-san SDK Software in commercial settings. -# -# END COPYRIGHT - -from typing import Any -from typing import Dict - -from httpx import AsyncClient - -from leaf_common.config.resolver import Resolver - -from neuro_san.internals.run_context.langchain.llms.httpx_langchain_llm_client import HttpxLangChainLlmClient -from neuro_san.internals.run_context.langchain.llms.langchain_llm_client import LangChainLlmClient -from neuro_san.internals.run_context.langchain.llms.langchain_llm_client_factory import LangChainLlmClientFactory - - -class StandardLangChainLlmClientFactory(LangChainLlmClientFactory): - """ - Factory class for creating LangChainLlmClient instances for Chat and LLM operations - """ - - def create_llm_client(self, config: Dict[str, Any]) -> LangChainLlmClient: - """ - Create an LangChainLlmClient instance from the fully-specified llm config. - :param config: The fully specified llm config - :return: A LangChainLlmClient instance. - Can be None if the llm class in the config does not need a client. - Can raise a ValueError if the config's class or model_name value is - unknown to this method. - """ - # pylint: disable=too-many-locals - # Construct the LLM - llm_client: LangChainLlmClient = None - chat_class: str = config.get("class") - if chat_class is not None: - chat_class = chat_class.lower() - - # Check for key "model_name", "model", and "model_id" to use as model name - # If the config is from default_llm_info, this is always "model_name" - # but with user-specified config, it is possible to have the other keys will be specifed instead. 
- model_name: str = config.get("model_name") or config.get("model") or config.get("model_id") - - # Set up a resolver to use to resolve lazy imports of classes from - # langchain_* packages to prevent installing the world. - resolver = Resolver() - - if chat_class == "openai": - - # OpenAI is the one chat class that we do not require any extra installs. - # This is what we want to work out of the box. - # Nevertheless, have it go through the same lazy-loading resolver rigamarole as the others. - - # pylint: disable=invalid-name - AsyncOpenAI = resolver.resolve_class_in_module("AsyncOpenAI", - module_name="openai", - install_if_missing="langchain-openai") - - # Our run-time model resource here is httpx client which we need to control directly: - openai_proxy = self.get_value_or_env(config, "openai_organization", "OPENAI_PROXY") - request_timeout = config.get("request_timeout") - http_client = AsyncClient(proxy=openai_proxy, timeout=request_timeout) - - async_openai_client = AsyncOpenAI( - api_key=self.get_value_or_env(config, "openai_api_key", "OPENAI_API_KEY"), - base_url=self.get_value_or_env(config, "openai_api_base", "OPENAI_API_BASE"), - organization=self.get_value_or_env(config, "openai_organization", "OPENAI_ORG_ID"), - timeout=request_timeout, - max_retries=config.get("max_retries"), - http_client=http_client - ) - - llm_client = HttpxLangChainLlmClient(http_client, async_openai_client) - - elif chat_class == "azure-openai": - - # Not yet - llm_client = None - - elif chat_class == "anthropic": - - # Not yet - llm_client = None - - elif chat_class == "ollama": - - # Never. 
Ollama models are local - llm_client = None - - elif chat_class == "nvidia": - - # Not yet - llm_client = None - - elif chat_class == "gemini": - - # Not yet - llm_client = None - - elif chat_class == "bedrock": - - # Not yet - llm_client = None - - elif chat_class is None: - raise ValueError(f"Class name {chat_class} for model_name {model_name} is unspecified.") - else: - raise ValueError(f"Class {chat_class} for model_name {model_name} is unrecognized.") - - return llm_client diff --git a/neuro_san/internals/run_context/langchain/llms/standard_langchain_llm_factory.py b/neuro_san/internals/run_context/langchain/llms/standard_langchain_llm_factory.py index c1f747590..8c74d082e 100644 --- a/neuro_san/internals/run_context/langchain/llms/standard_langchain_llm_factory.py +++ b/neuro_san/internals/run_context/langchain/llms/standard_langchain_llm_factory.py @@ -16,9 +16,13 @@ from leaf_common.config.resolver import Resolver -from neuro_san.internals.run_context.langchain.llms.langchain_llm_client import LangChainLlmClient +from neuro_san.internals.run_context.langchain.llms.anthropic_client_policy import AnthropicClientPolicy +from neuro_san.internals.run_context.langchain.llms.azure_client_policy import AzureClientPolicy +from neuro_san.internals.run_context.langchain.llms.bedrock_client_policy import BedrockClientPolicy +from neuro_san.internals.run_context.langchain.llms.client_policy import ClientPolicy from neuro_san.internals.run_context.langchain.llms.langchain_llm_factory import LangChainLlmFactory from neuro_san.internals.run_context.langchain.llms.langchain_llm_resources import LangChainLlmResources +from neuro_san.internals.run_context.langchain.llms.openai_client_policy import OpenAIClientPolicy class StandardLangChainLlmFactory(LangChainLlmFactory): @@ -53,7 +57,7 @@ def create_base_chat_model(self, config: Dict[str, Any]) -> BaseLanguageModel: Create a BaseLanguageModel from the fully-specified llm config. 
This method is provided for backwards compatibility. - Prefer create_llm_resources_with_client() instead, + Prefer create_llm_resources() instead, as this allows server infrastructure to better account for outstanding connections to LLM providers when connections drop. @@ -66,17 +70,11 @@ def create_base_chat_model(self, config: Dict[str, Any]) -> BaseLanguageModel: raise NotImplementedError # pylint: disable=too-many-branches - def create_llm_resources_with_client(self, config: Dict[str, Any], - llm_client: LangChainLlmClient = None) -> LangChainLlmResources: + def create_llm_resources(self, config: Dict[str, Any]) -> LangChainLlmResources: """ Create a BaseLanguageModel from the fully-specified llm config. :param config: The fully specified llm config which is a product of _create_full_llm_config() above. - :param llm_client: A LangChainLlmClient instance, which by default is None, - implying that create_base_chat_model() needs to create its own client. - Note, however that a None value can lead to connection leaks and requests - that continue to run after the request connection is dropped in a server - environment. :return: A LangChainLlmResources instance containing a BaseLanguageModel (can be Chat or LLM) and all related resources necessary for managing the model run-time lifecycle. 
@@ -86,6 +84,8 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], # pylint: disable=too-many-locals # Construct the LLM llm: BaseLanguageModel = None + client_policy: ClientPolicy = None + chat_class: str = config.get("class") if chat_class is not None: chat_class = chat_class.lower() @@ -110,13 +110,9 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], module_name="langchain_openai.chat_models.base", install_if_missing="langchain-openai") - # See if there is an async_client to be had from the llm_client passed in - async_client: Any = None - if llm_client is not None: - async_openai_client = llm_client.get_client() - if async_openai_client is not None: - # Necessary reach-in. - async_client = async_openai_client.chat.completions + # Create the policy object that allows us to manage the model run-time lifecycle + client_policy = OpenAIClientPolicy() + async_client: Any = client_policy.create_client(config) # Now construct LLM chat model we will be using: llm = ChatOpenAI( @@ -178,27 +174,38 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], "include_usage": True } } - openai_api_key: str = self.get_value_or_env(config, "openai_api_key", "AZURE_OPENAI_API_KEY") - if openai_api_key is None: - openai_api_key = self.get_value_or_env(config, "openai_api_key", "OPENAI_API_KEY") # AzureChatOpenAI just happens to come with langchain_openai # pylint: disable=invalid-name AzureChatOpenAI = resolver.resolve_class_in_module("AzureChatOpenAI", module_name="langchain_openai.chat_models.azure", install_if_missing="langchain-openai") + + # Create the policy object that allows us to manage the model run-time lifecycle + client_policy = AzureClientPolicy() + async_client: Any = client_policy.create_client(config) + + # Prepare some more complex args + openai_api_key: str = self.get_value_or_env(config, "openai_api_key", "AZURE_OPENAI_API_KEY", async_client) + if openai_api_key is None: + openai_api_key = 
self.get_value_or_env(config, "openai_api_key", "OPENAI_API_KEY", async_client) + llm = AzureChatOpenAI( + async_client=async_client, model_name=model_name, temperature=config.get("temperature"), + + # This next group of params should always be None when we have async_client openai_api_key=openai_api_key, openai_api_base=self.get_value_or_env(config, "openai_api_base", - "OPENAI_API_BASE"), + "OPENAI_API_BASE", async_client), openai_organization=self.get_value_or_env(config, "openai_organization", - "OPENAI_ORG_ID"), - openai_proxy=self.get_value_or_env(config, "openai_organization", - "OPENAI_PROXY"), - request_timeout=config.get("request_timeout"), - max_retries=config.get("max_retries"), + "OPENAI_ORG_ID", async_client), + openai_proxy=self.get_value_or_env(config, "openai_proxy", + "OPENAI_PROXY", async_client), + request_timeout=self.get_value_or_env(config, "request_timeout", None, async_client), + max_retries=self.get_value_or_env(config, "max_retries", None, async_client), + presence_penalty=config.get("presence_penalty"), frequency_penalty=config.get("frequency_penalty"), seed=config.get("seed"), @@ -227,23 +234,25 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], # global verbose value) so that the warning is never triggered. 
verbose=False, - # Azure-specific + # Azure-specific group that should be None if we have an async_client azure_endpoint=self.get_value_or_env(config, "azure_endpoint", - "AZURE_OPENAI_ENDPOINT"), + "AZURE_OPENAI_ENDPOINT", async_client), deployment_name=self.get_value_or_env(config, "deployment_name", - "AZURE_OPENAI_DEPLOYMENT_NAME"), + "AZURE_OPENAI_DEPLOYMENT_NAME", async_client), openai_api_version=self.get_value_or_env(config, "openai_api_version", - "OPENAI_API_VERSION"), - + "OPENAI_API_VERSION", async_client), # AD here means "ActiveDirectory" azure_ad_token=self.get_value_or_env(config, "azure_ad_token", - "AZURE_OPENAI_AD_TOKEN"), - model_version=config.get("model_version"), + "AZURE_OPENAI_AD_TOKEN", async_client), openai_api_type=self.get_value_or_env(config, "openai_api_type", - "OPENAI_API_TYPE"), + "OPENAI_API_TYPE", async_client), + + model_version=config.get("model_version"), + # Needed for token counting model_kwargs=model_kwargs, ) + elif chat_class == "anthropic": # Use lazy loading to prevent installing the world @@ -251,6 +260,10 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], ChatAnthropic = resolver.resolve_class_in_module("ChatAnthropic", module_name="langchain_anthropic.chat_models", install_if_missing="langchain-anthropic") + + # ChatAnthropic currently only supports _async_client() as a cached_property, + # not as a constructor arg. + llm = ChatAnthropic( model_name=model_name, max_tokens=config.get("max_tokens"), # This is always for output @@ -264,9 +277,14 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], "ANTHROPIC_API_URL"), anthropic_api_key=self.get_value_or_env(config, "anthropic_api_key", "ANTHROPIC_API_KEY"), + default_headers=config.get("default_headers"), + betas=config.get("betas"), streaming=True, # streaming is always on. Without it token counting will not work. # Set stream_usage to True in order to get token counting chunks. 
stream_usage=True, + thinking=config.get("thinking"), + mcp_servers=config.get("mcp_servers"), + context_management=config.get("context_management"), # If omitted, this defaults to the global verbose value, # accessible via langchain_core.globals.get_verbose(): @@ -283,6 +301,10 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], # global verbose value) so that the warning is never triggered. verbose=False, ) + + # Create the client_policy after the fact, with reach-in + client_policy = AnthropicClientPolicy(llm) + elif chat_class == "ollama": # Use lazy loading to prevent installing the world @@ -399,6 +421,10 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], ) elif chat_class == "bedrock": + # Note: ChatBedrock only ever uses a synchronous boto3 client to access + # any llm and there are no aioboto3 hooks yet. Not the greatest choice + # for a performant asynchronous server. + # Use lazy loading to prevent installing the world # pylint: disable=invalid-name ChatBedrock = resolver.resolve_class_in_module("ChatBedrock", @@ -443,11 +469,15 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], # global verbose value) so that the warning is never triggered. verbose=False, ) + + # Create the client_policy after the fact, with reach-in + client_policy = BedrockClientPolicy(llm) + elif chat_class is None: raise ValueError(f"Class name {chat_class} for model_name {model_name} is unspecified.") else: raise ValueError(f"Class {chat_class} for model_name {model_name} is unrecognized.") - # Return the LlmResources with the llm_client that was passed in. + # Return the LlmResources with the client_policy that was created. # That might be None, and that's OK. 
- return LangChainLlmResources(llm, llm_client=llm_client) + return LangChainLlmResources(llm, client_policy=client_policy) diff --git a/tests/neuro_san/internals/run_context/langchain/llms/test_llm_factory.py b/tests/neuro_san/internals/run_context/langchain/llms/test_llm_factory.py index 3451e704c..ff6f87650 100644 --- a/tests/neuro_san/internals/run_context/langchain/llms/test_llm_factory.py +++ b/tests/neuro_san/internals/run_context/langchain/llms/test_llm_factory.py @@ -18,9 +18,10 @@ from langchain_core.language_models.base import BaseLanguageModel from langchain_openai.chat_models.base import ChatOpenAI -from neuro_san.internals.run_context.langchain.llms.langchain_llm_client import LangChainLlmClient +from neuro_san.internals.run_context.langchain.llms.client_policy import ClientPolicy from neuro_san.internals.run_context.langchain.llms.langchain_llm_factory import LangChainLlmFactory from neuro_san.internals.run_context.langchain.llms.langchain_llm_resources import LangChainLlmResources +from neuro_san.internals.run_context.langchain.llms.openai_client_policy import OpenAIClientPolicy class TestLlmFactory(LangChainLlmFactory): @@ -55,7 +56,7 @@ def create_base_chat_model(self, config: Dict[str, Any]) -> BaseLanguageModel: Create a LangChainLlmResources instance from the fully-specified llm config. This method is provided for backwards compatibility. - Prefer create_llm_resources_with_client() instead, + Prefer create_llm_resources() instead, as this allows server infrastructure to better account for outstanding connections to LLM providers when connections drop. 
@@ -67,26 +68,22 @@ def create_base_chat_model(self, config: Dict[str, Any]) -> BaseLanguageModel: """ raise NotImplementedError - def create_llm_resources_with_client(self, config: Dict[str, Any], - llm_client: LangChainLlmClient = None) -> LangChainLlmResources: + def create_llm_resources(self, config: Dict[str, Any]) -> LangChainLlmResources: """ Create a LangChainLlmResources instance from the fully-specified llm config. :param config: The fully specified llm config which is a product of _create_full_llm_config() above. - :param llm_client: A LangChainLlmClient instance, which by default is None, - implying that create_base_chat_model() needs to create its own client. - Note, however that a None value can lead to connection leaks and requests - that continue to run after the request connection is dropped in a server - environment. :return: A LangChainLlmResources instance containing - a BaseLanguageModel (can be Chat or LLM) and all related resources - necessary for managing the model run-time lifecycle. + a BaseLanguageModel (can be Chat or LLM) and a ClientPolicy + object necessary for managing the model run-time lifecycle. Can raise a ValueError if the config's class or model_name value is unknown to this method. 
""" # Construct the LLM llm: BaseLanguageModel = None + client_policy: ClientPolicy = None + chat_class: str = config.get("class") if chat_class is not None: chat_class = chat_class.lower() @@ -97,13 +94,9 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], if chat_class == "test-openai": print("Creating test-openai") - # See if there is an async_client to be had from the llm_client passed in - async_client: Any = None - if llm_client is not None: - async_openai_client = llm_client.get_client() - if async_openai_client is not None: - # Necessary reach-in - async_client = async_openai_client.chat.completions + # Create the policy object that allows us to manage the model run-time lifecycle + client_policy = OpenAIClientPolicy() + async_client: Any = client_policy.create_client(config) # Now construct LLM chat model we will be using: llm = ChatOpenAI( @@ -165,6 +158,6 @@ def create_llm_resources_with_client(self, config: Dict[str, Any], else: raise ValueError(f"Class {chat_class} for model_name {model_name} is unrecognized.") - # Return the LlmResources with the llm_client that was passed in. + # Return the LlmResources with the client_policy that was created. # That might be None, and that's OK. - return LangChainLlmResources(llm, llm_client=llm_client) + return LangChainLlmResources(llm, client_policy=client_policy)