[SPARK-51279][CONNECT] Avoid constant sleep for waiting Spark Connect server in Scala

### What changes were proposed in this pull request?

This PR proposes to address #50017 (comment) by avoiding a constant sleep and instead waiting until the log file is created by the local Spark Connect server.
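Roughly, the idea is the following (a minimal, hypothetical sketch of the pattern, not the patch itself — the actual change is in the diff below): register a `java.nio.file.WatchService` on the log directory and block until the expected file appears or a deadline passes, instead of sleeping for a fixed two seconds.

```scala
// Minimal sketch of the pattern, independent of the Spark codebase.
// Assumes the file's parent directory already exists.
import java.io.File
import java.nio.file.{FileSystems, StandardWatchEventKinds}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

def awaitFile(file: File, timeout: FiniteDuration = 30.seconds): Boolean = {
  val deadline = timeout.fromNow
  val watcher = FileSystems.getDefault.newWatchService()
  try {
    // Watch the parent directory for newly created entries.
    file.toPath.getParent.register(watcher, StandardWatchEventKinds.ENTRY_CREATE)
    while (!file.exists() && deadline.hasTimeLeft()) {
      // Block until an event arrives or the deadline passes, whichever comes first.
      val key = watcher.poll(deadline.timeLeft.toMillis, TimeUnit.MILLISECONDS)
      if (key != null) {
        key.pollEvents() // Drain events; the loop condition re-checks file.exists().
        key.reset()
      }
    }
    file.exists()
  } finally {
    watcher.close()
  }
}
```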

### Why are the changes needed?

To make startup of the local Spark Connect server more robust: a fixed 2-second sleep can be too short (leading to noisy connection retries) or needlessly long.

### Does this PR introduce _any_ user-facing change?

Virtually no. Users may simply stop seeing connection retry logs while the local server starts.

### How was this patch tested?

Manually tested via `./bin/spark-shell --remote local`

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50039 from HyukjinKwon/avoid-sleep.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 666f45d)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon committed Feb 22, 2025
1 parent e4cfb64 commit 87b2ed7
Showing 1 changed file with 54 additions and 5 deletions.
@@ -16,12 +16,16 @@
  */
 package org.apache.spark.sql.connect
 
+import java.io.File
 import java.net.URI
-import java.nio.file.{Files, Paths}
+import java.nio.file.{Files, FileSystems, Path, Paths, StandardWatchEventKinds}
 import java.util.Locale
+import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.Try
@@ -37,7 +41,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.connect.proto.ExecutePlanResponse.ObservedMetrics
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.CONFIG
+import org.apache.spark.internal.LogKeys.{CONFIG, PATH}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql
 import org.apache.spark.sql.{Column, Encoder, ExperimentalMethods, Observation, Row, SparkSessionBuilder, SparkSessionCompanion, SparkSessionExtensions}
@@ -706,6 +710,33 @@ object SparkSession extends SparkSessionCompanion with Logging {
       override def load(c: Configuration): SparkSession = create(c)
     })
 
+  private def waitUntilFileExists(file: File): Unit = {
+    val deadline = 30.seconds.fromNow
+    val watchService = FileSystems.getDefault.newWatchService()
+    try {
+      file.toPath.getParent.register(watchService, StandardWatchEventKinds.ENTRY_CREATE)
+      while (!file.exists() && deadline.hasTimeLeft()) {
+        Option(watchService.poll(deadline.timeLeft.toSeconds + 1, TimeUnit.SECONDS)) match {
+          case Some(key) =>
+            key.pollEvents().forEach { event =>
+              val kind = event.kind()
+              val filename = event.context().asInstanceOf[Path]
+
+              if (kind == StandardWatchEventKinds.ENTRY_CREATE
+                && filename.toString == file.toPath.getFileName.toString) {
+                key.cancel()
+                return
+              }
+            }
+            key.reset()
+          case None =>
+        }
+      }
+    } finally {
+      watchService.close()
+    }
+  }
+
   /**
    * Create a new Spark Connect server to connect locally.
    */
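Two details of `waitUntilFileExists` above are easy to miss: the `while` condition re-checks `file.exists()`, so a file created before the watcher was registered is still noticed, and each `poll` is bounded by the remaining deadline, capping the total wait at roughly 30 seconds. A hypothetical standalone use (the path is invented, and its parent directory must already exist):

```scala
// Hypothetical usage: block until another process creates the file,
// or fall through once the helper's ~30-second deadline expires.
val logFile = new java.io.File("/tmp/spark-logs/server.out")
waitUntilFileExists(logFile)
if (logFile.exists()) println(s"ready: $logFile") else println("timed out")
```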
@@ -731,6 +762,7 @@
       (remoteString.exists(_.startsWith("local")) ||
         (remoteString.isDefined && isAPIModeConnect)) &&
       maybeConnectStartScript.exists(Files.exists(_))) {
+      val serverId = UUID.randomUUID().toString
       server = Some {
         val args =
           Seq(
@@ -745,12 +777,29 @@
         val pb = new ProcessBuilder(args: _*)
         // So don't exclude spark-sql jar in classpath
         pb.environment().remove(SparkConnectClient.SPARK_REMOTE)
+        pb.environment().put("SPARK_IDENT_STRING", serverId)
+        pb.environment().put("HOSTNAME", "local")
         pb.start()
       }
 
-      // Let the server start. We will directly request to set the configurations
-      // and this sleep makes less noisy with retries.
-      Thread.sleep(2000L)
+      // Let the server start, and wait until the log file is created.
+      Option(System.getenv("SPARK_LOG_DIR"))
+        .orElse(Option(System.getenv("SPARK_HOME")).map(p => Paths.get(p, "logs").toString))
+        .foreach { p =>
+          val logFile = Paths
+            .get(
+              p,
+              s"spark-$serverId-" +
+                s"org.apache.spark.sql.connect.service.SparkConnectServer-1-local.out")
+            .toFile
+          waitUntilFileExists(logFile)
+          if (logFile.exists()) {
+            logInfo(log"Spark Connect server started with the log file: ${MDC(PATH, logFile)}")
+          } else {
+            logWarning(log"Spark Connect server log not found at ${MDC(PATH, logFile)}")
+          }
+        }
+
       System.setProperty("spark.remote", "sc://localhost")
 
       // scalastyle:off runtimeaddshutdownhook
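For context on the two environment variables set above: Spark's daemon launcher script names its log file from `SPARK_IDENT_STRING`, the class being run, the instance number, and `HOSTNAME`, which is exactly the `spark-$serverId-...-1-local.out` name the code constructs. Pinning `SPARK_IDENT_STRING` to a fresh UUID and `HOSTNAME` to `local` makes that name predictable. A hypothetical helper spelling out the derivation:

```scala
// Hypothetical illustration (not part of the patch): the expected log file name,
// mirroring spark-daemon.sh's spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out.
import java.io.File
import java.nio.file.Paths

def expectedLogFile(logDir: String, serverId: String): File = {
  val command = "org.apache.spark.sql.connect.service.SparkConnectServer"
  Paths.get(logDir, s"spark-$serverId-$command-1-local.out").toFile
}
```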