Adds filter for failed and non-completed stages #1558
base: dev
Conversation
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Did you see any change in the output as an impact of that change? If you take a diff between the outputs before/after, is there any change in the metrics?
@@ -321,7 +321,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
     val emptyNodeNames = Seq.empty[String]
     val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults]
     // TODO: this has stage attempts. we should handle different attempts
-    app.stageManager.getAllStages.map { sm =>
+    app.stageManager.getAllCompletedStages.map { sm =>
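For context, a minimal, self-contained sketch of the kind of filter being discussed (StageModelSketch and StageManagerSketch are hypothetical names; the real StageModel/StageModelManager API in this repo differs):

    case class StageModelSketch(
        stageId: Int,
        attemptId: Int,
        completionTime: Option[Long],
        failureReason: Option[String]) {
      // An attempt counts as completed when the event log carried a completion time.
      def hasCompleted: Boolean = completionTime.isDefined
      def hasFailed: Boolean = failureReason.isDefined
    }

    class StageManagerSketch(stages: Seq[StageModelSketch]) {
      def getAllStages: Seq[StageModelSketch] = stages
      // Only attempts that finished and did not fail feed the aggregates.
      def getAllCompletedStages: Seq[StageModelSketch] =
        stages.filter(s => s.hasCompleted && !s.hasFailed)
    }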
Do we still need the TODO? If not, we can replace it with an explanation of why we are using getAllCompletedStages.
Have removed the TODO
TODO still there
Updated
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModel.scala
@leewyang can you take a look at this wrt qualx?
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
…ids-tools into issue1552-bilal
core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModel.scala
// This assert ensures only two StageAggTaskMetricsProfileResult which
// have the same id are aggregated. This is important because we retain the
// id and appIndex of the first stage.
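As a hedged illustration of the guard in question (a sketch only; StageAggResultSketch and its fields are hypothetical stand-ins, not the actual StageAggTaskMetricsProfileResult definition):

    case class StageAggResultSketch(id: Int, appIndex: Int, durationSum: Long) {
      def aggregate(other: StageAggResultSketch): StageAggResultSketch = {
        // Combining is only sound for the same stage id, because the id and
        // appIndex of the receiver (the first result) are retained verbatim.
        assert(this.id == other.id,
          s"Cannot aggregate results for stage $id with stage ${other.id}")
        copy(durationSum = this.durationSum + other.durationSum)
      }
    }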
Let's make that a proper javadoc on top of the method, with as much detail as we can. I am not sure the name of the method makes it easy to understand what this method does.
A killed status does not exist for a stage; it can only exist for a task.
Updated the method with the correct javadoc, and the naming is now better.
Thanks for pointing it out.
core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModel.scala
core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModelManager.scala
core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
@@ -321,7 +321,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
     val emptyNodeNames = Seq.empty[String]
     val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults]
     // TODO: this has stage attempts. we should handle different attempts
-    app.stageManager.getAllStages.map { sm =>
+    app.stageManager.getAllCompletedStages.map { sm =>
TODO still there
Thanks @sayedbilalbari
It is interesting that this change did not trigger any change in the expected results of the unit tests. If that's the case, it would be great if we can add a test that shows this version actually generates something different compared to the previous version.
Otherwise, it might be possible that the code had no impact at all.
For qualx purposes, would there be a way to get the metrics associated with the failed stages/tasks, or will these be dropped entirely? For customers who care about the total time/resources used, it would be useful to get the metrics associated with successful stages/tasks and the failed stages/tasks, so we could tell that a job spent 90% of its time/resources on failed stages/tasks. Otherwise, the job would appear to only take 10% of the actual time/resources used in reality.
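One way to picture what is being asked for (a sketch under the assumption that per-attempt durations and success flags are available; AttemptCost is a hypothetical type, not part of this repo):

    case class AttemptCost(succeeded: Boolean, durationMs: Long)

    // Fraction of total stage time spent on failed attempts, so a report
    // could say e.g. "90% of this job's time went to failed stages".
    def failedTimeFraction(attempts: Seq[AttemptCost]): Double = {
      val total = attempts.map(_.durationMs).sum.toDouble
      if (total == 0) 0.0
      else attempts.filterNot(_.succeeded).map(_.durationMs).sum / total
    }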
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
Thanks @amahussein for reviewing.
As for the absence of failure scenarios in the tests, this change tackles two major scenarios: stages that failed, and stages that were submitted but never completed.
We can discuss offline what the best way would be to generate event logs like this for the test case.
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
FYI, we have existing code that is trying to look at metrics for failed stages.
-    // TODO: this has stage attempts. we should handle different attempts
-    app.stageManager.getAllStages.map { sm =>
+    // TODO: Should we only consider successful tasks?
+    app.stageManager.getAllSuccessfulStageAttempts.map { sm =>
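The new TODO raises task-level filtering; a minimal sketch of what that could look like (TaskRecordSketch and the "SUCCESS" status string are illustrative assumptions, not the repo's actual task model):

    case class TaskRecordSketch(stageId: Int, status: String, durationMs: Long)

    // Aggregate only tasks whose final status is SUCCESS, dropping failed
    // and killed tasks from the per-stage totals.
    def successfulTaskDuration(tasks: Seq[TaskRecordSketch]): Long =
      tasks.filter(_.status == "SUCCESS").map(_.durationMs).sum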
For sm.duration, can we output both all stages and successfulStageAttempts? It's quite common to have failed stages in production runs. For duration, we want to see the cost there.
Fixes #1552
Currently we store the stage info using the StageModelManager class, where we map incoming stage information during the following events -
spark-rapids-tools/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala, line 475 (commit 1f037fa)
spark-rapids-tools/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala, line 464 (commit 1f037fa)
So stage information is updated once when a stage is submitted and once during completion. A StageCompleted event also arrives for a failed stage attempt (e.g., there will be two StageSubmitted and StageCompleted events for a stage that fails on its first attempt and succeeds on attempt 2).
Both of those StageInfo objects are updated in the map.
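To make the two-attempt flow concrete, a self-contained sketch (plain case classes stand in for Spark's SparkListenerStageSubmitted/SparkListenerStageCompleted events and StageInfo; the "FetchFailed" reason is illustrative):

    case class StageInfoSketch(stageId: Int, attemptNumber: Int,
        failureReason: Option[String])

    // Stage 3 fails on attempt 0 and succeeds on attempt 1, so the log holds
    // two Submitted/Completed pairs for the same stage id.
    val events = Seq(
      ("StageSubmitted", StageInfoSketch(3, 0, None)),
      ("StageCompleted", StageInfoSketch(3, 0, Some("FetchFailed"))),
      ("StageSubmitted", StageInfoSketch(3, 1, None)),
      ("StageCompleted", StageInfoSketch(3, 1, None)))

    // Keying on (stageId, attemptNumber) keeps the failed attempt 0 distinct
    // from the successful attempt 1 in the map.
    val byAttempt = events.collect {
      case ("StageCompleted", si) => (si.stageId, si.attemptNumber) -> si
    }.toMap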
This PR adds extra filters to calculate aggregate metrics only for stages that have not failed and have completed successfully.
Changes -
Effects -
This change now ensures that all generated stage-level aggregate metrics take into account only the successful (completed and not failed) stages.