Skip to content

Commit

Permalink
Merge pull request #29 from Datafalk/feat/extractroi
Browse files Browse the repository at this point in the history
fix(extractroi): update profile transform
  • Loading branch information
dyoussef authored Oct 17, 2024
2 parents 3f48a89 + 05893c5 commit fa384c7
Showing 1 changed file with 140 additions and 37 deletions.
177 changes: 140 additions & 37 deletions cars/extractroi.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,149 @@
import numpy as np
import rasterio as rio
from affine import Affine
from shapely.geometry import box


def get_slices_from_bbx(src, bbx):
"""get slices from bounding box"""
coordinates = []
transformer = rio.transform.RPCTransformer(src.rpcs)
for x_bbx in [bbx[0], bbx[2]]:
for y_bbx in [bbx[1], bbx[3]]:
coordinates.append(transformer.rowcol(x_bbx, y_bbx))
def is_bbx_in_image(bbx, image_dataset):
"""
Checks if the bounding box is within the image.
Parameters:
bbx (array): The bounding box of the image.
image_dataset (rio.DatasetReader): Opened image dataset.
"""
image_box = box(0, 0, image_dataset.width, image_dataset.height)

return image_box.contains(bbx)


def get_slices_from_bbx(image_dataset, bbx):
"""get slices from bounding box
Parameters:
image_dataset (rio.DatasetReader): Opened image dataset.
bbx (array): The bounding box of the image.
Returns:
tuple: The slices from the bounding box.
"""
transformer = rio.transform.RPCTransformer(image_dataset.rpcs)
coordinates = [
transformer.rowcol(bbx[0], bbx[1]),
transformer.rowcol(bbx[2], bbx[1]),
transformer.rowcol(bbx[2], bbx[3]),
transformer.rowcol(bbx[0], bbx[3]),
]
coordinates = np.array(coordinates)
(row_start, col_start) = np.amin(coordinates, axis=0)
(row_stop, col_stop) = np.amax(coordinates, axis=0)
rows = (row_start, row_stop)
cols = (col_start, col_stop)

return rows, cols


def create_geom_file(src, geom_filename):
"""create .geom file from rasterio dataset"""
rpcs_as_dict = src.rpcs.to_dict()
def process_image_file(
bbx, input_image_path, output_image_path, geom_file_path
):
"""
Processes an image file by extracting a region based on the given geometry.
Parameters:
region_geometry (dict): GeoJSON-like dictionary defining the region.
input_image_path (str): Path to the input image file.
output_image_path (str): Path to save the output image.
geom_file_path (str): Path to save the .geom file.
"""

with rio.open(input_image_path) as image_dataset:
if not image_dataset.rpcs:
raise ValueError("Image dataset has no RPCs")
validate_bounding_box(bbx, image_dataset)
row, col = get_slices_from_bbx(image_dataset, bbx)
window = rio.windows.Window.from_slices(row, col)
array = image_dataset.read(1, window=window)
profile = image_dataset.profile
profile["driver"] = "GTiff"
profile["width"] = window.width
profile["height"] = window.height
profile["transform"] = Affine.translation(
window.col_off, window.row_off
)
if "crs" in profile:
del profile["crs"]
with rio.open(output_image_path, "w", **profile) as dst:
# write data
dst.write(array, 1)
# copy rpc
dst.rpcs = image_dataset.rpcs

create_geom_file(image_dataset, geom_file_path)


def get_human_readable_bbox(image_dataset):
"""
Get the human-readable bounding box from an image dataset.
Parameters:
image_dataset (rio.DatasetReader): Opened image dataset.
Returns:
tuple: The human-readable bounding box in the format
(min_x, max_x, min_y, max_y).
"""

transformer = rio.transform.RPCTransformer(image_dataset.rpcs)

human_readable_bbx = [
transformer.xy(0, 0),
transformer.xy(image_dataset.width, image_dataset.height),
]
# fix coordinates to precision -7 for (x, y)
image_coords = [
(round(coord[0], 7), round(coord[1], 7)) for coord in human_readable_bbx
]
[(x_1, y_1), (x_2, y_2)] = image_coords

min_x, max_x = min(x_1, x_2), max(x_1, x_2)
min_y, max_y = min(y_1, y_2), max(y_1, y_2)

return min_x, max_x, min_y, max_y


def validate_bounding_box(bbx, image_dataset):
"""
Validate the bounding box coordinates.
Parameters:
bbx (array): The bounding box of the image.
image_dataset (rio.DatasetReader): Opened image dataset.
"""

transformer = rio.transform.RPCTransformer(image_dataset.rpcs)
input_box = box(
*transformer.rowcol(bbx[0], bbx[1]), *transformer.rowcol(bbx[2], bbx[3])
)
if not is_bbx_in_image(input_box, image_dataset):
min_x, max_x, min_y, max_y = get_human_readable_bbox(image_dataset)
raise ValueError(
f"Coordinates must between "
f"({min_x}, {min_y}) and ({max_x}, {max_y})"
)


def create_geom_file(image_dataset, geom_filename):
"""
Create and save a .geom file from a rasterio dataset
Parameters:
image_dataset (rio.DatasetReader): Opened image dataset.
geom_filename (str): Path to save the .geom file.
"""
if not image_dataset.rpcs:
raise ValueError("Image dataset has no RPCs")
rpcs_as_dict = image_dataset.rpcs.to_dict()
with open(geom_filename, "w", encoding="utf-8") as writer:
for key in rpcs_as_dict:
if isinstance(rpcs_as_dict[key], list):
Expand Down Expand Up @@ -98,33 +221,13 @@ def main():
os.makedirs(args.out)

# check first input in list to determine pipeline
for idx, image in enumerate(args.il):
ext = os.path.join(args.out, "ext_%03d.tif" % idx)
geom = os.path.splitext(ext)[0] + ".geom"

with rio.open(image) as src:
# read window from bbx
rows, cols = get_slices_from_bbx(src, args.bbx)
window = rio.windows.Window.from_slices(rows, cols)
array = src.read(1, window=window)

# update profile for extract
profile = src.profile
profile["driver"] = "GTiff"
profile["width"] = window.width
profile["height"] = window.height
if "crs" in profile:
del profile["crs"]

translate = Affine.translation(window.col_off, window.row_off)
profile["transform"] *= translate

# write extract
with rio.open(ext, "w", **profile) as dst:
dst.write(array, 1)

# write associated geom
create_geom_file(src, geom)
for idx, image_path in enumerate(args.il):
output_image_path = os.path.join(args.out, "ext_%03d.tif" % idx)
geom_file_path = os.path.splitext(output_image_path)[0] + ".geom"

process_image_file(
args.bbx, image_path, output_image_path, geom_file_path
)


if __name__ == "__main__":
Expand Down

0 comments on commit fa384c7

Please sign in to comment.