Merge pull request #991 from mapswipe/improve-ressource-usage
refactor: improve RAM usage and calculation time for StreetProject
ofr1tz authored Feb 6, 2025
2 parents 2a458b5 + d8a5833 commit 3974bf5
Showing 2 changed files with 106 additions and 154 deletions.
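In short, the diff below does three things to cut memory use and run time: it parses only Point features out of each Mapillary vector tile (via a new get_mapillary_data helper), it filters each tile's results against the AoI polygon and the attribute filters inside the per-tile worker instead of after all tiles have been collected, and it swaps the ThreadPoolExecutor for a ProcessPoolExecutor driven through functools.partial. The sketch that follows only illustrates that pattern and is not the repository's code; process_tile, download_all, and the stand-in tile frame are invented for the example.

from concurrent.futures import ProcessPoolExecutor
from functools import partial

import pandas as pd
from shapely.geometry import Point, box


def process_tile(tile, polygon, is_pano=None):
    # Hypothetical per-tile worker: build (in the real code, download) one
    # tile's rows, keep only points inside the AoI polygon, and apply the
    # attribute filter before returning, so the parent process only ever
    # accumulates small, already-filtered frames.
    df = pd.DataFrame(
        {"geometry": [Point(tile["x"] + 0.5, tile["y"] + 0.5)], "is_pano": [True]}
    )  # stand-in for a real vector-tile download
    df = df[df["geometry"].apply(lambda p: p.within(polygon))]
    if is_pano is not None:
        df = df[df["is_pano"] == is_pano]
    return df


def download_all(tiles, polygon, is_pano=None, workers=2):
    # Bind the shared arguments once, then fan the tiles out across processes.
    worker = partial(process_tile, polygon=polygon, is_pano=is_pano)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        frames = [df for df in executor.map(worker, tiles) if not df.empty]
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


if __name__ == "__main__":
    aoi = box(0, 0, 2, 2)
    tiles = [{"x": 0, "y": 0}, {"x": 1, "y": 1}, {"x": 5, "y": 5}]
    print(download_all(tiles, aoi, is_pano=True))

With this split, each worker returns at most one tile's filtered rows, which is where the RAM saving comes from.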
182 changes: 75 additions & 107 deletions mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
@@ -1,18 +1,11 @@
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor
from functools import partial

import mercantile
import pandas as pd
import requests
from shapely import (
LineString,
MultiLineString,
MultiPolygon,
Point,
Polygon,
box,
unary_union,
)
from shapely import MultiPolygon, Point, Polygon, box, unary_union
from shapely.geometry import shape
from vt2geojson import tools as vt2geojson_tools

@@ -44,7 +37,7 @@ def create_tiles(polygon, level):
return tiles


def download_and_process_tile(row, attempt_limit=3):
def download_and_process_tile(row, polygon, kwargs, attempt_limit=3):
z = row["z"]
x = row["x"]
y = row["y"]
@@ -53,37 +46,24 @@ def download_and_process_tile(row, attempt_limit=3):
attempt = 0
while attempt < attempt_limit:
try:
r = requests.get(url)
assert r.status_code == 200, r.content
features = vt2geojson_tools.vt_bytes_to_geojson(r.content, x, y, z).get(
"features", []
)
data = []
for feature in features:
geometry = feature.get("geometry", {})
properties = feature.get("properties", {})
geometry_type = geometry.get("type", None)
coordinates = geometry.get("coordinates", [])

element_geometry = None
if geometry_type == "Point":
element_geometry = Point(coordinates)
elif geometry_type == "LineString":
element_geometry = LineString(coordinates)
elif geometry_type == "MultiLineString":
element_geometry = MultiLineString(coordinates)
elif geometry_type == "Polygon":
element_geometry = Polygon(coordinates[0])
elif geometry_type == "MultiPolygon":
element_geometry = MultiPolygon(coordinates)

# Append the dictionary with geometry and properties
row = {"geometry": element_geometry, **properties}
data.append(row)

data = pd.DataFrame(data)

if not data.empty:
data = get_mapillary_data(url, x, y, z)
if data.isna().all().all() is False or data.empty is False:
data = data[data["geometry"].apply(lambda point: point.within(polygon))]
target_columns = [
"id",
"geometry",
"captured_at",
"is_pano",
"compass_angle",
"sequence",
"organization_id",
]
for col in target_columns:
if col not in data.columns:
data[col] = None
if data.isna().all().all() is False or data.empty is False:
data = filter_results(data, **kwargs)

return data
except Exception as e:
print(f"An exception occurred while requesting a tile: {e}")
@@ -93,8 +73,28 @@ def download_and_process_tile(row, attempt_limit=3):
return None


def get_mapillary_data(url, x, y, z):
r = requests.get(url)
assert r.status_code == 200, r.content
features = vt2geojson_tools.vt_bytes_to_geojson(r.content, x, y, z).get(
"features", []
)
data = []
data.extend(
[
{
"geometry": Point(feature["geometry"]["coordinates"]),
**feature.get("properties", {}),
}
for feature in features
if feature.get("geometry", {}).get("type") == "Point"
]
)
return pd.DataFrame(data)


def coordinate_download(
polygon, level, use_concurrency=True, attempt_limit=3, workers=os.cpu_count() * 4
polygon, level, kwargs: dict, use_concurrency=True, workers=os.cpu_count() * 4
):
tiles = create_tiles(polygon, level)

@@ -104,48 +104,32 @@ def coordinate_download(
if not use_concurrency:
workers = 1

futures = []
with ThreadPoolExecutor(max_workers=workers) as executor:
for index, row in tiles.iterrows():
futures.append(
executor.submit(download_and_process_tile, row, attempt_limit)
)

for future in as_completed(futures):
if future is not None:
df = future.result()

if df is not None and not df.empty:
downloaded_metadata.append(df)
downloaded_metadata = parallelized_processing(
downloaded_metadata, kwargs, polygon, tiles, workers
)
if len(downloaded_metadata):
downloaded_metadata = pd.concat(downloaded_metadata, ignore_index=True)
else:
return pd.DataFrame(downloaded_metadata)

target_columns = [
"id",
"geometry",
"captured_at",
"is_pano",
"compass_angle",
"sequence",
"organization_id",
]
for col in target_columns:
if col not in downloaded_metadata.columns:
downloaded_metadata[col] = None
if (
downloaded_metadata.isna().all().all() is False
or downloaded_metadata.empty is False
):
downloaded_metadata = downloaded_metadata[
downloaded_metadata["geometry"].apply(
lambda point: point.within(polygon)
)
]
return downloaded_metadata


def parallelized_processing(data, kwargs, polygon, tiles, workers):
process_tile_with_args = partial(
download_and_process_tile, polygon=polygon, kwargs=kwargs
)
with ProcessPoolExecutor(max_workers=workers) as executor:
futures = list(
executor.map(process_tile_with_args, tiles.to_dict(orient="records"))
)

for df in futures:
if df is not None and not df.empty:
data.append(df)
return data


def geojson_to_polygon(geojson_data):
if geojson_data["type"] == "FeatureCollection":
features = geojson_data["features"]
@@ -198,36 +182,31 @@ def filter_results(
)
return None
df = df[df["creator_id"] == creator_id]

if is_pano is not None:
if df["is_pano"].isna().all():
logger.exception("No Mapillary Feature in the AoI has a 'is_pano' value.")
return None
df = df[df["is_pano"] == is_pano]

if organization_id is not None:
if df["organization_id"].isna().all():
logger.exception(
"No Mapillary Feature in the AoI has an 'organization_id' value."
)
return None
df = df[df["organization_id"] == organization_id]

if start_time is not None:
if df["captured_at"].isna().all():
logger.exception(
"No Mapillary Feature in the AoI has a 'captured_at' value."
)
return None
df = filter_by_timerange(df, start_time, end_time)

return df


def get_image_metadata(
aoi_geojson,
level=14,
attempt_limit=3,
is_pano: bool = None,
creator_id: int = None,
organization_id: str = None,
@@ -236,33 +215,22 @@ def get_image_metadata(
randomize_order=False,
sampling_threshold=None,
):
kwargs = {
"is_pano": is_pano,
"creator_id": creator_id,
"organization_id": organization_id,
"start_time": start_time,
"end_time": end_time,
}
aoi_polygon = geojson_to_polygon(aoi_geojson)
downloaded_metadata = coordinate_download(aoi_polygon, level, attempt_limit)

downloaded_metadata = coordinate_download(aoi_polygon, level, kwargs)
if downloaded_metadata.empty or downloaded_metadata.isna().all().all():
raise ValueError("No Mapillary Features in the AoI.")

downloaded_metadata = downloaded_metadata[
downloaded_metadata["geometry"].apply(lambda geom: isinstance(geom, Point))
]

downloaded_metadata = filter_results(
downloaded_metadata,
creator_id,
is_pano,
organization_id,
start_time,
end_time,
)

if (
downloaded_metadata is None
or downloaded_metadata.empty
or downloaded_metadata.isna().all().all()
):
raise ValueError("No Mapillary Features in the AoI match the filter criteria.")

downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold)
raise ValueError(
"No Mapillary Features in the AoI or no Features match the filter criteria."
)
downloaded_metadata = downloaded_metadata.drop_duplicates(subset=["geometry"])
if sampling_threshold is not None:
downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold)

if randomize_order is True:
downloaded_metadata = downloaded_metadata.sample(frac=1).reset_index(drop=True)
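For orientation once the refactor is in place, here is a minimal sketch of calling the module's entry point, get_image_metadata, with the keyword filters visible in the diff above. The AoI file name, the parameter values, and the assumption that a Mapillary access token is already configured for the workers are illustrative, not taken from the commit.

import json

from mapswipe_workers.utils.process_mapillary import get_image_metadata

# Hypothetical AoI file; any GeoJSON Feature or FeatureCollection covering
# the area of interest would do.
with open("aoi.geojson") as f:
    aoi_geojson = json.load(f)

metadata = get_image_metadata(
    aoi_geojson,
    level=14,                 # zoom level used to tile the AoI
    is_pano=True,             # keep only panoramic images
    sampling_threshold=0.05,  # assumed value; passed through to spatial_sampling
)
# Assuming the function returns the filtered, de-duplicated pandas DataFrame.
print(metadata.head())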