diff --git a/pdm.lock b/pdm.lock
index f534b9e..58b0193 100644
--- a/pdm.lock
+++ b/pdm.lock
@@ -5,7 +5,7 @@
 groups = ["default", "dev", "docs", "license", "lint", "test"]
 strategy = ["cross_platform"]
 lock_version = "4.4"
-content_hash = "sha256:b239054b4424cbc9b0c2d707d313065fbf7607e78f1bf2ca8cd6675dc2a8020f"
+content_hash = "sha256:16f3f112c6aa701f9f7e10cb2693e03716480330f1dacf0de7639a7208a82af8"

 [[package]]
 name = "appnope"
@@ -790,17 +790,16 @@ files = [

 [[package]]
 name = "geoarrow-pyarrow"
-version = "0.1.2.dev7+g0a95d5f"
+version = "0.1.1"
 requires_python = ">=3.8"
-subdirectory = "geoarrow-pyarrow"
-git = "https://github.com/geoarrow/geoarrow-python.git"
-ref = "0a95d5ff3180c20e89f4572ac38d255f58e13396"
-revision = "0a95d5ff3180c20e89f4572ac38d255f58e13396"
 summary = ""
 dependencies = [
     "geoarrow-c",
     "pyarrow",
-    "pyarrow-hotfix",
+]
+files = [
+    {file = "geoarrow-pyarrow-0.1.1.tar.gz", hash = "sha256:dec429fac397f6d3616e224b9cae6229dfcc1be3c279bbe304da55bbd26e0407"},
+    {file = "geoarrow_pyarrow-0.1.1-py3-none-any.whl", hash = "sha256:83f0f17af59e0fdaa060f2a11aaa957d88a6788755fd688b9892a0ad66e5c8cf"},
 ]

 [[package]]
diff --git a/pyproject.toml b/pyproject.toml
index 31ab0fd..4d27ef6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,8 +8,9 @@ dependencies = [
     "shapely",
     "pyarrow>=13.0.0",
     "duckdb>=0.9.2",
-    "geoarrow-pyarrow @ git+https://github.com/geoarrow/geoarrow-python.git@0a95d5ff3180c20e89f4572ac38d255f58e13396#subdirectory=geoarrow-pyarrow",
+    "geoarrow-pyarrow>=0.1.1",
     "typeguard",
+    "pyarrow-hotfix",
 ]
 requires-python = ">=3.9"
 readme = "README.md"
diff --git a/quackosm/_geo_arrow_io.py b/quackosm/_geo_arrow_io.py
new file mode 100644
index 0000000..1c34d97
--- /dev/null
+++ b/quackosm/_geo_arrow_io.py
@@ -0,0 +1,382 @@
+# type: ignore
+# pragma: no cover
+"""
+GeoArrow IO helper module.
+
+Will be removed after a new version of https://github.com/geoarrow/geoarrow-python/ is released.
+The functions `read_geoparquet_table` and `write_geoparquet_table` are required by the
+QuackOSM library; this file is composed of code from the current, unreleased version of that library.
+""" +import json + +import geoarrow.pyarrow as _ga +import pyarrow.parquet as _pq +import pyarrow.types as _types +import pyarrow_hotfix as _ # noqa: F401 +from geoarrow.pyarrow._compute import ensure_storage + + +def read_geoparquet_table(*args, **kwargs): + """Read GeoParquet using PyArrow.""" + tab = _pq.read_table(*args, **kwargs) + tab_metadata = tab.schema.metadata or {} + if b"geo" in tab_metadata: + geo_meta = json.loads(tab_metadata[b"geo"]) + else: + geo_meta = {} + + # Remove "geo" schema metadata key since few transformations following + # the read operation check that schema metadata is valid (e.g., column + # subset or rename) + non_geo_meta = {k: v for k, v in tab_metadata.items() if k != b"geo"} + tab = tab.replace_schema_metadata(non_geo_meta) + + # Assign extension types to columns + if "columns" in geo_meta: + columns = geo_meta["columns"] + else: + columns = _geoparquet_guess_geometry_columns(tab.schema) + + return _geoparquet_table_to_geoarrow(tab, columns) + + +def write_geoparquet_table( + table, + *args, + primary_geometry_column=None, + geometry_columns=None, + write_bbox=False, + write_geometry_types=None, + check_wkb=True, + **kwargs, +): + """Write GeoParquet using PyArrow.""" + geo_meta = _geoparquet_metadata_from_schema( + table.schema, + primary_geometry_column=primary_geometry_column, + geometry_columns=geometry_columns, + add_geometry_types=write_geometry_types, + ) + + # Note: this will also update geo_meta with geometry_types and bbox if requested + for i, name in enumerate(table.schema.names): + if name in geo_meta["columns"]: + table = table.set_column( + i, + name, + _geoparquet_encode_chunked_array( + table[i], + geo_meta["columns"][name], + add_geometry_types=write_geometry_types, + add_bbox=write_bbox, + check_wkb=check_wkb, + ), + ) + + metadata = table.schema.metadata or {} + metadata["geo"] = json.dumps(geo_meta) + table = table.replace_schema_metadata(metadata) + return _pq.write_table(table, *args, **kwargs) + + +def _geoparquet_guess_geometry_columns(schema): + # Only attempt guessing the "geometry" or "geography" column + columns = {} + + for name in ("geometry", "geography"): + if name not in schema.names: + continue + + spec = {} + type = schema.field(name).type + + if _types.is_binary(type) or _types.is_large_binary(type): + spec["encoding"] = "WKB" + elif _types.is_string(type) or _types.is_large_string(type): + # WKT is not actually a geoparquet encoding but the guidance on + # putting geospatial things in parquet without metadata says you + # can do it and this is the internal sentinel for that case. + spec["encoding"] = "WKT" + + # A column named "geography" has spherical edges according to the + # compatible Parquet guidance. 
+ if name == "geography": + spec["edges"] = "spherical" + + columns[name] = spec + + return columns + + +def _geoparquet_chunked_array_to_geoarrow(item, spec): + # If item was written as a GeoArrow extension type to the Parquet file, + # ignore any information in the column spec + if isinstance(item.type, _ga.GeometryExtensionType): + return item + + if "encoding" not in spec: + raise ValueError("Invalid GeoParquet column specification: missing 'encoding'") + + encoding = spec["encoding"] + if encoding in ("WKB", "WKT"): + item = _ga.array(item) + else: + raise ValueError(f"Invalid GeoParquet encoding value: '{encoding}'") + + if "crs" not in spec: + crs = json.dumps(_CRS_LONLAT).encode("UTF-8") + else: + crs = json.dumps(spec["crs"]).encode("UTF-8") + + if "edges" not in spec or spec["edges"] == "planar": + edge_type = None + elif spec["edges"] == "spherical": + edge_type = _ga.EdgeType.SPHERICAL + else: + raise ValueError("Invalid GeoParquet column edges value") + + if crs is not None: + item = _ga.with_crs(item, crs) + + if edge_type is not None: + item = _ga.with_edge_type(item, edge_type) + + return item + + +def _geoparquet_table_to_geoarrow(tab, columns): + tab_names = set(tab.schema.names) + for col_name, spec in columns.items(): + # col_name might not exist if only a subset of columns were read from file + if col_name not in tab_names: + continue + + col_i = tab.schema.get_field_index(col_name) + new_geometry = _geoparquet_chunked_array_to_geoarrow(tab[col_i], spec) + tab = tab.set_column(col_i, col_name, new_geometry) + + return tab + + +def _geoparquet_guess_primary_geometry_column(schema, primary_geometry_column=None): + if primary_geometry_column is not None: + return primary_geometry_column + + # If there's a "geometry" or "geography" column, pick that one + if "geometry" in schema.names: + return "geometry" + elif "geography" in schema.names: + return "geography" + + # Otherwise, pick the first thing we know is actually geometry + for name, type in zip(schema.names, schema.types): + if isinstance(type, _ga.GeometryExtensionType): + return name + + raise ValueError("write_geoparquet_table() requires source with at least one geometry column") + + +def _geoparquet_column_spec_from_type(type, add_geometry_types=None): + # We always encode to WKB since it's the only supported value + spec = {"encoding": "WKB", "geometry_types": []} + + # Pass along extra information from GeoArrow extension type metadata + if isinstance(type, _ga.GeometryExtensionType): + if type.crs_type == _ga.CrsType.PROJJSON: + spec["crs"] = json.loads(type.crs) + elif type.crs_type == _ga.CrsType.NONE: + spec["crs"] = None + else: + import pyproj + + spec["crs"] = pyproj.CRS(type.crs).to_json_dict() + + if type.edge_type == _ga.EdgeType.SPHERICAL: + spec["edges"] = "spherical" + + # GeoArrow-encoded types can confidently declare a single geometry type + maybe_known_geometry_type = type.geometry_type + maybe_known_dimensions = type.dimensions + if ( + add_geometry_types is not False + and maybe_known_geometry_type != _ga.GeometryType.GEOMETRY + and maybe_known_dimensions != _ga.Dimensions.UNKNOWN + ): + geometry_type = _GEOPARQUET_GEOMETRY_TYPE_LABELS[maybe_known_geometry_type] + dimensions = _GEOPARQUET_DIMENSION_LABELS[maybe_known_dimensions] + spec["geometry_types"] = [f"{geometry_type}{dimensions}"] + + return spec + + +def _geoparquet_columns_from_schema( + schema, geometry_columns=None, primary_geometry_column=None, add_geometry_types=None +): + schema_names = schema.names + schema_types = schema.types + 
+ if geometry_columns is None: + geometry_columns = set() + if primary_geometry_column is not None: + geometry_columns.add(primary_geometry_column) + + for name, type in zip(schema_names, schema_types): + if isinstance(type, _ga.GeometryExtensionType): + geometry_columns.add(name) + else: + geometry_columns = set(geometry_columns) + + specs = {} + for name, type in zip(schema_names, schema_types): + if name in geometry_columns: + specs[name] = _geoparquet_column_spec_from_type( + type, add_geometry_types=add_geometry_types + ) + + return specs + + +def _geoparquet_metadata_from_schema( + schema, geometry_columns=None, primary_geometry_column=None, add_geometry_types=None +): + primary_geometry_column = _geoparquet_guess_primary_geometry_column( + schema, primary_geometry_column + ) + columns = _geoparquet_columns_from_schema( + schema, geometry_columns, add_geometry_types=add_geometry_types + ) + return { + "version": "1.0.0", + "primary_column": primary_geometry_column, + "columns": columns, + } + + +def _geoparquet_update_spec_geometry_types(item, spec): + geometry_type_labels = [] + for element in _ga.unique_geometry_types(item).to_pylist(): + geometry_type = _GEOPARQUET_GEOMETRY_TYPE_LABELS[element["geometry_type"]] + dimensions = _GEOPARQUET_DIMENSION_LABELS[element["dimensions"]] + geometry_type_labels.append(f"{geometry_type}{dimensions}") + + spec["geometry_types"] = geometry_type_labels + + +def _geoparquet_update_spec_bbox(item, spec): + box = _ga.box_agg(item).as_py() + spec["bbox"] = [box["xmin"], box["ymin"], box["xmax"], box["ymax"]] + + +def _geoparquet_encode_chunked_array( + item, spec, add_geometry_types=None, add_bbox=False, check_wkb=True +): + # ...because we're currently only ever encoding using WKB + if spec["encoding"] == "WKB": + item_out = _ga.as_wkb(item) + else: + encoding = spec["encoding"] + raise ValueError(f"Expected column encoding 'WKB' but got '{encoding}'") + + # For everything except a well-known text-encoded column, we want to do + # calculations on the pre-WKB-encoded value. + if spec["encoding"] == "WKT": + item_calc = item_out + else: + item_calc = item + + # geometry_types that are fixed at the data type level have already been + # added to the spec in an earlier step. The unique_geometry_types() + # function is sufficiently optimized such that this potential + # re-computation is not expensive. 
+ if add_geometry_types is True: + _geoparquet_update_spec_geometry_types(item_calc, spec) + + if add_bbox: + _geoparquet_update_spec_bbox(item_calc, spec) + + return ensure_storage(item_out) + + +_GEOPARQUET_GEOMETRY_TYPE_LABELS = [ + "Geometry", + "Point", + "LineString", + "Polygon", + "MultiPoint", + "MultiLineString", + "MultiPolygon", +] + +_GEOPARQUET_DIMENSION_LABELS = [None, "", " Z", " M", " ZM"] + +_CRS_LONLAT = { + "$schema": "https://proj.org/schemas/v0.7/projjson.schema.json", + "type": "GeographicCRS", + "name": "WGS 84 (CRS84)", + "datum_ensemble": { + "name": "World Geodetic System 1984 ensemble", + "members": [ + { + "name": "World Geodetic System 1984 (Transit)", + "id": {"authority": "EPSG", "code": 1166}, + }, + { + "name": "World Geodetic System 1984 (G730)", + "id": {"authority": "EPSG", "code": 1152}, + }, + { + "name": "World Geodetic System 1984 (G873)", + "id": {"authority": "EPSG", "code": 1153}, + }, + { + "name": "World Geodetic System 1984 (G1150)", + "id": {"authority": "EPSG", "code": 1154}, + }, + { + "name": "World Geodetic System 1984 (G1674)", + "id": {"authority": "EPSG", "code": 1155}, + }, + { + "name": "World Geodetic System 1984 (G1762)", + "id": {"authority": "EPSG", "code": 1156}, + }, + { + "name": "World Geodetic System 1984 (G2139)", + "id": {"authority": "EPSG", "code": 1309}, + }, + ], + "ellipsoid": { + "name": "WGS 84", + "semi_major_axis": 6378137, + "inverse_flattening": 298.257223563, + }, + "accuracy": "2.0", + "id": {"authority": "EPSG", "code": 6326}, + }, + "coordinate_system": { + "subtype": "ellipsoidal", + "axis": [ + { + "name": "Geodetic longitude", + "abbreviation": "Lon", + "direction": "east", + "unit": "degree", + }, + { + "name": "Geodetic latitude", + "abbreviation": "Lat", + "direction": "north", + "unit": "degree", + }, + ], + }, + "scope": "Not known.", + "area": "World.", + "bbox": { + "south_latitude": -90, + "west_longitude": -180, + "north_latitude": 90, + "east_longitude": 180, + }, + "id": {"authority": "OGC", "code": "CRS84"}, +} diff --git a/quackosm/pbf_file_reader.py b/quackosm/pbf_file_reader.py index a7f82c9..da1adf1 100644 --- a/quackosm/pbf_file_reader.py +++ b/quackosm/pbf_file_reader.py @@ -20,9 +20,9 @@ import pyarrow as pa import pyarrow.parquet as pq import shapely.wkt as wktlib -from geoarrow.pyarrow import io from shapely.geometry.base import BaseGeometry +import quackosm._geo_arrow_io as io from quackosm._constants import FEATURES_INDEX, GEOMETRY_COLUMN, WGS84_CRS from quackosm._osm_tags_filters import GroupedOsmTagsFilter, OsmTagsFilter, merge_osm_tags_filter from quackosm._osm_way_polygon_features import ( @@ -170,7 +170,7 @@ def get_features_gdf( parsed_geoparquet_files.append(parsed_geoparquet_file) parquet_tables = [ - io.read_geoparquet_table(parsed_parquet_file) + io.read_geoparquet_table(parsed_parquet_file) # type: ignore for parsed_parquet_file in parsed_geoparquet_files ] joined_parquet_table: pa.Table = pa.concat_tables(parquet_tables) @@ -244,14 +244,18 @@ def _set_up_duckdb_connection(self, tmp_dir_name: str) -> None: self.connection.install_extension(extension_name) self.connection.load_extension(extension_name) - self.connection.sql(""" + self.connection.sql( + """ CREATE OR REPLACE MACRO linestring_to_linestring_wkt(ls) AS 'LINESTRING (' || array_to_string([pt.x || ' ' || pt.y for pt in ls], ', ') || ')'; - """) - self.connection.sql(""" + """ + ) + self.connection.sql( + """ CREATE OR REPLACE MACRO linestring_to_polygon_wkt(ls) AS 'POLYGON ((' || array_to_string([pt.x || ' ' || 
pt.y for pt in ls], ', ') || '))'; - """) + """ + ) def _parse_pbf_file( self, @@ -459,11 +463,13 @@ def _prefilter_elements_ids( # - select all with more then one ref # - join all NV to refs # - select all where all refs has been joined (total_refs == found_refs) - self.connection.sql(f""" + self.connection.sql( + f""" SELECT * FROM ({elements.sql_query()}) w WHERE kind = 'way' AND len(refs) >= 2 - """).to_view("ways", replace=True) + """ + ).to_view("ways", replace=True) ways_all_with_tags = self._sql_to_parquet_file( sql_query=f""" WITH filtered_tags AS ( @@ -539,13 +545,15 @@ def _prefilter_elements_ids( # - select all with type in ['boundary', 'multipolygon'] # - join all WV to refs # - select all where all refs has been joined (total_refs == found_refs) - self.connection.sql(f""" + self.connection.sql( + f""" SELECT * FROM ({elements.sql_query()}) WHERE kind = 'relation' AND len(refs) > 0 AND list_contains(map_keys(tags), 'type') AND list_has_any(map_extract(tags, 'type'), ['boundary', 'multipolygon']) - """).to_view("relations", replace=True) + """ + ).to_view("relations", replace=True) relations_all_with_tags = self._sql_to_parquet_file( sql_query=f""" WITH filtered_tags AS ( @@ -771,14 +779,18 @@ def _sql_to_parquet_file(self, sql_query: str, file_path: Path) -> "duckdb.DuckD def _save_parquet_file( self, relation: "duckdb.DuckDBPyRelation", file_path: Path ) -> "duckdb.DuckDBPyRelation": - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT * FROM ({relation.sql_query()}) ) TO '{file_path}' (FORMAT 'parquet', PER_THREAD_OUTPUT true, ROW_GROUP_SIZE 25000) - """) - return self.connection.sql(f""" + """ + ) + return self.connection.sql( + f""" SELECT * FROM read_parquet('{file_path}/**') - """) + """ + ) def _calculate_unique_ids_to_parquet( self, file_path: Path, result_path: Optional[Path] = None @@ -786,29 +798,35 @@ def _calculate_unique_ids_to_parquet( if result_path is None: result_path = file_path / "distinct" - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT id FROM read_parquet('{file_path}/**') GROUP BY id ) TO '{result_path}' (FORMAT 'parquet', PER_THREAD_OUTPUT true, ROW_GROUP_SIZE 25000) - """) + """ + ) - return self.connection.sql(f""" + return self.connection.sql( + f""" SELECT * FROM read_parquet('{result_path}/**') - """) + """ + ) def _get_filtered_nodes_with_geometry( self, osm_parquet_files: ConvertedOSMParquetFiles, tmp_dir_name: str, ) -> "duckdb.DuckDBPyRelation": - nodes_with_geometry = self.connection.sql(f""" + nodes_with_geometry = self.connection.sql( + f""" SELECT n.id, n.tags, ST_Point(round(n.lon, 7), round(n.lat, 7)) geometry FROM ({osm_parquet_files.nodes_valid_with_tags.sql_query()}) n SEMI JOIN ({osm_parquet_files.nodes_filtered_ids.sql_query()}) fn ON n.id = fn.id - """) + """ + ) nodes_parquet = self._save_parquet_file_with_geometry( relation=nodes_with_geometry, file_path=Path(tmp_dir_name) / "filtered_nodes_with_geometry", @@ -820,13 +838,15 @@ def _get_required_nodes_with_structs( osm_parquet_files: ConvertedOSMParquetFiles, tmp_dir_name: str, ) -> "duckdb.DuckDBPyRelation": - nodes_with_structs = self.connection.sql(f""" + nodes_with_structs = self.connection.sql( + f""" SELECT n.id, struct_pack(x := round(n.lon, 7), y := round(n.lat, 7))::POINT_2D point FROM ({osm_parquet_files.nodes_valid_with_tags.sql_query()}) n SEMI JOIN ({osm_parquet_files.nodes_required_ids.sql_query()}) rn ON n.id = rn.id - """) + """ + ) nodes_parquet = self._save_parquet_file( relation=nodes_with_structs, 
file_path=Path(tmp_dir_name) / "required_nodes_with_points", @@ -852,7 +872,8 @@ def _get_required_ways_with_linestrings( groups = floor(total_required_ways / self.rows_per_bucket) grouped_required_ways_ids_path = Path(tmp_dir_name) / "ways_required_ids_grouped" - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT *, @@ -862,15 +883,19 @@ def _get_required_ways_with_linestrings( FROM ({osm_parquet_files.ways_required_ids.sql_query()}) ) TO '{grouped_required_ways_ids_path}' (FORMAT 'parquet', PARTITION_BY ("group"), ROW_GROUP_SIZE 25000) - """) + """ + ) for group in range(groups + 1): current_required_ways_ids_group_path = grouped_required_ways_ids_path / f"group={group}" - current_required_ways_ids_group_relation = self.connection.sql(f""" + current_required_ways_ids_group_relation = self.connection.sql( + f""" SELECT * FROM read_parquet('{current_required_ways_ids_group_path}/**') - """) + """ + ) - ways_with_linestrings = self.connection.sql(f""" + ways_with_linestrings = self.connection.sql( + f""" SELECT id, list(point ORDER BY ref_idx ASC)::LINESTRING_2D linestring FROM ( SELECT w.id, n.point, w.ref_idx @@ -881,15 +906,18 @@ def _get_required_ways_with_linestrings( ON n.id = w.ref ) GROUP BY id - """) + """ + ) self._save_parquet_file( relation=ways_with_linestrings, file_path=required_ways_with_linestrings_path / f"group={group}", ) - ways_parquet = self.connection.sql(f""" + ways_parquet = self.connection.sql( + f""" SELECT * FROM read_parquet('{required_ways_with_linestrings_path}/**') - """) + """ + ) return ways_parquet def _get_filtered_ways_with_proper_geometry( @@ -926,7 +954,8 @@ def _get_filtered_ways_with_proper_geometry( f" list_has_any(map_extract(raw_tags, '{osm_tag_key}'), [{escaped_values}])" ) - ways_with_proper_geometry = self.connection.sql(f""" + ways_with_proper_geometry = self.connection.sql( + f""" WITH required_ways_with_linestrings AS ( SELECT w.id, @@ -963,7 +992,8 @@ def _get_filtered_ways_with_proper_geometry( required_ways_with_linestrings w ) SELECT id, tags, geometry FROM proper_geometries - """) + """ + ) ways_parquet = self._save_parquet_file_with_geometry( relation=ways_with_proper_geometry, file_path=Path(tmp_dir_name) / "filtered_ways_with_geometry", @@ -976,7 +1006,8 @@ def _get_filtered_relations_with_geometry( required_ways_with_linestrings: "duckdb.DuckDBPyRelation", tmp_dir_name: str, ) -> "duckdb.DuckDBPyRelation": - valid_relation_parts = self.connection.sql(f""" + valid_relation_parts = self.connection.sql( + f""" WITH unnested_relations AS ( SELECT r.id, @@ -1031,32 +1062,38 @@ def _get_filtered_relations_with_geometry( ) SELECT * FROM relations_with_geometries SEMI JOIN valid_relations ON relations_with_geometries.id = valid_relations.id - """) + """ + ) valid_relation_parts_parquet = self._save_parquet_file_with_geometry( relation=valid_relation_parts, file_path=Path(tmp_dir_name) / "valid_relation_parts", ) - relation_inner_parts = self.connection.sql(f""" + relation_inner_parts = self.connection.sql( + f""" SELECT id, geometry_id, ST_MakePolygon(geometry) geometry FROM ({valid_relation_parts_parquet.sql_query()}) WHERE ref_role = 'inner' - """) + """ + ) relation_inner_parts_parquet = self._save_parquet_file_with_geometry( relation=relation_inner_parts, file_path=Path(tmp_dir_name) / "relation_inner_parts", fix_geometries=True, ) - relation_outer_parts = self.connection.sql(f""" + relation_outer_parts = self.connection.sql( + f""" SELECT id, geometry_id, ST_MakePolygon(geometry) geometry FROM 
({valid_relation_parts_parquet.sql_query()}) WHERE ref_role = 'outer' - """) + """ + ) relation_outer_parts_parquet = self._save_parquet_file_with_geometry( relation=relation_outer_parts, file_path=Path(tmp_dir_name) / "relation_outer_parts", fix_geometries=True, ) - relation_outer_parts_with_holes = self.connection.sql(f""" + relation_outer_parts_with_holes = self.connection.sql( + f""" SELECT og.id, og.geometry_id, @@ -1065,12 +1102,14 @@ def _get_filtered_relations_with_geometry( JOIN ({relation_inner_parts_parquet.sql_query()}) ig ON og.id = ig.id AND ST_WITHIN(ig.geometry, og.geometry) GROUP BY og.id, og.geometry_id - """) + """ + ) relation_outer_parts_with_holes_parquet = self._save_parquet_file_with_geometry( relation=relation_outer_parts_with_holes, file_path=Path(tmp_dir_name) / "relation_outer_parts_with_holes", ) - relation_outer_parts_without_holes = self.connection.sql(f""" + relation_outer_parts_without_holes = self.connection.sql( + f""" SELECT og.id, og.geometry_id, @@ -1078,12 +1117,14 @@ def _get_filtered_relations_with_geometry( FROM ({relation_outer_parts_parquet.sql_query()}) og ANTI JOIN ({relation_outer_parts_with_holes_parquet.sql_query()}) ogwh ON og.id = ogwh.id AND og.geometry_id = ogwh.geometry_id - """) + """ + ) relation_outer_parts_without_holes_parquet = self._save_parquet_file_with_geometry( relation=relation_outer_parts_without_holes, file_path=Path(tmp_dir_name) / "relation_outer_parts_without_holes", ) - relations_with_geometry = self.connection.sql(f""" + relations_with_geometry = self.connection.sql( + f""" WITH unioned_outer_geometries AS ( SELECT id, geometry FROM ({relation_outer_parts_with_holes_parquet.sql_query()}) @@ -1100,7 +1141,8 @@ def _get_filtered_relations_with_geometry( FROM final_geometries r_g JOIN ({osm_parquet_files.relations_all_with_tags.sql_query()}) r ON r.id = r_g.id - """) + """ + ) relations_parquet = self._save_parquet_file_with_geometry( relation=relations_with_geometry, file_path=Path(tmp_dir_name) / "filtered_relations_with_geometry", @@ -1111,13 +1153,15 @@ def _save_parquet_file_with_geometry( self, relation: "duckdb.DuckDBPyRelation", file_path: Path, fix_geometries: bool = False ) -> "duckdb.DuckDBPyRelation": if not fix_geometries: - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT * EXCLUDE (geometry), ST_AsWKB(geometry) geometry_wkb FROM ({relation.sql_query()}) ) TO '{file_path}' (FORMAT 'parquet', PER_THREAD_OUTPUT true, ROW_GROUP_SIZE 25000) - """) + """ + ) else: valid_path = file_path / "valid" invalid_path = file_path / "invalid" @@ -1128,17 +1172,20 @@ def _save_parquet_file_with_geometry( fixed_path.mkdir(parents=True, exist_ok=True) # Save valid features - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT * EXCLUDE (geometry), ST_AsWKB(geometry) geometry_wkb FROM ({relation.sql_query()}) WHERE ST_IsValid(geometry) ) TO '{valid_path}' (FORMAT 'parquet', PER_THREAD_OUTPUT true, ROW_GROUP_SIZE 25000) - """) + """ + ) # Save invalid features - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT * EXCLUDE (geometry), ST_AsWKB(geometry) geometry_wkb, @@ -1150,7 +1197,8 @@ def _save_parquet_file_with_geometry( ) TO '{invalid_path}' ( FORMAT 'parquet', PARTITION_BY ("group"), ROW_GROUP_SIZE 25000 ) - """) + """ + ) # Fix invalid features group_id = 0 @@ -1186,10 +1234,12 @@ def _save_parquet_file_with_geometry( self._delete_directories(invalid_path.parent, ["invalid"]) - return self.connection.sql(f""" + return self.connection.sql( + f""" SELECT * EXCLUDE 
(geometry_wkb), ST_GeomFromWKB(geometry_wkb) geometry FROM read_parquet('{file_path}/**') - """) + """ + ) def _concatenate_results_to_geoparquet( self, @@ -1207,7 +1257,8 @@ def _concatenate_results_to_geoparquet( way_select_clauses = ["'way/' || id as feature_id", *select_clauses] relation_select_clauses = ["'relation/' || id as feature_id", *select_clauses] - unioned_features = self.connection.sql(f""" + unioned_features = self.connection.sql( + f""" SELECT {', '.join(node_select_clauses)} FROM ({parsed_data.nodes.sql_query()}) n UNION ALL @@ -1216,14 +1267,17 @@ def _concatenate_results_to_geoparquet( UNION ALL SELECT {', '.join(relation_select_clauses)} FROM ({parsed_data.relations.sql_query()}) r - """) + """ + ) grouped_features = self._parse_features_relation_to_groups(unioned_features, explode_tags) - valid_features_full_relation = self.connection.sql(f""" + valid_features_full_relation = self.connection.sql( + f""" SELECT * FROM ({grouped_features.sql_query()}) WHERE ST_IsValid(geometry) - """) + """ + ) valid_features_parquet_path = Path(tmp_dir_name) / "osm_valid_elements" valid_features_parquet_relation = self._save_parquet_file_with_geometry( @@ -1249,11 +1303,13 @@ def _concatenate_results_to_geoparquet( parquet_tables = [valid_features_parquet_table] - invalid_features_full_relation = self.connection.sql(f""" + invalid_features_full_relation = self.connection.sql( + f""" SELECT * FROM ({grouped_features.sql_query()}) a ANTI JOIN ({valid_features_parquet_relation.sql_query()}) b ON a.feature_id = b.feature_id - """) + """ + ) total_nodes = parsed_data.nodes.count("id").fetchone()[0] total_ways = parsed_data.ways.count("id").fetchone()[0] @@ -1269,7 +1325,8 @@ def _concatenate_results_to_geoparquet( grouped_invalid_features_result_parquet = ( Path(tmp_dir_name) / "osm_invalid_elements_grouped" ) - self.connection.sql(f""" + self.connection.sql( + f""" COPY ( SELECT * EXCLUDE (geometry), ST_AsWKB(geometry) geometry_wkb, @@ -1279,7 +1336,8 @@ def _concatenate_results_to_geoparquet( FROM ({invalid_features_full_relation.sql_query()}) ) TO '{grouped_invalid_features_result_parquet}' (FORMAT 'parquet', PARTITION_BY ("group"), ROW_GROUP_SIZE 25000) - """) + """ + ) for group in range(groups + 1): current_invalid_features_group_path = ( @@ -1328,7 +1386,7 @@ def _concatenate_results_to_geoparquet( if empty_columns: joined_parquet_table = joined_parquet_table.drop(empty_columns) - io.write_geoparquet_table( + io.write_geoparquet_table( # type: ignore joined_parquet_table, save_file_path, primary_geometry_column=GEOMETRY_COLUMN ) @@ -1348,10 +1406,15 @@ def _generate_osm_tags_sql_select( parsed_data.ways, parsed_data.relations, ): - found_tag_keys = [row[0] for row in self.connection.sql(f""" + found_tag_keys = [ + row[0] + for row in self.connection.sql( + f""" SELECT DISTINCT UNNEST(map_keys(tags)) tag_key FROM ({elements.sql_query()}) - """).fetchall()] + """ + ).fetchall() + ] osm_tag_keys.update(found_tag_keys) osm_tag_keys_select_clauses = [ f"list_extract(map_extract(tags, '{osm_tag_key}'), 1) as \"{osm_tag_key}\"" @@ -1374,7 +1437,8 @@ def _generate_osm_tags_sql_select( f"(tag_entry.key = '{filter_tag_key}' AND tag_entry.value IN" f" ({', '.join(values_list)}))" ) - osm_tag_keys_select_clauses = [f""" + osm_tag_keys_select_clauses = [ + f""" map_from_entries( [ tag_entry @@ -1382,7 +1446,8 @@ def _generate_osm_tags_sql_select( if {" OR ".join(filter_tag_clauses)} ] ) as tags - """] + """ + ] elif self.merged_tags_filter and explode_tags: for filter_tag_key, filter_tag_value in 
self.merged_tags_filter.items(): if isinstance(filter_tag_value, bool) and filter_tag_value: @@ -1392,24 +1457,28 @@ def _generate_osm_tags_sql_select( ) elif isinstance(filter_tag_value, str): escaped_value = self._sql_escape(filter_tag_value) - osm_tag_keys_select_clauses.append(f""" + osm_tag_keys_select_clauses.append( + f""" CASE WHEN list_extract( map_extract(tags, '{filter_tag_key}'), 1 ) = '{escaped_value}' THEN '{escaped_value}' ELSE NULL END as "{filter_tag_key}" - """) + """ + ) elif isinstance(filter_tag_value, list) and filter_tag_value: values_list = [f"'{self._sql_escape(value)}'" for value in filter_tag_value] - osm_tag_keys_select_clauses.append(f""" + osm_tag_keys_select_clauses.append( + f""" CASE WHEN list_extract( map_extract(tags, '{filter_tag_key}'), 1 ) IN ({', '.join(values_list)}) THEN list_extract(map_extract(tags, '{filter_tag_key}'), 1) ELSE NULL END as "{filter_tag_key}" - """) + """ + ) if len(osm_tag_keys_select_clauses) > 100: warnings.warn( @@ -1476,10 +1545,12 @@ def _parse_features_relation_to_groups( case_clauses.append(case_clause) joined_case_clauses = ", ".join(case_clauses) - grouped_features_relation = self.connection.sql(f""" + grouped_features_relation = self.connection.sql( + f""" SELECT feature_id, {joined_case_clauses}, geometry FROM ({features_relation.sql_query()}) - """) + """ + ) else: case_clauses = [] group_names = sorted(grouped_tags_filter.keys()) @@ -1520,9 +1591,11 @@ def _parse_features_relation_to_groups( ] ) as tags""" - grouped_features_relation = self.connection.sql(f""" + grouped_features_relation = self.connection.sql( + f""" SELECT feature_id, {non_null_groups_map}, geometry FROM ({features_relation.sql_query()}) - """) + """ + ) return grouped_features_relation
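For reference, a minimal usage sketch of the vendored quackosm._geo_arrow_io module, mirroring how pbf_file_reader.py calls it in this diff; the file paths are hypothetical and the keyword arguments come from the signatures shown above:

import quackosm._geo_arrow_io as io

# Read a GeoParquet file: the "geo" schema metadata is consumed and geometry
# columns come back as GeoArrow extension arrays (hypothetical input path).
table = io.read_geoparquet_table("features.parquet")

# Write it back: geometry columns are re-encoded as WKB and a fresh "geo"
# metadata entry (GeoParquet version 1.0.0) is attached to the schema.
io.write_geoparquet_table(
    table,
    "features_out.parquet",  # hypothetical output path
    primary_geometry_column="geometry",
    write_bbox=True,
)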