[SPARK-48628][CORE] Add task peak on/off heap memory metrics
### What changes were proposed in this pull request?

This PR revives #47192, which was [reverted](#47747) due to a regression in `ExternalAppendOnlyUnsafeRowArrayBenchmark`.

**Root cause**
In #47192 we eventually decided to aggregate peak memory usage across all consumers on each `acquireExecutionMemory` invocation (see [this discussion](#47192 (comment))), which is O(n) where `n` is the number of consumers.

`ExternalAppendOnlyUnsafeRowArrayBenchmark` runs all of its iterations within a single task context, so the number of consumers explodes.

Since `TaskMemoryManager.consumers` is never cleaned up during the task's lifecycle, and `TaskMemoryManager.acquireExecutionMemory` is a very frequent operation, doing a linear-time scan (in the number of consumers) there is a poor choice. The benchmark may be a corner case, but a large query plan can still produce a large number of consumers.

I fell back to the previous implementation: maintain the current execution memory in a running total guarded by an extra lock, so each acquire/release updates the peak in O(1) (see the sketch below). cc Ngone51
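
For illustration, here is a minimal, self-contained sketch of the two strategies; the class and method names are hypothetical, not Spark's actual internals:

```scala
import scala.collection.mutable.ArrayBuffer

object PeakTrackingSketch {
  // Reverted approach: recompute the task-wide total over all consumers on
  // every acquire -- O(n) in the number of consumers per call.
  final class AggregatingTracker {
    private val consumerUsage = ArrayBuffer[Long]()
    private var peak = 0L
    def acquire(consumerId: Int, bytes: Long): Unit = synchronized {
      while (consumerUsage.size <= consumerId) consumerUsage += 0L
      consumerUsage(consumerId) += bytes
      peak = math.max(peak, consumerUsage.sum) // linear scan on every call
    }
  }

  // Adopted approach: keep a running total under a lock -- O(1) per call.
  final class CountingTracker {
    private var current = 0L
    private var peak = 0L
    def acquire(bytes: Long): Unit = synchronized {
      current += bytes
      peak = math.max(peak, current)
    }
    def release(bytes: Long): Unit = synchronized {
      current -= bytes
    }
  }
}
```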

#### Benchmark result
[ExternalAppendOnlyUnsafeRowArrayBenchmark-results](https://github.com/liuzqt/spark/actions/runs/10415213026)
[ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk21-results](https://github.com/liuzqt/spark/actions/runs/10414246805)

### Why are the changes needed?

Task-level peak on-heap and off-heap execution memory metrics give finer-grained visibility into per-task memory usage. The existing `peakExecutionMemory` metric only reflects the peak sizes of certain internal data structures (see SPARK-48789), not the task's whole execution memory.
### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
New unit tests.

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes #47776 from liuzqt/SPARK-48628.

Authored-by: Ziqi Liu <ziqi.liu@databricks.com>
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
liuzqt authored and JoshRosen committed Aug 21, 2024
1 parent 99b7939 commit 9b9a7a7
Showing 9 changed files with 187 additions and 55 deletions.
61 changes: 61 additions & 0 deletions core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -122,6 +122,30 @@ public class TaskMemoryManager {
*/
private volatile long acquiredButNotUsed = 0L;

/**
* Current off heap memory usage by this task.
*/
private long currentOffHeapMemory = 0L;

private final Object offHeapMemoryLock = new Object();

/**
* Current on heap memory usage by this task.
*/
private long currentOnHeapMemory = 0L;

private final Object onHeapMemoryLock = new Object();

/**
* Peak off heap memory usage by this task.
*/
private volatile long peakOffHeapMemory = 0L;

/**
* Peak on heap memory usage by this task.
*/
private volatile long peakOnHeapMemory = 0L;

/**
* Construct a new TaskMemoryManager.
*/
@@ -202,6 +226,19 @@ public long acquireExecutionMemory(long required, MemoryConsumer requestingConsumer) {
logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
requestingConsumer);
}

if (mode == MemoryMode.OFF_HEAP) {
synchronized (offHeapMemoryLock) {
currentOffHeapMemory += got;
peakOffHeapMemory = Math.max(peakOffHeapMemory, currentOffHeapMemory);
}
} else {
synchronized (onHeapMemoryLock) {
currentOnHeapMemory += got;
peakOnHeapMemory = Math.max(peakOnHeapMemory, currentOnHeapMemory);
}
}

return got;
}
}
@@ -269,6 +306,15 @@ public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
consumer);
}
memoryManager.releaseExecutionMemory(size, taskAttemptId, consumer.getMode());
if (consumer.getMode() == MemoryMode.OFF_HEAP) {
synchronized (offHeapMemoryLock) {
currentOffHeapMemory -= size;
}
} else {
synchronized (onHeapMemoryLock) {
currentOnHeapMemory -= size;
}
}
}

/**
@@ -507,4 +553,19 @@ public long getMemoryConsumptionForThisTask() {
public MemoryMode getTungstenMemoryMode() {
return tungstenMemoryMode;
}

/**
* Returns peak task-level on-heap memory usage in bytes.
*/
public long getPeakOnHeapExecutionMemory() {
return peakOnHeapMemory;
}

/**
* Returns peak task-level off-heap memory usage in bytes.
*/
public long getPeakOffHeapExecutionMemory() {
return peakOffHeapMemory;
}
}
core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -41,6 +41,8 @@ private[spark] object InternalAccumulator {
val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled"
val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled"
val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory"
val PEAK_ON_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOnHeapExecutionMemory"
val PEAK_OFF_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOffHeapExecutionMemory"
val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses"
val TEST_ACCUM = METRICS_PREFIX + "testAccumulator"

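Since `METRICS_PREFIX` in `InternalAccumulator` is `"internal.metrics."`, the two new accumulators surface under the fully qualified names spelled out below (the wrapper object is hypothetical, used only to make the names explicit):

```scala
// Fully qualified accumulator names implied by the diff above.
object NewPeakMemoryMetricNames {
  val peakOnHeap: String = "internal.metrics.peakOnHeapExecutionMemory"
  val peakOffHeap: String = "internal.metrics.peakOffHeapExecutionMemory"
}
```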
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -706,6 +706,8 @@ private[spark] class Executor(
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.metrics.setResultSerializationTime(TimeUnit.NANOSECONDS.toMillis(
afterSerializationNs - beforeSerializationNs))
task.metrics.setPeakOnHeapExecutionMemory(taskMemoryManager.getPeakOnHeapExecutionMemory)
task.metrics.setPeakOffHeapExecutionMemory(taskMemoryManager.getPeakOffHeapExecutionMemory)
// Expose task metrics using the Dropwizard metrics system.
// Update task metrics counters
executorSource.METRIC_CPU_TIME.inc(task.metrics.executorCpuTime)
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -56,6 +56,8 @@ class TaskMetrics private[spark] () extends Serializable {
private val _memoryBytesSpilled = new LongAccumulator
private val _diskBytesSpilled = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
private val _peakOnHeapExecutionMemory = new LongAccumulator
private val _peakOffHeapExecutionMemory = new LongAccumulator
private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]

/**
@@ -109,9 +111,22 @@ class TaskMetrics private[spark] () extends Serializable {
* joins. The value of this accumulator should be approximately the sum of the peak sizes
* across all such data structures created in this task. For SQL jobs, this only tracks all
* unsafe operators and ExternalSort.
* Note that this is not equal to peakOnHeapExecutionMemory + peakOffHeapExecutionMemory.
*/
// TODO: SPARK-48789: the naming is confusing since this does not really reflect the whole
// execution memory. We'd better deprecate this once we have a replacement.
def peakExecutionMemory: Long = _peakExecutionMemory.sum

/**
* Peak on heap execution memory as tracked by TaskMemoryManager.
*/
def peakOnHeapExecutionMemory: Long = _peakOnHeapExecutionMemory.sum

/**
* Peak off heap execution memory as tracked by TaskMemoryManager.
*/
def peakOffHeapExecutionMemory: Long = _peakOffHeapExecutionMemory.sum

/**
* Storage statuses of any blocks that have been updated as a result of this task.
*
@@ -139,6 +154,10 @@ class TaskMetrics private[spark] () extends Serializable {
private[spark] def setResultSerializationTime(v: Long): Unit =
_resultSerializationTime.setValue(v)
private[spark] def setPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.setValue(v)
private[spark] def setPeakOnHeapExecutionMemory(v: Long): Unit =
_peakOnHeapExecutionMemory.setValue(v)
private[spark] def setPeakOffHeapExecutionMemory(v: Long): Unit =
_peakOffHeapExecutionMemory.setValue(v)
private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
@@ -225,6 +244,8 @@ class TaskMetrics private[spark] () extends Serializable {
MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
DISK_BYTES_SPILLED -> _diskBytesSpilled,
PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
PEAK_ON_HEAP_EXECUTION_MEMORY -> _peakOnHeapExecutionMemory,
PEAK_OFF_HEAP_EXECUTION_MEMORY -> _peakOffHeapExecutionMemory,
UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
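As a usage sketch (not part of this PR), the new task-level peaks can be read alongside the existing `peakExecutionMemory` from a `SparkListener`; as noted above, the two new values need not add up to `peakExecutionMemory`, which tracks only certain data structures:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener that logs the new per-task peaks at task end.
class PeakMemoryListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) { // metrics can be null for failed tasks
      println(s"task ${taskEnd.taskInfo.taskId}: " +
        s"peak on-heap = ${m.peakOnHeapExecutionMemory} bytes, " +
        s"peak off-heap = ${m.peakOffHeapExecutionMemory} bytes")
    }
  }
}
```

Such a listener could be registered via `SparkContext.addSparkListener`.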
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -597,6 +597,8 @@ private[spark] object JsonProtocol extends JsonUtils {
g.writeNumberField("Executor Run Time", taskMetrics.executorRunTime)
g.writeNumberField("Executor CPU Time", taskMetrics.executorCpuTime)
g.writeNumberField("Peak Execution Memory", taskMetrics.peakExecutionMemory)
g.writeNumberField("Peak On Heap Execution Memory", taskMetrics.peakOnHeapExecutionMemory)
g.writeNumberField("Peak Off Heap Execution Memory", taskMetrics.peakOffHeapExecutionMemory)
g.writeNumberField("Result Size", taskMetrics.resultSize)
g.writeNumberField("JVM GC Time", taskMetrics.jvmGCTime)
g.writeNumberField("Result Serialization Time", taskMetrics.resultSerializationTime)
@@ -1254,6 +1256,10 @@ private[spark] object JsonProtocol extends JsonUtils {
// The "Peak Execution Memory" field was added in Spark 3.0.0:
metrics.setPeakExecutionMemory(
jsonOption(json.get("Peak Execution Memory")).map(_.extractLong).getOrElse(0))
metrics.setPeakOnHeapExecutionMemory(
jsonOption(json.get("Peak On Heap Execution Memory")).map(_.extractLong).getOrElse(0))
metrics.setPeakOffHeapExecutionMemory(
jsonOption(json.get("Peak Off Heap Execution Memory")).map(_.extractLong).getOrElse(0))
metrics.setResultSize(json.get("Result Size").extractLong)
metrics.setJvmGCTime(json.get("JVM GC Time").extractLong)
metrics.setResultSerializationTime(json.get("Result Serialization Time").extractLong)
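With these writer changes, the "Task Metrics" object in the event log gains two fields, shaped roughly as in the excerpt below (values illustrative, unrelated fields omitted). On the read side, `getOrElse(0)` keeps older event logs without these fields parseable:

```json
"Task Metrics": {
  "Peak Execution Memory": 900,
  "Peak On Heap Execution Memory": 400,
  "Peak Off Heap Execution Memory": 500
}
```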
core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -335,6 +335,24 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
tMemManager.releaseExecutionMemory(500L, c)
assert(tMemManager.getMemoryConsumptionForThisTask === 0L)
}

test("task peak execution memory usage") {
val memoryManager = createMemoryManager(
maxOnHeapExecutionMemory = 1000L,
maxOffHeapExecutionMemory = 1000L)

val tMemManager = new TaskMemoryManager(memoryManager, 1)
val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
val onHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.ON_HEAP)

val result1 = tMemManager.acquireExecutionMemory(500L, offHeapConsumer)
val result2 = tMemManager.acquireExecutionMemory(400L, onHeapConsumer)
assert(result1 === 500L)
assert(result2 === 400L)
assert(tMemManager.getMemoryConsumptionForThisTask === 900L)
assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
assert(tMemManager.getPeakOffHeapExecutionMemory === 500L)
}
}

private object MemoryManagerSuite {