big cleanup : drop Scala 2.11 - ... (#418)
* big cleanup : drop Scala 2.11 - ...

* fix github actions
ahoy-jon authored Apr 3, 2024
1 parent fe5576c commit 0ec5549
Showing 24 changed files with 1,228 additions and 57 deletions.
.github/workflows/ci.yml: 2 changes (1 addition, 1 deletion)
@@ -61,7 +61,7 @@ jobs:
strategy:
fail-fast: false
matrix:
scala: ["2.11.12", "2.12.19", "2.13.8", "3.2.0"]
scala: ["2.12.19", "2.13.9", "3.3.3"]
project: [ "coreTests", "test" ]
steps:
- name: Checkout current branch
build.sbt: 17 changes (7 additions, 10 deletions)
@@ -89,15 +89,14 @@ addCommandAlias("testSpecific", "; clean; test;")
addCommandAlias("testSpecificWithCoverage", "; clean; coverage; test; coverageReport;")

// -- Lib versions
lazy val zio = "2.0.10"
lazy val zioPrelude = "1.0.0-RC19"
lazy val zio = "2.0.21"
lazy val zioPrelude = "1.0.0-RC23"

lazy val scala211 = "2.11.12"
lazy val scala212 = "2.12.19"
lazy val scala213 = "2.13.13"
lazy val scala3 = "3.4.1"
lazy val scala3 = "3.3.3"

lazy val supportedScalaVersions = List(scala211, scala212, scala213, scala3)
lazy val supportedScalaVersions = List(scala212, scala213, scala3)

lazy val scalaMajorVersion: SettingKey[Long] = SettingKey("scala major version")
lazy val scalaMinorVersion: SettingKey[Long] = SettingKey("scala minor version")
@@ -190,7 +189,6 @@ lazy val examples =
def generateMagnoliaDependency(scalaMajor: Long, scalaMinor: Long): Seq[ModuleID] =
scalaMinor match {
case _ if scalaMajor == 3 => Seq("com.softwaremill.magnolia1_3" %% "magnolia" % "1.3.4")
case 11 => Seq("me.lyh" %% "magnolia" % "0.12.1.0-b575bf3")
case 12 | 13 => Seq("com.softwaremill.magnolia1_2" %% "magnolia" % "1.1.8")
case _ => throw new Exception("It should be unreachable.")
}
@@ -208,7 +206,7 @@ def generateSparkLibraryDependencies(scalaMajor: Long, scalaMinor: Long): Seq[Mo
Seq(
sparkCore.cross(CrossVersion.for3Use2_13),
sparkSql.cross(CrossVersion.for3Use2_13),
"io.github.vincenzobaz" %% "spark-scala3" % "0.2.0"
"io.github.vincenzobaz" %% "spark-scala3-encoders" % "0.2.6"
)
case _ => throw new Exception("It should be unreachable.")
}
@@ -220,9 +218,8 @@ def generateSparkLibraryDependencies(scalaMajor: Long, scalaMinor: Long): Seq[Mo
*/
def sparkScalaVersionMapping(scalaMinor: Long): String =
scalaMinor match {
case 11 => "2.4.8"
case 12 => "3.3.1"
case 13 => "3.3.1"
case 12 => "3.5.1"
case 13 => "3.5.1"
case _ => throw new Exception("It should be unreachable.")
}

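For context on the helpers above, here is a minimal sketch of how such version-dispatch functions are typically wired into an sbt module. The module name and settings are assumptions for illustration only; they are not taken from this commit's build.sbt.

// Hypothetical wiring (not from this commit): feed the Scala version into the
// helpers above so the module pulls the matching Spark and Magnolia artifacts.
lazy val core = (project in file("zio-spark-core"))
  .settings(
    crossScalaVersions := supportedScalaVersions,
    scalaMajorVersion  := CrossVersion.partialVersion(scalaVersion.value).fold(0L)(_._1),
    scalaMinorVersion  := CrossVersion.partialVersion(scalaVersion.value).fold(0L)(_._2),
    libraryDependencies ++=
      generateSparkLibraryDependencies(scalaMajorVersion.value, scalaMinorVersion.value) ++
        generateMagnoliaDependency(scalaMajorVersion.value, scalaMinorVersion.value)
  )
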
examples/zio-ecosystem/src/main/scala/example/ZIOEcosystem.scala: 11 changes (7 additions, 4 deletions)
@@ -12,6 +12,8 @@ import zio.spark.experimental.ZIOSparkAppDefault
import zio.spark.parameter._
import zio.spark.sql._

import scala.annotation.nowarn

object ZIOEcosystem extends ZIOSparkAppDefault {
// A more sophisticated layer to add middleware logs
private val session: ZLayer[Any, Throwable, SparkSession] =
@@ -88,10 +90,11 @@ object ZIOEcosystem extends ZIOSparkAppDefault {
}

// operator style
logInfo(command.example) *> job(command.example).timed.tap { case (duration, _) =>
val seconds: Float = duration.toMillis.toFloat / 1000
ZIO.logInfo(s"Example (${command.example.name}) correctly finished, it took $seconds seconds!")
}
@nowarn val prg =
logInfo(command.example) *> job(command.example).timed.tap { case (duration, _) =>
val seconds: Float = duration.toMillis.toFloat / 1000
ZIO.logInfo(s"Example (${command.example.name}) correctly finished, it took $seconds seconds!")
}

// for-comprehension style
for {
project/plugins.sbt: 2 changes (1 addition, 1 deletion)
@@ -4,7 +4,7 @@ addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.11.1"
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.12")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.11")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.10")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")

lazy val codegen =
@@ -19,7 +19,8 @@ case object KeyValueGroupedDatasetTemplate extends Template.Default {
| Encoder,
| TypedColumn,
| Dataset => UnderlyingDataset,
| KeyValueGroupedDataset => UnderlyingKeyValueGroupedDataset
| KeyValueGroupedDataset => UnderlyingKeyValueGroupedDataset,
| Column
|}
|""".stripMargin
}
@@ -9,7 +9,6 @@ import zio.spark.sql.implicits._
import zio.spark.test._
import zio.test._
import zio.test.Assertion._
import zio.test.TestAspect._

object DataFrameReaderSpec extends ZIOSparkSpecDefault {
val reader: DataFrameReader[WithoutSchema] = SparkSession.read
@@ -44,7 +43,7 @@ object DataFrameReaderSpec extends ZIOSparkSpecDefault {
df <- SparkSession.read.option("multiline", "true").json(s"$resourcesPath/data-json")
output <- df.count
} yield assertTrue(output == 4L)
} @@ scala211(ignore),
},
test("DataFrameReader can read a Parquet file") {
for {
df <- SparkSession.read.parquet(s"$resourcesPath/data-parquet")
@@ -79,7 +79,7 @@ object DataFrameWriterSpec extends ZIOSparkSpecDefault {
extension = "parquet",
readAgain = path => SparkSession.read.parquet(path),
write = path => _.write.parquet(path)
) @@ scala211(os(!_.isMac)),
),
writerTest(
extension = "orc",
readAgain = path => SparkSession.read.orc(path),
@@ -249,7 +249,7 @@ object DatasetSpec extends ZIOSparkSpecDefault {
df <- read
transformedDf = df.repartition(10).coalesce(2)
} yield assertTrue(transformedDf.rdd.partitions.length == 2)
} @@ scala211(ignore)
}
)

def sqlSpec: Spec[SparkSession, Any] =
zio-spark-core/src/main/scala-2.12/zio/spark/SparkContext.scala: 86 changes (85 additions, 1 deletion)
@@ -456,6 +456,16 @@ final case class SparkContext(underlying: UnderlyingSparkContext) { self =>
*/
def addJar(path: => String)(implicit trace: Trace): Task[Unit] = action(_.addJar(path))

/**
* Add a tag to be assigned to all the jobs started by this thread.
*
* @param tag
* The tag to be added. Cannot contain ',' (comma) character.
*
* @since 3.5.0
*/
def addJobTag(tag: => String)(implicit trace: Trace): Task[Unit] = action(_.addJobTag(tag))

def archives(implicit trace: Trace): Task[Seq[String]] = action(_.archives)

/**
@@ -531,7 +541,7 @@ final case class SparkContext(underlying: UnderlyingSparkContext) { self =>
/**
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in
* distributed functions. The variable will be sent to each cluster
* distributed functions. The variable will be sent to each executor
* only once.
*
* @param value
@@ -575,6 +585,17 @@ final case class SparkContext(underlying: UnderlyingSparkContext) { self =>
*/
def cancelJobGroup(groupId: => String)(implicit trace: Trace): Task[Unit] = action(_.cancelJobGroup(groupId))

/**
* Cancel active jobs that have the specified tag. See
* `org.apache.spark.SparkContext.addJobTag`.
*
* @param tag
* The tag to be cancelled. Cannot contain ',' (comma) character.
*
* @since 3.5.0
*/
def cancelJobsWithTag(tag: => String)(implicit trace: Trace): Task[Unit] = action(_.cancelJobsWithTag(tag))

/**
* Cancel a given stage and all jobs associated with it.
*
@@ -609,6 +630,13 @@ final case class SparkContext(underlying: UnderlyingSparkContext) { self =>
/** Clear the current thread's job group ID and its description. */
def clearJobGroup(implicit trace: Trace): Task[Unit] = action(_.clearJobGroup())

/**
* Clear the current thread's job tags.
*
* @since 3.5.0
*/
def clearJobTags(implicit trace: Trace): Task[Unit] = action(_.clearJobTags())

/**
* Create and register a `CollectionAccumulator`, which starts with an
* empty list and accumulates inputs by adding them into the list.
@@ -645,6 +673,14 @@ final case class SparkContext(underlying: UnderlyingSparkContext) { self =>
def getExecutorMemoryStatus(implicit trace: Trace): Task[Map[String, (Long, Long)]] =
action(_.getExecutorMemoryStatus)

/**
* Get the tags that are currently set to be assigned to all the jobs
* started by this thread.
*
* @since 3.5.0
*/
def getJobTags(implicit trace: Trace): Task[Set[String]] = action(_.getJobTags())

/**
* Get a local property set in this thread, or null if it is missing.
* See `org.apache.spark.SparkContext.setLocalProperty`.
@@ -853,6 +889,17 @@ final case class SparkContext(underlying: UnderlyingSparkContext) { self =>
def register(acc: => AccumulatorV2[_, _], name: => String)(implicit trace: Trace): Task[Unit] =
action(_.register(acc, name))

/**
* Remove a tag previously added to be assigned to all the jobs
* started by this thread. Noop if such a tag was not added earlier.
*
* @param tag
* The tag to be removed. Cannot contain ',' (comma) character.
*
* @since 3.5.0
*/
def removeJobTag(tag: => String)(implicit trace: Trace): Task[Unit] = action(_.removeJobTag(tag))

def resources(implicit trace: Trace): Task[Map[String, ResourceInformation]] = action(_.resources)

/**
@@ -998,6 +1045,22 @@ final case class SparkContext(underlying: UnderlyingSparkContext) { self =>
*/
def setCheckpointDir(directory: => String)(implicit trace: Trace): Task[Unit] = action(_.setCheckpointDir(directory))

/**
* Set the behavior of job cancellation from jobs started in this
* thread.
*
* @param interruptOnCancel
* If true, then job cancellation will result in
* `Thread.interrupt()` being called on the job's executor threads.
* This is useful to help ensure that the tasks are actually stopped
* in a timely manner, but is off by default due to HDFS-1208, where
* HDFS may respond to Thread.interrupt() by marking nodes as dead.
*
* @since 3.5.0
*/
def setInterruptOnCancel(interruptOnCancel: => Boolean)(implicit trace: Trace): Task[Unit] =
action(_.setInterruptOnCancel(interruptOnCancel))

/** Set a human readable description of the current job. */
def setJobDescription(value: => String)(implicit trace: Trace): Task[Unit] = action(_.setJobDescription(value))

@@ -1049,6 +1112,10 @@ final case class SparkContext(underlying: UnderlyingSparkContext) { self =>
def setLocalProperty(key: => String, value: => String)(implicit trace: Trace): Task[Unit] =
action(_.setLocalProperty(key, value))

/* -------------------------------------------------------------------------------------
 * Initialization. This code initializes the context in a manner that is exception-safe.
 * All internal fields holding state are initialized here, and any error prompts the
 * stop() method to be called.
 * ------------------------------------------------------------------------------------- */
/**
* Control our logLevel. This overrides any user-defined log settings.
* @param logLevel
@@ -1060,6 +1127,23 @@ final case class SparkContext(underlying: UnderlyingSparkContext) { self =>
/** Shut down the SparkContext. */
def stop(implicit trace: Trace): Task[Unit] = action(_.stop())

/**
* Shut down the SparkContext with an exit code that will be passed to
* the scheduler backend. In client mode, the client side may call
* `SparkContext.stop()` to clean up and then exit with a non-zero
* code. Without this, a resource scheduler such as `ApplicationMaster`
* would report success even though the client side failed. Spark can
* call this method to stop the SparkContext and pass the client's
* exit code to the scheduler backend, which then forwards it to the
* resource scheduler so the two stay consistent.
*
* @param exitCode
* Exit code that will be passed to the scheduler backend in client
* mode.
*/
def stop(exitCode: => Int)(implicit trace: Trace): Task[Unit] = action(_.stop(exitCode))

/**
* Submit a job for execution and return a FutureJob holding the
* result.
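The job-tag and cancellation wrappers added above are thin Task-returning facades over the underlying SparkContext. A minimal usage sketch follows; it is not part of this commit, the tag name and helper are illustrative, and how `sc: zio.spark.SparkContext` is obtained is assumed rather than shown.

import zio._
import zio.spark.SparkContext

// Illustrative only: tag the Spark jobs started on this thread, make
// cancellation interrupt executor threads, and cancel by tag if the
// wrapping effect is interrupted.
def runTagged(sc: SparkContext)(job: Task[Unit]): Task[Unit] =
  for {
    _    <- sc.addJobTag("nightly-etl")   // hypothetical tag name
    _    <- sc.setInterruptOnCancel(true)
    tags <- sc.getJobTags
    _    <- ZIO.logInfo(s"job tags for this thread: $tags")
    _    <- job.onInterrupt(sc.cancelJobsWithTag("nightly-etl").orDie)
    _    <- sc.removeJobTag("nightly-etl")
  } yield ()

Note that Spark tracks job tags per thread, so a real program would also need to account for how ZIO fibers map onto threads; the sketch ignores that detail.
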
@@ -252,9 +252,7 @@ final case class DataFrameStatFunctions(underlying: UnderlyingDataFrameStatFunct

/**
* Computes a pair-wise frequency table of the given columns. Also
* known as a contingency table. The number of distinct values for
* each column should be less than 1e4. At most 1e6 non-zero pair
* frequencies will be returned. The first column of each row will be
* known as a contingency table. The first column of each row will be
* the distinct values of `col1` and the column names will be the
* distinct values of `col2`. The name of the first column will be
* `col1_col2`. Counts will be returned as `Long`s. Pairs that have no
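For reference, a small example of the contingency table described in the updated crosstab documentation. It uses plain Spark rather than the zio-spark wrapper and is not part of this commit; the column names and data are made up for illustration.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("crosstab-demo").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("a", 2), ("b", 1)).toDF("key", "flag")

// The first column is named "key_flag" and holds the distinct values of `key`;
// the remaining columns are the distinct values of `flag`, with Long counts
// (0 where a pair never occurs). Row and column order is not guaranteed.
df.stat.crosstab("key", "flag").show()
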