Commit 277e951

Follow-up 1318: Fix QualX fallback with default speedup and duration columns (NVIDIA#1330)

* Handle fallbacks from QualX

Signed-off-by: Partho Sarthi <psarthi@nvidia.com>

* Fix comment

Signed-off-by: Partho Sarthi <psarthi@nvidia.com>

* Remove try-except for ScanTblError

Signed-off-by: Partho Sarthi <psarthi@nvidia.com>

---------

Signed-off-by: Partho Sarthi <psarthi@nvidia.com>
parthosa authored Sep 6, 2024
1 parent ca97bc1 commit 277e951
Showing 3 changed files with 50 additions and 43 deletions.
61 changes: 37 additions & 24 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
```diff
@@ -370,8 +370,12 @@ def create_stdout_table_pprinter(total_apps: pd.DataFrame,
             df = self.__update_apps_with_prediction_info(df,
                                                          self.ctxt.get_ctxt('estimationModelArgs'))
         except Exception as e:  # pylint: disable=broad-except
-            self.logger.error('Unable to use XGBoost estimation model for speed ups. '
-                              'Falling-back to default model. Reason - %s:%s', type(e).__name__, e)
+            # If an error occurs while updating the apps with prediction info (speedups and durations),
+            # raise an error and stop the execution as the tool cannot continue without this information.
+            raise RuntimeError(
+                'Failed to use XGBoost estimation model for speedups. Qualification tool cannot continue. '
+                f'Reason - {type(e).__name__}: {e}'
+            ) from e
 
         # 2. Operations related to cluster information
         try:
```
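The `raise ... from e` form chains the original exception as `__cause__`, so the top-level handler still surfaces the underlying QualX failure alongside the new `RuntimeError`. A minimal standalone sketch of that chaining, with a hypothetical `failing_predict` standing in for the real prediction call:

```python
def failing_predict():
    # Hypothetical stand-in for the XGBoost prediction step.
    raise KeyError('missing feature column')

try:
    try:
        failing_predict()
    except Exception as e:  # mirrors the broad except in the hunk above
        raise RuntimeError(
            'Failed to use XGBoost estimation model for speedups. '
            f'Reason - {type(e).__name__}: {e}'
        ) from e
except RuntimeError as err:
    # The original KeyError travels along as the cause.
    assert isinstance(err.__cause__, KeyError)
    print(err)            # Failed to use XGBoost ... Reason - KeyError: 'missing feature column'
    print(err.__cause__)  # 'missing feature column'
```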
Expand Down Expand Up @@ -512,32 +516,41 @@ def __update_apps_with_prediction_info(self,
model_name = self.ctxt.platform.get_prediction_model_name()
qual_output_dir = self.ctxt.get_local('outputFolder')
output_info = self.__build_prediction_output_files_info()
predictions_df = predict(platform=model_name, qual=qual_output_dir,
output_info=output_info,
model=estimation_model_args['customModelFile'])
try:
predictions_df = predict(platform=model_name, qual=qual_output_dir,
output_info=output_info,
model=estimation_model_args['customModelFile'])
except Exception as e: # pylint: disable=broad-except
predictions_df = pd.DataFrame()
self.logger.error(
'Failed to execute the prediction model. Using default speed up of 1.0 for all apps. '
'Reason - %s:%s', type(e).__name__, e)

if predictions_df.empty:
return all_apps

result_info = self.ctxt.get_value('local', 'output', 'predictionModel', 'updateResult')
# Merge with a left join to include all rows from all apps and relevant rows from model predictions
result_df = pd.merge(all_apps, predictions_df[result_info['subsetColumns']],
how='left', left_on='App ID', right_on='appId')
# Update columns in all apps with values from corresponding XGBoost columns.
for remap_column in result_info['remapColumns']:
src_col, dst_col = remap_column['srcCol'], remap_column['dstCol']
if src_col in result_df and dst_col in result_df:
result_df[dst_col] = result_df[src_col]
result_df.rename(columns={'speedup': 'Estimated GPU Speedup'},
inplace=True)
# if the qualx does not have a speedup value, default to 1.0
result_df['Estimated GPU Speedup'].fillna(1.0, inplace=True)
# if the qualx does not have a speedup value, default to App Duration
result_df['Estimated GPU Duration'].fillna(result_df['App Duration'], inplace=True)
result_df = all_apps.copy()
# If the prediction model fails, set the estimated GPU speedup to 1.0 and the estimated GPU duration to
# the app duration.
result_df['Estimated GPU Speedup'] = 1.0
result_df['Estimated GPU Duration'] = result_df['App Duration']
else:
result_info = self.ctxt.get_value('local', 'output', 'predictionModel', 'updateResult')
# Merge with a left join to include all rows from all apps and relevant rows from model predictions
result_df = pd.merge(all_apps, predictions_df[result_info['subsetColumns']],
how='left', left_on='App ID', right_on='appId')
# Replace columns in all apps with values from corresponding XGBoost columns.
for remap_column in result_info['remapColumns']:
src_col, dst_col = remap_column['srcCol'], remap_column['dstCol']
# Drop the dest column if it exists
result_df.drop(columns=dst_col, errors='ignore', inplace=True)
# Rename the source column to the destination column
result_df.rename(columns={src_col: dst_col}, errors='ignore', inplace=True)
# if the qualx does not have a speedup value, default to 1.0
result_df['Estimated GPU Speedup'].fillna(1.0, inplace=True)
# if the qualx does not have a duration value, default to App Duration
result_df['Estimated GPU Duration'].fillna(result_df['App Duration'], inplace=True)
# We need to be careful about other columns that depend on remapped columns
result_df['Estimated GPU Time Saved'] = result_df['App Duration'] - result_df['Estimated GPU Duration']
# drop the subset_cols and ignore the errors in case some columns within the subset got renamed.
return result_df.drop(columns=result_info['subsetColumns'], errors='ignore')
return result_df

def _write_app_metadata(self, tools_processed_apps: pd.DataFrame,
metadata_file_info: dict, config_recommendations_dir_info: dict) -> None:
Expand Down
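The drop-then-rename loop replaces the old copy-if-present logic, which silently kept stale values when a prediction column was missing. A toy reproduction of the new merge-remap-fallback path, assuming hypothetical app rows and a predictions frame that is missing one app (column names match the diff; the data values are made up):

```python
import pandas as pd

# Hypothetical inputs mirroring the tool's data shapes.
all_apps = pd.DataFrame({'App ID': ['app-1', 'app-2'], 'App Duration': [1000, 2000]})
predictions = pd.DataFrame({'appId': ['app-1'], 'speedup': [2.5], 'appDuration_pred': [400.0]})

remap_columns = [
    {'srcCol': 'speedup', 'dstCol': 'Estimated GPU Speedup'},
    {'srcCol': 'appDuration_pred', 'dstCol': 'Estimated GPU Duration'},
]

result_df = pd.merge(all_apps, predictions, how='left', left_on='App ID', right_on='appId')
for remap in remap_columns:
    # Drop any stale destination column, then rename the prediction column onto it.
    result_df.drop(columns=remap['dstCol'], errors='ignore', inplace=True)
    result_df.rename(columns={remap['srcCol']: remap['dstCol']}, inplace=True)

# Apps absent from the predictions get the neutral defaults (1.0x speedup, CPU duration).
result_df['Estimated GPU Speedup'] = result_df['Estimated GPU Speedup'].fillna(1.0)
result_df['Estimated GPU Duration'] = result_df['Estimated GPU Duration'].fillna(result_df['App Duration'])
result_df['Estimated GPU Time Saved'] = result_df['App Duration'] - result_df['Estimated GPU Duration']
print(result_df)
# app-2 ends up with speedup 1.0, duration 2000.0, and time saved 0.0.
```

The drop-before-rename ordering matters: renaming 'speedup' onto an already existing 'Estimated GPU Speedup' column would otherwise leave two columns with the same label.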
2 changes: 2 additions & 0 deletions (file path not shown)

```diff
@@ -310,6 +310,8 @@ local:
       - 'speedup'
       - 'appDuration_pred'
     remapColumns:
+      - srcCol: 'speedup'
+        dstCol: 'Estimated GPU Speedup'
      - srcCol: 'appDuration_pred'
        dstCol: 'Estimated GPU Duration'
     clusterInference:
```
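This is the configuration read back via `self.ctxt.get_value('local', 'output', 'predictionModel', 'updateResult')` in the hunk above. A self-contained sketch of how such a block parses, with the YAML inlined; the `appId` entry under `subsetColumns` is assumed for illustration, since only 'speedup' and 'appDuration_pred' are visible in this hunk:

```python
import yaml  # PyYAML

config_text = """
updateResult:
  subsetColumns:
    - 'appId'
    - 'speedup'
    - 'appDuration_pred'
  remapColumns:
    - srcCol: 'speedup'
      dstCol: 'Estimated GPU Speedup'
    - srcCol: 'appDuration_pred'
      dstCol: 'Estimated GPU Duration'
"""

result_info = yaml.safe_load(config_text)['updateResult']
for remap in result_info['remapColumns']:
    print(remap['srcCol'], '->', remap['dstCol'])
# speedup -> Estimated GPU Speedup
# appDuration_pred -> Estimated GPU Duration
```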
30 changes: 11 additions & 19 deletions user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py
```diff
@@ -33,8 +33,7 @@
     load_profiles,
     load_qtool_execs,
     load_qual_csv,
-    PREPROCESSED_FILE,
-    ScanTblError
+    PREPROCESSED_FILE
 )
 from spark_rapids_tools.tools.qualx.model import (
     extract_model_features,
```
```diff
@@ -573,24 +572,17 @@ def predict(
         'platform': platform,
     }
 
-    profile_df = pd.DataFrame()
-    try:
-        logger.info('Loading dataset: %s', dataset_name)
-        profile_df = load_profiles(
-            datasets=datasets,
-            node_level_supp=node_level_supp,
-            qual_tool_filter=qual_tool_filter,
-            qual_tool_output=qual_tool_output
-        )
-        # reset appName to original
-        profile_df['appName'] = profile_df['appId'].map(app_id_name_map)
-    except ScanTblError:
-        # ignore
-        logger.error('Skipping invalid dataset: %s', dataset_name)
-
+    logger.info('Loading dataset: %s', dataset_name)
+    profile_df = load_profiles(
+        datasets=datasets,
+        node_level_supp=node_level_supp,
+        qual_tool_filter=qual_tool_filter,
+        qual_tool_output=qual_tool_output
+    )
     if profile_df.empty:
-        # this is an error condition, and we should not fall back to the default predictions.
-        raise ValueError('Data preprocessing resulted in an empty dataset. Speedup predictions will be skipped.')
+        raise ValueError('Data preprocessing resulted in an empty dataset. Speedup predictions will default to 1.0.')
+    # reset appName to original
+    profile_df['appName'] = profile_df['appId'].map(app_id_name_map)
 
     filter_str = (
         f'with {qual_tool_filter} filtering'
```
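Taken together, the two Python files establish a clearer contract: `predict` now raises on an empty preprocessed dataset instead of swallowing `ScanTblError`, and the caller in qualification.py converts any prediction failure into the neutral defaults. A condensed sketch of that round trip; `predict_sketch` and `update_apps` are illustrative stand-ins, not the real signatures:

```python
import pandas as pd

def predict_sketch(profile_df: pd.DataFrame) -> pd.DataFrame:
    # QualX side: an empty preprocessed dataset is now an error, not a silent skip.
    if profile_df.empty:
        raise ValueError('Data preprocessing resulted in an empty dataset. '
                         'Speedup predictions will default to 1.0.')
    return profile_df.assign(speedup=2.0)

def update_apps(all_apps: pd.DataFrame, profile_df: pd.DataFrame) -> pd.DataFrame:
    # Caller side: any prediction failure degrades to 1.0x speedup / CPU duration.
    try:
        predictions_df = predict_sketch(profile_df)
    except Exception as e:  # broad on purpose, mirroring the diff
        print(f'prediction failed ({e}); falling back to defaults')
        predictions_df = pd.DataFrame()
    result_df = all_apps.copy()
    if predictions_df.empty:
        result_df['Estimated GPU Speedup'] = 1.0
        result_df['Estimated GPU Duration'] = result_df['App Duration']
        return result_df
    # Merge/remap path elided; see the qualification.py hunk above.
    return result_df.merge(predictions_df, how='left', left_on='App ID', right_on='appId')

apps = pd.DataFrame({'App ID': ['app-1'], 'App Duration': [1000]})
print(update_apps(apps, pd.DataFrame()))  # exercises the fallback branch
```

With this split, a QualX crash or an empty dataset degrades the report to conservative estimates, while a failure in the column remapping itself (the first hunk) still stops the tool, since the output would otherwise be missing required columns.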
