
Consider additional factors in spark.sql.shuffle.partitions recommendation in Autotuner #722

Merged — 14 commits merged into NVIDIA:dev on Mar 12, 2024

Conversation

@cindyyuanjiang (Collaborator, PR author) commented on Jan 10, 2024:

Fixes #575

Goal
We want to enhance the spark.sql.shuffle.partitions recommendation logic to take additional factors into account, such as GC time, data size, and data skew, to improve accuracy.

Changes
In this PR, we consider spilling and data skew in shuffle stages in the spark.sql.shuffle.partitions recommendation logic (a short sketch of the combined logic follows the list below). Specifically:

If spilling occurred in shuffle stages:

  1. If there is no shuffle skew, we recommend increasing spark.sql.shuffle.partitions (previously we only checked for spilling in any stage).
  2. If there is shuffle skew (a task's Shuffle Read Size > 3 * the average stage-level size), we add a comment to highlight this and keep recommending the default spark.sql.shuffle.partitions.
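
The sketch below is a minimal, self-contained illustration of that decision flow, not the AutoTuner implementation: the object and parameter names and the 2x multiplier are assumptions, while 200 is the Spark default discussed later in this thread.

    object ShufflePartitionsSketch {
      // 200 is the Spark default mentioned later in the review; the 2x multiplier
      // is a placeholder, not the tool's actual DEF_SHUFFLE_PARTITION_MULTIPLIER.
      val DefaultShufflePartitions: Int = 200
      val SpillMultiplier: Int = 2

      /** Returns (recommended partition count, optional comment). */
      def recommend(
          spillBytesByShuffleStage: Map[Long, Long],
          skewedShuffleStages: Set[Long]): (Int, Option[String]) = {
        // Shuffle stages that actually spilled.
        val stagesWithSpill = spillBytesByShuffleStage.collect {
          case (stageId, spilled) if spilled > 0 => stageId
        }.toSet

        if (stagesWithSpill.isEmpty) {
          // No spilling in shuffle stages: keep the default.
          (DefaultShufflePartitions, None)
        } else if (stagesWithSpill.exists(skewedShuffleStages.contains)) {
          // Spilling co-occurs with skew: more partitions won't fix skew,
          // so keep the default and surface a comment instead.
          (DefaultShufflePartitions,
            Some("There is data skew in shuffle stages; increasing " +
              "spark.sql.shuffle.partitions may not help."))
        } else {
          // Spilling without skew: recommend increasing the partition count.
          (DefaultShufflePartitions * SpillMultiplier,
            Some("'spark.sql.shuffle.partitions' should be increased since spilling occurred."))
        }
      }
    }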

@amahussein added the labels core_tools (Scope the core module (scala)) and bug (Something isn't working) on Jan 10, 2024.
@kuhushukla (Collaborator) commented:

I will take a closer look at this. In the meantime, can we run some manual tests, and even write unit tests, that show the before and after recommendations? Did we try this with different stage-level spill metrics and observe better partition numbers being suggested?

@amahussein (Collaborator) commented:

Is there any progress on this?

@cindyyuanjiang (Collaborator, author) commented:

> Is there any progress on this?

I will comment with my test results, then mark this ready for review.

@cindyyuanjiang (Collaborator, author) commented:

> I will take a closer look at this. In the meantime, can we run some manual tests, and even write unit tests, that show the before and after recommendations? Did we try this with different stage-level spill metrics and observe better partition numbers being suggested?

Thanks @kuhushukla! I will comment with my observations.

@cindyyuanjiang (Collaborator, author) commented on Jan 20, 2024:

Here is a comparison of the recommendations before and after this PR.

Before: [screenshot of the recommendations before this change, Jan 19, 2024]
After: [screenshot of the recommendations after this change, Jan 19, 2024]

cc: @kuhushukla @amahussein

@cindyyuanjiang marked this pull request as ready for review on January 20, 2024.
@amahussein (Collaborator) left a review comment:

The changes in the PR have no effect on the AutoTuner.
A unit test would have helped to verify that the AutoTuner recommends the default settings without running any calculations.

Comment on lines 825 to 837:

    val shuffleStageSpilledMetrics = appInfoProvider.getSpillMetricsForStage("shuffle")

    val dataSkewStages = appInfoProvider.getDataSkewStages
    val dataSkewShuffleStages = shuffleStageSpilledMetrics.collect({
      case (id, _) if dataSkewStages.contains(id) => id
    })
    if (!dataSkewShuffleStages.isEmpty) {
      appendOptionalComment(lookup, "There is data skew (when task's Shuffle Read Size > 3 * " +
        "Avg Stage-level size) in shuffle stages.")
    }

    val totalShuffleSpilledMetrics = shuffleStageSpilledMetrics.unzip._2.sum
    if (totalShuffleSpilledMetrics > 0) {
@amahussein (Collaborator) commented:

I believe this code is a dead block that will never be executed: isCalculationEnabled(lookup) always returns false for spark.sql.shuffle.partitions.

@cindyyuanjiang (author) commented:

I have run some tests and confirmed this is not dead code; isCalculationEnabled(lookup) returns true for the command below.
Command: spark_rapids_user_tools onprem profiling --worker_info <my-worker-info> --verbose --eventlogs <my-event-log> -t $TOOLS_REPO/spark-rapids-tools/core/target/rapids-4-spark-tools_2.12-23.12.4-SNAPSHOT.jar

@@ -52,6 +52,8 @@ trait AppInfoPropertyGetter
trait AppInfoSqlTaskAggMetricsVisitor {
  def getJvmGCFractions: Seq[Double]
  def getSpilledMetrics: Seq[Long]
  def getSpillMetricsForStage(stage: String): Seq[(Long, Long)]
Collaborator commented:

  • Since the map is not part of the function name, please add a comment describing what the keys and values are expected to be.
  • Why do we have two APIs, getSpilledMetrics and getSpillMetricsForStage?
  • The definition is confusing. When I see getSpillMetricsForStage, I expect the method to take a stageID as an argument, but the callers pass "shuffle", which is not a stage. It looks more like it takes a certain predicate to filter the nodes; in that case, this should not be in the upper level of the interface.

        // aggregates the sum of spill per stage and returns a map stageID to sum spill metrics
        def getSpillMetricsForShufflePerStage: Seq[(Long, Long)]
@cindyyuanjiang (author) commented:

Updated this function. Thanks!

Comment on lines 159 to 160:

    app.skewInfo.collect({
      case row if (row.taskShuffleReadMB > 3 * row.avgShuffleReadMB) => row.stageId
Collaborator commented:

Do we need this condition? I thought the purpose of ShuffleSkewProfileResult (see the analysis code) is to filter tasks which have skew, so we already guarantee that any entry in that sequence exhibits skew.

@cindyyuanjiang (author) commented:

Thanks, I will update this.

@@ -81,6 +83,8 @@ class AppSummaryInfoBaseProvider extends AppInfoPropertyGetter
  override def getRapidsJars: Seq[String] = Seq()
  override def getDistinctLocationPct: Double = 0.0
  override def getRedundantReadSize: Long = 0
  override def getSpillMetricsForStage(stage: String): Seq[(Long, Long)] = Seq()
Collaborator commented:

See the comment on the super method.

  override def getSpillMetricsForStage(stage: String): Seq[(Long, Long)] = {
    // Get ids for input specified stage
    val stageIds = app.sqlStageInfo.collect ({
      case row if (row.nodeNames.exists(name => name.toLowerCase.contains(stage))) =>
Collaborator commented:

Why do we need to match nodeNames on "shuffle"? Should we match on the metric type instead, for example AverageStageInfo.avgShuffleRead?

@cindyyuanjiang (author) commented:

Thanks, I have updated the condition so that shuffle stages are identified based on shuffle read/write metrics.

@amahussein closed this on Jan 24, 2024.
@parthosa previously approved these changes on Feb 27, 2024.

@parthosa (Collaborator) left a review comment:

Thanks @cindyyuanjiang. LGTM.

@amahussein (Collaborator) left a review comment:

I was hoping that in this iteration you would investigate my comment about the dead block more deeply.
In any case, I found a bug in the way the arguments were processed in getRecommendations — a bug I was hoping you would address in this PR after I mentioned the default arguments in getRecommendations.
I created PR #812 to fix that bug. PR #812 should be merged first because it affects the code path targeted by the changes here.

I don't have a good way to verify the calculations. Most of the GPU event logs I tested recommend 200, which is the default spark.sql.shuffle.partitions hardcoded in the code.

@cindyyuanjiang (Collaborator, author) commented:

> I was hoping that in this iteration you would investigate my comment about the dead block more deeply. In any case, I found a bug in the way the arguments were processed in getRecommendations.

Apologies, I misunderstood this in our previous discussion. Thank you for putting up a fix, @amahussein!

> I don't have a good way to verify the calculations. Most of the GPU event logs I tested recommend 200, which is the default spark.sql.shuffle.partitions hardcoded in the code.

I will share my manually tested results here and follow up with them.

@amahussein (Collaborator) left a review comment:

PR #812 is merged. @cindyyuanjiang please merge with dev and address the new comments.
We need a way to verify the calculations. In the worst case, we could use the mock classes to return literal numbers and verify the functionality that way.

      shufflePartitions *= DEF_SHUFFLE_PARTITION_MULTIPLIER
      // Could be memory instead of partitions
      appendOptionalComment(lookup,
        s"'$lookup' should be increased since spilling occurred.")
    }

    val shuffleSkewStages = appInfoProvider.getShuffleSkewStages
    val shuffleSkewStagesWithSpill = appInfoProvider.getSpillMetricsForShuffleStages.filter {
Collaborator commented:

It is redundant to call appInfoProvider.getSpillMetricsForShuffleStages twice — once to compute totalSpilledMetricsForShuffleStages and again to apply the filter. The result should be stored in a local variable.

(The same lines are quoted for the following comment.)
Collaborator commented:

It does not look like we actually need shuffleSkewStagesWithSpill; the following code only checks whether it is empty. A filter is not suitable here for performance reasons — the code should use a predicate like exists so it returns true as soon as one item satisfies the condition.

    }
    if (shuffleSkewStagesWithSpill.nonEmpty) {
      appendOptionalComment(lookup, "There is data skew (when task's Shuffle Read Size > 3 * " +
        s"Avg Stage-level size) in shuffle stages. $lookup recommendation may be less effective.")
Collaborator commented:

> "bla..bla...$lookup recommendation may be less effective."

I do not find this comment easy to understand. "Less effective" — what does it mean for the user? Is there a better way to explain this? If we want to say that the recommendation is not that good in the user's case, then why do we recommend it at all?

@cindyyuanjiang (author) commented:

Updated the message and the logic. Thanks!

    val totalSpilledMetrics = appInfoProvider.getSpilledMetrics.sum
    if (totalSpilledMetrics > 0) {
      val totalSpilledMetricsForShuffleStages =
        appInfoProvider.getSpillMetricsForShuffleStages.unzip._2.sum
Collaborator commented:

For performance reasons, we don't need the sum — sum iterates over the entire list. We only need to check whether any of the values is larger than 0, so a predicate like exists works better.
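
As a small standalone illustration of this point (hypothetical stage IDs and byte counts, not taken from any event log), both the spill check and the skew check can be answered with a short-circuiting predicate:

    // Hypothetical (stageId, spilledBytes) pairs.
    val spillByStage: Seq[(Long, Long)] = Seq((1L, 0L), (4L, 512L), (7L, 0L))

    // Summing every entry just to compare against zero ...
    val spilledViaSum = spillByStage.unzip._2.sum > 0
    // ... versus a predicate that stops at the first positive value.
    val spilledViaExists = spillByStage.exists(_._2 > 0)

    // Likewise, building a filtered collection only to test emptiness ...
    val skewStages = Set(4L, 9L)
    val skewWithSpillViaFilter =
      spillByStage.filter { case (id, _) => skewStages.contains(id) }.nonEmpty
    // ... versus answering the yes/no question directly.
    val skewWithSpillViaExists = spillByStage.exists { case (id, _) => skewStages.contains(id) }

    assert(spilledViaSum == spilledViaExists)
    assert(skewWithSpillViaFilter == skewWithSpillViaExists)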

Comment on lines 178 to 181:

    app.jsMetAgg.collect { case row if (row.id.contains("stage") &&
        row.srTotalBytesReadSum + row.swBytesWrittenSum > 0) =>
      (row.id.split("_")(1).toLong, row.diskBytesSpilledSum + row.memoryBytesSpilledSum)
    }
Collaborator commented:

@nartal1 Can you help Cindy make sure how to pick "shuffles"? Are row.srTotalBytesReadSum and swBytesWrittenSum exclusive to shuffle operations? If not, what is the right condition to successfully visit all the shuffles?

Collaborator commented:

srTotalBytesReadSum is the sum of local and remote bytes read in shuffle metrics across all tasks, and swBytesWrittenSum is the sum of bytes written for the shuffle across all tasks. The above condition is correct: it collects the spilled metrics for the shuffle stages (those with either shuffle read or shuffle write).

@cindyyuanjiang (Collaborator, author) commented on Mar 6, 2024:

Note that the spark.sql.shuffle.partitions recommendation currently only returns the default value of 200. To enable the calculation based on the changes in this PR, users have to override the limitedLogicList argument passed to getRecommendedProperties.

However, users of the tools jar cannot access the source code, which makes the shuffle-partition calculation a dead code block. To improve this, we can either take spark.sql.shuffle.partitions off the limitedLogicList or expose limitedLogicList to users, e.g. as a command-line option. Opening the discussion here.
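
The fragment below is a rough sketch of how such a gate behaves — the class name, signatures, and default map are assumptions that mirror the behavior described in this thread, not the tool's actual implementation:

    // Sketch only: AutoTunerSketch and recommend are illustrative stand-ins.
    class AutoTunerSketch(limitedLogicList: Seq[String]) {
      private val defaults = Map("spark.sql.shuffle.partitions" -> "200")

      // A property listed in limitedLogicList gets only its default value.
      def isCalculationEnabled(lookup: String): Boolean = !limitedLogicList.contains(lookup)

      def recommend(lookup: String, calculate: () => String): String =
        if (isCalculationEnabled(lookup)) calculate() else defaults.getOrElse(lookup, "")
    }

    // With the property in the list, the calculation branch never runs:
    val gated = new AutoTunerSketch(Seq("spark.sql.shuffle.partitions"))
    assert(gated.recommend("spark.sql.shuffle.partitions", () => "400") == "200")

    // With an empty list (the change discussed here), the calculation runs:
    val enabled = new AutoTunerSketch(Seq.empty)
    assert(enabled.recommend("spark.sql.shuffle.partitions", () => "400") == "400")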

s"'$lookup' should be increased since spilling occurred.")
if (shuffleStagesWithPosSpilling.nonEmpty) {
val shuffleSkewStages = appInfoProvider.getShuffleSkewStages
if (shuffleSkewStages.exists(id => shuffleStagesWithPosSpilling.contains(id))) {
@nartal1 (Collaborator) commented:

This check makes sure the AutoTuner does not recommend increasing the shuffle partitions if there is at least one stage with skewed data. Instead of checking for at least one, I was wondering whether we could still recommend increasing the number of shuffle partitions up to a certain threshold (e.g. the ratio of the number of stages with skew to the total number of stages with spills).
This can be a follow-on improvement if it takes more time to test.
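
A hypothetical version of that follow-up idea (the function name and the 0.5 threshold are placeholders, not anything in this PR):

    // Only back off from increasing partitions when "enough" of the spilling
    // shuffle stages are also skewed; 0.5 is an arbitrary illustrative threshold.
    def shouldIncreasePartitions(
        shuffleStagesWithPosSpilling: Set[Long],
        shuffleSkewStages: Set[Long],
        skewRatioThreshold: Double = 0.5): Boolean = {
      if (shuffleStagesWithPosSpilling.isEmpty) {
        false // nothing spilled, so nothing to fix by adding partitions
      } else {
        val skewedAndSpilling = shuffleStagesWithPosSpilling.count(shuffleSkewStages.contains)
        val skewRatio = skewedAndSpilling.toDouble / shuffleStagesWithPosSpilling.size
        // Mostly non-skewed spill: recommend more partitions.
        skewRatio < skewRatioThreshold
      }
    }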

Collaborator commented:

Interesting. What would be a good way to determine the threshold?

@parthosa (Collaborator) commented:

Also, since both are lists of stage IDs, can we use intersect for better performance?

Suggested change:
-      if (shuffleSkewStages.exists(id => shuffleStagesWithPosSpilling.contains(id))) {
+      if (shuffleSkewStages.intersect(shuffleStagesWithPosSpilling).nonEmpty) {

@cindyyuanjiang (author) commented:

@nartal1 Thanks! I think this would require more discussion and testing. I can file a follow-up issue to track it when we are ready to merge this PR.

@cindyyuanjiang (author) commented:

@parthosa Thanks! I used exists because it short-circuits as soon as it finds an element satisfying the condition, while intersect traverses the entire sequence. In that case, I think exists performs better than intersect.

Collaborator commented:

In theory, a.intersect(b) has complexity O(min(|a|, |b|)) and a.exists(b.contains) can be O(|a| * |b|).

Although exists can short-circuit, the inner contains causes a full iteration of the list each time.

@amahussein (Collaborator) commented on Mar 11, 2024:

P.S.: getShuffleStagesWithPosSpilling already tries to find all the IDs. If we want better performance, we would need another API that returns true/false if at least one such stage exists. I did not want to request that change initially, to keep the API standard for the first phase.

Back to the performance discussion: in the worst case b.contains is O(n) because b is a sequence. This can easily be improved by changing the return type of getShuffleStagesWithPosSpilling to a Set instead of a sequence, which yields faster lookups.

As for a.intersect(b), the problem is that these are also sequences, so it is O(|a| * |b|). Even if we change them to Set, the operation computes the full intersection, which is more work than an operation that only answers true/false.

@cindyyuanjiang (author) commented on Mar 11, 2024:

For the current approach, we need the full output of getShuffleStagesWithPosSpilling to compare against shuffleSkewStages and check whether they intersect. I will update getShuffleStagesWithPosSpilling and getShuffleSkewStages to return Set[Long] for better performance.
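
For illustration only (made-up stage IDs), this is the difference the Set return type makes for the check discussed above:

    // With Set[Long], each `contains` is a hash lookup instead of a linear scan.
    val shuffleStagesWithPosSpilling: Set[Long] = Set(2L, 5L, 8L)
    val shuffleSkewStages: Set[Long] = Set(5L, 11L)

    // Short-circuits on the first skewed stage that also spilled; each lookup is ~O(1).
    val skewOverlapsSpill = shuffleSkewStages.exists(shuffleStagesWithPosSpilling.contains)

    // Equivalent result, but materializes the full intersection before the emptiness test.
    val viaIntersect = shuffleSkewStages.intersect(shuffleStagesWithPosSpilling).nonEmpty

    assert(skewOverlapsSpill && viaIntersect)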

@amahussein (Collaborator) left a review comment:

Thanks @cindyyuanjiang

The argument of the function is disabled by default.
This means that the Profiler calls the AutoTuner while setting limitedLogicList to 'spark.sql.shuffle.partitions'. As a result, the final behavior won't execute any of the code changes.

The unit tests override the default argument, but the Profiler code has not changed.

https://github.com/NVIDIA/spark-rapids-tools/blob/main/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala#L547
https://github.com/NVIDIA/spark-rapids-tools/blob/main/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala#L141

So, either

@amahussein (Collaborator) left a review comment:

Is it possible to use Set instead of Seq for better performance?

Comment on lines 111 to 112:

    override def getShuffleStagesWithPosSpilling: Seq[Long] = Seq()
    override def getShuffleSkewStages: Seq[Long] = Seq()
Collaborator commented:

The return type could be changed to Set for optimized lookups.

@cindyyuanjiang (author) commented:

Thanks, updated to Set.

@cindyyuanjiang (Collaborator, author) commented:

> The argument of the function is disabled by default. This means that the Profiler calls the AutoTuner while setting limitedLogicList to 'spark.sql.shuffle.partitions'. As a result, the final behavior won't execute any of the code changes.

Thanks @amahussein for the feedback! I will update the limitedLogicList argument of getRecommendedProperties to an empty list so the code executes the shuffle-partitions recommendation calculation.

@amahussein (Collaborator) left a review.

@nartal1 (Collaborator) left a review comment:

LGTM. Thanks @cindyyuanjiang!

@cindyyuanjiang merged commit f10073b into NVIDIA:dev on Mar 12, 2024 — 13 checks passed.
@cindyyuanjiang deleted the shuffle-partition branch on March 12, 2024.
Labels: bug (Something isn't working), core_tools (Scope the core module (scala))

Successfully merging this pull request may close: [BUG] AutoTuner recommendation for spark.sql.shuffle.partitions is not accurate (#575)

5 participants