
Commit

init
LuciferYang committed Feb 22, 2025
1 parent 30f4f4e commit cb2c365
Showing 19 changed files with 2 additions and 172 deletions.
@@ -806,8 +806,6 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

protected def caseConvert(tableName: String): String = tableName

private def withOrWithout(isDistinct: Boolean): String = if (isDistinct) "with" else "without"

Seq(true, false).foreach { isDistinct =>
val distinct = if (isDistinct) "DISTINCT " else ""
val withOrWithout = if (isDistinct) "with" else "without"
@@ -1924,8 +1924,6 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
PrivateMethod[mutable.HashMap[Int, Int]](Symbol("numLocalityAwareTasksPerResourceProfileId"))
private val _rpIdToHostToLocalTaskCount =
PrivateMethod[Map[Int, Map[String, Int]]](Symbol("rpIdToHostToLocalTaskCount"))
private val _onSpeculativeTaskSubmitted =
PrivateMethod[Unit](Symbol("onSpeculativeTaskSubmitted"))
private val _totalRunningTasksPerResourceProfile =
PrivateMethod[Int](Symbol("totalRunningTasksPerResourceProfile"))

@@ -1942,12 +1940,6 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
nmap(rp.id)
}

private def updateAndSyncNumExecutorsTarget(
manager: ExecutorAllocationManager,
now: Long): Unit = {
manager invokePrivate _updateAndSyncNumExecutorsTarget(now)
}

private def numExecutorsTargetForDefaultProfileId(manager: ExecutorAllocationManager): Int = {
numExecutorsTarget(manager, defaultProfile.id)
}
@@ -2025,10 +2017,6 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _onSchedulerQueueEmpty()
}

private def onSpeculativeTaskSubmitted(manager: ExecutorAllocationManager, id: String) : Unit = {
manager invokePrivate _onSpeculativeTaskSubmitted(id)
}

private def localityAwareTasksForDefaultProfile(manager: ExecutorAllocationManager): Int = {
val localMap = manager invokePrivate _localityAwareTasksPerResourceProfileId()
localMap(defaultProfile.id)
@@ -2044,7 +2032,4 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
rpIdToHostLocal(defaultProfile.id)
}

private def getResourceProfileIdOfExecutor(manager: ExecutorAllocationManager): Int = {
defaultProfile.id
}
}
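
The helpers removed in this file wrap ScalaTest's PrivateMethodTester pattern: a PrivateMethod handle is built from the target method's Symbol and then invoked reflectively with invokePrivate. A minimal sketch of that pattern, using a hypothetical Counter class (not part of Spark) and assuming ScalaTest 3.x on the classpath:

import org.scalatest.PrivateMethodTester
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical class with a private method, standing in for ExecutorAllocationManager.
class Counter {
  private def bump(n: Int): Int = n + 1
}

class CounterSuite extends AnyFunSuite with PrivateMethodTester {
  test("invoke a private method reflectively") {
    // Build a handle from the method's name and return type ...
    val _bump = PrivateMethod[Int](Symbol("bump"))
    // ... then invoke it on an instance with invokePrivate.
    assert((new Counter) invokePrivate _bump(41) === 42)
  }
}
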
@@ -233,14 +233,6 @@ class AppClientSuite
// | Utility methods for testing |
// ===============================

/** Return a SparkConf for applications that want to talk to our Master. */
private def appConf: SparkConf = {
new SparkConf()
.setMaster(masterRpcEnv.address.toSparkURL)
.setAppName("test")
.set("spark.executor.memory", "256m")
}

/** Make a master to which our application will send executor requests. */
private def makeMaster(): Master = {
val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, securityManager, conf)
23 changes: 0 additions & 23 deletions core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -105,29 +105,6 @@ class StorageSuite extends SparkFunSuite {
assert(status.diskUsed === actualDiskUsed)
}

// For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
private def stockStorageStatuses: Seq[StorageStatus] = {
val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L, Some(2000L), Some(0L))
val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L, Some(3000L), Some(0L))
status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L))
status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L))
status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L))
status2.addBlock(RDDBlockId(0, 3), BlockStatus(memAndDisk, 1L, 2L))
status2.addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L))
status2.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 1L, 2L))
status3.addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L))
status3.addBlock(RDDBlockId(1, 2), BlockStatus(memAndDisk, 1L, 2L))
Seq(status1, status2, status3)
}

// For testing StorageUtils.updateRddInfo
private def stockRDDInfos: Seq[RDDInfo] = {
val info0 = new RDDInfo(0, "0", 10, memAndDisk, false, Seq(3))
val info1 = new RDDInfo(1, "1", 3, memAndDisk, false, Seq(4))
Seq(info0, info1)
}

private val offheap = StorageLevel.OFF_HEAP
// For testing add, update, remove, get, and contains etc. for both RDD and non-RDD onheap
// and offheap blocks
16 changes: 0 additions & 16 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1370,18 +1370,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
}
}

private def assertOptionEquals[T](
opt1: Option[T],
opt2: Option[T],
assertEquals: (T, T) => Unit): Unit = {
if (opt1.isDefined) {
assert(opt2.isDefined)
assertEquals(opt1.get, opt2.get)
} else {
assert(opt2.isEmpty)
}
}

/**
* Use different names for methods we pass in to assertSeqEquals or assertOptionEquals
*/
@@ -1407,10 +1395,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
assert(ste1.getFileName === ste2.getFileName)
}

private def assertEquals(rp1: ResourceProfile, rp2: ResourceProfile): Unit = {
assert(rp1 === rp2)
}

/** ----------------------------------- *
| Util methods for constructing events |
* ------------------------------------ */
@@ -72,16 +72,6 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
runIsotonicRegression(labels, Array.fill(labels.size)(1d).toImmutableArraySeq, isotonic)
}

private def runIsotonicRegression(
labels: Seq[Double],
features: Seq[Double],
weights: Seq[Double],
isotonic: Boolean): IsotonicRegressionModel = {
runIsotonicRegressionOnInput(
labels.indices.map(i => (labels(i), features(i), weights(i))),
isotonic)
}

private def runIsotonicRegressionOnInput(
input: Seq[(Double, Double, Double)],
isotonic: Boolean,
@@ -64,26 +64,6 @@ class AnsiTypeCoercionSuite extends TypeCoercionSuiteBase {

override def dateTimeOperationsRule: TypeCoercionRule = AnsiTypeCoercion.DateTimeOperations

private def shouldCastStringLiteral(to: AbstractDataType, expected: DataType): Unit = {
val input = Literal("123")
val castResult = AnsiTypeCoercion.implicitCast(input, to)
assert(DataTypeUtils.equalsIgnoreCaseAndNullability(
castResult.map(_.dataType).orNull, expected),
s"Failed to cast String literal to $to")
}

private def shouldNotCastStringLiteral(to: AbstractDataType): Unit = {
val input = Literal("123")
val castResult = AnsiTypeCoercion.implicitCast(input, to)
assert(castResult.isEmpty, s"Should not be able to cast String literal to $to")
}

private def shouldNotCastStringInput(to: AbstractDataType): Unit = {
val input = AttributeReference("s", StringType)()
val castResult = AnsiTypeCoercion.implicitCast(input, to)
assert(castResult.isEmpty, s"Should not be able to cast non-foldable String input to $to")
}

private def checkWidenType(
widenFunc: (DataType, DataType) => Option[DataType],
t1: DataType,
@@ -20,7 +20,7 @@ import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, NamedArgumentExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{FunctionBuilderBase, FunctionSignature, InputParameter, NamedParametersSupport}
import org.apache.spark.sql.catalyst.plans.logical.{FunctionSignature, InputParameter, NamedParametersSupport}
import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
import org.apache.spark.sql.types.DataType

@@ -89,14 +89,6 @@ class NamedParameterFunctionSuite extends AnalysisTest {
NamedParametersSupport.defaultRearrange(functionSignature, expressions, functionName))
}

private def parseExternalException[T <: FunctionBuilderBase[_]](
functionName: String,
builder: T,
expressions: Seq[Expression]) : SparkThrowable = {
intercept[SparkThrowable](
FunctionRegistry.rearrangeExpressions[T](functionName, builder, expressions))
}

test("DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT") {
val condition =
"DUPLICATE_ROUTINE_PARAMETER_ASSIGNMENT.BOTH_POSITIONAL_AND_NAMED"
@@ -17,8 +17,6 @@

package org.apache.spark.sql.catalyst.encoders

import scala.reflect.ClassTag

import org.apache.spark.{SPARK_DOC_ROOT, SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.Encoders

@@ -98,5 +96,4 @@ class EncoderErrorMessageSuite extends SparkFunSuite {
)
}

private def clsName[T : ClassTag]: String = implicitly[ClassTag[T]].runtimeClass.getName
}
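
The removed clsName helper above is the standard ClassTag context-bound idiom for reading a type's erased runtime class name. A minimal standalone sketch (plain Scala, no Spark test dependencies assumed):

import scala.reflect.ClassTag

object ClsNameExample {
  // Summon the ClassTag for T and read the erased runtime class name.
  def clsName[T: ClassTag]: String = implicitly[ClassTag[T]].runtimeClass.getName

  def main(args: Array[String]): Unit = {
    println(clsName[String])           // java.lang.String
    println(clsName[Map[String, Int]]) // scala.collection.immutable.Map (type arguments are erased)
  }
}
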
@@ -303,10 +303,6 @@ class CastWithAnsiOnSuite extends CastSuiteBase with QueryErrorsBase {
s"cannot be cast to ${toSQLType(to)} because it is malformed."
}

private def castErrMsg(l: Literal, to: DataType): String = {
castErrMsg(l, to, l.dataType)
}

test("cast from invalid string to numeric should throw NumberFormatException") {
def check(value: String, dataType: DataType): Unit = {
checkExceptionInExpression[NumberFormatException](cast(value, dataType),
@@ -18,11 +18,9 @@
package org.apache.spark.sql.catalyst.plans.logical

import scala.collection.mutable
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExpressionSet, UnspecifiedFrame}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.types.IntegerType
@@ -47,9 +45,6 @@ class DistinctKeyVisitorSuite extends PlanTest {
assert(plan.analyze.distinctKeys === distinctKeys)
}

implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] =
ExpressionEncoder[T]()

test("Aggregate's distinct attributes") {
checkDistinctAttributes(t1.groupBy($"a", $"b")($"a", $"b", 1), Set(ExpressionSet(Seq(a, b))))
checkDistinctAttributes(t1.groupBy($"a")($"a"), Set(ExpressionSet(Seq(a))))
@@ -168,18 +168,6 @@ class IntervalUtilsSuite extends SparkFunSuite with SQLHelper {
assert(safeStringToInterval(UTF8String.fromString(input)) === null)
}

private def checkFromInvalidStringUnknownError(input: String, word: String): Unit = {
checkError(
exception = intercept[SparkIllegalArgumentException] {
stringToInterval(UTF8String.fromString(input))
},
condition = "INVALID_INTERVAL_FORMAT.UNKNOWN_PARSING_ERROR",
parameters = Map(
"input" -> Option(input).map(_.toString).getOrElse("null"),
"word" -> word))
assert(safeStringToInterval(UTF8String.fromString(input)) === null)
}

private def failFuncWithInvalidInput(
input: String, errorMsg: String, converter: String => CalendarInterval): Unit = {
withClue("Expected to throw an exception for the invalid input") {
@@ -1388,16 +1388,6 @@ class DataSourceV2SQLSuiteV1Filter
}
}

private def testShowNamespaces(
sqlText: String,
expected: Seq[String]): Unit = {
val schema = new StructType().add("namespace", StringType, nullable = false)

val df = spark.sql(sqlText)
assert(df.schema === schema)
assert(df.collect().map(_.getAs[String](0)).sorted === expected.sorted)
}

test("Use: basic tests with USE statements") {
val catalogManager = spark.sessionState.catalogManager

@@ -453,13 +453,6 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
}
}

private def assertRelationNotFound(query: String, relation: String): Unit = {
val e = intercept[AnalysisException] {
sql(query)
}
checkErrorTableNotFound(e, relation)
}

private def assertRelationNotFound(query: String, relation: String, context: ExpectedContext):
Unit = {
val e = intercept[AnalysisException] {
@@ -353,10 +353,6 @@ object StateStoreBasicOperationsBenchmark extends SqlBasedBenchmark {
}
}

private def getRows(store: StateStore, keys: Seq[UnsafeRow]): Seq[UnsafeRow] = {
keys.map(key => store.get(key))
}

private def loadInitialData(
provider: StateStoreProvider,
data: Seq[(UnsafeRow, UnsafeRow)]): Long = {
@@ -31,8 +31,6 @@ import org.apache.spark.util.UninterruptibleThread

class HDFSMetadataLogSuite extends SharedSparkSession {

private implicit def toOption[A](a: A): Option[A] = Option(a)

test("SPARK-46339: Directory with number name should not be treated as metadata log") {
withTempDir { temp =>
val dir = new File(temp, "dir")
@@ -235,14 +235,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

override val streamingTimeout = 80.seconds

/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
private def createFileStreamSource(
format: String,
path: String,
schema: Option[StructType] = None): FileStreamSource = {
getSourceFromFileStream(createFileStream(format, path, schema))
}

private def createFileStreamSourceAndGetSchema(
format: Option[String],
path: Option[String],
@@ -482,8 +482,6 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
meq(Map.empty))
}

private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath

test("check foreach() catches null writers") {
val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
@@ -26,7 +26,7 @@ import org.apache.orc.OrcConf
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.orc.OrcQueryTest
import org.apache.spark.sql.hive.{HiveSessionCatalog, HiveUtils}
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -231,17 +231,6 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
.getCachedDataSourceTable(table)
}

private def checkCached(tableIdentifier: TableIdentifier): Unit = {
getCachedDataSourceTable(tableIdentifier) match {
case null => fail(s"Converted ${tableIdentifier.table} should be cached in the cache.")
case LogicalRelationWithTable(_: HadoopFsRelation, _) => // OK
case other =>
fail(
s"The cached ${tableIdentifier.table} should be a HadoopFsRelation. " +
s"However, $other is returned form the cache.")
}
}

test("SPARK-28573 ORC conversation could be applied for partitioned table insertion") {
withTempView("single") {
val singleRowDF = Seq((0, "foo")).toDF("key", "value")