88from etl_entities .hwm_store import BaseHWMStore , register_hwm_store_class
99from horizon .client .auth import LoginPassword
1010from horizon .client .sync import HorizonClientSync , RetryConfig , TimeoutConfig
11+ from horizon .commons .exceptions import EntityAlreadyExistsError
1112from horizon .commons .schemas .v1 import (
1213 HWMCreateRequestV1 ,
1314 HWMPaginateQueryV1 ,
1415 HWMUpdateRequestV1 ,
16+ NamespaceCreateRequestV1 ,
1517 NamespacePaginateQueryV1 ,
18+ NamespaceResponseV1 ,
1619)
1720
1821try :
@@ -33,7 +36,7 @@ class HorizonHWMStore(BaseHWMStore):
3336
3437 .. warning::
3538
36- It is required to create namespace in Horizon BEFORE using this class .
39+ It is required to create a namespace BEFORE working with HWMs .
3740
3841 Parameters
3942 ----------
@@ -98,7 +101,7 @@ class HorizonHWMStore(BaseHWMStore):
98101 namespace="namespace",
99102 retry=RetryConfig(total=5),
100103 timeout=TimeoutConfig(request_timeout=10),
101- ):
104+ ).force_create_namespace() :
102105 with IncrementalStrategy():
103106 df = reader.run()
104107 writer.run(df)
@@ -213,6 +216,24 @@ def check(self) -> HorizonHWMStore:
213216 self ._get_namespace_id ()
214217 return self
215218
219+ def force_create_namespace (self ) -> HorizonHWMStore :
220+ """
221+ Create a namespace with name specified in HorizonHWMStore class.
222+
223+ Returns
224+ -------
225+ HorizonHWMStore
226+ Self
227+ """
228+ namespace = self ._get_namespace (self .namespace )
229+ if namespace is None :
230+ try :
231+ namespace = self .client .create_namespace (NamespaceCreateRequestV1 (name = self .namespace ))
232+ self ._namespace_id = namespace .id # noqa: WPS601
233+ except EntityAlreadyExistsError :
234+ ...
235+ return self
236+
216237 # LoginPassword, RetryConfig and TimeoutConfig can be inherited from Pydantic v2 BaseModel
217238 # which is detected by Pydantic v1 as arbitrary type. So we need to parse them manually.
218239 @validator ("auth" , pre = True )
@@ -233,6 +254,10 @@ def _check_timeout(cls, value: TimeoutConfig):
233254 return TimeoutConfig .parse_obj (value )
234255 return value
235256
257+ def _get_namespace (self , name : str ) -> NamespaceResponseV1 | None :
258+ namespaces = self .client .paginate_namespaces (query = NamespacePaginateQueryV1 (name = name )).items
259+ return namespaces [0 ] if namespaces else None
260+
236261 def _get_namespace_id (self ) -> int :
237262 """
238263 Fetch the ID of the namespace. Raises an exception if the namespace doesn't exist.
@@ -250,11 +275,14 @@ def _get_namespace_id(self) -> int:
250275 if self ._namespace_id is not None :
251276 return self ._namespace_id
252277
253- namespaces = self .client .paginate_namespaces (NamespacePaginateQueryV1 (name = self .namespace )).items
254- if not namespaces :
255- raise RuntimeError (f"Namespace { self .namespace !r} not found. Please create it before using." )
278+ namespace = self ._get_namespace (self .namespace )
279+ if namespace is None :
280+ raise RuntimeError (
281+ f"Namespace { self .namespace !r} not found. "
282+ "Please create it before using by calling .force_create_namespace() method." ,
283+ )
256284
257- self ._namespace_id = namespaces [ 0 ] .id # noqa: WPS601
285+ self ._namespace_id = namespace .id # noqa: WPS601
258286 return self ._namespace_id
259287
260288 def _get_hwm_id (self , namespace_id : int , hwm_name : str ) -> Optional [int ]:
0 commit comments