From 736e5159187a2efa899017f8c251b2f964bc6f6e Mon Sep 17 00:00:00 2001
From: gnrgomes
Date: Mon, 10 Jun 2024 12:55:11 +0200
Subject: [PATCH] Generate statistics for several apps.

---
 .../gridding/decumulate_daily_grids.py        |  4 +-
 .../gridding/generate_grids.py                | 22 +++++
 src/lisfloodutilities/gridding/lib/filters.py | 91 +++++++++++++------
 src/lisfloodutilities/gridding/lib/utils.py   |  2 +-
 src/lisfloodutilities/gridding/lib/writers.py | 38 ++++----
 .../gridding/tools/read_stats_from_logs.py    | 25 +++--
 6 files changed, 126 insertions(+), 56 deletions(-)

diff --git a/src/lisfloodutilities/gridding/decumulate_daily_grids.py b/src/lisfloodutilities/gridding/decumulate_daily_grids.py
index e2244dc..62ac940 100644
--- a/src/lisfloodutilities/gridding/decumulate_daily_grids.py
+++ b/src/lisfloodutilities/gridding/decumulate_daily_grids.py
@@ -49,7 +49,7 @@ def get_filter_class(conf: Config) -> KiwisFilter:
     plugins_columns_def = conf.get_config_field('PROPERTIES', 'KIWIS_FILTER_COLUMNS')
     plugins_columns_dic = eval(plugins_columns_def)
     plugins_columns = OrderedDict(plugins_columns_dic)
-    filter_class = KiwisFilter(filter_columns=plugins_columns)
+    filter_class = KiwisFilter(filter_columns=plugins_columns, var_code=conf.var_code, quiet_mode=quiet_mode)
     return filter_class
 
@@ -113,7 +113,7 @@ def get_decumulated_dataframes(conf: Config, kiwis_filepaths: List[Path], kiwis_
     plugins_columns_def = conf.get_config_field('PROPERTIES', 'KIWIS_FILTER_COLUMNS')
     plugins_columns_dic = eval(plugins_columns_def)
     filter_columns = OrderedDict(plugins_columns_dic)
-    filter_class = DowgradedDailyTo6HourlyObservationsKiwisFilter(filter_columns, filter_args)
+    filter_class = DowgradedDailyTo6HourlyObservationsKiwisFilter(filter_columns, filter_args, conf.var_code, quiet_mode)
    df_kiwis_array = filter_class.filter(kiwis_filepaths, kiwis_str_timestamps, kiwis_dataframes)
    return df_kiwis_array
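
Both call sites above now thread var_code and quiet_mode into the filters so the
#KIWIS_STATS lines introduced further down can be labelled per variable and silenced
in quiet runs. A minimal sketch of the new call pattern, assuming an installed
lisfloodutilities package; the column mapping and variable code are illustrative
placeholders, not values from this patch:

    from collections import OrderedDict
    from lisfloodutilities.gridding.lib.filters import KiwisFilter

    # placeholder column mapping and var_code, for illustration only
    plugins_columns = OrderedDict({'COL_LAT': 'station_latitude', 'COL_LON': 'station_longitude'})
    filter_class = KiwisFilter(filter_columns=plugins_columns, filter_args={},
                               var_code='pr6', quiet_mode=False)
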
diff --git a/src/lisfloodutilities/gridding/generate_grids.py b/src/lisfloodutilities/gridding/generate_grids.py
index 31dc568..70a5e36 100755
--- a/src/lisfloodutilities/gridding/generate_grids.py
+++ b/src/lisfloodutilities/gridding/generate_grids.py
@@ -18,6 +18,7 @@
 import sys
 import os
 import csv
+import copy
 from pathlib import Path
 import numpy as np
 from argparse import ArgumentParser, ArgumentTypeError
@@ -61,6 +62,21 @@ def get_grid(grid_utils: GriddingUtils, filename: Path, tiff_filepath: Path=None
         grid_data = grid_utils.generate_grid(filename)
     return grid_data
 
+def print_grid_statistics(var_code: str, grid_timestamp: datetime, grid: np.ndarray):
+    timestamp = grid_timestamp.strftime(FileUtils.DATE_PATTERN_SEPARATED)
+    grid_min = np.nanmin(grid)
+    grid_max = np.nanmax(grid)
+    grid_mean = np.nanmean(grid)
+    grid_percentile_10 = np.nanpercentile(grid, 10)
+    grid_percentile_90 = np.nanpercentile(grid, 90)
+    stats_string = (
+        f'#APP_STATS: {{"TIMESTAMP": "{timestamp}", "VAR_CODE": "{var_code}", '
+        f'"MINIMUM_VALUE": {grid_min:.2f}, "MAXIMUM_VALUE": {grid_max:.2f}, '
+        f'"MEAN_VALUE": {grid_mean:.2f}, "PERCENTILE_10": {grid_percentile_10:.2f}, '
+        f'"PERCENTILE_90": {grid_percentile_90:.2f}}}'
+    )
+    print_msg(stats_string)
+
 def run(config_filename: str, infolder: str, output_file: str, processing_dates_file: str,
         file_utils: FileUtils, output_tiff: bool, output_netcdf: bool, overwrite_output: bool,
         use_existing_file: bool, get_existing_tiff: bool, start_date: datetime = None, end_date: datetime = None, interpolation_mode: str = 'adw',
@@ -96,11 +112,14 @@ def run(config_filename: str, infolder: str, output_file: str, processing_dates_
     netcdf_offset_file_date = int(conf.get_config_field('VAR_TIME','OFFSET_FILE_DATE'))
 
+    cur_writer = None
     if output_tiff:
         output_writer_tiff = GDALWriter(conf, overwrite_output, quiet_mode)
+        cur_writer = output_writer_tiff
     if output_netcdf:
         output_writer_netcdf = NetCDFWriter(conf, overwrite_output, quiet_mode)
         output_writer_netcdf.open(Path(output_file))
+        cur_writer = output_writer_netcdf
     file_loader = KiwisLoader(conf, Path(infolder), dates_to_process, overwrite_output, use_existing_file, quiet_mode)
     for filename, kiwis_timestamp_str in file_loader:
         kiwis_timestamp = datetime.strptime(kiwis_timestamp_str, FileUtils.DATE_PATTERN_CONDENSED_SHORT)
@@ -110,6 +129,9 @@ def run(config_filename: str, infolder: str, output_file: str, processing_dates_
         if output_tiff:
             output_writer_tiff.open(tiff_filepath)
         grid_data = get_grid(grid_utils, filename, tiff_filepath, get_existing_tiff)
+        if cur_writer is not None:
+            cur_grid = cur_writer.setNaN(copy.deepcopy(grid_data))
+            print_grid_statistics(conf.var_code, file_timestamp, cur_grid)
         if output_netcdf:
             output_writer_netcdf.write(grid_data, file_timestamp)
         if output_tiff:
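
print_grid_statistics emits one #APP_STATS line per generated grid, and the payload
after the tag is valid JSON, which is what read_stats_from_logs.py (further down)
relies on. A sketch of parsing one such line; the values shown are invented for
illustration:

    import json

    line = ('#APP_STATS: {"TIMESTAMP": "2024-06-10 06:00:00", "VAR_CODE": "pr6", '
            '"MINIMUM_VALUE": 0.00, "MAXIMUM_VALUE": 41.37, "MEAN_VALUE": 1.82, '
            '"PERCENTILE_10": 0.00, "PERCENTILE_90": 5.11}')
    stats = json.loads(line.split('#APP_STATS: ', 1)[1])
    print(stats['VAR_CODE'], stats['MEAN_VALUE'])
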
diff --git a/src/lisfloodutilities/gridding/lib/filters.py b/src/lisfloodutilities/gridding/lib/filters.py
index 3d0cdfc..11b65b1 100644
--- a/src/lisfloodutilities/gridding/lib/filters.py
+++ b/src/lisfloodutilities/gridding/lib/filters.py
@@ -14,9 +14,14 @@ class KiwisFilter:
     to be used for interpolation.
     """
 
-    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}):
+    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code: str = '', quiet_mode: bool = False):
         self.args = filter_args
         self.filter_columns = filter_columns
+        self.var_code = var_code
+        self.quiet_mode = quiet_mode
+        self.QUALITY_CODE_VALID = '40'
+        self.QUALITY_CODE_SUSPICIOUS = '120'
+        self.QUALITY_CODE_WRONG = '160'
         self.stati = {"Active": 1, "Inactive": 0, "yes": 0, "no": 1, "Closed": 0, "Under construction": 0}
         self.defaultReturn = 1
         self.cur_timestamp = ''
@@ -52,13 +57,62 @@ def apply_filter(self, df: pd.DataFrame) -> pd.DataFrame:
         # Translate status columns
         df[f'{self.COL_STATION_DIARY_STATUS}_INTERNAL'] = df[self.COL_STATION_DIARY_STATUS].apply(self.__rewrite_column)
         df[f'{self.COL_INACTIVE_HISTORIC}_INTERNAL'] = df[self.COL_INACTIVE_HISTORIC].apply(self.__rewrite_column)
-        # Apply filtering rules
-        df = df.loc[((df[f'{self.COL_QUALITY_CODE}'] == '40') | (df[f'{self.COL_QUALITY_CODE}'] == '120')) &
+        # Apply the filtering rules, but keep all quality codes so the statistics can count them
+        df = df.loc[((df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_VALID) |
+                     (df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_SUSPICIOUS) |
+                     (df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_WRONG)) &
                     (df[f'{self.COL_NO_GRIDDING}'] == 'no') & (df[f'{self.COL_IS_IN_DOMAIN}'] == 'yes') &
                     (df[f'{self.COL_EXCLUDE}'] != 'yes') & (df[f'{self.COL_STATION_DIARY_STATUS}_INTERNAL'] == 1) &
                     (df[f'{self.COL_INACTIVE_HISTORIC}_INTERNAL'] == 1)]
+        self.print_statistics(df)
+        # Keep only the valid and suspicious quality codes
+        df = df.loc[((df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_VALID) |
+                     (df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_SUSPICIOUS))]
         return df
 
+    @staticmethod
+    def get_totals_by_quality_code(row: pd.Series, column_quality_code: str, quality_code: str) -> int:
+        cur_quality_code = row[column_quality_code]
+        if cur_quality_code == quality_code:
+            return row['count']
+        return 0
+
+    def print_statistics(self, df: pd.DataFrame):
+        timestamp = self.cur_timestamp.strftime('%Y-%m-%d %H:%M:%S')
+        new_df = df.groupby([self.COL_PROVIDER_ID, self.COL_QUALITY_CODE]).size().reset_index(name='count')
+        # Transpose the quality codes into one column per code
+        new_df[self.QUALITY_CODE_VALID] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1,
+                                                       column_quality_code=self.COL_QUALITY_CODE,
+                                                       quality_code=self.QUALITY_CODE_VALID)
+        new_df[self.QUALITY_CODE_SUSPICIOUS] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1,
+                                                            column_quality_code=self.COL_QUALITY_CODE,
+                                                            quality_code=self.QUALITY_CODE_SUSPICIOUS)
+        new_df[self.QUALITY_CODE_WRONG] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1,
+                                                       column_quality_code=self.COL_QUALITY_CODE,
+                                                       quality_code=self.QUALITY_CODE_WRONG)
+        new_df.drop(columns=[self.COL_QUALITY_CODE, 'count'], inplace=True)
+        new_df = new_df.groupby(self.COL_PROVIDER_ID)[[self.QUALITY_CODE_VALID,
+                                                       self.QUALITY_CODE_SUSPICIOUS,
+                                                       self.QUALITY_CODE_WRONG]].sum()
+        new_df.reset_index(inplace=True)
+        for _, row in new_df.iterrows():
+            provider_id = row[self.COL_PROVIDER_ID]
+            quality_code_valid = row[self.QUALITY_CODE_VALID]
+            quality_code_suspicious = row[self.QUALITY_CODE_SUSPICIOUS]
+            quality_code_wrong = row[self.QUALITY_CODE_WRONG]
+            total = quality_code_valid + quality_code_suspicious + quality_code_wrong
+            stats_string = (
+                f'#KIWIS_STATS: {{"TIMESTAMP": "{timestamp}", "VAR_CODE": "{self.var_code}", '
+                f'"PROVIDER_ID": {provider_id}, "QUALITY_CODE_VALID": {quality_code_valid}, '
+                f'"QUALITY_CODE_SUSPICIOUS": {quality_code_suspicious}, "QUALITY_CODE_WRONG": {quality_code_wrong}, '
+                f'"TOTAL_OBSERVATIONS": {total}}}'
+            )
+            self.print_msg(stats_string)
+
+    def print_msg(self, msg: str = ''):
+        if not self.quiet_mode:
+            print(msg)
+
     def get_dataframe_output_columns(self, df: pd.DataFrame) -> pd.DataFrame:
         return df[self.OUTPUT_COLUMNS]
 
@@ -137,8 +191,8 @@ class ObservationsKiwisFilter(KiwisFilter):
 
     CLUSTER_COLLAPSE_RADIUS = 0.011582073434000193 # decimal degrees (1287 m)
 
-    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}):
-        super().__init__(filter_columns, filter_args)
+    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code: str = '', quiet_mode: bool = False):
+        super().__init__(filter_columns, filter_args, var_code, quiet_mode)
         self.INTERNAL_COLUMNS.append('has_neighbor_within_radius')
         # Calculating the radius in decimal degrees
         self.provider_radius = {}
@@ -187,8 +241,8 @@ class DowgradedObservationsKiwisFilter(ObservationsKiwisFilter):
     downgraded station from all the dataframes.
     """
 
-    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}):
-        super().__init__(filter_columns, filter_args)
+    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code: str = '', quiet_mode: bool = False):
+        super().__init__(filter_columns, filter_args, var_code, quiet_mode)
         self.filtered_station_ids = {}
 
     def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_data_frames: List[pd.DataFrame]) -> List[pd.DataFrame]:
@@ -237,8 +291,8 @@ class DowgradedDailyTo6HourlyObservationsKiwisFilter(ObservationsKiwisFilter):
     downgraded station from all the dataframes.
     """
 
-    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}):
-        super().__init__(filter_columns, filter_args)
+    def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code: str = '', quiet_mode: bool = False):
+        super().__init__(filter_columns, filter_args, var_code, quiet_mode)
         self.INTERNAL_COLUMNS.append('has_neighbor_within_radius_complete_6h')
         self.kiwis_24h_dataframe = None
         self.df_24h_without_neighbors = None
@@ -338,9 +392,6 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat
         # Get all 6hourly slots aggregated
         self.all_6h_df = self.get_all_6hourly_station_values_df(filtered_data_frames)
 
-#         # TODO: Delete
-#         # self.all_6h_df.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/all_6h_df.tsv', index=True, header=True, sep="\t")
-
         # Select the daily stations from the providers that do not contain a 6hourly station within radius and decumulate
         # its value dividing by the number of 6hourly slots. These will be inserted directly in all the 6hourly slots
         tree_all_6h = cKDTree(self.all_6h_df[[self.COL_LON, self.COL_LAT]])
@@ -349,10 +400,7 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat
         self.kiwis_24h_dataframe = self.update_column_if_provider_stations_are_in_radius(df=self.kiwis_24h_dataframe, tree=tree_all_6h)
         df_24h_from_providers = self.kiwis_24h_dataframe[self.kiwis_24h_dataframe[self.COL_PROVIDER_ID].isin(providers_ids)]
         self.df_24h_without_neighbors = df_24h_from_providers[df_24h_from_providers['has_neighbor_within_radius'] == False]
-
-#         # TODO: Delete
-#         self.df_24h_without_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_without_neighbors1.tsv', index=True, header=True, sep="\t")
-
+
         # From the station without neighbors remove MARS stations that have a DWDSynop with the same WMO Number because
         # they are actually the same station even though they might not be exactly within the defined radius.
         wmo_nums_of_6h_dwd_stations = self.all_6h_df.loc[self.all_6h_df[self.COL_PROVIDER_ID] == self.PROVIDER_DWD_SYNOP, self.COL_STATION_NUM].values
@@ -365,10 +413,7 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat
 
         # Decumulate the values of stations without neighbors
         self.df_24h_without_neighbors[self.COL_VALUE] /= self.num_6h_slots
-
-#         # TODO: Delete
-#         self.df_24h_without_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_without_neighbors2_and_without_dwdSynop_with_same_WMO_decumulated.tsv', index=True, header=True, sep="\t")
-
+
         # Remove the daily rows that have a complete 6hourly station in the radius because there is no need to decumulate in these cases
         complete_6h_df = self.all_6h_df.loc[self.all_6h_df['count_6h_slots'] == self.num_6h_slots]
         self.df_24h_with_neighbors = df_24h_from_providers[df_24h_from_providers['has_neighbor_within_radius'] == True]
@@ -378,9 +423,6 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat
                                                                                   column_to_update='has_neighbor_within_radius_complete_6h')
         self.df_24h_with_neighbors = self.df_24h_with_neighbors[self.df_24h_with_neighbors['has_neighbor_within_radius_complete_6h'] == False]
 
-#         # TODO: Delete
-#         self.df_24h_with_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_with_neighbors1_and_incomplete_6h_slots.tsv', index=True, header=True, sep="\t")
-
         # Change the 24h stations corresponding with the 6h stations containing missing values by changing its value using the formula
         # (PR - Sum(PR6)) / (number of missing values), if and only if the resulting value is positive (>=0).
         missing_6h_slots_df = self.all_6h_df.loc[self.all_6h_df['count_6h_slots'] < self.num_6h_slots]
@@ -393,9 +435,6 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat
                                                                           axis=1, tree=tree_missing_6h, provider_id=provider_id,
                                                                           radius=radius, stations_6h_df=missing_6h_slots_df)
         self.df_24h_with_neighbors = self.df_24h_with_neighbors.loc[self.df_24h_with_neighbors[self.COL_VALUE] >= 0.0]
-
-#         # TODO: Delete
-#         self.df_24h_with_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_with_neighbors2_and_incomplete_6h_slots_decumulated.tsv', index=True, header=True, sep="\t")
 
         # Clean the dataframes of internal columns before merging them
         self.df_24h_without_neighbors = self.df_24h_without_neighbors.drop(columns=self.INTERNAL_COLUMNS, errors='ignore')
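
The three apply() passes in print_statistics pivot the per-(provider, quality code)
counts into one column per quality code before summing per provider. The same shape
can be obtained with a plain groupby/unstack; a self-contained sketch with placeholder
column names (the patch itself keeps the apply() form above):

    import pandas as pd

    df = pd.DataFrame({'provider_id': [1, 1, 2, 2, 2],
                       'q_code': ['40', '120', '40', '40', '160']})
    # count rows per (provider, code), then pivot codes into columns
    counts = (df.groupby(['provider_id', 'q_code']).size()
                .unstack(fill_value=0)
                .reindex(columns=['40', '120', '160'], fill_value=0))
    print(counts.reset_index())
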
""" - def __init__(self, filter_columns: dict = {}, filter_args: dict = {}): - super().__init__(filter_columns, filter_args) + def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code: str = '', quiet_mode: bool = False): + super().__init__(filter_columns, filter_args, var_code, quiet_mode) self.INTERNAL_COLUMNS.append('has_neighbor_within_radius_complete_6h') self.kiwis_24h_dataframe = None self.df_24h_without_neighbors = None @@ -338,9 +392,6 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat # Get all 6hourly slots aggregated self.all_6h_df = self.get_all_6hourly_station_values_df(filtered_data_frames) -# # TODO: Delete -# # self.all_6h_df.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/all_6h_df.tsv', index=True, header=True, sep="\t") - # Select the daily stations from the providers that do not contain a 6hourly station within radius and decumulate # its value dividing by the number of 6hourly slots. These will be inserted directly in all the 6hourly slots tree_all_6h = cKDTree(self.all_6h_df[[self.COL_LON, self.COL_LAT]]) @@ -349,10 +400,7 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat self.kiwis_24h_dataframe = self.update_column_if_provider_stations_are_in_radius(df=self.kiwis_24h_dataframe, tree=tree_all_6h) df_24h_from_providers = self.kiwis_24h_dataframe[self.kiwis_24h_dataframe[self.COL_PROVIDER_ID].isin(providers_ids)] self.df_24h_without_neighbors = df_24h_from_providers[df_24h_from_providers['has_neighbor_within_radius'] == False] - -# # TODO: Delete -# self.df_24h_without_neighbors.to_csv('/mnt/nahaUsers/gomesgo/EMDCC-1444/decumulation_output/df_24h_without_neighbors1.tsv', index=True, header=True, sep="\t") - + # From the station without neighbors remove MARS stations that have a DWDSynop with the same WMO Number because # they are actually the same station even though they might not be exactly within the defined radius. 
diff --git a/src/lisfloodutilities/gridding/lib/writers.py b/src/lisfloodutilities/gridding/lib/writers.py
index 38ca09f..513c565 100644
--- a/src/lisfloodutilities/gridding/lib/writers.py
+++ b/src/lisfloodutilities/gridding/lib/writers.py
@@ -49,6 +49,25 @@ def __setup_metadata(self):
         self.cell_methods = self.conf.get_config_field('PROPERTIES', 'CELL_METHODS')
         self.units = self.conf.get_config_field('PROPERTIES', 'UNIT')
 
+    def setNaN(self, value, defaultNaN=np.nan):
+        try:
+            value[value==1e31] = defaultNaN
+        except Exception as e:
+            self.print_msg(f"value==1e31 : {str(e)}")
+        try:
+            value[value==self.conf.VALUE_NAN] = defaultNaN
+        except Exception as e:
+            self.print_msg(f"value=={self.conf.VALUE_NAN} : {str(e)}")
+        try:
+            value[value==-32768.0] = defaultNaN
+        except Exception as e:
+            self.print_msg(f"value==-32768.0 : {str(e)}")
+        try:
+            value[value==31082] = defaultNaN
+        except Exception as e:
+            self.print_msg(f"value==31082 : {str(e)}")
+        return value
+
     def open(self, out_filename: Path):
         self.filepath = out_filename
         self.time_created = timex.ctime(timex.time())
@@ -98,25 +117,6 @@ def open(self, out_filename: Path):
         else:
             raise ArgumentTypeError(f'File {self.filepath} already exists. Use --force flag to append.')
 
-    def setNaN(self, value, defaultNaN=np.nan):
-        try:
-            value[value==1e31] = defaultNaN
-        except Exception as e:
-            self.print_msg(f"value==1e31 : {str(e)}")
-        try:
-            value[value==self.conf.VALUE_NAN] = defaultNaN
-        except Exception as e:
-            self.print_msg(f"value=={self.conf.VALUE_NAN} : {str(e)}")
-        try:
-            value[value==-32768.0] = defaultNaN
-        except Exception as e:
-            self.print_msg(f"value==-32768.0 : {str(e)}")
-        try:
-            value[value==31082] = defaultNaN
-        except Exception as e:
-            self.print_msg(f"value==31082 : {str(e)}")
-        return value
-
     def write(self, grid: np.ndarray, timestamp: datetime = None):
         timestep = -1
         if timestamp is not None:
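
Hoisting setNaN ahead of open() keeps it callable before any output file is created,
which is what the statistics pass in generate_grids.py relies on: sentinels such as
1e31, the configured VALUE_NAN, -32768.0 and 31082 are masked to NaN so they cannot
distort the min/max/mean/percentiles. A self-contained sketch of that masking on a
plain float array (the sentinel list mirrors the method above):

    import numpy as np

    grid = np.array([0.5, 1e31, 2.0, -32768.0, 3.5])
    for sentinel in (1e31, -32768.0, 31082):
        grid[grid == sentinel] = np.nan  # mask each sentinel value
    print(np.nanmin(grid), np.nanmax(grid), np.nanmean(grid))  # -> 0.5 3.5 2.0
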
diff --git a/src/lisfloodutilities/gridding/tools/read_stats_from_logs.py b/src/lisfloodutilities/gridding/tools/read_stats_from_logs.py
index d67eb8c..d2973de 100644
--- a/src/lisfloodutilities/gridding/tools/read_stats_from_logs.py
+++ b/src/lisfloodutilities/gridding/tools/read_stats_from_logs.py
@@ -25,11 +25,11 @@
 from lisfloodutilities.gridding.lib.utils import FileUtils
 
 
-def run(infolder: str, outfile: str, search_string: str):
-    inwildcard = '*.log'
-
+def have_the_same_columns(df1: pd.DataFrame, df2: pd.DataFrame) -> bool:
+    return all(df1.columns.isin(df2.columns))
+
+def run(infolder: str, outfile: str, search_string: str, inwildcard: str = '*.log'):
     out_df = None
-
     outfilepath = Path(outfile)
     # Create the output parent folders if not exist yet
     Path(outfilepath.parent).mkdir(parents=True, exist_ok=True)
@@ -43,12 +43,16 @@ def run(infolder: str, outfile: str, search_string: str):
         if len(stats_dictionaries) > 0:
             df = pd.DataFrame(stats_dictionaries)
             if out_df is None:
-                out_df = df
+                if not df.empty:
+                    out_df = df
+            elif not df.empty and have_the_same_columns(out_df, df):
+                out_df = pd.concat([out_df, df])
             else:
-                out_df = pd.concat(out_df, df)
+                print('ERROR: Both datasets do not have the same columns')
     if out_df is None or out_df.empty:
         print('WARNING: No lines containing statistics where found.')
     else:
+        out_df = out_df.drop_duplicates()
         out_df.to_csv(outfilepath, index=False, header=True, sep='\t')
         print(f'Wrote file: {outfilepath}')
         print(out_df)
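
With the wildcard promoted to a parameter (and exposed as --wildcard below), the
collector can aggregate statistics from differently named log sets without code
changes. A sketch of calling the updated run() directly; the folder and file paths
are placeholders:

    from lisfloodutilities.gridding.tools.read_stats_from_logs import run

    run('/path/to/logs', '/path/to/output_file.tsv', '#APP_STATS: ', inwildcard='*.log')
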
@@ -83,7 +87,8 @@ def main(argv):
     parser = ArgumentParser(epilog=program_license, description=program_version_string+program_longdesc)
 
     # set defaults
-    parser.set_defaults(search_string='#APP_STATS: ')
+    parser.set_defaults(search_string='#APP_STATS: ',
+                        input_wildcard='*.log')
 
     parser.add_argument("-i", "--in", dest="infolder", required=True, type=FileUtils.folder_type,
                         help="Set input folder path with log files (*.log)",
@@ -91,6 +96,9 @@ def main(argv):
     parser.add_argument("-o", "--out", dest="outfile", required=True, type=FileUtils.file_type,
                         help="Set output file name (*.tsv).",
                         metavar="/path/to/output_file.tsv")
+    parser.add_argument("-w", "--wildcard", dest="input_wildcard", required=False, type=str,
+                        help=('Set the wildcard used to select the log files to be processed.'),
+                        metavar='*.log')
     parser.add_argument("-s", "--search", dest="search_string", required=False, type=str,
                         help=('Set line tag that identifies the statistics dictionary. '
                               'It will be used to parse the line, so the space at the end is necessary.'),
@@ -101,9 +109,10 @@ def main(argv):
 
         print(f"Input Folder: {args.infolder}")
         print(f"Output Filer: {args.outfile}")
+        print(f"Input Wildcard: {args.input_wildcard}")
         print(f'Search String: [{args.search_string}]')
 
-        run(args.infolder, args.outfile, args.search_string)
+        run(args.infolder, args.outfile, args.search_string, args.input_wildcard)
         print("Finished.")
     except Exception as e:
         indent = len(program_name) * " "