diff --git a/pdm.lock b/pdm.lock index 4ad36d7..c582f1c 100644 --- a/pdm.lock +++ b/pdm.lock @@ -3,8 +3,9 @@ [metadata] groups = ["default", "dev", "docs", "license", "lint", "test", "cli", "cli-dev"] -strategy = ["cross_platform"] -lock_version = "4.4" +cross_platform = true +static_urls = false +lock_version = "4.3" content_hash = "sha256:eac2ad81a7700f5d1257f51451388c3813c5309544b0a97c4c8c99edd83296ab" [[package]] @@ -2939,7 +2940,7 @@ files = [ [[package]] name = "topojson" version = "1.7" -summary = "topojson - a powerful library to encode geographic data as topology in Python!ğŸŒ�" +summary = "topojson - a powerful library to encode geographic data as topology in Python!🌍" dependencies = [ "numpy", "packaging", diff --git a/quackosm/_rich_progress.py b/quackosm/_rich_progress.py index 5f1372d..94a6419 100644 --- a/quackosm/_rich_progress.py +++ b/quackosm/_rich_progress.py @@ -5,6 +5,7 @@ __all__ = ["TaskProgressSpinner", "TaskProgressBar"] +TOTAL_STEPS = 34 class TaskProgressSpinner: def __init__(self, step_name: str, step_number: str): @@ -18,7 +19,7 @@ def __enter__(self): self.progress = Progress( SpinnerColumn(), - TextColumn(f"[{self.step_number: >4}/18]"), + TextColumn(f"[{self.step_number: >4}/{TOTAL_STEPS}]"), TextColumn("[progress.description]{task.description}"), TextColumn("•"), TimeElapsedColumn(), @@ -58,7 +59,7 @@ def __enter__(self): self.progress = Progress( SpinnerColumn(), - TextColumn(f"[{self.step_number: >4}/18]"), + TextColumn(f"[{self.step_number: >4}/{TOTAL_STEPS}]"), TextColumn( "[progress.description]{task.description}" " [progress.percentage]{task.percentage:>3.0f}%" @@ -87,7 +88,7 @@ def __exit__(self, exc_type, exc_value, exc_tb): def track(self, iterable: Iterable): if self.progress is not None: - for i in self.progress.track(iterable, description=self.step_name): + for i in self.progress.track(list(iterable), description=self.step_name): yield i else: for i in iterable: diff --git a/quackosm/pbf_file_reader.py 
b/quackosm/pbf_file_reader.py index 9d4dad8..8e372a2 100644 --- a/quackosm/pbf_file_reader.py +++ b/quackosm/pbf_file_reader.py @@ -6,7 +6,6 @@ import hashlib import json -import shutil import tempfile import warnings from collections.abc import Iterable @@ -224,7 +223,8 @@ def convert_pbf_to_gpq( if explode_tags is None: explode_tags = self.tags_filter is not None - with tempfile.TemporaryDirectory(dir=self.working_directory.resolve()) as tmp_dir_name: + with tempfile.TemporaryDirectory(dir=self.working_directory.resolve()) as tmp_dir_name_2: + tmp_dir_name = tmp_dir_name_2 try: self._set_up_duckdb_connection(tmp_dir_name) result_file_path = result_file_path or self._generate_geoparquet_result_file_path( @@ -401,13 +402,7 @@ def _prefilter_elements_ids( is_intersecting = self.geometry_filter is not None - nodes_prepared_ids_path = Path(tmp_dir_name) / "nodes_prepared_ids" - nodes_prepared_ids_path.mkdir(parents=True, exist_ok=True) - - ways_prepared_ids_path = Path(tmp_dir_name) / "ways_prepared_ids" - ways_prepared_ids_path.mkdir(parents=True, exist_ok=True) - - with TaskProgressSpinner("Filtering nodes", "1"): + with TaskProgressSpinner("Reading nodes", "1"): # NODES - VALID (NV) # - select all with kind = 'node' # - select all with lat and lon not empty @@ -424,14 +419,15 @@ def _prefilter_elements_ids( """, file_path=Path(tmp_dir_name) / "nodes_valid_with_tags", ) - # NODES - INTERSECTING (NI) - # - select all from NV which intersect given geometry filter - # NODES - FILTERED (NF) - # - select all from NI with tags filter - filter_osm_node_ids_filter = self._generate_elements_filter(filter_osm_ids, "node") - if is_intersecting: - wkt = cast(BaseGeometry, self.geometry_filter).wkt - intersection_filter = f"ST_Intersects(ST_Point(lon, lat), ST_GeomFromText('{wkt}'))" + # NODES - INTERSECTING (NI) + # - select all from NV which intersect given geometry filter + # NODES - FILTERED (NF) + # - select all from NI with tags filter + 
filter_osm_node_ids_filter = self._generate_elements_filter(filter_osm_ids, "node") + if is_intersecting: + wkt = cast(BaseGeometry, self.geometry_filter).wkt + intersection_filter = f"ST_Intersects(ST_Point(lon, lat), ST_GeomFromText('{wkt}'))" + with TaskProgressSpinner("Filtering nodes - intersection", "2"): nodes_intersecting_ids = self._sql_to_parquet_file( sql_query=f""" SELECT DISTINCT id FROM ({nodes_valid_with_tags.sql_query()}) n @@ -439,6 +435,7 @@ def _prefilter_elements_ids( """, file_path=Path(tmp_dir_name) / "nodes_intersecting_ids", ) + with TaskProgressSpinner("Filtering nodes - tags", "3"): self._sql_to_parquet_file( sql_query=f""" SELECT id FROM ({nodes_valid_with_tags.sql_query()}) n @@ -448,7 +445,10 @@ def _prefilter_elements_ids( """, file_path=Path(tmp_dir_name) / "nodes_filtered_non_distinct_ids", ) - else: + else: + with TaskProgressSpinner("Filtering nodes - intersection", "2"): + pass + with TaskProgressSpinner("Filtering nodes - tags", "3"): nodes_intersecting_ids = nodes_valid_with_tags self._sql_to_parquet_file( sql_query=f""" @@ -458,12 +458,13 @@ def _prefilter_elements_ids( """, file_path=Path(tmp_dir_name) / "nodes_filtered_non_distinct_ids", ) + with TaskProgressSpinner("Calculating distinct filtered nodes ids", "4"): nodes_filtered_ids = self._calculate_unique_ids_to_parquet( Path(tmp_dir_name) / "nodes_filtered_non_distinct_ids", Path(tmp_dir_name) / "nodes_filtered_ids", ) - with TaskProgressSpinner("Filtering ways", "2"): + with TaskProgressSpinner("Reading ways", "5"): # WAYS - VALID (WV) # - select all with kind = 'way' # - select all with more then one ref @@ -487,6 +488,7 @@ def _prefilter_elements_ids( """, file_path=Path(tmp_dir_name) / "ways_all_with_tags", ) + with TaskProgressSpinner("Unnesting ways", "6"): ways_with_unnested_nodes_refs = self._sql_to_parquet_file( sql_query=""" SELECT w.id, UNNEST(refs) as ref, UNNEST(range(length(refs))) as ref_idx @@ -494,6 +496,7 @@ def _prefilter_elements_ids( """, 
file_path=Path(tmp_dir_name) / "ways_with_unnested_nodes_refs", ) + with TaskProgressSpinner("Filtering ways - valid refs", "7"): ways_valid_ids = self._sql_to_parquet_file( sql_query=f""" WITH total_ways_with_nodes_refs AS ( @@ -513,6 +516,8 @@ def _prefilter_elements_ids( """, file_path=Path(tmp_dir_name) / "ways_valid_ids", ) + + with TaskProgressSpinner("Filtering ways - intersection", "8"): # WAYS - INTERSECTING (WI) # - select all from WV with joining any from NV on ref if is_intersecting: @@ -527,6 +532,7 @@ def _prefilter_elements_ids( ) else: ways_intersecting_ids = ways_valid_ids + with TaskProgressSpinner("Filtering ways - tags", "9"): # WAYS - FILTERED (WF) # - select all from WI with tags filter filter_osm_way_ids_filter = self._generate_elements_filter(filter_osm_ids, "way") @@ -538,12 +544,17 @@ def _prefilter_elements_ids( """, file_path=Path(tmp_dir_name) / "ways_filtered_non_distinct_ids", ) + + ways_prepared_ids_path = Path(tmp_dir_name) / "ways_prepared_ids" + ways_prepared_ids_path.mkdir(parents=True, exist_ok=True) + + with TaskProgressSpinner("Calculating distinct filtered ways ids", "10"): ways_filtered_ids = self._calculate_unique_ids_to_parquet( Path(tmp_dir_name) / "ways_filtered_non_distinct_ids", ways_prepared_ids_path / "filtered", ) - with TaskProgressSpinner("Filtering relations", "3"): + with TaskProgressSpinner("Reading relations", "11"): # RELATIONS - VALID (RV) # - select all with kind = 'relation' # - select all with more then one ref @@ -570,6 +581,8 @@ def _prefilter_elements_ids( """, file_path=Path(tmp_dir_name) / "relations_all_with_tags", ) + + with TaskProgressSpinner("Unnesting relations", "12"): relations_with_unnested_way_refs = self._sql_to_parquet_file( sql_query=""" WITH unnested_relation_refs AS ( @@ -587,6 +600,8 @@ def _prefilter_elements_ids( """, file_path=Path(tmp_dir_name) / "relations_with_unnested_way_refs", ) + + with TaskProgressSpinner("Filtering relations - valid refs", "13"): relations_valid_ids = 
self._sql_to_parquet_file( sql_query=f""" WITH total_relation_refs AS ( @@ -606,6 +621,8 @@ def _prefilter_elements_ids( """, file_path=Path(tmp_dir_name) / "relations_valid_ids", ) + + with TaskProgressSpinner("Filtering relations - intersection", "14"): # RELATIONS - INTERSECTING (RI) # - select all from RW with joining any from RV on ref if is_intersecting: @@ -620,6 +637,8 @@ def _prefilter_elements_ids( ) else: relations_intersecting_ids = relations_valid_ids + + with TaskProgressSpinner("Filtering relations - tags", "15"): # RELATIONS - FILTERED (RF) # - select all from RI with tags filter filter_osm_relation_ids_filter = self._generate_elements_filter( @@ -636,11 +655,13 @@ def _prefilter_elements_ids( """, file_path=relations_ids_path / "filtered", ) + + with TaskProgressSpinner("Calculating distinct filtered relations ids", "16"): relations_filtered_ids = self._calculate_unique_ids_to_parquet( relations_ids_path / "filtered", Path(tmp_dir_name) / "relations_filtered_ids" ) - with TaskProgressSpinner("Loading required ways", "4"): + with TaskProgressSpinner("Loading required ways - by relations", "17"): # WAYS - REQUIRED (WR) # - required - all IDs from WF # + all needed to construct relations from RF @@ -652,11 +673,16 @@ def _prefilter_elements_ids( """, file_path=ways_prepared_ids_path / "required_by_relations", ) + + with TaskProgressSpinner("Calculating distinct required ways ids", "18"): ways_required_ids = self._calculate_unique_ids_to_parquet( ways_prepared_ids_path, Path(tmp_dir_name) / "ways_required_ids" ) - with TaskProgressSpinner("Loading required nodes", "5"): + nodes_prepared_ids_path = Path(tmp_dir_name) / "nodes_prepared_ids" + nodes_prepared_ids_path.mkdir(parents=True, exist_ok=True) + + with TaskProgressSpinner("Loading required nodes - by relations", "19"): # NODES - REQUIRED (WR) # - required - all IDs from NF # + all needed to construct ways from WR @@ -669,6 +695,8 @@ def _prefilter_elements_ids( """, 
file_path=nodes_prepared_ids_path / "required_by_relations", ) + + with TaskProgressSpinner("Loading required nodes - by ways", "20"): self._sql_to_parquet_file( sql_query=f""" SELECT ref as id @@ -677,6 +705,8 @@ def _prefilter_elements_ids( """, file_path=nodes_prepared_ids_path / "required_by_ways", ) + + with TaskProgressSpinner("Calculating distinct required nodes ids", "21"): nodes_required_ids = self._calculate_unique_ids_to_parquet( nodes_prepared_ids_path, Path(tmp_dir_name) / "nodes_required_ids" ) @@ -703,7 +733,7 @@ def _delete_directories( directory_path = Path(tmp_dir_name) / directory if not directory_path.exists(): continue - shutil.rmtree(directory_path) + # shutil.rmtree(directory_path) def _generate_osm_tags_sql_filter(self) -> str: """Prepare features filter clauses based on tags filter.""" @@ -828,7 +858,7 @@ def _get_filtered_nodes_with_geometry( relation=nodes_with_geometry, file_path=Path(tmp_dir_name) / "filtered_nodes_with_geometry", step_name="Saving nodes with geometries", - step_number="6", + step_number="22", ) return nodes_parquet @@ -844,7 +874,7 @@ def _get_required_nodes_with_structs( FROM ({osm_parquet_files.nodes_valid_with_tags.sql_query()}) n SEMI JOIN ({osm_parquet_files.nodes_required_ids.sql_query()}) rn ON n.id = rn.id """) - with TaskProgressSpinner("Saving filtered nodes with structs", "7"): + with TaskProgressSpinner("Saving filtered nodes with structs", "23"): nodes_parquet = self._save_parquet_file( relation=nodes_with_structs, file_path=Path(tmp_dir_name) / "required_nodes_with_points", @@ -857,7 +887,7 @@ def _get_required_ways_with_linestrings( required_nodes_with_structs: "duckdb.DuckDBPyRelation", tmp_dir_name: str, ) -> "duckdb.DuckDBPyRelation": - with TaskProgressSpinner("Grouping required ways", "8"): + with TaskProgressSpinner("Grouping required ways", "24"): total_required_ways = osm_parquet_files.ways_required_ids.count("id").fetchone()[0] required_ways_with_linestrings_path = ( @@ -875,19 +905,28 @@ def 
_get_required_ways_with_linestrings( groups = floor(total_required_ways / self.rows_per_bucket) grouped_required_ways_ids_path = Path(tmp_dir_name) / "ways_required_ids_grouped" + self.connection.sql(f""" COPY ( + WITH grouped_ways_ids AS ( + SELECT id, + floor( + row_number() OVER () / {self.rows_per_bucket} + )::INTEGER as "group", + FROM ({osm_parquet_files.ways_required_ids.sql_query()}) + ) SELECT - *, - floor( - row_number() OVER (ORDER BY id) / {self.rows_per_bucket} - )::INTEGER as "group", - FROM ({osm_parquet_files.ways_required_ids.sql_query()}) + w.id, n.point, w.ref_idx, rw."group", + FROM ({osm_parquet_files.ways_with_unnested_nodes_refs.sql_query()}) w + JOIN grouped_ways_ids rw + ON w.id = rw.id + JOIN ({required_nodes_with_structs.sql_query()}) n + ON n.id = w.ref ) TO '{grouped_required_ways_ids_path}' (FORMAT 'parquet', PARTITION_BY ("group"), ROW_GROUP_SIZE 25000) """) - with TaskProgressBar("Saving required ways with linestrings", "9") as bar: + with TaskProgressBar("Saving required ways with linestrings", "25") as bar: for group in bar.track(range(groups + 1)): current_required_ways_ids_group_path = ( grouped_required_ways_ids_path / f"group={group}" @@ -898,14 +937,7 @@ def _get_required_ways_with_linestrings( ways_with_linestrings = self.connection.sql(f""" SELECT id, list(point ORDER BY ref_idx ASC)::LINESTRING_2D linestring - FROM ( - SELECT w.id, n.point, w.ref_idx - FROM ({osm_parquet_files.ways_with_unnested_nodes_refs.sql_query()}) w - SEMI JOIN ({current_required_ways_ids_group_relation.sql_query()}) rw - ON w.id = rw.id - JOIN ({required_nodes_with_structs.sql_query()}) n - ON n.id = w.ref - ) + FROM ({current_required_ways_ids_group_relation.sql_query()}) GROUP BY id """) self._save_parquet_file( @@ -994,7 +1026,7 @@ def _get_filtered_ways_with_proper_geometry( relation=ways_with_proper_geometry, file_path=Path(tmp_dir_name) / "filtered_ways_with_geometry", step_name="Saving ways with geometries", - step_number="10", + 
step_number="26", ) return ways_parquet @@ -1063,8 +1095,8 @@ def _get_filtered_relations_with_geometry( valid_relation_parts_parquet = self._save_parquet_file_with_geometry( relation=valid_relation_parts, file_path=Path(tmp_dir_name) / "valid_relation_parts", - step_name="Saving valid relation parts", - step_number="11", + step_name="Saving valid relations parts", + step_number="27", ) relation_inner_parts = self.connection.sql(f""" SELECT id, geometry_id, ST_MakePolygon(geometry) geometry @@ -1075,8 +1107,8 @@ def _get_filtered_relations_with_geometry( relation=relation_inner_parts, file_path=Path(tmp_dir_name) / "relation_inner_parts", fix_geometries=True, - step_name="Saving relation inner parts", - step_number="12", + step_name="Saving relations inner parts", + step_number="28", ) relation_outer_parts = self.connection.sql(f""" SELECT id, geometry_id, ST_MakePolygon(geometry) geometry @@ -1087,8 +1119,8 @@ def _get_filtered_relations_with_geometry( relation=relation_outer_parts, file_path=Path(tmp_dir_name) / "relation_outer_parts", fix_geometries=True, - step_name="Saving relation outer parts", - step_number="13", + step_name="Saving relations outer parts", + step_number="29", ) relation_outer_parts_with_holes = self.connection.sql(f""" SELECT @@ -1103,8 +1135,8 @@ def _get_filtered_relations_with_geometry( relation_outer_parts_with_holes_parquet = self._save_parquet_file_with_geometry( relation=relation_outer_parts_with_holes, file_path=Path(tmp_dir_name) / "relation_outer_parts_with_holes", - step_name="Saving relation outer parts with holes", - step_number="14", + step_name="Saving relations outer parts with holes", + step_number="30", ) relation_outer_parts_without_holes = self.connection.sql(f""" SELECT @@ -1118,8 +1150,8 @@ def _get_filtered_relations_with_geometry( relation_outer_parts_without_holes_parquet = self._save_parquet_file_with_geometry( relation=relation_outer_parts_without_holes, file_path=Path(tmp_dir_name) / 
"relation_outer_parts_without_holes", - step_name="Saving relation outer parts without holes", - step_number="15", + step_name="Saving relations outer parts without holes", + step_number="31", ) relations_with_geometry = self.connection.sql(f""" WITH unioned_outer_geometries AS ( @@ -1142,8 +1174,8 @@ def _get_filtered_relations_with_geometry( relations_parquet = self._save_parquet_file_with_geometry( relation=relations_with_geometry, file_path=Path(tmp_dir_name) / "filtered_relations_with_geometry", - step_name="Saving relation with geometries", - step_number="16", + step_name="Saving relations with geometries", + step_number="32", ) return relations_parquet @@ -1290,7 +1322,7 @@ def _concatenate_results_to_geoparquet( valid_features_full_relation, valid_features_parquet_path, step_name="Saving valid features", - step_number="17.1", + step_number="33.1", ) valid_features_parquet_table = pq.read_table(valid_features_parquet_path) @@ -1327,7 +1359,7 @@ def _concatenate_results_to_geoparquet( invalid_features = total_features - valid_features if invalid_features > 0: - with TaskProgressSpinner("Grouping invalid features", "17.2"): + with TaskProgressSpinner("Grouping invalid features", "33.2"): groups = floor(invalid_features / self.rows_per_bucket) grouped_invalid_features_result_parquet = ( Path(tmp_dir_name) / "osm_invalid_elements_grouped" @@ -1337,14 +1369,14 @@ def _concatenate_results_to_geoparquet( SELECT * EXCLUDE (geometry), ST_AsWKB(geometry) geometry_wkb, floor( - row_number() OVER (ORDER BY feature_id) / {self.rows_per_bucket} + row_number() OVER () / {self.rows_per_bucket} )::INTEGER as "group", FROM ({invalid_features_full_relation.sql_query()}) ) TO '{grouped_invalid_features_result_parquet}' (FORMAT 'parquet', PARTITION_BY ("group"), ROW_GROUP_SIZE 25000) """) - with TaskProgressBar("Fixing invalid features", "17.3") as bar: + with TaskProgressBar("Fixing invalid features", "33.3") as bar: for group in bar.track(range(groups + 1)): 
current_invalid_features_group_path = ( grouped_invalid_features_result_parquet / f"group={group}" @@ -1390,7 +1422,7 @@ def _concatenate_results_to_geoparquet( if empty_columns: joined_parquet_table = joined_parquet_table.drop(empty_columns) - with TaskProgressSpinner("Saving final geoparquet file", "18"): + with TaskProgressSpinner("Saving final geoparquet file", "34"): io.write_geoparquet_table( # type: ignore joined_parquet_table, save_file_path, primary_geometry_column=GEOMETRY_COLUMN )