From e2544becd491a9ed47c8bb29c7afb65cfa0b45fa Mon Sep 17 00:00:00 2001 From: Alfonso Ladino Date: Wed, 6 Nov 2024 15:49:22 -0600 Subject: [PATCH 1/9] refactoring raw2zarr --- .gitignore | 2 +- README.md | 5 + environment.yml | 5 +- {sigmet2zarr => raw2zarr}/__init__.py | 0 {sigmet2zarr => raw2zarr}/create_dataset.py | 1 + raw2zarr/data_reader.py | 72 +++ raw2zarr/dtree_builder.py | 78 +++ raw2zarr/main.py | 44 ++ {sigmet2zarr => raw2zarr}/task2zarr.py | 415 +++++++------- {sigmet2zarr => raw2zarr}/utils.py | 577 ++++++++++++-------- 10 files changed, 741 insertions(+), 458 deletions(-) rename {sigmet2zarr => raw2zarr}/__init__.py (100%) mode change 100755 => 100644 rename {sigmet2zarr => raw2zarr}/create_dataset.py (99%) create mode 100644 raw2zarr/data_reader.py create mode 100644 raw2zarr/dtree_builder.py create mode 100644 raw2zarr/main.py rename {sigmet2zarr => raw2zarr}/task2zarr.py (87%) mode change 100755 => 100644 rename {sigmet2zarr => raw2zarr}/utils.py (69%) mode change 100755 => 100644 diff --git a/.gitignore b/.gitignore index b1d6ef1..95364d1 100644 --- a/.gitignore +++ b/.gitignore @@ -167,4 +167,4 @@ sigmet2zarr.egg-info #.idea/ #Notebooks -sigmet2zarr/task2vcp.py +raw2zarr/task2vcp.py diff --git a/README.md b/README.md index 9d67da9..dbe4464 100755 --- a/README.md +++ b/README.md @@ -39,6 +39,11 @@ with the new open data paradigm, emphasizing the FAIR principles (Findable, Acce +> [!WARNING] +> **This project is currently in high development mode.** +> Features may change frequently, and some parts of the library may be incomplete or subject to change. Please proceed with caution. + + ### Running on Your Own Machine If you are interested in running this material locally on your computer, you will need to follow this workflow: diff --git a/environment.yml b/environment.yml index 56e2aed..f245565 100755 --- a/environment.yml +++ b/environment.yml @@ -20,9 +20,8 @@ dependencies: - wradlib - hvplot - datashader + - xarray>=2024.10 + - xradar>=0.8.0 - pip - - pydata-sphinx-theme - pip: - git+https://github.com/aladinor/raw2zarr - - git+https://github.com/openradar/xradar.git - - git+https://github.com/pydata/xarray.git diff --git a/sigmet2zarr/__init__.py b/raw2zarr/__init__.py old mode 100755 new mode 100644 similarity index 100% rename from sigmet2zarr/__init__.py rename to raw2zarr/__init__.py diff --git a/sigmet2zarr/create_dataset.py b/raw2zarr/create_dataset.py similarity index 99% rename from sigmet2zarr/create_dataset.py rename to raw2zarr/create_dataset.py index 948140e..83a8b62 100644 --- a/sigmet2zarr/create_dataset.py +++ b/raw2zarr/create_dataset.py @@ -38,6 +38,7 @@ def radar_convert(): query = create_query(date=date_query, radar_site=radar_name) str_bucket = "s3://s3-radaresideam/" fs = fsspec.filesystem("s3", anon=True) + x radar_files = [ f"s3://{i}" for i in sorted(fs.glob(f"{str_bucket}{query}*")) ][:30] diff --git a/raw2zarr/data_reader.py b/raw2zarr/data_reader.py new file mode 100644 index 0000000..cff59e6 --- /dev/null +++ b/raw2zarr/data_reader.py @@ -0,0 +1,72 @@ +from typing import List, Iterable +import os +import xradar +import dask.bag as db +from xarray import DataTree +from xarray.backends.common import _normalize_path +from raw2zarr.utils import prepare_for_read, batch + + +def accessor_wrapper( + filename_or_obj: str | os.PathLike | Iterable[str | os.PathLike], + backend: str = "iris" +) -> DataTree: + """Wrapper function to load radar data for a single file or iterable of files with fsspec and compression check.""" + try: + if 
isinstance(filename_or_obj, Iterable) and not isinstance(filename_or_obj, (str, os.PathLike)): + results = [] + for file in filename_or_obj: + prepared_file = prepare_for_read(file) + results.append(_load_file(prepared_file, backend)) + return results + else: + file = prepare_for_read(filename_or_obj) + return _load_file(file, backend) + except Exception as e: + print(f"Error loading {filename_or_obj}: {e}") + return None + + +def _load_file(file, backend) -> DataTree: + """Helper function to load a single file with the specified backend.""" + if backend == "iris": + return xradar.io.open_iris_datatree(file) + elif backend == "odim": + return xradar.io.open_odim_datatree(file) + else: + raise ValueError(f"Unsupported backend: {backend}") + + +def load_radar_data( + filename_or_obj: str | os.PathLike | Iterable[str | os.PathLike], + backend: str = "iris", + parallel: bool = False, + batch_size: int = 12 +) -> Iterable[List[DataTree]]: + """ + Load radar data from files in batches to avoid memory overload. + + Parameters: + filename_or_obj (str | os.PathLike | Iterable[str | os.PathLike]): Path(s) to radar data files. + backend (str): Backend type to use. Options include 'iris', 'odim', etc. Default is 'iris'. + parallel (bool): If True, enables parallel processing with Dask. Default is False. + batch_size (int): Number of files to process in each batch. + + Returns: + Iterable[List[DataTree]]: An iterable yielding batches of DataTree objects. + """ + filename_or_obj = _normalize_path(filename_or_obj) + + for files_batch in batch(filename_or_obj, batch_size): + ls_dtree = [] + + if parallel: + bag = db.from_sequence(files_batch, npartitions=len(files_batch)).map(accessor_wrapper, backend=backend) + ls_dtree.extend(bag.compute()) + else: + for file_path in files_batch: + result = accessor_wrapper(file_path, backend=backend) + if result is not None: + ls_dtree.append(result) + + yield ls_dtree # Yield each batch of DataTree objects diff --git a/raw2zarr/dtree_builder.py b/raw2zarr/dtree_builder.py new file mode 100644 index 0000000..838e933 --- /dev/null +++ b/raw2zarr/dtree_builder.py @@ -0,0 +1,78 @@ +from typing import List, Iterable, Union +import os +import xarray as xr +from xarray import DataTree, Dataset +from xarray.backends.common import _normalize_path +from data_reader import load_radar_data # Import the data loading function +from raw2zarr.utils import ensure_dimension + +def datatree_builder( + filename_or_obj: Union[str, os.PathLike, Iterable[Union[str, os.PathLike]]], + backend: str = "iris", + dim: str = "vcp_time", + parallel: bool = False, + batch_size: int = 12 +) -> DataTree: + """ + Load radar data from files in batches and build a nested DataTree from it. + + Parameters: + filename_or_obj (str | os.PathLike | Iterable[str | os.PathLike]): Path(s) to radar data files. + backend (str): Backend type to use. Options include 'iris', 'odim', etc. Default is 'iris'. + parallel (bool): If True, enables parallel processing with Dask. Default is False. + batch_size (int): Number of files to process in each batch. + + Returns: + DataTree: A nested DataTree combining all input DataTree objects. + + Raises: + ValueError: If no files are loaded or all batches are empty. 
+    """
+    # Initialize an empty dictionary to hold the nested structure
+    nested_dict = {}
+
+    # Load radar data in batches
+    filename_or_obj = _normalize_path(filename_or_obj)
+
+    for dtree_batch in load_radar_data(filename_or_obj, backend=backend, parallel=parallel, batch_size=batch_size):
+        if not dtree_batch:
+            raise ValueError("A batch of DataTrees is empty. Ensure data is loaded correctly.")
+
+        # Process each DataTree in the current batch
+        for dtree in dtree_batch:
+            task_name = dtree.attrs.get("scan_name", "default_task").strip()
+
+            if task_name in nested_dict:
+                nested_dict[task_name] = append_dataset_to_node(nested_dict[task_name], dtree, dim=dim)
+            else:
+                nested_dict[task_name] = dtree
+
+    # Final DataTree assembly
+    return DataTree.from_dict(nested_dict)
+
+
+def append_dataset_to_node(existing_node: DataTree, new_node: DataTree, dim: str):
+    """
+    Append datasets from new_node to the existing_node's structure.
+
+    Parameters:
+        existing_node (DataTree): The existing node in the nested DataTree to which data will be appended.
+        new_node (DataTree): The new DataTree node containing datasets to be appended.
+    """
+
+    existing_node = ensure_dimension(existing_node, dim)
+    new_node = ensure_dimension(new_node, dim)
+    new_dtree = {}
+    for child in new_node.subtree:
+        node_name = child.path
+        if node_name in [node.path for node in existing_node.subtree]:
+            # Append the datasets if the node already exists
+            existing_dataset = existing_node[node_name].to_dataset()
+            new_dataset = child.to_dataset()
+            # Append data along a new dimension (e.g., time) or merge variables as needed
+            new_dtree[node_name] = xr.concat((existing_dataset, new_dataset), dim=dim)
+        else:
+            new_dtree[node_name] = new_node[node_name].to_dataset()
+
+    return DataTree.from_dict(new_dtree)
+
diff --git a/raw2zarr/main.py b/raw2zarr/main.py
new file mode 100644
index 0000000..7301235
--- /dev/null
+++ b/raw2zarr/main.py
@@ -0,0 +1,44 @@
+from datetime import datetime
+
+from raw2zarr.utils import (
+    create_query,
+    timer_func,
+    data_accessor,
+)
+from raw2zarr.task2zarr import prepare2append
+from raw2zarr.dtree_builder import datatree_builder
+import fsspec
+import xradar as xd
+
+
+
+def accessor_wrapper(filename):
+    return prepare2append(
+        xd.io.open_iris_datatree(data_accessor(filename)),
+        append_dim="vcp_time",
+        radar_name="GUA",
+    )
+
+
+@timer_func
+def radar_convert():
+    radar_name = "Guaviare"
+    year, month, day = 2022, 6, 1  # Guaviare
+
+    date_query = datetime(year=year, month=month, day=day)
+    query = create_query(date=date_query, radar_site=radar_name)
+    str_bucket = "s3://s3-radaresideam/"
+    fs = fsspec.filesystem("s3", anon=True)
+    radar_files = [
+        f"s3://{i}" for i in sorted(fs.glob(f"{str_bucket}{query}*"))
+    ][:100]
+
+    builder = datatree_builder(radar_files, batch_size=4)
+    print(builder)
+
+def main():
+    radar_convert()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/sigmet2zarr/task2zarr.py b/raw2zarr/task2zarr.py
old mode 100755
new mode 100644
similarity index 87%
rename from sigmet2zarr/task2zarr.py
rename to raw2zarr/task2zarr.py
index 21e29fe..7956720
--- a/sigmet2zarr/task2zarr.py
+++ b/raw2zarr/task2zarr.py
@@ -1,216 +1,199 @@
-import zarr
-import xradar as xd
-import numpy as np
-from datatree import DataTree
-from xarray.core.dataset import Dataset
-from xarray import full_like
-from zarr.errors import ContainsGroupError
-from sigmet2zarr.utils import (
-    data_accessor,
-    fix_angle,
-    convert_time,
-    write_file_radar,
-    load_toml,
-    time_encoding,
-)
-
-
-def _get_root(dt: DataTree):
-    
groups = [ - i for i in list(dt.groups) if not i.startswith("/sweep") if i not in ["/"] - ] - root = DataTree(data=dt.root.ds, name="root") - for group in groups: - DataTree(data=dt[group].ds, name=group[1:], parent=root) - return root - - -def _fix_sn(ds: Dataset, sw_num: dict[float, int]) -> dict: - sn: float = float(ds["sweep_fixed_angle"].values) - nsn: int = sw_num[sn] - new_sn = full_like(ds.sweep_number, nsn) - new_ds = ds.copy(deep=True) - new_ds["sweep_number"] = new_sn - return new_ds - - -def prepare2append(dt: DataTree, append_dim: str, radar_name: str = "GUA") -> DataTree: - """ - Converts SIGMET radar files into a DataTree structure and prepares it for appending along a specified dimension. - - This function processes a given DataTree of radar data, organizes it by sweep angles, and prepares it for appending - along the specified dimension. It uses configuration files to map radar sweep angles and numbers, and georeferences - the data before appending. - - Parameters - ---------- - dt : DataTree - The DataTree object containing radar data to be processed. - append_dim : str - The dimension along which the data will be appended (e.g., time, elevation). - radar_name : str, optional - The radar name to identify the correct configuration (default is "GUA"). - - Returns - ------- - DataTree - A new DataTree object with all sweeps processed and ready for appending along the specified dimension. - - Notes - ----- - - The function expects a configuration file in TOML format located at "../config/radar.toml", containing - the necessary radar sweep angle and sweep number information. - - Each sweep in the DataTree is georeferenced, and its sweep number is corrected before being organized - into the final DataTree structure. - - Examples - -------- - >>> radar_data = prepare2append(my_datatree, append_dim="time", radar_name="GUA") - >>> # radar_data is now prepared for appending along the time dimension - """ - elev: np.array = np.array( - load_toml("../config/radar.toml")[radar_name]["elevations"] - ) - sw_num: np.array = np.array( - load_toml("../config/radar.toml")[radar_name]["sweep_number"] - ) - swps: dict[float, str] = {j: f"sweep_{idx}" for idx, j in enumerate(elev)} - sw_fix: dict[float, int] = {j: sw_num[idx] for idx, j in enumerate(elev)} - - tree = { - node.path: node.to_dataset() - for node in dt.subtree - if not node.path.startswith("/sweep") - } - tree.update( - { - swps[float(node.sweep_fixed_angle.values)]: fix_angle( - _fix_sn(node, sw_num=sw_fix) - ) - .to_dataset() - .xradar.georeference() - for node in dt.subtree - if node.path.startswith("/sweep") - } - ) - tree = exp_dim(tree, append_dim=append_dim) - return DataTree.from_dict(tree) - - -def exp_dim(dt: dict[str, Dataset], append_dim: str = "vcp_time") -> dict: - """ - Functions that expand dimension to each dataset within the datatree - @param dt: xarray.datatree - @param append_dim: dimension name which dataset will be expanded. e.g. 'vcp_time' - @return: xarray Datatree - """ - for sw, ds in dt.items(): - if sw.startswith("sweep"): - _time = convert_time(ds) - ds[append_dim] = _time - ds: Dataset = ds.set_coords(append_dim).expand_dims(dim=append_dim, axis=0) - dt[sw] = ds - return dt - - -def dt2zarr2( - dt: DataTree, - zarr_store: str, - zarr_version: int, - append_dim: str, - consolidated: bool, -) -> None: - """ - Functions to save xradar datatree using zarr format - @param consolidated: Xarray consolidated metadata. Default True - @param append_dim: dimension name where data will be appended. e.g. 
'vcp_time' - @param mode: Xarray.to_zarr mode. Options are "w", "w-", "a", "a-", r+", None - @param zarr_version: data can be store in zarr format using version 2 or 3. Default V=2 - @param zarr_store: path to zarr store - @param dt: xradar datatree - @return: None - """ - st: zarr.DirectoryStore = ( - zarr.DirectoryStoreV3(zarr_store) - if zarr_version == 3 - else zarr.DirectoryStore(zarr_store) - ) - - for node in dt.subtree: - ds = node.to_dataset() - group_path = node.path - if group_path.startswith("/sweep"): - encoding = time_encoding(ds, append_dim) - else: - encoding = {} - try: - ds.to_zarr( - store=st, - group=group_path, - mode="w-", - encoding=encoding, - consolidated=consolidated, - ) - except ContainsGroupError: - try: - ds.to_zarr( - store=st, - group=group_path, - mode="a-", - consolidated=consolidated, - append_dim="vcp_time", - ) - except ValueError: - continue - - -def raw2zarr( - file: str, - zarr_store: str, - zarr_version: int = 2, - elevation: list[float] = None, - append_dim: str = "vcp_time", - mode: str = "a", - consolidated: bool = True, - p2c: str = "../results", -) -> None: - """ - Main function to convert sigmet radar files into xradar datatree and save them using zarr format - @param consolidated: Xarray consolidated metadata. Default True - @param p2c: path to write a file where each radar filename will be written once is processed. - @param mode: Xarray.to_zarr mode. Options are "w", "w-", "a", "a-", r+", None - @param append_dim: dimension name where data will be appended. e.g. 'vcp_time' - @param elevation: list of elevation to be converted into zarr. - E.g. [0.5, 1.0, 3]. If None all sweeps within the radar object will be considered - @param zarr_version: data can be store in zarr format using version 2 or 3. Default V=2 - @param zarr_store: path to zarr store - @param file: radar file path - @return: None - """ - dt: DataTree = xd.io.open_iris_datatree(data_accessor(file)) - dtree = prepare2append(dt, append_dim=append_dim) - elevations = [ - np.round(np.median(dtree.children[i].elevation.data), 1) - for i in list(dtree.children) - if i not in ["radar_parameters"] - ] - if not elevation: - dt2zarr2( - dt=dtree, - zarr_store=zarr_store, - zarr_version=zarr_version, - mode=mode, - consolidated=consolidated, - append_dim=append_dim, - ) - write_file_radar(file, p2c) - elif elevation in elevations: - dt2zarr2( - dt=dtree, - zarr_store=zarr_store, - zarr_version=zarr_version, - mode=mode, - consolidated=consolidated, - append_dim=append_dim, - ) - write_file_radar(file, p2c) +import zarr +import xradar as xd +import numpy as np +from xarray import full_like, Dataset, DataTree +from zarr.errors import ContainsGroupError +from raw2zarr.utils import ( + data_accessor, + fix_angle, + write_file_radar, + load_toml, + time_encoding, + exp_dim +) + + +def _get_root(dt: DataTree): + groups = [ + i for i in list(dt.groups) if not i.startswith("/sweep") if i not in ["/"] + ] + root = DataTree(data=dt.root.ds, name="root") + for group in groups: + DataTree(data=dt[group].ds, name=group[1:], parent=root) + return root + + +def _fix_sn(ds: Dataset, sw_num: dict[float, int]) -> dict: + sn: float = float(ds["sweep_fixed_angle"].values) + nsn: int = sw_num[sn] + new_sn = full_like(ds.sweep_number, nsn) + new_ds = ds.copy(deep=True) + new_ds["sweep_number"] = new_sn + return new_ds + + +def prepare2append(dt: DataTree, append_dim: str, radar_name: str = "GUA") -> DataTree: + """ + Converts SIGMET radar files into a DataTree structure and prepares it for appending along a 
specified dimension. + + This function processes a given DataTree of radar data, organizes it by sweep angles, and prepares it for appending + along the specified dimension. It uses configuration files to map radar sweep angles and numbers, and georeferences + the data before appending. + + Parameters + ---------- + dt : DataTree + The DataTree object containing radar data to be processed. + append_dim : str + The dimension along which the data will be appended (e.g., time, elevation). + radar_name : str, optional + The radar name to identify the correct configuration (default is "GUA"). + + Returns + ------- + DataTree + A new DataTree object with all sweeps processed and ready for appending along the specified dimension. + + Notes + ----- + - The function expects a configuration file in TOML format located at "../config/radar.toml", containing + the necessary radar sweep angle and sweep number information. + - Each sweep in the DataTree is georeferenced, and its sweep number is corrected before being organized + into the final DataTree structure. + + Examples + -------- + >>> radar_data = prepare2append(my_datatree, append_dim="time", radar_name="GUA") + >>> # radar_data is now prepared for appending along the time dimension + """ + elev: np.array = np.array( + load_toml("../config/radar.toml")[radar_name]["elevations"] + ) + sw_num: np.array = np.array( + load_toml("../config/radar.toml")[radar_name]["sweep_number"] + ) + swps: dict[float, str] = {j: f"sweep_{idx}" for idx, j in enumerate(elev)} + sw_fix: dict[float, int] = {j: sw_num[idx] for idx, j in enumerate(elev)} + + tree = { + node.path: node.to_dataset() + for node in dt.subtree + if not node.path.startswith("/sweep") + } + tree.update( + { + swps[float(node.sweep_fixed_angle.values)]: fix_angle( + _fix_sn(node, sw_num=sw_fix) + ) + .to_dataset() + .xradar.georeference() + for node in dt.subtree + if node.path.startswith("/sweep") + } + ) + tree = exp_dim(tree, append_dim=append_dim) + return DataTree.from_dict(tree) + + + +def dt2zarr2( + dt: DataTree, + zarr_store: str, + zarr_version: int, + append_dim: str, + consolidated: bool, +) -> None: + """ + Functions to save xradar datatree using zarr format + @param consolidated: Xarray consolidated metadata. Default True + @param append_dim: dimension name where data will be appended. e.g. 'vcp_time' + @param mode: Xarray.to_zarr mode. Options are "w", "w-", "a", "a-", r+", None + @param zarr_version: data can be store in zarr format using version 2 or 3. 
Default V=2 + @param zarr_store: path to zarr store + @param dt: xradar datatree + @return: None + """ + st: zarr.DirectoryStore = ( + zarr.DirectoryStoreV3(zarr_store) + if zarr_version == 3 + else zarr.DirectoryStore(zarr_store) + ) + + for node in dt.subtree: + ds = node.to_dataset() + group_path = node.path + if group_path.startswith("/sweep"): + encoding = time_encoding(ds, append_dim) + else: + encoding = {} + try: + ds.to_zarr( + store=st, + group=group_path, + mode="w-", + encoding=encoding, + consolidated=consolidated, + ) + except ContainsGroupError: + try: + ds.to_zarr( + store=st, + group=group_path, + mode="a-", + consolidated=consolidated, + append_dim="vcp_time", + ) + except ValueError: + continue + + +def raw2zarr( + file: str, + zarr_store: str, + zarr_version: int = 2, + elevation: list[float] = None, + append_dim: str = "vcp_time", + mode: str = "a", + consolidated: bool = True, + p2c: str = "../results", +) -> None: + """ + Main function to convert sigmet radar files into xradar datatree and save them using zarr format + @param consolidated: Xarray consolidated metadata. Default True + @param p2c: path to write a file where each radar filename will be written once is processed. + @param mode: Xarray.to_zarr mode. Options are "w", "w-", "a", "a-", r+", None + @param append_dim: dimension name where data will be appended. e.g. 'vcp_time' + @param elevation: list of elevation to be converted into zarr. + E.g. [0.5, 1.0, 3]. If None all sweeps within the radar object will be considered + @param zarr_version: data can be store in zarr format using version 2 or 3. Default V=2 + @param zarr_store: path to zarr store + @param file: radar file path + @return: None + """ + dt: DataTree = xd.io.open_iris_datatree(data_accessor(file)) + dtree = prepare2append(dt, append_dim=append_dim) + elevations = [ + np.round(np.median(dtree.children[i].elevation.data), 1) + for i in list(dtree.children) + if i not in ["radar_parameters"] + ] + if not elevation: + dt2zarr2( + dt=dtree, + zarr_store=zarr_store, + zarr_version=zarr_version, + mode=mode, + consolidated=consolidated, + append_dim=append_dim, + ) + write_file_radar(file, p2c) + elif elevation in elevations: + dt2zarr2( + dt=dtree, + zarr_store=zarr_store, + zarr_version=zarr_version, + mode=mode, + consolidated=consolidated, + append_dim=append_dim, + ) + write_file_radar(file, p2c) diff --git a/sigmet2zarr/utils.py b/raw2zarr/utils.py old mode 100755 new mode 100644 similarity index 69% rename from sigmet2zarr/utils.py rename to raw2zarr/utils.py index 3cf947d..598ebff --- a/sigmet2zarr/utils.py +++ b/raw2zarr/utils.py @@ -1,238 +1,339 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -import os -import xarray as xr -import xradar as xd -import fsspec -import numpy as np -from datetime import datetime, timezone -from datatree import DataTree -import pandas as pd -import tomllib -from time import time -from collections.abc import Iterator -from typing import Any, List - - -def batch(iterable: List[Any], n: int = 1) -> Iterator[List[Any]]: - """ - Splits a list into consecutive chunks of size `n`. - - This function takes a list and yields successive batches of size `n` from it. - If the length of the list is not evenly divisible by `n`, the last batch will - contain the remaining elements. - - Parameters - ---------- - iterable : list[Any] - The list to be split into batches. - n : int, optional - The number of items in each batch (default is 1). 
- - Yields - ------ - Iterator[list[Any]] - An iterator that yields slices of the original list of size `n`, except - for the last batch which may contain fewer elements if the total number - of elements in the list is not evenly divisible by `n`. - - Examples - -------- - >>> list(batch([1, 2, 3, 4, 5], n=2)) - [[1, 2], [3, 4], [5]] - - >>> list(batch(['a', 'b', 'c', 'd'], n=3)) - [['a', 'b', 'c'], ['d']] - """ - l = len(iterable) - for ndx in range(0, l, n): - yield iterable[ndx : min(ndx + n, l)] - - -def timer_func(func): - # This function shows the execution time of - # the function object passed - def wrap_func(*args, **kwargs): - t1 = time() - result = func(*args, **kwargs) - t2 = time() - print(f"Function {func.__name__!r} executed in {(t2-t1):.4f}s") - return result - - return wrap_func - - -def make_dir(path) -> None: - """ - Makes directory based on path. - :param path: directory path that will be created - :return: - """ - try: - os.makedirs(path) - except FileExistsError: - pass - - -def load_toml(filepath: str) -> dict: - """ - Load a TOML data from file - @param filepath: path to TOML file - @return: dict - """ - with open(filepath, "rb") as f: - toml_data: dict = tomllib.load(f) - return toml_data - - -def time_3d(time_array, numbers) -> np.ndarray: - """ - Functions that creates a 3d time array from timestamps - :param time_array: 2d timestamp array - :param numbers: number of times in the new axis - :return: 3d time array - """ - v_func = np.vectorize(lambda x: datetime.fromtimestamp(x, tz=timezone.utc)) - _time = v_func(time_array) - times = np.repeat(_time[np.newaxis, :], numbers, axis=0) - return times - - -def get_time(time_array) -> np.ndarray: - """ - Functions that creates a 3d time array from timestamps - :param time_array: 2d timestamp array - :return: 3d time array - """ - v_func = np.vectorize(lambda x: datetime.fromtimestamp(x, tz=timezone.utc)) - _time = v_func(time_array) - return _time - - -def create_query(date, radar_site) -> str: - """ - Creates a string for quering the IDEAM radar files stored in AWS bucket - :param date: date to be queried. e.g datetime(2021, 10, 3, 12). Datetime python object - :param radar_site: radar site e.g. 
Guaviare - :return: string with a IDEAM radar bucket format - """ - if (date.hour != 0) and (date.hour != 0): - return f"l2_data/{date:%Y}/{date:%m}/{date:%d}/{radar_site}/{radar_site[:3].upper()}{date:%y%m%d%H}" - elif (date.hour != 0) and (date.hour == 0): - return f"l2_data/{date:%Y}/{date:%m}/{date:%d}/{radar_site}/{radar_site[:3].upper()}{date:%y%m%d}" - else: - return f"l2_data/{date:%Y}/{date:%m}/{date:%d}/{radar_site}/{radar_site[:3].upper()}{date:%y%m%d}" - - -def data_accessor(file: str): - """ - Open remotely a AWS S3 file using fsspec - """ - with fsspec.open(file, mode="rb", anon=True) as f: - return f.read() - - -def convert_time(ds) -> pd.to_datetime: - """ - Functions that create a timestamps for appending sweep data along a given dimension - @param ds: Xarray dataset - @return: pandas datetime - """ - for i in ds.time.values: - time = pd.to_datetime(i) - if pd.isnull(time): - continue - return time - - -def fix_angle(ds: xr.Dataset, tolerance: float = None, **kwargs) -> xr.Dataset: - """ - This function reindex the radar azimuth angle to make all sweeps starts and end at the same angle - @param tolerance: Tolerance at which angle reindex will be performed - @param ds: xarray dataset containing and xradar object - @return: azimuth reindex xarray dataset - """ - angle_dict = xd.util.extract_angle_parameters(ds) - start_ang = angle_dict["start_angle"] - stop_ang = angle_dict["stop_angle"] - direction = angle_dict["direction"] - ds = xd.util.remove_duplicate_rays(ds) - az = len(np.arange(start_ang, stop_ang)) - ar = np.round(az / len(ds.azimuth.data), 2) - tolerance = ar if not tolerance else tolerance - return xd.util.reindex_angle( - ds, - start_ang, - stop_ang, - ar, - direction, - method="nearest", - tolerance=tolerance, - **kwargs, - ) - - -def check_if_exist(file: str, path: str = "../results") -> bool: - """ - Function that check if a sigmet file was already processed based on a txt file that written during the conversion - @param file: file name - @param path: path where txt file was written with the list of sigmet files processed - @return: - """ - file_path = f"{path}" - file_name = f"{file_path}/{file.split('/')[-2]}_files.txt" - try: - with open(file_name, "r", newline="\n") as txt_file: - lines = txt_file.readlines() - txt_file.close() - _file = [i for i in lines if i.replace("\n", "") == file] - if len(_file) > 0: - print("File already processed") - return True - else: - return False - except FileNotFoundError: - return False - - -def write_file_radar(file: str, path: str = f"../results") -> None: - """ - Write a new line with the radar filename converted. This is intended to create a checklist to avoid file - reprocessing - @param path: path where the txt file will be saved - @param file: radar filename - @return: - """ - make_dir(path) - file_name = f"{path}/{file.split('/')[-2]}_files.txt" - with open(file_name, "a") as txt_file: - txt_file.write(f"{file}\n") - txt_file.close() - - -def time_encoding(dtree, append_dim) -> dict: - """ - Function that creates encoding for time and append_dim variables - @param dtree: Input xarray Datatree - @param append_dim: dimension name. e.g. 
"vcp_time" - @return: dict with encoding parameters - """ - encoding = {} - enc = dict( - units="nanoseconds since 2000-01-01T00:00:00.00", - dtype="float64", - _FillValue=np.datetime64("NaT"), - ) - if type(dtree) is DataTree: - groups = [i for i in list(dtree.groups) if i.startswith("/sweep_")] - for group in groups: - encoding.update({f"{group}": {f"{append_dim}": enc, "time": enc}}) - return encoding - else: - encoding.update( - { - f"{append_dim}": enc, - "time": enc, - } - ) - return encoding +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import os +import xarray as xr +import xradar as xd +import fsspec +import numpy as np +from datetime import datetime, timezone +import pandas as pd +import tomllib +from time import time +from collections.abc import Iterator +from typing import Any, List +from xarray import DataTree, Dataset +import gzip +import bz2 + + + +def batch(iterable: List[Any], n: int = 1) -> Iterator[List[Any]]: + """ + Splits a list into consecutive chunks of size `n`. + + This function takes a list and yields successive batches of size `n` from it. + If the length of the list is not evenly divisible by `n`, the last batch will + contain the remaining elements. + + Parameters + ---------- + iterable : list[Any] + The list to be split into batches. + n : int, optional + The number of items in each batch (default is 1). + + Yields + ------ + Iterator[list[Any]] + An iterator that yields slices of the original list of size `n`, except + for the last batch which may contain fewer elements if the total number + of elements in the list is not evenly divisible by `n`. + + Examples + -------- + >>> list(batch([1, 2, 3, 4, 5], n=2)) + [[1, 2], [3, 4], [5]] + + >>> list(batch(['a', 'b', 'c', 'd'], n=3)) + [['a', 'b', 'c'], ['d']] + """ + l = len(iterable) + for ndx in range(0, l, n): + yield iterable[ndx : min(ndx + n, l)] + + +def timer_func(func): + # This function shows the execution time of + # the function object passed + def wrap_func(*args, **kwargs): + t1 = time() + result = func(*args, **kwargs) + t2 = time() + print(f"Function {func.__name__!r} executed in {(t2-t1):.4f}s") + return result + + return wrap_func + + +def make_dir(path) -> None: + """ + Makes directory based on path. + :param path: directory path that will be created + :return: + """ + try: + os.makedirs(path) + except FileExistsError: + pass + + +def load_toml(filepath: str) -> dict: + """ + Load a TOML data from file + @param filepath: path to TOML file + @return: dict + """ + with open(filepath, "rb") as f: + toml_data: dict = tomllib.load(f) + return toml_data + + +def time_3d(time_array, numbers) -> np.ndarray: + """ + Functions that creates a 3d time array from timestamps + :param time_array: 2d timestamp array + :param numbers: number of times in the new axis + :return: 3d time array + """ + v_func = np.vectorize(lambda x: datetime.fromtimestamp(x, tz=timezone.utc)) + _time = v_func(time_array) + times = np.repeat(_time[np.newaxis, :], numbers, axis=0) + return times + + +def get_time(time_array) -> np.ndarray: + """ + Functions that creates a 3d time array from timestamps + :param time_array: 2d timestamp array + :return: 3d time array + """ + v_func = np.vectorize(lambda x: datetime.fromtimestamp(x, tz=timezone.utc)) + _time = v_func(time_array) + return _time + + +def create_query(date, radar_site) -> str: + """ + Creates a string for quering the IDEAM radar files stored in AWS bucket + :param date: date to be queried. e.g datetime(2021, 10, 3, 12). 
Datetime python object + :param radar_site: radar site e.g. Guaviare + :return: string with a IDEAM radar bucket format + """ + if (date.hour != 0) and (date.hour != 0): + return f"l2_data/{date:%Y}/{date:%m}/{date:%d}/{radar_site}/{radar_site[:3].upper()}{date:%y%m%d%H}" + elif (date.hour != 0) and (date.hour == 0): + return f"l2_data/{date:%Y}/{date:%m}/{date:%d}/{radar_site}/{radar_site[:3].upper()}{date:%y%m%d}" + else: + return f"l2_data/{date:%Y}/{date:%m}/{date:%d}/{radar_site}/{radar_site[:3].upper()}{date:%y%m%d}" + + +def data_accessor(file: str): + """ + Open remotely a AWS S3 file using fsspec + """ + with fsspec.open(file, mode="rb", anon=True) as f: + return f.read() + + +def convert_time(ds) -> pd.to_datetime: + """ + Functions that create a timestamps for appending sweep data along a given dimension + @param ds: Xarray dataset + @return: pandas datetime + """ + for i in ds.time.values: + time = pd.to_datetime(i) + if pd.isnull(time): + continue + return time + + +def fix_angle(ds: xr.Dataset, tolerance: float = None, **kwargs) -> xr.Dataset: + """ + This function reindex the radar azimuth angle to make all sweeps starts and end at the same angle + @param tolerance: Tolerance at which angle reindex will be performed + @param ds: xarray dataset containing and xradar object + @return: azimuth reindex xarray dataset + """ + angle_dict = xd.util.extract_angle_parameters(ds) + start_ang = angle_dict["start_angle"] + stop_ang = angle_dict["stop_angle"] + direction = angle_dict["direction"] + ds = xd.util.remove_duplicate_rays(ds) + az = len(np.arange(start_ang, stop_ang)) + ar = np.round(az / len(ds.azimuth.data), 2) + tolerance = ar if not tolerance else tolerance + return xd.util.reindex_angle( + ds, + start_ang, + stop_ang, + ar, + direction, + method="nearest", + tolerance=tolerance, + **kwargs, + ) + + +def check_if_exist(file: str, path: str = "../results") -> bool: + """ + Function that check if a sigmet file was already processed based on a txt file that written during the conversion + @param file: file name + @param path: path where txt file was written with the list of sigmet files processed + @return: + """ + file_path = f"{path}" + file_name = f"{file_path}/{file.split('/')[-2]}_files.txt" + try: + with open(file_name, "r", newline="\n") as txt_file: + lines = txt_file.readlines() + txt_file.close() + _file = [i for i in lines if i.replace("\n", "") == file] + if len(_file) > 0: + print("File already processed") + return True + else: + return False + except FileNotFoundError: + return False + + +def write_file_radar(file: str, path: str = f"../results") -> None: + """ + Write a new line with the radar filename converted. This is intended to create a checklist to avoid file + reprocessing + @param path: path where the txt file will be saved + @param file: radar filename + @return: + """ + make_dir(path) + file_name = f"{path}/{file.split('/')[-2]}_files.txt" + with open(file_name, "a") as txt_file: + txt_file.write(f"{file}\n") + txt_file.close() + + +def time_encoding(dtree, append_dim) -> dict: + """ + Function that creates encoding for time and append_dim variables + @param dtree: Input xarray Datatree + @param append_dim: dimension name. e.g. 
"vcp_time" + @return: dict with encoding parameters + """ + encoding = {} + enc = dict( + units="nanoseconds since 2000-01-01T00:00:00.00", + dtype="float64", + _FillValue=np.datetime64("NaT"), + ) + if type(dtree) is DataTree: + groups = [i for i in list(dtree.groups) if i.startswith("/sweep_")] + for group in groups: + encoding.update({f"{group}": {f"{append_dim}": enc, "time": enc}}) + return encoding + else: + encoding.update( + { + f"{append_dim}": enc, + "time": enc, + } + ) + return encoding + + + +def prepare_for_read(filename): + """ + Return a file-like object ready for reading. + + Open a file for reading in binary mode with transparent decompression of + Gzip and BZip2 files. Supports local and S3 files. The resulting file-like + object should be closed. + + Parameters + ---------- + filename : str or file-like object + Filename or file-like object which will be opened. File-like objects + will not be examined for compressed data. + + Returns + ------- + file_like : file-like object + File-like object from which data can be read. + """ + # If already a file-like object, return as-is + if hasattr(filename, 'read'): + return filename + + # Check if S3 path, and open with fsspec + if filename.startswith("s3://"): + with fsspec.open(filename, mode="rb", anon=True) as f: + return f.read() + else: + # Open a local file and read the first few bytes to check for compression + file = open(filename, 'rb') + + # Read first few bytes to check for compression (only for local files) + magic = file.read(3) + file.seek(0) # Reset pointer to beginning after reading header + + # Detect and handle gzip compression + if magic.startswith(b'\x1f\x8b'): + return gzip.GzipFile(fileobj=file) + + # Detect and handle bzip2 compression + if magic.startswith(b'BZh'): + return bz2.BZ2File(fileobj=file) + + # Return the file object as-is if no compression detected + return file + + +# def exp_dim(dt: dict[str, Dataset], append_dim: str = "vcp_time") -> dict: +# """ +# Functions that expand dimension to each dataset within the datatree +# @param dt: xarray.datatree +# @param append_dim: dimension name which dataset will be expanded. e.g. 'vcp_time' +# @return: xarray Datatree +# """ +# for sw, ds in dt.items(): +# if sw.startswith("sweep"): +# _time = convert_time(ds) +# ds[append_dim] = _time +# ds: Dataset = ds.set_coords(append_dim).expand_dims(dim=append_dim, axis=0) +# dt[sw] = ds +# return dt + +def exp_dim(dt: DataTree, append_dim: str): + try: + start_time = dt.time_coverage_start.item() + print(start_time) + except ValueError as e: + print(e) + print(dt.time_coverage_start.item()) + for node in dt.subtree: + ds = node.to_dataset() + ds[append_dim] = start_time + ds = ds.set_coords(append_dim).expand_dims(dim=append_dim, axis=0) + dt[node.path].ds = ds + return dt + + + +def ensure_dimension(dt: xr.DataTree, dim: str) -> xr.Dataset: + """ + Ensure that a dataset has a given dimension. If the dimension is missing, + add it using expand_dims with the specified coordinate value. + + Parameters: + ds (xr.Dataset): The dataset to check. + dim (str): The name of the dimension to ensure. + coord_value: The coordinate value to use if adding the dimension. + + Returns: + xr.Dataset: Dataset with the specified dimension. 
+ """ + dims = [node.dims for node in dt.subtree if dim in node.dims] + if not dims: + return exp_dim(dt, dim) + return dt From cc89760ae9e2afd75b5752bf681815763fb4cc84 Mon Sep 17 00:00:00 2001 From: aladinor Date: Sun, 10 Nov 2024 12:03:21 -0600 Subject: [PATCH 2/9] new modifications --- raw2zarr/data_reader.py | 41 +++++++++++---- raw2zarr/dtree_builder.py | 37 ++++++++------ raw2zarr/main.py | 30 ++++++++--- raw2zarr/utils.py | 104 ++++++++++++++++++-------------------- 4 files changed, 125 insertions(+), 87 deletions(-) diff --git a/raw2zarr/data_reader.py b/raw2zarr/data_reader.py index cff59e6..87615f6 100644 --- a/raw2zarr/data_reader.py +++ b/raw2zarr/data_reader.py @@ -3,17 +3,20 @@ import xradar import dask.bag as db from xarray import DataTree -from xarray.backends.common import _normalize_path -from raw2zarr.utils import prepare_for_read, batch +from xarray.backends.common import _normalize_path +from raw2zarr.utils import prepare_for_read, batch, fix_angle +from multiprocessing import Pool def accessor_wrapper( - filename_or_obj: str | os.PathLike | Iterable[str | os.PathLike], - backend: str = "iris" + filename_or_obj: str | os.PathLike | Iterable[str | os.PathLike], + backend: str = "iris", ) -> DataTree: """Wrapper function to load radar data for a single file or iterable of files with fsspec and compression check.""" try: - if isinstance(filename_or_obj, Iterable) and not isinstance(filename_or_obj, (str, os.PathLike)): + if isinstance(filename_or_obj, Iterable) and not isinstance( + filename_or_obj, (str, os.PathLike) + ): results = [] for file in filename_or_obj: prepared_file = prepare_for_read(file) @@ -33,15 +36,22 @@ def _load_file(file, backend) -> DataTree: return xradar.io.open_iris_datatree(file) elif backend == "odim": return xradar.io.open_odim_datatree(file) + elif backend == "nexradlevel2": + return xradar.io.open_nexradlevel2_datatree(file) else: raise ValueError(f"Unsupported backend: {backend}") +def _process_file(args): + file, backend = args + return accessor_wrapper(file, backend=backend) + + def load_radar_data( - filename_or_obj: str | os.PathLike | Iterable[str | os.PathLike], - backend: str = "iris", - parallel: bool = False, - batch_size: int = 12 + filename_or_obj: str | os.PathLike | Iterable[str | os.PathLike], + backend: str = "iris", + parallel: bool = False, + batch_size: int = 12, ) -> Iterable[List[DataTree]]: """ Load radar data from files in batches to avoid memory overload. 
@@ -61,8 +71,17 @@ def load_radar_data( ls_dtree = [] if parallel: - bag = db.from_sequence(files_batch, npartitions=len(files_batch)).map(accessor_wrapper, backend=backend) - ls_dtree.extend(bag.compute()) + # bag = db.from_sequence(files_batch, npartitions=len(files_batch)).map(accessor_wrapper, backend=backend) + # ls_dtree.extend(bag.compute()) + + # Number of processes to use + num_processes = min(len(files_batch), os.cpu_count()) + files_with_backend = [(file, backend) for file in files_batch] + with Pool(num_processes) as pool: + results = pool.map(_process_file, files_with_backend) + + # Extend ls_dtree with results + ls_dtree.extend(results) else: for file_path in files_batch: result = accessor_wrapper(file_path, backend=backend) diff --git a/raw2zarr/dtree_builder.py b/raw2zarr/dtree_builder.py index 838e933..c4fdd64 100644 --- a/raw2zarr/dtree_builder.py +++ b/raw2zarr/dtree_builder.py @@ -2,16 +2,18 @@ import os import xarray as xr from xarray import DataTree, Dataset -from xarray.backends.common import _normalize_path +from xarray.backends.common import _normalize_path +import xradar from data_reader import load_radar_data # Import the data loading function -from raw2zarr.utils import ensure_dimension +from raw2zarr.utils import ensure_dimension, fix_angle + def datatree_builder( - filename_or_obj: Union[str, os.PathLike, Iterable[Union[str, os.PathLike]]], - backend: str = "iris", - dim: str = "vcp_time", - parallel: bool = False, - batch_size: int = 12 + filename_or_obj: Union[str, os.PathLike, Iterable[Union[str, os.PathLike]]], + backend: str = "iris", + dim: str = "vcp_time", + parallel: bool = False, + batch_size: int = 12, ) -> DataTree: """ Load radar data from files in batches and build a nested DataTree from it. @@ -34,20 +36,29 @@ def datatree_builder( # Load radar data in batches filename_or_obj = _normalize_path(filename_or_obj) - for dtree_batch in load_radar_data(filename_or_obj, backend=backend, parallel=parallel, batch_size=batch_size): + for dtree_batch in load_radar_data( + filename_or_obj, backend=backend, parallel=parallel, batch_size=batch_size + ): if not dtree_batch: - raise ValueError("A batch of DataTrees is empty. Ensure data is loaded correctly.") + raise ValueError( + "A batch of DataTrees is empty. Ensure data is loaded correctly." + ) # Process each DataTree in the current batch for dtree in dtree_batch: task_name = dtree.attrs.get("scan_name", "default_task").strip() - + dtree = ( + dtree.pipe(ensure_dimension, dim).pipe(fix_angle) + ).xradar.georeference() if task_name in nested_dict: - nested_dict[task_name] = append_dataset_to_node(nested_dict[task_name], dtree, dim=dim) + nested_dict[task_name] = append_dataset_to_node( + nested_dict[task_name], dtree, dim=dim + ) else: nested_dict[task_name] = dtree # Final DataTree assembly + return DataTree.from_dict(nested_dict) @@ -59,9 +70,6 @@ def append_dataset_to_node(existing_node: DataTree, new_node: DataTree, dim: str existing_node (DataTree): The existing node in the nested DataTree to which data will be appended. new_node (DataTree): The new DataTree node containing datasets to be appended. 
""" - - existing_node = ensure_dimension(existing_node, dim) - new_node = ensure_dimension(new_node, dim) new_dtree = {} for child in new_node.subtree: node_name = child.path @@ -75,4 +83,3 @@ def append_dataset_to_node(existing_node: DataTree, new_node: DataTree, dim: str new_dtree = new_node[node_name].to_dataset() return DataTree.from_dict(new_dtree) - diff --git a/raw2zarr/main.py b/raw2zarr/main.py index 7301235..95196f3 100644 --- a/raw2zarr/main.py +++ b/raw2zarr/main.py @@ -1,9 +1,11 @@ from datetime import datetime +from errno import EXDEV from raw2zarr.utils import ( create_query, timer_func, data_accessor, + time_encoding, ) from raw2zarr.task2zarr import prepare2append from raw2zarr.dtree_builder import datatree_builder @@ -11,7 +13,6 @@ import xradar as xd - def accessor_wrapper(filename): return prepare2append( xd.io.open_iris_datatree(data_accessor(filename)), @@ -29,12 +30,29 @@ def radar_convert(): query = create_query(date=date_query, radar_site=radar_name) str_bucket = "s3://s3-radaresideam/" fs = fsspec.filesystem("s3", anon=True) - radar_files = [ - f"s3://{i}" for i in sorted(fs.glob(f"{str_bucket}{query}*")) - ][:100] + radar_files = [f"s3://{i}" for i in sorted(fs.glob(f"{str_bucket}{query}*"))][0:12] + + dtree = datatree_builder(radar_files, batch_size=4) + time_enc = time_encoding(dtree, append_dim="vcp_time") + zv = 2 + try: + path = "../zarr/Guaviare_v2.zarr" if zv == 2 else "../zarr/Guaviare_v3.zarr" + _ = dtree.to_zarr( + path, + encoding=time_enc, + consolidated=True, + mode="a-", + zarr_format=zv, + ) + except ValueError: + _ = dtree.to_zarr( + "../zarr/Guaviare_new.zarr", + consolidated=True, + mode="a-", + zarr_format=zv, + ) + print(dtree) - builder = datatree_builder(radar_files, batch_size=4) - print(builder) def main(): radar_convert() diff --git a/raw2zarr/utils.py b/raw2zarr/utils.py index 598ebff..441f227 100644 --- a/raw2zarr/utils.py +++ b/raw2zarr/utils.py @@ -16,7 +16,6 @@ import bz2 - def batch(iterable: List[Any], n: int = 1) -> Iterator[List[Any]]: """ Splits a list into consecutive chunks of size `n`. 
@@ -148,33 +147,6 @@ def convert_time(ds) -> pd.to_datetime: return time -def fix_angle(ds: xr.Dataset, tolerance: float = None, **kwargs) -> xr.Dataset: - """ - This function reindex the radar azimuth angle to make all sweeps starts and end at the same angle - @param tolerance: Tolerance at which angle reindex will be performed - @param ds: xarray dataset containing and xradar object - @return: azimuth reindex xarray dataset - """ - angle_dict = xd.util.extract_angle_parameters(ds) - start_ang = angle_dict["start_angle"] - stop_ang = angle_dict["stop_angle"] - direction = angle_dict["direction"] - ds = xd.util.remove_duplicate_rays(ds) - az = len(np.arange(start_ang, stop_ang)) - ar = np.round(az / len(ds.azimuth.data), 2) - tolerance = ar if not tolerance else tolerance - return xd.util.reindex_angle( - ds, - start_ang, - stop_ang, - ar, - direction, - method="nearest", - tolerance=tolerance, - **kwargs, - ) - - def check_if_exist(file: str, path: str = "../results") -> bool: """ Function that check if a sigmet file was already processed based on a txt file that written during the conversion @@ -227,9 +199,19 @@ def time_encoding(dtree, append_dim) -> dict: _FillValue=np.datetime64("NaT"), ) if type(dtree) is DataTree: - groups = [i for i in list(dtree.groups) if i.startswith("/sweep_")] - for group in groups: - encoding.update({f"{group}": {f"{append_dim}": enc, "time": enc}}) + # [dtree[node].data_vars for node in dtree.match("sweep_*")] + encoding = ( + { + f"{group.parent.path}": {f"{append_dim}": enc} # , "time": enc} + for group in dtree.match("sweep_*").leaves + if append_dim in list(group.variables) + } + if isinstance(dtree, DataTree) + else {} + ) + encoding.update( + {f"{group.path}": {"time": enc} for group in dtree.match("sweep_*").leaves} + ) return encoding else: encoding.update( @@ -241,7 +223,6 @@ def time_encoding(dtree, append_dim) -> dict: return encoding - def prepare_for_read(filename): """ Return a file-like object ready for reading. @@ -262,7 +243,7 @@ def prepare_for_read(filename): File-like object from which data can be read. """ # If already a file-like object, return as-is - if hasattr(filename, 'read'): + if hasattr(filename, "read"): return filename # Check if S3 path, and open with fsspec @@ -271,55 +252,37 @@ def prepare_for_read(filename): return f.read() else: # Open a local file and read the first few bytes to check for compression - file = open(filename, 'rb') + file = open(filename, "rb") # Read first few bytes to check for compression (only for local files) magic = file.read(3) file.seek(0) # Reset pointer to beginning after reading header # Detect and handle gzip compression - if magic.startswith(b'\x1f\x8b'): + if magic.startswith(b"\x1f\x8b"): return gzip.GzipFile(fileobj=file) # Detect and handle bzip2 compression - if magic.startswith(b'BZh'): + if magic.startswith(b"BZh"): return bz2.BZ2File(fileobj=file) # Return the file object as-is if no compression detected return file -# def exp_dim(dt: dict[str, Dataset], append_dim: str = "vcp_time") -> dict: -# """ -# Functions that expand dimension to each dataset within the datatree -# @param dt: xarray.datatree -# @param append_dim: dimension name which dataset will be expanded. e.g. 
'vcp_time' -# @return: xarray Datatree -# """ -# for sw, ds in dt.items(): -# if sw.startswith("sweep"): -# _time = convert_time(ds) -# ds[append_dim] = _time -# ds: Dataset = ds.set_coords(append_dim).expand_dims(dim=append_dim, axis=0) -# dt[sw] = ds -# return dt - def exp_dim(dt: DataTree, append_dim: str): try: start_time = dt.time_coverage_start.item() - print(start_time) except ValueError as e: print(e) - print(dt.time_coverage_start.item()) for node in dt.subtree: ds = node.to_dataset() - ds[append_dim] = start_time + ds[append_dim] = pd.to_datetime(start_time) ds = ds.set_coords(append_dim).expand_dims(dim=append_dim, axis=0) dt[node.path].ds = ds return dt - def ensure_dimension(dt: xr.DataTree, dim: str) -> xr.Dataset: """ Ensure that a dataset has a given dimension. If the dimension is missing, @@ -337,3 +300,34 @@ def ensure_dimension(dt: xr.DataTree, dim: str) -> xr.Dataset: if not dims: return exp_dim(dt, dim) return dt + + +def fix_angle(dt: xr.DataTree, tolerance: float = None, **kwargs) -> xr.DataTree: + """ + This function reindex the radar azimuth angle to make all sweeps starts and end at the same angle + @param tolerance: Torelance for angle reindex. + @param ds: xarray dataset containing and xradar object + @return: azimuth reindex xarray dataset + """ + for node in dt.match("sweep_*"): + ds = dt[node].to_dataset() + ds["time"] = ds.time.load() + angle_dict = xd.util.extract_angle_parameters(ds) + start_ang = angle_dict["start_angle"] + stop_ang = angle_dict["stop_angle"] + direction = angle_dict["direction"] + ds = xd.util.remove_duplicate_rays(ds) + az = len(np.arange(start_ang, stop_ang)) + ar = np.round(az / len(ds.azimuth.data), 2) + tolerance = ar if not tolerance else tolerance + ds = xd.util.reindex_angle( + ds, + start_ang, + stop_ang, + ar, + direction, + method="nearest", + tolerance=tolerance, + ) + dt[node].ds = ds + return dt From af0e7819c89e96e51db68558c1b57b9eb91ab6f5 Mon Sep 17 00:00:00 2001 From: aladinor Date: Mon, 18 Nov 2024 10:00:56 -0600 Subject: [PATCH 3/9] adding new fucntions and recoding old ones --- raw2zarr/utils.py | 278 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 225 insertions(+), 53 deletions(-) diff --git a/raw2zarr/utils.py b/raw2zarr/utils.py index 441f227..1e8b5a5 100644 --- a/raw2zarr/utils.py +++ b/raw2zarr/utils.py @@ -1,6 +1,8 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import os +from idlelib.iomenu import encoding + import xarray as xr import xradar as xd import fsspec @@ -185,45 +187,89 @@ def write_file_radar(file: str, path: str = f"../results") -> None: txt_file.close() -def time_encoding(dtree, append_dim) -> dict: +# +# def time_encoding(dtree, append_dim) -> dict: +# """ +# Function that creates encoding for time and append_dim variables +# @param dtree: Input xarray Datatree +# @param append_dim: dimension name. e.g. 
"vcp_time" +# @return: dict with encoding parameters +# """ +# encoding = {} +# enc = dict( +# units="nanoseconds since 1950-01-01T00:00:00.00", +# dtype="float64", +# _FillValue=np.datetime64("NaT"), +# ) +# if isinstance(dtree, DataTree): +# +# # append_dim encoding +# encoding = {f"{dtree[group].path}": {f"{append_dim}": enc} for group in dtree.groups} +# +# # time encoding (only sweeps) +# encoding.update( +# {f"{node}": {"time": enc, append_dim: enc} for node in dtree.match("*/sweep_*").groups[2:]} +# ) +# try: +# encoding.pop("/") +# except KeyError: +# pass +# else: +# encoding.update( +# { +# f"{append_dim}": enc, +# "time": enc, +# } +# ) +# return encoding + + +def dtree_encoding(dtree, append_dim) -> dict: """ - Function that creates encoding for time and append_dim variables - @param dtree: Input xarray Datatree - @param append_dim: dimension name. e.g. "vcp_time" - @return: dict with encoding parameters + Function that creates encoding for time, append_dim, and all variables in datasets within the DataTree. + + Parameters: + dtree (DataTree): Input xarray DataTree. + append_dim (str): The name of the dimension to encode (e.g., "vcp_time"). + + Returns: + dict: A dictionary with encoding parameters for variables and coordinates. """ - encoding = {} - enc = dict( - units="nanoseconds since 2000-01-01T00:00:00.00", - dtype="float64", - _FillValue=np.datetime64("NaT"), + + _encoding = {} + # Base encoding for time-related variables + time_enc = dict( + units="nanoseconds since 1950-01-01T00:00:00.00", + dtype="int64", + _FillValue=-9999, ) - if type(dtree) is DataTree: - # [dtree[node].data_vars for node in dtree.match("sweep_*")] - encoding = ( - { - f"{group.parent.path}": {f"{append_dim}": enc} # , "time": enc} - for group in dtree.match("sweep_*").leaves - if append_dim in list(group.variables) - } - if isinstance(dtree, DataTree) - else {} - ) - encoding.update( - {f"{group.path}": {"time": enc} for group in dtree.match("sweep_*").leaves} - ) - return encoding + # Base encoding for general variables + var_enc = dict( + _FillValue=-9999, + ) + + if isinstance(dtree, DataTree): + # Append_dim encoding for each group + _encoding = { + f"{dtree[group].path}": {f"{append_dim}": time_enc} + for group in dtree.groups + } + + # Add encoding for sweeps (time and append_dim) + for node in dtree.match("*/sweep_*").groups[2:]: + for var_name in dtree[node].to_dataset().data_vars: + _encoding[node].update({var_name: var_enc, "time": time_enc}) + + # Remove root encoding if present + _encoding.pop("/", None) + else: - encoding.update( - { - f"{append_dim}": enc, - "time": enc, - } - ) - return encoding + _encoding = {} + + return _encoding -def prepare_for_read(filename): +def prepare_for_read(filename, storage_options={"anon": True}): """ Return a file-like object ready for reading. @@ -241,6 +287,7 @@ def prepare_for_read(filename): ------- file_like : file-like object File-like object from which data can be read. 
+ @param storage_options: """ # If already a file-like object, return as-is if hasattr(filename, "read"): @@ -248,8 +295,11 @@ def prepare_for_read(filename): # Check if S3 path, and open with fsspec if filename.startswith("s3://"): - with fsspec.open(filename, mode="rb", anon=True) as f: - return f.read() + # with fsspec.open(filename, mode="rb", anon=True) as f: + # return f.read() + return fsspec.open( + filename, mode="rb", compression="infer", **storage_options + ).open() else: # Open a local file and read the first few bytes to check for compression file = open(filename, "rb") @@ -270,31 +320,79 @@ def prepare_for_read(filename): return file -def exp_dim(dt: DataTree, append_dim: str): - try: - start_time = dt.time_coverage_start.item() - except ValueError as e: - print(e) +def exp_dim(dt: xr.DataTree, append_dim: str) -> xr.DataTree: + """ + Add a new dimension to all datasets in a DataTree and initialize it with a specific value. + + This function adds a new dimension to each dataset in the DataTree. The dimension is + initialized with the `time_coverage_start` value from the root of the DataTree and + is added as a coordinate. The new dimension is also expanded to allow additional values. + + Parameters: + dt (xr.DataTree): The DataTree containing radar datasets. + append_dim (str): The name of the dimension to add. + + Returns: + xr.DataTree: A DataTree with the specified dimension added to all datasets. + + Notes: + - The new dimension is initialized with the `time_coverage_start` attribute. + - Attributes describing the new dimension are added for metadata. + - The datasets are updated in place within the DataTree. + + Example: + Add a "vcp_time" dimension to all datasets in a DataTree: + + >>> dt = exp_dim(dt, "vcp_time") + """ + # Get the start time from the root node + start_time = pd.to_datetime(dt.time_coverage_start.item()) + + # Iterate over all nodes in the DataTree for node in dt.subtree: - ds = node.to_dataset() - ds[append_dim] = pd.to_datetime(start_time) + ds = node.to_dataset(inherit=False) # Extract the dataset without inheritance + + # Add the new dimension with the start_time value + ds[append_dim] = start_time + + # Define attributes for the new dimension + attrs = { + "description": "Volume Coverage Pattern time since start of volume scan", + } + ds[append_dim].attrs = attrs + + # Set the new variable as a coordinate and expand the dimension ds = ds.set_coords(append_dim).expand_dims(dim=append_dim, axis=0) + + # Update the dataset in the DataTree node dt[node.path].ds = ds + return dt -def ensure_dimension(dt: xr.DataTree, dim: str) -> xr.Dataset: +def ensure_dimension(dt: xr.DataTree, dim: str) -> xr.DataTree: """ - Ensure that a dataset has a given dimension. If the dimension is missing, - add it using expand_dims with the specified coordinate value. + Ensure that all datasets in a DataTree have a specified dimension. + + This function checks each dataset in the DataTree for the presence of the given dimension. + If the dimension is missing in a dataset, it is added using expand_dims. Parameters: - ds (xr.Dataset): The dataset to check. - dim (str): The name of the dimension to ensure. - coord_value: The coordinate value to use if adding the dimension. + dt (xr.DataTree): The DataTree to check and modify. + dim (str): The name of the dimension to ensure in each dataset. Returns: - xr.Dataset: Dataset with the specified dimension. + xr.DataTree: A DataTree where all datasets have the specified dimension. 
+ + Notes: + - If the dimension is already present in a dataset, it is left unchanged. + - The new dimension is added without any associated coordinates. + - This function modifies datasets in-place within the DataTree. + + Example: + Ensure that all datasets in the DataTree have a "vcp_time" dimension: + + >>> dt = ensure_dimension(dt, "vcp_time") """ dims = [node.dims for node in dt.subtree if dim in node.dims] if not dims: @@ -304,14 +402,31 @@ def ensure_dimension(dt: xr.DataTree, dim: str) -> xr.Dataset: def fix_angle(dt: xr.DataTree, tolerance: float = None, **kwargs) -> xr.DataTree: """ - This function reindex the radar azimuth angle to make all sweeps starts and end at the same angle - @param tolerance: Torelance for angle reindex. - @param ds: xarray dataset containing and xradar object - @return: azimuth reindex xarray dataset + Reindex the radar azimuth angle to ensure all sweeps start and end at the same angle. + + This function processes each sweep in a radar dataset stored in an xarray DataTree. + It ensures that the azimuth angles are reindexed to cover a consistent range, removing + duplicates and interpolating as needed to maintain uniform spacing. + + Parameters: + dt (xr.DataTree): The input DataTree containing radar data, with each sweep represented as a node. + tolerance (float, optional): Tolerance for angle reindexing. If not specified, it will be + calculated based on the angular resolution. + **kwargs: Additional arguments passed to `xd.util.reindex_angle`. + + Returns: + xr.DataTree: The DataTree with azimuth angles reindexed for all sweeps. + + Notes: + - The function assumes the nodes of interest are named using the "sweep_*" pattern. + - It uses xradar utilities to extract angle parameters, remove duplicate rays, + and reindex angles for uniform azimuth coverage. + - The angular resolution (`ar`) is calculated dynamically based on the azimuth range and size. """ for node in dt.match("sweep_*"): ds = dt[node].to_dataset() ds["time"] = ds.time.load() + ds = fix_azimuth(ds) angle_dict = xd.util.extract_angle_parameters(ds) start_ang = angle_dict["start_angle"] stop_ang = angle_dict["stop_angle"] @@ -331,3 +446,60 @@ def fix_angle(dt: xr.DataTree, tolerance: float = None, **kwargs) -> xr.DataTree ) dt[node].ds = ds return dt + + +def fix_azimuth( + ds: xr.Dataset, target_size: int = 360, fill_value: str = "extrapolate" +) -> xr.Dataset: + """ + Ensure that the azimuth dimension in a radar dataset matches a target size. + + This function adjusts the azimuth dimension of a radar dataset to match a specified target size + (e.g., 360 for a full sweep). It detects the starting and stopping angles of the azimuth and + interpolates data to create a uniform azimuth grid. + + Parameters: + ds (xr.Dataset): The dataset containing radar data with an azimuth coordinate. + target_size (int, optional): The desired size of the azimuth dimension (default is 360). + fill_value (str, optional): Value used to fill points outside the data range during + interpolation. Options include: + - "extrapolate": Fill using extrapolation (default). + - None: Introduce `NaN` for points outside the data range. + + Returns: + xr.Dataset: The dataset with a completed and uniformly spaced azimuth dimension. + + Notes: + - If the current azimuth size matches the target size, no changes are made. + - The interpolation uses `xarray.interp` with the specified `fill_value` behavior. + - The azimuth grid is reconstructed to span from the detected start angle to the stop angle. 
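+
+    Example:
+        A minimal, illustrative call (here `ds` is assumed to be a single-sweep
+        dataset that already has `azimuth` and `time` coordinates; it is not an
+        object created elsewhere in this module):
+
+        >>> ds = fix_azimuth(ds, target_size=360)  # pad a truncated sweep to 360 rays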
+ """ + # Current azimuth size and coordinates + current_size = ds["azimuth"].shape[0] + azimuth = ds["azimuth"].values + + # Detect start and stop angles from the azimuth + start_ang = np.min(azimuth) + stop_ang = np.max(azimuth) + + # Check if the azimuth size needs fixing + if current_size % target_size != 0: + print(f"Fixing azimuth dimension from {current_size} to {target_size}") + time_numeric = ds["time"].astype("float64") + # Create a new uniform azimuth array within the detected range + new_azimuth = np.linspace(start_ang, stop_ang, target_size, endpoint=False) + + # Interpolate data to the new azimuth array + ds = ds.interp(azimuth=new_azimuth, kwargs={"fill_value": fill_value}) + # Interpolate the `time` coordinate explicitly if it exists + if "time" not in ds.coords: + # Convert datetime64 to numeric + time_interpolated = xr.DataArray( + pd.to_datetime(np.interp(new_azimuth, azimuth, time_numeric)), + dims="azimuth", + coords={"azimuth": new_azimuth}, + ) + ds["time"] = time_interpolated + ds = ds.set_coords("time") + + return ds From c71ad567e0bddedd6f50676cd54340b89b01d11b Mon Sep 17 00:00:00 2001 From: aladinor Date: Mon, 18 Nov 2024 11:08:25 -0600 Subject: [PATCH 4/9] updating environment.yml file --- environment.yml | 2 +- setup.py | 0 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 setup.py diff --git a/environment.yml b/environment.yml index f245565..5cfbe92 100755 --- a/environment.yml +++ b/environment.yml @@ -24,4 +24,4 @@ dependencies: - xradar>=0.8.0 - pip - pip: - - git+https://github.com/aladinor/raw2zarr + - -e . diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..e69de29 From 29e748d94892329c5cc8c446fe5219080313527a Mon Sep 17 00:00:00 2001 From: aladinor Date: Mon, 18 Nov 2024 11:08:44 -0600 Subject: [PATCH 5/9] adding setup.py file --- setup.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/setup.py b/setup.py index e69de29..e1ab6a3 100644 --- a/setup.py +++ b/setup.py @@ -0,0 +1,36 @@ +from setuptools import setup, find_packages + +setup( + name="raw2zarr", + version="0.3.0", + description="Tools for working with radar data and converting it to Zarr format", + author="Alfonso Ladino, Max Grover", + author_email="alfonso8@illinois.edu", + url="https://github.com/aladinor/raw2zarr", + packages=find_packages(), # Automatically finds all packages in the directory + python_requires=">=3.12", + install_requires=[ + "cartopy", + "fsspec", + "dask", + "netCDF4", + "bottleneck", + "zarr", + "s3fs>=2024.3.1", + "arm_pyart>=1.18.1", + "wradlib", + "hvplot", + "datashader", + "xarray>=2024.10", + "xradar>=0.8.0", + ], + extras_require={ + "dev": ["pytest", "pytest-cov", "flake8"], # Add development dependencies here + "notebooks": ["jupyterlab"], # Dependencies for working with notebooks + }, + classifiers=[ + "Programming Language :: Python :: 3.12", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + ], +) From d03348d2ee8004afe3cbffb8beb71bb3e2543776 Mon Sep 17 00:00:00 2001 From: aladinor Date: Mon, 18 Nov 2024 11:09:11 -0600 Subject: [PATCH 6/9] updating fucntions within the package --- raw2zarr/__init__.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/raw2zarr/__init__.py b/raw2zarr/__init__.py index a0bdf68..f2bba00 100644 --- a/raw2zarr/__init__.py +++ b/raw2zarr/__init__.py @@ -6,11 +6,20 @@ """ -__author__ = """Open Radar Developers""" +__author__ = """Alfonso Ladino""" __email__ = "alfonso8@illinois.edu" -# 
import subpackages -# from . import task2zarr -# from . import utils # noqa +from .dtree_builder import datatree_builder, append_sequential, append_parallel +from .data_reader import accessor_wrapper +from .utils import ensure_dimension, fix_angle, batch, dtree_encoding -__all__ = [s for s in dir() if not s.startswith("_")] +__all__ = [ + "datatree_builder", + "append_sequential", + "append_parallel", + "accessor_wrapper", + "ensure_dimension", + "fix_angle", + "batch", + "dtree_encoding", +] From 484df20ffaff6158d18b16be0c6dc33f322f01b0 Mon Sep 17 00:00:00 2001 From: aladinor Date: Mon, 18 Nov 2024 11:09:51 -0600 Subject: [PATCH 7/9] update docstring and funcitons --- raw2zarr/data_reader.py | 79 ++++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/raw2zarr/data_reader.py b/raw2zarr/data_reader.py index 87615f6..f7bfe27 100644 --- a/raw2zarr/data_reader.py +++ b/raw2zarr/data_reader.py @@ -1,50 +1,62 @@ from typing import List, Iterable import os + import xradar +import fsspec import dask.bag as db from xarray import DataTree from xarray.backends.common import _normalize_path -from raw2zarr.utils import prepare_for_read, batch, fix_angle -from multiprocessing import Pool +from s3fs import S3File + +# Relative imports +from .utils import prepare_for_read, batch, fix_angle def accessor_wrapper( - filename_or_obj: str | os.PathLike | Iterable[str | os.PathLike], - backend: str = "iris", + filename_or_obj: str | os.PathLike, + engine: str = "iris", ) -> DataTree: """Wrapper function to load radar data for a single file or iterable of files with fsspec and compression check.""" try: - if isinstance(filename_or_obj, Iterable) and not isinstance( - filename_or_obj, (str, os.PathLike) - ): - results = [] - for file in filename_or_obj: - prepared_file = prepare_for_read(file) - results.append(_load_file(prepared_file, backend)) - return results - else: - file = prepare_for_read(filename_or_obj) - return _load_file(file, backend) + file = prepare_for_read(filename_or_obj) + return _load_file(file, engine) except Exception as e: print(f"Error loading {filename_or_obj}: {e}") return None -def _load_file(file, backend) -> DataTree: +def _load_file(file, engine) -> DataTree: """Helper function to load a single file with the specified backend.""" - if backend == "iris": - return xradar.io.open_iris_datatree(file) - elif backend == "odim": + if engine == "iris": + if isinstance(file, S3File): + return xradar.io.open_iris_datatree(file.read()) + elif isinstance(file, bytes): + return xradar.io.open_iris_datatree(file) + else: + return xradar.io.open_iris_datatree(file) + elif engine == "odim": return xradar.io.open_odim_datatree(file) - elif backend == "nexradlevel2": - return xradar.io.open_nexradlevel2_datatree(file) + elif engine == "nexradlevel2": + if isinstance(file, S3File): + local_file = fsspec.open_local( + f"simplecache::s3://{file.path}", + s3={"anon": True}, + filecache={"cache_storage": "."}, + ) + data_tree = xradar.io.open_nexradlevel2_datatree(local_file) + + # Remove the local file after loading the data + os.remove(local_file) + return data_tree + else: + return xradar.io.open_nexradlevel2_datatree(file) else: - raise ValueError(f"Unsupported backend: {backend}") + raise ValueError(f"Unsupported backend: {engine}") def _process_file(args): - file, backend = args - return accessor_wrapper(file, backend=backend) + file, engine = args + return accessor_wrapper(file, engine=engine) def load_radar_data( @@ -52,7 +64,7 @@ def 
load_radar_data(
     backend: str = "iris",
     parallel: bool = False,
     batch_size: int = 12,
-) -> Iterable[List[DataTree]]:
+) -> DataTree:
     """
     Load radar data from files in batches to avoid memory overload.

@@ -71,20 +83,13 @@ def load_radar_data(
         ls_dtree = []

         if parallel:
-            # bag = db.from_sequence(files_batch, npartitions=len(files_batch)).map(accessor_wrapper, backend=backend)
-            # ls_dtree.extend(bag.compute())
-
-            # Number of processes to use
-            num_processes = min(len(files_batch), os.cpu_count())
-            files_with_backend = [(file, backend) for file in files_batch]
-            with Pool(num_processes) as pool:
-                results = pool.map(_process_file, files_with_backend)
-
-            # Extend ls_dtree with results
-            ls_dtree.extend(results)
+            bag = db.from_sequence(files_batch, npartitions=len(files_batch)).map(
+                accessor_wrapper, engine=backend
+            )
+            ls_dtree.extend(bag.compute())
         else:
             for file_path in files_batch:
-                result = accessor_wrapper(file_path, backend=backend)
+                result = accessor_wrapper(file_path, engine=backend)
                 if result is not None:
                     ls_dtree.append(result)


From e07a602c7e3778fa304bb0334fac88d347cc0b2f Mon Sep 17 00:00:00 2001
From: aladinor
Date: Mon, 18 Nov 2024 11:10:15 -0600
Subject: [PATCH 8/9] update docstring and functions

---
 raw2zarr/dtree_builder.py | 248 +++++++++++++++++++++++++++++---------
 1 file changed, 194 insertions(+), 54 deletions(-)

diff --git a/raw2zarr/dtree_builder.py b/raw2zarr/dtree_builder.py
index c4fdd64..67ece73 100644
--- a/raw2zarr/dtree_builder.py
+++ b/raw2zarr/dtree_builder.py
@@ -1,85 +1,225 @@
-from typing import List, Iterable, Union
+from typing import Iterable, List, Union
 import os
-import xarray as xr
+
 from xarray import DataTree, Dataset
 from xarray.backends.common import _normalize_path
-import xradar
-from data_reader import load_radar_data  # Import the data loading function
-from raw2zarr.utils import ensure_dimension, fix_angle
+
+# Relative imports
+from .data_reader import accessor_wrapper
+from .utils import ensure_dimension, fix_angle, dtree_encoding, batch


 def datatree_builder(
     filename_or_obj: Union[str, os.PathLike, Iterable[Union[str, os.PathLike]]],
-    backend: str = "iris",
+    engine: str = "iris",
     dim: str = "vcp_time",
-    parallel: bool = False,
-    batch_size: int = 12,
 ) -> DataTree:
     """
-    Load radar data from files in batches and build a nested DataTree from it.
+    Construct a hierarchical xarray.DataTree from radar data files.
+
+    This function loads radar data from one or more files and organizes it into a nested
+    `xarray.DataTree` structure. The data can be processed in batches and supports different
+    backend engines for reading the data.

     Parameters:
-    filename_or_obj (str | os.PathLike | Iterable[str | os.PathLike]): Path(s) to radar data files.
-    backend (str): Backend type to use. Options include 'iris', 'odim', etc. Default is 'iris'.
-    parallel (bool): If True, enables parallel processing with Dask. Default is False.
-    batch_size (int): Number of files to process in each batch.
+        filename_or_obj (str | os.PathLike | Iterable[str | os.PathLike]):
+            Path or paths to the radar data files to be loaded. Can be a single file,
+            a directory path, or an iterable of file paths.
+        engine (str, optional):
+            The backend engine to use for loading the radar data. Common options include
+            'iris' (default) and 'odim'. The selected engine must be supported by the underlying
+            data processing libraries.
+        dim (str, optional):
+            The name of the dimension to use for concatenating data across files. Default is 'vcp_time'.
+ Note: The 'time' dimension cannot be used as the concatenation dimension because it is + already a predefined dimension in the dataset and reserved for temporal data. Choose + a unique dimension name that does not conflict with existing dimensions in the datasets. + Returns: - DataTree: A nested DataTree combining all input DataTree objects. + xarray.DataTree: + A nested `xarray.DataTree` object that combines all the loaded radar data files into a + hierarchical structure. Each node in the tree corresponds to an `xarray.Dataset`. Raises: - ValueError: If no files are loaded or all batches are empty. + ValueError: + If no files are successfully loaded or if all batches result in empty data. + + Notes: + - This function is designed to handle large datasets efficiently, potentially + processing data in batches and leveraging parallelism if supported by the backend. + - The resulting `xarray.DataTree` retains a hierarchical organization based on the structure + of the input files and their metadata. + + Example: + >>> from raw2zarr import datatree_builder + >>> tree = datatree_builder(["file1.RAW", "file2.RAW"], engine="iris", dim="vcp_time") + >>> print(tree) + >>> print(tree["root/child"].to_dataset()) # Access a node's dataset """ # Initialize an empty dictionary to hold the nested structure - nested_dict = {} # Load radar data in batches filename_or_obj = _normalize_path(filename_or_obj) + dtree = accessor_wrapper(filename_or_obj, engine=engine) + task_name = dtree.attrs.get("scan_name", "default_task").strip() + dtree = (dtree.pipe(fix_angle).pipe(ensure_dimension, dim)).xradar.georeference() + dtree = DataTree.from_dict({task_name: dtree}) + dtree.encoding = dtree_encoding(dtree, append_dim=dim) + return dtree - for dtree_batch in load_radar_data( - filename_or_obj, backend=backend, parallel=parallel, batch_size=batch_size - ): - if not dtree_batch: - raise ValueError( - "A batch of DataTrees is empty. Ensure data is loaded correctly." - ) - # Process each DataTree in the current batch - for dtree in dtree_batch: - task_name = dtree.attrs.get("scan_name", "default_task").strip() - dtree = ( - dtree.pipe(ensure_dimension, dim).pipe(fix_angle) - ).xradar.georeference() - if task_name in nested_dict: - nested_dict[task_name] = append_dataset_to_node( - nested_dict[task_name], dtree, dim=dim - ) - else: - nested_dict[task_name] = dtree +def process_file(file: str, engine: str = "nexradlevel2") -> DataTree: + """ + Load and transform a single radar file into a DataTree object. + """ + try: + dtree = datatree_builder(file, engine=engine) + return dtree + except Exception as e: + print(f"Error processing file {file}: {e}") + return None + + +def append_sequential( + radar_files: Iterable[str | os.PathLike], append_dim: str, zarr_store: str, **kwargs +) -> None: + """ + Sequentially loads radar files and appends their data to a Zarr store. - # Final DataTree assembly + This function processes radar files one at a time, loading each file into a + `xarray.DataTree` object and appending its data sequentially to a Zarr store. + Although the files are processed sequentially, the write process ensures + that data is written in an ordered manner to the Zarr store. - return DataTree.from_dict(nested_dict) + Parameters: + radar_files (Iterable[str | os.PathLike]): List of radar file paths to process. + append_dim (str): The dimension along which to append data in the Zarr store. + zarr_store (str): Path to the output Zarr store. 
+ **kwargs: Additional arguments, including: + - zarr_format (int, optional): The Zarr format version (default: 2). + Returns: + None: Outputs data directly to the specified Zarr store. -def append_dataset_to_node(existing_node: DataTree, new_node: DataTree, dim: str): + Notes: + - Ensures ordered and sequential writing of data to the Zarr store. + - Handles encoding for compatibility with Zarr format. """ - Append datasets from new_node to the existing_node's structure. + for file in radar_files: + dtree = process_file(file) + zarr_format = kwargs.get("zarr_format", 2) + if dtree: + enc = dtree.encoding + dtree = dtree[dtree.groups[1]] + try: + dtree.to_zarr( + store=zarr_store, + mode="a-", + encoding=enc, + consolidated=True, + zarr_format=zarr_format, + ) + except ValueError: + dtree.to_zarr( + store=zarr_store, + mode="a-", + consolidated=True, + append_dim=append_dim, + zarr_format=zarr_format, + ) + print("done") + + +def append_parallel( + radar_files: Iterable[str | os.PathLike], + append_dim: str, + zarr_store: str, + engine: str = "nexradlevel2", + batch_size: int = None, + **kwargs, +) -> None: + """ + Load radar files in parallel and append their data sequentially to a Zarr store. + + This function uses Dask Bag to load radar files in parallel, processing them in + configurable batches. After loading, the resulting `xarray.DataTree` objects are + processed and written sequentially to the Zarr store, ensuring consistent and ordered + data storage. A Dask LocalCluster is used to distribute computation across available cores. Parameters: - existing_node (DataTree): The existing node in the nested DataTree to which data will be appended. - new_node (DataTree): The new DataTree node containing datasets to be appended. + radar_files (Iterable[str | os.PathLike]): + An iterable containing paths to the radar files to process. + append_dim (str): + The dimension along which to append data in the Zarr store. + zarr_store (str): + The path to the output Zarr store where data will be written. + engine (str, optional): + The backend engine used to load radar files. Defaults to "nexradlevel2". + batch_size (int, optional): + The number of files to process in each batch. If not specified, it defaults to + the total number of cores available in the Dask cluster. + **kwargs: + Additional arguments, including: + - zarr_format (int, optional): The Zarr format version to use (default: 2). + + Returns: + None: + This function writes data directly to the specified Zarr store and does not return a value. + + Notes: + - File loading is parallelized using Dask Bag for efficiency, but data writing + to the Zarr store is performed sequentially to ensure consistent and ordered output. + - A Dask LocalCluster is created with a web-based dashboard for monitoring at + `http://127.0.0.1:8785` by default. + - If `batch_size` is not specified, it is automatically set based on the available cores + in the Dask cluster. 
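+        - The Dask `LocalCluster` and `Client` created inside this function are not
+          closed when the function returns; repeated calls from a long-lived process
+          may therefore leave workers running.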
+ + Example: + >>> radar_files = ["file1.nc", "file2.nc", "file3.nc"] + >>> zarr_store = "output.zarr" + >>> append_parallel( + radar_files=radar_files, + append_dim="time", + zarr_store=zarr_store, + engine="nexradlevel2", + batch_size=5 + ) """ - new_dtree = {} - for child in new_node.subtree: - node_name = child.path - if node_name in [node.path for node in existing_node.subtree]: - # Append the datasets if the node already exists - existing_dataset = existing_node[node_name].to_dataset() - new_dataset = child.to_dataset() - # Append data along a new dimension (e.g., time) or merge variables as needed - new_dtree[node_name] = xr.concat((existing_dataset, new_dataset), dim=dim) - else: - new_dtree = new_node[node_name].to_dataset() - - return DataTree.from_dict(new_dtree) + + from functools import partial + from dask import bag as db + from dask.distributed import Client, LocalCluster + + cluster = LocalCluster(dashboard_address="127.0.0.1:8785") + client = Client(cluster) + pf = partial(process_file, engine=engine) + + if not batch_size: + batch_size = sum(client.ncores().values()) + + for files in batch(radar_files, n=batch_size): + bag = db.from_sequence(files, npartitions=len(files)).map(pf) + + ls_dtree: List[DataTree] = bag.compute() + for dtree in ls_dtree: + zarr_format = kwargs.get("zarr_format", 2) + if dtree: + enc = dtree.encoding + dtree = dtree[dtree.groups[1]] + try: + dtree.to_zarr( + store=zarr_store, + mode="a-", + encoding=enc, + consolidated=True, + zarr_format=zarr_format, + ) + except ValueError: + dtree.to_zarr( + store=zarr_store, + mode="a-", + consolidated=True, + append_dim=append_dim, + zarr_format=zarr_format, + ) From 4dcfdf1bf828dbe31ddb8d889ef119180aa04a01 Mon Sep 17 00:00:00 2001 From: aladinor Date: Mon, 18 Nov 2024 11:10:49 -0600 Subject: [PATCH 9/9] removing unused funcitons --- raw2zarr/utils.py | 40 +--------------------------------------- 1 file changed, 1 insertion(+), 39 deletions(-) diff --git a/raw2zarr/utils.py b/raw2zarr/utils.py index 1e8b5a5..0b985d4 100644 --- a/raw2zarr/utils.py +++ b/raw2zarr/utils.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import os -from idlelib.iomenu import encoding import xarray as xr import xradar as xd @@ -13,7 +12,7 @@ from time import time from collections.abc import Iterator from typing import Any, List -from xarray import DataTree, Dataset +from xarray import DataTree import gzip import bz2 @@ -187,43 +186,6 @@ def write_file_radar(file: str, path: str = f"../results") -> None: txt_file.close() -# -# def time_encoding(dtree, append_dim) -> dict: -# """ -# Function that creates encoding for time and append_dim variables -# @param dtree: Input xarray Datatree -# @param append_dim: dimension name. e.g. 
"vcp_time" -# @return: dict with encoding parameters -# """ -# encoding = {} -# enc = dict( -# units="nanoseconds since 1950-01-01T00:00:00.00", -# dtype="float64", -# _FillValue=np.datetime64("NaT"), -# ) -# if isinstance(dtree, DataTree): -# -# # append_dim encoding -# encoding = {f"{dtree[group].path}": {f"{append_dim}": enc} for group in dtree.groups} -# -# # time encoding (only sweeps) -# encoding.update( -# {f"{node}": {"time": enc, append_dim: enc} for node in dtree.match("*/sweep_*").groups[2:]} -# ) -# try: -# encoding.pop("/") -# except KeyError: -# pass -# else: -# encoding.update( -# { -# f"{append_dim}": enc, -# "time": enc, -# } -# ) -# return encoding - - def dtree_encoding(dtree, append_dim) -> dict: """ Function that creates encoding for time, append_dim, and all variables in datasets within the DataTree.