From 5c2f3e304f5822526750f1600ae37799fbc1cd33 Mon Sep 17 00:00:00 2001 From: Kamil Raczycki Date: Wed, 11 Sep 2024 12:43:54 +0200 Subject: [PATCH] feat: refactor duckdb parquet operations --- CHANGELOG.md | 1 + quackosm/pbf_file_reader.py | 70 ++++++++++++++++++++++++------------- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7421c9..bb6cbee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Bumped minimal DuckDB version to `1.1.0` +- Refactored geoparquet operations for compatibility with new DuckDB version - Excluded `conftest.py` file from the final library build - Replaced `unary_union` calls with `union_all()` on all GeoDataFrames - Silenced `pooch` library warnings regarding empty SHA hash diff --git a/quackosm/pbf_file_reader.py b/quackosm/pbf_file_reader.py index 2ed0a60..795aa2d 100644 --- a/quackosm/pbf_file_reader.py +++ b/quackosm/pbf_file_reader.py @@ -915,7 +915,7 @@ def _parse_pbf_file( '{filtered_nodes_with_geometry_path}/**', '{filtered_ways_with_proper_geometry_path}/**', '{filtered_relations_with_geometry_path}/**' - ]); + ], union_by_name=True); """ ) @@ -1601,6 +1601,11 @@ def _save_parquet_file( self._run_query(query, run_in_separate_process) if self.debug_memory: log_message(f"Saved to directory: {file_path}") + + is_empty = not any(file_path.iterdir()) + if is_empty: + relation.to_parquet(str(file_path / "empty.parquet")) + return self.connection.sql(f"SELECT * FROM read_parquet('{file_path}/**')") def _run_query( @@ -1650,10 +1655,12 @@ def _calculate_unique_ids_to_parquet( if result_path is None: result_path = file_path / "distinct" + relation = self.connection.sql(f"SELECT id FROM read_parquet('{file_path}/**') GROUP BY id") + self.connection.sql( f""" COPY ( - SELECT id FROM read_parquet('{file_path}/**') GROUP BY id + {relation.sql_query()} ) TO '{result_path}' ( FORMAT 'parquet', PER_THREAD_OUTPUT true, @@ -1665,6 +1672,10 @@ def _calculate_unique_ids_to_parquet( if self.debug_memory: log_message(f"Saved to directory: {result_path}") + is_empty = not any(result_path.iterdir()) + if is_empty: + relation.to_parquet(str(result_path / "empty.parquet")) + return self.connection.sql(f"SELECT * FROM read_parquet('{result_path}/**')") def _get_filtered_nodes_with_geometry( @@ -2146,6 +2157,7 @@ def _get_filtered_relations_with_geometry( file_path=self.tmp_dir_path / "relation_inner_parts", step_name="Saving relations inner parts", ) + any_relation_inner_parts = relation_inner_parts_parquet.count("id").fetchone()[0] != 0 relation_outer_parts = self.connection.sql( f""" SELECT id, geometry_id, ST_MakePolygon(geometry) geometry @@ -2158,18 +2170,31 @@ def _get_filtered_relations_with_geometry( file_path=self.tmp_dir_path / "relation_outer_parts", step_name="Saving relations outer parts", ) - relation_outer_parts_with_holes = self.connection.sql( - f""" - SELECT - og.id, - og.geometry_id, - ST_Difference(any_value(og.geometry), ST_Union_Agg(ig.geometry)) geometry - FROM ({relation_outer_parts_parquet.sql_query()}) og - JOIN ({relation_inner_parts_parquet.sql_query()}) ig - ON og.id = ig.id AND ST_WITHIN(ig.geometry, og.geometry) - GROUP BY og.id, og.geometry_id - """ - ) + if any_relation_inner_parts: + relation_outer_parts_with_holes = self.connection.sql( + f""" + SELECT + og.id, + og.geometry_id, + ST_Difference(any_value(og.geometry), ST_Union_Agg(ig.geometry)) geometry + FROM ({relation_outer_parts_parquet.sql_query()}) og + JOIN ({relation_inner_parts_parquet.sql_query()}) ig + ON og.id = ig.id AND ST_WITHIN(ig.geometry, og.geometry) + GROUP BY og.id, og.geometry_id + """ + ) + else: + # Fake empty relation + relation_outer_parts_with_holes = self.connection.sql( + f""" + SELECT + og.id, + og.geometry_id, + og.geometry + FROM ({relation_outer_parts_parquet.sql_query()}) og + WHERE og.id IS NULL + """ + ) relation_outer_parts_with_holes_parquet = self._save_parquet_file_with_geometry( relation=relation_outer_parts_with_holes, file_path=self.tmp_dir_path / "relation_outer_parts_with_holes", @@ -2232,7 +2257,7 @@ def _save_parquet_file_with_geometry( f""" COPY ( SELECT - * EXCLUDE (geometry), ST_AsWKB(ST_MakeValid(geometry)) geometry + * EXCLUDE (geometry), ST_MakeValid(geometry) geometry FROM ({relation.sql_query()}) ) TO '{file_path}' ( FORMAT 'parquet', @@ -2245,6 +2270,10 @@ def _save_parquet_file_with_geometry( if self.debug_memory: log_message(f"Saved to directory: {file_path}") + is_empty = not any(file_path.iterdir()) + if is_empty: + relation.to_parquet(str(file_path / "empty.parquet")) + return self.connection.sql(f"SELECT * FROM read_parquet('{file_path}/**')") def _concatenate_results_to_geoparquet( @@ -2274,18 +2303,9 @@ def _concatenate_results_to_geoparquet( unioned_features, keep_all_tags=keep_all_tags, explode_tags=explode_tags ) - features_full_relation = self.connection.sql( - f""" - SELECT - * EXCLUDE (geometry), - ST_MakeValid(geometry) geometry - FROM ({grouped_features.sql_query()}) - """ - ) - features_parquet_path = self.tmp_dir_path / "osm_valid_elements" self._save_parquet_file_with_geometry( - features_full_relation, + grouped_features, features_parquet_path, step_name="Saving all features", )