Merged
Changes from 14 commits
4 changes: 4 additions & 0 deletions nomenclature/error.py
@@ -44,6 +44,10 @@
"constituents_not_native",
"Constituent region(s)\n{regions}\nin {file} not found in native region(s)",
),
"AggregationMappingConflict": (
"aggregation_mapping_conflict",
"{type} {duplicates} in aggregation-mapping in {file}",
),
}

PydanticCustomErrors = namedtuple("PydanticCustomErrors", pydantic_custom_error_config)
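
For context, this entry is unpacked into pydantic's PydanticCustomError (see _validate_items in the new processor module below); a minimal sketch of how the template renders, with a hypothetical file name:

from pydantic_core import PydanticCustomError

from nomenclature.error import custom_pydantic_errors

# unpack (error_type, message_template) and supply the template context;
# renders as "Duplicate target ['Primary Energy'] in aggregation-mapping in my_mapping.yaml"
raise PydanticCustomError(
    *custom_pydantic_errors.AggregationMappingConflict,
    {
        "type": "Duplicate target",
        "duplicates": ["Primary Energy"],
        "file": "my_mapping.yaml",
    },
)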
1 change: 1 addition & 0 deletions nomenclature/processor/__init__.py
@@ -5,3 +5,4 @@
)
from nomenclature.processor.required_data import RequiredDataValidator # noqa
from nomenclature.processor.data_validator import DataValidator # noqa
from nomenclature.processor.generic import Aggregator # noqa
153 changes: 153 additions & 0 deletions nomenclature/processor/generic.py
Contributor: Before I forget it, I'd suggest renaming this to aggregator.py.

Member Author: Thanks, done!

@@ -0,0 +1,153 @@
import logging
from collections import Counter
from pathlib import Path

import yaml
from pyam import IamDataFrame
from pydantic import BaseModel, field_validator, ValidationInfo
from pydantic.types import FilePath
from pydantic_core import PydanticCustomError

from nomenclature.definition import DataStructureDefinition
from nomenclature.error import custom_pydantic_errors
from nomenclature.processor import Processor
from nomenclature.processor.utils import get_relative_path

logger = logging.getLogger(__name__)

here = Path(__file__).parent.absolute()


class AggregationItem(BaseModel):
"""Item used for aggregation-mapping"""

name: str
components: list[str]


class Aggregator(Processor):
"""Aggregation or renaming of an IamDataFrame on a `dimension`"""
file: FilePath
dimension: str
mapping: list[AggregationItem]

def apply(self, df: IamDataFrame) -> IamDataFrame:
"""Apply region processing

Parameters
----------
df : IamDataFrame
            Input data that the aggregation or renaming is applied to

Returns
-------
IamDataFrame:
Processed data

"""
return df.rename(
mapping={self.dimension: self.rename_mapping},
check_duplicates=False,
)

@property
def rename_mapping(self):
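        """Map each component to its target, e.g.
        {"Primary Energy|Coal": "Primary Energy", "Final Energy|Heat": "Final Energy"}
        """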
rename_dict = {}

for item in self.mapping:
for c in item.components:
rename_dict[c] = item.name

return rename_dict

@field_validator("mapping")
def validate_target_names(cls, v, info: ValidationInfo):
_validate_items([item.name for item in v], info, "Duplicate target")
return v

@field_validator("mapping")
def validate_components(cls, v, info: ValidationInfo):
# components have to be unique for creating rename-mapping (component -> target)
all_components = list()
for item in v:
all_components.extend(item.components)
_validate_items(all_components, info, "Duplicate component")
return v

@field_validator("mapping")
def validate_target_vs_components(cls, v, info: ValidationInfo):
# guard against having identical target and component
_codes = list()
for item in v:
_codes.append(item.name)
_codes.extend(item.components)
_validate_items(_codes, info, "Non-unique target and component")
return v

@property
def codes(self):
_codes = list()
for item in self.mapping:
_codes.append(item.name)
_codes.extend(item.components)
return _codes

def validate_with_definition(self, dsd: DataStructureDefinition) -> None:
error = None
Contributor: If you raise the error below directly, you don't need this.
Suggested change:
-        error = None

Member Author: As above.

# check for codes that are not defined in the codelists
codelist = getattr(dsd, self.dimension, None)
Contributor: I'd let the error occur right here if the dimension is not found:
Suggested change:
-        codelist = getattr(dsd, self.dimension, None)
+        codelist = getattr(dsd, self.dimension)

Member Author: I like the current implementation better because this way, I can show the name of the offending file directly in the error message, making it easier to debug in repositories that have many files.

Contributor: Fair point, but in this case the error does not really have anything to do with the aggregator processor but rather with the missing dimension in the DataStructureDefinition.

# no validation if codelist is not defined or filter-item is None
if codelist is None:
error = f"Dimension '{self.dimension}' not found in DataStructureDefinition"
elif invalid := codelist.validate_items(self.codes):
error = (
Contributor:
Suggested change:
-            error = (
+            raise ValueError(f"The following {self.dimension}s are not defined in the "
+                "DataStructureDefinition:\n - " + "\n - ".join(invalid) + "\nin " + str(self.file) + "")

Member Author: As above.

f"The following {self.dimension}s are not defined in the "
"DataStructureDefinition:\n - " + "\n - ".join(invalid)
)
if error:
Contributor: If the error is raised directly, you don't need this anymore.
Suggested change:
-        if error:

Member Author: As above.

raise ValueError(error + "\nin " + str(self.file) + "")
Contributor:
Suggested change:
-            raise ValueError(error + "\nin " + str(self.file) + "")

Member Author: As above.
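
Taken together, these suggestions would give the method roughly this shape (a hypothetical sketch; the merged version keeps the `error` accumulator so the offending file name is appended in a single place):

    def validate_with_definition(self, dsd: DataStructureDefinition) -> None:
        # raising directly drops the `error` accumulator and the `if error:` guard;
        # a missing dimension then surfaces as an AttributeError from getattr
        codelist = getattr(dsd, self.dimension)
        if invalid := codelist.validate_items(self.codes):
            raise ValueError(
                f"The following {self.dimension}s are not defined in the "
                "DataStructureDefinition:\n - " + "\n - ".join(invalid)
                + "\nin " + str(self.file)
            )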


@classmethod
def from_file(cls, file: Path | str):
Contributor: If you rename the mapping attribute to aggregate and use dict[str, list[str]] in favor of AggregationItem, I think you should be able to remove this function almost entirely.

Member Author: Tried that, but this would require ditching the AggregationItem class, I guess?

Contributor: Yes, exactly.

(A sketch of this alternative follows after this file's diff.)

"""Initialize an AggregatorMapping from a file.

.. code:: yaml

dimension: <some_dimension>
aggregate:
- Target Value:
- Source Value A
- Source Value B

"""
file = Path(file) if isinstance(file, str) else file
try:
with open(file, "r", encoding="utf-8") as f:
mapping_input = yaml.safe_load(f)

mapping_list: list[dict[str, list]] = []
for item in mapping_input["aggregate"]:
# TODO explicit check that only one key-value pair exists per item
mapping_list.append(
dict(name=list(item)[0], components=list(item.values())[0])
)
except Exception as error:
raise ValueError(f"{error} in {get_relative_path(file)}") from error
return cls(
dimension=mapping_input["dimension"],
mapping=mapping_list, # type: ignore
file=get_relative_path(file),
)


def _validate_items(items, info, _type):
duplicates = [item for item, count in Counter(items).items() if count > 1]
if duplicates:
raise PydanticCustomError(
*custom_pydantic_errors.AggregationMappingConflict,
{
"type": _type,
"duplicates": duplicates,
"file": info.data["file"],
},
)
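
For reference, a minimal sketch of the dict[str, list[str]] alternative discussed in the thread above. This is hypothetical (the merged PR keeps AggregationItem); note that a plain YAML mapping would silently collapse duplicate target keys under PyYAML, which the list-of-items form can detect and report:

from pathlib import Path

import yaml
from pydantic.types import FilePath

from nomenclature.processor import Processor
from nomenclature.processor.utils import get_relative_path


class AggregatorVariant(Processor):
    """Hypothetical variant: `aggregate` maps each target directly to its components"""

    file: FilePath
    dimension: str
    aggregate: dict[str, list[str]]

    @property
    def rename_mapping(self) -> dict[str, str]:
        # invert target -> components into component -> target
        return {
            component: target
            for target, components in self.aggregate.items()
            for component in components
        }

    def apply(self, df):
        # identical to the merged implementation
        return df.rename(
            mapping={self.dimension: self.rename_mapping}, check_duplicates=False
        )

    @classmethod
    def from_file(cls, file: Path | str):
        # the YAML file would use a mapping instead of a list of single-key items:
        #   dimension: variable
        #   aggregate:
        #     Primary Energy: [Primary Energy|Coal, Primary Energy|Biomass]
        with open(file, "r", encoding="utf-8") as f:
            mapping_input = yaml.safe_load(f)
        # pydantic coerces the parsed YAML directly, no list-of-dicts handling needed
        return cls(file=get_relative_path(Path(file)), **mapping_input)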
2 changes: 1 addition & 1 deletion nomenclature/processor/iamc.py
@@ -36,7 +36,7 @@ def validate_with_definition(self, dsd: DataStructureDefinition) -> None:
if invalid := codelist.validate_items(getattr(self, dimension)):
error_msg += (
f"The following {dimension}s are not defined in the "
f"DataStructureDefinition:\n {', '.join(invalid)}\n"
"DataStructureDefinition:\n " + ", ".join(invalid) + "\n"
)

if error_msg:
8 changes: 8 additions & 0 deletions tests/data/processor/generic/aggregation_mapping.yaml
@@ -0,0 +1,8 @@
dimension: variable
aggregate:
- Primary Energy:
- Primary Energy|Coal
- Primary Energy|Biomass
- Final Energy:
- Final Energy|Electricity
- Final Energy|Heat
8 changes: 8 additions & 0 deletions tests/data/processor/generic/aggregation_mapping_duplicate_component.yaml
@@ -0,0 +1,8 @@
dimension: variable
aggregate:
- Primary Energy:
- Primary Energy|Coal
- Primary Energy|Biomass
- Final Energy:
- Primary Energy|Coal
- Final Energy|Heat
8 changes: 8 additions & 0 deletions tests/data/processor/generic/aggregation_mapping_duplicate_target.yaml
@@ -0,0 +1,8 @@
dimension: variable
aggregate:
- Primary Energy:
- Primary Energy|Coal
- Primary Energy|Biomass
- Primary Energy:
- Final Energy|Electricity
- Final Energy|Heat
8 changes: 8 additions & 0 deletions tests/data/processor/generic/aggregation_mapping_invalid_code.yaml
@@ -0,0 +1,8 @@
dimension: variable
aggregate:
- Primary Energy:
- Primary Energy|Coal
- Primary Energy|Biomass
- Final Energy:
- Final Energy|Electricity
- Final Energy|Foo
8 changes: 8 additions & 0 deletions tests/data/processor/generic/aggregation_mapping_invalid_dimension.yaml
@@ -0,0 +1,8 @@
dimension: foo
aggregate:
- Primary Energy:
- Primary Energy|Coal
- Primary Energy|Biomass
- Final Energy:
- Final Energy|Electricity
- Final Energy|Heat
8 changes: 8 additions & 0 deletions tests/data/processor/generic/aggregation_mapping_target_component_conflict.yaml
@@ -0,0 +1,8 @@
dimension: variable
aggregate:
- Primary Energy:
- Primary Energy|Coal
- Primary Energy|Biomass
- Final Energy:
- Final Energy|Electricity
- Primary Energy
18 changes: 18 additions & 0 deletions tests/data/processor/generic/definition/variable/variables.yaml
@@ -0,0 +1,18 @@
- Primary Energy:
description: Total primary energy consumption
unit: EJ/yr
- Primary Energy|Coal:
description: Primary energy consumption of coal
unit: EJ/yr
- Primary Energy|Biomass:
description: Primary energy consumption of biomass
unit: EJ/yr
- Final Energy:
description: Total final energy consumption
unit: EJ/yr
- Final Energy|Electricity:
description: Electricity demand
unit: EJ/yr
- Final Energy|Heat:
description: Heat demand
unit: EJ/yr
116 changes: 116 additions & 0 deletions tests/test_generic_processor.py
@@ -0,0 +1,116 @@
from pathlib import Path

import pandas as pd
import pyam
import pydantic
import pytest


from pyam import IamDataFrame
from conftest import TEST_DATA_DIR
from nomenclature import DataStructureDefinition
from nomenclature.processor import Aggregator

TEST_FOLDER_GENERIC_PROCESSOR = TEST_DATA_DIR / "processor" / "generic"


def test_aggregator_from_file():
mapping_file = "aggregation_mapping.yaml"
# Test that the file is read and represented correctly
obs = Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / mapping_file)
exp = {
"file": (TEST_FOLDER_GENERIC_PROCESSOR / mapping_file).relative_to(Path.cwd()),
"dimension": "variable",
"mapping": [
{
"name": "Primary Energy",
"components": ["Primary Energy|Coal", "Primary Energy|Biomass"],
},
{
"name": "Final Energy",
"components": ["Final Energy|Electricity", "Final Energy|Heat"],
},
],
}
assert obs.model_dump() == exp


@pytest.mark.parametrize(
"file, error_msg_pattern",
[
(
"aggregation_mapping_duplicate_target.yaml",
"Duplicate target \['Primary Energy'\] in aggregation-mapping in ",
),
(
"aggregation_mapping_duplicate_component.yaml",
"Duplicate component \['Primary Energy\|Coal'\] in aggregation-mapping in ",
),
(
"aggregation_mapping_target_component_conflict.yaml",
"Non-unique target and component \['Primary Energy'\] in aggregation-",
),
],
)
def test_aggregator_raises(file, error_msg_pattern):
# This is to test different failure conditions
with pytest.raises(pydantic.ValidationError, match=f"{error_msg_pattern}.*{file}"):
Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file)


def test_aggregator_validate_with_definition():
    # Validate the Aggregator against the codelist in a DataStructureDefinition
aggregator = Aggregator.from_file(
TEST_FOLDER_GENERIC_PROCESSOR / "aggregation_mapping.yaml"
)
definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition")
aggregator.validate_with_definition(definition)


def test_aggregator_validate_invalid_code():
file = "aggregation_mapping_invalid_code.yaml"
aggregator = Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file)
definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition")
match = f"The following variables are not .*\n .*- Final Energy\|Foo\n.*{file}"
with pytest.raises(ValueError, match=match):
aggregator.validate_with_definition(definition)


def test_aggregator_validate_invalid_dimension():
file = "aggregation_mapping_invalid_dimension.yaml"
aggregator = Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file)
definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition")
match = f"Dimension 'foo' not found in DataStructureDefinition\nin.*{file}"
with pytest.raises(ValueError, match=match):
aggregator.validate_with_definition(definition)


def test_aggregator_apply():
aggregator = Aggregator.from_file(
TEST_FOLDER_GENERIC_PROCESSOR / "aggregation_mapping.yaml"
)
iamc_args = dict(model="model_a", scenario="scenario_a", region="World")

df = IamDataFrame(
pd.DataFrame(
[
["Primary Energy|Coal", "EJ/yr", 0.5, 3],
["Primary Energy|Biomass", "EJ/yr", 2, 7],
["Final Energy|Electricity", "EJ/yr", 2.5, 3],
["Final Energy|Heat", "EJ/yr", 3, 6],
],
columns=["variable", "unit", 2005, 2010],
),
**iamc_args,
)
exp = IamDataFrame(
pd.DataFrame(
[
["Primary Energy", "EJ/yr", 2.5, 10],
["Final Energy", "EJ/yr", 5.5, 9],
],
columns=["variable", "unit", 2005, 2010],
),
**iamc_args,
)
pyam.assert_iamframe_equal(aggregator.apply(df), exp)