Commit b6a3edc

cleanup code
Signed-off-by: zenghua <huazeng@dmetasoul.com>
zenghua committed Sep 2, 2024
1 parent 26e8889 commit b6a3edc
Showing 8 changed files with 4 additions and 264 deletions.

This file was deleted.

@@ -262,46 +262,18 @@ private Path assembleBucketPath(Path basePath, String bucketId) {
      */
     private Path assembleNewPartPath() {
         return bucketPath;
-        // if (DYNAMIC_BUCKET.equals(bucketId)) {
-        //     return bucketPath;
-        // }
-        // long currentPartCounter = partCounter++;
-        // String count = String.format("%03d", currentPartCounter);
-        // String subTask = String.format("%05d", this.subTaskId);
-        // return new Path(
-        //         assembleBucketPath(bucketPath, bucketId),
-        //         outputFileConfig.getPartPrefix()
-        //                 + '-'
-        //                 + subTask
-        //                 + '-'
-        //                 + uniqueId
-        //                 + '_'
-        //                 + subTask
-        //                 + ".c"
-        //                 + count
-        //                 + outputFileConfig.getPartSuffix());
     }
 
     private void closePartFile() throws IOException {
         if (inProgressPartWriter != null) {
             long start = System.currentTimeMillis();
-            // if (inProgressPartWriter instanceof DynamicPartitionNativeParquetWriter) {
             Map<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverableMap =
-                    // ((DynamicPartitionNativeParquetWriter) inProgressPartWriter).closeForCommitWithRecoverableMap();
                     ((NativeParquetWriter) inProgressPartWriter).closeForCommitWithRecoverableMap();
             for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : pendingFileRecoverableMap.entrySet()) {
                 pendingFilesMap.computeIfAbsent(entry.getKey(), bucketId -> new ArrayList())
                         .addAll(entry.getValue());
             }
             inProgressPartWriter = null;
-            // } else {
-            //     InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable =
-            //             inProgressPartWriter.closeForCommit();
-            //     pendingFilesMap.computeIfAbsent(bucketId, bucketId -> new ArrayList()).add(pendingFileRecoverable);
-            //     inProgressPartWriter = null;
-            //     LOG.info("Closed part file {} for {}ms", pendingFileRecoverable.getPath(),
-            //             (System.currentTimeMillis() - start));
-            // }
         }
     }
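
The loop that survives the cleanup is the interesting part of closePartFile(): it folds the per-bucket pending files returned by closeForCommitWithRecoverableMap() into the accumulated pendingFilesMap. A minimal, self-contained sketch of that merge pattern, with String standing in for InProgressFileWriter.PendingFileRecoverable (names and values are illustrative, not LakeSoul code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PendingFileMergeSketch {
    public static void main(String[] args) {
        // Accumulator keyed by bucket id, as in pendingFilesMap.
        Map<String, List<String>> pendingFilesMap = new HashMap<>();

        // Stand-in for the map returned when one part file is closed for commit.
        Map<String, List<String>> recoverableMap = Map.of(
                "date=2024-09-02", List.of("part-00000.parquet"),
                "date=2024-09-01", List.of("part-00001.parquet"));

        // Same merge as the loop above: create the bucket's list on first sight,
        // then append everything the closed writer reported for that bucket.
        for (Map.Entry<String, List<String>> entry : recoverableMap.entrySet()) {
            pendingFilesMap.computeIfAbsent(entry.getKey(), bucketId -> new ArrayList<>())
                    .addAll(entry.getValue());
        }

        System.out.println(pendingFilesMap);
    }
}
```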

@@ -16,8 +16,6 @@
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKET;
-
 public class NativeBucketWriter implements BucketWriter<RowData, String> {
 
     private final RowType rowType;
@@ -38,9 +36,6 @@ public NativeBucketWriter(RowType rowType, List<String> primaryKeys, List<String
 
     @Override
     public InProgressFileWriter<RowData, String> openNewInProgressFile(String bucketId, Path path, long creationTime) throws IOException {
-        // if (DYNAMIC_BUCKET.equals(bucketId)) {
-        //     return new DynamicPartitionNativeParquetWriter(rowType, primaryKeys, partitionKeys, path, creationTime, conf);
-        // }
         return new NativeParquetWriter(rowType, primaryKeys, partitionKeys, bucketId, path, creationTime, conf, subTaskId);
     }

@@ -83,22 +83,7 @@ public NativeParquetWriter(RowType rowType,
             this.prefix = new Path(this.prefix, bucketID);
         }
         initNativeWriter();
 
-        // ArrowUtils.setLocalTimeZone(FlinkUtil.getLocalTimeZone(conf));
-        // Schema arrowSchema = ArrowUtils.toArrowSchema(rowType);
-        // nativeWriter = new NativeIOWriter(arrowSchema);
-        // nativeWriter.setPrimaryKeys(primaryKeys);
-        // if (conf.getBoolean(LakeSoulSinkOptions.isMultiTableSource)) {
-        //     nativeWriter.setAuxSortColumns(Collections.singletonList(SORT_FIELD));
-        // }
-        // nativeWriter.setRowGroupRowNumber(this.maxRowGroupRows);
-        // batch = VectorSchemaRoot.create(arrowSchema, nativeWriter.getAllocator());
-        // arrowWriter = ArrowUtils.createRowDataArrowWriter(batch, rowType);
-        // this.path = path.makeQualified(path.getFileSystem());
-        // nativeWriter.addFile(this.path.toUri().toString());
-        //
-        // FlinkUtil.setIOConfigs(conf, this.nativeWriter);
-        // this.nativeWriter.initializeWriter();
 
     }
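
The block deleted here doubled as a record of what initNativeWriter() sets up. Below is a hedged reconstruction assembled purely from the removed comments; every call is taken verbatim from them, while the method declaration, field types, and surrounding imports are assumptions rather than the actual current method body:

```java
// Reconstruction from the deleted comments only; not necessarily initNativeWriter() as it exists today.
private void initNativeWriter() throws IOException {
    ArrowUtils.setLocalTimeZone(FlinkUtil.getLocalTimeZone(conf));
    Schema arrowSchema = ArrowUtils.toArrowSchema(rowType);
    nativeWriter = new NativeIOWriter(arrowSchema);
    nativeWriter.setPrimaryKeys(primaryKeys);
    if (conf.getBoolean(LakeSoulSinkOptions.isMultiTableSource)) {
        // Multi-table sources get an auxiliary sort column.
        nativeWriter.setAuxSortColumns(Collections.singletonList(SORT_FIELD));
    }
    nativeWriter.setRowGroupRowNumber(this.maxRowGroupRows);
    batch = VectorSchemaRoot.create(arrowSchema, nativeWriter.getAllocator());
    arrowWriter = ArrowUtils.createRowDataArrowWriter(batch, rowType);
    this.path = path.makeQualified(path.getFileSystem());
    nativeWriter.addFile(this.path.toUri().toString());
    FlinkUtil.setIOConfigs(conf, this.nativeWriter);
    this.nativeWriter.initializeWriter();
}
```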


@@ -272,6 +272,7 @@ private void closePartFile() throws IOException {
                 pendingFilesMap.computeIfAbsent(entry.getKey(), bucketId -> new ArrayList())
                         .addAll(entry.getValue());
             }
+            inProgressPartWriter = null;
         } else {
             throw new RuntimeException(
                     "inProgressPartWriter only support instanceof NativeLakeSoulArrowWrapperWriter");
@@ -127,7 +127,6 @@ public Map<String, List<PendingFileRecoverable>> closeForCommitWithRecoverableMa
 
         try {
             this.nativeWriter.close();
-            initNativeWriter();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -1,4 +1,4 @@
-package com.csair.dss.flink.entry;
+package com.dmetasoul.lakesoul;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
5 changes: 1 addition & 4 deletions rust/lakesoul-io/src/lakesoul_writer.rs
@@ -145,15 +145,12 @@ impl SyncSendableMutableLakeSoulWriter {
                 ))
             )
         };
-        // let in_progress_writer = in_progress_writer.clone();
         let mut guard = in_progress_writer.lock().await;
 
         let batch_memory_size = record_batch.get_array_memory_size() as u64;
         let batch_rows = record_batch.num_rows() as u64;
+        // If would exceed max_file_size, split batch
-        // let msg = format!("buffered size of current writer= {}, batch_size = {}, do_spill={}", guard.buffered_size(), batch_memory_size, do_spill);
-        // dbg!(max_file_size);
-        // dbg!(msg);
 
         if !do_spill && guard.buffered_size() + batch_memory_size > max_file_size {
             let to_write = (batch_rows * (max_file_size - guard.buffered_size())) / batch_memory_size;
             if to_write + 1 < batch_rows {
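
The split logic that survives the cleanup apportions the incoming batch by average bytes per row: when the buffered data plus the whole batch would exceed max_file_size, to_write estimates how many rows still fit, and the rest spills to the next file. A worked example of the same arithmetic in Java (illustrative numbers; this mirrors the Rust expression rather than quoting LakeSoul code):

```java
public class BatchSplitSketch {
    public static void main(String[] args) {
        long maxFileSize = 1_000_000;   // byte budget per file
        long bufferedSize = 900_000;    // bytes the writer has already buffered
        long batchMemorySize = 400_000; // bytes in the incoming batch
        long batchRows = 1_000;         // rows in the incoming batch

        // to_write = batch_rows * (max_file_size - buffered_size) / batch_memory_size
        long toWrite = (batchRows * (maxFileSize - bufferedSize)) / batchMemorySize;

        // 1_000 * 100_000 / 400_000 = 250 rows fit; the remaining 750 go to the next file.
        System.out.println("write now: " + toWrite + ", spill: " + (batchRows - toWrite));
    }
}
```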
