
Adds filter for failed and non-completed stages #1558

Open · wants to merge 12 commits into base: dev
Conversation

sayedbilalbari
Collaborator

Fixes #1552

Currently we store the stage info using the StageModelManager class, where we map incoming stage information during the following events:

  1. doSparkListenerStageCompleted
  2. doSparkListenerStageSubmitted

So a stage's information is updated once when the stage is submitted and once when it completes. A StageCompleted event arrives for a failed attempt as well (e.g., a stage that fails on attempt 1 and succeeds on attempt 2 produces two StageSubmitted and two StageCompleted events), and both of those StageInfo objects are updated in the map.
This PR adds extra filters so that aggregate metrics are calculated only for stages that have not failed and have completed successfully.

Changes -

  1. Add a new method, getAllCompletedStages, that filters StageInfo objects to those that are successful and completed
  2. Change previous usages of getAllStages to getAllCompletedStages

Effects -

This change ensures that all stage-level aggregate metrics take into account only the successful (completed and not failed) stages.
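As a rough illustration, the new filter might look like the sketch below. getAllStages and getAllCompletedStages come from this PR; completionTime and failureReason are standard Spark StageInfo fields, but how the StageModel wrapper exposes them here is an assumption.

```scala
// Minimal sketch, not the repo's actual implementation: keep only the stage
// attempts that finished and did not fail.
def getAllCompletedStages: Iterable[StageModel] = {
  getAllStages.filter { sm =>
    // Spark's StageInfo marks completion via completionTime and failure via
    // failureReason; a successful attempt has the former and lacks the latter.
    sm.stageInfo.completionTime.isDefined && sm.stageInfo.failureReason.isEmpty
  }
}
```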


@amahussein (Collaborator) left a comment


Did you see any change in the output as an impact of this change? If you take a diff between the outputs before and after, is there any change in the metrics?

@@ -321,7 +321,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
     val emptyNodeNames = Seq.empty[String]
     val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults]
     // TODO: this has stage attempts. we should handle different attempts
-    app.stageManager.getAllStages.map { sm =>
+    app.stageManager.getAllCompletedStages.map { sm =>
Collaborator


Do we still need the TODO? If not, we can replace it with an explanation of why we are using getAllCompletedStages.

Collaborator Author


Have removed the TODO

Collaborator


TODO still there

Collaborator Author


Updated

@amahussein added the bug (Something isn't working) and core_tools (Scope the core module (scala)) labels on Feb 21, 2025
@eordentlich
Collaborator

@leewyang can you take a look at this wrt qualx?

Comment on lines 690 to 692
// This assert ensures only two StageAggTaskMetricsProfileResult which
// have the same id are aggregated. This is important because we retain the
// id and appIndex of the first stage.
Collaborator


Let's make that a proper javadoc on top of the method, with as much detail as we can.
I am not sure the name of the method makes it easy to understand what it does.

Collaborator Author


A killed status does not exist for a stage; it can only exist for a task.
Updated the method with the correct javadoc, and the naming is now better.
Thanks for pointing it out.
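For context, a hypothetical sketch of the guard being discussed. Only the class name and the id/appIndex retention behavior come from the comment above; the fields and the aggregation itself are illustrative.

```scala
// Illustrative stand-in for the real result type, which has many more fields.
case class StageAggTaskMetricsProfileResult(
    appIndex: Int, id: Int, numTasks: Int, durationSum: Long)

def aggregate(a: StageAggTaskMetricsProfileResult,
              b: StageAggTaskMetricsProfileResult): StageAggTaskMetricsProfileResult = {
  // Only two results with the same id may be merged, because the id and
  // appIndex of the first result are retained in the output.
  assert(a.id == b.id)
  a.copy(numTasks = a.numTasks + b.numTasks,
         durationSum = a.durationSum + b.durationSum)
}
```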

@@ -321,7 +321,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
     val emptyNodeNames = Seq.empty[String]
     val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults]
     // TODO: this has stage attempts. we should handle different attempts
-    app.stageManager.getAllStages.map { sm =>
+    app.stageManager.getAllCompletedStages.map { sm =>
Collaborator


TODO still there


@amahussein (Collaborator) left a comment


Thanks @sayedbilalbari

It is interesting that this change did not trigger any change in the expected results of the unit tests. If that's the case, it would be great to add a test showing that this version actually generates something different from the previous version.
Otherwise, it might be possible that the code had no impact at all.

@leewyang
Collaborator

For qualx purposes, would there be a way to get the metrics associated with the failed stages/tasks, or will these be dropped entirely? For customers who care about the total time/resources used, it would be useful to get the metrics for both the successful and the failed stages/tasks, so we could tell that a job spent 90% of its time/resources on failed stages/tasks. Otherwise, the job would appear to take only 10% of the time/resources actually used.

@sayedbilalbari
Collaborator Author

Thanks @amahussein for reviewing.
For the killed/failed tasks, as mentioned before, the way accumulables come in is different from how we are currently parsing them. The logic is as follows (see the sketch after this list):

  1. For killed tasks, the accumUpdates show up in event.reason.accumUpdates, but not all of them are relevant. On further analysis, the majority of them are zeros; they just denote the accumulables this task was supposed to deal with.
  2. The accumulables where the task actually made updates show up in taskInfo.accumulables (as mentioned above, e.g. peakExecutionMemory, jvmGCTime, etc.).
  3. So no extra logic is needed to parse the accumUpdates that come in with killed tasks, since we already parse the TaskInfo and use it to store the relevant information.
As for the lack of failure scenarios in the tests, this change tackles two major scenarios -

  1. Multiple successful attempts for a stage - will update the test case for this (I have already verified that this gives correct, different results on event logs)
  2. Failed or non-completed stages - the test event logs did not have scenarios where a stage never completes. There are scenarios where a primary attempt fails but the secondary attempt overrides the primary failure entry, hence no change is seen in the aggregated files.

We can discuss offline the best way to generate event logs like this for the test case.

@leewyang
Collaborator

> For qualx purposes, would there be a way to get the metrics associated with the failed stages/tasks, or will these be dropped entirely? For customers who care about the total time/resources used, it would be useful to get the metrics for both the successful and the failed stages/tasks, so we could tell that a job spent 90% of its time/resources on failed stages/tasks. Otherwise, the job would appear to take only 10% of the time/resources actually used.

FYI, we have existing code that is trying to look at metrics for failed stages.

-// TODO: this has stage attempts. we should handle different attempts
-app.stageManager.getAllStages.map { sm =>
+// TODO: Should we only consider successful tasks?
+app.stageManager.getAllSuccessfulStageAttempts.map { sm =>


For sm.duration, can we output both all stages and successfulStageAttempts? It's quite common to have failed stages in production runs. For duration, we want to see the cost there.
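A hypothetical way to surface both views, assuming the stage model's duration is an Option[Long] (getAllStages and getAllSuccessfulStageAttempts are the methods from this PR; everything else is illustrative):

```scala
// Report wall-clock across all stage attempts next to the successful ones
// only, so time spent on failed attempts stays visible.
val allMs        = app.stageManager.getAllStages.flatMap(_.duration).sum
val successfulMs = app.stageManager.getAllSuccessfulStageAttempts.flatMap(_.duration).sum
val failedShare  = if (allMs > 0) 1.0 - successfulMs.toDouble / allMs else 0.0
```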

Labels
bug (Something isn't working), core_tools (Scope the core module (scala))
Development

Successfully merging this pull request may close these issues.

[BUG] Aggregate metric per stage is missing filter for stage attempts
5 participants