Skip to content

Commit

Permalink
Fix the apply() implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
danielhuppmann committed Feb 19, 2025
1 parent f98f275 commit 1ce7832
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions nomenclature/processor/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@
from pathlib import Path

import yaml
from pandas import concat
from pyam import IamDataFrame
from pyam.logging import adjust_log_level
from pydantic import BaseModel, ConfigDict, computed_field, field_validator, model_validator, Field
from pydantic import (
BaseModel,
ConfigDict,
computed_field,
field_validator,
model_validator,
Field,
)

from nomenclature.definition import DataStructureDefinition
from nomenclature.error import ErrorCollector
Expand Down Expand Up @@ -158,6 +164,8 @@ def filter_args(self):
exclude_none=True, exclude_unset=True, exclude=["validation"]
)

def __str__(self):
    """Return the filter arguments as one comma-separated string.

    Each entry of ``self.filter_args`` is rendered as ``"key: value"``, and
    the entries are joined with ``", "`` in the mapping's iteration order.
    An empty mapping yields an empty string.
    """
    rendered = []
    for name, setting in self.filter_args.items():
        rendered.append(f"{name}: {setting}")
    return ", ".join(rendered)

class DataValidator(Processor):
"""Processor for validating IAMC datapoints"""
Expand All @@ -173,12 +181,16 @@ def from_file(cls, file: Path | str) -> "DataValidator":
for item in content:
# handling of simple case where filter and criteria args are given at the same level
if "validation" not in item:
filter_args = {k: item[k] for k in item if k in IamcDataFilter.model_fields}
criteria_args = [{
k: item[k]
for k in item
if k not in IamcDataFilter.model_fields and k != "validation"
}]
filter_args = {
k: item[k] for k in item if k in IamcDataFilter.model_fields
}
criteria_args = [
{
k: item[k]
for k in item
if k not in IamcDataFilter.model_fields and k != "validation"
}
]
item = dict(**filter_args, validation=criteria_args)
criteria_items.append(item)
return cls(file=file, criteria_items=criteria_items) # type: ignore
Expand All @@ -189,23 +201,21 @@ def apply(self, df: IamDataFrame) -> IamDataFrame:

with adjust_log_level():
for item in self.criteria_items:
per_item_df = df
per_item_df = df.filter(**item.filter_args)
for criterion in item.validation:
failed_validation = per_item_df.validate(
**criterion.validation_args
)
if failed_validation is not None:
per_item_df = IamDataFrame(
concat([df.data, failed_validation]).drop_duplicates(
keep=False
criteria_msg = (
" Criteria: " + str(item) + ", "
+ ", ".join(
[
f"{key}: {value}"
for key, value in criterion.criteria.items()
]
)
)
criteria_msg = " Criteria: " + ", ".join(
[
f"{key}: {value}"
for key, value in criterion.criteria.items()
]
)
failed_validation["warning_level"] = (
criterion.warning_level.value
)
Expand Down

0 comments on commit 1ce7832

Please sign in to comment.