Skip to content

Commit

Permalink
Merge pull request #13 from aladinor/refactoring
Browse files Browse the repository at this point in the history
refactoring, adding new fuctions and using data streaming instead of …
  • Loading branch information
aladinor authored Sep 29, 2024
2 parents ec865a4 + e4ed6de commit e54c723
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 47 deletions.
109 changes: 64 additions & 45 deletions sigmet2zarr/task2zarr.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import os
import shutil
import datatree
import zarr
import xradar as xd
import numpy as np
from datatree import DataTree
from xarray.core.dataset import Dataset
from xarray.core.dataarray import DataArray
from xarray import full_like
from sigmet2zarr.utils import (
data_accessor,
Expand All @@ -28,27 +24,49 @@ def _get_root(dt: DataTree):
return root


def _fix_sn(dt: DataTree, sw_num: list[int]):
groups = [i for i in list(dt.groups) if i.startswith("/sweep")]
for group in groups:
sn: float = float(dt[group].ds.sweep_fixed_angle.values)
nsn: int = sw_num[sn]
new_sn = full_like(dt[group].ds.sweep_number, nsn)
dt[group]["sweep_number"] = new_sn
return dt
def _fix_sn(ds: Dataset, sw_num: dict[float, int]) -> dict:
sn: float = float(ds["sweep_fixed_angle"].values)
nsn: int = sw_num[sn]
new_sn = full_like(ds.sweep_number, nsn)
new_ds = ds.copy(deep=True)
new_ds["sweep_number"] = new_sn
return new_ds


def raw_to_dt(
file: str, append_dim: str, cache_storage: str = "/tmp/radar/"
) -> DataTree:
def prepare2append(dt: DataTree, append_dim: str, radar_name: str = "GUA") -> DataTree:
"""
Function that convert sigmet files into a datatree using xd.io.open_iris_datatree
@param append_dim: dimension where data will be appended
@param cache_storage: locally caching remote files path
@param file: radar file path
@return: xradar datatree with all sweeps within each file
Converts SIGMET radar files into a DataTree structure and prepares it for appending along a specified dimension.
This function processes a given DataTree of radar data, organizes it by sweep angles, and prepares it for appending
along the specified dimension. It uses configuration files to map radar sweep angles and numbers, and georeferences
the data before appending.
Parameters
----------
dt : DataTree
The DataTree object containing radar data to be processed.
append_dim : str
The dimension along which the data will be appended (e.g., time, elevation).
radar_name : str, optional
The radar name to identify the correct configuration (default is "GUA").
Returns
-------
DataTree
A new DataTree object with all sweeps processed and ready for appending along the specified dimension.
Notes
-----
- The function expects a configuration file in TOML format located at "../config/radar.toml", containing
the necessary radar sweep angle and sweep number information.
- Each sweep in the DataTree is georeferenced, and its sweep number is corrected before being organized
into the final DataTree structure.
Examples
--------
>>> radar_data = prepare2append(my_datatree, append_dim="time", radar_name="GUA")
>>> # radar_data is now prepared for appending along the time dimension
"""
radar_name = file.split("/")[-1].split(".")[0][:3]
elev: np.array = np.array(
load_toml("../config/radar.toml")[radar_name]["elevations"]
)
Expand All @@ -57,41 +75,41 @@ def raw_to_dt(
)
swps: dict[float, str] = {j: f"sweep_{idx}" for idx, j in enumerate(elev)}
sw_fix: dict[float, int] = {j: sw_num[idx] for idx, j in enumerate(elev)}
data: dict[float, Dataset] = {}
dt: DataTree = xd.io.open_iris_datatree(data_accessor(file))
dt: DataTree = _fix_sn(dt, sw_fix)
data.update(

tree = {
node.path: node.to_dataset()
for node in dt.subtree
if not node.path.startswith("/sweep")
}
tree.update(
{
float(dt[j].sweep_fixed_angle.values): fix_angle(
dt[j]
).ds.xradar.georeference()
for j in list(dt.children)
if j not in ["radar_parameters"]
swps[float(node.sweep_fixed_angle.values)]: fix_angle(
_fix_sn(node, sw_num=sw_fix)
)
.to_dataset()
.xradar.georeference()
for node in dt.subtree
if node.path.startswith("/sweep")
}
)
data = exp_dim(data, append_dim=append_dim)
dtree = _get_root(dt)
for i, sw in enumerate(data.keys()):
DataTree(data[sw], name=swps[sw], parent=dtree)
return dtree
tree = exp_dim(tree, append_dim=append_dim)
return DataTree.from_dict(tree)


def exp_dim(dt, append_dim) -> DataTree:
def exp_dim(dt: dict[str, Dataset], append_dim: str = "vcp_time") -> dict:
"""
Functions that expand dimension to each dataset within the datatree
@param dt: xarray.datatree
@param append_dim: dimension name which dataset will be expanded. e.g. 'vcp_time'
@return: xarray Datatree
"""
dt_new = {}
for sw, ds in dt.items():
_time = convert_time(ds)
if not _time:
continue
ds[append_dim] = _time
ds: Dataset = ds.set_coords(append_dim).expand_dims(dim=append_dim, axis=0)
dt_new[sw] = ds
return dt_new
if sw.startswith("sweep"):
_time = convert_time(ds)
ds[append_dim] = _time
ds: Dataset = ds.set_coords(append_dim).expand_dims(dim=append_dim, axis=0)
dt[sw] = ds
return dt


def dt2zarr2(
Expand Down Expand Up @@ -176,7 +194,8 @@ def raw2zarr(
@param file: radar file path
@return: None
"""
dtree = raw_to_dt(file, append_dim=append_dim)
dt: DataTree = xd.io.open_iris_datatree(data_accessor(file))
dtree = prepare2append(dt, append_dim=append_dim)
elevations = [
np.round(np.median(dtree.children[i].elevation.data), 1)
for i in list(dtree.children)
Expand Down
56 changes: 54 additions & 2 deletions sigmet2zarr/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,57 @@
from datatree import DataTree
import pandas as pd
import tomllib
from time import time
from collections.abc import Iterator
from typing import Any, List


def batch(iterable: List[Any], n: int = 1) -> Iterator[List[Any]]:
"""
Splits a list into consecutive chunks of size `n`.
This function takes a list and yields successive batches of size `n` from it.
If the length of the list is not evenly divisible by `n`, the last batch will
contain the remaining elements.
Parameters
----------
iterable : list[Any]
The list to be split into batches.
n : int, optional
The number of items in each batch (default is 1).
Yields
------
Iterator[list[Any]]
An iterator that yields slices of the original list of size `n`, except
for the last batch which may contain fewer elements if the total number
of elements in the list is not evenly divisible by `n`.
Examples
--------
>>> list(batch([1, 2, 3, 4, 5], n=2))
[[1, 2], [3, 4], [5]]
>>> list(batch(['a', 'b', 'c', 'd'], n=3))
[['a', 'b', 'c'], ['d']]
"""
l = len(iterable)
for ndx in range(0, l, n):
yield iterable[ndx : min(ndx + n, l)]


def timer_func(func):
# This function shows the execution time of
# the function object passed
def wrap_func(*args, **kwargs):
t1 = time()
result = func(*args, **kwargs)
t2 = time()
print(f"Function {func.__name__!r} executed in {(t2-t1):.4f}s")
return result

return wrap_func


def make_dir(path) -> None:
Expand Down Expand Up @@ -89,8 +140,9 @@ def convert_time(ds) -> pd.to_datetime:
"""
for i in ds.time.values:
time = pd.to_datetime(i)
if not pd.isnull(time):
return time
if pd.isnull(time):
continue
return time


def fix_angle(ds: xr.Dataset, tolerance: float = None, **kwargs) -> xr.Dataset:
Expand Down

0 comments on commit e54c723

Please sign in to comment.