From 105cdcc5c16755a37e032b3c05dcfa018ef8e69b Mon Sep 17 00:00:00 2001 From: gnrgomes Date: Fri, 14 Jun 2024 17:18:47 +0200 Subject: [PATCH] Correct stats creation. --- src/lisfloodutilities/gridding/lib/filters.py | 72 ++++++++++--------- .../gridding/tools/analyse_incidents.py | 17 +++-- .../gridding/tools/read_stats_from_logs.py | 23 ++++++ 3 files changed, 71 insertions(+), 41 deletions(-) diff --git a/src/lisfloodutilities/gridding/lib/filters.py b/src/lisfloodutilities/gridding/lib/filters.py index fc5211b..a912eaf 100644 --- a/src/lisfloodutilities/gridding/lib/filters.py +++ b/src/lisfloodutilities/gridding/lib/filters.py @@ -28,6 +28,7 @@ def __init__(self, filter_columns: dict = {}, filter_args: dict = {}, var_code: self.setup_column_names() self.OUTPUT_COLUMNS = [self.COL_LON, self.COL_LAT, self.COL_VALUE] self.INTERNAL_COLUMNS = [f'{self.COL_STATION_DIARY_STATUS}_INTERNAL', f'{self.COL_INACTIVE_HISTORIC}_INTERNAL'] + self.print_stats = False def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_data_frames: List[pd.DataFrame]) -> List[pd.DataFrame]: """ @@ -39,8 +40,10 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat for file_path in kiwis_files: if len(kiwis_data_frames) > 0: df_kiwis = kiwis_data_frames[i] + self.print_stats = False else: df_kiwis = pd.read_csv(file_path, sep="\t") + self.print_stats = True self.cur_timestamp = dt.strptime(f'{kiwis_timestamps[i]}', "%Y%m%d%H%M") df_kiwis = self.apply_filter(df_kiwis) filtered_data_frames.append(df_kiwis) @@ -64,7 +67,7 @@ def apply_filter(self, df: pd.DataFrame) -> pd.DataFrame: (df[f'{self.COL_NO_GRIDDING}'] == 'no') & (df[f'{self.COL_IS_IN_DOMAIN}'] == 'yes') & (df[f'{self.COL_EXCLUDE}'] != 'yes') & (df[f'{self.COL_STATION_DIARY_STATUS}_INTERNAL'] == 1) & (df[f'{self.COL_INACTIVE_HISTORIC}_INTERNAL'] == 1)] - self.print_statistics(df) + self.print_statistics(df, force_print=self.print_stats) # Show only codes valid and suspicious df = 
df.loc[((df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_VALID) | (df[f'{self.COL_QUALITY_CODE}'] == self.QUALITY_CODE_SUSPICIOUS))] @@ -77,37 +80,39 @@ def get_totals_by_quality_code(row: pd.Series, column_quality_code: str, quality return row['count'] return 0 - def print_statistics(self, df: pd.DataFrame): - timestamp = self.cur_timestamp.strftime('%Y-%m-%d %H:%M:%S') - new_df = df.groupby([self.COL_PROVIDER_ID, self.COL_QUALITY_CODE]).size().reset_index(name='count') - # Transpose the quality codes - new_df[self.QUALITY_CODE_VALID] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1, - column_quality_code=self.COL_QUALITY_CODE, - quality_code=self.QUALITY_CODE_VALID) - new_df[self.QUALITY_CODE_SUSPICIOUS] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1, - column_quality_code=self.COL_QUALITY_CODE, - quality_code=self.QUALITY_CODE_SUSPICIOUS) - new_df[self.QUALITY_CODE_WRONG] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1, - column_quality_code=self.COL_QUALITY_CODE, - quality_code=self.QUALITY_CODE_WRONG) - new_df.drop(columns=[self.COL_QUALITY_CODE, 'count'], inplace=True) - new_df = new_df.groupby(self.COL_PROVIDER_ID)[[self.QUALITY_CODE_VALID, - self.QUALITY_CODE_SUSPICIOUS, - self.QUALITY_CODE_WRONG]].sum() - new_df.reset_index(inplace=True) - for index, row in new_df.iterrows(): - provider_id = row[self.COL_PROVIDER_ID] - quality_code_valid = row[self.QUALITY_CODE_VALID] - quality_code_suspicious = row[self.QUALITY_CODE_SUSPICIOUS] - quality_code_wrong = row[self.QUALITY_CODE_WRONG] - total = quality_code_valid + quality_code_suspicious + quality_code_wrong - stats_string = ( - f'#KIWIS_STATS: {{"TIMESTAMP": "{timestamp}", "VAR_CODE": "{self.var_code}", ' - f'"PROVIDER_ID": {provider_id}, "QUALITY_CODE_VALID": {quality_code_valid}, ' - f'"QUALITY_CODE_SUSPICIOUS": {quality_code_suspicious}, "QUALITY_CODE_WRONG": {quality_code_wrong}, ' - f'"TOTAL_OBSERVATIONS": {total}}}' - ) - self.print_msg(stats_string) + def 
print_statistics(self, df: pd.DataFrame, stats_tag: str = 'KIWIS_STATS', force_print: bool = True): + # print only once when reading a file for the first time + if force_print: + timestamp = self.cur_timestamp.strftime('%Y-%m-%d %H:%M:%S') + new_df = df.groupby([self.COL_PROVIDER_ID, self.COL_QUALITY_CODE]).size().reset_index(name='count') + # Transpose the quality codes + new_df[self.QUALITY_CODE_VALID] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1, + column_quality_code=self.COL_QUALITY_CODE, + quality_code=self.QUALITY_CODE_VALID) + new_df[self.QUALITY_CODE_SUSPICIOUS] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1, + column_quality_code=self.COL_QUALITY_CODE, + quality_code=self.QUALITY_CODE_SUSPICIOUS) + new_df[self.QUALITY_CODE_WRONG] = new_df.apply(KiwisFilter.get_totals_by_quality_code, axis=1, + column_quality_code=self.COL_QUALITY_CODE, + quality_code=self.QUALITY_CODE_WRONG) + new_df.drop(columns=[self.COL_QUALITY_CODE, 'count'], inplace=True) + new_df = new_df.groupby(self.COL_PROVIDER_ID)[[self.QUALITY_CODE_VALID, + self.QUALITY_CODE_SUSPICIOUS, + self.QUALITY_CODE_WRONG]].sum() + new_df.reset_index(inplace=True) + for index, row in new_df.iterrows(): + provider_id = row[self.COL_PROVIDER_ID] + quality_code_valid = row[self.QUALITY_CODE_VALID] + quality_code_suspicious = row[self.QUALITY_CODE_SUSPICIOUS] + quality_code_wrong = row[self.QUALITY_CODE_WRONG] + total = quality_code_valid + quality_code_suspicious + quality_code_wrong + stats_string = ( + f'#KIWIS_STATS: {{"TIMESTAMP": "{timestamp}", "VAR_CODE": "{self.var_code}", ' + f'"PROVIDER_ID": {provider_id}, "QUALITY_CODE_VALID": {quality_code_valid}, ' + f'"QUALITY_CODE_SUSPICIOUS": {quality_code_suspicious}, "QUALITY_CODE_WRONG": {quality_code_wrong}, ' + f'"TOTAL_OBSERVATIONS": {total}}}' + ) + self.print_msg(stats_string) def print_msg(self, msg: str = ''): if not self.quiet_mode: @@ -217,6 +222,7 @@ def apply_filter(self, df: pd.DataFrame) -> pd.DataFrame: 
df['has_neighbor_within_radius'] = df.apply(self.has_neighbor_within_radius_from_other_providers, axis=1, tree=tree, provider_id=provider_id, radius=radius) df = df.loc[~(df['has_neighbor_within_radius'])] + self.print_statistics(df) return df def has_neighbor_within_radius_from_other_providers(self, row: pd.Series, tree: cKDTree = None, provider_id: int = 0, @@ -270,6 +276,7 @@ def apply_filter(self, df: pd.DataFrame) -> pd.DataFrame: axis=1, tree=tree, provider_id=provider_id, radius=radius) self.set_filtered_stations(df) df = df.loc[~(df['has_neighbor_within_radius'])] + self.print_statistics(df) return df def set_filtered_stations(self, df: pd.DataFrame): @@ -455,5 +462,6 @@ def filter(self, kiwis_files: List[Path], kiwis_timestamps: List[str], kiwis_dat # Append both decumulated 24h dataframes to the 6h slot df_filtered = pd.concat([df, self.df_24h_without_neighbors, df_decumulated_24h]) return_data_frames.append(df_filtered) + self.print_statistics(df_filtered) return return_data_frames diff --git a/src/lisfloodutilities/gridding/tools/analyse_incidents.py b/src/lisfloodutilities/gridding/tools/analyse_incidents.py index 78e3d0e..31a987b 100644 --- a/src/lisfloodutilities/gridding/tools/analyse_incidents.py +++ b/src/lisfloodutilities/gridding/tools/analyse_incidents.py @@ -104,8 +104,8 @@ def run(infolder: str, outfolder: str): outfilepath = Path(outfile) # Create the output parent folders if not exist yet Path(outfilepath.parent).mkdir(parents=True, exist_ok=True) - df = pd.read_csv(filename, delimiter=DELIMITER_INPUT) - df = df.astype({COL_INCIDENTS: 'str'}) + df = pd.read_csv(filename, delimiter=DELIMITER_INPUT, low_memory=False) + df = df.astype({COL_PROVIDER_ID: 'str', COL_INCIDENTS: 'str'}) incident_type_columns = {} df[COL_TOTAL_INCIDENTS] = df.apply(get_total_incidents, axis=1) @@ -166,8 +166,7 @@ def main(argv): See the Licence for the specific language governing permissions and limitations under the Licence. 
""" - # try: - if True: + try: # setup option parser parser = ArgumentParser(epilog=program_license, description=program_version_string+program_longdesc) @@ -189,11 +188,11 @@ def main(argv): run(args.infolder, args.outfolder) print("Finished.") - # except Exception as e: - # indent = len(program_name) * " " - # sys.stderr.write(program_name + ": " + repr(e) + "\n") - # sys.stderr.write(indent + " for help use --help") - # return 2 + except Exception as e: + indent = len(program_name) * " " + sys.stderr.write(program_name + ": " + repr(e) + "\n") + sys.stderr.write(indent + " for help use --help") + return 2 def main_script(): diff --git a/src/lisfloodutilities/gridding/tools/read_stats_from_logs.py b/src/lisfloodutilities/gridding/tools/read_stats_from_logs.py index d2973de..b0efa78 100644 --- a/src/lisfloodutilities/gridding/tools/read_stats_from_logs.py +++ b/src/lisfloodutilities/gridding/tools/read_stats_from_logs.py @@ -1,4 +1,5 @@ from dask.dataframe.io.tests.test_json import df +from pandas.tests.io.test_fsspec import df1 __author__="Goncalo Gomes" __date__="$Jun 06, 2024 10:45:00$" __version__="0.1" @@ -28,6 +29,27 @@ def have_the_same_columns(df1: pd.DataFrame, df2: pd.DataFrame): return all(df1.columns.isin(df2.columns)) +def merge_kiwis_stats(df: pd.DataFrame, search_string: str = ''): + ''' + KIWIS stats will contain several rows from different KiwisFilters, each one filtering more and more observations. + That is why we need to take the minimum of the VALID and SUSPICIOUS observation counts, whereas the WRONG ones + are already filtered out in the first interaction with the KiwisFilter class.
+ ''' + KWIWS_SEARCH_STRING = '#KIWIS_STATS: ' + result_df = df + if search_string == KWIWS_SEARCH_STRING: + group_cols = ['TIMESTAMP', 'VAR_CODE', 'PROVIDER_ID'] + agg_dict = {'QUALITY_CODE_VALID': 'min', 'QUALITY_CODE_SUSPICIOUS': 'min', + 'QUALITY_CODE_WRONG': 'max', 'TOTAL_OBSERVATIONS': 'max'} + result_df = result_df.groupby(group_cols).agg(agg_dict).reset_index() + result_df.columns = ['TIMESTAMP', 'VAR_CODE', 'PROVIDER_ID', 'QUALITY_CODE_VALID', + 'QUALITY_CODE_SUSPICIOUS', 'QUALITY_CODE_WRONG', 'TOTAL_OBSERVATIONS'] + result_df.reset_index(drop=True, inplace=True) + # Recalculate the total + result_df['TOTAL_OBSERVATIONS'] = result_df['QUALITY_CODE_VALID'] + result_df['QUALITY_CODE_SUSPICIOUS'] + result_df['QUALITY_CODE_WRONG'] + return result_df + + def run(infolder: str, outfile: str, search_string: str, inwildcard: str = '*.log'): out_df = None outfilepath = Path(outfile) @@ -53,6 +75,7 @@ def run(infolder: str, outfile: str, search_string: str, inwildcard: str = '*.lo print('WARNING: No lines containing statistics where found.') else: out_df = out_df.drop_duplicates() + out_df = merge_kiwis_stats(out_df, search_string) out_df.to_csv(outfilepath, index=False, header=True, sep='\t') print(f'Wrote file: {outfilepath}') print(out_df)