Skip to content

Commit

Permalink
feat: refactor duckdb parquet operations
Browse files Browse the repository at this point in the history
  • Loading branch information
RaczeQ committed Sep 11, 2024
1 parent 0c54d8d commit 5c2f3e3
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- Bumped minimal DuckDB version to `1.1.0`
- Refactored geoparquet operations for compatibility with new DuckDB version
- Excluded `conftest.py` file from the final library build
- Replaced `unary_union` calls with `union_all()` on all GeoDataFrames
- Silenced `pooch` library warnings regarding empty SHA hash
Expand Down
70 changes: 45 additions & 25 deletions quackosm/pbf_file_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ def _parse_pbf_file(
'{filtered_nodes_with_geometry_path}/**',
'{filtered_ways_with_proper_geometry_path}/**',
'{filtered_relations_with_geometry_path}/**'
]);
], union_by_name=True);
"""
)

Expand Down Expand Up @@ -1601,6 +1601,11 @@ def _save_parquet_file(
self._run_query(query, run_in_separate_process)
if self.debug_memory:
log_message(f"Saved to directory: {file_path}")

is_empty = not any(file_path.iterdir())
if is_empty:
relation.to_parquet(str(file_path / "empty.parquet"))

return self.connection.sql(f"SELECT * FROM read_parquet('{file_path}/**')")

def _run_query(
Expand Down Expand Up @@ -1650,10 +1655,12 @@ def _calculate_unique_ids_to_parquet(
if result_path is None:
result_path = file_path / "distinct"

relation = self.connection.sql(f"SELECT id FROM read_parquet('{file_path}/**') GROUP BY id")

self.connection.sql(
f"""
COPY (
SELECT id FROM read_parquet('{file_path}/**') GROUP BY id
{relation.sql_query()}
) TO '{result_path}' (
FORMAT 'parquet',
PER_THREAD_OUTPUT true,
Expand All @@ -1665,6 +1672,10 @@ def _calculate_unique_ids_to_parquet(
if self.debug_memory:
log_message(f"Saved to directory: {result_path}")

is_empty = not any(result_path.iterdir())
if is_empty:
relation.to_parquet(str(result_path / "empty.parquet"))

return self.connection.sql(f"SELECT * FROM read_parquet('{result_path}/**')")

def _get_filtered_nodes_with_geometry(
Expand Down Expand Up @@ -2146,6 +2157,7 @@ def _get_filtered_relations_with_geometry(
file_path=self.tmp_dir_path / "relation_inner_parts",
step_name="Saving relations inner parts",
)
any_relation_inner_parts = relation_inner_parts_parquet.count("id").fetchone()[0] != 0
relation_outer_parts = self.connection.sql(
f"""
SELECT id, geometry_id, ST_MakePolygon(geometry) geometry
Expand All @@ -2158,18 +2170,31 @@ def _get_filtered_relations_with_geometry(
file_path=self.tmp_dir_path / "relation_outer_parts",
step_name="Saving relations outer parts",
)
relation_outer_parts_with_holes = self.connection.sql(
f"""
SELECT
og.id,
og.geometry_id,
ST_Difference(any_value(og.geometry), ST_Union_Agg(ig.geometry)) geometry
FROM ({relation_outer_parts_parquet.sql_query()}) og
JOIN ({relation_inner_parts_parquet.sql_query()}) ig
ON og.id = ig.id AND ST_WITHIN(ig.geometry, og.geometry)
GROUP BY og.id, og.geometry_id
"""
)
if any_relation_inner_parts:
relation_outer_parts_with_holes = self.connection.sql(
f"""
SELECT
og.id,
og.geometry_id,
ST_Difference(any_value(og.geometry), ST_Union_Agg(ig.geometry)) geometry
FROM ({relation_outer_parts_parquet.sql_query()}) og
JOIN ({relation_inner_parts_parquet.sql_query()}) ig
ON og.id = ig.id AND ST_WITHIN(ig.geometry, og.geometry)
GROUP BY og.id, og.geometry_id
"""
)
else:
# Fake empty relation
relation_outer_parts_with_holes = self.connection.sql(
f"""
SELECT
og.id,
og.geometry_id,
og.geometry
FROM ({relation_outer_parts_parquet.sql_query()}) og
WHERE og.id IS NULL
"""
)
relation_outer_parts_with_holes_parquet = self._save_parquet_file_with_geometry(
relation=relation_outer_parts_with_holes,
file_path=self.tmp_dir_path / "relation_outer_parts_with_holes",
Expand Down Expand Up @@ -2232,7 +2257,7 @@ def _save_parquet_file_with_geometry(
f"""
COPY (
SELECT
* EXCLUDE (geometry), ST_AsWKB(ST_MakeValid(geometry)) geometry
* EXCLUDE (geometry), ST_MakeValid(geometry) geometry
FROM ({relation.sql_query()})
) TO '{file_path}' (
FORMAT 'parquet',
Expand All @@ -2245,6 +2270,10 @@ def _save_parquet_file_with_geometry(
if self.debug_memory:
log_message(f"Saved to directory: {file_path}")

is_empty = not any(file_path.iterdir())
if is_empty:
relation.to_parquet(str(file_path / "empty.parquet"))

return self.connection.sql(f"SELECT * FROM read_parquet('{file_path}/**')")

def _concatenate_results_to_geoparquet(
Expand Down Expand Up @@ -2274,18 +2303,9 @@ def _concatenate_results_to_geoparquet(
unioned_features, keep_all_tags=keep_all_tags, explode_tags=explode_tags
)

features_full_relation = self.connection.sql(
f"""
SELECT
* EXCLUDE (geometry),
ST_MakeValid(geometry) geometry
FROM ({grouped_features.sql_query()})
"""
)

features_parquet_path = self.tmp_dir_path / "osm_valid_elements"
self._save_parquet_file_with_geometry(
features_full_relation,
grouped_features,
features_parquet_path,
step_name="Saving all features",
)
Expand Down

0 comments on commit 5c2f3e3

Please sign in to comment.