Skip to content

Commit

Permalink
Add generic Aggregator processor (#462)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielhuppmann authored Feb 19, 2025
1 parent 2992bd2 commit 37979d6
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 1 deletion.
4 changes: 4 additions & 0 deletions nomenclature/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
"constituents_not_native",
"Constituent region(s)\n{regions}\nin {file} not found in native region(s)",
),
"AggregationMappingConflict": (
"aggregation_mapping_conflict",
"{type} {duplicates} in aggregation-mapping in {file}",
),
}

PydanticCustomErrors = namedtuple("PydanticCustomErrors", pydantic_custom_error_config)
Expand Down
1 change: 1 addition & 0 deletions nomenclature/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
)
from nomenclature.processor.required_data import RequiredDataValidator # noqa
from nomenclature.processor.data_validator import DataValidator # noqa
from nomenclature.processor.aggregator import Aggregator # noqa
153 changes: 153 additions & 0 deletions nomenclature/processor/aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import logging
from collections import Counter
from pathlib import Path

import yaml
from pyam import IamDataFrame
from pydantic import BaseModel, field_validator, ValidationInfo
from pydantic.types import FilePath
from pydantic_core import PydanticCustomError

from nomenclature.definition import DataStructureDefinition
from nomenclature.error import custom_pydantic_errors
from nomenclature.processor import Processor
from nomenclature.processor.utils import get_relative_path

logger = logging.getLogger(__name__)

here = Path(__file__).parent.absolute()


class AggregationItem(BaseModel):
    """Item used for aggregation-mapping"""

    # target code that the components are renamed/aggregated to
    name: str
    # component codes to be mapped to `name`
    components: list[str]


class Aggregator(Processor):
    """Aggregation or renaming of an IamDataFrame on a `dimension`

    Each component is renamed to its target; rows that become duplicates
    after renaming are summed (see `apply`), which aggregates the components.
    """

    file: FilePath
    dimension: str
    aggregate: list[AggregationItem]

    def apply(self, df: IamDataFrame) -> IamDataFrame:
        """Apply the aggregation-mapping

        Parameters
        ----------
        df : IamDataFrame
            Input data that the aggregation-mapping is applied to

        Returns
        -------
        IamDataFrame:
            Processed data
        """
        # renaming several components to the same target intentionally creates
        # duplicate rows (which pyam then sums), so duplicate-checking is off
        return df.rename(
            mapping={self.dimension: self.rename_mapping},
            check_duplicates=False,
        )

    @property
    def rename_mapping(self) -> dict[str, str]:
        """Mapping from each component to its aggregation target"""
        return {
            component: item.name
            for item in self.aggregate
            for component in item.components
        }

    @field_validator("aggregate")
    def validate_target_names(cls, v, info: ValidationInfo):
        # target names must be unique within the aggregation-mapping
        _validate_items([item.name for item in v], info, "Duplicate target")
        return v

    @field_validator("aggregate")
    def validate_components(cls, v, info: ValidationInfo):
        # components have to be unique for creating rename-mapping (component -> target)
        all_components = []
        for item in v:
            all_components.extend(item.components)
        _validate_items(all_components, info, "Duplicate component")
        return v

    @field_validator("aggregate")
    def validate_target_vs_components(cls, v, info: ValidationInfo):
        # guard against having identical target and component
        _codes = []
        for item in v:
            _codes.append(item.name)
            _codes.extend(item.components)
        _validate_items(_codes, info, "Non-unique target and component")
        return v

    @property
    def codes(self) -> list[str]:
        """All target and component codes used in this aggregation-mapping"""
        _codes = []
        for item in self.aggregate:
            _codes.append(item.name)
            _codes.extend(item.components)
        return _codes

    def validate_with_definition(self, dsd: DataStructureDefinition) -> None:
        """Check that `dimension` and all codes are defined in `dsd`

        Raises
        ------
        ValueError
            If the dimension is not found or any code is not defined.
        """
        error = None
        # check for codes that are not defined in the codelists
        codelist = getattr(dsd, self.dimension, None)
        # no validation if codelist is not defined or filter-item is None
        if codelist is None:
            error = f"Dimension '{self.dimension}' not found in DataStructureDefinition"
        elif invalid := codelist.validate_items(self.codes):
            error = (
                f"The following {self.dimension}s are not defined in the "
                "DataStructureDefinition:\n - " + "\n - ".join(invalid)
            )
        if error:
            raise ValueError(error + "\nin " + str(self.file))

    @classmethod
    def from_file(cls, file: Path | str):
        """Initialize an Aggregator from a file.

        .. code:: yaml

            dimension: <some_dimension>
            aggregate:
            - Target Value:
              - Source Value A
              - Source Value B

        Raises
        ------
        ValueError
            If the file cannot be parsed or an aggregation-item does not
            consist of exactly one target with its list of components.
        """
        file = Path(file) if isinstance(file, str) else file
        try:
            with open(file, "r", encoding="utf-8") as f:
                mapping_input = yaml.safe_load(f)

            aggregate_list: list[dict[str, list]] = []
            for item in mapping_input["aggregate"]:
                # each aggregation-item must be a mapping with exactly one
                # key-value pair: the target and its list of components
                if not isinstance(item, dict) or len(item) != 1:
                    raise ValueError(
                        f"Expected exactly one target per aggregation-item, found: {item}"
                    )
                ((target, components),) = item.items()
                aggregate_list.append(dict(name=target, components=components))
        except Exception as error:
            raise ValueError(f"{error} in {get_relative_path(file)}") from error
        return cls(
            dimension=mapping_input["dimension"],
            aggregate=aggregate_list,
            file=get_relative_path(file),
        )


def _validate_items(items, info, _type):
duplicates = [item for item, count in Counter(items).items() if count > 1]
if duplicates:
raise PydanticCustomError(
*custom_pydantic_errors.AggregationMappingConflict,
{
"type": _type,
"duplicates": duplicates,
"file": info.data["file"],
},
)
2 changes: 1 addition & 1 deletion nomenclature/processor/iamc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def validate_with_definition(self, dsd: DataStructureDefinition) -> None:
if invalid := codelist.validate_items(getattr(self, dimension)):
error_msg += (
f"The following {dimension}s are not defined in the "
f"DataStructureDefinition:\n {', '.join(invalid)}\n"
"DataStructureDefinition:\n " + ", ".join(invalid) + "\n"
)

if error_msg:
Expand Down
8 changes: 8 additions & 0 deletions tests/data/processor/aggregator/aggregation_mapping.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Valid aggregation-mapping: each target with its list of components
dimension: variable
aggregate:
  - Primary Energy:
      - Primary Energy|Coal
      - Primary Energy|Biomass
  - Final Energy:
      - Final Energy|Electricity
      - Final Energy|Heat
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Invalid aggregation-mapping: `Primary Energy|Coal` is listed as a component
# of two different targets (duplicate component)
dimension: variable
aggregate:
  - Primary Energy:
      - Primary Energy|Coal
      - Primary Energy|Biomass
  - Final Energy:
      - Primary Energy|Coal
      - Final Energy|Heat
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Invalid aggregation-mapping: `Primary Energy` appears twice as a target
# (duplicate target)
dimension: variable
aggregate:
  - Primary Energy:
      - Primary Energy|Coal
      - Primary Energy|Biomass
  - Primary Energy:
      - Final Energy|Electricity
      - Final Energy|Heat
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Aggregation-mapping with a component (`Final Energy|Foo`) that is not
# defined in the variable codelist of the test definition
dimension: variable
aggregate:
  - Primary Energy:
      - Primary Energy|Coal
      - Primary Energy|Biomass
  - Final Energy:
      - Final Energy|Electricity
      - Final Energy|Foo
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Aggregation-mapping with a dimension (`foo`) that does not exist in the
# test DataStructureDefinition
dimension: foo
aggregate:
  - Primary Energy:
      - Primary Energy|Coal
      - Primary Energy|Biomass
  - Final Energy:
      - Final Energy|Electricity
      - Final Energy|Heat
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Invalid aggregation-mapping: `Primary Energy` is used both as a target and
# as a component (non-unique target and component)
dimension: variable
aggregate:
  - Primary Energy:
      - Primary Energy|Coal
      - Primary Energy|Biomass
  - Final Energy:
      - Final Energy|Electricity
      - Primary Energy
18 changes: 18 additions & 0 deletions tests/data/processor/aggregator/definition/variable/variables.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Variable codelist used to validate the aggregation-mappings in the tests
- Primary Energy:
    description: Total primary energy consumption
    unit: EJ/yr
- Primary Energy|Coal:
    description: Primary energy consumption of coal
    unit: EJ/yr
- Primary Energy|Biomass:
    description: Primary energy consumption of biomass
    unit: EJ/yr
- Final Energy:
    description: Total final energy consumption
    unit: EJ/yr
- Final Energy|Electricity:
    description: Electricity demand
    unit: EJ/yr
- Final Energy|Heat:
    description: Heat demand
    unit: EJ/yr
116 changes: 116 additions & 0 deletions tests/test_aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from pathlib import Path

import pandas as pd
import pyam
import pydantic
import pytest


from pyam import IamDataFrame
from conftest import TEST_DATA_DIR
from nomenclature import DataStructureDefinition
from nomenclature.processor import Aggregator

TEST_FOLDER_GENERIC_PROCESSOR = TEST_DATA_DIR / "processor" / "aggregator"


def test_aggregator_from_file():
    # check that the mapping file is read and represented correctly
    file = TEST_FOLDER_GENERIC_PROCESSOR / "aggregation_mapping.yaml"
    expected = {
        "file": file.relative_to(Path.cwd()),
        "dimension": "variable",
        "aggregate": [
            {
                "name": "Primary Energy",
                "components": ["Primary Energy|Coal", "Primary Energy|Biomass"],
            },
            {
                "name": "Final Energy",
                "components": ["Final Energy|Electricity", "Final Energy|Heat"],
            },
        ],
    }
    assert Aggregator.from_file(file).model_dump() == expected


@pytest.mark.parametrize(
    "file, error_msg_pattern",
    [
        (
            "aggregation_mapping_duplicate_target.yaml",
            # raw strings: `\[`/`\]` are regex escapes, not string escapes
            r"Duplicate target \['Primary Energy'\] in aggregation-mapping in ",
        ),
        (
            "aggregation_mapping_duplicate_component.yaml",
            r"Duplicate component \['Primary Energy\|Coal'\] in aggregation-mapping in ",
        ),
        (
            "aggregation_mapping_target_component_conflict.yaml",
            r"Non-unique target and component \['Primary Energy'\] in aggregation-",
        ),
    ],
)
def test_aggregator_raises(file, error_msg_pattern):
    # This is to test different failure conditions
    with pytest.raises(pydantic.ValidationError, match=f"{error_msg_pattern}.*{file}"):
        Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file)


def test_aggregator_validate_with_definition():
    # Validate the Aggregator against the codelist in a DataStructureDefinition
    definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition")
    mapping_file = TEST_FOLDER_GENERIC_PROCESSOR / "aggregation_mapping.yaml"
    Aggregator.from_file(mapping_file).validate_with_definition(definition)


def test_aggregator_validate_invalid_code():
    """Check that a code not defined in the codelist raises on validation"""
    file = "aggregation_mapping_invalid_code.yaml"
    aggregator = Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file)
    definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition")
    # raw string: `\|` is a regex-escaped pipe (invalid escape in a plain str)
    match = rf"The following variables are not .*\n .*- Final Energy\|Foo\n.*{file}"
    with pytest.raises(ValueError, match=match):
        aggregator.validate_with_definition(definition)


def test_aggregator_validate_invalid_dimension():
    # an unknown `dimension` must raise when validating against a definition
    file = "aggregation_mapping_invalid_dimension.yaml"
    definition = DataStructureDefinition(TEST_FOLDER_GENERIC_PROCESSOR / "definition")
    aggregator = Aggregator.from_file(TEST_FOLDER_GENERIC_PROCESSOR / file)
    expected = f"Dimension 'foo' not found in DataStructureDefinition\nin.*{file}"
    with pytest.raises(ValueError, match=expected):
        aggregator.validate_with_definition(definition)


def test_aggregator_apply():
    # applying the aggregation-mapping renames components and sums them
    mapping_file = TEST_FOLDER_GENERIC_PROCESSOR / "aggregation_mapping.yaml"
    aggregator = Aggregator.from_file(mapping_file)
    index_args = dict(model="model_a", scenario="scenario_a", region="World")

    data = pd.DataFrame(
        [
            ["Primary Energy|Coal", "EJ/yr", 0.5, 3],
            ["Primary Energy|Biomass", "EJ/yr", 2, 7],
            ["Final Energy|Electricity", "EJ/yr", 2.5, 3],
            ["Final Energy|Heat", "EJ/yr", 3, 6],
        ],
        columns=["variable", "unit", 2005, 2010],
    )
    expected = pd.DataFrame(
        [
            ["Primary Energy", "EJ/yr", 2.5, 10],
            ["Final Energy", "EJ/yr", 5.5, 9],
        ],
        columns=["variable", "unit", 2005, 2010],
    )
    obs = aggregator.apply(IamDataFrame(data, **index_args))
    pyam.assert_iamframe_equal(obs, IamDataFrame(expected, **index_args))

0 comments on commit 37979d6

Please sign in to comment.