Consider additional factors in spark.sql.shuffle.partitions recommendation in Autotuner #722
Conversation
…ation Signed-off-by: cindyyuanjiang <cindyj@nvidia.com>
I will look at this closely. In the meantime, can we run some manual tests, and even write unit tests, that show the before-and-after recommendations? Did we try this with different stage-level spill metrics and observe better partition numbers being suggested?
Is there any progress on this?
I will comment my test results, then mark this ready for review. |
Thanks @kuhushukla! I will comment on my observations. |
core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala
The changes in the PR have no effect on the AutoTuner.
A unit test would have helped to verify that the AutoTuner recommends the default settings without running any calculations.
```scala
val shuffleStageSpilledMetrics = appInfoProvider.getSpillMetricsForStage("shuffle")

val dataSkewStages = appInfoProvider.getDataSkewStages
val dataSkewShuffleStages = shuffleStageSpilledMetrics.collect({
  case (id, _) if dataSkewStages.contains(id) => id
})
if (!dataSkewShuffleStages.isEmpty) {
  appendOptionalComment(lookup, "There is data skew (when task's Shuffle Read Size > 3 * " +
    "Avg Stage-level size) in shuffle stages.")
}

val totalShuffleSpilledMetrics = shuffleStageSpilledMetrics.unzip._2.sum
if (totalShuffleSpilledMetrics > 0) {
```
I believe this code represents a dead block that will never be executed: `isCalculationEnabled(lookup)` always returns `false` for `spark.sql.shuffle.partitions`.
I have run some tests, and confirm this is not dead code: `isCalculationEnabled(lookup)` returns `true` for the command below.

Command: `spark_rapids_user_tools onprem profiling --worker_info <my-worker-info> --verbose --eventlogs <my-event-log> -t $TOOLS_REPO/spark-rapids-tools/core/target/rapids-4-spark-tools_2.12-23.12.4-SNAPSHOT.jar`
```scala
trait AppInfoSqlTaskAggMetricsVisitor {
  def getJvmGCFractions: Seq[Double]
  def getSpilledMetrics: Seq[Long]
  def getSpillMetricsForStage(stage: String): Seq[(Long, Long)]
```
- Since the map is not part of the function name, please add a comment describing what the map keys and values are expected to be.
- Why do we have two APIs, `getSpilledMetrics` and `getSpillMetricsForStage`?
- The definition is confusing. When I see `getSpillMetricsForStage`, I expect the method to take a stage ID as an argument, but the callers pass `"shuffle"`, which is not a stage. It looks more like it takes a certain predicate to filter the nodes; in that case, this should not be in the upper level of the interface. For example:

```scala
// aggregates the sum of spill per stage and returns a map of stageID to summed spill metrics
def getSpillMetricsForShufflePerStage: Seq[(Long, Long)]
```
Updated this function. Thanks!
```scala
app.skewInfo.collect({
  case row if (row.taskShuffleReadMB > 3 * row.avgShuffleReadMB) => row.stageId
```
Do we need this condition? I thought that the purpose of `ShuffleSkewProfileResult` (see the analysis code) is to filter tasks which have skew, so we are guaranteed that any entry in that sequence exhibits skew.
Thanks, I will update this.
```scala
class AppSummaryInfoBaseProvider extends AppInfoPropertyGetter
  override def getRapidsJars: Seq[String] = Seq()
  override def getDistinctLocationPct: Double = 0.0
  override def getRedundantReadSize: Long = 0
  override def getSpillMetricsForStage(stage: String): Seq[(Long, Long)] = Seq()
```
see comment on the super method
```scala
override def getSpillMetricsForStage(stage: String): Seq[(Long, Long)] = {
  // Get ids for input specified stage
  val stageIds = app.sqlStageInfo.collect ({
    case row if (row.nodeNames.exists(name => name.toLowerCase.contains(stage))) =>
```
Why do we need to match `nodeNames` on `"shuffle"`? Should we match on the metric type instead? For example, `AverageStageInfo.avgShuffleRead`.
Thanks, I have updated the condition to identify shuffle stages to depend on shuffle read/write.
core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala
Signed-off-by: cindyyuanjiang <cindyj@nvidia.com>
core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
Signed-off-by: cindyyuanjiang <cindyj@nvidia.com>
Thanks @cindyyuanjiang. LGTM.
I was hoping that in this iteration you would look more deeply into my comment that the code was in a dead block.
Anyway, I found that there was a bug in the way the arguments were processed in `getRecommendations`. This is a bug I was hoping you would address in your PR after I mentioned the default arguments in `getRecommendations`.
I created PR #812 to fix that bug. It should get merged first, because the bug affects the code path targeted by the changes here.
I don't have clues on how to verify the calculations. Most of the GPU event logs I tested recommend 200, which is the default `spark.sql.shuffle.partitions` hardcoded in the code.
Apologies, I misunderstood this in our previous discussion. Thank you for putting up a fix @amahussein!
I will share my manual test results here and follow up with them.
PR #812 is merged. @cindyyuanjiang please merge with dev and address the new comments.
We need a way to verify the calculations.
In the worst case, we could use the mock classes to return literal numbers to verify the functionality.
```scala
shufflePartitions *= DEF_SHUFFLE_PARTITION_MULTIPLIER
// Could be memory instead of partitions
appendOptionalComment(lookup,
  s"'$lookup' should be increased since spilling occurred.")
}

val shuffleSkewStages = appInfoProvider.getShuffleSkewStages
val shuffleSkewStagesWithSpill = appInfoProvider.getSpillMetricsForShuffleStages.filter {
```
It is redundant to call the same method, `appInfoProvider.getSpillMetricsForShuffleStages`, twice: once to get `totalSpilledMetricsForShuffleStages` and again to apply the filter. The result should be stored in a local variable.
It does not look like we actually need `shuffleSkewStagesWithSpill`: the following code only checks whether it is empty. A `filter` is not suitable here for performance reasons; the code should use a predicate like `exists` to return true as soon as an item satisfies the condition.
```scala
}
if (shuffleSkewStagesWithSpill.nonEmpty) {
  appendOptionalComment(lookup, "There is data skew (when task's Shuffle Read Size > 3 * " +
    s"Avg Stage-level size) in shuffle stages. $lookup recommendation may be less effective.")
```
Regarding "…$lookup recommendation may be less effective.": I do not find this comment easy to understand. What does "less effective" mean for the user? Is there a better way to explain this? If we want to say that the recommendation is not that good in the user's case, then why do we recommend it at all?
Updated the message and the logic. Thanks!
```scala
val totalSpilledMetrics = appInfoProvider.getSpilledMetrics.sum
if (totalSpilledMetrics > 0) {
  val totalSpilledMetricsForShuffleStages =
    appInfoProvider.getSpillMetricsForShuffleStages.unzip._2.sum
```
For performance reasons, we don't need to compute the sum: `sum` iterates over the entire list. We only need to check whether any of the values is larger than 0; a predicate like `exists` should work better.
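The point about `exists` versus `sum` can be sketched as follows; the metric values are made up:

```scala
// Checking for any positive spill without summing the whole list.
// `exists` short-circuits on the first positive value, while `.sum > 0`
// always traverses every element.
val spillMetrics: Seq[(Long, Long)] = Seq((1L, 0L), (2L, 512L), (3L, 0L))

val spilledViaSum    = spillMetrics.unzip._2.sum > 0  // traverses all values
val spilledViaExists = spillMetrics.exists(_._2 > 0)  // stops at stage 2
```

Both expressions answer the same question, but the `exists` form avoids building the unzipped sequence and summing it.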
```scala
app.jsMetAgg.collect { case row if (row.id.contains("stage") &&
    row.srTotalBytesReadSum + row.swBytesWrittenSum > 0) =>
  (row.id.split("_")(1).toLong, row.diskBytesSpilledSum + row.memoryBytesSpilledSum)
}
```
@nartal1 Can you help Cindy with making sure how to pick the "shuffles"? Are `row.srTotalBytesReadSum` and `swBytesWrittenSum` exclusive to shuffle operations? If not, what are the right conditions to successfully visit all the shuffles?
`srTotalBytesReadSum` is the sum of local and remote bytes read in shuffle metrics across all tasks, and `swBytesWrittenSum` is the sum of bytes written for the shuffle across all tasks. The above condition is correct: it gets the spilled metrics for those shuffle stages (which will have either shuffle read or shuffle write).
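For illustration, here is a runnable sketch of the filter quoted above, using a hypothetical aggregate record whose field names mirror the snippet:

```scala
// Hypothetical stage-aggregate record; field names mirror the quoted snippet.
case class StageAgg(id: String, srTotalBytesReadSum: Long, swBytesWrittenSum: Long,
                    diskBytesSpilledSum: Long, memoryBytesSpilledSum: Long)

val aggs = Seq(
  StageAgg("stage_0", 0, 0, 99, 0),      // no shuffle read/write: skipped
  StageAgg("stage_1", 1024, 0, 10, 20),  // shuffle read only: kept
  StageAgg("job_7", 512, 0, 5, 5)        // not a stage row: skipped
)

// Keep only stage rows that did some shuffle read or write, and pair each
// stage ID with its total (disk + memory) spill.
val shuffleStageSpill = aggs.collect {
  case row if row.id.contains("stage") &&
      row.srTotalBytesReadSum + row.swBytesWrittenSum > 0 =>
    (row.id.split("_")(1).toLong, row.diskBytesSpilledSum + row.memoryBytesSpilledSum)
}
```

With the sample data above, only `stage_1` survives the filter.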
Signed-off-by: cindyyuanjiang <cindyj@nvidia.com>
Note: currently, users who use the tools jar cannot access the source code, which makes the shuffle partition calculation a dead code block. To improve this, we can either take …
```scala
  s"'$lookup' should be increased since spilling occurred.")
if (shuffleStagesWithPosSpilling.nonEmpty) {
  val shuffleSkewStages = appInfoProvider.getShuffleSkewStages
  if (shuffleSkewStages.exists(id => shuffleStagesWithPosSpilling.contains(id))) {
```
This check would make sure that the autotuner does not recommend increasing the shuffle partitions if there is at least one stage with skewed data. I was wondering: instead of checking for at least one, can we still recommend increasing the number of shuffle partitions up to a certain threshold (e.g. the ratio of the number of stages with skew to the total number of stages with spills)?
This can be a follow-on improvement if it takes more time to test.
Interesting. What would be a good way to determine the threshold?
Also, since both are lists of stage IDs, can we use `intersect` for better performance?

Suggested change:

```scala
// before
if (shuffleSkewStages.exists(id => shuffleStagesWithPosSpilling.contains(id))) {
// after
if (shuffleSkewStages.intersect(shuffleStagesWithPosSpilling).nonEmpty) {
```
@nartal1 Thanks! I think this would require more discussion/testing. I can file a follow up issue to track this when we are ready to merge this PR.
@parthosa Thanks! I used `exists` because it will short-circuit after it finds an element that satisfies the condition, but `intersect` will go over the entire sequence. In that case, I think `exists` has better performance than `intersect`.
In theory `a.intersect(b)` should have complexity of O(min(a, b)), while `a.exists(b.contains)` can have complexity of O(a*b). Although `exists` can short-circuit, the inner `contains` causes a full iteration of the list every time.
P.S.: `getShuffleStagesWithPosSpilling` already tries to find all the IDs. If we want better performance, we will need another API that returns true/false if at least one such stage exists. I did not want to request that change initially, to keep the API standard for the first phase.
Back to the performance discussion here: in the worst case, `b.contains` is O(n) because `b` is a sequence. This can easily be improved by changing the return type of `getShuffleStagesWithPosSpilling` to `Set` instead of a sequence, which yields a faster lookup.
With `a.intersect(b)`, the problem is that those are also sequences, which means O(a*b). Even if we change them to `Set`, the operation computes the full intersection, which is more work than an operation that only answers true/false.
For the current approach, we need the full list of `getShuffleStagesWithPosSpilling`'s output to compare with `shuffleSkewStages` to check if they have any intersection. I will update `getShuffleStagesWithPosSpilling` and `getShuffleSkewStages` to return `Set[Long]` for better performance.
Thanks @cindyyuanjiang
The logic is disabled by the function's default argument. This means that the Profiler calls the AutoTuner with `limitedLogicList` set to `'spark.sql.shuffle.partitions'`. As a result, the final behavior won't execute any of the code changes.
The unit tests override the default argument, but the Profiler code has not changed.
https://github.com/NVIDIA/spark-rapids-tools/blob/main/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala#L547
https://github.com/NVIDIA/spark-rapids-tools/blob/main/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala#L141
So, either:
- change the default argument `limitedLogicList` of `getRecommendedProperties` to be an empty list (https://github.com/NVIDIA/spark-rapids-tools/blob/main/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala#L993);
- or override all the calls to `getRecommendedProperties` in production code to pass an empty sequence for `limitedLogicList`.
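To illustrate how a default argument can silently disable a code path, here is a hypothetical sketch. The name `getRecommendedProperties` mirrors the discussion, but this signature and body are not the actual AutoTuner code:

```scala
// Hypothetical sketch: properties on the "limited" list skip the
// calculation path entirely, so the default argument disables the
// shuffle-partitions logic unless a caller overrides it.
def getRecommendedProperties(
    limitedLogicList: Seq[String] = Seq("spark.sql.shuffle.partitions")): Seq[String] = {
  val allProps = Seq("spark.sql.shuffle.partitions", "spark.executor.memory")
  allProps.filterNot(limitedLogicList.contains)
}

val defaultCall  = getRecommendedProperties()       // shuffle partitions excluded
val explicitCall = getRecommendedProperties(Seq())  // all properties considered
```

This is why the reviewer suggests either changing the default to an empty list or having production callers pass an empty sequence explicitly.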
Is it possible to use `Set` instead of `Seq` for better performance?
```scala
override def getShuffleStagesWithPosSpilling: Seq[Long] = Seq()
override def getShuffleSkewStages: Seq[Long] = Seq()
```
The return type could be changed to `Set` to optimize lookups.
Thanks, updated to Set.
Thanks @amahussein for the feedback! I will update the argument.
Signed-off-by: cindyyuanjiang <cindyj@nvidia.com>
Thanks @cindyyuanjiang
LGTM. Thanks @cindyyuanjiang !
Fixes #575

Goal

We want to enhance the `spark.sql.shuffle.partitions` recommendation logic to include more factors to improve accuracy, e.g. GC time, data size, data skew, etc.

Changes

In this PR, we consider spilling and data skew in shuffle stages for the `spark.sql.shuffle.partitions` recommendation logic. Specifically:

- If spilling occurred in shuffle stages, we recommend increasing `spark.sql.shuffle.partitions` (previously we only checked for spilling in any stage).
- If there is data skew in the shuffle stages that spilled, we do not recommend increasing `spark.sql.shuffle.partitions`.
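A rough sketch of the recommendation flow described above; the helper name, signature, and multiplier value are illustrative, not the AutoTuner's actual API:

```scala
// Illustrative sketch of the spill/skew-aware recommendation logic.
val defaultPartitions = 200  // Spark's default spark.sql.shuffle.partitions
val multiplier = 2           // made-up increase factor

// Returns the recommended partition count plus an optional comment
// explaining why the increase was withheld.
def recommendShufflePartitions(
    shuffleStagesWithSpill: Set[Long],
    shuffleSkewStages: Set[Long]): (Int, Option[String]) = {
  if (shuffleStagesWithSpill.isEmpty) {
    (defaultPartitions, None)  // no spill: keep the default
  } else if (shuffleSkewStages.exists(shuffleStagesWithSpill.contains)) {
    // A spilling shuffle stage is skewed: more partitions would not help.
    (defaultPartitions, Some("data skew detected in spilling shuffle stages"))
  } else {
    (defaultPartitions * multiplier, None)  // spill without skew: increase
  }
}
```

Under these assumptions, spill without skew doubles the partition count, while spill plus skew keeps the default and emits a comment instead.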