Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 75 additions & 12 deletions TM1py/Services/ElementService.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,32 +136,74 @@ def delete_edges(
use_ti: bool = False,
use_blob: bool = False,
remove_blob: bool = True,
skip_invalid_edges: bool = True,
**kwargs,
):
"""
Remove edges in TM1.

:param dimension_name: The name of the dimension.
:param hierarchy_name: The name of the hierarchy.
:param edges: A list of tuples representing the edges to remove, where each tuple contains a parent and a child.
:param use_ti: A boolean indicating whether to use a TI process to delete edges (default: False).
:param use_blob: A boolean indicating whether to use a blob file to delete edges (default: False).
:param remove_blob: A boolean indicating whether to remove the parent-child file after use (default: True).
:param skip_invalid_edges: A boolean indicating whether to skip invalid edges (default: True).
"""
if use_ti:
return self.delete_edges_use_ti(dimension_name, hierarchy_name, edges, **kwargs)
return self.delete_edges_use_ti(dimension_name, hierarchy_name, edges, skip_invalid_edges, **kwargs)

if use_blob:
return self.delete_edges_use_blob(dimension_name, hierarchy_name, edges, remove_blob, **kwargs)
return self.delete_edges_use_blob(
dimension_name, hierarchy_name, edges, remove_blob, skip_invalid_edges, **kwargs
)

h_service = self._get_hierarchy_service()
h = h_service.get(dimension_name, hierarchy_name, **kwargs)
for edge in edges:
if edge not in h.edges and not skip_invalid_edges:
raise TM1pyException(f"Edge {edge} does not exist in hierarchy [{dimension_name}].[{hierarchy_name}]")
h.remove_edge(parent=edge[0], component=edge[1])
h_service.update(h, **kwargs)

def delete_edges_use_ti(self, dimension_name: str, hierarchy_name: str, edges: List[str] = None, **kwargs):
def delete_edges_use_ti(
self,
dimension_name: str,
hierarchy_name: str,
edges: List[str] = None,
skip_invalid_edges: bool = True,
**kwargs,
):
"""
Remove edges in TM1 via an unbound TI process.
:param dimension_name: The name of the dimension.
:param hierarchy_name: The name of the hierarchy.
:param edges: A list of tuples representing the edges to remove, where each tuple contains a parent and a child.
:param skip_invalid_edges: A boolean indicating whether to skip invalid edges (default: True).
"""
if not edges:
return

def escape_single_quote(text):
return text.replace("'", "''")

statements = [
f"HierarchyElementComponentDelete('{dimension_name}', '{hierarchy_name}', "
f"'{escape_single_quote(parent)}', '{escape_single_quote(child)}');"
for (parent, child) in edges
]
statements = []
for parent, child in edges:
parent = escape_single_quote(parent)
child = escape_single_quote(child)
if skip_invalid_edges:
statements.extend(
[
f"IF(ElementIsParent('{dimension_name}','{hierarchy_name}','{parent}','{child}')=1);",
f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}','{parent}','{child}');",
f"ENDIF;",
]
)

else:
statements.append(
f"HierarchyElementComponentDelete('{dimension_name}', '{hierarchy_name}', '{parent}', '{child}');"
)

unbound_process_name = self.suggest_unique_object_name()

Expand All @@ -174,7 +216,13 @@ def escape_single_quote(text):
@require_data_admin
@require_ops_admin
def delete_edges_use_blob(
self, dimension_name: str, hierarchy_name: str, edges: List[str] = None, remove_blob: bool = True, **kwargs
self,
dimension_name: str,
hierarchy_name: str,
edges: List[str] = None,
remove_blob: bool = True,
skip_invalid_edges: bool = True,
**kwargs,
):
"""
Remove edges in TM1 via an unbound TI process having an uploaded CSV as the data source.
Expand All @@ -183,6 +231,7 @@ def delete_edges_use_blob(
:param hierarchy_name: The name of the hierarchy.
:param edges: A list of tuples representing the edges to remove, where each tuple contains a parent and a child.
:param remove_blob: A boolean indicating whether to remove the parent-child file after use (default: True).
:param skip_invalid_edges: A boolean indicating whether to skip invalid edges (default: True).
:param kwargs: Additional arguments for the process execution.
:return: None
"""
Expand All @@ -209,6 +258,7 @@ def delete_edges_use_blob(
hierarchy_name=hierarchy_name,
process_name=unique_name,
blob_filename=file_name,
skip_invalid_edges=skip_invalid_edges,
)

success, status, log_file = process_service.execute_process_with_return(process=process, **kwargs)
Expand All @@ -223,7 +273,12 @@ def delete_edges_use_blob(
file_service.delete(file_name=file_name)

def _build_unwind_hierarchy_edges_from_blob_process(
self, dimension_name: str, hierarchy_name: str, process_name: str, blob_filename: str
self,
dimension_name: str,
hierarchy_name: str,
process_name: str,
blob_filename: str,
skip_invalid_edges: bool = True,
) -> Process:

# v11 automatically adds blb file extensions to documents created via the contents api
Expand Down Expand Up @@ -251,7 +306,15 @@ def _build_unwind_hierarchy_edges_from_blob_process(
hierarchyupdate_process.add_variable(name=child_variable, variable_type="String")

# Write the statement for delete component in hierarchy
delete_component = f"\rHierarchyElementComponentDelete('{dimension_name}', '{hierarchy_name}', {parent_variable}, {child_variable});"
if skip_invalid_edges:
delete_component = (
f"\r"
f"IF(ElementIsParent('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable})=1);"
f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable});"
f"ENDIF;"
)
else:
delete_component = f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}',{parent_variable},{child_variable});"

# Define Metadata section
metadata_statement = delete_component
Expand Down Expand Up @@ -1091,7 +1154,7 @@ def get_edges(sub_trees):
component_name = sub_tree["Component"]["Name"]
else:
component_name = sub_tree["ComponentName"]
edges[sub_tree["ParentName"],component_name] = sub_tree["Weight"]
edges[sub_tree["ParentName"], component_name] = sub_tree["Weight"]

if "Edges" not in sub_tree["Component"]:
continue
Expand Down
98 changes: 92 additions & 6 deletions Tests/ElementService_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

from mdxpy import MdxBuilder

from TM1py.Exceptions import TM1pyException, TM1pyRestException, TM1pyWritePartialFailureException
from TM1py.Objects import Dimension, Element, ElementAttribute, Hierarchy
from TM1py.Services import TM1Service
from Tests.Utils import (
generate_test_uuid,
skip_if_no_pandas,
skip_if_version_lower_than,
)
from TM1py.Exceptions import TM1pyException, TM1pyRestException
from TM1py.Objects import Dimension, Element, ElementAttribute, Hierarchy
from TM1py.Services import TM1Service


class TestElementService(unittest.TestCase):
Expand Down Expand Up @@ -81,7 +81,9 @@ def setUp(self):
self.tm1.cubes.cells.write_value("1990/91", self.attribute_cube_name, ("1991", "Financial Year"))
self.tm1.cubes.cells.write_value("1991/92", self.attribute_cube_name, ("1992", "Financial Year"))
self.tm1.cubes.cells.write_value("All Years", self.attribute_cube_name, ("Total Years", "Financial Year"))
self.tm1.cubes.cells.write_value("All Consolidations", self.attribute_cube_name, ("All Consolidations", "Financial Year"))
self.tm1.cubes.cells.write_value(
"All Consolidations", self.attribute_cube_name, ("All Consolidations", "Financial Year")
)

self.create_or_update_dimension_with_hierarchies()

Expand Down Expand Up @@ -713,8 +715,6 @@ def run_test_get_elements_dataframe_elements_via_mdx(self, use_blob: bool):

self.assertTrue(df.equals(reference_df))



def test_get_element_names(self):
element_names = self.tm1.dimensions.hierarchies.elements.get_element_names(
self.dimension_name, self.hierarchy_name
Expand Down Expand Up @@ -1279,6 +1279,92 @@ def test_delete_edges(self):
self.assertNotIn(("Total Years", "1989"), edges)
self.assertNotIn(("Total Years", "1990"), edges)

@skip_if_version_lower_than(version="11.4")
def test_delete_edges_use_ti_skip_invalid_edges_true(self):
self.tm1.elements.delete_edges(
dimension_name=self.dimension_name,
hierarchy_name=self.hierarchy_name,
edges=[("Every Year", "1989"), ("Total Years", "1989"), ("Total Years", "1990")],
skip_invalid_edges=True,
)

edges = self.tm1.elements.get_edges(self.dimension_name, self.dimension_name)
self.assertNotIn(("Total Years", "1989"), edges)
self.assertNotIn(("Total Years", "1990"), edges)

@skip_if_version_lower_than(version="11.4")
def test_delete_edges_skip_invalid_edges_false(self):
with self.assertRaises(TM1pyException):
self.tm1.elements.delete_edges(
dimension_name=self.dimension_name,
hierarchy_name=self.hierarchy_name,
edges=[("Every Year", "1989")],
skip_invalid_edges=False,
)

@skip_if_version_lower_than(version="11.4")
def test_delete_edges_use_blob(self):
self.tm1.elements.delete_edges(
dimension_name=self.dimension_name,
hierarchy_name=self.hierarchy_name,
edges=[("Total Years", "1989"), ("Total Years", "1990")],
use_blob=True,
)

edges = self.tm1.elements.get_edges(self.dimension_name, self.dimension_name)
self.assertNotIn(("Total Years", "1989"), edges)
self.assertNotIn(("Total Years", "1990"), edges)

@skip_if_version_lower_than(version="11.4")
def test_delete_edges_use_blob_skip_invalid_edges_true(self):
self.tm1.elements.delete_edges(
dimension_name=self.dimension_name,
hierarchy_name=self.hierarchy_name,
edges=[("Every Year", "1989"), ("Total Years", "1989"), ("Total Years", "1990")],
use_blob=True,
skip_invalid_edges=True,
)

edges = self.tm1.elements.get_edges(self.dimension_name, self.dimension_name)
self.assertNotIn(("Total Years", "1989"), edges)
self.assertNotIn(("Total Years", "1990"), edges)

@skip_if_version_lower_than(version="11.4")
def test_delete_edges_use_blob_skip_invalid_edges_false(self):
with self.assertRaises(TM1pyWritePartialFailureException):
self.tm1.elements.delete_edges(
dimension_name=self.dimension_name,
hierarchy_name=self.hierarchy_name,
edges=[("Every Year", "1989")],
use_blob=True,
skip_invalid_edges=False,
)

@skip_if_version_lower_than(version="11.4")
def test_delete_edges_use_ti_skip_invalid_edges_true(self):
self.tm1.elements.delete_edges(
dimension_name=self.dimension_name,
hierarchy_name=self.hierarchy_name,
edges=[("Every Year", "1989"), ("Total Years", "1989"), ("Total Years", "1990")],
use_ti=True,
skip_invalid_edges=True,
)

edges = self.tm1.elements.get_edges(self.dimension_name, self.dimension_name)
self.assertNotIn(("Total Years", "1989"), edges)
self.assertNotIn(("Total Years", "1990"), edges)

@skip_if_version_lower_than(version="11.4")
def test_delete_edges_use_ti_skip_invalid_edges_false(self):
with self.assertRaises(TM1pyException):
self.tm1.elements.delete_edges(
dimension_name=self.dimension_name,
hierarchy_name=self.hierarchy_name,
edges=[("Every Year", "1989")],
use_ti=True,
skip_invalid_edges=False,
)

def test_remove_edge_parent_not_existing(self):
with self.assertRaises(TM1pyRestException):
self.tm1.elements.remove_edge(
Expand Down