diff --git a/.gitignore b/.gitignore
index b1d6ef1..95364d1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -167,4 +167,4 @@ sigmet2zarr.egg-info
 #.idea/
 
 #Notebooks
-sigmet2zarr/task2vcp.py
+raw2zarr/task2vcp.py
diff --git a/README.md b/README.md
index 9d67da9..dbe4464 100755
--- a/README.md
+++ b/README.md
@@ -39,6 +39,11 @@ with the new open data paradigm, emphasizing the FAIR principles (Findable, Acce
 
+> [!WARNING]
+> **This project is under active development.**
+> Features may change frequently, and some parts of the library may be incomplete or subject to change. Please proceed with caution.
+
+
 ### Running on Your Own Machine
 If you are interested in running this material locally on your computer, you will need to follow this workflow:
diff --git a/environment.yml b/environment.yml
index 56e2aed..5cfbe92 100755
--- a/environment.yml
+++ b/environment.yml
@@ -20,9 +20,8 @@ dependencies:
   - wradlib
   - hvplot
   - datashader
+  - xarray>=2024.10
+  - xradar>=0.8.0
   - pip
-  - pydata-sphinx-theme
   - pip:
-    - git+https://github.com/aladinor/raw2zarr
-    - git+https://github.com/openradar/xradar.git
-    - git+https://github.com/pydata/xarray.git
+    - -e .
diff --git a/raw2zarr/__init__.py b/raw2zarr/__init__.py
new file mode 100644
index 0000000..f2bba00
--- /dev/null
+++ b/raw2zarr/__init__.py
@@ -0,0 +1,25 @@
+"""
+raw2zarr
+========
+
+Top-level package for raw2zarr.
+
+"""
+
+__author__ = """Alfonso Ladino"""
+__email__ = "alfonso8@illinois.edu"
+
+from .dtree_builder import datatree_builder, append_sequential, append_parallel
+from .data_reader import accessor_wrapper
+from .utils import ensure_dimension, fix_angle, batch, dtree_encoding
+
+__all__ = [
+    "datatree_builder",
+    "append_sequential",
+    "append_parallel",
+    "accessor_wrapper",
+    "ensure_dimension",
+    "fix_angle",
+    "batch",
+    "dtree_encoding",
+]
diff --git a/sigmet2zarr/create_dataset.py b/raw2zarr/create_dataset.py
similarity index 100%
rename from sigmet2zarr/create_dataset.py
rename to raw2zarr/create_dataset.py
diff --git a/raw2zarr/data_reader.py b/raw2zarr/data_reader.py
new file mode 100644
index 0000000..f7bfe27
--- /dev/null
+++ b/raw2zarr/data_reader.py
@@ -0,0 +1,96 @@
+from typing import Iterable, Iterator, List
+import os
+
+import xradar
+import fsspec
+import dask.bag as db
+from xarray import DataTree
+from xarray.backends.common import _normalize_path
+from s3fs import S3File
+
+# Relative imports
+from .utils import prepare_for_read, batch, fix_angle
+
+
+def accessor_wrapper(
+    filename_or_obj: str | os.PathLike,
+    engine: str = "iris",
+) -> DataTree:
+    """Load a single radar file into a DataTree, with fsspec handling and a compression check."""
+    try:
+        file = prepare_for_read(filename_or_obj)
+        return _load_file(file, engine)
+    except Exception as e:
+        print(f"Error loading {filename_or_obj}: {e}")
+        return None
+
+
+def _load_file(file, engine) -> DataTree:
+    """Helper function to load a single file with the specified backend."""
+    if engine == "iris":
+        if isinstance(file, S3File):
+            return xradar.io.open_iris_datatree(file.read())
+        elif isinstance(file, bytes):
+            return xradar.io.open_iris_datatree(file)
+        else:
+            return xradar.io.open_iris_datatree(file)
+    elif engine == "odim":
+        return xradar.io.open_odim_datatree(file)
+    elif engine == "nexradlevel2":
+        if isinstance(file, S3File):
+            local_file = fsspec.open_local(
+                f"simplecache::s3://{file.path}",
+                s3={"anon": True},
+                filecache={"cache_storage": "."},
+            )
+            data_tree = xradar.io.open_nexradlevel2_datatree(local_file)
+            # Remove the local cache file after loading the data
+            os.remove(local_file)
+            return data_tree
+        else:
+            return xradar.io.open_nexradlevel2_datatree(file)
+    else:
+        raise ValueError(f"Unsupported backend: {engine}")
+
+
+def _process_file(args):
+    file, engine = args
+    return accessor_wrapper(file, engine=engine)
+
+
+def load_radar_data(
+    filename_or_obj: str | os.PathLike | Iterable[str | os.PathLike],
+    backend: str = "iris",
+    parallel: bool = False,
+    batch_size: int = 12,
+) -> Iterator[List[DataTree]]:
+    """
+    Load radar data from files in batches to avoid memory overload.
+
+    Parameters:
+        filename_or_obj (str | os.PathLike | Iterable[str | os.PathLike]): Path(s) to radar data files.
+        backend (str): Backend type to use. Options include 'iris', 'odim', etc. Default is 'iris'.
+        parallel (bool): If True, enables parallel processing with Dask. Default is False.
+        batch_size (int): Number of files to process in each batch. Default is 12.
+
+    Yields:
+        List[DataTree]: One list of DataTree objects per batch of files.
+    """
+    filename_or_obj = _normalize_path(filename_or_obj)
+
+    for files_batch in batch(filename_or_obj, batch_size):
+        ls_dtree = []
+
+        if parallel:
+            bag = db.from_sequence(files_batch, npartitions=len(files_batch)).map(
+                accessor_wrapper, engine=backend
+            )
+            ls_dtree.extend(bag.compute())
+        else:
+            for file_path in files_batch:
+                result = accessor_wrapper(file_path, engine=backend)
+                if result is not None:
+                    ls_dtree.append(result)
+
+        yield ls_dtree  # Yield each batch of DataTree objects
diff --git a/raw2zarr/dtree_builder.py b/raw2zarr/dtree_builder.py
new file mode 100644
index 0000000..67ece73
--- /dev/null
+++ b/raw2zarr/dtree_builder.py
@@ -0,0 +1,225 @@
+from typing import Iterable, List, Union
+import os
+
+from xarray import DataTree
+from xarray.backends.common import _normalize_path
+
+# Relative imports
+from .data_reader import accessor_wrapper
+from .utils import ensure_dimension, fix_angle, dtree_encoding, batch
+
+
+def datatree_builder(
+    filename_or_obj: Union[str, os.PathLike],
+    engine: str = "iris",
+    dim: str = "vcp_time",
+) -> DataTree:
+    """
+    Construct a hierarchical xarray.DataTree from a radar data file.
+
+    This function loads radar data and organizes it into a nested
+    `xarray.DataTree` structure rooted at the scan/task name, supporting
+    different backend engines for reading the data.
+
+    Parameters:
+        filename_or_obj (str | os.PathLike):
+            Path to the radar data file to be loaded.
+        engine (str, optional):
+            The backend engine to use for loading the radar data. Common options include
+            'iris' (default) and 'odim'. The selected engine must be supported by the underlying
+            data processing libraries.
+        dim (str, optional):
+            The name of the dimension to use for concatenating data across files. Default is 'vcp_time'.
+            Note: The 'time' dimension cannot be used as the concatenation dimension because it is
+            already a predefined dimension in the dataset and reserved for temporal data.
+            Choose a unique dimension name that does not conflict with existing
+            dimensions in the datasets.
+
+    Returns:
+        xarray.DataTree:
+            A nested `xarray.DataTree` object that combines the loaded radar data into a
+            hierarchical structure. Each node in the tree corresponds to an `xarray.Dataset`.
+
+    Notes:
+        - The resulting `xarray.DataTree` retains a hierarchical organization based on the
+          structure of the input file and its metadata.
+
+    Example:
+        >>> from raw2zarr import datatree_builder
+        >>> tree = datatree_builder("file1.RAW", engine="iris", dim="vcp_time")
+        >>> print(tree)
+        >>> print(tree["root/child"].to_dataset())  # Access a node's dataset
+    """
+    filename_or_obj = _normalize_path(filename_or_obj)
+    dtree = accessor_wrapper(filename_or_obj, engine=engine)
+    task_name = dtree.attrs.get("scan_name", "default_task").strip()
+    dtree = (dtree.pipe(fix_angle).pipe(ensure_dimension, dim)).xradar.georeference()
+    dtree = DataTree.from_dict({task_name: dtree})
+    dtree.encoding = dtree_encoding(dtree, append_dim=dim)
+    return dtree
+
+
+def process_file(file: str, engine: str = "nexradlevel2") -> DataTree:
+    """
+    Load and transform a single radar file into a DataTree object.
+    """
+    try:
+        dtree = datatree_builder(file, engine=engine)
+        return dtree
+    except Exception as e:
+        print(f"Error processing file {file}: {e}")
+        return None
+
+
+def append_sequential(
+    radar_files: Iterable[str | os.PathLike],
+    append_dim: str,
+    zarr_store: str,
+    **kwargs,
+) -> None:
+    """
+    Sequentially load radar files and append their data to a Zarr store.
+
+    This function processes radar files one at a time, loading each file into an
+    `xarray.DataTree` object and appending its data to a Zarr store, so data is
+    written in an ordered manner.
+
+    Parameters:
+        radar_files (Iterable[str | os.PathLike]): List of radar file paths to process.
+        append_dim (str): The dimension along which to append data in the Zarr store.
+        zarr_store (str): Path to the output Zarr store.
+        **kwargs: Additional arguments, including:
+            - zarr_format (int, optional): The Zarr format version (default: 2).
+            - engine (str, optional): Backend engine used to load the files
+              (default: "nexradlevel2").
+
+    Returns:
+        None: Outputs data directly to the specified Zarr store.
+
+    Notes:
+        - Ensures ordered and sequential writing of data to the Zarr store.
+        - Handles encoding for compatibility with the Zarr format.
+    """
+    engine = kwargs.get("engine", "nexradlevel2")
+    zarr_format = kwargs.get("zarr_format", 2)
+    for file in radar_files:
+        dtree = process_file(file, engine=engine)
+        if dtree:
+            enc = dtree.encoding
+            dtree = dtree[dtree.groups[1]]
+            try:
+                dtree.to_zarr(
+                    store=zarr_store,
+                    mode="a-",
+                    encoding=enc,
+                    consolidated=True,
+                    zarr_format=zarr_format,
+                )
+            except ValueError:
+                dtree.to_zarr(
+                    store=zarr_store,
+                    mode="a-",
+                    consolidated=True,
+                    append_dim=append_dim,
+                    zarr_format=zarr_format,
+                )
+            print(f"done: {file}")
+
+
+def append_parallel(
+    radar_files: Iterable[str | os.PathLike],
+    append_dim: str,
+    zarr_store: str,
+    engine: str = "nexradlevel2",
+    batch_size: int = None,
+    **kwargs,
+) -> None:
+    """
+    Load radar files in parallel and append their data sequentially to a Zarr store.
+
+    This function uses Dask Bag to load radar files in parallel, processing them in
+    configurable batches. After loading, the resulting `xarray.DataTree` objects are
+    processed and written sequentially to the Zarr store, ensuring consistent and ordered
+    data storage. A Dask LocalCluster is used to distribute computation across available cores.
+
+    Parameters:
+        radar_files (Iterable[str | os.PathLike]):
+            An iterable containing paths to the radar files to process.
+        append_dim (str):
+            The dimension along which to append data in the Zarr store.
+        zarr_store (str):
+            The path to the output Zarr store where data will be written.
+        engine (str, optional):
+            The backend engine used to load radar files. Defaults to "nexradlevel2".
+        batch_size (int, optional):
+            The number of files to process in each batch. If not specified, it defaults to
+            the total number of cores available in the Dask cluster.
+        **kwargs:
+            Additional arguments, including:
+            - zarr_format (int, optional): The Zarr format version to use (default: 2).
+
+    Returns:
+        None:
+            This function writes data directly to the specified Zarr store and does not return a value.
+
+    Notes:
+        - File loading is parallelized using Dask Bag for efficiency, but data writing
+          to the Zarr store is performed sequentially to ensure consistent and ordered output.
+        - A Dask LocalCluster is created with a web-based dashboard for monitoring at
+          `http://127.0.0.1:8785` by default.
+        - If `batch_size` is not specified, it is automatically set based on the available cores
+          in the Dask cluster.
+
+    Example:
+        >>> radar_files = ["file1.nc", "file2.nc", "file3.nc"]
+        >>> zarr_store = "output.zarr"
+        >>> append_parallel(
+        ...     radar_files=radar_files,
+        ...     append_dim="time",
+        ...     zarr_store=zarr_store,
+        ...     engine="nexradlevel2",
+        ...     batch_size=5,
+        ... )
+    """
+
+    from functools import partial
+    from dask import bag as db
+    from dask.distributed import Client, LocalCluster
+
+    cluster = LocalCluster(dashboard_address="127.0.0.1:8785")
+    client = Client(cluster)
+    pf = partial(process_file, engine=engine)
+
+    if not batch_size:
+        batch_size = sum(client.ncores().values())
+
+    zarr_format = kwargs.get("zarr_format", 2)
+    for files in batch(radar_files, n=batch_size):
+        bag = db.from_sequence(files, npartitions=len(files)).map(pf)
+
+        ls_dtree: List[DataTree] = bag.compute()
+        for dtree in ls_dtree:
+            if dtree:
+                enc = dtree.encoding
+                dtree = dtree[dtree.groups[1]]
+                try:
+                    dtree.to_zarr(
+                        store=zarr_store,
+                        mode="a-",
+                        encoding=enc,
+                        consolidated=True,
+                        zarr_format=zarr_format,
+                    )
+                except ValueError:
+                    dtree.to_zarr(
+                        store=zarr_store,
+                        mode="a-",
+                        consolidated=True,
+                        append_dim=append_dim,
+                        zarr_format=zarr_format,
+                    )
diff --git a/raw2zarr/main.py b/raw2zarr/main.py
new file mode 100644
index 0000000..95196f3
--- /dev/null
+++ b/raw2zarr/main.py
@@ -0,0 +1,62 @@
+from datetime import datetime
+
+from raw2zarr.utils import (
+    create_query,
+    timer_func,
+    data_accessor,
+    dtree_encoding,
+)
+from raw2zarr.task2zarr import prepare2append
+from raw2zarr.dtree_builder import datatree_builder
+import fsspec
+import xradar as xd
+
+
+def accessor_wrapper(filename):
+    return prepare2append(
+        xd.io.open_iris_datatree(data_accessor(filename)),
+        append_dim="vcp_time",
+        radar_name="GUA",
+    )
+
+
+@timer_func
+def radar_convert():
+    radar_name = "Guaviare"
+    year, month, day = 2022, 6, 1  # Guaviare
+
+    date_query = datetime(year=year, month=month, day=day)
+    query = create_query(date=date_query, radar_site=radar_name)
"s3://s3-radaresideam/" + fs = fsspec.filesystem("s3", anon=True) + radar_files = [f"s3://{i}" for i in sorted(fs.glob(f"{str_bucket}{query}*"))][0:12] + + dtree = datatree_builder(radar_files, batch_size=4) + time_enc = time_encoding(dtree, append_dim="vcp_time") + zv = 2 + try: + path = "../zarr/Guaviare_v2.zarr" if zv == 2 else "../zarr/Guaviare_v3.zarr" + _ = dtree.to_zarr( + path, + encoding=time_enc, + consolidated=True, + mode="a-", + zarr_format=zv, + ) + except ValueError: + _ = dtree.to_zarr( + "../zarr/Guaviare_new.zarr", + consolidated=True, + mode="a-", + zarr_format=zv, + ) + print(dtree) + + +def main(): + radar_convert() + + +if __name__ == "__main__": + main() diff --git a/sigmet2zarr/task2zarr.py b/raw2zarr/task2zarr.py old mode 100755 new mode 100644 similarity index 87% rename from sigmet2zarr/task2zarr.py rename to raw2zarr/task2zarr.py index 21e29fe..7956720 --- a/sigmet2zarr/task2zarr.py +++ b/raw2zarr/task2zarr.py @@ -1,216 +1,199 @@ -import zarr -import xradar as xd -import numpy as np -from datatree import DataTree -from xarray.core.dataset import Dataset -from xarray import full_like -from zarr.errors import ContainsGroupError -from sigmet2zarr.utils import ( - data_accessor, - fix_angle, - convert_time, - write_file_radar, - load_toml, - time_encoding, -) - - -def _get_root(dt: DataTree): - groups = [ - i for i in list(dt.groups) if not i.startswith("/sweep") if i not in ["/"] - ] - root = DataTree(data=dt.root.ds, name="root") - for group in groups: - DataTree(data=dt[group].ds, name=group[1:], parent=root) - return root - - -def _fix_sn(ds: Dataset, sw_num: dict[float, int]) -> dict: - sn: float = float(ds["sweep_fixed_angle"].values) - nsn: int = sw_num[sn] - new_sn = full_like(ds.sweep_number, nsn) - new_ds = ds.copy(deep=True) - new_ds["sweep_number"] = new_sn - return new_ds - - -def prepare2append(dt: DataTree, append_dim: str, radar_name: str = "GUA") -> DataTree: - """ - Converts SIGMET radar files into a DataTree structure and prepares it for appending along a specified dimension. - - This function processes a given DataTree of radar data, organizes it by sweep angles, and prepares it for appending - along the specified dimension. It uses configuration files to map radar sweep angles and numbers, and georeferences - the data before appending. - - Parameters - ---------- - dt : DataTree - The DataTree object containing radar data to be processed. - append_dim : str - The dimension along which the data will be appended (e.g., time, elevation). - radar_name : str, optional - The radar name to identify the correct configuration (default is "GUA"). - - Returns - ------- - DataTree - A new DataTree object with all sweeps processed and ready for appending along the specified dimension. - - Notes - ----- - - The function expects a configuration file in TOML format located at "../config/radar.toml", containing - the necessary radar sweep angle and sweep number information. - - Each sweep in the DataTree is georeferenced, and its sweep number is corrected before being organized - into the final DataTree structure. 
- - Examples - -------- - >>> radar_data = prepare2append(my_datatree, append_dim="time", radar_name="GUA") - >>> # radar_data is now prepared for appending along the time dimension - """ - elev: np.array = np.array( - load_toml("../config/radar.toml")[radar_name]["elevations"] - ) - sw_num: np.array = np.array( - load_toml("../config/radar.toml")[radar_name]["sweep_number"] - ) - swps: dict[float, str] = {j: f"sweep_{idx}" for idx, j in enumerate(elev)} - sw_fix: dict[float, int] = {j: sw_num[idx] for idx, j in enumerate(elev)} - - tree = { - node.path: node.to_dataset() - for node in dt.subtree - if not node.path.startswith("/sweep") - } - tree.update( - { - swps[float(node.sweep_fixed_angle.values)]: fix_angle( - _fix_sn(node, sw_num=sw_fix) - ) - .to_dataset() - .xradar.georeference() - for node in dt.subtree - if node.path.startswith("/sweep") - } - ) - tree = exp_dim(tree, append_dim=append_dim) - return DataTree.from_dict(tree) - - -def exp_dim(dt: dict[str, Dataset], append_dim: str = "vcp_time") -> dict: - """ - Functions that expand dimension to each dataset within the datatree - @param dt: xarray.datatree - @param append_dim: dimension name which dataset will be expanded. e.g. 'vcp_time' - @return: xarray Datatree - """ - for sw, ds in dt.items(): - if sw.startswith("sweep"): - _time = convert_time(ds) - ds[append_dim] = _time - ds: Dataset = ds.set_coords(append_dim).expand_dims(dim=append_dim, axis=0) - dt[sw] = ds - return dt - - -def dt2zarr2( - dt: DataTree, - zarr_store: str, - zarr_version: int, - append_dim: str, - consolidated: bool, -) -> None: - """ - Functions to save xradar datatree using zarr format - @param consolidated: Xarray consolidated metadata. Default True - @param append_dim: dimension name where data will be appended. e.g. 'vcp_time' - @param mode: Xarray.to_zarr mode. Options are "w", "w-", "a", "a-", r+", None - @param zarr_version: data can be store in zarr format using version 2 or 3. Default V=2 - @param zarr_store: path to zarr store - @param dt: xradar datatree - @return: None - """ - st: zarr.DirectoryStore = ( - zarr.DirectoryStoreV3(zarr_store) - if zarr_version == 3 - else zarr.DirectoryStore(zarr_store) - ) - - for node in dt.subtree: - ds = node.to_dataset() - group_path = node.path - if group_path.startswith("/sweep"): - encoding = time_encoding(ds, append_dim) - else: - encoding = {} - try: - ds.to_zarr( - store=st, - group=group_path, - mode="w-", - encoding=encoding, - consolidated=consolidated, - ) - except ContainsGroupError: - try: - ds.to_zarr( - store=st, - group=group_path, - mode="a-", - consolidated=consolidated, - append_dim="vcp_time", - ) - except ValueError: - continue - - -def raw2zarr( - file: str, - zarr_store: str, - zarr_version: int = 2, - elevation: list[float] = None, - append_dim: str = "vcp_time", - mode: str = "a", - consolidated: bool = True, - p2c: str = "../results", -) -> None: - """ - Main function to convert sigmet radar files into xradar datatree and save them using zarr format - @param consolidated: Xarray consolidated metadata. Default True - @param p2c: path to write a file where each radar filename will be written once is processed. - @param mode: Xarray.to_zarr mode. Options are "w", "w-", "a", "a-", r+", None - @param append_dim: dimension name where data will be appended. e.g. 'vcp_time' - @param elevation: list of elevation to be converted into zarr. - E.g. [0.5, 1.0, 3]. 
If None all sweeps within the radar object will be considered - @param zarr_version: data can be store in zarr format using version 2 or 3. Default V=2 - @param zarr_store: path to zarr store - @param file: radar file path - @return: None - """ - dt: DataTree = xd.io.open_iris_datatree(data_accessor(file)) - dtree = prepare2append(dt, append_dim=append_dim) - elevations = [ - np.round(np.median(dtree.children[i].elevation.data), 1) - for i in list(dtree.children) - if i not in ["radar_parameters"] - ] - if not elevation: - dt2zarr2( - dt=dtree, - zarr_store=zarr_store, - zarr_version=zarr_version, - mode=mode, - consolidated=consolidated, - append_dim=append_dim, - ) - write_file_radar(file, p2c) - elif elevation in elevations: - dt2zarr2( - dt=dtree, - zarr_store=zarr_store, - zarr_version=zarr_version, - mode=mode, - consolidated=consolidated, - append_dim=append_dim, - ) - write_file_radar(file, p2c) +import zarr +import xradar as xd +import numpy as np +from xarray import full_like, Dataset, DataTree +from zarr.errors import ContainsGroupError +from raw2zarr.utils import ( + data_accessor, + fix_angle, + write_file_radar, + load_toml, + time_encoding, + exp_dim +) + + +def _get_root(dt: DataTree): + groups = [ + i for i in list(dt.groups) if not i.startswith("/sweep") if i not in ["/"] + ] + root = DataTree(data=dt.root.ds, name="root") + for group in groups: + DataTree(data=dt[group].ds, name=group[1:], parent=root) + return root + + +def _fix_sn(ds: Dataset, sw_num: dict[float, int]) -> dict: + sn: float = float(ds["sweep_fixed_angle"].values) + nsn: int = sw_num[sn] + new_sn = full_like(ds.sweep_number, nsn) + new_ds = ds.copy(deep=True) + new_ds["sweep_number"] = new_sn + return new_ds + + +def prepare2append(dt: DataTree, append_dim: str, radar_name: str = "GUA") -> DataTree: + """ + Converts SIGMET radar files into a DataTree structure and prepares it for appending along a specified dimension. + + This function processes a given DataTree of radar data, organizes it by sweep angles, and prepares it for appending + along the specified dimension. It uses configuration files to map radar sweep angles and numbers, and georeferences + the data before appending. + + Parameters + ---------- + dt : DataTree + The DataTree object containing radar data to be processed. + append_dim : str + The dimension along which the data will be appended (e.g., time, elevation). + radar_name : str, optional + The radar name to identify the correct configuration (default is "GUA"). + + Returns + ------- + DataTree + A new DataTree object with all sweeps processed and ready for appending along the specified dimension. + + Notes + ----- + - The function expects a configuration file in TOML format located at "../config/radar.toml", containing + the necessary radar sweep angle and sweep number information. + - Each sweep in the DataTree is georeferenced, and its sweep number is corrected before being organized + into the final DataTree structure. 
+ + Examples + -------- + >>> radar_data = prepare2append(my_datatree, append_dim="time", radar_name="GUA") + >>> # radar_data is now prepared for appending along the time dimension + """ + elev: np.array = np.array( + load_toml("../config/radar.toml")[radar_name]["elevations"] + ) + sw_num: np.array = np.array( + load_toml("../config/radar.toml")[radar_name]["sweep_number"] + ) + swps: dict[float, str] = {j: f"sweep_{idx}" for idx, j in enumerate(elev)} + sw_fix: dict[float, int] = {j: sw_num[idx] for idx, j in enumerate(elev)} + + tree = { + node.path: node.to_dataset() + for node in dt.subtree + if not node.path.startswith("/sweep") + } + tree.update( + { + swps[float(node.sweep_fixed_angle.values)]: fix_angle( + _fix_sn(node, sw_num=sw_fix) + ) + .to_dataset() + .xradar.georeference() + for node in dt.subtree + if node.path.startswith("/sweep") + } + ) + tree = exp_dim(tree, append_dim=append_dim) + return DataTree.from_dict(tree) + + + +def dt2zarr2( + dt: DataTree, + zarr_store: str, + zarr_version: int, + append_dim: str, + consolidated: bool, +) -> None: + """ + Functions to save xradar datatree using zarr format + @param consolidated: Xarray consolidated metadata. Default True + @param append_dim: dimension name where data will be appended. e.g. 'vcp_time' + @param mode: Xarray.to_zarr mode. Options are "w", "w-", "a", "a-", r+", None + @param zarr_version: data can be store in zarr format using version 2 or 3. Default V=2 + @param zarr_store: path to zarr store + @param dt: xradar datatree + @return: None + """ + st: zarr.DirectoryStore = ( + zarr.DirectoryStoreV3(zarr_store) + if zarr_version == 3 + else zarr.DirectoryStore(zarr_store) + ) + + for node in dt.subtree: + ds = node.to_dataset() + group_path = node.path + if group_path.startswith("/sweep"): + encoding = time_encoding(ds, append_dim) + else: + encoding = {} + try: + ds.to_zarr( + store=st, + group=group_path, + mode="w-", + encoding=encoding, + consolidated=consolidated, + ) + except ContainsGroupError: + try: + ds.to_zarr( + store=st, + group=group_path, + mode="a-", + consolidated=consolidated, + append_dim="vcp_time", + ) + except ValueError: + continue + + +def raw2zarr( + file: str, + zarr_store: str, + zarr_version: int = 2, + elevation: list[float] = None, + append_dim: str = "vcp_time", + mode: str = "a", + consolidated: bool = True, + p2c: str = "../results", +) -> None: + """ + Main function to convert sigmet radar files into xradar datatree and save them using zarr format + @param consolidated: Xarray consolidated metadata. Default True + @param p2c: path to write a file where each radar filename will be written once is processed. + @param mode: Xarray.to_zarr mode. Options are "w", "w-", "a", "a-", r+", None + @param append_dim: dimension name where data will be appended. e.g. 'vcp_time' + @param elevation: list of elevation to be converted into zarr. + E.g. [0.5, 1.0, 3]. If None all sweeps within the radar object will be considered + @param zarr_version: data can be store in zarr format using version 2 or 3. 
Default V=2 + @param zarr_store: path to zarr store + @param file: radar file path + @return: None + """ + dt: DataTree = xd.io.open_iris_datatree(data_accessor(file)) + dtree = prepare2append(dt, append_dim=append_dim) + elevations = [ + np.round(np.median(dtree.children[i].elevation.data), 1) + for i in list(dtree.children) + if i not in ["radar_parameters"] + ] + if not elevation: + dt2zarr2( + dt=dtree, + zarr_store=zarr_store, + zarr_version=zarr_version, + mode=mode, + consolidated=consolidated, + append_dim=append_dim, + ) + write_file_radar(file, p2c) + elif elevation in elevations: + dt2zarr2( + dt=dtree, + zarr_store=zarr_store, + zarr_version=zarr_version, + mode=mode, + consolidated=consolidated, + append_dim=append_dim, + ) + write_file_radar(file, p2c) diff --git a/raw2zarr/utils.py b/raw2zarr/utils.py new file mode 100644 index 0000000..0b985d4 --- /dev/null +++ b/raw2zarr/utils.py @@ -0,0 +1,467 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import os + +import xarray as xr +import xradar as xd +import fsspec +import numpy as np +from datetime import datetime, timezone +import pandas as pd +import tomllib +from time import time +from collections.abc import Iterator +from typing import Any, List +from xarray import DataTree +import gzip +import bz2 + + +def batch(iterable: List[Any], n: int = 1) -> Iterator[List[Any]]: + """ + Splits a list into consecutive chunks of size `n`. + + This function takes a list and yields successive batches of size `n` from it. + If the length of the list is not evenly divisible by `n`, the last batch will + contain the remaining elements. + + Parameters + ---------- + iterable : list[Any] + The list to be split into batches. + n : int, optional + The number of items in each batch (default is 1). + + Yields + ------ + Iterator[list[Any]] + An iterator that yields slices of the original list of size `n`, except + for the last batch which may contain fewer elements if the total number + of elements in the list is not evenly divisible by `n`. + + Examples + -------- + >>> list(batch([1, 2, 3, 4, 5], n=2)) + [[1, 2], [3, 4], [5]] + + >>> list(batch(['a', 'b', 'c', 'd'], n=3)) + [['a', 'b', 'c'], ['d']] + """ + l = len(iterable) + for ndx in range(0, l, n): + yield iterable[ndx : min(ndx + n, l)] + + +def timer_func(func): + # This function shows the execution time of + # the function object passed + def wrap_func(*args, **kwargs): + t1 = time() + result = func(*args, **kwargs) + t2 = time() + print(f"Function {func.__name__!r} executed in {(t2-t1):.4f}s") + return result + + return wrap_func + + +def make_dir(path) -> None: + """ + Makes directory based on path. 
:param path: directory path that will be created
+    :return:
+    """
+    try:
+        os.makedirs(path)
+    except FileExistsError:
+        pass
+
+
+def load_toml(filepath: str) -> dict:
+    """
+    Load TOML data from a file
+    @param filepath: path to TOML file
+    @return: dict
+    """
+    with open(filepath, "rb") as f:
+        toml_data: dict = tomllib.load(f)
+        return toml_data
+
+
+def time_3d(time_array, numbers) -> np.ndarray:
+    """
+    Function that creates a 3d time array from timestamps
+    :param time_array: 2d timestamp array
+    :param numbers: number of times in the new axis
+    :return: 3d time array
+    """
+    v_func = np.vectorize(lambda x: datetime.fromtimestamp(x, tz=timezone.utc))
+    _time = v_func(time_array)
+    times = np.repeat(_time[np.newaxis, :], numbers, axis=0)
+    return times
+
+
+def get_time(time_array) -> np.ndarray:
+    """
+    Function that creates a time array from timestamps
+    :param time_array: 2d timestamp array
+    :return: time array of the same shape
+    """
+    v_func = np.vectorize(lambda x: datetime.fromtimestamp(x, tz=timezone.utc))
+    _time = v_func(time_array)
+    return _time
+
+
+def create_query(date, radar_site) -> str:
+    """
+    Creates a string for querying the IDEAM radar files stored in an AWS bucket
+    :param date: date to be queried, e.g. datetime(2021, 10, 3, 12). Python datetime object
+    :param radar_site: radar site, e.g. Guaviare
+    :return: string with the IDEAM radar bucket format
+    """
+    if date.hour != 0:
+        return f"l2_data/{date:%Y}/{date:%m}/{date:%d}/{radar_site}/{radar_site[:3].upper()}{date:%y%m%d%H}"
+    return f"l2_data/{date:%Y}/{date:%m}/{date:%d}/{radar_site}/{radar_site[:3].upper()}{date:%y%m%d}"
+
+
+def data_accessor(file: str):
+    """
+    Open an AWS S3 file remotely using fsspec
+    """
+    with fsspec.open(file, mode="rb", anon=True) as f:
+        return f.read()
+
+
+def convert_time(ds) -> pd.Timestamp:
+    """
+    Function that creates a timestamp for appending sweep data along a given dimension
+    @param ds: Xarray dataset
+    @return: pandas Timestamp (first non-null time value)
+    """
+    for i in ds.time.values:
+        time = pd.to_datetime(i)
+        if pd.isnull(time):
+            continue
+        return time
+
+
+def check_if_exist(file: str, path: str = "../results") -> bool:
+    """
+    Function that checks if a sigmet file was already processed, based on a txt file written during the conversion
+    @param file: file name
+    @param path: path where the txt file with the list of processed sigmet files was written
+    @return: True if the file was already processed, False otherwise
+    """
+    file_name = f"{path}/{file.split('/')[-2]}_files.txt"
+    try:
+        with open(file_name, "r", newline="\n") as txt_file:
+            lines = txt_file.readlines()
+        _file = [i for i in lines if i.replace("\n", "") == file]
+        if len(_file) > 0:
+            print("File already processed")
+            return True
+        else:
+            return False
+    except FileNotFoundError:
+        return False
+
+
+def write_file_radar(file: str, path: str = "../results") -> None:
+    """
+    Write a new line with the radar filename converted.
+    This is intended to create a checklist to avoid file reprocessing.
+    @param path: path where the txt file will be saved
+    @param file: radar filename
+    @return:
+    """
+    make_dir(path)
+    file_name = f"{path}/{file.split('/')[-2]}_files.txt"
+    with open(file_name, "a") as txt_file:
+        txt_file.write(f"{file}\n")
+
+
+def dtree_encoding(dtree, append_dim) -> dict:
+    """
+    Function that creates encoding for time, append_dim, and all variables in datasets within the DataTree.
+
+    Parameters:
+        dtree (DataTree): Input xarray DataTree.
+        append_dim (str): The name of the dimension to encode (e.g., "vcp_time").
+
+    Returns:
+        dict: A dictionary with encoding parameters for variables and coordinates.
+    """
+
+    _encoding = {}
+    # Base encoding for time-related variables
+    time_enc = dict(
+        units="nanoseconds since 1950-01-01T00:00:00.00",
+        dtype="int64",
+        _FillValue=-9999,
+    )
+    # Base encoding for general variables
+    var_enc = dict(
+        _FillValue=-9999,
+    )
+
+    if isinstance(dtree, DataTree):
+        # Append_dim encoding for each group
+        _encoding = {
+            f"{dtree[group].path}": {f"{append_dim}": time_enc}
+            for group in dtree.groups
+        }
+
+        # Add encoding for sweeps (time and append_dim)
+        for node in dtree.match("*/sweep_*").groups[2:]:
+            for var_name in dtree[node].to_dataset().data_vars:
+                _encoding[node].update({var_name: var_enc, "time": time_enc})
+
+        # Remove root encoding if present
+        _encoding.pop("/", None)
+
+    else:
+        _encoding = {}
+
+    return _encoding
+
+
+def prepare_for_read(filename, storage_options={"anon": True}):
+    """
+    Return a file-like object ready for reading.
+
+    Open a file for reading in binary mode with transparent decompression of
+    Gzip and BZip2 files. Supports local and S3 files. The resulting file-like
+    object should be closed.
+
+    Parameters
+    ----------
+    filename : str or file-like object
+        Filename or file-like object which will be opened. File-like objects
+        will not be examined for compressed data.
+    storage_options : dict, optional
+        Options passed to fsspec when opening S3 paths (default: {"anon": True}).
+
+    Returns
+    -------
+    file_like : file-like object
+        File-like object from which data can be read.
+    """
+    # If already a file-like object, return as-is
+    if hasattr(filename, "read"):
+        return filename
+
+    # Check if S3 path, and open with fsspec
+    if filename.startswith("s3://"):
+        return fsspec.open(
+            filename, mode="rb", compression="infer", **storage_options
+        ).open()
+    else:
+        # Open a local file and read the first few bytes to check for compression
+        file = open(filename, "rb")
+        magic = file.read(3)
+        file.seek(0)  # Reset pointer to beginning after reading header
+
+        # Detect and handle gzip compression
+        if magic.startswith(b"\x1f\x8b"):
+            return gzip.GzipFile(fileobj=file)
+
+        # Detect and handle bzip2 compression
+        if magic.startswith(b"BZh"):
+            return bz2.BZ2File(file)
+
+        # Return the file object as-is if no compression detected
+        return file
+
+
+def exp_dim(dt: xr.DataTree, append_dim: str) -> xr.DataTree:
+    """
+    Add a new dimension to all datasets in a DataTree and initialize it with a specific value.
+
+    This function adds a new dimension to each dataset in the DataTree. The dimension is
+    initialized with the `time_coverage_start` value from the root of the DataTree and
+    is added as a coordinate. The new dimension is also expanded to allow additional values.
+
+    Parameters:
+        dt (xr.DataTree): The DataTree containing radar datasets.
+ append_dim (str): The name of the dimension to add. + + Returns: + xr.DataTree: A DataTree with the specified dimension added to all datasets. + + Notes: + - The new dimension is initialized with the `time_coverage_start` attribute. + - Attributes describing the new dimension are added for metadata. + - The datasets are updated in place within the DataTree. + + Example: + Add a "vcp_time" dimension to all datasets in a DataTree: + + >>> dt = exp_dim(dt, "vcp_time") + """ + # Get the start time from the root node + start_time = pd.to_datetime(dt.time_coverage_start.item()) + + # Iterate over all nodes in the DataTree + for node in dt.subtree: + ds = node.to_dataset(inherit=False) # Extract the dataset without inheritance + + # Add the new dimension with the start_time value + ds[append_dim] = start_time + + # Define attributes for the new dimension + attrs = { + "description": "Volume Coverage Pattern time since start of volume scan", + } + ds[append_dim].attrs = attrs + + # Set the new variable as a coordinate and expand the dimension + ds = ds.set_coords(append_dim).expand_dims(dim=append_dim, axis=0) + + # Update the dataset in the DataTree node + dt[node.path].ds = ds + + return dt + + +def ensure_dimension(dt: xr.DataTree, dim: str) -> xr.DataTree: + """ + Ensure that all datasets in a DataTree have a specified dimension. + + This function checks each dataset in the DataTree for the presence of the given dimension. + If the dimension is missing in a dataset, it is added using expand_dims. + + Parameters: + dt (xr.DataTree): The DataTree to check and modify. + dim (str): The name of the dimension to ensure in each dataset. + + Returns: + xr.DataTree: A DataTree where all datasets have the specified dimension. + + Notes: + - If the dimension is already present in a dataset, it is left unchanged. + - The new dimension is added without any associated coordinates. + - This function modifies datasets in-place within the DataTree. + + Example: + Ensure that all datasets in the DataTree have a "vcp_time" dimension: + + >>> dt = ensure_dimension(dt, "vcp_time") + """ + dims = [node.dims for node in dt.subtree if dim in node.dims] + if not dims: + return exp_dim(dt, dim) + return dt + + +def fix_angle(dt: xr.DataTree, tolerance: float = None, **kwargs) -> xr.DataTree: + """ + Reindex the radar azimuth angle to ensure all sweeps start and end at the same angle. + + This function processes each sweep in a radar dataset stored in an xarray DataTree. + It ensures that the azimuth angles are reindexed to cover a consistent range, removing + duplicates and interpolating as needed to maintain uniform spacing. + + Parameters: + dt (xr.DataTree): The input DataTree containing radar data, with each sweep represented as a node. + tolerance (float, optional): Tolerance for angle reindexing. If not specified, it will be + calculated based on the angular resolution. + **kwargs: Additional arguments passed to `xd.util.reindex_angle`. + + Returns: + xr.DataTree: The DataTree with azimuth angles reindexed for all sweeps. + + Notes: + - The function assumes the nodes of interest are named using the "sweep_*" pattern. + - It uses xradar utilities to extract angle parameters, remove duplicate rays, + and reindex angles for uniform azimuth coverage. + - The angular resolution (`ar`) is calculated dynamically based on the azimuth range and size. 
+ """ + for node in dt.match("sweep_*"): + ds = dt[node].to_dataset() + ds["time"] = ds.time.load() + ds = fix_azimuth(ds) + angle_dict = xd.util.extract_angle_parameters(ds) + start_ang = angle_dict["start_angle"] + stop_ang = angle_dict["stop_angle"] + direction = angle_dict["direction"] + ds = xd.util.remove_duplicate_rays(ds) + az = len(np.arange(start_ang, stop_ang)) + ar = np.round(az / len(ds.azimuth.data), 2) + tolerance = ar if not tolerance else tolerance + ds = xd.util.reindex_angle( + ds, + start_ang, + stop_ang, + ar, + direction, + method="nearest", + tolerance=tolerance, + ) + dt[node].ds = ds + return dt + + +def fix_azimuth( + ds: xr.Dataset, target_size: int = 360, fill_value: str = "extrapolate" +) -> xr.Dataset: + """ + Ensure that the azimuth dimension in a radar dataset matches a target size. + + This function adjusts the azimuth dimension of a radar dataset to match a specified target size + (e.g., 360 for a full sweep). It detects the starting and stopping angles of the azimuth and + interpolates data to create a uniform azimuth grid. + + Parameters: + ds (xr.Dataset): The dataset containing radar data with an azimuth coordinate. + target_size (int, optional): The desired size of the azimuth dimension (default is 360). + fill_value (str, optional): Value used to fill points outside the data range during + interpolation. Options include: + - "extrapolate": Fill using extrapolation (default). + - None: Introduce `NaN` for points outside the data range. + + Returns: + xr.Dataset: The dataset with a completed and uniformly spaced azimuth dimension. + + Notes: + - If the current azimuth size matches the target size, no changes are made. + - The interpolation uses `xarray.interp` with the specified `fill_value` behavior. + - The azimuth grid is reconstructed to span from the detected start angle to the stop angle. 
+ """ + # Current azimuth size and coordinates + current_size = ds["azimuth"].shape[0] + azimuth = ds["azimuth"].values + + # Detect start and stop angles from the azimuth + start_ang = np.min(azimuth) + stop_ang = np.max(azimuth) + + # Check if the azimuth size needs fixing + if current_size % target_size != 0: + print(f"Fixing azimuth dimension from {current_size} to {target_size}") + time_numeric = ds["time"].astype("float64") + # Create a new uniform azimuth array within the detected range + new_azimuth = np.linspace(start_ang, stop_ang, target_size, endpoint=False) + + # Interpolate data to the new azimuth array + ds = ds.interp(azimuth=new_azimuth, kwargs={"fill_value": fill_value}) + # Interpolate the `time` coordinate explicitly if it exists + if "time" not in ds.coords: + # Convert datetime64 to numeric + time_interpolated = xr.DataArray( + pd.to_datetime(np.interp(new_azimuth, azimuth, time_numeric)), + dims="azimuth", + coords={"azimuth": new_azimuth}, + ) + ds["time"] = time_interpolated + ds = ds.set_coords("time") + + return ds diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..e1ab6a3 --- /dev/null +++ b/setup.py @@ -0,0 +1,36 @@ +from setuptools import setup, find_packages + +setup( + name="raw2zarr", + version="0.3.0", + description="Tools for working with radar data and converting it to Zarr format", + author="Alfonso Ladino, Max Grover", + author_email="alfonso8@illinois.edu", + url="https://github.com/aladinor/raw2zarr", + packages=find_packages(), # Automatically finds all packages in the directory + python_requires=">=3.12", + install_requires=[ + "cartopy", + "fsspec", + "dask", + "netCDF4", + "bottleneck", + "zarr", + "s3fs>=2024.3.1", + "arm_pyart>=1.18.1", + "wradlib", + "hvplot", + "datashader", + "xarray>=2024.10", + "xradar>=0.8.0", + ], + extras_require={ + "dev": ["pytest", "pytest-cov", "flake8"], # Add development dependencies here + "notebooks": ["jupyterlab"], # Dependencies for working with notebooks + }, + classifiers=[ + "Programming Language :: Python :: 3.12", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + ], +) diff --git a/sigmet2zarr/__init__.py b/sigmet2zarr/__init__.py deleted file mode 100755 index a0bdf68..0000000 --- a/sigmet2zarr/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -""" -raw2zarr -====== - -Top-level package for raw2zarr. - -""" - -__author__ = """Open Radar Developers""" -__email__ = "alfonso8@illinois.edu" - -# import subpackages -# from . import task2zarr -# from . import utils # noqa - -__all__ = [s for s in dir() if not s.startswith("_")] diff --git a/sigmet2zarr/utils.py b/sigmet2zarr/utils.py deleted file mode 100755 index 3cf947d..0000000 --- a/sigmet2zarr/utils.py +++ /dev/null @@ -1,238 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -import os -import xarray as xr -import xradar as xd -import fsspec -import numpy as np -from datetime import datetime, timezone -from datatree import DataTree -import pandas as pd -import tomllib -from time import time -from collections.abc import Iterator -from typing import Any, List - - -def batch(iterable: List[Any], n: int = 1) -> Iterator[List[Any]]: - """ - Splits a list into consecutive chunks of size `n`. - - This function takes a list and yields successive batches of size `n` from it. - If the length of the list is not evenly divisible by `n`, the last batch will - contain the remaining elements. - - Parameters - ---------- - iterable : list[Any] - The list to be split into batches. 
- n : int, optional - The number of items in each batch (default is 1). - - Yields - ------ - Iterator[list[Any]] - An iterator that yields slices of the original list of size `n`, except - for the last batch which may contain fewer elements if the total number - of elements in the list is not evenly divisible by `n`. - - Examples - -------- - >>> list(batch([1, 2, 3, 4, 5], n=2)) - [[1, 2], [3, 4], [5]] - - >>> list(batch(['a', 'b', 'c', 'd'], n=3)) - [['a', 'b', 'c'], ['d']] - """ - l = len(iterable) - for ndx in range(0, l, n): - yield iterable[ndx : min(ndx + n, l)] - - -def timer_func(func): - # This function shows the execution time of - # the function object passed - def wrap_func(*args, **kwargs): - t1 = time() - result = func(*args, **kwargs) - t2 = time() - print(f"Function {func.__name__!r} executed in {(t2-t1):.4f}s") - return result - - return wrap_func - - -def make_dir(path) -> None: - """ - Makes directory based on path. - :param path: directory path that will be created - :return: - """ - try: - os.makedirs(path) - except FileExistsError: - pass - - -def load_toml(filepath: str) -> dict: - """ - Load a TOML data from file - @param filepath: path to TOML file - @return: dict - """ - with open(filepath, "rb") as f: - toml_data: dict = tomllib.load(f) - return toml_data - - -def time_3d(time_array, numbers) -> np.ndarray: - """ - Functions that creates a 3d time array from timestamps - :param time_array: 2d timestamp array - :param numbers: number of times in the new axis - :return: 3d time array - """ - v_func = np.vectorize(lambda x: datetime.fromtimestamp(x, tz=timezone.utc)) - _time = v_func(time_array) - times = np.repeat(_time[np.newaxis, :], numbers, axis=0) - return times - - -def get_time(time_array) -> np.ndarray: - """ - Functions that creates a 3d time array from timestamps - :param time_array: 2d timestamp array - :return: 3d time array - """ - v_func = np.vectorize(lambda x: datetime.fromtimestamp(x, tz=timezone.utc)) - _time = v_func(time_array) - return _time - - -def create_query(date, radar_site) -> str: - """ - Creates a string for quering the IDEAM radar files stored in AWS bucket - :param date: date to be queried. e.g datetime(2021, 10, 3, 12). Datetime python object - :param radar_site: radar site e.g. 
Guaviare - :return: string with a IDEAM radar bucket format - """ - if (date.hour != 0) and (date.hour != 0): - return f"l2_data/{date:%Y}/{date:%m}/{date:%d}/{radar_site}/{radar_site[:3].upper()}{date:%y%m%d%H}" - elif (date.hour != 0) and (date.hour == 0): - return f"l2_data/{date:%Y}/{date:%m}/{date:%d}/{radar_site}/{radar_site[:3].upper()}{date:%y%m%d}" - else: - return f"l2_data/{date:%Y}/{date:%m}/{date:%d}/{radar_site}/{radar_site[:3].upper()}{date:%y%m%d}" - - -def data_accessor(file: str): - """ - Open remotely a AWS S3 file using fsspec - """ - with fsspec.open(file, mode="rb", anon=True) as f: - return f.read() - - -def convert_time(ds) -> pd.to_datetime: - """ - Functions that create a timestamps for appending sweep data along a given dimension - @param ds: Xarray dataset - @return: pandas datetime - """ - for i in ds.time.values: - time = pd.to_datetime(i) - if pd.isnull(time): - continue - return time - - -def fix_angle(ds: xr.Dataset, tolerance: float = None, **kwargs) -> xr.Dataset: - """ - This function reindex the radar azimuth angle to make all sweeps starts and end at the same angle - @param tolerance: Tolerance at which angle reindex will be performed - @param ds: xarray dataset containing and xradar object - @return: azimuth reindex xarray dataset - """ - angle_dict = xd.util.extract_angle_parameters(ds) - start_ang = angle_dict["start_angle"] - stop_ang = angle_dict["stop_angle"] - direction = angle_dict["direction"] - ds = xd.util.remove_duplicate_rays(ds) - az = len(np.arange(start_ang, stop_ang)) - ar = np.round(az / len(ds.azimuth.data), 2) - tolerance = ar if not tolerance else tolerance - return xd.util.reindex_angle( - ds, - start_ang, - stop_ang, - ar, - direction, - method="nearest", - tolerance=tolerance, - **kwargs, - ) - - -def check_if_exist(file: str, path: str = "../results") -> bool: - """ - Function that check if a sigmet file was already processed based on a txt file that written during the conversion - @param file: file name - @param path: path where txt file was written with the list of sigmet files processed - @return: - """ - file_path = f"{path}" - file_name = f"{file_path}/{file.split('/')[-2]}_files.txt" - try: - with open(file_name, "r", newline="\n") as txt_file: - lines = txt_file.readlines() - txt_file.close() - _file = [i for i in lines if i.replace("\n", "") == file] - if len(_file) > 0: - print("File already processed") - return True - else: - return False - except FileNotFoundError: - return False - - -def write_file_radar(file: str, path: str = f"../results") -> None: - """ - Write a new line with the radar filename converted. This is intended to create a checklist to avoid file - reprocessing - @param path: path where the txt file will be saved - @param file: radar filename - @return: - """ - make_dir(path) - file_name = f"{path}/{file.split('/')[-2]}_files.txt" - with open(file_name, "a") as txt_file: - txt_file.write(f"{file}\n") - txt_file.close() - - -def time_encoding(dtree, append_dim) -> dict: - """ - Function that creates encoding for time and append_dim variables - @param dtree: Input xarray Datatree - @param append_dim: dimension name. e.g. 
"vcp_time" - @return: dict with encoding parameters - """ - encoding = {} - enc = dict( - units="nanoseconds since 2000-01-01T00:00:00.00", - dtype="float64", - _FillValue=np.datetime64("NaT"), - ) - if type(dtree) is DataTree: - groups = [i for i in list(dtree.groups) if i.startswith("/sweep_")] - for group in groups: - encoding.update({f"{group}": {f"{append_dim}": enc, "time": enc}}) - return encoding - else: - encoding.update( - { - f"{append_dim}": enc, - "time": enc, - } - ) - return encoding