diff --git a/cars/extractroi.py b/cars/extractroi.py index b4891749..62742e01 100644 --- a/cars/extractroi.py +++ b/cars/extractroi.py @@ -28,26 +28,149 @@ import numpy as np import rasterio as rio from affine import Affine +from shapely.geometry import box -def get_slices_from_bbx(src, bbx): - """get slices from bounding box""" - coordinates = [] - transformer = rio.transform.RPCTransformer(src.rpcs) - for x_bbx in [bbx[0], bbx[2]]: - for y_bbx in [bbx[1], bbx[3]]: - coordinates.append(transformer.rowcol(x_bbx, y_bbx)) +def is_bbx_in_image(bbx, image_dataset): + """ + Checks if the bounding box is within the image. + + Parameters: + bbx (array): The bounding box of the image. + image_dataset (rio.DatasetReader): Opened image dataset. + + """ + image_box = box(0, 0, image_dataset.width, image_dataset.height) + + return image_box.contains(bbx) + + +def get_slices_from_bbx(image_dataset, bbx): + """get slices from bounding box + + Parameters: + image_dataset (rio.DatasetReader): Opened image dataset. + bbx (array): The bounding box of the image. + + Returns: + tuple: The slices from the bounding box. + """ + transformer = rio.transform.RPCTransformer(image_dataset.rpcs) + coordinates = [ + transformer.rowcol(bbx[0], bbx[1]), + transformer.rowcol(bbx[2], bbx[1]), + transformer.rowcol(bbx[2], bbx[3]), + transformer.rowcol(bbx[0], bbx[3]), + ] coordinates = np.array(coordinates) (row_start, col_start) = np.amin(coordinates, axis=0) (row_stop, col_stop) = np.amax(coordinates, axis=0) rows = (row_start, row_stop) cols = (col_start, col_stop) + return rows, cols -def create_geom_file(src, geom_filename): - """create .geom file from rasterio dataset""" - rpcs_as_dict = src.rpcs.to_dict() +def process_image_file( + bbx, input_image_path, output_image_path, geom_file_path +): + """ + Processes an image file by extracting a region based on the given geometry. + + Parameters: + region_geometry (dict): GeoJSON-like dictionary defining the region. + input_image_path (str): Path to the input image file. + output_image_path (str): Path to save the output image. + geom_file_path (str): Path to save the .geom file. + """ + + with rio.open(input_image_path) as image_dataset: + if not image_dataset.rpcs: + raise ValueError("Image dataset has no RPCs") + validate_bounding_box(bbx, image_dataset) + row, col = get_slices_from_bbx(image_dataset, bbx) + window = rio.windows.Window.from_slices(row, col) + array = image_dataset.read(1, window=window) + profile = image_dataset.profile + profile["driver"] = "GTiff" + profile["width"] = window.width + profile["height"] = window.height + profile["transform"] = Affine.translation( + window.col_off, window.row_off + ) + if "crs" in profile: + del profile["crs"] + with rio.open(output_image_path, "w", **profile) as dst: + # write data + dst.write(array, 1) + # copy rpc + dst.rpcs = image_dataset.rpcs + + create_geom_file(image_dataset, geom_file_path) + + +def get_human_readable_bbox(image_dataset): + """ + Get the human-readable bounding box from an image dataset. + + Parameters: + image_dataset (rio.DatasetReader): Opened image dataset. + + Returns: + tuple: The human-readable bounding box in the format + (min_x, max_x, min_y, max_y). + """ + + transformer = rio.transform.RPCTransformer(image_dataset.rpcs) + + human_readable_bbx = [ + transformer.xy(0, 0), + transformer.xy(image_dataset.width, image_dataset.height), + ] + # fix coordinates to precision -7 for (x, y) + image_coords = [ + (round(coord[0], 7), round(coord[1], 7)) for coord in human_readable_bbx + ] + [(x_1, y_1), (x_2, y_2)] = image_coords + + min_x, max_x = min(x_1, x_2), max(x_1, x_2) + min_y, max_y = min(y_1, y_2), max(y_1, y_2) + + return min_x, max_x, min_y, max_y + + +def validate_bounding_box(bbx, image_dataset): + """ + Validate the bounding box coordinates. + + Parameters: + bbx (array): The bounding box of the image. + image_dataset (rio.DatasetReader): Opened image dataset. + """ + + transformer = rio.transform.RPCTransformer(image_dataset.rpcs) + input_box = box( + *transformer.rowcol(bbx[0], bbx[1]), *transformer.rowcol(bbx[2], bbx[3]) + ) + if not is_bbx_in_image(input_box, image_dataset): + min_x, max_x, min_y, max_y = get_human_readable_bbox(image_dataset) + raise ValueError( + f"Coordinates must between " + f"({min_x}, {min_y}) and ({max_x}, {max_y})" + ) + + +def create_geom_file(image_dataset, geom_filename): + """ + Create and save a .geom file from a rasterio dataset + + Parameters: + image_dataset (rio.DatasetReader): Opened image dataset. + geom_filename (str): Path to save the .geom file. + """ + if not image_dataset.rpcs: + raise ValueError("Image dataset has no RPCs") + rpcs_as_dict = image_dataset.rpcs.to_dict() with open(geom_filename, "w", encoding="utf-8") as writer: for key in rpcs_as_dict: if isinstance(rpcs_as_dict[key], list): @@ -98,33 +221,13 @@ def main(): os.makedirs(args.out) # check first input in list to determine pipeline - for idx, image in enumerate(args.il): - ext = os.path.join(args.out, "ext_%03d.tif" % idx) - geom = os.path.splitext(ext)[0] + ".geom" - - with rio.open(image) as src: - # read window from bbx - rows, cols = get_slices_from_bbx(src, args.bbx) - window = rio.windows.Window.from_slices(rows, cols) - array = src.read(1, window=window) - - # update profile for extract - profile = src.profile - profile["driver"] = "GTiff" - profile["width"] = window.width - profile["height"] = window.height - if "crs" in profile: - del profile["crs"] - - translate = Affine.translation(window.col_off, window.row_off) - profile["transform"] *= translate - - # write extract - with rio.open(ext, "w", **profile) as dst: - dst.write(array, 1) - - # write associated geom - create_geom_file(src, geom) + for idx, image_path in enumerate(args.il): + output_image_path = os.path.join(args.out, "ext_%03d.tif" % idx) + geom_file_path = os.path.splitext(output_image_path)[0] + ".geom" + + process_image_file( + args.bbx, image_path, output_image_path, geom_file_path + ) if __name__ == "__main__":