From b6a3edc21a2a5157c406ced625cf7ffdab893cc7 Mon Sep 17 00:00:00 2001
From: zenghua
Date: Mon, 2 Sep 2024 13:23:14 +0800
Subject: [PATCH] cleanup code

Signed-off-by: zenghua
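
This is mostly dead-code removal: the unused DynamicPartitionNativeParquetWriter
and the commented-out blocks that referenced it. It also carries three small
behavioral changes visible in the hunks below: LakeSoulArrowWriterBucket now
nulls inProgressPartWriter after a successful close, NativeLakeSoulArrowWrapperWriter
no longer re-initializes its native writer after closing it, and leftover debug
statements are dropped from lakesoul_writer.rs. SinkMemoryLeakTest moves to the
com.dmetasoul.lakesoul package.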
---
 .../DynamicPartitionNativeParquetWriter.java  | 209 ------------------
 .../sink/writer/LakeSoulWriterBucket.java     |  28 ---
 .../sink/writer/NativeBucketWriter.java       |   5 -
 .../sink/writer/NativeParquetWriter.java      |  17 +-
 .../arrow/LakeSoulArrowWriterBucket.java      |   1 +
 .../NativeLakeSoulArrowWrapperWriter.java     |   1 -
 .../lakesoul}/SinkMemoryLeakTest.java         |   2 +-
 rust/lakesoul-io/src/lakesoul_writer.rs       |   5 +-
 8 files changed, 4 insertions(+), 264 deletions(-)
 delete mode 100644 lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/DynamicPartitionNativeParquetWriter.java
 rename lakesoul-flink/src/test/java/com/{csair/dss/flink/entry => dmetasoul/lakesoul}/SinkMemoryLeakTest.java (98%)

diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/DynamicPartitionNativeParquetWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/DynamicPartitionNativeParquetWriter.java
deleted file mode 100644
index 352bb3226..000000000
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/DynamicPartitionNativeParquetWriter.java
+++ /dev/null
@@ -1,209 +0,0 @@
-// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
-//
-// SPDX-License-Identifier: Apache-2.0
-
-package org.apache.flink.lakesoul.sink.writer;
-
-import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.types.pojo.Schema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.lakesoul.tool.FlinkUtil;
-import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
-import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.arrow.ArrowUtils;
-import org.apache.flink.table.runtime.arrow.ArrowWriter;
-import org.apache.flink.table.types.logical.RowType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKET;
-import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.MAX_ROW_GROUP_SIZE;
-import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.SORT_FIELD;
-
-public class DynamicPartitionNativeParquetWriter implements InProgressFileWriter<RowData, String> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionNativeParquetWriter.class);
-
-    private final RowType rowType;
-
-    private ArrowWriter<RowData> arrowWriter;
-    private final List<String> primaryKeys;
-    private final List<String> rangeColumns;
-    private final Configuration conf;
-
-    private NativeIOWriter nativeWriter;
-
-    private final int maxRowGroupRows;
-
-    private final long creationTime;
-
-    private VectorSchemaRoot batch;
-
-    private int rowsInBatch;
-
-    long lastUpdateTime;
-
-    String prefix;
-
-    private long totalRows = 0;
-
-    public DynamicPartitionNativeParquetWriter(RowType rowType,
-                                               List<String> primaryKeys,
-                                               List<String> rangeColumns,
-                                               Path path,
-                                               long creationTime,
-                                               Configuration conf) throws IOException {
-        this.maxRowGroupRows = conf.getInteger(MAX_ROW_GROUP_SIZE);
-        this.creationTime = creationTime;
-        this.rowsInBatch = 0;
-        this.rowType = rowType;
-        this.primaryKeys = primaryKeys;
-        this.rangeColumns = rangeColumns;
-        this.prefix = path.makeQualified(path.getFileSystem()).toString();
-        this.conf = conf;
-        initNativeWriter();
-    }
-
-    private void initNativeWriter() throws IOException {
-        ArrowUtils.setLocalTimeZone(FlinkUtil.getLocalTimeZone(conf));
-        Schema arrowSchema = ArrowUtils.toArrowSchema(rowType);
-        nativeWriter = new NativeIOWriter(arrowSchema);
-        nativeWriter.setPrimaryKeys(primaryKeys);
-        nativeWriter.setRangePartitions(rangeColumns);
-        if (conf.getBoolean(LakeSoulSinkOptions.isMultiTableSource)) {
-            nativeWriter.setAuxSortColumns(Collections.singletonList(SORT_FIELD));
-        }
-        nativeWriter.setHashBucketNum(conf.getInteger(LakeSoulSinkOptions.HASH_BUCKET_NUM));
-
-        nativeWriter.setRowGroupRowNumber(this.maxRowGroupRows);
-        batch = VectorSchemaRoot.create(arrowSchema, nativeWriter.getAllocator());
-        arrowWriter = ArrowUtils.createRowDataArrowWriter(batch, rowType);
-
-
-        nativeWriter.withPrefix(this.prefix);
-        nativeWriter.useDynamicPartition(true);
-
-        FlinkUtil.setIOConfigs(conf, nativeWriter);
-        nativeWriter.initializeWriter();
-        LOG.info("Initialized DynamicPartitionNativeParquetWriter: {}", this);
-    }
-
-    @Override
-    public void write(RowData element, long currentTime) throws IOException {
-        this.lastUpdateTime = currentTime;
-        this.arrowWriter.write(element);
-        this.rowsInBatch++;
-        this.totalRows++;
-        if (this.rowsInBatch >= this.maxRowGroupRows) {
-            this.arrowWriter.finish();
-            this.nativeWriter.write(this.batch);
-            // in native writer, batch may be kept in memory for sorting,
-            // so we have to release ownership in java
-            this.batch.clear();
-            this.arrowWriter.reset();
-            this.rowsInBatch = 0;
-        }
-    }
-
-    @Override
-    public InProgressFileRecoverable persist() throws IOException {
-        // we currently do not support persist
-        return null;
-    }
-
-
-    @Override
-    public PendingFileRecoverable closeForCommit() throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    public Map<String, List<PendingFileRecoverable>> closeForCommitWithRecoverableMap() throws IOException {
-        long timer = System.currentTimeMillis();
-        this.arrowWriter.finish();
-        Map<String, List<PendingFileRecoverable>> recoverableMap = new HashMap<>();
-        if (this.batch.getRowCount() > 0) {
-            this.nativeWriter.write(this.batch);
-            HashMap<String, List<String>> partitionDescAndFilesMap = this.nativeWriter.flush();
-            for (Map.Entry<String, List<String>> entry : partitionDescAndFilesMap.entrySet()) {
-                recoverableMap.put(
-                        entry.getKey(),
-                        entry.getValue()
-                                .stream()
-                                .map(path -> new NativeParquetWriter.NativeWriterPendingFileRecoverable(path,
-                                        creationTime))
-                                .collect(Collectors.toList())
-                );
-            }
-            this.arrowWriter.reset();
-            this.rowsInBatch = 0;
-            this.batch.clear();
-            this.batch.close();
-            try {
-                this.nativeWriter.close();
-                initNativeWriter();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-        LOG.info("CloseForCommitWithRecoverableMap done, costTime={}ms, recoverableMap={}", System.currentTimeMillis() - timer, recoverableMap);
-        return recoverableMap;
-    }
-
-    @Override
-    public void dispose() {
-        try {
-            this.arrowWriter.finish();
-            this.batch.close();
-            this.nativeWriter.close();
-            this.nativeWriter = null;
-
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public String getBucketId() {
-        return DYNAMIC_BUCKET;
-    }
-
-    @Override
-    public long getCreationTime() {
-        return this.creationTime;
-    }
-
-    @Override
-    public long getSize() throws IOException {
-        return totalRows;
-    }
-
-    @Override
-    public long getLastUpdateTime() {
-        return this.lastUpdateTime;
-    }
-
-    @Override
-    public String toString() {
-        return "DynamicPartitionNativeParquetWriter{" +
-                "rowType=" + rowType +
-                ", primaryKeys=" + primaryKeys +
-                ", rangeColumns=" + rangeColumns +
-                ", maxRowGroupRows=" + maxRowGroupRows +
-                ", creationTime=" + creationTime +
-                ", rowsInBatch=" + rowsInBatch +
-                ", lastUpdateTime=" + lastUpdateTime +
-                ", prefix='" + prefix + '\'' +
-                ", totalRows=" + totalRows +
-                '}';
-    }
-}
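
For orientation, the deleted writer's core loop was a buffer-then-flush pattern
that NativeParquetWriter still uses: rows accumulate in an Arrow batch and are
handed to the native writer whenever maxRowGroupRows is reached, with a final
flush at commit time. A minimal, self-contained sketch of that pattern follows;
RowBatchSink and the String row type are illustrative stand-ins, not LakeSoul
APIs.

import java.util.ArrayList;
import java.util.List;

interface RowBatchSink {
    void writeBatch(List<String> rows);   // flush one row group downstream
}

class BatchingWriter {
    private final RowBatchSink sink;
    private final int maxRowGroupRows;    // mirrors MAX_ROW_GROUP_SIZE
    private final List<String> batch = new ArrayList<>();

    BatchingWriter(RowBatchSink sink, int maxRowGroupRows) {
        this.sink = sink;
        this.maxRowGroupRows = maxRowGroupRows;
    }

    // Buffer a row; once a full row group accumulates, hand it off and
    // release the buffer, as write() above clears the Arrow batch.
    void write(String row) {
        batch.add(row);
        if (batch.size() >= maxRowGroupRows) {
            sink.writeBatch(new ArrayList<>(batch));
            batch.clear();
        }
    }

    // Flush any tail rows at commit time, mirroring closeForCommitWithRecoverableMap().
    void close() {
        if (!batch.isEmpty()) {
            sink.writeBatch(new ArrayList<>(batch));
            batch.clear();
        }
    }

    public static void main(String[] args) {
        BatchingWriter w = new BatchingWriter(
                rows -> System.out.println("flush " + rows.size() + " rows"), 3);
        for (int i = 0; i < 7; i++) w.write("row-" + i);
        w.close(); // prints: flush 3 rows, flush 3 rows, flush 1 rows
    }
}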
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java
index 953911d1a..778a2335a 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java
@@ -262,46 +262,18 @@ private Path assembleBucketPath(Path basePath, String bucketId) {
      */
     private Path assembleNewPartPath() {
         return bucketPath;
-//        if (DYNAMIC_BUCKET.equals(bucketId)) {
-//            return bucketPath;
-//        }
-//        long currentPartCounter = partCounter++;
-//        String count = String.format("%03d", currentPartCounter);
-//        String subTask = String.format("%05d", this.subTaskId);
-//        return new Path(
-//                assembleBucketPath(bucketPath, bucketId),
-//                outputFileConfig.getPartPrefix()
-//                        + '-'
-//                        + subTask
-//                        + '-'
-//                        + uniqueId
-//                        + '_'
-//                        + subTask
-//                        + ".c"
-//                        + count
-//                        + outputFileConfig.getPartSuffix());
     }
 
     private void closePartFile() throws IOException {
         if (inProgressPartWriter != null) {
             long start = System.currentTimeMillis();
-//            if (inProgressPartWriter instanceof DynamicPartitionNativeParquetWriter) {
             Map<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverableMap =
-//                    ((DynamicPartitionNativeParquetWriter) inProgressPartWriter).closeForCommitWithRecoverableMap();
                     ((NativeParquetWriter) inProgressPartWriter).closeForCommitWithRecoverableMap();
             for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : pendingFileRecoverableMap.entrySet()) {
                 pendingFilesMap.computeIfAbsent(entry.getKey(), bucketId -> new ArrayList())
                         .addAll(entry.getValue());
             }
             inProgressPartWriter = null;
-//            } else {
-//                InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable =
-//                        inProgressPartWriter.closeForCommit();
-//                pendingFilesMap.computeIfAbsent(bucketId, bucketId -> new ArrayList()).add(pendingFileRecoverable);
-//                inProgressPartWriter = null;
-//                LOG.info("Closed part file {} for {}ms", pendingFileRecoverable.getPath(),
-//                        (System.currentTimeMillis() - start));
-//            }
         }
     }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeBucketWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeBucketWriter.java
index bfa4ba5fa..ae1acd461 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeBucketWriter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeBucketWriter.java
@@ -16,8 +16,6 @@
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKET;
-
 public class NativeBucketWriter implements BucketWriter<RowData, String> {
 
     private final RowType rowType;
@@ -38,9 +36,6 @@ public NativeBucketWriter(RowType rowType, List<String> primaryKeys, List<String> partitionKeys,
     public InProgressFileWriter<RowData, String> openNewInProgressFile(String bucketId, Path path, long creationTime) throws IOException {
-//        if (DYNAMIC_BUCKET.equals(bucketId)) {
-//            return new DynamicPartitionNativeParquetWriter(rowType, primaryKeys, partitionKeys, path, creationTime, conf);
-//        }
         return new NativeParquetWriter(rowType, primaryKeys, partitionKeys, bucketId, path, creationTime, conf, subTaskId);
     }
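
With the dynamic-partition branch gone, closePartFile() has a single code path:
every in-progress writer is a NativeParquetWriter, and the result of its
closeForCommitWithRecoverableMap() is merged into pendingFilesMap keyed by
partition. A small sketch of that merge, where PendingFile stands in for
InProgressFileWriter.PendingFileRecoverable (requires Java 16+ for records):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PendingFileMerge {
    record PendingFile(String path, long creationTime) {}

    // Merge one closed writer's per-partition pending files into the
    // bucket's accumulated map, creating the partition's list on first use.
    static void mergePendingFiles(
            Map<String, List<PendingFile>> pendingFilesMap,
            Map<String, List<PendingFile>> fromClosedWriter) {
        for (Map.Entry<String, List<PendingFile>> entry : fromClosedWriter.entrySet()) {
            pendingFilesMap
                    .computeIfAbsent(entry.getKey(), partition -> new ArrayList<>())
                    .addAll(entry.getValue());
        }
    }

    public static void main(String[] args) {
        Map<String, List<PendingFile>> acc = new HashMap<>();
        Map<String, List<PendingFile>> closed = new HashMap<>();
        closed.put("date=2024-09-02", List.of(new PendingFile("part-0.parquet", 0L)));
        mergePendingFiles(acc, closed);
        System.out.println(acc); // {date=2024-09-02=[PendingFile[path=part-0.parquet, creationTime=0]]}
    }
}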
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeParquetWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeParquetWriter.java
index db4ed47b4..9f5728577 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeParquetWriter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeParquetWriter.java
@@ -83,22 +83,7 @@ public NativeParquetWriter(RowType rowType,
             this.prefix = new Path(this.prefix, bucketID);
         }
         initNativeWriter();
-
-//        ArrowUtils.setLocalTimeZone(FlinkUtil.getLocalTimeZone(conf));
-//        Schema arrowSchema = ArrowUtils.toArrowSchema(rowType);
-//        nativeWriter = new NativeIOWriter(arrowSchema);
-//        nativeWriter.setPrimaryKeys(primaryKeys);
-//        if (conf.getBoolean(LakeSoulSinkOptions.isMultiTableSource)) {
-//            nativeWriter.setAuxSortColumns(Collections.singletonList(SORT_FIELD));
-//        }
-//        nativeWriter.setRowGroupRowNumber(this.maxRowGroupRows);
-//        batch = VectorSchemaRoot.create(arrowSchema, nativeWriter.getAllocator());
-//        arrowWriter = ArrowUtils.createRowDataArrowWriter(batch, rowType);
-//        this.path = path.makeQualified(path.getFileSystem());
-//        nativeWriter.addFile(this.path.toUri().toString());
-//
-//        FlinkUtil.setIOConfigs(conf, this.nativeWriter);
-//        this.nativeWriter.initializeWriter();
+
     }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowWriterBucket.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowWriterBucket.java
index 10d50c553..5f90f195f 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowWriterBucket.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/LakeSoulArrowWriterBucket.java
@@ -272,6 +272,7 @@ private void closePartFile() throws IOException {
                     pendingFilesMap.computeIfAbsent(entry.getKey(), bucketId -> new ArrayList())
                             .addAll(entry.getValue());
                 }
+                inProgressPartWriter = null;
             } else {
                 throw new RuntimeException(
                         "inProgressPartWriter only support instanceof NativeLakeSoulArrowWrapperWriter");
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/NativeLakeSoulArrowWrapperWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/NativeLakeSoulArrowWrapperWriter.java
index e6d10827b..34472045d 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/NativeLakeSoulArrowWrapperWriter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/arrow/NativeLakeSoulArrowWrapperWriter.java
@@ -127,7 +127,6 @@ public Map<String, List<PendingFileRecoverable>> closeForCommitWithRecoverableMa
 
         try {
             this.nativeWriter.close();
-            initNativeWriter();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
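
The two one-line changes above are the behavioral fixes in this patch:
LakeSoulArrowWriterBucket clears its writer reference once the pending files
are collected, and NativeLakeSoulArrowWrapperWriter no longer re-creates a
native writer immediately after closing one. Both plausibly relate to the
relocated SinkMemoryLeakTest, since a writer that is closed twice, or
re-initialized and never used again, can keep native Arrow buffers alive.
A minimal sketch of the null-out-after-close pattern; CloseableWriter is a
stand-in type, not a Flink or LakeSoul interface:

class CloseableWriterHolder {
    interface CloseableWriter {
        void closeForCommit() throws Exception;
    }

    private CloseableWriter inProgressPartWriter;

    CloseableWriterHolder(CloseableWriter writer) {
        this.inProgressPartWriter = writer;
    }

    // Safe to call repeatedly: the second call is a no-op instead of a
    // double close, which could otherwise re-flush or leak the writer's
    // underlying buffers.
    void closePartFile() throws Exception {
        if (inProgressPartWriter != null) {
            inProgressPartWriter.closeForCommit();
            inProgressPartWriter = null;
        }
    }

    public static void main(String[] args) throws Exception {
        CloseableWriterHolder h = new CloseableWriterHolder(() -> System.out.println("closed"));
        h.closePartFile(); // closes once
        h.closePartFile(); // no-op, reference already cleared
    }
}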
diff --git a/lakesoul-flink/src/test/java/com/csair/dss/flink/entry/SinkMemoryLeakTest.java b/lakesoul-flink/src/test/java/com/dmetasoul/lakesoul/SinkMemoryLeakTest.java
similarity index 98%
rename from lakesoul-flink/src/test/java/com/csair/dss/flink/entry/SinkMemoryLeakTest.java
rename to lakesoul-flink/src/test/java/com/dmetasoul/lakesoul/SinkMemoryLeakTest.java
index e797f2ed1..e0b4e0fd6 100644
--- a/lakesoul-flink/src/test/java/com/csair/dss/flink/entry/SinkMemoryLeakTest.java
+++ b/lakesoul-flink/src/test/java/com/dmetasoul/lakesoul/SinkMemoryLeakTest.java
@@ -1,4 +1,4 @@
-package com.csair.dss.flink.entry;
+package com.dmetasoul.lakesoul;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs
index 804255834..9f989bd84 100644
--- a/rust/lakesoul-io/src/lakesoul_writer.rs
+++ b/rust/lakesoul-io/src/lakesoul_writer.rs
@@ -145,15 +145,12 @@ impl SyncSendableMutableLakeSoulWriter {
                 ))
             )
         };
-        // let in_progress_writer = in_progress_writer.clone();
         let mut guard = in_progress_writer.lock().await;
 
         let batch_memory_size = record_batch.get_array_memory_size() as u64;
         let batch_rows = record_batch.num_rows() as u64;
         // If would exceed max_file_size, split batch
-        // let msg = format!("buffered size of current writer= {}, batch_size = {}, do_spill={}", guard.buffered_size(), batch_memory_size, do_spill);
-        // dbg!(max_file_size);
-        // dbg!(msg);
+
         if !do_spill && guard.buffered_size() + batch_memory_size > max_file_size {
             let to_write = (batch_rows * (max_file_size - guard.buffered_size())) / batch_memory_size;
             if to_write + 1 < batch_rows {
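
The surviving comment in this hunk ("If would exceed max_file_size, split
batch") describes the logic the removed dbg! lines were probing: when the
bytes already buffered plus the incoming batch would exceed max_file_size,
only a proportional prefix of the batch is written before the file is rolled,
assuming rows are roughly uniform in size. An illustration of that arithmetic
in Java (the names are mine, not from the Rust code):

class BatchSplit {
    // Returns how many rows of the incoming batch fit under the size cap,
    // using the same proportional formula as the Rust code above.
    // Callers ensure bufferedSize < maxFileSize before taking this path.
    static long rowsThatFit(long batchRows, long batchMemorySize,
                            long bufferedSize, long maxFileSize) {
        if (bufferedSize + batchMemorySize <= maxFileSize) {
            return batchRows; // whole batch fits, no split needed
        }
        return batchRows * (maxFileSize - bufferedSize) / batchMemorySize;
    }

    public static void main(String[] args) {
        // A 1000-row batch occupying 4 MB, with 7 MB already buffered and an
        // 8 MB cap: (1000 * (8 - 7) MB) / 4 MB = 250 rows are written first.
        System.out.println(rowsThatFit(1000, 4 << 20, 7 << 20, 8 << 20));
    }
}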