From 277e9511401761b23cb55ccc19c71bbf8617d13b Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Fri, 6 Sep 2024 08:03:21 -0700 Subject: [PATCH] Follow-up 1318: Fix QualX fallback with default speedup and duration columns (#1330) * Handle fallbacks from QualX Signed-off-by: Partho Sarthi * Fix comment Signed-off-by: Partho Sarthi * Remove try-except for ScanTblError Signed-off-by: Partho Sarthi --------- Signed-off-by: Partho Sarthi --- .../rapids/qualification.py | 61 +++++++++++-------- .../resources/qualification-conf.yaml | 2 + .../tools/qualx/qualx_main.py | 30 ++++----- 3 files changed, 50 insertions(+), 43 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 0f0cad3da..18d0271ac 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -370,8 +370,12 @@ def create_stdout_table_pprinter(total_apps: pd.DataFrame, df = self.__update_apps_with_prediction_info(df, self.ctxt.get_ctxt('estimationModelArgs')) except Exception as e: # pylint: disable=broad-except - self.logger.error('Unable to use XGBoost estimation model for speed ups. ' - 'Falling-back to default model. Reason - %s:%s', type(e).__name__, e) + # If an error occurs while updating the apps with prediction info (speedups and durations), + # raise an error and stop the execution as the tool cannot continue without this information. + raise RuntimeError( + 'Failed to use XGBoost estimation model for speedups. Qualification tool cannot continue. ' + f'Reason - {type(e).__name__}: {e}' + ) from e # 2. Operations related to cluster information try: @@ -512,32 +516,41 @@ def __update_apps_with_prediction_info(self, model_name = self.ctxt.platform.get_prediction_model_name() qual_output_dir = self.ctxt.get_local('outputFolder') output_info = self.__build_prediction_output_files_info() - predictions_df = predict(platform=model_name, qual=qual_output_dir, - output_info=output_info, - model=estimation_model_args['customModelFile']) + try: + predictions_df = predict(platform=model_name, qual=qual_output_dir, + output_info=output_info, + model=estimation_model_args['customModelFile']) + except Exception as e: # pylint: disable=broad-except + predictions_df = pd.DataFrame() + self.logger.error( + 'Failed to execute the prediction model. Using default speed up of 1.0 for all apps. ' + 'Reason - %s:%s', type(e).__name__, e) if predictions_df.empty: - return all_apps - - result_info = self.ctxt.get_value('local', 'output', 'predictionModel', 'updateResult') - # Merge with a left join to include all rows from all apps and relevant rows from model predictions - result_df = pd.merge(all_apps, predictions_df[result_info['subsetColumns']], - how='left', left_on='App ID', right_on='appId') - # Update columns in all apps with values from corresponding XGBoost columns. - for remap_column in result_info['remapColumns']: - src_col, dst_col = remap_column['srcCol'], remap_column['dstCol'] - if src_col in result_df and dst_col in result_df: - result_df[dst_col] = result_df[src_col] - result_df.rename(columns={'speedup': 'Estimated GPU Speedup'}, - inplace=True) - # if the qualx does not have a speedup value, default to 1.0 - result_df['Estimated GPU Speedup'].fillna(1.0, inplace=True) - # if the qualx does not have a speedup value, default to App Duration - result_df['Estimated GPU Duration'].fillna(result_df['App Duration'], inplace=True) + result_df = all_apps.copy() + # If the prediction model fails, set the estimated GPU speedup to 1.0 and the estimated GPU duration to + # the app duration. + result_df['Estimated GPU Speedup'] = 1.0 + result_df['Estimated GPU Duration'] = result_df['App Duration'] + else: + result_info = self.ctxt.get_value('local', 'output', 'predictionModel', 'updateResult') + # Merge with a left join to include all rows from all apps and relevant rows from model predictions + result_df = pd.merge(all_apps, predictions_df[result_info['subsetColumns']], + how='left', left_on='App ID', right_on='appId') + # Replace columns in all apps with values from corresponding XGBoost columns. + for remap_column in result_info['remapColumns']: + src_col, dst_col = remap_column['srcCol'], remap_column['dstCol'] + # Drop the dest column if it exists + result_df.drop(columns=dst_col, errors='ignore', inplace=True) + # Rename the source column to the destination column + result_df.rename(columns={src_col: dst_col}, errors='ignore', inplace=True) + # if the qualx does not have a speedup value, default to 1.0 + result_df['Estimated GPU Speedup'].fillna(1.0, inplace=True) + # if the qualx does not have a duration value, default to App Duration + result_df['Estimated GPU Duration'].fillna(result_df['App Duration'], inplace=True) # We need to be careful about other columns that depend on remapped columns result_df['Estimated GPU Time Saved'] = result_df['App Duration'] - result_df['Estimated GPU Duration'] - # drop the subset_cols and ignore the errors in case some columns within the subset got renamed. - return result_df.drop(columns=result_info['subsetColumns'], errors='ignore') + return result_df def _write_app_metadata(self, tools_processed_apps: pd.DataFrame, metadata_file_info: dict, config_recommendations_dir_info: dict) -> None: diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index 28d07cb61..e23034736 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -310,6 +310,8 @@ local: - 'speedup' - 'appDuration_pred' remapColumns: + - srcCol: 'speedup' + dstCol: 'Estimated GPU Speedup' - srcCol: 'appDuration_pred' dstCol: 'Estimated GPU Duration' clusterInference: diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py index 6a63afb5a..880fe7714 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py @@ -33,8 +33,7 @@ load_profiles, load_qtool_execs, load_qual_csv, - PREPROCESSED_FILE, - ScanTblError + PREPROCESSED_FILE ) from spark_rapids_tools.tools.qualx.model import ( extract_model_features, @@ -573,24 +572,17 @@ def predict( 'platform': platform, } - profile_df = pd.DataFrame() - try: - logger.info('Loading dataset: %s', dataset_name) - profile_df = load_profiles( - datasets=datasets, - node_level_supp=node_level_supp, - qual_tool_filter=qual_tool_filter, - qual_tool_output=qual_tool_output - ) - # reset appName to original - profile_df['appName'] = profile_df['appId'].map(app_id_name_map) - except ScanTblError: - # ignore - logger.error('Skipping invalid dataset: %s', dataset_name) - + logger.info('Loading dataset: %s', dataset_name) + profile_df = load_profiles( + datasets=datasets, + node_level_supp=node_level_supp, + qual_tool_filter=qual_tool_filter, + qual_tool_output=qual_tool_output + ) if profile_df.empty: - # this is an error condition, and we should not fall back to the default predictions. - raise ValueError('Data preprocessing resulted in an empty dataset. Speedup predictions will be skipped.') + raise ValueError('Data preprocessing resulted in an empty dataset. Speedup predictions will default to 1.0.') + # reset appName to original + profile_df['appName'] = profile_df['appId'].map(app_id_name_map) filter_str = ( f'with {qual_tool_filter} filtering'