diff --git a/nomenclature/error.py b/nomenclature/error.py index 958f3c34..c04d5572 100644 --- a/nomenclature/error.py +++ b/nomenclature/error.py @@ -44,6 +44,10 @@ "constituents_not_native", "Constituent region(s)\n{regions}\nin {file} not found in native region(s)", ), + "AggregationMappingConflict": ( + "aggregation_mapping_conflict", + "{type} {duplicates} in aggregation-mapping in {file}", + ), } PydanticCustomErrors = namedtuple("PydanticCustomErrors", pydantic_custom_error_config) diff --git a/nomenclature/processor/__init__.py b/nomenclature/processor/__init__.py index 4591f186..64962735 100644 --- a/nomenclature/processor/__init__.py +++ b/nomenclature/processor/__init__.py @@ -5,3 +5,4 @@ ) from nomenclature.processor.required_data import RequiredDataValidator # noqa from nomenclature.processor.data_validator import DataValidator # noqa +from nomenclature.processor.aggregator import Aggregator # noqa diff --git a/nomenclature/processor/aggregator.py b/nomenclature/processor/aggregator.py new file mode 100644 index 00000000..8e7946c4 --- /dev/null +++ b/nomenclature/processor/aggregator.py @@ -0,0 +1,153 @@ +import logging +from collections import Counter +from pathlib import Path + +import yaml +from pyam import IamDataFrame +from pydantic import BaseModel, field_validator, ValidationInfo +from pydantic.types import FilePath +from pydantic_core import PydanticCustomError + +from nomenclature.definition import DataStructureDefinition +from nomenclature.error import custom_pydantic_errors +from nomenclature.processor import Processor +from nomenclature.processor.utils import get_relative_path + +logger = logging.getLogger(__name__) + +here = Path(__file__).parent.absolute() + + +class AggregationItem(BaseModel): + """Item used for aggregation-mapping""" + + name: str + components: list[str] + + +class Aggregator(Processor): + """Aggregation or renaming of an IamDataFrame on a `dimension`""" + file: FilePath + dimension: str + aggregate: list[AggregationItem] + + def apply(self, df: IamDataFrame) -> IamDataFrame: + """Apply region processing + + Parameters + ---------- + df : IamDataFrame + Input data that the region processing is applied to + + Returns + ------- + IamDataFrame: + Processed data + + """ + return df.rename( + mapping={self.dimension: self.rename_mapping}, + check_duplicates=False, + ) + + @property + def rename_mapping(self): + rename_dict = {} + + for item in self.aggregate: + for c in item.components: + rename_dict[c] = item.name + + return rename_dict + + @field_validator("aggregate") + def validate_target_names(cls, v, info: ValidationInfo): + _validate_items([item.name for item in v], info, "Duplicate target") + return v + + @field_validator("aggregate") + def validate_components(cls, v, info: ValidationInfo): + # components have to be unique for creating rename-mapping (component -> target) + all_components = list() + for item in v: + all_components.extend(item.components) + _validate_items(all_components, info, "Duplicate component") + return v + + @field_validator("aggregate") + def validate_target_vs_components(cls, v, info: ValidationInfo): + # guard against having identical target and component + _codes = list() + for item in v: + _codes.append(item.name) + _codes.extend(item.components) + _validate_items(_codes, info, "Non-unique target and component") + return v + + @property + def codes(self): + _codes = list() + for item in self.aggregate: + _codes.append(item.name) + _codes.extend(item.components) + return _codes + + def validate_with_definition(self, dsd: DataStructureDefinition) -> None: + error = None + # check for codes that are not defined in the codelists + codelist = getattr(dsd, self.dimension, None) + # no validation if codelist is not defined or filter-item is None + if codelist is None: + error = f"Dimension '{self.dimension}' not found in DataStructureDefinition" + elif invalid := codelist.validate_items(self.codes): + error = ( + f"The following {self.dimension}s are not defined in the " + "DataStructureDefinition:\n - " + "\n - ".join(invalid) + ) + if error: + raise ValueError(error + "\nin " + str(self.file) + "") + + @classmethod + def from_file(cls, file: Path | str): + """Initialize an AggregatorMapping from a file. + + .. code:: yaml + + dimension: + aggregate: + - Target Value: + - Source Value A + - Source Value B + + """ + file = Path(file) if isinstance(file, str) else file + try: + with open(file, "r", encoding="utf-8") as f: + mapping_input = yaml.safe_load(f) + + aggregate_list: list[dict[str, list]] = [] + for item in mapping_input["aggregate"]: + # TODO explicit check that only one key-value pair exists per item + aggregate_list.append( + dict(name=list(item)[0], components=list(item.values())[0]) + ) + except Exception as error: + raise ValueError(f"{error} in {get_relative_path(file)}") from error + return cls( + dimension=mapping_input["dimension"], + aggregate=aggregate_list, # type: ignore + file=get_relative_path(file), + ) + + +def _validate_items(items, info, _type): + duplicates = [item for item, count in Counter(items).items() if count > 1] + if duplicates: + raise PydanticCustomError( + *custom_pydantic_errors.AggregationMappingConflict, + { + "type": _type, + "duplicates": duplicates, + "file": info.data["file"], + }, + ) diff --git a/nomenclature/processor/iamc.py b/nomenclature/processor/iamc.py index 3cba8d98..03bbe61b 100644 --- a/nomenclature/processor/iamc.py +++ b/nomenclature/processor/iamc.py @@ -36,7 +36,7 @@ def validate_with_definition(self, dsd: DataStructureDefinition) -> None: if invalid := codelist.validate_items(getattr(self, dimension)): error_msg += ( f"The following {dimension}s are not defined in the " - f"DataStructureDefinition:\n {', '.join(invalid)}\n" + "DataStructureDefinition:\n " + ", ".join(invalid) + "\n" ) if error_msg: diff --git a/tests/data/processor/aggregator/aggregation_mapping.yaml b/tests/data/processor/aggregator/aggregation_mapping.yaml new file mode 100644 index 00000000..8c253e26 --- /dev/null +++ b/tests/data/processor/aggregator/aggregation_mapping.yaml @@ -0,0 +1,8 @@ +dimension: variable +aggregate: + - Primary Energy: + - Primary Energy|Coal + - Primary Energy|Biomass + - Final Energy: + - Final Energy|Electricity + - Final Energy|Heat diff --git a/tests/data/processor/aggregator/aggregation_mapping_duplicate_component.yaml b/tests/data/processor/aggregator/aggregation_mapping_duplicate_component.yaml new file mode 100644 index 00000000..91a5eb14 --- /dev/null +++ b/tests/data/processor/aggregator/aggregation_mapping_duplicate_component.yaml @@ -0,0 +1,8 @@ +dimension: variable +aggregate: + - Primary Energy: + - Primary Energy|Coal + - Primary Energy|Biomass + - Final Energy: + - Primary Energy|Coal + - Final Energy|Heat diff --git a/tests/data/processor/aggregator/aggregation_mapping_duplicate_target.yaml b/tests/data/processor/aggregator/aggregation_mapping_duplicate_target.yaml new file mode 100644 index 00000000..541dab64 --- /dev/null +++ b/tests/data/processor/aggregator/aggregation_mapping_duplicate_target.yaml @@ -0,0 +1,8 @@ +dimension: variable +aggregate: + - Primary Energy: + - Primary Energy|Coal + - Primary Energy|Biomass + - Primary Energy: + - Final Energy|Electricity + - Final Energy|Heat diff --git a/tests/data/processor/aggregator/aggregation_mapping_invalid_code.yaml b/tests/data/processor/aggregator/aggregation_mapping_invalid_code.yaml new file mode 100644 index 00000000..09d060fa --- /dev/null +++ b/tests/data/processor/aggregator/aggregation_mapping_invalid_code.yaml @@ -0,0 +1,8 @@ +dimension: variable +aggregate: + - Primary Energy: + - Primary Energy|Coal + - Primary Energy|Biomass + - Final Energy: + - Final Energy|Electricity + - Final Energy|Foo diff --git a/tests/data/processor/aggregator/aggregation_mapping_invalid_dimension.yaml b/tests/data/processor/aggregator/aggregation_mapping_invalid_dimension.yaml new file mode 100644 index 00000000..360531d0 --- /dev/null +++ b/tests/data/processor/aggregator/aggregation_mapping_invalid_dimension.yaml @@ -0,0 +1,8 @@ +dimension: foo +aggregate: + - Primary Energy: + - Primary Energy|Coal + - Primary Energy|Biomass + - Final Energy: + - Final Energy|Electricity + - Final Energy|Heat diff --git a/tests/data/processor/aggregator/aggregation_mapping_target_component_conflict.yaml b/tests/data/processor/aggregator/aggregation_mapping_target_component_conflict.yaml new file mode 100644 index 00000000..9c6374a4 --- /dev/null +++ b/tests/data/processor/aggregator/aggregation_mapping_target_component_conflict.yaml @@ -0,0 +1,8 @@ +dimension: variable +aggregate: + - Primary Energy: + - Primary Energy|Coal + - Primary Energy|Biomass + - Final Energy: + - Final Energy|Electricity + - Primary Energy diff --git a/tests/data/processor/aggregator/definition/variable/variables.yaml b/tests/data/processor/aggregator/definition/variable/variables.yaml new file mode 100644 index 00000000..feb1a6ff --- /dev/null +++ b/tests/data/processor/aggregator/definition/variable/variables.yaml @@ -0,0 +1,18 @@ +- Primary Energy: + description: Total primary energy consumption + unit: EJ/yr +- Primary Energy|Coal: + description: Primary energy consumption of coal + unit: EJ/yr +- Primary Energy|Biomass: + description: Primary energy consumption of biomass + unit: EJ/yr +- Final Energy: + description: Total final energy consumption + unit: EJ/yr +- Final Energy|Electricity: + description: Electricity demand + unit: EJ/yr +- Final Energy|Heat: + description: Heat demand + unit: EJ/yr diff --git a/tests/test_aggregator.py b/tests/test_aggregator.py new file mode 100644 index 00000000..bb700e2a --- /dev/null +++ b/tests/test_aggregator.py @@ -0,0 +1,116 @@ +from pathlib import Path + +import pandas as pd +import pyam +import pydantic +import pytest + + +from pyam import IamDataFrame +from conftest import TEST_DATA_DIR +from nomenclature import DataStructureDefinition +from nomenclature.processor import Aggregator + +TEST_FOLDER_GENERIC_PROCESSOR = TEST_DATA_DIR / "processor" / "aggregator" + + +def test_aggregator_from_file(): + mapping_file = "aggregation_mapping.yaml" + # Test that the file is read and represented correctly + obs = Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / mapping_file) + exp = { + "file": (TEST_FOLDER_GENERIC_PROCESSOR / mapping_file).relative_to(Path.cwd()), + "dimension": "variable", + "aggregate": [ + { + "name": "Primary Energy", + "components": ["Primary Energy|Coal", "Primary Energy|Biomass"], + }, + { + "name": "Final Energy", + "components": ["Final Energy|Electricity", "Final Energy|Heat"], + }, + ], + } + assert obs.model_dump() == exp + + +@pytest.mark.parametrize( + "file, error_msg_pattern", + [ + ( + "aggregation_mapping_duplicate_target.yaml", + "Duplicate target \['Primary Energy'\] in aggregation-mapping in ", + ), + ( + "aggregation_mapping_duplicate_component.yaml", + "Duplicate component \['Primary Energy\|Coal'\] in aggregation-mapping in ", + ), + ( + "aggregation_mapping_target_component_conflict.yaml", + "Non-unique target and component \['Primary Energy'\] in aggregation-", + ), + ], +) +def test_aggregator_raises(file, error_msg_pattern): + # This is to test different failure conditions + with pytest.raises(pydantic.ValidationError, match=f"{error_msg_pattern}.*{file}"): + Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file) + + +def test_aggregator_validate_with_definition(): + # Validate the Aggregator against the codelist in a DataStructureDefintion + aggregator = Aggregator.from_file( + TEST_FOLDER_GENERIC_PROCESSOR / "aggregation_mapping.yaml" + ) + definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition") + aggregator.validate_with_definition(definition) + + +def test_aggregator_validate_invalid_code(): + file = "aggregation_mapping_invalid_code.yaml" + aggregator = Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file) + definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition") + match = f"The following variables are not .*\n .*- Final Energy\|Foo\n.*{file}" + with pytest.raises(ValueError, match=match): + aggregator.validate_with_definition(definition) + + +def test_aggregator_validate_invalid_dimension(): + file = "aggregation_mapping_invalid_dimension.yaml" + aggregator = Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file) + definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition") + match = f"Dimension 'foo' not found in DataStructureDefinition\nin.*{file}" + with pytest.raises(ValueError, match=match): + aggregator.validate_with_definition(definition) + + +def test_aggregator_apply(): + aggregator = Aggregator.from_file( + TEST_FOLDER_GENERIC_PROCESSOR / "aggregation_mapping.yaml" + ) + iamc_args = dict(model="model_a", scenario="scenario_a", region="World") + + df = IamDataFrame( + pd.DataFrame( + [ + ["Primary Energy|Coal", "EJ/yr", 0.5, 3], + ["Primary Energy|Biomass", "EJ/yr", 2, 7], + ["Final Energy|Electricity", "EJ/yr", 2.5, 3], + ["Final Energy|Heat", "EJ/yr", 3, 6], + ], + columns=["variable", "unit", 2005, 2010], + ), + **iamc_args, + ) + exp = IamDataFrame( + pd.DataFrame( + [ + ["Primary Energy", "EJ/yr", 2.5, 10], + ["Final Energy", "EJ/yr", 5.5, 9], + ], + columns=["variable", "unit", 2005, 2010], + ), + **iamc_args, + ) + pyam.assert_iamframe_equal(aggregator.apply(df), exp)