Skip to content

Commit

Permalink
use pydantic instead of dicts
Browse files Browse the repository at this point in the history
PDOK-16629
  • Loading branch information
roelarents committed Nov 13, 2024
1 parent 4ad808c commit 291fe11
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 86 deletions.
51 changes: 24 additions & 27 deletions geopackage_validator/generate.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import logging
from typing import Dict, List, Union
from collections import OrderedDict
from typing import List

from osgeo import ogr
from osgeo.ogr import DataSource

from geopackage_validator import utils
from geopackage_validator import __version__
from geopackage_validator import utils
from geopackage_validator.models import (
ColumnDefinition,
TableDefinition,
TablesDefinition,
)

logger = logging.getLogger(__name__)

ColumnDefinition = List[Dict[str, str]]
TableDefinition = Dict[str, Union[int, Dict[str, ColumnDefinition]]]


def columns_definition(table, geometry_column) -> ColumnDefinition:
def columns_definition(table, geometry_column) -> List[ColumnDefinition]:
layer_definition = table.GetLayerDefn()

assert layer_definition, f'Invalid Layer {"" if not table else table.GetName()}'
Expand All @@ -28,26 +28,27 @@ def columns_definition(table, geometry_column) -> ColumnDefinition:
for column_id in range(field_count)
]

fid_column = fid_column_definition(table)
fid_columns = fid_column_definition(table)

return fid_column + [geometry_column] + columns
return fid_columns + [geometry_column] + columns


def fid_column_definition(table) -> ColumnDefinition:
def fid_column_definition(table) -> List[ColumnDefinition]:
name = table.GetFIDColumn()
if not name:
return []
return [{"name": name, "type": "INTEGER"}]
return [ColumnDefinition(name=name, type="INTEGER")]



def generate_table_definitions(dataset: DataSource) -> TableDefinition:
def generate_table_definitions(dataset: DataSource) -> TablesDefinition:
projections = set()
table_geometry_types = {
table_name: geometry_type_name
for table_name, _, geometry_type_name in utils.dataset_geometry_tables(dataset)
}

table_list = []
table_list: List[TableDefinition] = []
for table in dataset:
geo_column_name = table.GetGeometryColumn()
if geo_column_name == "":
Expand All @@ -59,31 +60,27 @@ def generate_table_definitions(dataset: DataSource) -> TableDefinition:
"type": table_geometry_types[table_name],
}
table_list.append(
OrderedDict(
[
("name", table_name),
("geometry_column", geo_column_name),
("columns", columns_definition(table, geometry_column)),
]
TableDefinition(
name=table_name,
geometry_column=geo_column_name,
columns=columns_definition(table, geometry_column),
)
)

projections.add(table.GetSpatialRef().GetAuthorityCode(None))

assert len(projections) == 1, "Expected one projection per geopackage."

result = OrderedDict(
[
("geopackage_validator_version", __version__),
("projection", int(projections.pop())),
("tables", table_list),
]
result = TablesDefinition(
geopackage_validator_version=__version__,
projection=int(projections.pop()),
tables=table_list,
)

return result


def generate_definitions_for_path(gpkg_path: str) -> TableDefinition:
def generate_definitions_for_path(gpkg_path: str) -> TablesDefinition:
"""Starts the geopackage validation."""
utils.check_gdal_version()

Expand Down
47 changes: 47 additions & 0 deletions geopackage_validator/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import copy
from typing import List, Optional

from pydantic import BaseModel


class Named(BaseModel):
    """Base model for any definition that is identified by a name."""

    # Required identifier; subclasses declare their own fields next to it.
    name: str


class ColumnDefinition(Named):
    """Definition of a single column: the inherited ``name`` plus a ``type``."""

    # Column type as a string (for example "INTEGER" for a FID column).
    type: str


class TableDefinition(Named):
    """Definition of a single table: its name, geometry column and columns."""

    # Name of the table's geometry column; "geom" when not supplied.
    geometry_column: str = "geom"
    # Column definitions of the table; an empty list when not supplied.
    columns: List[ColumnDefinition] = []


class TablesDefinition(BaseModel):
    """Top-level definition document: validator version, projection and tables."""

    # Version of geopackage_validator that wrote the definition; "0" when the
    # document carries no version.
    geopackage_validator_version: str = "0"
    # Projection code as an integer, or None; no default is declared.
    projection: Optional[int]
    # Definitions of the tables, one entry per table.
    tables: List[TableDefinition]


def _leading_version_numbers(version: str) -> tuple:
    """Return the leading dot-separated integer components of *version*.

    Parsing stops at the first component that is not made up purely of ASCII
    digits, so "0.5.8" gives (0, 5, 8), "0.5.9-rc1" gives (0, 5, 9) and a
    string without a leading number gives ().
    """
    numbers = []
    for part in version.split("."):
        digits = ""
        for char in part:
            if char not in "0123456789":
                break
            digits += char
        if not digits:
            break
        numbers.append(int(digits))
        if digits != part:
            # A suffix such as "-dev" ends the numeric part of the version.
            break
    return tuple(numbers)


def migrate_tables_definition(old: dict) -> dict:
    """Migrate a possibly old tables definition to the current schema/model.

    Until version 0.5.8 a column's "type" property was named "data_type";
    definitions written by those versions get that key renamed. Definitions
    from later versions, and from the "0.0.0-dev" development version, are
    returned unchanged.

    Args:
        old: The raw tables definition as loaded from file. Older files were
            not versioned, so a missing "geopackage_validator_version" is
            treated as version "0".

    Returns:
        ``old`` itself when no migration applies, otherwise a migrated deep
        copy; ``old`` is never mutated.
    """
    if not isinstance(old, dict):
        # Nothing to migrate (e.g. an empty file loads as None); hand the value
        # back untouched so the caller's validation can report the problem.
        return old

    # str() guards against YAML having parsed a version such as 0.5 as a float.
    version = str(old.get("geopackage_validator_version", "0")).strip()

    # No schema changes after 0.5.8; development builds use the current schema.
    if version == "0.0.0-dev" or _leading_version_numbers(version) > (0, 5, 8):
        return old

    new = copy.deepcopy(old)
    for table in new.get("tables") or []:
        for column in table.get("columns") or []:
            # Only rename when the old key is present, so that unversioned
            # files that already use "type" pass through without a KeyError.
            if "data_type" in column:
                column["type"] = column.pop("data_type")
    return new
17 changes: 14 additions & 3 deletions geopackage_validator/output.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import json
from collections import OrderedDict
from datetime import datetime
from typing import Dict, List
from collections import OrderedDict

import yaml
from pydantic import BaseModel

from geopackage_validator import __version__

Expand Down Expand Up @@ -57,8 +58,18 @@ def log_output(


def print_output(python_object, as_yaml, yaml_indent=2):
if isinstance(python_object, BaseModel):
return print_output_pydantic(python_object, as_yaml, yaml_indent)
if as_yaml:
content = yaml.dump(python_object, indent=yaml_indent)
content = yaml.dump(python_object, indent=yaml_indent, sort_keys=False)
else:
content = json.dumps(python_object, indent=4)
content = json.dumps(python_object, indent=4, sort_keys=False)
print(content)


def print_output_pydantic(model: BaseModel, as_yaml: bool, yaml_indent=2):
    """Print a pydantic model to stdout as JSON or as YAML.

    Args:
        model: The model to serialise.
        as_yaml: Print YAML when true, otherwise JSON indented by 4 spaces.
        yaml_indent: Indentation passed to ``yaml.dump``; only used for YAML.
    """
    json_text = model.model_dump_json(indent=4)
    if not as_yaml:
        print(json_text)
        return

    # The YAML output is built from the JSON text: it is parsed back into
    # plain Python objects, which are then dumped without re-sorting the keys.
    plain_object = yaml.safe_load(json_text)
    print(yaml.dump(plain_object, indent=yaml_indent, sort_keys=False))
18 changes: 11 additions & 7 deletions geopackage_validator/validate.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
from collections import OrderedDict
import logging
import sys
import traceback
from collections import OrderedDict
from pathlib import Path

import yaml
from osgeo import gdal

from geopackage_validator.generate import TableDefinition
from geopackage_validator import utils
from geopackage_validator import validations as validation
from geopackage_validator.models import TablesDefinition, migrate_tables_definition
from geopackage_validator.validations.validator import (
Validator,
ValidationLevel,
format_result,
)
from geopackage_validator import utils


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -149,7 +150,7 @@ def validate(
if result is not None:
validation_results.append(result)
validation_error = True
success = success and validator.level == ValidationLevel.RECCOMENDATION
success = success and validator.level == ValidationLevel.RECOMMENDATION
except Exception:
exc_type, exc_value, exc_traceback = sys.exc_info()
trace = [
Expand Down Expand Up @@ -228,5 +229,8 @@ def get_validator_classes():
return sorted(validator_classes, key=lambda v: (v.level, v.code))


def load_table_definitions(table_definitions_path) -> TableDefinition:
return utils.load_config(table_definitions_path)
def load_table_definitions(table_definitions_path: str) -> TablesDefinition:
with Path(table_definitions_path).open("r") as table_definitions_file:
tables_definition_raw = yaml.safe_load(table_definitions_file)
tables_definition_raw = migrate_tables_definition(tables_definition_raw)
return TablesDefinition.model_validate(tables_definition_raw)
4 changes: 2 additions & 2 deletions geopackage_validator/validations/geom_column_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class GeomColumnNameValidator(validator.Validator):
"""It is recommended to name all GEOMETRY type columns 'geom'."""

code = 17
level = validator.ValidationLevel.RECCOMENDATION
level = validator.ValidationLevel.RECOMMENDATION
message = "Found in table: {table_name}, column: {column_name}"

def check(self) -> Iterable[str]:
Expand All @@ -28,7 +28,7 @@ class GeomColumnNameEqualValidator(validator.Validator):
"""It is recommended to give all GEOMETRY type columns the same name."""

code = 18
level = validator.ValidationLevel.RECCOMENDATION
level = validator.ValidationLevel.RECOMMENDATION
message = "Found column names are unequal: {column_names}"

def check(self) -> Iterable[str]:
Expand Down
2 changes: 1 addition & 1 deletion geopackage_validator/validations/geometry_ccw_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class PolygonWindingOrderValidator(validator.Validator):
"""It is recommended that all (MULTI)POLYGON geometries have a counter-clockwise orientation for their exterior ring, and a clockwise direction for all interior rings."""

code = 20
level = validator.ValidationLevel.RECCOMENDATION
level = validator.ValidationLevel.RECOMMENDATION
message = "Warning layer: {layer}, example id: {row_id}, has {count} features that do not have a counter-clockwise exterior ring and/or a clockwise interior ring."

def check(self) -> Iterable[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class GeometryDimensionValidator(validator.Validator):
"""It is recommended to only use multidimensional geometry coordinates (elevation and measurement) when necessary."""

code = 19
level = validator.ValidationLevel.RECCOMENDATION
level = validator.ValidationLevel.RECOMMENDATION
message = "Table: {table}, has features with {message}"

def check(self) -> Iterable[str]:
Expand Down
Loading

0 comments on commit 291fe11

Please sign in to comment.