Generate statistics for several apps.
gnrgomes committed Jun 10, 2024
1 parent 4740a8e commit 736e515
Showing 6 changed files with 126 additions and 56 deletions.
4 changes: 2 additions & 2 deletions src/lisfloodutilities/gridding/decumulate_daily_grids.py
@@ -49,7 +49,7 @@ def get_filter_class(conf: Config) -> KiwisFilter:
plugins_columns_def = conf.get_config_field('PROPERTIES', 'KIWIS_FILTER_COLUMNS')
plugins_columns_dic = eval(plugins_columns_def)
plugins_columns = OrderedDict(plugins_columns_dic)
-    filter_class = KiwisFilter(filter_columns=plugins_columns)
+    filter_class = KiwisFilter(filter_columns=plugins_columns, var_code=conf.var_code, quiet_mode=quiet_mode)
return filter_class


@@ -113,7 +113,7 @@ def get_decumulated_dataframes(conf: Config, kiwis_filepaths: List[Path], kiwis_
plugins_columns_def = conf.get_config_field('PROPERTIES', 'KIWIS_FILTER_COLUMNS')
plugins_columns_dic = eval(plugins_columns_def)
filter_columns = OrderedDict(plugins_columns_dic)
-    filter_class = DowgradedDailyTo6HourlyObservationsKiwisFilter(filter_columns, filter_args)
+    filter_class = DowgradedDailyTo6HourlyObservationsKiwisFilter(filter_columns, filter_args, conf.var_code, quiet_mode)
df_kiwis_array = filter_class.filter(kiwis_filepaths, kiwis_str_timestamps, kiwis_dataframes)
return df_kiwis_array

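Both hunks in this file build the filter's column mapping by calling eval() on the KIWIS_FILTER_COLUMNS config field. A safer sketch of that parsing step, assuming the field holds a plain Python dict literal (parse_filter_columns is a hypothetical helper, not part of this commit):

import ast
from collections import OrderedDict

def parse_filter_columns(conf):
    # Parse the KIWIS_FILTER_COLUMNS dict literal without eval();
    # ast.literal_eval rejects anything but plain Python literals.
    plugins_columns_def = conf.get_config_field('PROPERTIES', 'KIWIS_FILTER_COLUMNS')
    return OrderedDict(ast.literal_eval(plugins_columns_def))
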
22 changes: 22 additions & 0 deletions src/lisfloodutilities/gridding/generate_grids.py
@@ -18,6 +18,7 @@
import sys
import os
import csv
+import copy
from pathlib import Path
import numpy as np
from argparse import ArgumentParser, ArgumentTypeError
@@ -61,6 +62,21 @@ def get_grid(grid_utils: GriddingUtils, filename: Path, tiff_filepath: Path=None
grid_data = grid_utils.generate_grid(filename)
return grid_data

+def print_grid_statistics(var_code: str, grid_timestamp: datetime, grid: np.ndarray):
+    timestamp = grid_timestamp.strftime(FileUtils.DATE_PATTERN_SEPARATED)
+    grid_min = np.nanmin(grid)
+    grid_max = np.nanmax(grid)
+    grid_mean = np.nanmean(grid)
+    grid_percentile_10 = np.nanpercentile(grid, 10)
+    grid_percentile_90 = np.nanpercentile(grid, 90)
+    stats_string = (
+        f'#APP_STATS: {{"TIMESTAMP": "{timestamp}", "VAR_CODE": "{var_code}", '
+        f'"MINIMUM_VALUE": {grid_min:.2f}, "MAXIMUM_VALUE": {grid_max:.2f}, '
+        f'"MEAN_VALUE": {grid_mean:.2f}, "PERCENTILE_10": {grid_percentile_10:.2f}, '
+        f'"PERCENTILE_90": {grid_percentile_90:.2f}}}'
+    )
+    print_msg(stats_string)

def run(config_filename: str, infolder: str, output_file: str, processing_dates_file: str, file_utils: FileUtils,
output_tiff: bool, output_netcdf: bool, overwrite_output: bool, use_existing_file: bool, get_existing_tiff: bool,
start_date: datetime = None, end_date: datetime = None, interpolation_mode: str = 'adw',
@@ -96,11 +112,14 @@ def run(config_filename: str, infolder: str, output_file: str, processing_dates_

netcdf_offset_file_date = int(conf.get_config_field('VAR_TIME','OFFSET_FILE_DATE'))

+    cur_writer = None
    if output_tiff:
        output_writer_tiff = GDALWriter(conf, overwrite_output, quiet_mode)
+        cur_writer = output_writer_tiff
    if output_netcdf:
        output_writer_netcdf = NetCDFWriter(conf, overwrite_output, quiet_mode)
        output_writer_netcdf.open(Path(output_file))
+        cur_writer = output_writer_netcdf
file_loader = KiwisLoader(conf, Path(infolder), dates_to_process, overwrite_output, use_existing_file, quiet_mode)
for filename, kiwis_timestamp_str in file_loader:
kiwis_timestamp = datetime.strptime(kiwis_timestamp_str, FileUtils.DATE_PATTERN_CONDENSED_SHORT)
@@ -110,6 +129,9 @@ def run(config_filename: str, infolder: str, output_file: str, processing_dates_
if output_tiff:
output_writer_tiff.open(tiff_filepath)
grid_data = get_grid(grid_utils, filename, tiff_filepath, get_existing_tiff)
+        if cur_writer is not None:
+            cur_grid = cur_writer.setNaN(copy.deepcopy(grid_data))
+            print_grid_statistics(conf.var_code, file_timestamp, cur_grid)
if output_netcdf:
output_writer_netcdf.write(grid_data, file_timestamp)
if output_tiff:
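The #APP_STATS marker makes each statistics record a line of prefixed JSON, so downstream tooling can collect the records from the application logs with a prefix match and json.loads. A minimal consumer sketch, assuming the logs are read line by line; the same approach works for the #KIWIS_STATS lines emitted in filters.py below:

import json

def parse_stats_lines(log_lines, prefix='#APP_STATS: '):
    # Yield one dict per statistics record found in the log stream;
    # assumes the statistics are finite numbers (json.loads rejects nan).
    for line in log_lines:
        line = line.strip()
        if line.startswith(prefix):
            yield json.loads(line[len(prefix):])
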
91 changes: 65 additions & 26 deletions src/lisfloodutilities/gridding/lib/filters.py
@@ -14,9 +14,14 @@ class KiwisFilter:
to be used for interpolation.
"""

-    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}):
+    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code: str = '', quiet_mode: bool = False):
self.args = filter_args
self.filter_columns = filter_columns
+        self.var_code = var_code
+        self.quiet_mode = quiet_mode
+        self.QUALITY_CODE_VALID = '40'
+        self.QUALITY_CODE_SUSPICIOUS = '120'
+        self.QUALITY_CODE_WRONG = '160'
self.stati = {"Active": 1, "Inactive": 0, "yes": 0, "no": 1, "Closed": 0, "Under construction": 0}
self.defaultReturn = 1
self.cur_timestamp = ''
@@ -52,13 +57,62 @@ def apply_filter(self, df: pd.DataFrame) -> pd.DataFrame:
# Translate status columns
df[f'{self.COL_STATION_DIARY_STATUS}_INTERNAL'] = df[self.COL_STATION_DIARY_STATUS].apply(self.__rewrite_column)
df[f'{self.COL_INACTIVE_HISTORIC}_INTERNAL'] = df[self.COL_INACTIVE_HISTORIC].apply(self.__rewrite_column)
-        # Apply filtering rules
-        df = df.loc[((df[f'{self.COL_QUALITY_CODE}'] == '40') | (df[f'{self.COL_QUALITY_CODE}'] == '120')) &
+        # Apply filtering rules but keep all quality codes for the statistics
+        df = df.loc[((df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_VALID) |
+                     (df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_SUSPICIOUS) |
+                     (df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_WRONG)) &
                    (df[f'{self.COL_NO_GRIDDING}'] == 'no') & (df[f'{self.COL_IS_IN_DOMAIN}'] == 'yes') &
                    (df[f'{self.COL_EXCLUDE}'] != 'yes') & (df[f'{self.COL_STATION_DIARY_STATUS}_INTERNAL'] == 1) &
                    (df[f'{self.COL_INACTIVE_HISTORIC}_INTERNAL'] == 1)]
+        self.print_statistics(df)
+        # Keep only valid and suspicious quality codes
+        df = df.loc[((df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_VALID) |
+                     (df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_SUSPICIOUS))]
return df

+    @staticmethod
+    def get_totals_by_quality_code(row: pd.Series, column_quality_code: str, quality_code: str) -> int:
+        cur_quality_code = row[column_quality_code]
+        if cur_quality_code == quality_code:
+            return row['count']
+        return 0
+
+    def print_statistics(self, df: pd.DataFrame):
+        timestamp = self.cur_timestamp.strftime('%Y-%m-%d %H:%M:%S')
+        new_df = df.groupby([self.COL_PROVIDER_ID, self.COL_QUALITY_CODE]).size().reset_index(name='count')
+        # Transpose the quality codes into one column per code
+        new_df[self.QUALITY_CODE_VALID] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1,
+                                                       column_quality_code=self.COL_QUALITY_CODE,
+                                                       quality_code=self.QUALITY_CODE_VALID)
+        new_df[self.QUALITY_CODE_SUSPICIOUS] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1,
+                                                            column_quality_code=self.COL_QUALITY_CODE,
+                                                            quality_code=self.QUALITY_CODE_SUSPICIOUS)
+        new_df[self.QUALITY_CODE_WRONG] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1,
+                                                       column_quality_code=self.COL_QUALITY_CODE,
+                                                       quality_code=self.QUALITY_CODE_WRONG)
+        new_df.drop(columns=[self.COL_QUALITY_CODE, 'count'], inplace=True)
+        new_df = new_df.groupby(self.COL_PROVIDER_ID)[[self.QUALITY_CODE_VALID,
+                                                       self.QUALITY_CODE_SUSPICIOUS,
+                                                       self.QUALITY_CODE_WRONG]].sum()
+        new_df.reset_index(inplace=True)
+        for index, row in new_df.iterrows():
+            provider_id = row[self.COL_PROVIDER_ID]
+            quality_code_valid = row[self.QUALITY_CODE_VALID]
+            quality_code_suspicious = row[self.QUALITY_CODE_SUSPICIOUS]
+            quality_code_wrong = row[self.QUALITY_CODE_WRONG]
+            total = quality_code_valid + quality_code_suspicious + quality_code_wrong
+            stats_string = (
+                f'#KIWIS_STATS: {{"TIMESTAMP": "{timestamp}", "VAR_CODE": "{self.var_code}", '
+                f'"PROVIDER_ID": {provider_id}, "QUALITY_CODE_VALID": {quality_code_valid}, '
+                f'"QUALITY_CODE_SUSPICIOUS": {quality_code_suspicious}, "QUALITY_CODE_WRONG": {quality_code_wrong}, '
+                f'"TOTAL_OBSERVATIONS": {total}}}'
+            )
+            self.print_msg(stats_string)
+
+    def print_msg(self, msg: str = ''):
+        if not self.quiet_mode:
+            print(msg)

def get_dataframe_output_columns(self, df: pd.DataFrame) -> pd.DataFrame:
return df[self.OUTPUT_COLUMNS]

@@ -137,8 +191,8 @@ class ObservationsKiwisFilter(KiwisFilter):

CLUSTER_COLLAPSE_RADIUS = 0.011582073434000193 # decimal degrees (1287 m)

-    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}):
-        super().__init__(filter_columns, filter_args)
+    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code: str = '', quiet_mode: bool = False):
+        super().__init__(filter_columns, filter_args, var_code, quiet_mode)
self.INTERNAL_COLUMNS.append('has_neighbor_within_radius')
# Calculating the radius in decimal degrees
self.provider_radius = {}
@@ -187,8 +241,8 @@ class DowgradedObservationsKiwisFilter(ObservationsKiwisFilter):
downgraded station from all the dataframes.
"""

-    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}):
-        super().__init__(filter_columns, filter_args)
+    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code: str = '', quiet_mode: bool = False):
+        super().__init__(filter_columns, filter_args, var_code, quiet_mode)
self.filtered_station_ids = {}

def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_data_frames: List[pd.DataFrame]) -> List[pd.DataFrame]:
@@ -237,8 +291,8 @@ class DowgradedDailyTo6HourlyObservationsKiwisFilter(ObservationsKiwisFilter):
downgraded station from all the dataframes.
"""

-    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}):
-        super().__init__(filter_columns, filter_args)
+    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code: str = '', quiet_mode: bool = False):
+        super().__init__(filter_columns, filter_args, var_code, quiet_mode)
self.INTERNAL_COLUMNS.append('has_neighbor_within_radius_complete_6h')
self.kiwis_24h_dataframe = None
self.df_24h_without_neighbors = None
@@ -338,9 +392,6 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat
# Get all 6hourly slots aggregated
self.all_6h_df = self.get_all_6hourly_station_values_df(filtered_data_frames)

-        # # TODO: Delete
-        # # self.all_6h_df.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/all_6h_df.tsv', index=True, header=True, sep="\t")

# Select the daily stations from the providers that do not contain a 6hourly station within radius and decumulate
# its value dividing by the number of 6hourly slots. These will be inserted directly in all the 6hourly slots
tree_all_6h = cKDTree(self.all_6h_df[[self.COL_LON, self.COL_LAT]])
@@ -349,10 +400,7 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat
self.kiwis_24h_dataframe = self.update_column_if_provider_stations_are_in_radius(df=self.kiwis_24h_dataframe, tree=tree_all_6h)
df_24h_from_providers = self.kiwis_24h_dataframe[self.kiwis_24h_dataframe[self.COL_PROVIDER_ID].isin(providers_ids)]
self.df_24h_without_neighbors = df_24h_from_providers[df_24h_from_providers['has_neighbor_within_radius'] == False]

-        # # TODO: Delete
-        # self.df_24h_without_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_without_neighbors1.tsv', index=True, header=True, sep="\t")

# From the station without neighbors remove MARS stations that have a DWDSynop with the same WMO Number because
# they are actually the same station even though they might not be exactly within the defined radius.
wmo_nums_of_6h_dwd_stations = self.all_6h_df.loc[self.all_6h_df[self.COL_PROVIDER_ID] == self.PROVIDER_DWD_SYNOP, self.COL_STATION_NUM].values
@@ -365,10 +413,7 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat

# Decumulate the values of stations without neighbors
self.df_24h_without_neighbors[self.COL_VALUE] /= self.num_6h_slots

-        # # TODO: Delete
-        # self.df_24h_without_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_without_neighbors2_and_without_dwdSynop_with_same_WMO_decumulated.tsv', index=True, header=True, sep="\t")

# Remove the daily rows that have a complete 6hourly station in the radius because there is no need to decumulate in these cases
complete_6h_df = self.all_6h_df.loc[self.all_6h_df['count_6h_slots'] == self.num_6h_slots]
self.df_24h_with_neighbors = df_24h_from_providers[df_24h_from_providers['has_neighbor_within_radius'] == True]
@@ -378,9 +423,6 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat
column_to_update='has_neighbor_within_radius_complete_6h')
self.df_24h_with_neighbors = self.df_24h_with_neighbors[self.df_24h_with_neighbors['has_neighbor_within_radius_complete_6h'] == False]

-        # # TODO: Delete
-        # self.df_24h_with_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_with_neighbors1_and_incomplete_6h_slots.tsv', index=True, header=True, sep="\t")

# Change the 24h stations corresponding with the 6h stations containing missing values by changing its value using the formula
# (PR - Sum(PR6)) / (number of missing values), if and only if the resulting value is positive (>=0).
missing_6h_slots_df = self.all_6h_df.loc[self.all_6h_df['count_6h_slots'] < self.num_6h_slots]
@@ -393,9 +435,6 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat
axis=1, tree=tree_missing_6h, provider_id=provider_id,
radius=radius, stations_6h_df=missing_6h_slots_df)
self.df_24h_with_neighbors = self.df_24h_with_neighbors.loc[self.df_24h_with_neighbors[self.COL_VALUE] >= 0.0]

-        # # TODO: Delete
-        # self.df_24h_with_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_with_neighbors2_and_incomplete_6h_slots_decumulated.tsv', index=True, header=True, sep="\t")

# Clean the dataframes of internal columns before merging them
self.df_24h_without_neighbors = self.df_24h_without_neighbors.drop(columns=self.INTERNAL_COLUMNS, errors='ignore')
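The decumulation rule referenced in the comments above, (PR - Sum(PR6)) / (number of missing 6-hourly values) applied only when the result is non-negative, can be sanity-checked with a small worked example (all numbers are illustrative, not taken from the commit):

num_6h_slots = 4          # 6-hourly slots per day
pr_24h = 10.0             # daily precipitation reported by the 24h station
sum_pr_6h = 7.5           # total over the 3 slots the 6h neighbor did fill
count_6h_slots = 3

missing_slots = num_6h_slots - count_6h_slots   # 1 empty slot
value = (pr_24h - sum_pr_6h) / missing_slots    # (10.0 - 7.5) / 1 = 2.5
if value >= 0.0:
    print(f'decumulated value for the missing slot: {value}')
else:
    print('negative result: the daily row is dropped instead')
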
2 changes: 1 addition & 1 deletion src/lisfloodutilities/gridding/lib/utils.py
@@ -626,7 +626,7 @@ def __get_filter_classes(self) -> list:
class_name = plugin
class_args = plugins[plugin]
module = importlib.import_module(module_name)
-            class_instance = getattr(module, class_name)(plugins_columns, class_args)
+            class_instance = getattr(module, class_name)(plugins_columns, class_args, self.var_code, self.quiet_mode)
plugins_array.append(class_instance)
except ImportError:
print(f"Error: Could not import module '{module_name}'")
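The dynamic loading in __get_filter_classes() now passes four positional arguments, so every filter plugin must keep the shared constructor signature. A minimal sketch of a conforming plugin (the class name is hypothetical; the defaults mirror the built-in filters):

from lisfloodutilities.gridding.lib.filters import KiwisFilter

class MyCustomKiwisFilter(KiwisFilter):
    # Keeping the base signature lets getattr(module, class_name)(...)
    # instantiate this class exactly like the built-in filters.
    def __init__(self, filter_columns: dict = {}, filter_args: dict = {},
                 var_code: str = '', quiet_mode: bool = False):
        super().__init__(filter_columns, filter_args, var_code, quiet_mode)
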
38 changes: 19 additions & 19 deletions src/lisfloodutilities/gridding/lib/writers.py
@@ -49,6 +49,25 @@ def __setup_metadata(self):
self.cell_methods = self.conf.get_config_field('PROPERTIES', 'CELL_METHODS')
self.units = self.conf.get_config_field('PROPERTIES', 'UNIT')

+    def setNaN(self, value, defaultNaN=np.nan):
+        try:
+            value[value==1e31] = defaultNaN
+        except Exception as e:
+            self.print_msg(f"value==1e31 : {str(e)}")
+        try:
+            value[value==self.conf.VALUE_NAN] = defaultNaN
+        except Exception as e:
+            self.print_msg(f"value=={self.conf.VALUE_NAN} : {str(e)}")
+        try:
+            value[value==-32768.0] = defaultNaN
+        except Exception as e:
+            self.print_msg(f"value==-32768.0 : {str(e)}")
+        try:
+            value[value==31082] = defaultNaN
+        except Exception as e:
+            self.print_msg(f"value==31082 : {str(e)}")
+        return value

def open(self, out_filename: Path):
self.filepath = out_filename
self.time_created = timex.ctime(timex.time())
@@ -98,25 +117,6 @@ def open(self, out_filename: Path):
else:
raise ArgumentTypeError(f'File {self.filepath} already exists. Use --force flag to append.')

-    def setNaN(self, value, defaultNaN=np.nan):
-        try:
-            value[value==1e31] = defaultNaN
-        except Exception as e:
-            self.print_msg(f"value==1e31 : {str(e)}")
-        try:
-            value[value==self.conf.VALUE_NAN] = defaultNaN
-        except Exception as e:
-            self.print_msg(f"value=={self.conf.VALUE_NAN} : {str(e)}")
-        try:
-            value[value==-32768.0] = defaultNaN
-        except Exception as e:
-            self.print_msg(f"value==-32768.0 : {str(e)}")
-        try:
-            value[value==31082] = defaultNaN
-        except Exception as e:
-            self.print_msg(f"value==31082 : {str(e)}")
-        return value

def write(self, grid: np.ndarray, timestamp: datetime = None):
timestep = -1
if timestamp is not None:
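setNaN() masks each sentinel value inside its own try/except so that one failing comparison cannot abort the others. Assuming the grid is a numeric ndarray, the same masking could be done in a single vectorized pass; a sketch (set_nan and the sentinels tuple mirror the method above but are not part of the commit):

import numpy as np

def set_nan(value, sentinels=(1e31, -32768.0, 31082), default_nan=np.nan):
    # One-pass replacement of all sentinel values with NaN;
    # conf.VALUE_NAN would be appended to the tuple at call time.
    value = np.asarray(value, dtype=float)
    value[np.isin(value, sentinels)] = default_nan
    return value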