-
Notifications
You must be signed in to change notification settings - Fork 14
Add generic Aggregator processor
#462
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
88c6895
557a33e
2e9a6c5
89bfbd2
8c2374e
59cf536
bf9b58f
1c59923
cbaccb8
d63b775
5db5bf1
f9685cf
8470cc0
a189ffe
3e832e6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,153 @@ | ||||||||||
| import logging | ||||||||||
| from collections import Counter | ||||||||||
| from pathlib import Path | ||||||||||
|
|
||||||||||
| import yaml | ||||||||||
| from pyam import IamDataFrame | ||||||||||
| from pydantic import BaseModel, field_validator, ValidationInfo | ||||||||||
| from pydantic.types import FilePath | ||||||||||
| from pydantic_core import PydanticCustomError | ||||||||||
|
|
||||||||||
| from nomenclature.definition import DataStructureDefinition | ||||||||||
| from nomenclature.error import custom_pydantic_errors | ||||||||||
| from nomenclature.processor import Processor | ||||||||||
| from nomenclature.processor.utils import get_relative_path | ||||||||||
|
|
||||||||||
# Module-level logger following the standard `logging` convention
logger = logging.getLogger(__name__)

# Directory containing this module
# NOTE(review): `here` appears unused within this file — confirm against the
# rest of the package before removing
here = Path(__file__).parent.absolute()
|
|
||||||||||
|
|
||||||||||
class AggregationItem(BaseModel):
    """Item used for aggregation-mapping"""

    # target code that the components are mapped to
    name: str
    # source codes that are renamed (and thereby aggregated) to `name`
    components: list[str]
|
|
||||||||||
|
|
||||||||||
class Aggregator(Processor):
    """Aggregation or renaming of an IamDataFrame on a `dimension`"""

    # path to the mapping file (shown in error messages to simplify debugging)
    file: FilePath
    # IAMC dimension on which the aggregation is performed, e.g., "variable"
    dimension: str
    # list of target codes, each with the components mapped to it
    mapping: list[AggregationItem]

    def apply(self, df: IamDataFrame) -> IamDataFrame:
        """Apply the aggregation-mapping along `dimension`

        Parameters
        ----------
        df : IamDataFrame
            Input data that the aggregation-mapping is applied to

        Returns
        -------
        IamDataFrame:
            Processed data

        """
        # renaming several components to the same target aggregates them,
        # so duplicate-checking has to be disabled
        return df.rename(
            mapping={self.dimension: self.rename_mapping},
            check_duplicates=False,
        )

    @property
    def rename_mapping(self) -> dict[str, str]:
        """Mapping from each component to its aggregation target"""
        return {
            component: item.name
            for item in self.mapping
            for component in item.components
        }

    @field_validator("mapping")
    def validate_target_names(cls, v, info: ValidationInfo):
        # target names must be unique across the aggregation-mapping
        _validate_items([item.name for item in v], info, "Duplicate target")
        return v

    @field_validator("mapping")
    def validate_components(cls, v, info: ValidationInfo):
        # components have to be unique for creating rename-mapping (component -> target)
        all_components = [c for item in v for c in item.components]
        _validate_items(all_components, info, "Duplicate component")
        return v

    @field_validator("mapping")
    def validate_target_vs_components(cls, v, info: ValidationInfo):
        # guard against having identical target and component
        _codes = list()
        for item in v:
            _codes.append(item.name)
            _codes.extend(item.components)
        _validate_items(_codes, info, "Non-unique target and component")
        return v

    @property
    def codes(self) -> list[str]:
        """All codes (targets and components) used in this aggregation-mapping"""
        _codes = list()
        for item in self.mapping:
            _codes.append(item.name)
            _codes.extend(item.components)
        return _codes

    def validate_with_definition(self, dsd: DataStructureDefinition) -> None:
        """Check that all codes are defined in the `dimension` codelist of `dsd`

        Raises
        ------
        ValueError
            If `dimension` is not a codelist of `dsd`, or if any code used in
            this mapping is not defined in that codelist.
        """
        error = None

        # check for codes that are not defined in the codelists
        codelist = getattr(dsd, self.dimension, None)

        # no validation if codelist is not defined or filter-item is None
        if codelist is None:
            error = f"Dimension '{self.dimension}' not found in DataStructureDefinition"
        elif invalid := codelist.validate_items(self.codes):
            error = (
                f"The following {self.dimension}s are not defined in the "
                "DataStructureDefinition:\n - " + "\n - ".join(invalid)
            )
        if error:
            # include the mapping file in the message to simplify debugging
            raise ValueError(error + "\nin " + str(self.file))

    @classmethod
    def from_file(cls, file: Path | str):
        """Initialize an Aggregator from a file.

        .. code:: yaml

            dimension: <some_dimension>
            aggregate:
            - Target Value:
              - Source Value A
              - Source Value B

        """
        file = Path(file)  # Path() is idempotent, no isinstance-check needed
        try:
            with open(file, "r", encoding="utf-8") as f:
                mapping_input = yaml.safe_load(f)

            mapping_list: list[dict[str, list]] = []
            for item in mapping_input["aggregate"]:
                # each aggregate-item must be a single `target: [components]` pair
                if len(item) != 1:
                    raise ValueError(
                        "Expected exactly one key-value pair per aggregation-item, "
                        f"found {item}"
                    )
                name, components = next(iter(item.items()))
                mapping_list.append(dict(name=name, components=components))
        except Exception as error:
            # re-raise with the (relative) file path for easier debugging
            raise ValueError(f"{error} in {get_relative_path(file)}") from error
        return cls(
            dimension=mapping_input["dimension"],
            mapping=mapping_list,  # type: ignore
            file=get_relative_path(file),
        )
|
|
||||||||||
|
|
||||||||||
| def _validate_items(items, info, _type): | ||||||||||
| duplicates = [item for item, count in Counter(items).items() if count > 1] | ||||||||||
| if duplicates: | ||||||||||
| raise PydanticCustomError( | ||||||||||
phackstock marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
| *custom_pydantic_errors.AggregationMappingConflict, | ||||||||||
| { | ||||||||||
| "type": _type, | ||||||||||
| "duplicates": duplicates, | ||||||||||
| "file": info.data["file"], | ||||||||||
| }, | ||||||||||
| ) | ||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| dimension: variable | ||
| aggregate: | ||
| - Primary Energy: | ||
| - Primary Energy|Coal | ||
| - Primary Energy|Biomass | ||
| - Final Energy: | ||
| - Final Energy|Electricity | ||
| - Final Energy|Heat |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| dimension: variable | ||
| aggregate: | ||
| - Primary Energy: | ||
| - Primary Energy|Coal | ||
| - Primary Energy|Biomass | ||
| - Final Energy: | ||
| - Primary Energy|Coal | ||
| - Final Energy|Heat |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| dimension: variable | ||
| aggregate: | ||
| - Primary Energy: | ||
| - Primary Energy|Coal | ||
| - Primary Energy|Biomass | ||
| - Primary Energy: | ||
| - Final Energy|Electricity | ||
| - Final Energy|Heat |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| dimension: variable | ||
| aggregate: | ||
| - Primary Energy: | ||
| - Primary Energy|Coal | ||
| - Primary Energy|Biomass | ||
| - Final Energy: | ||
| - Final Energy|Electricity | ||
| - Final Energy|Foo |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| dimension: foo | ||
| aggregate: | ||
| - Primary Energy: | ||
| - Primary Energy|Coal | ||
| - Primary Energy|Biomass | ||
| - Final Energy: | ||
| - Final Energy|Electricity | ||
| - Final Energy|Heat |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| dimension: variable | ||
| aggregate: | ||
| - Primary Energy: | ||
| - Primary Energy|Coal | ||
| - Primary Energy|Biomass | ||
| - Final Energy: | ||
| - Final Energy|Electricity | ||
| - Primary Energy |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| - Primary Energy: | ||
| description: Total primary energy consumption | ||
| unit: EJ/yr | ||
| - Primary Energy|Coal: | ||
| description: Primary energy consumption of coal | ||
| unit: EJ/yr | ||
| - Primary Energy|Biomass: | ||
| description: Primary energy consumption of biomass | ||
| unit: EJ/yr | ||
| - Final Energy: | ||
| description: Total final energy consumption | ||
| unit: EJ/yr | ||
| - Final Energy|Electricity: | ||
| description: Electricity demand | ||
| unit: EJ/yr | ||
| - Final Energy|Heat: | ||
| description: Heat demand | ||
| unit: EJ/yr |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,116 @@ | ||
| from pathlib import Path | ||
|
|
||
| import pandas as pd | ||
| import pyam | ||
| import pydantic | ||
| import pytest | ||
|
|
||
|
|
||
| from pyam import IamDataFrame | ||
| from conftest import TEST_DATA_DIR | ||
| from nomenclature import DataStructureDefinition | ||
| from nomenclature.processor import Aggregator | ||
|
|
||
| TEST_FOLDER_GENERIC_PROCESSOR = TEST_DATA_DIR / "processor" / "generic" | ||
|
|
||
|
|
||
def test_aggregator_from_file():
    # Test that the file is read and represented correctly
    mapping_file = "aggregation_mapping.yaml"
    file_path = TEST_FOLDER_GENERIC_PROCESSOR / mapping_file
    obs = Aggregator.from_file(file_path)
    expected_mapping = [
        dict(
            name="Primary Energy",
            components=["Primary Energy|Coal", "Primary Energy|Biomass"],
        ),
        dict(
            name="Final Energy",
            components=["Final Energy|Electricity", "Final Energy|Heat"],
        ),
    ]
    exp = dict(
        file=file_path.relative_to(Path.cwd()),
        dimension="variable",
        mapping=expected_mapping,
    )
    assert obs.model_dump() == exp
|
|
||
|
|
||
@pytest.mark.parametrize(
    "file, error_msg_pattern",
    [
        (
            "aggregation_mapping_duplicate_target.yaml",
            # raw strings: `\[`/`\|` in plain strings are invalid escape
            # sequences (SyntaxWarning on Python 3.12+)
            r"Duplicate target \['Primary Energy'\] in aggregation-mapping in ",
        ),
        (
            "aggregation_mapping_duplicate_component.yaml",
            r"Duplicate component \['Primary Energy\|Coal'\] in aggregation-mapping in ",
        ),
        (
            "aggregation_mapping_target_component_conflict.yaml",
            r"Non-unique target and component \['Primary Energy'\] in aggregation-",
        ),
    ],
)
def test_aggregator_raises(file, error_msg_pattern):
    # This is to test different failure conditions
    with pytest.raises(pydantic.ValidationError, match=f"{error_msg_pattern}.*{file}"):
        Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file)
|
|
||
|
|
||
def test_aggregator_validate_with_definition():
    # Validate the Aggregator against the codelist in a DataStructureDefinition
    file_path = TEST_FOLDER_GENERIC_PROCESSOR / "aggregation_mapping.yaml"
    aggregator = Aggregator.from_file(file_path)
    definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition")
    # no error expected, all codes are defined
    aggregator.validate_with_definition(definition)
|
|
||
|
|
||
def test_aggregator_validate_invalid_code():
    # An aggregation-mapping with an undefined code raises during validation
    file = "aggregation_mapping_invalid_code.yaml"
    aggregator = Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file)
    definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition")
    # rf-string: `\|` in a plain f-string is an invalid escape sequence
    # (SyntaxWarning on Python 3.12+)
    match = rf"The following variables are not .*\n .*- Final Energy\|Foo\n.*{file}"
    with pytest.raises(ValueError, match=match):
        aggregator.validate_with_definition(definition)
|
|
||
|
|
||
def test_aggregator_validate_invalid_dimension():
    # An aggregation-mapping on an unknown dimension raises during validation
    file = "aggregation_mapping_invalid_dimension.yaml"
    aggregator = Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file)
    definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition")
    expected = f"Dimension 'foo' not found in DataStructureDefinition\nin.*{file}"
    with pytest.raises(ValueError, match=expected):
        aggregator.validate_with_definition(definition)
|
|
||
|
|
||
def test_aggregator_apply():
    # Applying the Aggregator renames components to targets and sums the values
    aggregator = Aggregator.from_file(
        TEST_FOLDER_GENERIC_PROCESSOR / "aggregation_mapping.yaml"
    )
    iamc_args = dict(model="model_a", scenario="scenario_a", region="World")

    def _as_iamdataframe(rows):
        # rows: (variable, unit, value for 2005, value for 2010)
        return IamDataFrame(
            pd.DataFrame(rows, columns=["variable", "unit", 2005, 2010]),
            **iamc_args,
        )

    df = _as_iamdataframe(
        [
            ["Primary Energy|Coal", "EJ/yr", 0.5, 3],
            ["Primary Energy|Biomass", "EJ/yr", 2, 7],
            ["Final Energy|Electricity", "EJ/yr", 2.5, 3],
            ["Final Energy|Heat", "EJ/yr", 3, 6],
        ]
    )
    exp = _as_iamdataframe(
        [
            ["Primary Energy", "EJ/yr", 2.5, 10],
            ["Final Energy", "EJ/yr", 5.5, 9],
        ]
    )
    pyam.assert_iamframe_equal(aggregator.apply(df), exp)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before I forget it, I'd suggest to rename to
aggregator.pyThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, done!