very first interim_l3 dataset #121

Open · wants to merge 8 commits into base: main
40 changes: 28 additions & 12 deletions src/halodrops/helper/__init__.py
@@ -94,9 +94,6 @@
)


path_to_flight_ids = "{platform}/Level_0"
path_to_l0_files = "{platform}/Level_0/{flight_id}"

l2_flight_attributes_map = {
"True Air Speed (m/s)": "true_air_speed_(ms-1)",
"Ground Speed (m/s)": "ground_speed_(ms-1)",
@@ -111,8 +108,13 @@
}


path_to_flight_ids = "{platform}/Level_0"
path_to_l0_files = "{platform}/Level_0/{flight_id}"

l2_filename_template = "{platform}_{launch_time}_{flight_id}_{serial_id}_Level_2.nc"

l3_filename_template = "{platform}_{flight_id}_Level_3.nc"


def get_bool(s):
if isinstance(s, bool):
@@ -219,16 +221,25 @@ def calc_q_from_rh(ds):

Function to estimate specific humidity from the relative humidity, temperature and pressure in the given dataset.
"""
e_s = calc_saturation_pressure(ds.ta.values)
w_s = mpcalc.mixing_ratio(e_s * units.Pa, ds.p.values * units.Pa).magnitude
w = ds.rh.values * w_s
q = w / (1 + w)
ds["q"] = (ds.rh.dims, q)

vmr = mpcalc.mixing_ratio_from_relative_humidity(
ds["p"].values * units.Pa,
ds.ta.values * units.kelvin,
(ds.rh * 100) * units.percent,
)
q = mpcalc.specific_humidity_from_mixing_ratio(vmr)

ds["q"] = (ds.rh.dims, q.magnitude)
ds["q"].attrs = dict(
standard_name="specific humidity",
long_name="specific humidity",
units=str(q.units),
)

return ds


def calc_theta_from_T(dataset):
def calc_theta_from_T(ds):
"""
Input :

@@ -241,8 +252,13 @@ def calc_theta_from_T(dataset):
Function to estimate potential temperature from the temperature and pressure in the given dataset.
"""
theta = mpcalc.potential_temperature(
dataset.p.values * units.Pa, dataset.ta.values * units.kelvin
).magnitude
ds["theta"] = (ds.ta.dims, theta)
ds.p.values * units.Pa, ds.ta.values * units.kelvin
)
ds["theta"] = (ds.ta.dims, theta.magnitude)
ds["theta"].attrs = dict(
standard_name="potential temperature",
long_name="potential temperature",
units=str(theta.units),
)

return ds
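
For reference, a minimal standalone sketch of how the two new helpers could be called. The dataset below is hypothetical (not from the PR); it assumes `p` in Pa, `ta` in K, and `rh` stored as a fraction, which is what `calc_q_from_rh` implies by converting `rh * 100` to percent.

```python
# Hypothetical example data, for illustration only.
import numpy as np
import xarray as xr
import halodrops.helper as hh

ds = xr.Dataset(
    {
        "p": ("time", np.array([101300.0, 95000.0, 90000.0])),  # pressure in Pa
        "ta": ("time", np.array([299.0, 295.5, 292.0])),        # air temperature in K
        "rh": ("time", np.array([0.80, 0.75, 0.70])),           # relative humidity as a fraction
    },
    coords={"time": np.arange(3)},
)

ds = hh.calc_q_from_rh(ds)     # adds "q" (specific humidity) with a units attribute
ds = hh.calc_theta_from_T(ds)  # adds "theta" (potential temperature) with a units attribute
print(ds["q"].values, ds["theta"].values)
```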
73 changes: 48 additions & 25 deletions src/halodrops/pipeline.py
@@ -1,10 +1,11 @@
from .helper.paths import Platform, Flight
from .helper.__init__ import path_to_flight_ids, path_to_l0_files
from .processor import Sonde
from .processor import Sonde, Gridded
import configparser
import inspect
import os
import xarray as xr
import warnings


def get_mandatory_args(function):
@@ -284,12 +285,26 @@ def iterate_Sonde_method_over_dict_of_Sondes_objects(
return my_dict


def sondes_to_gridded(sondes: dict) -> xr.Dataset:
pass
def sondes_to_gridded(sondes: dict, config: configparser.ConfigParser):
flight_id = list(sondes.values())[0].flight_id
platform_id = list(sondes.values())[0].platform_id
gridded = Gridded(sondes, flight_id, platform_id)
gridded.concat_sondes()
return gridded


def iterate_method_over_dataset(dataset: xr.Dataset, functions: list) -> xr.Dataset:
pass
def iterate_method_over_dataset(
obj: Gridded,
functions: list,
config: configparser.ConfigParser,
) -> xr.Dataset:
"""
This is NOT what the function should do in the end; it is only used to save the base L3 dataset.
"""
for function_name in functions:
function = getattr(Gridded, function_name)
result = function(obj, **get_args_for_function(config, function))
return result
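
As a hedged sketch of how these two helpers are meant to be driven (mirroring the `concatenate_L2` and `create_L3` steps further down; `sondes` and `config` are hypothetical, already-initialized objects):

```python
# Hypothetical invocation; the real pipeline resolves keyword arguments from the config.
gridded = sondes_to_gridded(sondes, config)
gridded = iterate_method_over_dataset(
    gridded,
    ["get_l3_dir", "get_l3_filename", "write_l3"],
    config,
)
```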


def gridded_to_pattern(
@@ -424,26 +439,34 @@ def run_pipeline(pipeline: dict, config: configparser.ConfigParser):
"output": "sondes",
"comment": "This steps creates the L2 files after the QC (user says how QC flags are used to go from L1 to L2) and then saves these as L2 NC datasets.",
},
# "read_and_process_L2": {
# "intake": "sondes",
# "apply": iterate_Sonde_method_over_dict_of_Sondes_objects,
# "functions": [],
# "output": "sondes",
# "comment": "This step reads from the saved L2 files and prepares individual sonde datasets before they can be concatenated to create L3.",
# },
# "concatenate_L2": {
# "intake": "sondes",
# "apply": sondes_to_gridded,
# "output": "gridded",
# "comment": "This step concatenates the individual sonde datasets to create the L3 dataset and saves it as an NC file.",
# },
# "create_L3": {
# "intake": "gridded",
# "apply": iterate_method_over_dataset,
# "functions": [],
# "output": "gridded",
# "comment": "This step creates the L3 dataset after adding additional products.",
# },
"process_L2": {
"intake": "sondes",
"apply": iterate_Sonde_method_over_dict_of_Sondes_objects,
"functions": [
"get_l2_filename",
"add_l2_ds",
"create_prep_l3",
"add_q_and_theta_to_l2_ds",
"remove_non_mono_incr_alt",
"interpolate_alt",
"prepare_l2_for_gridded",
],
"output": "sondes",
"comment": "This step reads from the saved L2 files and prepares individual sonde datasets before they can be concatenated to create L3.",
},
"concatenate_L2": {
"intake": "sondes",
"apply": sondes_to_gridded,
"output": "gridded",
"comment": "This step concatenates the individual sonde datasets to create the L3 dataset.",
},
"create_L3": {
"intake": "gridded",
"apply": iterate_method_over_dataset,
"functions": ["get_l3_dir", "get_l3_filename", "write_l3"],
"output": "gridded",
"comment": "This step creates the L3 dataset after adding additional products.",
},
# "create_patterns": {
# "intake": "gridded",
# "apply": gridded_to_pattern,
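
Putting the new steps together, the per-sonde part of `process_L2` is roughly equivalent to the imperative sketch below. `sonde` stands for a hypothetical, already-initialized `Sonde` object, and keyword arguments that the real pipeline pulls from the config are omitted.

```python
# Rough, hypothetical equivalent of the "process_L2" step for one Sonde object.
sonde.get_l2_filename()
sonde.add_l2_ds()                 # opens the saved L2 NetCDF file
sonde.create_prep_l3()            # adds the sonde_id coordinate and sorts by time
sonde.add_q_and_theta_to_l2_ds()  # adds specific humidity and potential temperature
sonde.remove_non_mono_incr_alt()  # drops samples where gpsalt jumps back up
sonde.interpolate_alt()           # bins/interpolates onto the altitude grid
sonde.prepare_l2_for_gridded()    # promotes attrs to variables for concatenation
```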
172 changes: 165 additions & 7 deletions src/halodrops/processor.py
@@ -4,10 +4,12 @@
from typing import Any, Optional, List
import os
import subprocess
import warnings

import numpy as np
import xarray as xr


from halodrops.helper import rawreader as rr
import halodrops.helper as hh

@@ -965,11 +967,116 @@ def add_l2_ds(self, l2_dir: str = None):
"""
if l2_dir is None:
l2_dir = self.l2_dir
try:
object.__setattr__(
self, "l2_ds", xr.open_dataset(os.path.join(l2_dir, self.l2_filename))
)
return self
except FileNotFoundError:
return None

def create_prep_l3(self):
_prep_l3_ds = self.l2_ds.assign_coords(
{"sonde_id": ("sonde_id", [self.l2_ds.sonde_id.values])}
).sortby("time")
object.__setattr__(self, "_prep_l3_ds", _prep_l3_ds)
return self

def remove_non_mono_incr_alt(self):
"""
This function removes the indices in the
Owner commented:
Suggested change:
- This function removes the indices in the
+ This function removes the value at the indices which have a non-monotonically increasing value in the

geopotential height ('gpsalt')
"""
_prep_l3_ds = self._prep_l3_ds.load()
gps_alt = _prep_l3_ds.gpsalt
curr_alt = gps_alt.isel(time=0)
for i in range(len(gps_alt)):
if gps_alt[i] > curr_alt:
gps_alt[i] = np.nan
elif ~np.isnan(gps_alt[i]):
curr_alt = gps_alt[i]
_prep_l3_ds["gpsalt"] = gps_alt

mask = ~np.isnan(gps_alt)
object.__setattr__(
self, "l2_ds", xr.open_dataset(os.path.join(l2_dir, self.l2_filename))
self,
"_prep_l3_ds",
_prep_l3_ds.sel(time=mask),
)
return self
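
To make the filtering concrete, a standalone NumPy sketch with hypothetical altitude values (not part of the PR): any sample that is higher than the last accepted altitude is masked, so only a monotonically decreasing profile survives.

```python
import numpy as np

# Hypothetical descending gpsalt profile with one spurious upward jump.
gps_alt = np.array([500.0, 480.0, 490.0, 470.0])

curr_alt = gps_alt[0]
for i in range(len(gps_alt)):
    if gps_alt[i] > curr_alt:
        gps_alt[i] = np.nan        # 490.0 jumps back above 480.0 -> masked
    elif ~np.isnan(gps_alt[i]):
        curr_alt = gps_alt[i]

print(gps_alt)                      # [500. 480.  nan 470.]
print(gps_alt[~np.isnan(gps_alt)])  # what remains in _prep_l3_ds after .sel(time=mask)
```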

def interpolate_alt(
self,
interp_start=-5,
interp_stop=14515,
interp_step=10,
max_gap_fill: int = 50,
method: str = "bin",
):

interpolation_grid = np.arange(interp_start, interp_stop, interp_step)

if not (self._prep_l3_ds["gpsalt"].diff(dim="time") < 0).any():
warnings.warn(
f"your altitude for sonde {self._interim_l3_ds.sonde_id.values} is not sorted."
)
ds = self._prep_l3_ds.swap_dims({"time": "gpsalt"}).load()

if method == "linear_interpolate":
interp_ds = ds.interp(gpsalt=interpolation_grid)
elif method == "bin":

interpolation_bins = interpolation_grid.astype("int")
interpolation_label = np.arange(
interp_start + interp_step / 2,
interp_stop - interp_step / 2,
interp_step,
)
interp_ds = ds.groupby_bins(
"gpsalt",
interpolation_bins,
labels=interpolation_label,
).mean(dim="gpsalt")
# somehow coordinates are lost and need to be added again
for coord in ["lat", "lon", "time"]:
interp_ds = interp_ds.assign_coords(
{
coord: (
"gpsalt",
ds[coord]
.groupby_bins(
"gpsalt", interpolation_bins, labels=interpolation_label
)
.mean("gpsalt")
.values,
)
}
)
interp_ds = (
interp_ds.transpose()
.interpolate_na(
dim="gpsalt_bins", max_gap=max_gap_fill, use_coordinate=True
)
.rename({"gpsalt_bins": "gpsalt", "time": "interpolated_time"})
)

object.__setattr__(self, "_prep_l3_ds", interp_ds)

return self
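
A small sketch of the `method="bin"` branch on toy data. The values and the coarse grid below are hypothetical; the actual grid is `np.arange(interp_start, interp_stop, interp_step)`, i.e. `np.arange(-5, 14515, 10)` by default.

```python
import numpy as np
import xarray as xr

# Hypothetical profile: altitude decreases as the sonde falls.
ds = xr.Dataset(
    {"ta": ("time", np.array([292.0, 294.0, 296.0, 298.0]))},
    coords={
        "time": np.arange(4),
        "gpsalt": ("time", np.array([34.0, 27.0, 16.0, 4.0])),
    },
)

bins = np.arange(-5, 45, 10)   # bin edges, analogous to the default 10 m grid
labels = np.arange(0, 40, 10)  # one mid-point label per bin
binned = (
    ds.swap_dims({"time": "gpsalt"})
    .groupby_bins("gpsalt", bins, labels=labels)
    .mean()
)
print(binned["ta"].values)     # per-bin averages of ta
```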

def prepare_l2_for_gridded(self):
"""
Prepares L2 datasets to be concatenated into the gridded dataset.
Adds all attributes as variables, so that concatenation does not run into conflicts between differing attributes
(and no information is lost).

"""
_prep_l3_ds = self._prep_l3_ds
for attr, value in self._prep_l3_ds.attrs.items():
_prep_l3_ds[attr] = value

_prep_l3_ds.attrs.clear()
object.__setattr__(self, "_prep_l3_ds", _prep_l3_ds)
return self
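
A tiny sketch of the attribute promotion this method does (dataset and attribute values are hypothetical): global attributes become scalar variables, so differing attributes do not conflict during concatenation and no information is lost.

```python
import xarray as xr

ds = xr.Dataset({"ta": ("gpsalt", [300.0, 299.0])}, coords={"gpsalt": [0.0, 10.0]})
ds.attrs = {"serial_id": "ABC123", "launch_time": "2024-01-01T12:00:00"}

# Same pattern as prepare_l2_for_gridded: promote attrs to scalar variables ...
for attr, value in ds.attrs.items():
    ds[attr] = value
# ... and clear them as attrs so concatenation sees no conflicting metadata.
ds.attrs.clear()
```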

def add_q_and_theta_to_l2_ds(self):
@@ -985,14 +1092,65 @@ def add_q_and_theta_to_l2_ds(self):
self : object
Returns the sonde object with potential temperature and specific humidity added to the L2 dataset.
"""
if hasattr(self, "_interim_l3_ds"):
ds = self._interim_l3_ds
else:
ds = self.l2_ds
ds = self._prep_l3_ds

ds = calc_q_from_rh(ds)
ds = calc_theta_from_T(ds)
ds = hh.calc_q_from_rh(ds)
Owner commented:
This should be a different commit. (that's my bad to not include hh, sorry :P)

Collaborator Author replied:
yes true. I subconsciously changed it at some point ^^'

ds = hh.calc_theta_from_T(ds)

object.__setattr__(self, "_interim_l3_ds", ds)

return self


@dataclass(order=True)
class Gridded:
sondes: dict
flight_id: str
platform_id: str

def concat_sondes(self):
"""
Function to concatenate all sondes using the combination of all measurement times and launch times.
"""
list_of_l2_ds = [sonde._prep_l3_ds for sonde in self.sondes.values()]
combined = xr.combine_by_coords(list_of_l2_ds)
self._interim_l3_ds = combined
return self
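
For intuition, a hedged sketch of what `xr.combine_by_coords` does here with two per-sonde datasets that share an altitude grid but carry distinct `sonde_id` coordinates (all values hypothetical):

```python
import numpy as np
import xarray as xr

def make_sonde_ds(sonde_id, ta_values):
    # Hypothetical per-sonde dataset on a shared gpsalt grid.
    return xr.Dataset(
        {"ta": (("sonde_id", "gpsalt"), np.array([ta_values]))},
        coords={"sonde_id": [sonde_id], "gpsalt": [0.0, 10.0, 20.0]},
    )

combined = xr.combine_by_coords(
    [
        make_sonde_ds("sonde_a", [300.0, 299.0, 298.0]),
        make_sonde_ds("sonde_b", [301.0, 300.0, 299.0]),
    ]
)
print(combined.sizes)  # expected: {'sonde_id': 2, 'gpsalt': 3}
```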

def get_l3_dir(self, l3_dirname: str = None):
if l3_dirname:
self.l3_dir = l3_dirname
elif not self.sondes is None:
self.l3_dir = list(self.sondes.values())[0].l3_dir
else:
raise ValueError("No sondes and no l3 directory given, cannot continue ")

def get_l3_filename(
self, l3_filename_template: str = None, l3_filename: str = None
):
if l3_filename is None:
if l3_filename_template is None:
l3_filename = hh.l3_filename_template.format(
platform=self.platform_id,
flight_id=self.flight_id,
)
else:
l3_filename = l3_filename_template.format(
platform=self.platform_id,
flight_id=self.flight_id,
)

self.l3_filename = l3_filename

return self

def write_l3(self, l3_dir: str = None):
if l3_dir is None:
l3_dir = self.l3_dir

if not os.path.exists(l3_dir):
os.makedirs(l3_dir)

self._interim_l3_ds.to_netcdf(os.path.join(l3_dir, self.l3_filename))

return self
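
Finally, a hedged end-to-end sketch of the new `Gridded` class as the pipeline would drive it (the `sondes` dict, the directory, and the flight/platform identifiers are hypothetical; the real pipeline resolves them from the config and the Sonde objects):

```python
# Hypothetical driver code mirroring the "concatenate_L2" and "create_L3" steps.
gridded = Gridded(sondes, flight_id="20240811", platform_id="HALO")
gridded.concat_sondes()              # builds gridded._interim_l3_ds
gridded.get_l3_dir("/data/Level_3")  # or inferred from the first sonde's l3_dir
gridded.get_l3_filename()            # default: "HALO_20240811_Level_3.nc"
gridded.write_l3()                   # writes the L3 NetCDF into l3_dir
```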