Merge pull request #22 from aladinor/refactoring
refactoring raw2zarr
aladinor authored Nov 18, 2024
2 parents f344eaf + 4dcfdf1 commit 26c011a
Showing 13 changed files with 1,120 additions and 475 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -167,4 +167,4 @@ sigmet2zarr.egg-info
#.idea/

#Notebooks
-sigmet2zarr/task2vcp.py
+raw2zarr/task2vcp.py
5 changes: 5 additions & 0 deletions README.md
@@ -39,6 +39,11 @@ with the new open data paradigm, emphasizing the FAIR principles (Findable, Acce
<img src="https://contrib.rocks/image?repo=aladinor/raw2zarr" />
</a>

+> [!WARNING]
+> **This project is under active development.**
+> Features may change frequently, and some parts of the library may be incomplete or subject to change. Please proceed with caution.

### Running on Your Own Machine
If you are interested in running this material locally on your computer, you will need to follow this workflow:

7 changes: 3 additions & 4 deletions environment.yml
@@ -20,9 +20,8 @@ dependencies:
   - wradlib
   - hvplot
   - datashader
-  - xarray>=2024.10
-  - xradar>=0.8.0
   - pip
-  - pydata-sphinx-theme
   - pip:
-    - git+https://github.com/aladinor/raw2zarr
+    - git+https://github.com/openradar/xradar.git
+    - git+https://github.com/pydata/xarray.git
+    - -e .
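Since the pinned conda versions of xarray and xradar give way to their git main branches, plus an editable install of the package itself, a quick sanity check can confirm what actually resolved. A minimal sketch, assuming the environment above is active:

import xarray
import xradar

import raw2zarr

# Builds installed from git main typically report dev-style version strings
print("xarray:", xarray.__version__)
print("xradar:", xradar.__version__)
print("raw2zarr exports:", raw2zarr.__all__)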
25 changes: 25 additions & 0 deletions raw2zarr/__init__.py
@@ -0,0 +1,25 @@
"""
raw2zarr
========
Top-level package for raw2zarr.
"""

__author__ = """Alfonso Ladino"""
__email__ = "alfonso8@illinois.edu"

from .dtree_builder import datatree_builder, append_sequential, append_parallel
from .data_reader import accessor_wrapper
from .utils import ensure_dimension, fix_angle, batch, dtree_encoding

__all__ = [
"datatree_builder",
"append_sequential",
"append_parallel",
"accessor_wrapper",
"ensure_dimension",
"fix_angle",
"batch",
"dtree_encoding",
]
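Given these exports, a minimal usage sketch (all file names and the store path below are hypothetical):

from raw2zarr import append_sequential, datatree_builder

# Build a hierarchical DataTree from a single SIGMET/IRIS file (hypothetical path)
tree = datatree_builder("scan_0001.RAW", engine="iris")

# Append NEXRAD Level 2 files to a Zarr store, one at a time (hypothetical paths)
append_sequential(
    ["KVNX_scan1_V06", "KVNX_scan2_V06"],
    append_dim="vcp_time",
    zarr_store="radar.zarr",
)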
@@ -38,6 +38,7 @@ def radar_convert():
query = create_query(date=date_query, radar_site=radar_name)
str_bucket = "s3://s3-radaresideam/"
fs = fsspec.filesystem("s3", anon=True)
+x
radar_files = [
f"s3://{i}" for i in sorted(fs.glob(f"{str_bucket}{query}*"))
][:30]
96 changes: 96 additions & 0 deletions raw2zarr/data_reader.py
@@ -0,0 +1,96 @@
from typing import Iterable, Iterator, List
import os

import xradar
import fsspec
import dask.bag as db
from xarray import DataTree
from xarray.backends.common import _normalize_path
from s3fs import S3File

# Relative imports
from .utils import prepare_for_read, batch, fix_angle


def accessor_wrapper(
filename_or_obj: str | os.PathLike,
engine: str = "iris",
) -> DataTree:
"""Wrapper function to load radar data for a single file or iterable of files with fsspec and compression check."""
try:
file = prepare_for_read(filename_or_obj)
return _load_file(file, engine)
except Exception as e:
print(f"Error loading {filename_or_obj}: {e}")
return None


def _load_file(file, engine) -> DataTree:
"""Helper function to load a single file with the specified backend."""
    if engine == "iris":
        # S3 file objects must be read into bytes before xradar can parse them
        if isinstance(file, S3File):
            file = file.read()
        return xradar.io.open_iris_datatree(file)
elif engine == "odim":
return xradar.io.open_odim_datatree(file)
elif engine == "nexradlevel2":
if isinstance(file, S3File):
local_file = fsspec.open_local(
f"simplecache::s3://{file.path}",
s3={"anon": True},
filecache={"cache_storage": "."},
)
data_tree = xradar.io.open_nexradlevel2_datatree(local_file)

# Remove the local file after loading the data
os.remove(local_file)
return data_tree
else:
return xradar.io.open_nexradlevel2_datatree(file)
else:
raise ValueError(f"Unsupported backend: {engine}")


def _process_file(args):
file, engine = args
return accessor_wrapper(file, engine=engine)


def load_radar_data(
filename_or_obj: str | os.PathLike | Iterable[str | os.PathLike],
backend: str = "iris",
parallel: bool = False,
batch_size: int = 12,
) -> Iterator[List[DataTree]]:
"""
Load radar data from files in batches to avoid memory overload.
Parameters:
filename_or_obj (str | os.PathLike | Iterable[str | os.PathLike]): Path(s) to radar data files.
backend (str): Backend type to use. Options include 'iris', 'odim', etc. Default is 'iris'.
parallel (bool): If True, enables parallel processing with Dask. Default is False.
batch_size (int): Number of files to process in each batch.
Returns:
Iterable[List[DataTree]]: An iterable yielding batches of DataTree objects.
"""
filename_or_obj = _normalize_path(filename_or_obj)

for files_batch in batch(filename_or_obj, batch_size):
ls_dtree = []

if parallel:
bag = db.from_sequence(files_batch, npartitions=len(files_batch)).map(
                accessor_wrapper, engine=backend
)
ls_dtree.extend(bag.compute())
else:
for file_path in files_batch:
result = accessor_wrapper(file_path, engine=backend)
if result is not None:
ls_dtree.append(result)

yield ls_dtree # Yield each batch of DataTree objects
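A short consumption sketch for the batched generator above (paths are hypothetical):

from raw2zarr.data_reader import load_radar_data

files = ["scan_001.RAW", "scan_002.RAW", "scan_003.RAW"]  # hypothetical paths

# load_radar_data is a generator: each iteration yields one batch of DataTree
# objects, so only `batch_size` files are held in memory at a time.
for trees in load_radar_data(files, backend="iris", batch_size=2):
    for tree in trees:
        print(tree.attrs.get("scan_name", "unknown"))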
225 changes: 225 additions & 0 deletions raw2zarr/dtree_builder.py
@@ -0,0 +1,225 @@
from typing import Iterable, List, Union
import os

from xarray import DataTree, Dataset
from xarray.backends.common import _normalize_path

# Relative imports
from .data_reader import accessor_wrapper
from .utils import ensure_dimension, fix_angle, dtree_encoding, batch


def datatree_builder(
filename_or_obj: Union[str, os.PathLike, Iterable[Union[str, os.PathLike]]],
engine: str = "iris",
dim: str = "vcp_time",
) -> DataTree:
"""
Construct a hierarchical xarray.DataTree from radar data files.
This function loads radar data from one or more files and organizes it into a nested
`xarray.DataTree` structure. The data can be processed in batches and supports different
backend engines for reading the data.
Parameters:
filename_or_obj (str | os.PathLike | Iterable[str | os.PathLike]):
Path or paths to the radar data files to be loaded. Can be a single file,
a directory path, or an iterable of file paths.
engine (str, optional):
The backend engine to use for loading the radar data. Common options include
'iris' (default) and 'odim'. The selected engine must be supported by the underlying
data processing libraries.
dim (str, optional):
The name of the dimension to use for concatenating data across files. Default is 'vcp_time'.
Note: The 'time' dimension cannot be used as the concatenation dimension because it is
already a predefined dimension in the dataset and reserved for temporal data. Choose
a unique dimension name that does not conflict with existing dimensions in the datasets.
Returns:
xarray.DataTree:
A nested `xarray.DataTree` object that combines all the loaded radar data files into a
hierarchical structure. Each node in the tree corresponds to an `xarray.Dataset`.
Raises:
ValueError:
If no files are successfully loaded or if all batches result in empty data.
Notes:
- This function is designed to handle large datasets efficiently, potentially
processing data in batches and leveraging parallelism if supported by the backend.
- The resulting `xarray.DataTree` retains a hierarchical organization based on the structure
of the input files and their metadata.
Example:
>>> from raw2zarr import datatree_builder
>>> tree = datatree_builder(["file1.RAW", "file2.RAW"], engine="iris", dim="vcp_time")
>>> print(tree)
>>> print(tree["root/child"].to_dataset()) # Access a node's dataset
"""
    filename_or_obj = _normalize_path(filename_or_obj)
    # Load the radar file(s) into a single DataTree using the requested engine
    dtree = accessor_wrapper(filename_or_obj, engine=engine)
    task_name = dtree.attrs.get("scan_name", "default_task").strip()
    # Fix angles, ensure the append dimension exists, and add georeference coordinates
    dtree = dtree.pipe(fix_angle).pipe(ensure_dimension, dim).xradar.georeference()
    # Nest the tree under the task/scan name so multiple VCPs can coexist in one store
    dtree = DataTree.from_dict({task_name: dtree})
    dtree.encoding = dtree_encoding(dtree, append_dim=dim)
return dtree


def process_file(file: str, engine: str = "nexradlevel2") -> DataTree:
"""
Load and transform a single radar file into a DataTree object.
"""
try:
dtree = datatree_builder(file, engine=engine)
return dtree
except Exception as e:
print(f"Error processing file {file}: {e}")
return None


def append_sequential(
radar_files: Iterable[str | os.PathLike], append_dim: str, zarr_store: str, **kwargs
) -> None:
"""
Sequentially loads radar files and appends their data to a Zarr store.
This function processes radar files one at a time, loading each file into a
`xarray.DataTree` object and appending its data sequentially to a Zarr store.
Although the files are processed sequentially, the write process ensures
that data is written in an ordered manner to the Zarr store.
Parameters:
radar_files (Iterable[str | os.PathLike]): List of radar file paths to process.
append_dim (str): The dimension along which to append data in the Zarr store.
zarr_store (str): Path to the output Zarr store.
**kwargs: Additional arguments, including:
- zarr_format (int, optional): The Zarr format version (default: 2).
Returns:
None: Outputs data directly to the specified Zarr store.
Notes:
- Ensures ordered and sequential writing of data to the Zarr store.
- Handles encoding for compatibility with Zarr format.
"""
    zarr_format = kwargs.get("zarr_format", 2)
    for file in radar_files:
        dtree = process_file(file)
        if dtree:
            enc = dtree.encoding
            # Select the task/scan-name node created by datatree_builder
            dtree = dtree[dtree.groups[1]]
            try:
                # Initial write: creates the store and records the encoding
                dtree.to_zarr(
                    store=zarr_store,
                    mode="a-",
                    encoding=enc,
                    consolidated=True,
                    zarr_format=zarr_format,
                )
            except ValueError:
                # Store already holds these variables: append along append_dim
                # instead (encoding must not be passed again when appending)
                dtree.to_zarr(
                    store=zarr_store,
                    mode="a-",
                    consolidated=True,
                    append_dim=append_dim,
                    zarr_format=zarr_format,
                )
            print("done")


def append_parallel(
radar_files: Iterable[str | os.PathLike],
append_dim: str,
zarr_store: str,
engine: str = "nexradlevel2",
    batch_size: int | None = None,
**kwargs,
) -> None:
"""
Load radar files in parallel and append their data sequentially to a Zarr store.
This function uses Dask Bag to load radar files in parallel, processing them in
configurable batches. After loading, the resulting `xarray.DataTree` objects are
processed and written sequentially to the Zarr store, ensuring consistent and ordered
data storage. A Dask LocalCluster is used to distribute computation across available cores.
Parameters:
radar_files (Iterable[str | os.PathLike]):
An iterable containing paths to the radar files to process.
append_dim (str):
The dimension along which to append data in the Zarr store.
zarr_store (str):
The path to the output Zarr store where data will be written.
engine (str, optional):
The backend engine used to load radar files. Defaults to "nexradlevel2".
batch_size (int, optional):
The number of files to process in each batch. If not specified, it defaults to
the total number of cores available in the Dask cluster.
**kwargs:
Additional arguments, including:
- zarr_format (int, optional): The Zarr format version to use (default: 2).
Returns:
None:
This function writes data directly to the specified Zarr store and does not return a value.
Notes:
- File loading is parallelized using Dask Bag for efficiency, but data writing
to the Zarr store is performed sequentially to ensure consistent and ordered output.
- A Dask LocalCluster is created with a web-based dashboard for monitoring at
`http://127.0.0.1:8785` by default.
- If `batch_size` is not specified, it is automatically set based on the available cores
in the Dask cluster.
Example:
>>> radar_files = ["file1.nc", "file2.nc", "file3.nc"]
>>> zarr_store = "output.zarr"
>>> append_parallel(
radar_files=radar_files,
append_dim="time",
zarr_store=zarr_store,
engine="nexradlevel2",
batch_size=5
)
"""

from functools import partial
from dask import bag as db
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(dashboard_address="127.0.0.1:8785")
client = Client(cluster)
pf = partial(process_file, engine=engine)

if not batch_size:
batch_size = sum(client.ncores().values())

for files in batch(radar_files, n=batch_size):
bag = db.from_sequence(files, npartitions=len(files)).map(pf)

ls_dtree: List[DataTree] = bag.compute()
        zarr_format = kwargs.get("zarr_format", 2)
        for dtree in ls_dtree:
            if dtree:
                enc = dtree.encoding
                # Select the task/scan-name node created by datatree_builder
                dtree = dtree[dtree.groups[1]]
                try:
                    # Initial write: creates the store and records the encoding
                    dtree.to_zarr(
                        store=zarr_store,
                        mode="a-",
                        encoding=enc,
                        consolidated=True,
                        zarr_format=zarr_format,
                    )
                except ValueError:
                    # Store already holds these variables: append along append_dim
                    # instead (encoding must not be passed again when appending)
                    dtree.to_zarr(
                        store=zarr_store,
                        mode="a-",
                        consolidated=True,
                        append_dim=append_dim,
                        zarr_format=zarr_format,
                    )
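Both append functions above share the same write idiom: attempt a plain write first, and fall back to appending along `append_dim` once the store already exists. A minimal sketch of that idiom as a standalone helper (the name `write_or_append` is hypothetical, not part of the library):

from xarray import DataTree


def write_or_append(
    dtree: DataTree, zarr_store: str, append_dim: str, zarr_format: int = 2
) -> None:
    """Write a DataTree to a Zarr store, appending along `append_dim` on later calls."""
    try:
        # First write: creates the store and records the tree's encoding
        dtree.to_zarr(
            store=zarr_store,
            mode="a-",
            encoding=dtree.encoding,
            consolidated=True,
            zarr_format=zarr_format,
        )
    except ValueError:
        # The variables already exist: xarray rejects re-supplied encoding,
        # so retry as an append along the chosen dimension.
        dtree.to_zarr(
            store=zarr_store,
            mode="a-",
            consolidated=True,
            append_dim=append_dim,
            zarr_format=zarr_format,
        )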