diff --git a/.github/workflows/maven-test.yml b/.github/workflows/maven-test.yml index faf3c8673..31a9a90ef 100644 --- a/.github/workflows/maven-test.yml +++ b/.github/workflows/maven-test.yml @@ -187,7 +187,7 @@ jobs: path: ./rust/target/release/ - name: Build with Maven run: | - mvn -B test -pl lakesoul-spark -am -Pcross-build -Pparallel-test --file pom.xml -Dtest='!UpdateScalaSuite,!AlterTableByNameSuite,!ReadSuite,!UpdateSQLSuite,!ParquetNativeFilterSuite,!DeleteScalaSuite,!DeleteSQLSuite,!ParquetV2FilterSuite,!ParquetScanSuite,!UpsertSuiteBase,!RBACOperationSuite' -Dsurefire.failIfNoSpecifiedTests=false + mvn -B test -pl lakesoul-spark -am -Pcross-build -Pparallel-test --file pom.xml -Dtest='!UpdateScalaSuite,!AlterTableByNameSuite,!ReadSuite,!UpdateSQLSuite,!ParquetNativeFilterSuite,!DeleteScalaSuite,!DeleteSQLSuite,!ParquetV2FilterSuite,!ParquetScanSuite,!UpsertSuiteBase,!RBACOperationSuite,!DeltaJoinSuite' -Dsurefire.failIfNoSpecifiedTests=false - name: Generate Report Site if: always() run: | diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala index 880a99649..da78e4b5b 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/LakeSoulFileWriter.scala @@ -34,14 +34,23 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf import org.apache.spark.sql.vectorized.ArrowFakeRowAdaptor import org.apache.spark.util.{SerializableConfiguration, Utils} import com.dmetasoul.lakesoul.meta.DBConfig.{LAKESOUL_NON_PARTITION_TABLE_PART_DESC, LAKESOUL_RANGE_PARTITION_SPLITTER} +import com.dmetasoul.lakesoul.meta.DBUtil import com.dmetasoul.lakesoul.spark.clean.CleanOldCompaction.splitCompactFilePath -import org.apache.spark.sql.lakesoul.DelayedCopyCommitProtocol +import org.apache.spark.sql.execution.datasources.v2.parquet.{NativeParquetCompactionColumnarOutputWriter, NativeParquetOutputWriter} +import org.apache.spark.sql.lakesoul.{DelayedCommitProtocol, DelayedCopyCommitProtocol} import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType} import java.util.{Date, UUID} +import scala.collection.JavaConverters.mapAsScalaMapConverter +import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable` /** A helper object for writing FileFormat data out to a location. */ object LakeSoulFileWriter extends Logging { + val MAX_FILE_SIZE_KEY = "max_file_size" + val HASH_BUCKET_ID_KEY = "hash_bucket_id" + val SNAPPY_COMPRESS_RATIO = 3 + val COPY_FILE_WRITER_KEY = "copy_file_writer" + /** * Basic work flow of this command is: * 1. Driver side setup, including output committer initialization and data source specific @@ -178,7 +187,11 @@ object LakeSoulFileWriter extends Logging { val nativeIOEnable = sparkSession.sessionState.conf.getConf(LakeSoulSQLConf.NATIVE_IO_ENABLE) def nativeWrap(plan: SparkPlan): RDD[InternalRow] = { - if (isCompaction && !isCDC && !isBucketNumChanged && nativeIOEnable) { + if (isCompaction + && staticBucketId != -1 + && !isCDC + && !isBucketNumChanged + && nativeIOEnable) { plan match { case withPartitionAndOrdering(_, _, child) => return nativeWrap(child) @@ -203,8 +216,8 @@ object LakeSoulFileWriter extends Logging { try { // for compaction, we won't break ordering from batch scan val (rdd, concurrentOutputWriterSpec) = - if (isCompaction && options.getOrElse("copyCompactedFile", "").nonEmpty) { - val data = Seq(InternalRow(options("copyCompactedFile"))) + if (isCompaction && options.getOrElse(COPY_FILE_WRITER_KEY, "false").toBoolean) { + val data = Seq(InternalRow(COPY_FILE_WRITER_KEY)) (sparkSession.sparkContext.parallelize(data), None) } else if (!isBucketNumChanged && (orderingMatched || isCompaction)) { (nativeWrap(empty2NullPlan), None) @@ -410,6 +423,7 @@ object LakeSoulFileWriter extends Logging { private var recordsInFile: Long = _ private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC) .map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/")) + private val maxFileSize = options.get(MAX_FILE_SIZE_KEY) /** Given an input row, returns the corresponding `bucketId` */ protected lazy val getBucketId: InternalRow => Int = { @@ -419,26 +433,56 @@ object LakeSoulFileWriter extends Logging { row => proj(row).getInt(0) } + override protected def releaseCurrentWriter(): Unit = { + if (currentWriter != null) { + try { + currentWriter.close() + if (maxFileSize.isDefined) { + currentWriter.asInstanceOf[NativeParquetOutputWriter].flushResult.foreach(result => { + val (partitionDesc, flushResult) = result + val partitionDescList = if (partitionDesc == "-4") { + DBUtil.parsePartitionDesc(options.getOrElse("partValue", LAKESOUL_NON_PARTITION_TABLE_PART_DESC)).asScala.toList + } else { + DBUtil.parsePartitionDesc(partitionDesc).asScala.toList + } + committer.asInstanceOf[DelayedCommitProtocol].addOutputFile(partitionDescList, flushResult.map(_.getFilePath).toList) + }) + } + statsTrackers.foreach(_.closeFile(currentWriter.path())) + } finally { + currentWriter = null + } + } + } + private def newOutputWriter(record: InternalRow): Unit = { recordsInFile = 0 releaseResources() val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext) val suffix = if (bucketSpec.isDefined) { - val bucketIdStr = if (partitionId == -1) { - BucketingUtils.bucketIdToString(getBucketId(record)) + val bucketId = if (partitionId == -1) { + getBucketId(record) } else { - BucketingUtils.bucketIdToString(partitionId) + partitionId } + taskAttemptContext.getConfiguration.set(HASH_BUCKET_ID_KEY, bucketId.toString) + + val bucketIdStr = BucketingUtils.bucketIdToString(bucketId) f"$bucketIdStr.c$fileCounter%03d" + ext } else { f"-c$fileCounter%03d" + ext } + if (maxFileSize.isDefined) { + taskAttemptContext.getConfiguration.set(MAX_FILE_SIZE_KEY, maxFileSize.get) + } + val currentPath = committer.newTaskTempFile( taskAttemptContext, partValue, - suffix) + if (maxFileSize.isDefined) "" else suffix + ) currentWriter = description.outputWriterFactory.newInstance( path = currentPath, @@ -473,21 +517,8 @@ object LakeSoulFileWriter extends Logging { customMetrics: Map[String, SQLMetric] = Map.empty) extends FileFormatDataWriter(description, taskAttemptContext, committer, customMetrics) { - private val partValue: Option[String] = options.get("partValue").filter(_ != LAKESOUL_NON_PARTITION_TABLE_PART_DESC) - .map(_.replace(LAKESOUL_RANGE_PARTITION_SPLITTER, "/")) - - /** Given an input row, returns the corresponding `bucketId` */ - protected lazy val getSrcPath: InternalRow => String = { - row => row.get(0, StringType).asInstanceOf[String] - } - override def write(record: InternalRow): Unit = { - val dstPath = committer.newTaskTempFile( - taskAttemptContext, - partValue, - getSrcPath(record)) - - statsTrackers.foreach(_.newFile(dstPath)) + logInfo("copy file") } } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala index 5137c2007..0f339ea0d 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/merge/MergeParquetScan.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.sources.{EqualTo, Filter, Not} import org.apache.spark.sql.lakesoul._ import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf +import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.{COMPACTION_TASK, SCAN_FILE_NUMBER_LIMIT} import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TableInfo, TimestampFormatter} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -412,15 +413,34 @@ case class OnePartitionMergeBucketScan(sparkSession: SparkSession, val fileWithBucketId = groupByPartition.head._2 .groupBy(_.fileBucketId).map(f => (f._1, f._2.toArray)) + val fileNumLimit = options.getOrDefault(SCAN_FILE_NUMBER_LIMIT.key, Int.MaxValue.toString).toInt + val isCompactionTask = options.getOrDefault(COMPACTION_TASK.key, COMPACTION_TASK.defaultValueString).toBoolean + Seq.tabulate(bucketNum) { bucketId => var files = fileWithBucketId.getOrElse(bucketId, Array.empty) - val isSingleFile = files.length == 1 + var groupedFiles = if (fileNumLimit < Int.MaxValue && isCompactionTask) { + val groupedFiles = new ArrayBuffer[Array[MergePartitionedFile]] + for (i <- files.indices by fileNumLimit) { + groupedFiles += files.slice(i, i + fileNumLimit) + } + groupedFiles.toArray + } else { + Array(files) + } - if (!isSingleFile) { - val versionFiles = for (version <- files.indices) yield files(version).copy(writeVersion = version + 1) - files = versionFiles.toArray + var allPartitionIsSingleFile = true + var isSingleFile = false + + for (index <- groupedFiles.indices) { + isSingleFile = groupedFiles(index).length == 1 + if (!isSingleFile) { + val versionFiles = for (elem <- groupedFiles(index).indices) yield groupedFiles(index)(elem).copy(writeVersion = elem) + groupedFiles(index) = versionFiles.toArray + allPartitionIsSingleFile = false + } } - MergeFilePartition(bucketId, Array(files), isSingleFile) + + MergeFilePartition(bucketId, groupedFiles, allPartitionIsSingleFile) } } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala index e47044cb5..bc817a9a2 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetFileFormat.scala @@ -31,7 +31,8 @@ class NativeParquetFileFormat extends FileFormat if (options.getOrElse("isCompaction", "false").toBoolean && !options.getOrElse("isCDC", "false").toBoolean && - !options.getOrElse("isBucketNumChanged", "false").toBoolean + !options.getOrElse("isBucketNumChanged", "false").toBoolean && + options.contains("staticBucketId") ) { new OutputWriterFactory { override def newInstance( diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala index c55275424..405aaae19 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala @@ -5,6 +5,7 @@ package org.apache.spark.sql.execution.datasources.v2.parquet import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter +import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter.FlushResult import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.types.pojo.Schema @@ -18,12 +19,18 @@ import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.{GlutenUtils, NativeIOUtils} +import java.util +import scala.collection.JavaConverters.mapAsScalaMapConverter +import scala.collection.mutable + class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZoneId: String, context: TaskAttemptContext) extends OutputWriter { val NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE: Int = SQLConf.get.getConf(LakeSoulSQLConf.NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE) private var recordCount = 0 + var flushResult: mutable.Map[String, util.List[FlushResult]] = mutable.Map.empty + val arrowSchema: Schema = ArrowUtils.toArrowSchema(dataSchema, timeZoneId) protected val nativeIOWriter: NativeIOWriter = new NativeIOWriter(arrowSchema) @@ -32,7 +39,11 @@ class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZo GlutenUtils.setArrowAllocator(nativeIOWriter) nativeIOWriter.setRowGroupRowNumber(NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE) - nativeIOWriter.addFile(path) + if (path.endsWith(".parquet")) { + nativeIOWriter.addFile(path) + } else { + nativeIOWriter.withPrefix(path) + } NativeIOUtils.setNativeIOOptions(nativeIOWriter, NativeIOUtils.getNativeIOOptions(context, new Path(path))) @@ -61,7 +72,7 @@ class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZo recordWriter.finish() nativeIOWriter.write(root) - nativeIOWriter.flush() + flushResult = nativeIOWriter.flush().asScala recordWriter.reset() root.close() diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala index 086012cf0..0c82d0215 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCommitProtocol.scala @@ -90,26 +90,39 @@ class DelayedCommitProtocol(jobId: String, } override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { - val filename = getFileName(taskContext, ext) val partitionValues = dir.map(parsePartitions).getOrElse(List.empty[(String, String)]) val unescapedDir = if (partitionValues.nonEmpty) { Some(partitionValues.map(partitionValue => partitionValue._1 + "=" + partitionValue._2).mkString("/")) } else { dir } - val relativePath = randomPrefixLength.map { prefixLength => - getRandomPrefix(prefixLength) // Generate a random prefix as a first choice - }.orElse { - // or else write into the partition unescaped directory if it is partitioned + if (ext.isEmpty) { unescapedDir - }.map { subDir => - new Path(subDir, filename) - }.getOrElse(new Path(filename)) // or directly write out to the output path - - val absolutePath = new Path(path, relativePath).toUri.toString - //returns the absolute path to the file - addedFiles.append((partitionValues, absolutePath)) - absolutePath + .map(new Path(path, _)) + .getOrElse(new Path(path)) + .toUri.toString + } else { + val filename = getFileName(taskContext, ext) + + val relativePath = randomPrefixLength.map { prefixLength => + getRandomPrefix(prefixLength) // Generate a random prefix as a first choice + }.orElse { + // or else write into the partition unescaped directory if it is partitioned + unescapedDir + }.map { subDir => + new Path(subDir, filename) + }.getOrElse(new Path(filename)) // or directly write out to the output path + + + val absolutePath = new Path(path, relativePath).toUri.toString + //returns the absolute path to the file + addedFiles.append((partitionValues, absolutePath)) + absolutePath + } + } + + def addOutputFile(partitionValues: List[(String, String)], files: List[String]): Unit = { + files.foreach(file => addedFiles.append((partitionValues, file))) } override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala index b46be810e..4284f3026 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/DelayedCopyCommitProtocol.scala @@ -23,31 +23,16 @@ import scala.util.Random /** * Writes out the files to `path` and returns a list of them in `addedStatuses`. */ -class DelayedCopyCommitProtocol(jobId: String, +class DelayedCopyCommitProtocol(srcFiles: Seq[DataFileInfo], + jobId: String, dstPath: String, randomPrefixLength: Option[Int]) extends DelayedCommitProtocol(jobId, dstPath, randomPrefixLength) with Serializable with Logging { - @transient private var copyFiles: ArrayBuffer[(String, String)] = _ - - override def setupJob(jobContext: JobContext): Unit = { - - } - - override def abortJob(jobContext: JobContext): Unit = { - // TODO: Best effort cleanup - } - - override def setupTask(taskContext: TaskAttemptContext): Unit = { - copyFiles = new ArrayBuffer[(String, String)] - } - - override def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { - val (srcCompactDir, srcBasePath) = splitCompactFilePath(ext) - copyFiles += dir.getOrElse("-5") -> ext - new Path(dstPath, srcBasePath).toString + throw new UnsupportedOperationException( + s"$this does not support adding files with an absolute path") } override def newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { @@ -57,15 +42,14 @@ class DelayedCopyCommitProtocol(jobId: String, override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { - if (copyFiles.nonEmpty) { - val fs = new Path(copyFiles.head._2).getFileSystem(taskContext.getConfiguration) - val statuses = copyFiles.map { f => - val (partitionDesc, srcPath) = f - val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcPath) + if (srcFiles.nonEmpty) { + val fs = new Path(srcFiles.head.path).getFileSystem(taskContext.getConfiguration) + val statuses = srcFiles.map { srcFile => + val (srcCompactDir, srcBasePath) = splitCompactFilePath(srcFile.path) val dstFile = new Path(dstPath, srcBasePath) - FileUtil.copy(fs, new Path(srcPath), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration) + FileUtil.copy(fs, new Path(srcFile.path), fs, new Path(dstPath, srcBasePath), false, taskContext.getConfiguration) val status = fs.getFileStatus(dstFile) - DataFileInfo(partitionDesc, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime) + DataFileInfo(srcFile.range_partitions, fs.makeQualified(dstFile).toString, "add", status.getLen, status.getModificationTime) } new TaskCommitMessage(statuses) diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala index 455e652b8..66109a1d3 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/TransactionalWrite.scala @@ -12,6 +12,7 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.COPY_FILE_WRITER_KEY import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, LakeSoulFileWriter, WriteJobStatsTracker} import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} import org.apache.spark.sql.functions.{col, when} @@ -103,7 +104,8 @@ trait TransactionalWrite { */ def writeFiles(oriData: Dataset[_], writeOptions: Option[LakeSoulOptions], - isCompaction: Boolean): (Seq[DataFileInfo], Path) = { + isCompaction: Boolean, + copyCompactedFile: Seq[DataFileInfo] = Seq.empty): (Seq[DataFileInfo], Path) = { val spark = oriData.sparkSession // LakeSoul always writes timestamp data with timezone=UTC spark.conf.set("spark.sql.session.timeZone", "UTC") @@ -160,7 +162,7 @@ trait TransactionalWrite { options.put("isBucketNumChanged", "false") } val cdcCol = snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey) - if (cdcCol.nonEmpty) { + if (cdcCol.nonEmpty && copyCompactedFile.isEmpty) { options.put("isCDC", "true") val cdcColName = cdcCol.get if (writeOptions.forall(_.options.getOrElse("fullCompaction", "true").equals("true"))) { @@ -214,10 +216,9 @@ trait TransactionalWrite { output.length < data.schema.size) } - val committer = if (writeOptions.exists(_.options.getOrElse("copyCompactedFile", "").nonEmpty)) { - val srcPath = writeOptions.get.options.get("copyCompactedFile") - options.put("copyCompactedFile", srcPath.get) - new DelayedCopyCommitProtocol("lakesoul", outputPath.toString, None) + val committer = if (copyCompactedFile.nonEmpty) { + options.put(COPY_FILE_WRITER_KEY, "true") + new DelayedCopyCommitProtocol(copyCompactedFile, "lakesoul", outputPath.toString, None) } else { getCommitter(outputPath) } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala index f74a35bf5..b0df3475c 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/commands/CompactionCommand.scala @@ -14,13 +14,14 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.execution.command.LeafRunnableCommand +import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.{MAX_FILE_SIZE_KEY, SNAPPY_COMPRESS_RATIO} import org.apache.spark.sql.execution.datasources.v2.merge.MergeDeltaParquetScan import org.apache.spark.sql.execution.datasources.v2.parquet.{NativeParquetScan, ParquetScan} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} -import org.apache.spark.sql.functions.{expr, forall} +import org.apache.spark.sql.functions.expr import org.apache.spark.sql.lakesoul.catalog.LakeSoulTableV2 import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors -import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.RENAME_COMPACTED_FILE +import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf.{COMPACTION_TASK, SCAN_FILE_NUMBER_LIMIT} import org.apache.spark.sql.lakesoul.utils.TableInfo import org.apache.spark.sql.lakesoul.{BatchDataSoulFileIndexV2, LakeSoulOptions, SnapshotManagement, TransactionCommit} import org.apache.spark.sql.types.StructType @@ -29,9 +30,9 @@ import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.util.Utils import java.util.UUID +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.collection.JavaConverters._ case class CompactionCommand(snapshotManagement: SnapshotManagement, conditionString: String, @@ -69,8 +70,7 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, files: Seq[DataFileInfo], readPartitionInfo: Array[PartitionInfoScala], compactionPath: String, - fullCompaction: Boolean, - copyCompactedFile: String = ""): List[DataCommitInfo] = { + copySrcFiles: Boolean = false): List[DataCommitInfo] = { if (newBucketNum.isEmpty && readPartitionInfo.forall(p => p.commit_op.equals("CompactionCommit") && p.read_files.length == 1)) { logInfo("=========== All Partitions Have Compacted, This Operation Will Cancel!!! ===========") return List.empty @@ -85,7 +85,11 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, Option(mergeOperatorInfo) ) val option = new CaseInsensitiveStringMap( - Map("basePath" -> tc.tableInfo.table_path_s.get, "isCompaction" -> "true").asJava) + Map("basePath" -> tc.tableInfo.table_path_s.get, + "isCompaction" -> "true", + SCAN_FILE_NUMBER_LIMIT.key -> fileNumLimit.getOrElse(Int.MaxValue).toString, + COMPACTION_TASK.key -> "true" + ).asJava) val partitionNames = readPartitionInfo.head.range_value.split(',').map(p => { p.split('=').head @@ -123,13 +127,19 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, val map = mutable.HashMap[String, String]() map.put("isCompaction", "true") map.put("compactionPath", compactionPath) - map.put("fullCompaction", fullCompaction.toString) - if (copyCompactedFile.nonEmpty) { - map.put("copyCompactedFile", copyCompactedFile) + + val copyCompactedFiles = if (copySrcFiles) { + files + } else { + Seq.empty } if (readPartitionInfo.nonEmpty) { map.put("partValue", readPartitionInfo.head.range_value) } + if (fileSizeLimit.isDefined) { + map.put("fullCompaction", "false") + map.put(MAX_FILE_SIZE_KEY, (fileSizeLimit.get * SNAPPY_COMPRESS_RATIO).toString) + } if (bucketNumChanged) { map.put("newBucketNum", newBucketNum.get.toString) } else if (tableInfo.hash_partition_columns.nonEmpty) { @@ -140,7 +150,7 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, } logInfo(s"write CompactData with Option=$map") - val (newFiles, path) = tc.writeFiles(compactDF, Some(new LakeSoulOptions(map.toMap, spark.sessionState.conf)), isCompaction = true) + val (newFiles, path) = tc.writeFiles(compactDF, Some(new LakeSoulOptions(map.toMap, spark.sessionState.conf)), isCompaction = true, copyCompactedFiles) tc.createDataCommitInfo(newFiles, Seq.empty, "", -1)._1 } @@ -195,63 +205,21 @@ case class CompactionCommand(snapshotManagement: SnapshotManagement, def compactSinglePartition(sparkSession: SparkSession, tc: TransactionCommit, files: Seq[DataFileInfo], sourcePartition: PartitionInfoScala): String = { logInfo(s"Compacting Single Partition=${sourcePartition} with ${files.length} files") - val bucketedFiles = if (tableInfo.hash_partition_columns.isEmpty || bucketNumChanged) { - Seq(-1 -> files) + val (copyFiles, scanFiles) = if (fileSizeLimit.isEmpty || bucketNumChanged || force) { + (Seq.empty, files) } else { - files.groupBy(_.file_bucket_id) + files.splitAt(files.indexWhere(_.size < fileSizeLimit.get * 0.5)) } - val compactionPath = newCompactPath - val allDataCommitInfo = bucketedFiles.flatMap(groupByBucketId => { - val (bucketId, files) = groupByBucketId - val groupedFiles = if (fileNumLimit.isDefined || fileSizeLimit.isDefined) { - val groupedFiles = new ArrayBuffer[Seq[DataFileInfo]] - var groupHead = 0 - var groupSize = 0L - var groupFileCount = 0 - for (i <- files.indices) { - // each group contains at least one file - if (i == groupHead) { - groupSize += files(i).size - groupFileCount += 1 - } else if (fileSizeLimit.exists(groupSize + files(i).size > _) || fileNumLimit.exists(groupFileCount + 1 > _)) { - // if the file size limit is reached, or the file count limit is reached, we need to start a new group - groupedFiles += files.slice(groupHead, i) - groupHead = i - groupSize = files(i).size - groupFileCount = 1 - } else { - // otherwise, we add the file to the current group - groupSize += files(i).size - groupFileCount += 1 - } - } - // add the last group to the groupedFiles - groupedFiles += files.slice(groupHead, files.length) - groupedFiles - } else { - Seq(files) - } - val fullCompaction = groupedFiles.size == 1 - groupedFiles.flatMap(files => { - lazy val incrementFiles = if (force || newBucketNum.isDefined) { - false - } else { - files.size == 1 && splitCompactFilePath(files.head.path)._1.nonEmpty - } - if (!incrementFiles) { - executeCompaction(sparkSession, tc, files, Array(sourcePartition), compactionPath, fullCompaction) - } else { - logInfo(s"== Partition ${sourcePartition.range_value} has no increment file.") - val origCompactedFile = files.head - if (sparkSession.sessionState.conf.getConf(RENAME_COMPACTED_FILE)) { - renameOldCompactedFile(tc, origCompactedFile, sourcePartition.range_value, compactionPath) - } else { - executeCompaction(sparkSession, tc, files, Array(sourcePartition), compactionPath, fullCompaction, origCompactedFile.path) - } - } - }) - }) + + val compactionPath = newCompactPath + val allDataCommitInfo = new ArrayBuffer[DataCommitInfo] + if (copyFiles.nonEmpty) { + allDataCommitInfo ++= executeCompaction(sparkSession, tc, copyFiles, Array(sourcePartition), compactionPath, true) + } + if (scanFiles.nonEmpty) { + allDataCommitInfo ++= executeCompaction(sparkSession, tc, scanFiles, Array(sourcePartition), compactionPath) + } if (allDataCommitInfo.nonEmpty) { val compactDataCommitInfoId = UUID.randomUUID diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala index 1f27d2faf..334f6ea5e 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/sources/LakeSoulSQLConf.scala @@ -167,4 +167,23 @@ object LakeSoulSQLConf { """.stripMargin) .booleanConf .createWithDefault(false) + + val SCAN_FILE_NUMBER_LIMIT: ConfigEntry[Int] = + buildConf("scan.file.number.limit") + .doc( + """ + |If SCAN_FILE_NUMBER_LIMIT < Int.MaxValue, Scan will scan file with number less than SCAN_FILE_NUMBER_LIMIT per file group + """.stripMargin) + .intConf + .createWithDefault(Int.MaxValue) + + + val COMPACTION_TASK: ConfigEntry[Boolean] = + buildConf("scan.file.size.limit") + .doc( + """ + |If SCAN_FILE_NUMBER_LIMIT < Int.MaxValue, Scan will scan file with number less than SCAN_FILE_NUMBER_LIMIT per file group + """.stripMargin) + .booleanConf + .createWithDefault(false) } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala index 1f550ef7f..c01e7a61f 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala @@ -15,6 +15,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.parquet.hadoop.ParquetInputFormat import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.LakeSoulFileWriter.{HASH_BUCKET_ID_KEY, MAX_FILE_SIZE_KEY} import org.apache.spark.sql.execution.datasources.parquet.{ParquetReadSupport, ParquetWriteSupport} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -32,10 +33,11 @@ class NativeIOOptions(val s3Bucket: String, val s3Region: String, val fsUser: String, val defaultFS: String, - val virtual_path_style: Boolean + val virtual_path_style: Boolean, + val others: Map[String, String] = Map.empty ) -object NativeIOUtils{ +object NativeIOUtils { def asArrayColumnVector(vectorSchemaRoot: VectorSchemaRoot): Array[ColumnVector] = { asScalaIteratorConverter(vectorSchemaRoot.getFieldVectors.iterator()) @@ -62,6 +64,13 @@ object NativeIOUtils{ var defaultFS = taskAttemptContext.getConfiguration.get("fs.defaultFS") if (defaultFS == null) defaultFS = taskAttemptContext.getConfiguration.get("fs.default.name") val fileSystem = file.getFileSystem(taskAttemptContext.getConfiguration) + var otherOptions = Map[String, String]() + if (taskAttemptContext.getConfiguration.get(HASH_BUCKET_ID_KEY, "").nonEmpty) { + otherOptions += HASH_BUCKET_ID_KEY -> taskAttemptContext.getConfiguration.get(HASH_BUCKET_ID_KEY) + } + if (taskAttemptContext.getConfiguration.get(MAX_FILE_SIZE_KEY, "").nonEmpty) { + otherOptions += MAX_FILE_SIZE_KEY -> taskAttemptContext.getConfiguration.get(MAX_FILE_SIZE_KEY) + } if (hasS3AFileSystemClass) { fileSystem match { case s3aFileSystem: S3AFileSystem => @@ -71,11 +80,11 @@ object NativeIOUtils{ val s3aAccessKey = taskAttemptContext.getConfiguration.get("fs.s3a.access.key") val s3aSecretKey = taskAttemptContext.getConfiguration.get("fs.s3a.secret.key") val virtualPathStyle = taskAttemptContext.getConfiguration.getBoolean("fs.s3a.path.style.access", false) - return new NativeIOOptions(awsS3Bucket, s3aAccessKey, s3aSecretKey, s3aEndpoint, s3aRegion, user, defaultFS, virtualPathStyle) + return new NativeIOOptions(awsS3Bucket, s3aAccessKey, s3aSecretKey, s3aEndpoint, s3aRegion, user, defaultFS, virtualPathStyle, otherOptions) case _ => } } - new NativeIOOptions(null, null, null, null, null, user, defaultFS, false) + new NativeIOOptions(null, null, null, null, null, user, defaultFS, false, otherOptions) } def setNativeIOOptions(nativeIO: NativeIOBase, options: NativeIOOptions): Unit = { @@ -89,6 +98,7 @@ object NativeIOUtils{ options.defaultFS, options.virtual_path_style ) + options.others.foreach(options => nativeIO.setOption(options._1, options._2)) } def setParquetConfigurations(sparkSession: SparkSession, hadoopConf: Configuration, readDataSchema: StructType): Unit = { diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala index 7dd4a508b..ac5ac15d7 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/SplitDescSuite.scala @@ -4,7 +4,7 @@ import com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient import com.dmetasoul.lakesoul.tables.LakeSoulTable import org.apache.spark.sql._ import org.apache.spark.sql.lakesoul.RandomStringGenerator.generateRandomString -import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils +import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils} import org.apache.spark.sql.test.SharedSparkSession import org.junit.runner.RunWith import org.scalatestplus.junit.JUnitRunner @@ -15,7 +15,7 @@ import scala.util.Random @RunWith(classOf[JUnitRunner]) class SplitDescSuite extends QueryTest with SharedSparkSession - with LakeSoulTestUtils { + with LakeSoulSQLCommandTest { import testImplicits._ diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala index 95e2389fe..ac0bb1434 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/commands/CompactionSuite.scala @@ -549,29 +549,30 @@ class CompactionSuite extends QueryTest // Get initial PartitionInfo count val initialFileCount = getFileList(tablePath).length - println(s"before compact initialPartitionInfoCount=$initialFileCount") + println(s"before ${c}th time compact file count=$initialFileCount") lakeSoulTable.toDF.show // Perform limited compaction (group every compactGroupSize PartitionInfo) lakeSoulTable.compaction(fileNumLimit = Some(compactGroupSize)) // Get PartitionInfo count after compaction - val compactedFileCount = getFileList(tablePath).length + val compactedFileList = getFileList(tablePath) + val compactedFileCount = compactedFileList.length - println(s"after compact compactedPartitionInfoCount=$compactedFileCount") + println(s"after ${c}th time compact file count=$compactedFileCount") lakeSoulTable.toDF.show // Verify results - assert(compactedFileCount < initialFileCount, - s"Compaction should reduce the number of files, but it changed from ${initialFileCount} to $compactedFileCount") - + assert(compactedFileCount <= hashBucketNum, + s"Compaction should have hashBucketNum files, but it has $compactedFileCount") - assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, - s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") - assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, - s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") + // assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, + // s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") + // + // assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, + // s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") } // Verify data integrity @@ -650,15 +651,15 @@ class CompactionSuite extends QueryTest lakeSoulTable.toDF.show // Verify results - assert(compactedFileCount < initialFileCount, - s"Compaction should reduce the number of files, but it changed from ${initialFileCount} to $compactedFileCount") + assert(compactedFileCount <= hashBucketNum, + s"Compaction should have hashBucketNum files, but it has $compactedFileCount") - assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, - s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") - - assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, - s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") + // assert(compactedFileCount >= (initialFileCount - 1) / compactGroupSize + 1, + // s"Compaction should produce files above a lower bound, but there are ${compactedFileCount} files") + // + // assert(compactedFileCount <= (initialFileCount - 1) / compactGroupSize + 1 + hashBucketNum, + // s"Compaction should produce files below a upper bound, but there are ${compactedFileCount} files") } // Verify data integrity @@ -715,23 +716,26 @@ class CompactionSuite extends QueryTest // Get initial PartitionInfo count val initialMaxFileSize = getFileList(tablePath).map(_.size).max - println(s"before compact initialMaxFileSize=$initialMaxFileSize") - - // Perform limited compaction (group every compactGroupSize PartitionInfo) + println(s"before ${c}th compact initialMaxFileSize=$initialMaxFileSize") LakeSoulTable.uncached(tablePath) - lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + spark.time({ + // Perform limited compaction (group every compactGroupSize PartitionInfo) + lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + // lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = true) + // lakeSoulTable.compaction() + }) // Get PartitionInfo count after compaction val compactedFiles = getFileList(tablePath) val compactedFileMax = compactedFiles.map(_.size).max - println(s"after compact compactedFileMax=$compactedFileMax") + println(s"after ${c}th compact compactedFileMax=$compactedFileMax") // Verify results - assert(compactedFileMax >= initialMaxFileSize, - s"Compaction should reduce the number of files, but it changed from ${initialMaxFileSize} to $compactedFileMax") + // assert(compactedFileMax >= initialMaxFileSize, + // s"Compaction should increase the max size of files, but it changed from ${initialMaxFileSize} to $compactedFileMax") - assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.2, + assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.1, s"Compaction should produce file with upper-bounded size, but there is a larger ${compactedFileMax} file size") val (compactDir, _) = splitCompactFilePath(compactedFiles.head.path) @@ -757,7 +761,7 @@ class CompactionSuite extends QueryTest val hashBucketNum = 4 val compactRounds = 5 val upsertPerRounds = 10 - val rowsPerUpsert = 1002 + val rowsPerUpsert = 1000 val compactFileSize = "10KB" // Create test data @@ -801,23 +805,26 @@ class CompactionSuite extends QueryTest // Get initial PartitionInfo count val initialMaxFileSize = getFileList(tablePath).map(_.size).max - println(s"before compact initialMaxFileSize=$initialMaxFileSize") + println(s"before ${c}th compact initialMaxFileSize=$initialMaxFileSize") // Perform limited compaction (group every compactGroupSize PartitionInfo) LakeSoulTable.uncached(tablePath) - lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + spark.time({ + lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = false) + // lakeSoulTable.compaction(fileSizeLimit = Some(compactFileSize), force = true) + }) // Get PartitionInfo count after compaction val compactedFiles = getFileList(tablePath) val compactedFileMax = compactedFiles.map(_.size).max - println(s"after compact compactedFileMax=$compactedFileMax") + println(s"after ${c}th compact compactedFileMax=$compactedFileMax") // Verify results // assert(compactedFileMax >= initialMaxFileSize, // s"Compaction should reduce the number of files, but it changed from ${initialMaxFileSize} to $compactedFileMax") - assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.2, + assert(compactedFileMax <= DBUtil.parseMemoryExpression(compactFileSize) * 1.1, s"Compaction should produce file with upper-bounded size, but there is a larger ${compactedFileMax} file size") val (compactDir, _) = splitCompactFilePath(compactedFiles.head.path) @@ -827,7 +834,9 @@ class CompactionSuite extends QueryTest // Verify data integrity LakeSoulTable.uncached(tablePath) - val compactedData = lakeSoulTable.toDF.orderBy("id", "date").collect() + val finalData = lakeSoulTable.toDF.orderBy("id", "date") + // println(finalData.queryExecution) + val compactedData = finalData.collect() // println(compactedData.mkString("Array(", ", ", ")")) assert(compactedData.length == 6 + rowsPerUpsert * upsertPerRounds * compactRounds / 2, s"The compressed data should have ${6 + rowsPerUpsert * upsertPerRounds * compactRounds / 2} rows, but it actually has ${compactedData.length} rows") diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java index 5af7f9023..7a30b369a 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java @@ -166,6 +166,15 @@ public String getFilePath() { public String getFileExistCols() { return fileExistCols; } + + @Override + public String toString() { + return "FlushResult{" + + "filePath='" + filePath + '\'' + + ", fileSize=" + fileSize + + ", fileExistCols='" + fileExistCols + '\'' + + '}'; + } } public static FlushResult decodeFlushResult(String encoded) { diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 150dd605c..26280bbe9 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -812,25 +812,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-deque" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" -dependencies = [ - "crossbeam-epoch", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-epoch" -version = "0.9.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1996,13 +1977,13 @@ dependencies = [ "lazy_static", "log", "ndarray", + "nohash", "object_store", "parking_lot", "parquet", "prost", "proto", "rand", - "rayon", "serde", "serde_json", "smallvec", @@ -2013,6 +1994,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "uuid", "whoami", ] @@ -2328,6 +2310,12 @@ dependencies = [ "rawpointer", ] +[[package]] +name = "nohash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0f889fb66f7acdf83442c35775764b51fed3c606ab9cee51500dbde2cf528ca" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2898,26 +2886,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" -[[package]] -name = "rayon" -version = "1.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" -dependencies = [ - "either", - "rayon-core", -] - -[[package]] -name = "rayon-core" -version = "1.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" -dependencies = [ - "crossbeam-deque", - "crossbeam-utils", -] - [[package]] name = "redox_syscall" version = "0.5.8" diff --git a/rust/lakesoul-io/Cargo.toml b/rust/lakesoul-io/Cargo.toml index f9558ef18..0a704c449 100644 --- a/rust/lakesoul-io/Cargo.toml +++ b/rust/lakesoul-io/Cargo.toml @@ -49,7 +49,8 @@ dhat = { version="0.3.3", optional = true } async-recursion = "1.1.1" ndarray = "0.15.6" #hdf5 = {version = "0.8.1"} -rayon = "1.10.0" +nohash = "0.2.0" +uuid = { workspace = true } [features] diff --git a/rust/lakesoul-io/examples/profiling_merge.rs b/rust/lakesoul-io/examples/profiling_merge.rs new file mode 100644 index 000000000..32d21a40a --- /dev/null +++ b/rust/lakesoul-io/examples/profiling_merge.rs @@ -0,0 +1,170 @@ +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use arrow_array::{ArrayRef, Int64Array, StringArray}; +use arrow_cast::pretty::print_batches; +use arrow_schema::SchemaRef; +use datafusion::error::Result; +use rand::Rng; +use std::sync::Arc; +use tokio::runtime::Builder; +use tokio::time::Instant; +use rand::distributions::DistString; + +use lakesoul_io::lakesoul_io_config::LakeSoulIOConfigBuilder; +use lakesoul_io::lakesoul_reader::{LakeSoulReader, SyncSendableMutableLakeSoulReader}; +use lakesoul_io::lakesoul_writer::SyncSendableMutableLakeSoulWriter; + +struct LinearPKGenerator { + a: i64, + b: i64, + current: i64, +} + +impl LinearPKGenerator { + fn new(a: i64, b: i64) -> Self { + LinearPKGenerator { a, b, current: 0 } + } + + fn next_pk(&mut self) -> i64 { + let pk = self.a * self.current + self.b; + self.current += 1; + pk + } +} + +fn create_batch(num_columns: usize, num_rows: usize, str_len: usize, pk_generator: &mut Option) -> RecordBatch { + let mut rng = rand::thread_rng(); + let mut len_rng = rand::thread_rng(); + let mut iter = vec![]; + if let Some(generator) = pk_generator { + let pk_iter = (0..num_rows).map(|_| uuid::Builder::from_bytes_le((generator.next_pk() as u128).to_le_bytes()).as_uuid().to_string()); + iter.push(( + "pk".to_string(), + Arc::new(StringArray::from_iter_values(pk_iter)) as ArrayRef, + true, + )); + } + for i in 0..num_columns { + iter.push(( + format!("col_{}", i), + Arc::new(StringArray::from( + (0..num_rows) + .into_iter() + .map(|_| { + rand::distributions::Alphanumeric + .sample_string(&mut rng, len_rng.gen_range(str_len..str_len * 3)) + }) + .collect::>(), + )) as ArrayRef, + true, + )); + } + RecordBatch::try_from_iter_with_nullable(iter).unwrap() + +} + +fn create_schema(num_columns: usize, with_pk: bool) -> SchemaRef { + let mut fields = vec![]; + if with_pk { + fields.push(Field::new("pk", DataType::Utf8, true)); + } + for i in 0..num_columns { + fields.push(Field::new(format!("col_{}", i), DataType::Utf8, true)); + } + Arc::new(Schema::new(fields)) +} + +fn main() -> Result<()> { + let num_batch = 128; + let num_rows = 512; + let num_columns = 16; + let str_len = 4; + let temp_dir = std::env::current_dir()?.join("temp_dir"); + let with_pk = true; + let file_num = 50; + let to_write_schema = create_schema(num_columns, with_pk); + + for i in 0..file_num { + + let mut generator = if with_pk { Some(LinearPKGenerator::new(i + 2, 0)) } else { None } ; + // let to_write = create_batch(num_columns, num_rows, str_len, &mut generator); + let path = temp_dir + .clone() + .join(format!("test{}.parquet", i)) + .into_os_string() + .into_string() + .unwrap(); + dbg!(&path); + let writer_conf = LakeSoulIOConfigBuilder::new() + .with_files(vec![path.clone()]) + // .with_prefix(tempfile::tempdir()?.into_path().into_os_string().into_string().unwrap()) + .with_thread_num(2) + .with_batch_size(num_rows) + // .with_max_row_group_size(2000) + // .with_max_row_group_num_values(4_00_000) + .with_schema(to_write_schema.clone()) + .with_primary_keys( + vec!["pk".to_string()] + ) + // .with_aux_sort_column("col2".to_string()) + // .with_option(OPTION_KEY_MEM_LIMIT, format!("{}", 1024 * 1024 * 48)) + // .set_dynamic_partition(true) + .with_hash_bucket_num(4) + // .with_max_file_size(1024 * 1024 * 32) + .build(); + + let mut writer = SyncSendableMutableLakeSoulWriter::try_new( + writer_conf, + Builder::new_multi_thread().enable_all().build().unwrap() + )?; + + for _ in 0..num_batch { + writer.write_batch(create_batch(num_columns, num_rows, str_len, &mut generator))?; + } + let flush_start = Instant::now(); + writer.flush_and_close()?; + println!("write into file {} cost: {}ms", path, flush_start.elapsed().as_millis()); + } + + + let reader_conf = LakeSoulIOConfigBuilder::new() + .with_thread_num(2) + .with_batch_size(num_rows) + .with_files((0..file_num).map(|i| temp_dir.join(format!("test{}.parquet", i)).into_os_string().into_string().unwrap()).collect::>()) + .with_primary_keys(vec!["pk".to_string()]) + .with_schema(to_write_schema.clone()) + + // .with_files(vec![ + // "file:///Users/ceng/Desktop/user/part-AQsUZHJtSBeZNEEb_0000.parquet".to_string(), + // "file:///Users/ceng/Desktop/user/part-b71CCbFTr0vx6GZK_0000.parquet".to_string(), + // "file:///Users/ceng/Desktop/user/part-ZMbzBvkaJCGnh7iJ_0000.parquet".to_string(), + // "file:///Users/ceng/Desktop/user/part-bn8GKq1YAjF7cN1H_0000.parquet".to_string(), + // "file:///Users/ceng/Desktop/user/part-BrRLeHtqE7RqgaTC_0000.parquet".to_string(), + // ]) + // .with_schema(Arc::new(Schema::new(vec![ + // Field::new("order_id", DataType::Int32, true), + // Field::new("name", DataType::Utf8, true), + // Field::new("score", DataType::Decimal128(10, 2), true), + // ]))) + // .with_primary_keys(vec!["name".to_string()]) + .build(); + + let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); + let lakesoul_reader = LakeSoulReader::new(reader_conf)?; + let mut reader = SyncSendableMutableLakeSoulReader::new(lakesoul_reader, runtime); + let _ = reader.start_blocked(); + let start = Instant::now(); + let mut rb_count = 0; + while let Some(rb) = reader.next_rb_blocked() { + let rb = rb.unwrap(); + rb_count += rb.num_rows(); + // let _ = print_batches(&[rb]); + // dbg!(&rb.column_by_name("pk").unwrap()); + } + if file_num == 2 { + assert_eq!(rb_count, num_rows * num_batch * 2 - num_rows * num_batch / 3 - 1); + } + println!("time cost: {:?}ms", start.elapsed().as_millis()); // ms + + Ok(()) +} \ No newline at end of file diff --git a/rust/lakesoul-io/src/async_writer/multipart_writer.rs b/rust/lakesoul-io/src/async_writer/multipart_writer.rs index 57855e35b..96fb6e92d 100644 --- a/rust/lakesoul-io/src/async_writer/multipart_writer.rs +++ b/rust/lakesoul-io/src/async_writer/multipart_writer.rs @@ -11,6 +11,7 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::{object_store: use datafusion_common::{project_schema, DataFusionError, Result}; use object_store::{path::Path, MultipartId, ObjectStore}; use parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties}; +use parquet::basic::ZstdLevel; use tokio::io::{AsyncWrite, AsyncWriteExt}; use url::Url; @@ -103,8 +104,8 @@ impl MultiPartAsyncWriter { WriterProperties::builder() .set_max_row_group_size(max_row_group_size) .set_write_batch_size(config.batch_size) - .set_compression(Compression::SNAPPY) - // .set_statistics_enabled(parquet::file::properties::EnabledStatistics::None) + .set_compression(Compression::ZSTD(ZstdLevel::default())) + .set_dictionary_enabled(false) .build(), ), )?; diff --git a/rust/lakesoul-io/src/constant.rs b/rust/lakesoul-io/src/constant.rs index 6531653cf..ae3762359 100644 --- a/rust/lakesoul-io/src/constant.rs +++ b/rust/lakesoul-io/src/constant.rs @@ -6,7 +6,7 @@ use std::collections::HashMap; use arrow::array::ArrayRef; use arrow::compute::CastOptions; -use arrow_array::{new_empty_array, new_null_array}; +use arrow_array::{new_empty_array, new_null_array, Array}; use arrow_cast::display::FormatOptions; use arrow_schema::DataType; @@ -57,6 +57,12 @@ impl ConstNullArray { } } } + + pub fn get_ref(&mut self, datatype: &DataType) -> &dyn Array { + self.inner + .entry(datatype.clone()) + .or_insert_with(|| new_null_array(datatype, 1)) + } } #[derive(Debug, Default)] diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index 87c5b52b3..edb6c0b2e 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -179,6 +179,10 @@ impl LakeSoulIOConfig { self.option(OPTION_KEY_MEM_LIMIT).map(|x| x.parse().unwrap()) } + pub fn max_file_size_option(&self) -> Option { + self.option(OPTION_KEY_MAX_FILE_SIZE).map(|x| x.parse().unwrap()) + } + pub fn pool_size(&self) -> Option { self.option(OPTION_KEY_POOL_SIZE).map(|x| x.parse().unwrap()) } @@ -577,6 +581,7 @@ pub fn create_session_context_with_planner( sess_conf.options_mut().optimizer.prefer_hash_join = false; //if true, panicked at 'range end out of bounds' sess_conf.options_mut().execution.parquet.pushdown_filters = config.parquet_filter_pushdown; sess_conf.options_mut().execution.target_partitions = 1; + sess_conf.options_mut().execution.parquet.dictionary_enabled = Some(false); // sess_conf.options_mut().execution.sort_in_place_threshold_bytes = 16 * 1024; // sess_conf.options_mut().execution.sort_spill_reservation_bytes = 2 * 1024 * 1024; // sess_conf.options_mut().catalog.default_catalog = "lakesoul".into(); @@ -674,9 +679,11 @@ mod tests { ] ); let mut lakesoulconfigbuilder = LakeSoulIOConfigBuilder::from(conf.clone()); - let conf = lakesoulconfigbuilder.with_d(32 as u64).with_nbits(64 as u64).build(); - assert_eq!(conf.seed,1234 as u64); - assert_eq!(conf.d,Some(32)); - assert_eq!(conf.nbits,Some(64)); + let conf = lakesoulconfigbuilder.build(); + assert_eq!(conf.max_file_size,None); + assert_eq!(conf.max_row_group_size,250000); + assert_eq!(conf.max_row_group_num_values,2147483647); + assert_eq!(conf.prefetch_size,1); + assert_eq!(conf.parquet_filter_pushdown,false); } } diff --git a/rust/lakesoul-io/src/lakesoul_reader.rs b/rust/lakesoul-io/src/lakesoul_reader.rs index 54a555b5f..d0e9cdeeb 100644 --- a/rust/lakesoul-io/src/lakesoul_reader.rs +++ b/rust/lakesoul-io/src/lakesoul_reader.rs @@ -155,7 +155,7 @@ impl SyncSendableMutableLakeSoulReader { mod tests { use super::*; use arrow::array::as_primitive_array; - use arrow_array::ArrayRef; + use arrow_array::{ArrayRef, Int64Array, StringArray}; use rand::prelude::*; use std::mem::ManuallyDrop; use std::ops::Not; @@ -166,6 +166,8 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, TimestampSecondType}; use arrow::util::pretty::print_batches; + use rand::{distributions::DistString, Rng}; + #[tokio::test] async fn test_reader_local() -> Result<()> { let project_dir = std::env::current_dir()?; @@ -360,6 +362,7 @@ mod tests { } use crate::lakesoul_io_config::LakeSoulIOConfigBuilder; + use crate::lakesoul_writer::SyncSendableMutableLakeSoulWriter; use datafusion::logical_expr::{col, Expr}; use datafusion_common::ScalarValue; @@ -684,4 +687,152 @@ mod tests { Ok(()) } + #[test] + fn test_primary_key_generator() -> Result<()> { + let mut generator = LinearPKGenerator::new(2, 3); + assert_eq!(generator.next_pk(), 3); // x=0: 2*0 + 3 + assert_eq!(generator.next_pk(), 5); // x=1: 2*1 + 3 + assert_eq!(generator.next_pk(), 7); // x=2: 2*2 + 3 + Ok(()) + } + + struct LinearPKGenerator { + a: i64, + b: i64, + current: i64, + } + + impl LinearPKGenerator { + fn new(a: i64, b: i64) -> Self { + LinearPKGenerator { + a, + b, + current: 0 + } + } + + fn next_pk(&mut self) -> i64 { + let pk = self.a * self.current + self.b; + self.current += 1; + pk + } + } + + fn create_batch(num_columns: usize, num_rows: usize, str_len: usize, pk_generator: &mut Option) -> RecordBatch { + let mut rng = rand::thread_rng(); + let mut len_rng = rand::thread_rng(); + let mut iter = vec![]; + if let Some(generator) = pk_generator { + let pk_iter = (0..num_rows).map(|_| generator.next_pk()); + iter.push(( + "pk".to_string(), + Arc::new(Int64Array::from_iter_values(pk_iter)) as ArrayRef, + true, + )); + } + for i in 0..num_columns { + iter.push(( + format!("col_{}", i), + Arc::new(StringArray::from( + (0..num_rows) + .into_iter() + .map(|_| { + rand::distributions::Alphanumeric + .sample_string(&mut rng, len_rng.gen_range(str_len..str_len * 3)) + }) + .collect::>(), + )) as ArrayRef, + true, + )); + } + RecordBatch::try_from_iter_with_nullable(iter).unwrap() + } + + fn create_schema(num_columns: usize, with_pk: bool) -> Schema { + let mut fields = vec![]; + if with_pk { + fields.push(Field::new("pk", DataType::Int64, true)); + } + for i in 0..num_columns { + fields.push(Field::new(format!("col_{}", i), DataType::Utf8, true)); + } + Schema::new(fields) + } + + + #[test] + fn profiling_2ways_merge_on_read() -> Result<()> { + let num_batch = 10; + let num_rows = 1000; + let num_columns = 100; + let str_len = 4; + let temp_dir = tempfile::tempdir()?.into_path(); + let temp_dir = std::env::current_dir()?.join("temp_dir"); + let with_pk = true; + let to_write_schema = create_schema(num_columns, with_pk); + + for i in 0..2 { + + let mut generator = if with_pk { Some(LinearPKGenerator::new(i + 2, 0)) } else { None } ; + let to_write = create_batch(num_columns, num_rows, str_len, &mut generator); + let path = temp_dir + .clone() + .join(format!("test{}.parquet", i)) + .into_os_string() + .into_string() + .unwrap(); + dbg!(&path); + let writer_conf = LakeSoulIOConfigBuilder::new() + .with_files(vec![path.clone()]) + // .with_prefix(tempfile::tempdir()?.into_path().into_os_string().into_string().unwrap()) + .with_thread_num(2) + .with_batch_size(num_rows) + // .with_max_row_group_size(2000) + // .with_max_row_group_num_values(4_00_000) + .with_schema(to_write.schema()) + .with_primary_keys( + vec!["pk".to_string()] + ) + // .with_aux_sort_column("col2".to_string()) + // .with_option(OPTION_KEY_MEM_LIMIT, format!("{}", 1024 * 1024 * 48)) + // .set_dynamic_partition(true) + .with_hash_bucket_num(4) + // .with_max_file_size(1024 * 1024 * 32) + .build(); + + let mut writer = SyncSendableMutableLakeSoulWriter::try_new( + writer_conf, + Builder::new_multi_thread().enable_all().build().unwrap() + )?; + + let start = Instant::now(); + for _ in 0..num_batch { + let once_start = Instant::now(); + writer.write_batch(create_batch(num_columns, num_rows, str_len, &mut generator))?; + println!("write batch once cost: {}", once_start.elapsed().as_millis()); + } + let flush_start = Instant::now(); + writer.flush_and_close()?; + } + + let reader_conf = LakeSoulIOConfigBuilder::new() + .with_files((0..2).map(|i| temp_dir.join(format!("test{}.parquet", i)).into_os_string().into_string().unwrap()).collect::>()) + .with_thread_num(2) + .with_batch_size(num_rows) + .with_schema(Arc::new(to_write_schema)) + .with_primary_keys(vec!["pk".to_string()]) + .build(); + + let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); + let lakesoul_reader = LakeSoulReader::new(reader_conf)?; + let mut reader = SyncSendableMutableLakeSoulReader::new(lakesoul_reader, runtime); + reader.start_blocked(); + let start = Instant::now(); + while let Some(rb) = reader.next_rb_blocked() { + dbg!(&rb.unwrap().num_rows()); + } + println!("time cost: {:?}ms", start.elapsed().as_millis()); // ms + Ok(()) + } + } diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs index 5deeec54d..ddc52b9df 100644 --- a/rust/lakesoul-io/src/lakesoul_writer.rs +++ b/rust/lakesoul-io/src/lakesoul_writer.rs @@ -76,6 +76,11 @@ impl SyncSendableMutableLakeSoulWriter { let writer = Self::create_writer(writer_config).await?; let schema = writer.schema(); + + if let Some(max_file_size) = config.max_file_size_option() { + config.max_file_size = Some(max_file_size); + } + if let Some(mem_limit) = config.mem_limit() { if config.use_dynamic_partition { config.max_file_size = Some((mem_limit as f64 * 0.15) as u64); @@ -342,8 +347,6 @@ impl SyncSendableMutableLakeSoulWriter { mod tests { use arrow_array::builder; use datafusion::catalog::schema; - use hdf5::File as OtherFile; - use hdf5::Group; use parquet::arrow::ArrowWriter; use parquet::column; use parquet::file::properties::WriterProperties; diff --git a/rust/lakesoul-io/src/sorted_merge/combiner.rs b/rust/lakesoul-io/src/sorted_merge/combiner.rs index fff50c2fb..7c89b0d0f 100644 --- a/rust/lakesoul-io/src/sorted_merge/combiner.rs +++ b/rust/lakesoul-io/src/sorted_merge/combiner.rs @@ -23,10 +23,14 @@ use arrow::{ }; use arrow_array::types::*; use dary_heap::QuaternaryHeap; +use nohash::BuildNoHashHasher; use smallvec::SmallVec; +use super::sort_key_range::{UseLastSortKeyBatchRanges, UseLastSortKeyBatchRangesRef}; + #[derive(Debug)] pub enum RangeCombiner { + DefaultUseLastRangeCombiner(UseLastRangeCombiner), MinHeapSortKeyBatchRangeCombiner(MinHeapSortKeyBatchRangeCombiner), } @@ -37,25 +41,45 @@ impl RangeCombiner { fields_map: Arc>>, target_batch_size: usize, merge_operator: Vec, + is_partial_merge: bool, ) -> Self { - RangeCombiner::MinHeapSortKeyBatchRangeCombiner(MinHeapSortKeyBatchRangeCombiner::new( - schema, - streams_num, - fields_map, - target_batch_size, - merge_operator, - )) + if merge_operator.is_empty() || merge_operator.iter().all(|op| *op == MergeOperator::UseLast) { + RangeCombiner::DefaultUseLastRangeCombiner(UseLastRangeCombiner::new( + schema, + streams_num, + fields_map, + target_batch_size, + is_partial_merge, + )) + } else { + RangeCombiner::MinHeapSortKeyBatchRangeCombiner(MinHeapSortKeyBatchRangeCombiner::new( + schema, + streams_num, + fields_map, + target_batch_size, + merge_operator, + )) + } } - pub fn push_range(&mut self, range: Reverse) { + pub fn push_range(&mut self, range: SortKeyBatchRange) { match self { RangeCombiner::MinHeapSortKeyBatchRangeCombiner(combiner) => combiner.push(range), + RangeCombiner::DefaultUseLastRangeCombiner(combiner) => combiner.push(range), }; } pub fn poll_result(&mut self) -> RangeCombinerResult { match self { RangeCombiner::MinHeapSortKeyBatchRangeCombiner(combiner) => combiner.poll_result(), + RangeCombiner::DefaultUseLastRangeCombiner(combiner) => combiner.poll_result(), + } + } + + pub fn external_advance(&self) -> bool { + match self { + RangeCombiner::MinHeapSortKeyBatchRangeCombiner(_) => true, + RangeCombiner::DefaultUseLastRangeCombiner(_) => false, } } } @@ -64,7 +88,7 @@ impl RangeCombiner { pub enum RangeCombinerResult { None, Err(ArrowError), - Range(Reverse), + Range(SortKeyBatchRange), RecordBatch(ArrowResult), } @@ -110,16 +134,23 @@ impl MinHeapSortKeyBatchRangeCombiner { } } - pub fn push(&mut self, range: Reverse) { - self.heap.push(range) + pub fn push(&mut self, range: SortKeyBatchRange) { + self.heap.push(Reverse(range)) } + /// if in_progress is full, we should build record batch by merge all ranges in in_progress + /// otherwise, + /// if heap is not empty, we should pop from heap and add to in_progress + /// then return the advanced popped range + /// if heap is empty, + /// check if in_progress is empty, if so, return None, which the RangeCombiner is finished + /// otherwise, build record batch by merge all the remaining ranges in in_progress pub fn poll_result(&mut self) -> RangeCombinerResult { if self.in_progress.len() == self.target_batch_size { RangeCombinerResult::RecordBatch(self.build_record_batch()) } else { match self.heap.pop() { - Some(Reverse(range)) => { + Some(Reverse(mut range)) => { if self.current_sort_key_range.match_row(&range) { self.get_mut_current_sort_key_range().add_range_in_batch(range.clone()); } else { @@ -127,7 +158,8 @@ impl MinHeapSortKeyBatchRangeCombiner { self.init_current_sort_key_range(); self.get_mut_current_sort_key_range().add_range_in_batch(range.clone()); } - RangeCombinerResult::Range(Reverse(range)) + range.advance(); + RangeCombinerResult::Range(range) } None => { if self.current_sort_key_range.is_empty() && self.in_progress.is_empty() { @@ -145,6 +177,7 @@ impl MinHeapSortKeyBatchRangeCombiner { } fn build_record_batch(&mut self) -> ArrowResult { + // construct record batch by columnarly merging let columns = self .schema .fields() @@ -152,13 +185,15 @@ impl MinHeapSortKeyBatchRangeCombiner { .enumerate() .map(|(column_idx, field)| { let capacity = self.in_progress.len(); - let ranges_per_col: Vec<&SmallVec<[SortKeyArrayRange; 4]>> = self + // collect all array ranges of current column_idx for each row + let ranges_per_row: Vec<&SmallVec<[SortKeyArrayRange; 4]>> = self .in_progress .iter() .map(|ranges_per_row| ranges_per_row.column(column_idx)) .collect::>(); - let mut flatten_array_ranges = ranges_per_col + // flatten all array ranges of current column_idx for interleave + let mut flatten_array_ranges = ranges_per_row .iter() .flat_map(|ranges| *ranges) .collect::>(); @@ -181,7 +216,7 @@ impl MinHeapSortKeyBatchRangeCombiner { merge_sort_key_array_ranges( capacity, field, - ranges_per_col, + ranges_per_row, &mut flatten_dedup_arrays, &batch_idx_to_flatten_array_idx, unsafe { self.merge_operator.get_unchecked(column_idx) }, @@ -266,3 +301,372 @@ fn merge_sort_key_array_ranges( extend_list.as_slice(), ) } + +#[derive(Debug)] +pub struct UseLastRangeCombiner { + schema: SchemaRef, + fields_map: Arc>>, + streams_num: usize, + + /// A loser tree that always produces the minimum cursor + /// + /// Node 0 stores the top winner, Nodes 1..num_streams store + /// the loser nodes + /// + /// This implements a "Tournament Tree" (aka Loser Tree) to keep + /// track of the current smallest element at the top. When the top + /// record is taken, the tree structure is not modified, and only + /// the path from bottom to top is visited, keeping the number of + /// comparisons close to the theoretical limit of `log(S)`. + /// + /// The current implementation uses a vector to store the tree. + /// Conceptually, it looks like this (assuming 8 streams): + /// + /// ```text + /// 0 (winner) + /// + /// 1 + /// / \ + /// 2 3 + /// / \ / \ + /// 4 5 6 7 + /// ``` + /// + /// Where element at index 0 in the vector is the current winner. Element + /// at index 1 is the root of the loser tree, element at index 2 is the + /// left child of the root, and element at index 3 is the right child of + /// the root and so on. + /// + /// reference: + loser_tree: Vec, + + ranges_counter: usize, + + loser_tree_has_updated: bool, + + /// ranges for each input source. `None` means the input is exhausted + ranges: Vec>, + + in_progress: Vec, + target_batch_size: usize, + current_sort_key_range: UseLastSortKeyBatchRangesRef, + const_null_array: ConstNullArray, + is_partial_merge: bool, +} + +impl UseLastRangeCombiner { + pub fn new( + schema: SchemaRef, + streams_num: usize, + fields_map: Arc>>, + target_batch_size: usize, + is_partial_merge: bool, + ) -> Self { + Self { + schema: schema.clone(), + fields_map: fields_map.clone(), + streams_num, + in_progress: Vec::with_capacity(target_batch_size), + target_batch_size, + current_sort_key_range: Arc::new(UseLastSortKeyBatchRanges::new(schema, fields_map, is_partial_merge)), + const_null_array: ConstNullArray::new(), + ranges: (0..streams_num).map(|_| None).collect(), + loser_tree: vec![], + ranges_counter: 0, + loser_tree_has_updated: false, + is_partial_merge, + } + } + + pub fn push(&mut self, range: SortKeyBatchRange) { + let stream_idx = range.stream_idx; + self.ranges[stream_idx] = Some(range); + self.ranges_counter += 1; + } + + pub fn update(&mut self) { + if self.loser_tree.is_empty() { + if self.ranges_counter >= self.streams_num { + self.init_loser_tree(); + } + } else { + self.update_loser_tree(); + } + } + + /// if in_progress is full, we should build record batch by merge all ranges in in_progress + /// otherwise, + /// if heap is not empty, we should pop from heap and add to in_progress + /// then return the advanced popped range + /// if heap is empty, + /// check if in_progress is empty, if so, return None, which the RangeCombiner is finished + /// otherwise, build record batch by merge all the remaining ranges in in_progress + pub fn poll_result(&mut self) -> RangeCombinerResult { + if self.ranges_counter < self.streams_num { + return RangeCombinerResult::Err(ArrowError::InvalidArgumentError(format!( + "Not all streams have been initialized, ranges_counter: {}, streams_num: {}", + self.ranges_counter, self.streams_num + ))); + } + if self.in_progress.len() == self.target_batch_size { + RangeCombinerResult::RecordBatch(self.build_record_batch()) + } else { + if !self.loser_tree_has_updated { + self.update(); + } + let winner = self.loser_tree[0]; + if let Some(mut range) = self.ranges[winner].take() { + self.loser_tree_has_updated = false; + if self.current_sort_key_range.match_row(&range) { + self.get_mut_current_sort_key_range().add_range_in_batch(&range); + } else { + self.in_progress.push(self.current_sort_key_range.clone()); + self.init_current_sort_key_range(); + self.get_mut_current_sort_key_range().add_range_in_batch(&range); + } + range.advance(); + RangeCombinerResult::Range(range) + } else { + if self.current_sort_key_range.is_empty() && self.in_progress.is_empty() { + RangeCombinerResult::None + } else { + if !self.current_sort_key_range.is_empty() { + self.in_progress.push(self.current_sort_key_range.clone()); + self.get_mut_current_sort_key_range().set_batch_range(None); + } + RangeCombinerResult::RecordBatch(self.build_record_batch()) + } + } + } + } + + // for full column merge, all columns of each row have same batch idx + // so we only need to build indices once and reuse them for all columns + fn build_record_batch_full_merge(&mut self) -> ArrowResult { + let capacity = self.in_progress.len(); + let mut interleave_idx = Vec::<(usize, usize)>::with_capacity(capacity); + interleave_idx.resize(capacity, (0, 0)); + let mut batch_idx_to_flatten_array_idx = + HashMap::>::with_capacity_and_hasher( + capacity * 2, + BuildNoHashHasher::default(), + ); + let mut array_count = 1usize; + let mut flatten_arrays: Vec> = Vec::with_capacity(self.schema.fields().len()); + flatten_arrays.resize(self.schema.fields().len(), vec![]); + for column_idx in 0..self.schema.fields().len() { + unsafe { + let flatten_array = flatten_arrays.get_unchecked_mut(column_idx); + flatten_array.reserve(capacity + 1); + flatten_array.push(self.const_null_array.get(self.schema.field(column_idx).data_type())); + } + } + for (idx, ranges) in self.in_progress.iter().enumerate() { + if let Some(range) = ranges.column(0) { + let batch_idx = range.batch_idx; + unsafe { + match batch_idx_to_flatten_array_idx.get(&batch_idx) { + Some(flatten_array_idx) => { + *interleave_idx.get_unchecked_mut(idx) = (*flatten_array_idx, range.row_idx) + } + None => { + batch_idx_to_flatten_array_idx.insert(batch_idx, array_count); + *interleave_idx.get_unchecked_mut(idx) = (array_count, range.row_idx); + array_count += 1; + // fill all column arrays for interleaving + for column_idx in 0..self.schema.fields().len() { + let flatten_array = flatten_arrays.get_unchecked_mut(column_idx); + flatten_array.push( + range.array_ref_by_col( + *self + .fields_map + .get_unchecked(range.stream_idx) + .get_unchecked(column_idx), + ), + ); + } + } + } + } + } + } + let columns = flatten_arrays + .iter() + .map(|array| { + interleave( + array.iter().map(|a| a.as_ref()).collect::>().as_slice(), + interleave_idx.as_slice(), + ) + }) + .collect::>>()?; + + self.in_progress.clear(); + + RecordBatch::try_new(self.schema.clone(), columns) + } + + fn build_record_batch(&mut self) -> ArrowResult { + if !self.is_partial_merge { + return self.build_record_batch_full_merge(); + } + // construct record batch by columnarly merging + let capacity = self.in_progress.len(); + let mut interleave_idx = Vec::<(usize, usize)>::with_capacity(capacity); + let mut batch_idx_to_flatten_array_idx = + HashMap::>::with_capacity_and_hasher( + capacity * 2, + BuildNoHashHasher::default(), + ); + // collect all array ranges of current column_idx for each row + let columns = self + .schema + .fields() + .iter() + .enumerate() + .map(|(column_idx, field)| { + let mut flatten_arrays = Vec::with_capacity(capacity + 1); + flatten_arrays.push(self.const_null_array.get_ref(field.data_type())); + batch_idx_to_flatten_array_idx.clear(); + interleave_idx.clear(); + interleave_idx.resize(capacity, (0, 0)); + + for (idx, range) in self.in_progress.iter().enumerate() { + unsafe { + let array = range.column(column_idx); + if let Some(array) = array { + match batch_idx_to_flatten_array_idx.get(&array.batch_idx) { + Some(flatten_array_idx) => { + *interleave_idx.get_unchecked_mut(idx) = (*flatten_array_idx, array.row_idx) + } + None => { + flatten_arrays.push(array.array_ref()); + batch_idx_to_flatten_array_idx.insert(array.batch_idx, flatten_arrays.len() - 1); + *interleave_idx.get_unchecked_mut(idx) = (flatten_arrays.len() - 1, array.row_idx); + } + } + } + } + } + + interleave(flatten_arrays.as_slice(), interleave_idx.as_slice()) + }) + .collect::>>()?; + + self.in_progress.clear(); + + RecordBatch::try_new(self.schema.clone(), columns) + } + + fn get_mut_current_sort_key_range(&mut self) -> &mut UseLastSortKeyBatchRanges { + Arc::make_mut(&mut self.current_sort_key_range) + } + + fn init_current_sort_key_range(&mut self) { + self.current_sort_key_range = Arc::new(UseLastSortKeyBatchRanges::new( + self.schema.clone(), + self.fields_map.clone(), + self.is_partial_merge, + )); + } + + /// Attempts to initialize the loser tree with one value from each + /// non exhausted input, if possible + fn init_loser_tree(&mut self) { + // Init loser tree + self.loser_tree = vec![usize::MAX; self.streams_num]; + for i in 0..self.streams_num { + let mut winner = i; + let mut cmp_node = self.loser_tree_leaf_node_index(i); + while cmp_node != 0 && self.loser_tree[cmp_node] != usize::MAX { + let challenger = self.loser_tree[cmp_node]; + match (&self.ranges[winner], &self.ranges[challenger]) { + // None means the stream is exhausted, always mark None as loser + (None, _) => self.update_winner(cmp_node, &mut winner, challenger), + (_, None) => (), + (Some(ac), Some(bc)) => { + if ac.cmp(bc).is_gt() { + self.update_winner(cmp_node, &mut winner, challenger); + } + } + } + + cmp_node = self.loser_tree_parent_node_index(cmp_node); + } + self.loser_tree[cmp_node] = winner; + } + self.loser_tree_has_updated = true; + } + + /// Find the leaf node index in the loser tree for the given cursor index + /// + /// Note that this is not necessarily a leaf node in the tree, but it can + /// also be a half-node (a node with only one child). This happens when the + /// number of cursors/streams is not a power of two. Thus, the loser tree + /// will be unbalanced, but it will still work correctly. + /// + /// For example, with 5 streams, the loser tree will look like this: + /// + /// ```text + /// 0 (winner) + /// + /// 1 + /// / \ + /// 2 3 + /// / \ / \ + /// 4 | | | + /// / \ | | | + /// -+---+--+---+---+---- Below is not a part of loser tree + /// S3 S4 S0 S1 S2 + /// ``` + /// + /// S0, S1, ... S4 are the streams (read: stream at index 0, stream at + /// index 1, etc.) + /// + /// Zooming in at node 2 in the loser tree as an example, we can see that + /// it takes as input the next item at (S0) and the loser of (S3, S4). + /// + #[inline] + fn loser_tree_leaf_node_index(&self, cursor_index: usize) -> usize { + (self.streams_num + cursor_index) / 2 + } + + /// Find the parent node index for the given node index + #[inline] + fn loser_tree_parent_node_index(&self, node_idx: usize) -> usize { + node_idx / 2 + } + + /// Updates the loser tree to reflect the new winner after the previous winner is consumed. + /// This function adjusts the tree by comparing the current winner with challengers from + /// other partitions. + /// + /// If `enable_round_robin_tie_breaker` is true and a tie occurs at the final level, the + /// tie-breaker logic will be applied to ensure fair selection among equal elements. + fn update_loser_tree(&mut self) { + let mut winner = self.loser_tree[0]; + let mut cmp_node = self.loser_tree_leaf_node_index(winner); + + while cmp_node != 0 { + let challenger = self.loser_tree[cmp_node]; + // None means the stream is exhausted, always mark None as loser + match (&self.ranges[winner], &self.ranges[challenger]) { + (None, _) => self.update_winner(cmp_node, &mut winner, challenger), + (_, None) => (), + (Some(ac), Some(bc)) => { + if ac.cmp(bc).is_gt() { + self.update_winner(cmp_node, &mut winner, challenger); + } + } + } + cmp_node = self.loser_tree_parent_node_index(cmp_node); + } + self.loser_tree[0] = winner; + self.loser_tree_has_updated = true; + } + + #[inline] + fn update_winner(&mut self, cmp_node: usize, winner: &mut usize, challenger: usize) { + self.loser_tree[cmp_node] = *winner; + *winner = challenger; + } +} diff --git a/rust/lakesoul-io/src/sorted_merge/sort_key_range.rs b/rust/lakesoul-io/src/sorted_merge/sort_key_range.rs index 95a872925..808bb20aa 100644 --- a/rust/lakesoul-io/src/sorted_merge/sort_key_range.rs +++ b/rust/lakesoul-io/src/sorted_merge/sort_key_range.rs @@ -12,6 +12,8 @@ use arrow::{ record_batch::RecordBatch, row::{Row, Rows}, }; +use arrow_array::Array; +use arrow_cast::pretty::pretty_format_batches; use smallvec::{smallvec, SmallVec}; /// A range in one arrow::record_batch::RecordBatch with same sorted primary key @@ -68,6 +70,10 @@ impl SortKeyBatchRange { self.batch.schema() } + pub fn columns(&self) -> usize { + self.batch.num_columns() + } + pub(crate) fn current(&self) -> Row<'_> { self.rows.row(self.begin_row) } @@ -112,15 +118,28 @@ impl SortKeyBatchRange { array: self.batch.column(idx).clone(), } } + + pub fn array(&self, idx: usize) -> ArrayRef { + unsafe { self.batch.columns().get_unchecked(idx).clone() } + } + + pub fn batch(&self) -> Arc { + self.batch.clone() + } } impl Debug for SortKeyBatchRange { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.debug_struct("SortKeyBatchRange") - .field("begin_row", &self.begin_row) - .field("end_row", &self.end_row) - .field("batch", &self.batch) - .finish() + write!( + f, + "SortKeyBatchRange: \nbegin_row: {}, end_row: {}, stream_idx: {}, batch_idx: {}\n", + self.begin_row, self.end_row, self.stream_idx, self.batch_idx + )?; + write!( + f, + "batch: \n{}", + &pretty_format_batches(&[self.batch.slice(self.begin_row, self.end_row - self.begin_row)]).unwrap() + ) } } @@ -244,10 +263,138 @@ impl SortKeyBatchRanges { pub fn match_row(&self, range: &SortKeyBatchRange) -> bool { match &self.batch_range { + // return true if no current batch range + None => true, + Some(batch_range) => batch_range.current() == range.current(), + } + } +} + +// A range in one arrow::array::Array with same sorted primary key +#[derive(Debug)] +pub struct UseLastSortKeyArrayRange { + pub(crate) row_idx: usize, + pub(crate) batch_idx: usize, + pub(crate) batch: Arc, + pub(crate) column_idx: usize, + pub(crate) stream_idx: usize, +} + +impl UseLastSortKeyArrayRange { + pub fn array(&self) -> ArrayRef { + unsafe { self.batch.columns().get_unchecked(self.column_idx).clone() } + } + + pub fn array_ref(&self) -> &dyn Array { + unsafe { self.batch.columns().get_unchecked(self.column_idx).as_ref() } + } + + pub fn array_ref_by_col(&self, column_idx: usize) -> ArrayRef { + unsafe { self.batch.columns().get_unchecked(column_idx).clone() } + } +} + +impl Clone for UseLastSortKeyArrayRange { + fn clone(&self) -> Self { + UseLastSortKeyArrayRange { + row_idx: self.row_idx, + batch_idx: self.batch_idx, + batch: self.batch.clone(), + column_idx: self.column_idx, + stream_idx: self.stream_idx, + } + } +} + +#[derive(Debug, Clone)] +pub struct UseLastSortKeyBatchRanges { + // fields_index_map from source schemas to target schema which vector index = stream_idx + fields_map: Arc>>, + + current_batch_range: Option, + + // UseLastSortKeyArrayRange for each field of source schema + last_index_of_array: Vec>, + + is_partial_merge: bool, +} + +impl UseLastSortKeyBatchRanges { + pub fn new( + schema: SchemaRef, + fields_map: Arc>>, + is_partial_merge: bool, + ) -> UseLastSortKeyBatchRanges { + let last_index_of_array = if is_partial_merge { + vec![None; schema.fields().len()] + } else { + vec![None; 1] + }; + UseLastSortKeyBatchRanges { + fields_map, + current_batch_range: None, + last_index_of_array, + is_partial_merge, + } + } + + pub fn match_row(&self, range: &SortKeyBatchRange) -> bool { + match &self.current_batch_range { + // return true if no current batch range None => true, Some(batch_range) => batch_range.current() == range.current(), } } + + // insert one SortKeyBatchRange into UseLastSortKeyBatchRanges + pub fn add_range_in_batch(&mut self, range: &SortKeyBatchRange) { + if self.is_empty() { + self.set_batch_range(Some(range.clone())); + } + unsafe { + if self.is_partial_merge { + let range_col = self.fields_map.get_unchecked(range.stream_idx()); + for column_idx in 0..range.columns() { + let target_schema_idx = range_col.get_unchecked(column_idx); + *self.last_index_of_array.get_unchecked_mut(*target_schema_idx) = Some(UseLastSortKeyArrayRange { + row_idx: range.end_row - 1, + batch_idx: range.batch_idx, + batch: range.batch(), + column_idx, + stream_idx: range.stream_idx(), + }); + } + } else { + // full column merge. we just need to record batch idx of this row + *self.last_index_of_array.get_unchecked_mut(0) = Some(UseLastSortKeyArrayRange { + row_idx: range.end_row - 1, + batch_idx: range.batch_idx, + batch: range.batch(), + column_idx: 0, + stream_idx: range.stream_idx(), + }); + } + } + } + + pub fn is_empty(&self) -> bool { + self.current_batch_range.is_none() + } + + pub fn set_batch_range(&mut self, batch_range: Option) { + self.current_batch_range = batch_range + } + + pub fn column(&self, column_idx: usize) -> &Option { + unsafe { + if self.is_partial_merge { + &self.last_index_of_array.get_unchecked(column_idx) + } else { + &self.last_index_of_array.get_unchecked(0) + } + } + } } pub type SortKeyBatchRangesRef = Arc; +pub type UseLastSortKeyBatchRangesRef = Arc; diff --git a/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs b/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs index 52d3138cc..e099ee496 100644 --- a/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs +++ b/rust/lakesoul-io/src/sorted_merge/sorted_stream_merger.rs @@ -2,7 +2,6 @@ // // SPDX-License-Identifier: Apache-2.0 -use std::cmp::Reverse; use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::Arc; @@ -147,6 +146,9 @@ impl SortedStreamMerger { .collect::>>()?; let fields_map = Arc::new(fields_map); + // this is a partial merge when any one of stream has columns less than target + let is_partial_merge = fields_map.iter().any(|f| f.len() != target_schema.fields().len()); + let wrappers: Vec> = streams.into_iter().map(|s| s.stream.fuse()).collect(); let combiner = RangeCombiner::new( @@ -155,6 +157,7 @@ impl SortedStreamMerger { fields_map, batch_size, merge_operator, + is_partial_merge, ); Ok(Self { @@ -211,7 +214,7 @@ impl SortedStreamMerger { self.range_finished[idx] = false; - self.range_combiner.push_range(Reverse(range)); + self.range_combiner.push_range(range); } else { empty_batch = true; } @@ -265,12 +268,12 @@ impl SortedStreamMerger { RangeCombinerResult::None => { return Poll::Ready(None); } - RangeCombinerResult::Range(Reverse(mut range)) => { + RangeCombinerResult::Range(range) => { let stream_idx = range.stream_idx(); - range.advance(); + // range.advance(); if !range.is_finished() { - self.range_combiner.push_range(Reverse(range)) + self.range_combiner.push_range(range) } else { // we should mark this stream uninitialized // since its polling may return pending @@ -479,8 +482,8 @@ mod tests { async fn create_stream(batches: Vec, context: Arc) -> Result { let schema = batches[0].schema(); - let exec = MemoryExec::try_new(&[batches], schema.clone(), None).unwrap(); - let stream = exec.execute(0, context.clone()).unwrap(); + let exec = MemoryExec::try_new(&[batches], schema.clone(), None)?; + let stream = exec.execute(0, context.clone())?; Ok(SortedStream::new(stream)) } @@ -911,17 +914,17 @@ mod tests { let conf = LakeSoulIOConfigBuilder::new() .with_primary_keys(vec!["uuid".to_string()]) .with_files(vec![ - "s3://lakesoul-test-bucket/datalake_table/part-00000-c2c3071b-e566-4b2f-a67c-6648c311b9f5_00000.c000.parquet".to_string(), - "s3://lakesoul-test-bucket/datalake_table/part-00000-bde11c74-f264-40cc-aa71-be7d61c2ed78_00000.c000.parquet".to_string(), - "s3://lakesoul-test-bucket/datalake_table/part-00000-84dd2e3f-3bce-4dd3-b612-81791cbc701f_00000.c000.parquet".to_string(), - "s3://lakesoul-test-bucket/datalake_table/part-00000-5813797f-8d93-420f-af9f-75ebeb655af9_00000.c000.parquet".to_string(), - "s3://lakesoul-test-bucket/datalake_table/part-00000-659b7074-3547-43cb-b858-3867624c0236_00000.c000.parquet".to_string(), - "s3://lakesoul-test-bucket/datalake_table/part-00000-ab29b003-5438-4b19-ba9d-067c0bf82800_00000.c000.parquet".to_string(), - "s3://lakesoul-test-bucket/datalake_table/part-00000-c6efa765-91cc-4393-a7ef-6a53f1f0d777_00000.c000.parquet".to_string(), - "s3://lakesoul-test-bucket/datalake_table/part-00000-391c2a2d-7c8c-4193-9539-ec881e046a23_00000.c000.parquet".to_string(), - "s3://lakesoul-test-bucket/datalake_table/part-00000-44b1bdd7-6501-4056-aa1b-1851a40192ac_00000.c000.parquet".to_string(), - "s3://lakesoul-test-bucket/datalake_table/part-00000-2c7f8088-cf5b-4418-94f1-41aece595c6b_00000.c000.parquet".to_string(), - "s3://lakesoul-test-bucket/datalake_table/part-00000-4c90050a-6d97-423e-a90b-3385872a03a9_00000.c000.parquet".to_string(), + "/opt/spark/work-dir/result/table_bak_zstd/part--0001-6cb26ff7-d7b5-4997-a5df-d6450b6f4eae_00000.c000.parquet".to_string(), + "/opt/spark/work-dir/result/table_bak_zstd/part--0001-180d1486-f26e-4bf3-9816-5fae2f302f7b_00000.c000.parquet".to_string(), + "/opt/spark/work-dir/result/table_bak_zstd/part--0001-6e5c2082-d0ff-4995-9eae-4ebae5587d2e_00000.c000.parquet".to_string(), + "/opt/spark/work-dir/result/table_bak_zstd/part--0001-400e7944-1250-44ce-8781-8e7b39ec4ac9_00000.c000.parquet".to_string(), + "/opt/spark/work-dir/result/table_bak_zstd/part--0001-c8968e46-2331-40dd-8279-923197ffd4a0_00000.c000.parquet".to_string(), + "/opt/spark/work-dir/result/table_bak_zstd/part--0001-c06dff75-a09a-4c9b-b1ad-10fa522c9e40_00000.c000.parquet".to_string(), + "/opt/spark/work-dir/result/table_bak_zstd/part--0001-9af5eaed-95ac-4276-bbe2-0db2ce1f9b88_00000.c000.parquet".to_string(), + "/opt/spark/work-dir/result/table_bak_zstd/part--0001-4f2def3c-4cab-4fc5-b12c-e4e8aef6c723_00000.c000.parquet".to_string(), + "/opt/spark/work-dir/result/table_bak_zstd/part--0001-e149a62e-cef3-42ef-a6b4-9253985e2584_00000.c000.parquet".to_string(), + "/opt/spark/work-dir/result/table_bak_zstd/part--0001-4cd85802-f60e-450a-8e52-04e627f933fc_00000.c000.parquet".to_string(), + "/opt/spark/work-dir/result/table_bak_zstd/part--0001-1596e006-cd78-4d68-8c4c-88d0fff02e7b_00000.c000.parquet".to_string(), ]) .with_schema(Arc::new(schema)) .with_thread_num(2)