|
1 |
| -from typing import Callable, List, Union |
| 1 | +import os |
| 2 | +from typing import Callable |
2 | 3 |
|
3 |
| -import geoutils as gu |
4 |
| -import numpy as np |
5 | 4 | import pytest
|
6 |
| -import richdem as rd |
7 |
| -from geoutils.raster import RasterType |
8 | 5 |
|
9 |
| -from xdem._typing import NDArrayf |
| 6 | +from xdem.examples import download_and_extract_tarball |
10 | 7 |
|
11 |
| - |
12 |
| -@pytest.fixture(scope="session") # type: ignore |
13 |
| -def raster_to_rda() -> Callable[[RasterType], rd.rdarray]: |
14 |
| - def _raster_to_rda(rst: RasterType) -> rd.rdarray: |
15 |
| - """ |
16 |
| - Convert geoutils.Raster to richDEM rdarray. |
17 |
| - """ |
18 |
| - arr = rst.data.filled(rst.nodata).squeeze() |
19 |
| - rda = rd.rdarray(arr, no_data=rst.nodata) |
20 |
| - rda.geotransform = rst.transform.to_gdal() |
21 |
| - return rda |
22 |
| - |
23 |
| - return _raster_to_rda |
24 |
| - |
25 |
| - |
26 |
| -@pytest.fixture(scope="session") # type: ignore |
27 |
| -def get_terrainattr_richdem(raster_to_rda: Callable[[RasterType], rd.rdarray]) -> Callable[[RasterType, str], NDArrayf]: |
28 |
| - def _get_terrainattr_richdem(rst: RasterType, attribute: str = "slope_radians") -> NDArrayf: |
29 |
| - """ |
30 |
| - Derive terrain attribute for DEM opened with geoutils.Raster using RichDEM. |
31 |
| - """ |
32 |
| - rda = raster_to_rda(rst) |
33 |
| - terrattr = rd.TerrainAttribute(rda, attrib=attribute) |
34 |
| - terrattr[terrattr == terrattr.no_data] = np.nan |
35 |
| - return np.array(terrattr) |
36 |
| - |
37 |
| - return _get_terrainattr_richdem |
| 8 | +_TESTDATA_DIRECTORY = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "tests", "test_data")) |
38 | 9 |
|
39 | 10 |
|
40 | 11 | @pytest.fixture(scope="session") # type: ignore
|
41 |
| -def get_terrain_attribute_richdem( |
42 |
| - get_terrainattr_richdem: Callable[[RasterType, str], NDArrayf] |
43 |
| -) -> Callable[[RasterType, Union[str, list[str]], bool, float, float, float], Union[RasterType, list[RasterType]]]: |
44 |
| - def _get_terrain_attribute_richdem( |
45 |
| - dem: RasterType, |
46 |
| - attribute: Union[str, List[str]], |
47 |
| - degrees: bool = True, |
48 |
| - hillshade_altitude: float = 45.0, |
49 |
| - hillshade_azimuth: float = 315.0, |
50 |
| - hillshade_z_factor: float = 1.0, |
51 |
| - ) -> Union[RasterType, List[RasterType]]: |
52 |
| - """ |
53 |
| - Derive one or multiple terrain attributes from a DEM using RichDEM. |
54 |
| - """ |
55 |
| - if isinstance(attribute, str): |
56 |
| - attribute = [attribute] |
57 |
| - |
58 |
| - if not isinstance(dem, gu.Raster): |
59 |
| - raise ValueError("DEM must be a geoutils.Raster object.") |
60 |
| - |
61 |
| - terrain_attributes = {} |
62 |
| - |
63 |
| - # Check which products should be made to optimize the processing |
64 |
| - make_aspect = any(attr in attribute for attr in ["aspect", "hillshade"]) |
65 |
| - make_slope = any( |
66 |
| - attr in attribute |
67 |
| - for attr in [ |
68 |
| - "slope", |
69 |
| - "hillshade", |
70 |
| - "planform_curvature", |
71 |
| - "aspect", |
72 |
| - "profile_curvature", |
73 |
| - "maximum_curvature", |
74 |
| - ] |
75 |
| - ) |
76 |
| - make_hillshade = "hillshade" in attribute |
77 |
| - make_curvature = "curvature" in attribute |
78 |
| - make_planform_curvature = "planform_curvature" in attribute or "maximum_curvature" in attribute |
79 |
| - make_profile_curvature = "profile_curvature" in attribute or "maximum_curvature" in attribute |
80 |
| - |
81 |
| - if make_slope: |
82 |
| - terrain_attributes["slope"] = get_terrainattr_richdem(dem, "slope_radians") |
83 |
| - |
84 |
| - if make_aspect: |
85 |
| - # The aspect of RichDEM is returned in degrees, we convert to radians to match the others |
86 |
| - terrain_attributes["aspect"] = np.deg2rad(get_terrainattr_richdem(dem, "aspect")) |
87 |
| - # For flat slopes, RichDEM returns a 90° aspect by default, while GDAL return a 180° aspect |
88 |
| - # We stay consistent with GDAL |
89 |
| - slope_tmp = get_terrainattr_richdem(dem, "slope_radians") |
90 |
| - terrain_attributes["aspect"][slope_tmp == 0] = np.pi |
91 |
| - |
92 |
| - if make_hillshade: |
93 |
| - # If a different z-factor was given, slopemap with exaggerated gradients. |
94 |
| - if hillshade_z_factor != 1.0: |
95 |
| - slopemap = np.arctan(np.tan(terrain_attributes["slope"]) * hillshade_z_factor) |
96 |
| - else: |
97 |
| - slopemap = terrain_attributes["slope"] |
98 |
| - |
99 |
| - azimuth_rad = np.deg2rad(360 - hillshade_azimuth) |
100 |
| - altitude_rad = np.deg2rad(hillshade_altitude) |
101 |
| - |
102 |
| - # The operation below yielded the closest hillshade to GDAL (multiplying by 255 did not work) |
103 |
| - # As 0 is generally no data for this uint8, we add 1 and then 0.5 for the rounding to occur between |
104 |
| - # 1 and 255 |
105 |
| - terrain_attributes["hillshade"] = np.clip( |
106 |
| - 1.5 |
107 |
| - + 254 |
108 |
| - * ( |
109 |
| - np.sin(altitude_rad) * np.cos(slopemap) |
110 |
| - + np.cos(altitude_rad) * np.sin(slopemap) * np.sin(azimuth_rad - terrain_attributes["aspect"]) |
111 |
| - ), |
112 |
| - 0, |
113 |
| - 255, |
114 |
| - ).astype("float32") |
115 |
| - |
116 |
| - if make_curvature: |
117 |
| - terrain_attributes["curvature"] = get_terrainattr_richdem(dem, "curvature") |
118 |
| - |
119 |
| - if make_planform_curvature: |
120 |
| - terrain_attributes["planform_curvature"] = get_terrainattr_richdem(dem, "planform_curvature") |
121 |
| - |
122 |
| - if make_profile_curvature: |
123 |
| - terrain_attributes["profile_curvature"] = get_terrainattr_richdem(dem, "profile_curvature") |
124 |
| - |
125 |
| - # Convert the unit if wanted. |
126 |
| - if degrees: |
127 |
| - for attr in ["slope", "aspect"]: |
128 |
| - if attr not in terrain_attributes: |
129 |
| - continue |
130 |
| - terrain_attributes[attr] = np.rad2deg(terrain_attributes[attr]) |
131 |
| - |
132 |
| - output_attributes = [terrain_attributes[key].reshape(dem.shape) for key in attribute] |
| 12 | +def get_test_data_path() -> Callable[[str], str]: |
| 13 | + def _get_test_data_path(filename: str, overwrite: bool = False) -> str: |
| 14 | + """Get file from test_data""" |
| 15 | + download_and_extract_tarball(dir="test_data", target_dir=_TESTDATA_DIRECTORY, overwrite=overwrite) |
| 16 | + file_path = os.path.join(_TESTDATA_DIRECTORY, filename) |
133 | 17 |
|
134 |
| - if isinstance(dem, gu.Raster): |
135 |
| - output_attributes = [ |
136 |
| - gu.Raster.from_array(attr, transform=dem.transform, crs=dem.crs, nodata=-99999) |
137 |
| - for attr in output_attributes |
138 |
| - ] |
| 18 | + if not os.path.exists(file_path): |
| 19 | + if overwrite: |
| 20 | + raise FileNotFoundError(f"The file {filename} was not found in the test_data directory.") |
| 21 | + file_path = _get_test_data_path(filename, overwrite=True) |
139 | 22 |
|
140 |
| - return output_attributes if len(output_attributes) > 1 else output_attributes[0] |
| 23 | + return file_path |
141 | 24 |
|
142 |
| - return _get_terrain_attribute_richdem |
| 25 | + return _get_test_data_path |
0 commit comments