Skip to content

Commit

Permalink
Add configuration option for prefetchBatchSize and prefetchThreadPool…
Browse files Browse the repository at this point in the history
…Size.

Signed-off-by: Pascal Spörri <psp@zurich.ibm.com>
  • Loading branch information
pspoerri committed Jun 27, 2023
1 parent 7393636 commit abd0eb9
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ These are optional configuration values that control how s3-shuffle behaves.
- `spark.shuffle.s3.forceBatchFetch`: Force batch fetch for Shuffle Blocks (default: `false`)
- `spark.shuffle.s3.supportsUnbuffer`: Streams can be unbuffered instead of closed (default: `true`,
if Storage-backend is S3A, `false` otherwise).
- `spark.shuffle.s3.prefetchBatchSize`: Prefetch batch size (default: `25`).
- `spark.shuffle.s3.prefetchThreadPoolSize`: Prefetch thread pool size (default: `100`).

## Testing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class S3ShuffleDispatcher extends Logging {
val alwaysCreateIndex: Boolean = conf.getBoolean("spark.shuffle.s3.alwaysCreateIndex", defaultValue = false)
val useBlockManager: Boolean = conf.getBoolean("spark.shuffle.s3.useBlockManager", defaultValue = true)
val forceBatchFetch: Boolean = conf.getBoolean("spark.shuffle.s3.forceBatchFetch", defaultValue = false)
val prefetchBatchSize: Int = conf.getInt("spark.shuffle.s3.prefetchBatchSize", defaultValue = 25)
val prefetchThreadPoolSize: Int = conf.getInt("spark.shuffle.s3.prefetchThreadPoolSize", defaultValue = 100)

val appDir = f"/${startTime}-${appId}/"
val fs: FileSystem = FileSystem.get(URI.create(rootDir), {
Expand All @@ -46,6 +48,8 @@ class S3ShuffleDispatcher extends Logging {
logInfo(s"- spark.shuffle.s3.alwaysCreateIndex=${alwaysCreateIndex}")
logInfo(s"- spark.shuffle.s3.useBlockManager=${useBlockManager}")
logInfo(s"- spark.shuffle.s3.forceBatchFetch=${forceBatchFetch}")
logInfo(s"- spark.shuffle.s3.prefetchBatchSize=${prefetchBatchSize}")
logInfo(s"- spark.shuffle.s3.prefetchThreadPoolSize=${prefetchThreadPoolSize}")

def removeRoot(): Boolean = {
Range(0, 10).map(idx => {
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class S3ShuffleReader[K, C](
}(S3ShuffleReader.asyncExecutionContext)
}

val recordIter = slidingPrefetchIterator(recordIterPromise, 25).flatten
val recordIter = slidingPrefetchIterator(recordIterPromise, dispatcher.prefetchBatchSize).flatten

// Update the context task metrics for each record read.
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
Expand Down Expand Up @@ -196,6 +196,6 @@ class S3ShuffleReader[K, C](
}

object S3ShuffleReader {
private val asyncThreadPool = ThreadUtils.newDaemonCachedThreadPool("s3-shuffle-reader-async-thread-pool", 100)
private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
private lazy val asyncThreadPool = ThreadUtils.newDaemonCachedThreadPool("s3-shuffle-reader-async-thread-pool", S3ShuffleDispatcher.get.prefetchThreadPoolSize)
private lazy implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
}

0 comments on commit abd0eb9

Please sign in to comment.