diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java index 0c26bd1d5..3c261b7a4 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java @@ -457,6 +457,7 @@ public boolean commitData(MetaInfo metaInfo, boolean changeSchema, CommitOp comm int readPartitionVersion = 0; if (readPartition != null) { readPartitionVersion = readPartition.getVersion(); + } int newVersion = curVersion + 1; diff --git a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java index 64ddf59da..de286c832 100644 --- a/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java +++ b/lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/jnr/NativeMetadataJavaClient.java @@ -205,7 +205,7 @@ private void initialize() { final CompletableFuture future = new CompletableFuture<>(); tokioPostgresClient = libLakeSoulMetaData.create_tokio_postgres_client( new ReferencedBooleanCallback((bool, msg) -> { - if (msg.isEmpty()) { + if (msg == null || msg.isEmpty()) { future.complete(bool); } else { System.err.println(msg); @@ -236,7 +236,7 @@ public JniWrapper executeQuery(Integer queryType, List params) { final CompletableFuture queryFuture = new CompletableFuture<>(); Pointer queryResult = getLibLakeSoulMetaData().execute_query( new ReferencedIntegerCallback((result, msg) -> { - if (msg.isEmpty()) { + if (msg == null || msg.isEmpty()) { queryFuture.complete(result); } else { queryFuture.completeExceptionally(new SQLException(msg)); @@ -262,7 +262,7 @@ public JniWrapper executeQuery(Integer queryType, List params) { final CompletableFuture importFuture = new CompletableFuture<>(); getLibLakeSoulMetaData().export_bytes_result( new ReferencedBooleanCallback((result, msg) -> { - if (msg.isEmpty()) { + if (msg == null || msg.isEmpty()) { importFuture.complete(result); } else { importFuture.completeExceptionally(new SQLException(msg)); @@ -348,7 +348,7 @@ else if (bytes.length < mutableBuffer.size()) { getLibLakeSoulMetaData().execute_insert( new ReferencedIntegerCallback((result, msg) -> { - if (msg.isEmpty()) { + if (msg == null || msg.isEmpty()) { future.complete(result); } else { future.completeExceptionally(new SQLException(msg)); @@ -395,7 +395,7 @@ public Integer executeUpdate(Integer updateType, List params) { getLibLakeSoulMetaData().execute_update( new ReferencedIntegerCallback((result, msg) -> { - if (msg.isEmpty()) { + if (msg == null || msg.isEmpty()) { future.complete(result); } else { future.completeExceptionally(new SQLException(msg)); @@ -441,7 +441,7 @@ public List executeQueryScalar(Integer queryScalarType, List par getLibLakeSoulMetaData().execute_query_scalar( new ReferencedStringCallback((result, msg) -> { - if (msg.isEmpty()) { + if (msg == null || msg.isEmpty()) { future.complete(result); } else { future.completeExceptionally(new SQLException(msg)); @@ -512,7 +512,7 @@ public static int cleanMeta() { try { instance.getLibLakeSoulMetaData().clean_meta_for_test( new ReferencedIntegerCallback((result, msg) -> { - if (msg.isEmpty()) { + if (msg == null || msg.isEmpty()) { future.complete(result); } else { future.completeExceptionally(new SQLException(msg)); @@ -600,21 +600,4 @@ public List createSplitDescArray(String tableName, String 
namespace) unlockReadLock(); } } - -// void filter_rel(io.substrait.proto.Expression e) { -// byte[] byteArray = e.toByteArray(); -// int length = byteArray.length; -// Pointer buffer = fixedBuffer; -// if (length < fixedBuffer.size()) -// fixedBuffer.put(0, byteArray, 0, length); -// else if (length < mutableBuffer.size()) { -// mutableBuffer.put(0, byteArray, 0, length); -// buffer = mutableBuffer; -// } else { -// mutableBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(length); -// mutableBuffer.put(0, byteArray, 0, length); -// buffer = mutableBuffer; -// } -// getLibLakeSoulMetaData().call_rust(buffer.address(), length); -// } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTableSinkStreamBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTableSinkStreamBuilder.java index f20d72a14..4b65aff9e 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTableSinkStreamBuilder.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTableSinkStreamBuilder.java @@ -19,10 +19,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.table.types.logical.RowType; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.BUCKET_CHECK_INTERVAL; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.BUCKET_PARALLELISM; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.FILE_ROLLING_SIZE; -import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.FILE_ROLLING_TIME; +import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.*; public class LakeSoulMultiTableSinkStreamBuilder { @@ -56,6 +53,7 @@ public DataStream buildHashPartitionedCDCStream(DataStream buildLakeSoulDMLSink(DataStream stream) { + context.conf.set(DYNAMIC_BUCKETING, false); LakeSoulRollingPolicyImpl rollingPolicy = new LakeSoulRollingPolicyImpl( context.conf.getLong(FILE_ROLLING_SIZE), context.conf.getLong(FILE_ROLLING_TIME)); OutputFileConfig fileNameConfig = OutputFileConfig.builder() diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java index d0635368e..5fc74e51e 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/BulkFormatBuilder.java @@ -12,10 +12,7 @@ import org.apache.flink.lakesoul.sink.committer.LakeSoulSinkCommitter; import org.apache.flink.lakesoul.sink.committer.LakeSoulSinkGlobalCommitter; import org.apache.flink.lakesoul.sink.state.*; -import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter; -import org.apache.flink.lakesoul.sink.writer.DefaultLakeSoulWriterBucketFactory; -import org.apache.flink.lakesoul.sink.writer.LakeSoulWriterBucketFactory; -import org.apache.flink.lakesoul.sink.writer.NativeBucketWriter; +import org.apache.flink.lakesoul.sink.writer.*; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; @@ -102,14 +99,14 @@ public LakeSoulSinkCommitter createCommitter() throws IOException { public SimpleVersionedSerializer 
getWriterStateSerializer() throws IOException { return new LakeSoulWriterBucketStateSerializer( - NativeBucketWriter.NativePendingFileRecoverableSerializer.INSTANCE); + NativeParquetWriter.NativePendingFileRecoverableSerializer.INSTANCE); } @Override public SimpleVersionedSerializer getCommittableSerializer() throws IOException { return new LakeSoulSinkCommittableSerializer( - NativeBucketWriter.NativePendingFileRecoverableSerializer.INSTANCE); + NativeParquetWriter.NativePendingFileRecoverableSerializer.INSTANCE); } @Override diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java index d0a9e15da..e5d47d227 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java @@ -54,17 +54,13 @@ public List commit(List> entry : committable.getPendingFilesMap().entrySet()) { + List pendingFiles = entry.getValue(); // pending files to commit List files = new ArrayList<>(); for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : - committable.getPendingFiles()) { + pendingFiles) { if (pendingFileRecoverable instanceof NativeParquetWriter.NativeWriterPendingFileRecoverable) { NativeParquetWriter.NativeWriterPendingFileRecoverable recoverable = (NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFileRecoverable; @@ -92,9 +88,10 @@ public List commit(List readPartitionInfoList = null; + TableNameId tableNameId = lakeSoulDBManager.shortTableName(identity.tableId.table(), identity.tableId.schema()); if (identity.tableId.schema() == null){ diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java index 52b6f1e95..e78a28e85 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java @@ -116,7 +116,7 @@ public List commit( String dbType = this.conf.getString(SOURCE_DB_TYPE,""); for (Map.Entry, List> entry : - globalCommittable.getGroupedCommitables() + globalCommittable.getGroupedCommittable() .entrySet()) { TableSchemaIdentity identity = entry.getKey().f0; List lakeSoulMultiTableSinkCommittable = entry.getValue(); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkCommittable.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkCommittable.java index 55f75d248..087b458e5 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkCommittable.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkCommittable.java @@ -13,8 +13,9 @@ import javax.annotation.Nullable; import java.io.Serializable; -import java.util.List; -import java.util.UUID; +import java.util.*; + +import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKET; /** * Wrapper class for both type of committables in {@link LakeSoulMultiTablesSink}. 
One committable might be either @@ -28,11 +29,12 @@ public class LakeSoulMultiTableSinkCommittable implements Serializable, Comparab private final String bucketId; + private final boolean dynamicBucketing; + private final TableSchemaIdentity identity; private String sourcePartitionInfo; - @Nullable - private List pendingFiles; + private final Map> pendingFilesMap; @Nullable private final String commitId; @@ -80,9 +82,36 @@ public LakeSoulMultiTableSinkCommittable( String dmlType, String sourcePartitionInfo ) { + this.dynamicBucketing = false; this.bucketId = bucketId; this.identity = identity; - this.pendingFiles = pendingFiles; + this.pendingFilesMap = new HashMap<>(); + this.pendingFilesMap.put(bucketId, pendingFiles); + this.creationTime = time; + this.commitId = commitId; + this.tsMs = tsMs; + this.dmlType = dmlType; + this.sourcePartitionInfo = sourcePartitionInfo; + } + + /** + * Constructor for {@link org.apache.flink.lakesoul.sink.state.LakeSoulSinkCommittableSerializer} to + * restore commitable states + */ + public LakeSoulMultiTableSinkCommittable( + TableSchemaIdentity identity, + Map> pendingFilesMap, + long time, + @Nullable String commitId, + long tsMs, + String dmlType, + String sourcePartitionInfo + ) { + Preconditions.checkNotNull(pendingFilesMap); + this.dynamicBucketing = pendingFilesMap.keySet().size() != 1; + this.bucketId = this.dynamicBucketing ? DYNAMIC_BUCKET : pendingFilesMap.keySet().stream().findFirst().get(); + this.identity = identity; + this.pendingFilesMap = pendingFilesMap; this.creationTime = time; this.commitId = commitId; this.tsMs = tsMs; @@ -90,17 +119,43 @@ public LakeSoulMultiTableSinkCommittable( this.sourcePartitionInfo = sourcePartitionInfo; } + public long getTsMs() { return tsMs; } public boolean hasPendingFile() { - return pendingFiles != null; + if (dynamicBucketing) { + return !pendingFilesMap.isEmpty(); + } else { + return hasPendingFile(bucketId); + } + } + + public boolean hasPendingFile(String bucketId) { + return pendingFilesMap.containsKey(bucketId); } @Nullable public List getPendingFiles() { - return pendingFiles; + if (dynamicBucketing) { + List summary = new ArrayList<>(); + for (List list : pendingFilesMap.values()) { + summary.addAll(list); + } + return summary; + } else { + return getPendingFiles(bucketId); + } + } + + @Nullable + public List getPendingFiles(String bucketId) { + return pendingFilesMap.get(bucketId); + } + + public Map> getPendingFilesMap() { + return pendingFilesMap; } @Override @@ -125,9 +180,12 @@ public String toString() { return "LakeSoulMultiTableSinkCommittable{" + "creationTime=" + creationTime + ", bucketId='" + bucketId + '\'' + + ", dynamicBucketing=" + dynamicBucketing + ", identity=" + identity + + ", pendingFilesMap=" + pendingFilesMap + ", commitId='" + commitId + '\'' + - ", pendingFiles='" + pendingFiles + '\'' + + ", tsMs=" + tsMs + + ", dmlType='" + dmlType + '\'' + '}'; } @@ -138,12 +196,15 @@ public String getCommitId() { public void merge(LakeSoulMultiTableSinkCommittable committable) { Preconditions.checkState(identity.equals(committable.getIdentity())); - Preconditions.checkState(bucketId.equals(committable.getBucketId())); Preconditions.checkState(creationTime == committable.getCreationTime()); - if (hasPendingFile()) { - if (committable.hasPendingFile()) pendingFiles.addAll(committable.getPendingFiles()); - } else { - if (committable.hasPendingFile()) pendingFiles = committable.getPendingFiles(); + + for (Map.Entry> entry : committable.getPendingFilesMap().entrySet()) { + String bucketId 
= entry.getKey(); + if (hasPendingFile(bucketId)) { + if (!entry.getValue().isEmpty()) pendingFilesMap.get(bucketId).addAll(entry.getValue()); + } else { + if (!entry.getValue().isEmpty()) pendingFilesMap.put(bucketId, entry.getValue()); + } } mergeSourcePartitionInfo(committable); } @@ -167,6 +228,8 @@ private void mergeSourcePartitionInfo(LakeSoulMultiTableSinkCommittable committa throw new RuntimeException(e); } } + Preconditions.checkState(bucketId.equals(committable.getBucketId())); + } public String getDmlType() { diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkGlobalCommittable.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkGlobalCommittable.java index 936f5747f..6ea9d9cce 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkGlobalCommittable.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulMultiTableSinkGlobalCommittable.java @@ -23,7 +23,7 @@ public class LakeSoulMultiTableSinkGlobalCommittable implements Serializable { static final long serialVersionUID = 42L; - private final Map, List> groupedCommitables; + private final Map, List> groupedCommittable; public LakeSoulMultiTableSinkGlobalCommittable( Map, List> groupedCommitables) { @@ -45,14 +45,14 @@ public LakeSoulMultiTableSinkGlobalCommittable( groupedCommitables.put(key, mergedCommittables); }); - this.groupedCommitables = groupedCommitables; + this.groupedCommittable = groupedCommitables; } public static LakeSoulMultiTableSinkGlobalCommittable fromLakeSoulMultiTableSinkGlobalCommittable( List globalCommittables) { Map, List> groupedCommitables = new HashMap<>(); - globalCommittables.forEach(globalCommittable -> globalCommittable.getGroupedCommitables().forEach( + globalCommittables.forEach(globalCommittable -> globalCommittable.getGroupedCommittable().forEach( (key, value) -> groupedCommitables.computeIfAbsent(key, tuple2 -> new ArrayList<>()).addAll(value))); return new LakeSoulMultiTableSinkGlobalCommittable(groupedCommitables); } @@ -68,12 +68,12 @@ public static LakeSoulMultiTableSinkGlobalCommittable fromLakeSoulMultiTableSink } - public Map, List> getGroupedCommitables() { - return groupedCommitables; + public Map, List> getGroupedCommittable() { + return groupedCommittable; } @Override public String toString() { - return "LakeSoulMultiTableSinkGlobalCommittable{" + "groupedCommitables=" + groupedCommitables + '}'; + return "LakeSoulMultiTableSinkGlobalCommittable{" + "groupedCommitables=" + groupedCommittable + '}'; } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkCommittableSerializer.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkCommittableSerializer.java index c846855b6..9204f6ea7 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkCommittableSerializer.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkCommittableSerializer.java @@ -11,12 +11,15 @@ import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.lakesoul.sink.writer.NativeBucketWriter; +import org.apache.flink.lakesoul.sink.writer.NativeParquetWriter; import org.apache.flink.lakesoul.types.TableSchemaIdentity; import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; import java.io.IOException; import 
java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -26,7 +29,7 @@ public class LakeSoulSinkCommittableSerializer implements SimpleVersionedSerializer { - public static final LakeSoulSinkCommittableSerializer INSTANCE = new LakeSoulSinkCommittableSerializer(NativeBucketWriter.NativePendingFileRecoverableSerializer.INSTANCE); + public static final LakeSoulSinkCommittableSerializer INSTANCE = new LakeSoulSinkCommittableSerializer(NativeParquetWriter.NativePendingFileRecoverableSerializer.INSTANCE); private static final int MAGIC_NUMBER = 0x1e765c80; private final SimpleVersionedSerializer @@ -68,17 +71,21 @@ public LakeSoulMultiTableSinkCommittable deserialize(int version, byte[] seriali private void serializeV1(LakeSoulMultiTableSinkCommittable committable, DataOutputView dataOutputView) throws IOException { - if (committable.hasPendingFile()) { - assert committable.getPendingFiles() != null; + if (!committable.getPendingFilesMap().isEmpty()) { + assert committable.getPendingFilesMap() != null; assert committable.getCommitId() != null; dataOutputView.writeBoolean(true); - dataOutputView.writeInt(committable.getPendingFiles().size()); - for (InProgressFileWriter.PendingFileRecoverable pennding : - committable.getPendingFiles()) { - SimpleVersionedSerialization.writeVersionAndSerialize( - pendingFileSerializer, pennding, dataOutputView); + dataOutputView.writeInt(committable.getPendingFilesMap().entrySet().size()); + for (Map.Entry> entry : committable.getPendingFilesMap().entrySet()) { + dataOutputView.writeUTF(entry.getKey()); + dataOutputView.writeInt(entry.getValue().size()); + for (InProgressFileWriter.PendingFileRecoverable pendingFile : entry.getValue()) { + SimpleVersionedSerialization.writeVersionAndSerialize( + pendingFileSerializer, pendingFile, dataOutputView); + } } + dataOutputView.writeLong(committable.getCreationTime()); dataOutputView.writeUTF(committable.getCommitId()); dataOutputView.writeLong(committable.getTsMs()); @@ -90,11 +97,11 @@ private void serializeV1(LakeSoulMultiTableSinkCommittable committable, DataOutp SimpleVersionedSerialization.writeVersionAndSerialize( tableSchemaIdentitySerializer, committable.getIdentity(), dataOutputView); - dataOutputView.writeUTF(committable.getBucketId()); +// dataOutputView.writeUTF(committable.getBucketId()); } private LakeSoulMultiTableSinkCommittable deserializeV1(DataInputView dataInputView) throws IOException { - List pendingFile = null; + Map> pendingFileMap = new HashMap<>(); String commitId = null; long time = Long.MIN_VALUE; long dataTsMs = Long.MAX_VALUE; @@ -103,11 +110,16 @@ private LakeSoulMultiTableSinkCommittable deserializeV1(DataInputView dataInputV if (dataInputView.readBoolean()) { int size = dataInputView.readInt(); if (size > 0) { - pendingFile = new ArrayList<>(); for (int i = 0; i < size; ++i) { - pendingFile.add( - SimpleVersionedSerialization.readVersionAndDeSerialize( - pendingFileSerializer, dataInputView)); + String bucketId = dataInputView.readUTF(); + int fileNum = dataInputView.readInt(); + List pendingFiles = new ArrayList<>(); + for (int j = 0; j < fileNum; j++) { + pendingFiles.add( + SimpleVersionedSerialization.readVersionAndDeSerialize( + pendingFileSerializer, dataInputView)); + } + pendingFileMap.put(bucketId, pendingFiles); } time = dataInputView.readLong(); commitId = dataInputView.readUTF(); @@ -119,10 +131,10 @@ private LakeSoulMultiTableSinkCommittable deserializeV1(DataInputView 
dataInputV TableSchemaIdentity identity = SimpleVersionedSerialization.readVersionAndDeSerialize( tableSchemaIdentitySerializer, dataInputView); - String bucketId = dataInputView.readUTF(); +// String bucketId = dataInputView.readUTF(); return new LakeSoulMultiTableSinkCommittable( - bucketId, identity, pendingFile, time, commitId, dataTsMs, dmlType, sourcePartitionInfo); + identity, pendingFileMap, time, commitId, dataTsMs, dmlType, sourcePartitionInfo); } private static void validateMagicNumber(DataInputView in) throws IOException { diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkGlobalCommittableSerializer.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkGlobalCommittableSerializer.java index b823fc4f1..7985ec64c 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkGlobalCommittableSerializer.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulSinkGlobalCommittableSerializer.java @@ -75,13 +75,13 @@ public LakeSoulMultiTableSinkGlobalCommittable deserialize(int version, private void serializeV1(LakeSoulMultiTableSinkGlobalCommittable globalCommittable, DataOutputView dataOutputView) throws IOException { - Map, List> groupedCommitables = - globalCommittable.getGroupedCommitables(); - assert groupedCommitables != null; - List commitables = - groupedCommitables.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); - dataOutputView.writeInt(commitables.size()); - for (LakeSoulMultiTableSinkCommittable committable : commitables) { + Map, List> groupedCommittable = + globalCommittable.getGroupedCommittable(); + assert groupedCommittable != null; + List committableList = + groupedCommittable.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + dataOutputView.writeInt(committableList.size()); + for (LakeSoulMultiTableSinkCommittable committable : committableList) { SimpleVersionedSerialization.writeVersionAndSerialize(committableSerializer, committable, dataOutputView); } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketState.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketState.java index 96a0ce11d..2d3a128bf 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketState.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketState.java @@ -9,31 +9,56 @@ import org.apache.flink.lakesoul.types.TableSchemaIdentity; import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; -/** States for {@link LakeSoulWriterBucket}. */ +/** + * States for {@link LakeSoulWriterBucket}. + */ public class LakeSoulWriterBucketState { private final TableSchemaIdentity identity; private final String bucketId; - /** The directory where all the part files of the bucket are stored. */ + /** + * The directory where all the part files of the bucket are stored. 
+ */ private final Path bucketPath; - private final List pendingFileRecoverableList; + private final Map> pendingFileRecoverableMap; public LakeSoulWriterBucketState( TableSchemaIdentity identity, String bucketId, Path bucketPath, List pendingFileRecoverableList - ) { + ) { this.identity = identity; this.bucketId = bucketId; this.bucketPath = bucketPath; - this.pendingFileRecoverableList = pendingFileRecoverableList; + this.pendingFileRecoverableMap = new HashMap<>(); + this.pendingFileRecoverableMap.put(bucketId, pendingFileRecoverableList); + } + + public LakeSoulWriterBucketState( + TableSchemaIdentity identity, + Path bucketPath, + HashMap> pendingFileRecoverableMap + ) { + this.identity = identity; + Optional>> first = pendingFileRecoverableMap.entrySet().stream().findFirst(); + if (first.isPresent()) { + this.bucketId = first.get().getKey(); + } else { + this.bucketId = ""; + } + this.bucketPath = bucketPath; + + this.pendingFileRecoverableMap = pendingFileRecoverableMap; } public String getBucketId() { @@ -53,8 +78,8 @@ public String toString() { bucketPath + " and identity=" + identity + - " and pendingFiles=" + - pendingFileRecoverableList.stream().map(Object::toString).collect(Collectors.joining("; ")) + " and pendingFilesMap=" + + pendingFileRecoverableMap.entrySet().stream().map(Object::toString).collect(Collectors.joining("; ")) ; } @@ -63,6 +88,10 @@ public TableSchemaIdentity getIdentity() { } public List getPendingFileRecoverableList() { - return pendingFileRecoverableList; + return pendingFileRecoverableMap.values().stream().findFirst().get(); + } + + public Map> getPendingFileRecoverableMap() { + return pendingFileRecoverableMap; } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketStateSerializer.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketStateSerializer.java index c26f4676e..8fcbc440a 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketStateSerializer.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/state/LakeSoulWriterBucketStateSerializer.java @@ -17,7 +17,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -64,19 +66,25 @@ public LakeSoulWriterBucketState deserialize(int version, byte[] serialized) thr private void serialize(LakeSoulWriterBucketState state, DataOutputView dataOutputView) throws IOException { - dataOutputView.writeUTF(state.getBucketId()); +// dataOutputView.writeUTF(state.getBucketId()); dataOutputView.writeUTF(state.getBucketPath().toString()); SimpleVersionedSerialization.writeVersionAndSerialize( tableSchemaIdentitySerializer, state.getIdentity(), dataOutputView); - dataOutputView.writeInt(state.getPendingFileRecoverableList().size()); - for (int i = 0; i < state.getPendingFileRecoverableList().size(); ++i) { - SimpleVersionedSerialization.writeVersionAndSerialize( - pendingFileRecoverableSimpleVersionedSerializer, state.getPendingFileRecoverableList().get(i), - dataOutputView - ); + + dataOutputView.writeInt(state.getPendingFileRecoverableMap().entrySet().size()); + for (Map.Entry> entry : state.getPendingFileRecoverableMap().entrySet()) { + dataOutputView.writeUTF(entry.getKey()); + dataOutputView.writeInt(entry.getValue().size()); + for (int i = 0; i < state.getPendingFileRecoverableMap().size(); ++i) { + 
SimpleVersionedSerialization.writeVersionAndSerialize( + pendingFileRecoverableSimpleVersionedSerializer, entry.getValue().get(i), + dataOutputView + ); + } } + } private LakeSoulWriterBucketState deserialize(DataInputView in) throws IOException { @@ -93,22 +101,29 @@ private LakeSoulWriterBucketState internalDeserialize( pendingFileDeser) throws IOException { - String bucketId = dataInputView.readUTF(); +// String bucketId = dataInputView.readUTF(); String bucketPathStr = dataInputView.readUTF(); TableSchemaIdentity identity = SimpleVersionedSerialization.readVersionAndDeSerialize( tableSchemaIdentitySerializer, dataInputView); - int pendingFileNum = dataInputView.readInt(); - List pendingFileRecoverableList = new ArrayList<>(); - for (int i = 0; i < pendingFileNum; ++i) { - pendingFileRecoverableList.add(pendingFileDeser.apply(dataInputView)); + int mapEntryNum = dataInputView.readInt(); + HashMap> pendingFileRecoverableMap = new HashMap<>(); + for (int i = 0; i < mapEntryNum; i++) { + String bucketId = dataInputView.readUTF(); + + int pendingFileNum = dataInputView.readInt(); + List pendingFileRecoverableList = new ArrayList<>(); + for (int j = 0; j < pendingFileNum; ++j) { + pendingFileRecoverableList.add(pendingFileDeser.apply(dataInputView)); + } + pendingFileRecoverableMap.put(bucketId, pendingFileRecoverableList); } return new LakeSoulWriterBucketState( - identity, bucketId, + identity, new Path(bucketPathStr), - pendingFileRecoverableList); + pendingFileRecoverableMap); } private void validateMagicNumber(DataInputView in) throws IOException { diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java index 17255c3a4..2d9709b62 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/AbstractLakeSoulMultiTableSinkWriter.java @@ -28,6 +28,8 @@ import java.io.IOException; import java.util.*; +import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKET; +import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKETING; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -163,9 +165,14 @@ public void write(IN element, Context context) throws IOException { TableSchemaIdentity identity = schemaAndRowData.f0; RowData rowData = schemaAndRowData.f1; TableSchemaWriterCreator creator = getOrCreateTableSchemaWriterCreator(identity); - final String bucketId = creator.bucketAssigner.getBucketId(rowData, bucketerContext); - final LakeSoulWriterBucket bucket = getOrCreateBucketForBucketId(identity, bucketId, creator); - bucket.write(rowData, processingTimeService.getCurrentProcessingTime(), dataDmlTsMs); + if (conf.get(DYNAMIC_BUCKETING)) { + final LakeSoulWriterBucket bucket = getOrCreateBucketForBucketId(identity, DYNAMIC_BUCKET, creator); + bucket.write(rowData, processingTimeService.getCurrentProcessingTime(), dataDmlTsMs); + } else { + final String bucketId = creator.bucketAssigner.getBucketId(rowData, bucketerContext); + final LakeSoulWriterBucket bucket = getOrCreateBucketForBucketId(identity, bucketId, creator); + bucket.write(rowData, processingTimeService.getCurrentProcessingTime(), dataDmlTsMs); + } recordsOutCounter.inc(); } } @@ -212,7 +219,7 @@ private 
LakeSoulWriterBucket getOrCreateBucketForBucketId( TableSchemaWriterCreator creator) throws IOException { LakeSoulWriterBucket bucket = activeBuckets.get(Tuple2.of(identity, bucketId)); if (bucket == null) { - final Path bucketPath = assembleBucketPath(creator.tableLocation, bucketId); + final Path bucketPath = creator.tableLocation; BucketWriter bucketWriter = creator.createBucketWriter(); bucket = bucketFactory.getNewBucket( diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/DynamicPartitionNativeParquetWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/DynamicPartitionNativeParquetWriter.java new file mode 100644 index 000000000..1b86853c4 --- /dev/null +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/DynamicPartitionNativeParquetWriter.java @@ -0,0 +1,198 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package org.apache.flink.lakesoul.sink.writer; + +import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.lakesoul.tool.FlinkUtil; +import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.arrow.ArrowUtils; +import org.apache.flink.table.runtime.arrow.ArrowWriter; +import org.apache.flink.table.types.logical.RowType; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; + +import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKET; +import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.SORT_FIELD; + +public class DynamicPartitionNativeParquetWriter implements InProgressFileWriter { + + private final RowType rowType; + + private ArrowWriter arrowWriter; + private final List primaryKeys; + private final List rangeColumns; + private final Configuration conf; + + private NativeIOWriter nativeWriter; + + private final int batchSize; + + private final long creationTime; + + private VectorSchemaRoot batch; + + private int rowsInBatch; + + long lastUpdateTime; + + String prefix; + + private long totalRows = 0; + + public DynamicPartitionNativeParquetWriter(RowType rowType, + List primaryKeys, + List rangeColumns, + Path path, + long creationTime, + Configuration conf) throws IOException { + this.batchSize = 250000; // keep same with native writer's row group row number + this.creationTime = creationTime; + this.rowsInBatch = 0; + this.rowType = rowType; + this.primaryKeys = primaryKeys; + this.rangeColumns = rangeColumns; + this.prefix = path.makeQualified(path.getFileSystem()).toString(); + this.conf = conf; + initNativeWriter(); + } + + private void initNativeWriter() throws IOException { + ArrowUtils.setLocalTimeZone(FlinkUtil.getLocalTimeZone(conf)); + Schema arrowSchema = ArrowUtils.toArrowSchema(rowType); + nativeWriter = new NativeIOWriter(arrowSchema); + nativeWriter.setPrimaryKeys(primaryKeys); + nativeWriter.setRangePartitions(rangeColumns); + if (conf.getBoolean(LakeSoulSinkOptions.isMultiTableSource)) { + nativeWriter.setAuxSortColumns(Collections.singletonList(SORT_FIELD)); + } + 
nativeWriter.setHashBucketNum(conf.getInteger(LakeSoulSinkOptions.HASH_BUCKET_NUM)); + + nativeWriter.setRowGroupRowNumber(this.batchSize); + batch = VectorSchemaRoot.create(arrowSchema, nativeWriter.getAllocator()); + arrowWriter = ArrowUtils.createRowDataArrowWriter(batch, rowType); + + + nativeWriter.withPrefix(this.prefix); + nativeWriter.useDynamicPartition(true); + + FlinkUtil.setFSConfigs(conf, nativeWriter); + nativeWriter.initializeWriter(); + } + + @Override + public void write(RowData element, long currentTime) throws IOException { + this.lastUpdateTime = currentTime; + this.arrowWriter.write(element); + this.rowsInBatch++; + this.totalRows++; + if (this.rowsInBatch >= this.batchSize) { + this.arrowWriter.finish(); + this.nativeWriter.write(this.batch); + // in native writer, batch may be kept in memory for sorting, + // so we have to release ownership in java + this.batch.clear(); + this.arrowWriter.reset(); + this.rowsInBatch = 0; + } + } + + @Override + public InProgressFileRecoverable persist() throws IOException { + // we currently do not support persist + return null; + } + + + @Override + public PendingFileRecoverable closeForCommit() throws IOException { + this.arrowWriter.finish(); + this.nativeWriter.write(this.batch); + HashMap> partitionDescAndFilesMap = this.nativeWriter.flush(); + this.arrowWriter.reset(); + this.rowsInBatch = 0; + this.batch.clear(); + this.batch.close(); + try { + this.nativeWriter.close(); + initNativeWriter(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return new NativeParquetWriter.NativeWriterPendingFileRecoverable(this.prefix, this.creationTime); + } + + public Map> closeForCommitWithRecoverableMap() throws IOException { + this.arrowWriter.finish(); + Map> recoverableMap = new HashMap<>(); + if (this.batch.getRowCount() > 0) { + this.nativeWriter.write(this.batch); + HashMap> partitionDescAndFilesMap = this.nativeWriter.flush(); +// System.out.println(partitionDescAndFilesMap); + for (Map.Entry> entry : partitionDescAndFilesMap.entrySet()) { + recoverableMap.put( + entry.getKey(), + entry.getValue() + .stream() + .map(path -> new NativeParquetWriter.NativeWriterPendingFileRecoverable(path, creationTime)) + .collect(Collectors.toList()) + ); + } + this.arrowWriter.reset(); + this.rowsInBatch = 0; + this.batch.clear(); + this.batch.close(); + try { + this.nativeWriter.close(); + initNativeWriter(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return recoverableMap; + } + + @Override + public void dispose() { + try { + this.arrowWriter.finish(); + this.batch.close(); + this.nativeWriter.close(); + this.nativeWriter = null; + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public String getBucketId() { + return DYNAMIC_BUCKET; + } + + @Override + public long getCreationTime() { + return this.creationTime; + } + + @Override + public long getSize() throws IOException { + return totalRows; + } + + @Override + public long getLastUpdateTime() { + return this.lastUpdateTime; + } +} diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulRowDataOneTableSinkWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulRowDataOneTableSinkWriter.java index 976b25e9e..9bc3c90b0 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulRowDataOneTableSinkWriter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulRowDataOneTableSinkWriter.java @@ -63,7 +63,8 
@@ public LakeSoulRowDataOneTableSinkWriter( conf); } - @Override protected TableSchemaWriterCreator getOrCreateTableSchemaWriterCreator(TableSchemaIdentity identity) { + @Override + protected TableSchemaWriterCreator getOrCreateTableSchemaWriterCreator(TableSchemaIdentity identity) { return this.creator; } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java index 63484794b..8042d7753 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterBucket.java @@ -16,11 +16,9 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.UUID; +import java.util.*; +import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKET; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -52,8 +50,11 @@ public class LakeSoulWriterBucket { private long tsMs; - private final List pendingFiles = - new ArrayList<>(); +// private final List pendingFiles = +// new ArrayList<>(); + + private final Map> pendingFilesMap = + new HashMap<>(); private long partCounter; @@ -107,7 +108,10 @@ private LakeSoulWriterBucket( } private void restoreState(LakeSoulWriterBucketState state) throws IOException { - pendingFiles.addAll(state.getPendingFileRecoverableList()); + for (Map.Entry> entry : state.getPendingFileRecoverableMap().entrySet()) { + pendingFilesMap.computeIfAbsent(entry.getKey(), key -> new ArrayList<>()).addAll(entry.getValue()); + } +// pendingFiles.addAll(state.getPendingFileRecoverableList()); } public String getBucketId() { @@ -123,24 +127,29 @@ public long getPartCounter() { } public boolean isActive() { - return inProgressPartWriter != null || !pendingFiles.isEmpty(); + return inProgressPartWriter != null || !pendingFilesMap.isEmpty(); } void merge(final LakeSoulWriterBucket bucket) throws IOException { checkNotNull(bucket); - checkState(Objects.equals(bucket.bucketPath, bucketPath)); + +// checkState(Objects.equals(bucket.bucketPath, bucketPath)); +// pendingFiles.addAll(bucket.pendingFiles); + bucket.closePartFile(); - pendingFiles.addAll(bucket.pendingFiles); + for (Map.Entry> entry : bucket.pendingFilesMap.entrySet()) { + pendingFilesMap.computeIfAbsent(entry.getKey(), key -> new ArrayList<>()).addAll(entry.getValue()); + } - LOG.info("Merging buckets for bucket id={}", bucketId); + LOG.info("Merging buckets for bucket id={}", getBucketId()); } void write(RowData element, long currentTime, long tsMs) throws IOException { if (inProgressPartWriter == null || rollingPolicy.shouldRollOnEvent(inProgressPartWriter, element)) { LOG.info( "Opening new part file for bucket id={} at {}.", - bucketId, + getBucketId(), tsMs); inProgressPartWriter = rollPartFile(currentTime); this.tsMs = tsMs; @@ -154,25 +163,31 @@ List prepareCommit(boolean flush, String dmlT // since the native parquet writer doesn't support resume if (inProgressPartWriter != null) { LOG.info( - "Closing in-progress part file for bucket id={} on checkpoint.", bucketId); + "Closing in-progress part file for bucket id={} on checkpoint.", getBucketId()); closePartFile(); } List committables = new ArrayList<>(); - long time = pendingFiles.isEmpty() ? 
Long.MIN_VALUE : - ((NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFiles.get(0)).creationTime; + long time = pendingFilesMap.isEmpty() ? Long.MIN_VALUE : + ((NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFilesMap.values().stream().findFirst().get().get(0)).creationTime; // this.pendingFiles would be cleared later, we need to make a copy - List tmpPending = new ArrayList<>(pendingFiles); +// List tmpPending = new ArrayList<>(pendingFiles); +// committables.add(new LakeSoulMultiTableSinkCommittable( +// getBucketId(), +// tmpPending, +// time, tableId, tsMs, dmlType)); committables.add(new LakeSoulMultiTableSinkCommittable( - bucketId, - tmpPending, - time, +// getBucketId(), tableId, + new HashMap<>(pendingFilesMap), + time, + UUID.randomUUID().toString(), tsMs, dmlType, - sourcePartitionInfo)); - pendingFiles.clear(); + sourcePartitionInfo + )); + pendingFilesMap.clear(); return committables; } @@ -183,12 +198,13 @@ LakeSoulWriterBucketState snapshotState() throws IOException { } // this.pendingFiles would be cleared later, we need to make a copy - List tmpPending = new ArrayList<>(pendingFiles); - return new LakeSoulWriterBucketState( - tableId, - bucketId, - bucketPath, - tmpPending); +// List tmpPending = new ArrayList<>(pendingFiles); +// return new LakeSoulWriterBucketState( +// tableId, +// getBucketId(), +// bucketPath, +// tmpPending); + return new LakeSoulWriterBucketState(tableId, bucketPath, new HashMap<>(pendingFilesMap)); } void onProcessingTime(long timestamp) throws IOException { @@ -198,7 +214,7 @@ void onProcessingTime(long timestamp) throws IOException { "Bucket {} closing in-progress part file for part file id={} due to processing time rolling " + "policy " + "(in-progress file created @ {}, last updated @ {} and current time is {}).", - bucketId, + getBucketId(), uniqueId, inProgressPartWriter.getCreationTime(), inProgressPartWriter.getLastUpdateTime(), @@ -216,20 +232,30 @@ private InProgressFileWriter rollPartFile(long currentTime) thr LOG.info( "Opening new part file \"{}\" for bucket id={}.", partFilePath.getName(), - bucketId); + getBucketId()); + + return bucketWriter.openNewInProgressFile(getBucketId(), partFilePath, currentTime); + } - return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime); + private Path assembleBucketPath(Path basePath, String bucketId) { + if ("".equals(bucketId)) { + return basePath; + } + return new Path(basePath, bucketId); } /** * Constructor a new PartPath and increment the partCounter. 
*/ private Path assembleNewPartPath() { + if (DYNAMIC_BUCKET.equals(bucketId)) { + return bucketPath; + } long currentPartCounter = partCounter++; String count = String.format("%03d", currentPartCounter); String subTask = String.format("%05d", this.subTaskId); return new Path( - bucketPath, + assembleBucketPath(bucketPath, bucketId), outputFileConfig.getPartPrefix() + '-' + subTask @@ -245,12 +271,21 @@ private Path assembleNewPartPath() { private void closePartFile() throws IOException { if (inProgressPartWriter != null) { long start = System.currentTimeMillis(); - InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = - inProgressPartWriter.closeForCommit(); - pendingFiles.add(pendingFileRecoverable); - inProgressPartWriter = null; - LOG.info("Closed part file {} for {}ms", pendingFileRecoverable.getPath(), - (System.currentTimeMillis() - start)); + if (inProgressPartWriter instanceof DynamicPartitionNativeParquetWriter) { + Map> pendingFileRecoverableMap = + ((DynamicPartitionNativeParquetWriter) inProgressPartWriter).closeForCommitWithRecoverableMap(); + for (Map.Entry> entry : pendingFileRecoverableMap.entrySet()) { + pendingFilesMap.computeIfAbsent(entry.getKey(), bucketId -> new ArrayList()).addAll(entry.getValue()); + } + } else { + InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = + inProgressPartWriter.closeForCommit(); +// pendingFiles.add(pendingFileRecoverable); + pendingFilesMap.computeIfAbsent(bucketId, bucketId -> new ArrayList()).add(pendingFileRecoverable); + inProgressPartWriter = null; + LOG.info("Closed part file {} for {}ms", pendingFileRecoverable.getPath(), + (System.currentTimeMillis() - start)); + } } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterDynamicBucketFactory.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterDynamicBucketFactory.java new file mode 100644 index 000000000..a6f49ccfa --- /dev/null +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/LakeSoulWriterDynamicBucketFactory.java @@ -0,0 +1,50 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package org.apache.flink.lakesoul.sink.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.Path; +import org.apache.flink.lakesoul.sink.state.LakeSoulWriterBucketState; +import org.apache.flink.lakesoul.types.TableSchemaIdentity; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; +import org.apache.flink.table.data.RowData; + +import java.io.IOException; + +/** + * A factory returning {@link AbstractLakeSoulMultiTableSinkWriter writer}. 
+ */ +@Internal +public class LakeSoulWriterDynamicBucketFactory implements LakeSoulWriterBucketFactory { + + @Override + public LakeSoulWriterBucket getNewBucket( + int subTaskId, + TableSchemaIdentity tableId, + String bucketId, + Path bucketPath, + BucketWriter bucketWriter, + RollingPolicy rollingPolicy, + OutputFileConfig outputFileConfig) { + return LakeSoulWriterBucket.getNew( + subTaskId, tableId, + bucketId, bucketPath, bucketWriter, rollingPolicy, outputFileConfig); + } + + @Override + public LakeSoulWriterBucket restoreBucket( + int subTaskId, + TableSchemaIdentity tableId, + BucketWriter bucketWriter, + RollingPolicy rollingPolicy, + LakeSoulWriterBucketState bucketState, + OutputFileConfig outputFileConfig) + throws IOException { + return LakeSoulWriterBucket.restore(subTaskId, tableId, bucketWriter, + rollingPolicy, bucketState, outputFileConfig); + } +} diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeBucketWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeBucketWriter.java index 770a45f0a..d3d8648b5 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeBucketWriter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeBucketWriter.java @@ -18,6 +18,8 @@ import java.io.IOException; import java.util.List; +import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKET; + public class NativeBucketWriter implements BucketWriter { private final RowType rowType; @@ -25,16 +27,21 @@ public class NativeBucketWriter implements BucketWriter { private final List primaryKeys; private final Configuration conf; + private final List partitionKeys; - public NativeBucketWriter(RowType rowType, List primaryKeys, Configuration conf) { + public NativeBucketWriter(RowType rowType, List primaryKeys, List partitionKeys, Configuration conf) { this.rowType = rowType; this.primaryKeys = primaryKeys; + this.partitionKeys = partitionKeys; this.conf = conf; } @Override - public InProgressFileWriter openNewInProgressFile(String s, Path path, long creationTime) throws IOException { - return new NativeParquetWriter(rowType, primaryKeys, s, path, creationTime, conf); + public InProgressFileWriter openNewInProgressFile(String bucketId, Path path, long creationTime) throws IOException { + if (DYNAMIC_BUCKET.equals(bucketId)) { + return new DynamicPartitionNativeParquetWriter(rowType, primaryKeys, partitionKeys, path, creationTime, conf); + } + return new NativeParquetWriter(rowType, primaryKeys, bucketId, path, creationTime, conf); } @Override @@ -49,9 +56,9 @@ public InProgressFileWriter resumeInProgressFileFrom( public WriterProperties getProperties() { return new WriterProperties( UnsupportedInProgressFileRecoverableSerializable.INSTANCE, - NativePendingFileRecoverableSerializer.INSTANCE, + NativeParquetWriter.NativePendingFileRecoverableSerializer.INSTANCE, false - ); + ); } @Override @@ -90,37 +97,5 @@ public InProgressFileWriter.InProgressFileRecoverable deserialize(int version, b } } - public static class NativePendingFileRecoverableSerializer - implements SimpleVersionedSerializer { - - public static final NativePendingFileRecoverableSerializer INSTANCE = - new NativePendingFileRecoverableSerializer(); - - @Override - public int getVersion() { - return 0; - } - @Override - public byte[] serialize(InProgressFileWriter.PendingFileRecoverable obj) throws IOException { - if (!(obj instanceof NativeParquetWriter.NativeWriterPendingFileRecoverable)) { - 
throw new UnsupportedOperationException( - "Only NativeParquetWriter.NativeWriterPendingFileRecoverable is supported."); - } - DataOutputSerializer out = new DataOutputSerializer(256); - NativeParquetWriter.NativeWriterPendingFileRecoverable recoverable = - (NativeParquetWriter.NativeWriterPendingFileRecoverable) obj; - out.writeUTF(recoverable.path); - out.writeLong(recoverable.creationTime); - return out.getCopyOfBuffer(); - } - - @Override - public InProgressFileWriter.PendingFileRecoverable deserialize(int version, byte[] serialized) throws IOException { - DataInputDeserializer in = new DataInputDeserializer(serialized); - String path = in.readUTF(); - long time = in.readLong(); - return new NativeParquetWriter.NativeWriterPendingFileRecoverable(path, time); - } - } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeParquetWriter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeParquetWriter.java index e6566bf34..4525656be 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeParquetWriter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/NativeParquetWriter.java @@ -9,6 +9,9 @@ import org.apache.arrow.vector.types.pojo.Schema; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.lakesoul.tool.FlinkUtil; import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions; import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; @@ -99,6 +102,40 @@ public InProgressFileRecoverable persist() throws IOException { return null; } + public static class NativePendingFileRecoverableSerializer + implements SimpleVersionedSerializer { + + public static final NativePendingFileRecoverableSerializer INSTANCE = + new NativePendingFileRecoverableSerializer(); + + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(InProgressFileWriter.PendingFileRecoverable obj) throws IOException { + if (!(obj instanceof NativeParquetWriter.NativeWriterPendingFileRecoverable)) { + throw new UnsupportedOperationException( + "Only NativeParquetWriter.NativeWriterPendingFileRecoverable is supported."); + } + DataOutputSerializer out = new DataOutputSerializer(256); + NativeParquetWriter.NativeWriterPendingFileRecoverable recoverable = + (NativeParquetWriter.NativeWriterPendingFileRecoverable) obj; + out.writeUTF(recoverable.path); + out.writeLong(recoverable.creationTime); + return out.getCopyOfBuffer(); + } + + @Override + public InProgressFileWriter.PendingFileRecoverable deserialize(int version, byte[] serialized) throws IOException { + DataInputDeserializer in = new DataInputDeserializer(serialized); + String path = in.readUTF(); + long time = in.readLong(); + return new NativeParquetWriter.NativeWriterPendingFileRecoverable(path, time); + } + } + static public class NativeWriterPendingFileRecoverable implements PendingFileRecoverable, Serializable { public String path; diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/TableSchemaWriterCreator.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/TableSchemaWriterCreator.java index b18565c56..bd1c889bf 100644 --- 
a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/TableSchemaWriterCreator.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/writer/TableSchemaWriterCreator.java @@ -84,7 +84,7 @@ public static TableSchemaWriterCreator create( public BucketWriter createBucketWriter() throws IOException { if (NativeIOBase.isNativeIOLibExist()) { LOG.info("Create natvie bucket writer"); - return new NativeBucketWriter(this.identity.rowType, this.primaryKeys, this.conf); + return new NativeBucketWriter(this.identity.rowType, this.primaryKeys, this.partitionKeyList, this.conf); } else { String msg = "Cannot load lakesoul native writer"; LOG.error(msg); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulDynamicTableFactory.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulDynamicTableFactory.java index e71fc0618..d1cdccf58 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulDynamicTableFactory.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulDynamicTableFactory.java @@ -37,6 +37,9 @@ public DynamicTableSink createDynamicTableSink(Context context) { options.addAll((Configuration) FactoryUtil.createTableFactoryHelper(this, context).getOptions()); FlinkUtil.setLocalTimeZone(options, FlinkUtil.getLocalTimeZone(((TableConfig) context.getConfiguration()).getConfiguration())); + FlinkUtil.setS3Options(options, + ((TableConfig) context.getConfiguration()).getConfiguration()); + ObjectIdentifier objectIdentifier = context.getObjectIdentifier(); ResolvedCatalogTable catalogTable = context.getCatalogTable(); @@ -77,6 +80,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { options.addAll((Configuration) FactoryUtil.createTableFactoryHelper(this, context).getOptions()); FlinkUtil.setLocalTimeZone(options, FlinkUtil.getLocalTimeZone(((TableConfig) context.getConfiguration()).getConfiguration())); + FlinkUtil.setS3Options(options, + ((TableConfig) context.getConfiguration()).getConfiguration()); ObjectIdentifier objectIdentifier = context.getObjectIdentifier(); ResolvedCatalogTable catalogTable = context.getCatalogTable(); TableSchema schema = catalogTable.getSchema(); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java index 4d00807a8..683bc5cb8 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/FlinkUtil.java @@ -48,6 +48,7 @@ import java.util.stream.IntStream; import static java.time.ZoneId.SHORT_IDS; +import static org.apache.flink.lakesoul.tool.JobOptions.*; import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.*; import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isCompositeType; @@ -318,6 +319,9 @@ public static void setFSConfigs(Configuration conf, NativeIOBase io) { } catch (Exception e) { // ignore } + if (conf.containsKey(DEFAULT_FS.key())) { + setFSConf(conf, DEFAULT_FS.key(), DEFAULT_FS.key(), io); + } // try hadoop's s3 configs setFSConf(conf, "fs.s3a.access.key", "fs.s3a.access.key", io); @@ -326,11 +330,12 @@ public static void setFSConfigs(Configuration conf, NativeIOBase io) { setFSConf(conf, "fs.s3a.endpoint.region", "fs.s3a.endpoint.region", io); setFSConf(conf, 
"fs.s3a.path.style.access", "fs.s3a.path.style.access", io); // try flink's s3 credential configs - setFSConf(conf, "s3.access-key", "fs.s3a.access.key", io); - setFSConf(conf, "s3.secret-key", "fs.s3a.secret.key", io); - setFSConf(conf, "s3.endpoint", "fs.s3a.endpoint", io); + setFSConf(conf, S3_ACCESS_KEY.key(), "fs.s3a.access.key", io); + setFSConf(conf, S3_SECRET_KEY.key(), "fs.s3a.secret.key", io); + setFSConf(conf, S3_ENDPOINT.key(), "fs.s3a.endpoint", io); setFSConf(conf, "s3.endpoint.region", "fs.s3a.endpoint.region", io); - setFSConf(conf, "s3.path.style.access", "fs.s3a.path.style.access", io); + setFSConf(conf, S3_PATH_STYLE_ACCESS.key(), "fs.s3a.path.style.access", io); + setFSConf(conf, S3_BUCKET.key(), "fs.s3a.bucket", io); } public static void setFSConf(Configuration conf, String confKey, String fsConfKey, NativeIOBase io) { @@ -469,6 +474,27 @@ public static void setLocalTimeZone(Configuration options, ZoneId localTimeZone) options.setString(TableConfigOptions.LOCAL_TIME_ZONE, localTimeZone.toString()); } + public static void setS3Options(Configuration dstConf, Configuration srcConf) { + if (srcConf.contains(S3_ACCESS_KEY)) { + dstConf.set(S3_ACCESS_KEY, srcConf.get(S3_ACCESS_KEY)); + } + if (srcConf.contains(S3_SECRET_KEY)) { + dstConf.set(S3_SECRET_KEY, srcConf.get(S3_SECRET_KEY)); + } + if (srcConf.contains(S3_ENDPOINT)) { + dstConf.set(S3_ENDPOINT, srcConf.get(S3_ENDPOINT)); + } + if (srcConf.contains(S3_BUCKET)) { + dstConf.set(S3_BUCKET, srcConf.get(S3_BUCKET)); + } + if (srcConf.contains(S3_PATH_STYLE_ACCESS)) { + dstConf.set(S3_PATH_STYLE_ACCESS, srcConf.get(S3_PATH_STYLE_ACCESS)); + } + if (srcConf.contains(DEFAULT_FS)) { + dstConf.set(DEFAULT_FS, srcConf.get(DEFAULT_FS)); + } + } + public static JSONObject getPropertiesFromConfiguration(Configuration conf) { Map map = new HashMap<>(); map.put(USE_CDC.key(), String.valueOf(conf.getBoolean(USE_CDC))); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java index 3fbe03743..a86d3b980 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java @@ -47,40 +47,77 @@ public class JobOptions { public static final ConfigOption STREAMING_SOURCE_ENABLE = key("streaming-source.enable") - .booleanType() - .defaultValue(false) - .withDescription( - Description.builder() - .text("Enable streaming source or not.") - .linebreak() - .text( - " NOTES: Please make sure that each partition/file should be written" - + " atomically, otherwise the reader may get incomplete data.") - .build()); + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text("Enable streaming source or not.") + .linebreak() + .text( + " NOTES: Please make sure that each partition/file should be written" + + " atomically, otherwise the reader may get incomplete data.") + .build()); public static final ConfigOption STREAMING_SOURCE_PARTITION_INCLUDE = key("streaming-source.partition.include") - .stringType() - .defaultValue("all") - .withDescription( - Description.builder() - .text( - "Option to set the partitions to read, supported values are") - .list( - text("all (read all partitions)"), - text( - "latest (read latest partition in order of 'streaming-source.partition.order', this only works when a streaming Hive source table is used as a temporal table)")) - .build()); + .stringType() + .defaultValue("all") + 
.withDescription( + Description.builder() + .text( + "Option to set the partitions to read, supported values are") + .list( + text("all (read all partitions)"), + text( + "latest (read latest partition in order of 'streaming-source.partition.order', this only works when a streaming Hive source table is used as a temporal table)")) + .build()); public static final ConfigOption STREAMING_SOURCE_LATEST_PARTITION_NUMBER = key("streaming-source.latest.partition.number") - .intType() - .defaultValue(1) - .withDescription("Option to set the latest partition number to read. It is only valid when STREAMING_SOURCE_PARTITION_INCLUDE is 'latest'."); + .intType() + .defaultValue(1) + .withDescription("Option to set the latest partition number to read. It is only valid when STREAMING_SOURCE_PARTITION_INCLUDE is 'latest'."); public static final ConfigOption PARTITION_ORDER_KEYS = key("partition.order.keys") .stringType() .noDefaultValue() .withDescription("Option to set partition order keys (e.g. partition1,partition2) to sort multiple partitions. Using all partitions to sort if this value is not set."); + + public static final ConfigOption S3_ACCESS_KEY = + key("s3.access-key") + .stringType() + .noDefaultValue() + .withDescription("Option to set aws s3 access key"); + + public static final ConfigOption S3_SECRET_KEY = + key("s3.secret-key") + .stringType() + .noDefaultValue() + .withDescription("Option to set aws s3 secret key"); + + public static final ConfigOption S3_ENDPOINT = + key("s3.endpoint") + .stringType() + .noDefaultValue() + .withDescription("Option to set aws s3 endpoint"); + + public static final ConfigOption S3_PATH_STYLE_ACCESS = + key("s3.path.style.access") + .stringType() + .defaultValue("false") + .withDescription("Option to set use S3_PATH_STYLE_ACCESS or not"); + + public static final ConfigOption S3_BUCKET = + key("s3.bucket") + .stringType() + .noDefaultValue() + .withDescription("Option to set s3 bucket"); + + public static final ConfigOption DEFAULT_FS = + key("fs.defaultFS") + .stringType() + .defaultValue("file:///") + .withDescription("Option to set fs default scheme"); + } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java index 0c448d6dd..c405e547a 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java @@ -22,6 +22,8 @@ public class LakeSoulSinkOptions { public static final String FILE_OPTION_ADD = "add"; + public static final String DYNAMIC_BUCKET = "DynamicBucket"; + public static final String CDC_CHANGE_COLUMN = "lakesoul_cdc_change_column"; public static final String CDC_CHANGE_COLUMN_DEFAULT = "rowKinds"; @@ -183,6 +185,13 @@ public class LakeSoulSinkOptions { .defaultValue("decoderbufs") .withDescription("The name of the Postgres logical decoding plug-in installed on the server."); + + public static final ConfigOption DYNAMIC_BUCKETING = ConfigOptions + .key("lakesoul.sink.dynamic_bucketing") + .booleanType() + .defaultValue(true) + .withDescription("If true, lakesoul sink use dynamic bucketing writer"); + } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecordSerializer.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecordSerializer.java index 235782cdf..f615dcc3f 100644 --- 
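Note: the new s3.* and fs.defaultFS options above are plain Flink ConfigOptions, so they can be set once on the job configuration and forwarded to each table by FlinkUtil.setS3Options (and on to the native IO layer through setFSConfigs). A minimal wiring sketch, with placeholder endpoint, credentials and bucket:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.lakesoul.tool.FlinkUtil;

    import static org.apache.flink.lakesoul.tool.JobOptions.*;

    public class S3OptionsWiring {
        public static void main(String[] args) {
            Configuration jobConf = new Configuration();
            jobConf.set(S3_ENDPOINT, "http://localhost:9000");   // placeholder endpoint
            jobConf.set(S3_ACCESS_KEY, "minioadmin");            // placeholder credentials
            jobConf.set(S3_SECRET_KEY, "minioadmin");
            jobConf.set(S3_PATH_STYLE_ACCESS, "true");
            jobConf.set(S3_BUCKET, "lakesoul-bucket");           // placeholder bucket
            jobConf.set(DEFAULT_FS, "s3://");

            // Copy only the S3-related keys into the per-table options, as
            // LakeSoulDynamicTableFactory now does for both source and sink.
            Configuration tableOptions = new Configuration();
            FlinkUtil.setS3Options(tableOptions, jobConf);
        }
    }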
a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecordSerializer.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecordSerializer.java @@ -77,14 +77,16 @@ public BinarySourceRecordSerializer() { }); } - @Override public void write(Kryo kryo, Output output, BinarySourceRecord object) { + @Override + public void write(Kryo kryo, Output output, BinarySourceRecord object) { fury.execute(f -> { f.serializeJavaObject(output, object); return 0; }); } - @Override public BinarySourceRecord read(Kryo kryo, Input input, Class type) { + @Override + public BinarySourceRecord read(Kryo kryo, Input input, Class type) { return fury.execute(f -> f.deserializeJavaObject(input, BinarySourceRecord.class)); } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/TableSchemaIdentity.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/TableSchemaIdentity.java index def95491c..a8be479c8 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/TableSchemaIdentity.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/TableSchemaIdentity.java @@ -51,7 +51,8 @@ public int hashCode() { return Objects.hash(tableId, rowType); } - @Override public String toString() { + @Override + public String toString() { return "TableSchemaIdentity{" + "tableId=" + tableId + ", rowType=" + rowType + diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/AbstractTestBase.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/AbstractTestBase.java index 8879b65ef..f5a2e3636 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/AbstractTestBase.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/AbstractTestBase.java @@ -5,6 +5,7 @@ package org.apache.flink.lakesoul.test; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.runtime.client.JobStatusMessage; @@ -18,14 +19,36 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.time.Duration; +import static org.apache.flink.lakesoul.tool.JobOptions.*; + public abstract class AbstractTestBase { private static final Logger LOG = LoggerFactory.getLogger(org.apache.flink.test.util.AbstractTestBase.class); private static final int DEFAULT_PARALLELISM = 16; + // disable LOCAL_FS for local minio test + public static final boolean LOCAL_FS = true; + + public static final Configuration fsConfig; + + static { + fsConfig = new org.apache.flink.configuration.Configuration(); + if (!LOCAL_FS) { + fsConfig.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); + fsConfig.set(S3_ENDPOINT, "http://localhost:9002"); + fsConfig.set(S3_ACCESS_KEY, "minioadmin1"); + fsConfig.set(S3_SECRET_KEY, "minioadmin1"); + fsConfig.set(S3_PATH_STYLE_ACCESS, "true"); + fsConfig.set(DEFAULT_FS, "s3://"); + fsConfig.set(S3_BUCKET, "lakesoul-test-s3"); + FileSystem.initialize(fsConfig, null); + } + } + private static Configuration getConfig() { org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration(); config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); @@ -45,7 +68,8 @@ private static Configuration getConfig() { .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) .build()); - @ClassRule public static final 
TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); @After public final void cleanupRunningJobs() throws Exception { @@ -75,6 +99,15 @@ public static String getTempDirUri(String path) { Path tmpPath = new Path(tmp, path); File tmpDirFile = new File(tmpPath.toString()); tmpDirFile.deleteOnExit(); - return tmpPath.makeQualified(LocalFileSystem.getSharedInstance()).toUri().toString(); + if (LOCAL_FS) { + return tmpPath.makeQualified(LocalFileSystem.getSharedInstance()).toUri().toString(); + } else { + try { + return tmpPath.makeQualified(FileSystem.get(new Path("s3://lakesoul-test-s3").toUri())).toUri().toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } } diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/fail/LakeSoulSinkFailTest.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/fail/LakeSoulSinkFailTest.java index eb97c5846..a32a8b25c 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/fail/LakeSoulSinkFailTest.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/fail/LakeSoulSinkFailTest.java @@ -182,14 +182,14 @@ public void testLakeSoulSinkFailOnCheckpointing() throws IOException { Tuple3 tuple3 = parameters.get(testName); ResolvedSchema resolvedSchema = tuple3.f0; - indexBound = 30; + indexBound = (20 + new Random().nextInt(20)); List expectedData = IntStream.range(0, indexBound).boxed().map(i -> resolvedSchema.getColumns().stream() .map(col -> generateExpectedDataWithIndexByDatatype(i, col)) .collect(Collectors.joining(", ", "+I[", "]"))).collect(Collectors.toList()); MockTableSource.FAIL_OPTION = Optional.of(Tuple2.of(1000, 4000)); testLakeSoulSink(resolvedSchema, tuple3.f2, tuple3.f1, tempFolder.newFolder(testName).getAbsolutePath(), - 60 * 1000); + (30 + new Random().nextInt(30)) * 1000); List actualData = CollectionUtil.iteratorToList(batchEnv.executeSql("SELECT * FROM test_sink").collect()).stream() @@ -206,14 +206,14 @@ public void testLakeSoulSinkFailOnCollectFinished() throws IOException { Tuple3 tuple3 = parameters.get(testName); ResolvedSchema resolvedSchema = tuple3.f0; - indexBound = 30; + indexBound = (20 + new Random().nextInt(20)); List expectedData = IntStream.range(0, indexBound).boxed().map(i -> resolvedSchema.getColumns().stream() .map(col -> generateExpectedDataWithIndexByDatatype(i, col)) .collect(Collectors.joining(", ", "+I[", "]"))).collect(Collectors.toList()); MockTableSource.FAIL_OPTION = Optional.of(Tuple2.of(1000, 4000)); testLakeSoulSink(resolvedSchema, tuple3.f2, tuple3.f1, tempFolder.newFolder(testName).getAbsolutePath(), - 60 * 1000); + (30 + new Random().nextInt(30)) * 1000); List actualData = CollectionUtil.iteratorToList(batchEnv.executeSql("SELECT * FROM test_sink").collect()).stream() @@ -230,14 +230,14 @@ public void testLakeSoulSinkFailOnAssignSplitFinished() throws IOException { Tuple3 tuple3 = parameters.get(testName); ResolvedSchema resolvedSchema = tuple3.f0; - indexBound = 30; + indexBound = (20 + new Random().nextInt(20)); List expectedData = IntStream.range(0, indexBound).boxed().map(i -> resolvedSchema.getColumns().stream() .map(col -> generateExpectedDataWithIndexByDatatype(i, col)) .collect(Collectors.joining(", ", "+I[", "]"))).collect(Collectors.toList()); MockTableSource.FAIL_OPTION = Optional.of(Tuple2.of(1000, 4000)); testLakeSoulSink(resolvedSchema, tuple3.f2, tuple3.f1, tempFolder.newFolder(testName).getAbsolutePath(), - 
60 * 1000); + (30 + new Random().nextInt(30)) * 1000); List actualData = CollectionUtil.iteratorToList(batchEnv.executeSql("SELECT * FROM test_sink").collect()).stream() @@ -254,14 +254,14 @@ public void testLakeSoulSinkFailOnBeforeAssignSplit() throws IOException { Tuple3 tuple3 = parameters.get(testName); ResolvedSchema resolvedSchema = tuple3.f0; - indexBound = 30; + indexBound = (20 + new Random().nextInt(20)); List expectedData = IntStream.range(0, indexBound).boxed().map(i -> resolvedSchema.getColumns().stream() .map(col -> generateExpectedDataWithIndexByDatatype(i, col)) .collect(Collectors.joining(", ", "+I[", "]"))).collect(Collectors.toList()); MockTableSource.FAIL_OPTION = Optional.of(Tuple2.of(1000, 4000)); testLakeSoulSink(resolvedSchema, tuple3.f2, tuple3.f1, tempFolder.newFolder(testName).getAbsolutePath(), - 60 * 1000); + (30 + new Random().nextInt(30)) * 1000); List actualData = CollectionUtil.iteratorToList(batchEnv.executeSql("SELECT * FROM test_sink").collect()).stream() @@ -278,14 +278,14 @@ public void testLakeSoulSinkStopPostgresOnCheckpointing() throws IOException { Tuple3 tuple3 = parameters.get(testName); ResolvedSchema resolvedSchema = tuple3.f0; - indexBound = 40; + indexBound = (30 + new Random().nextInt(20)); List expectedData = IntStream.range(0, indexBound).boxed().map(i -> resolvedSchema.getColumns().stream() .map(col -> generateExpectedDataWithIndexByDatatype(i, col)) .collect(Collectors.joining(", ", "+I[", "]"))).collect(Collectors.toList()); - MockTableSource.FAIL_OPTION = Optional.of(Tuple2.of(5000, 4000)); + MockTableSource.FAIL_OPTION = Optional.of(Tuple2.of(5000, 15000)); testLakeSoulSink(resolvedSchema, tuple3.f2, tuple3.f1, tempFolder.newFolder(testName).getAbsolutePath(), - 60 * 1000); + (30 + new Random().nextInt(30)) * 1000); List actualData = CollectionUtil.iteratorToList(batchEnv.executeSql("SELECT * FROM test_sink").collect()).stream() @@ -302,14 +302,14 @@ public void testLakeSoulSinkWithoutPkStopPostgresOnCheckpointing() throws IOExce Tuple3 tuple3 = parameters.get(testName); ResolvedSchema resolvedSchema = tuple3.f0; - indexBound = 40; + indexBound = (30 + new Random().nextInt(20)); List expectedData = IntStream.range(0, indexBound).boxed().map(i -> resolvedSchema.getColumns().stream() .map(col -> generateExpectedDataWithIndexByDatatype(i, col)) .collect(Collectors.joining(", ", "+I[", "]"))).collect(Collectors.toList()); - MockTableSource.FAIL_OPTION = Optional.of(Tuple2.of(5000, 4000)); + MockTableSource.FAIL_OPTION = Optional.of(Tuple2.of(5000, 14000)); testLakeSoulSink(resolvedSchema, tuple3.f2, tuple3.f1, tempFolder.newFolder(testName).getAbsolutePath(), - 60 * 1000); + (30 + new Random().nextInt(30)) * 1000); List actualData = CollectionUtil.iteratorToList(batchEnv.executeSql("SELECT * FROM test_sink").collect()).stream() diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java index 0cb404259..13f2730ab 100644 --- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java +++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/TestUtils.java @@ -8,6 +8,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.lakesoul.metadata.LakeSoulCatalog; import org.apache.flink.lakesoul.test.AbstractTestBase; +import org.apache.flink.lakesoul.tool.FlinkUtil; import org.apache.flink.streaming.api.CheckpointingMode; import 
org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; @@ -26,6 +27,7 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static org.apache.flink.lakesoul.test.AbstractTestBase.fsConfig; import static org.assertj.core.api.Assertions.assertThat; public class TestUtils { @@ -36,9 +38,11 @@ public class TestUtils { public static TableEnvironment createTableEnv(String mode) { TableEnvironment createTableEnv; if (mode.equals(BATCH_TYPE)) { - createTableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); + createTableEnv = TableEnvironment.create( + EnvironmentSettings.newInstance().withConfiguration(fsConfig).inBatchMode().build() + ); } else { - Configuration config = new Configuration(); + Configuration config = new Configuration(fsConfig); config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); @@ -69,6 +73,7 @@ public static StreamTableEnvironment createStreamTableEnv(String envType) { env.setRuntimeMode(RuntimeExecutionMode.BATCH); } tEnvs = StreamTableEnvironment.create(env); + FlinkUtil.setS3Options(tEnvs.getConfig().getConfiguration(), fsConfig); Catalog lakesoulCatalog = new LakeSoulCatalog(); tEnvs.registerCatalog("lakeSoul", lakesoulCatalog); tEnvs.useCatalog("lakeSoul"); @@ -129,6 +134,7 @@ public static void createLakeSoulSourceViewUser(TableEnvironment tEnvs) tEnvs.executeSql("DROP view if exists user_info_view"); tEnvs.executeSql(createViewSql); } + public static void createLakeSoulSourceMultiPartitionTable(TableEnvironment tEnvs) throws ExecutionException, InterruptedException { String createSql = "create table user_multi (" + " `id` INT," + " name STRING," + " score INT," + diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java index 3a7c226a0..f2005251b 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java @@ -38,6 +38,10 @@ public class NativeIOBase implements AutoCloseable { protected CDataDictionaryProvider provider; + protected Pointer fixedBuffer = null; + + protected Pointer mutableBuffer = null; + public static boolean isNativeIOLibExist() { return JnrLoader.get() != null; } @@ -52,6 +56,10 @@ public NativeIOBase(String allocatorName) { intReferenceManager = Runtime.getRuntime(libLakeSoulIO).newObjectReferenceManager(); ioConfigBuilder = libLakeSoulIO.new_lakesoul_io_config_builder(); tokioRuntimeBuilder = libLakeSoulIO.new_tokio_runtime_builder(); + + fixedBuffer = Runtime.getRuntime(libLakeSoulIO).getMemoryManager().allocateDirect(5000L); + mutableBuffer = Runtime.getRuntime(libLakeSoulIO).getMemoryManager().allocateDirect(1 << 12); + setBatchSize(10240); setThreadNum(2); libLakeSoulIO.rust_logger_init(); @@ -65,6 +73,10 @@ public void addFile(String file) { ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_file(ioConfigBuilder, file); } + public void withPrefix(String prefix) { + ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_with_prefix(ioConfigBuilder, prefix); + } + public void addColumn(String column) { assert ioConfigBuilder != 
null; ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_column(ioConfigBuilder, column); @@ -76,6 +88,12 @@ public void setPrimaryKeys(Iterable primaryKeys) { } } + public void setRangePartitions(Iterable rangePartitions) { + for (String col : rangePartitions) { + ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_range_partition(ioConfigBuilder, col); + } + } + public void setSchema(Schema schema) { assert ioConfigBuilder != null; ArrowSchema ffiSchema = ArrowSchema.allocateNew(allocator); @@ -91,6 +109,11 @@ public void setThreadNum(int threadNum) { ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_thread_num(ioConfigBuilder, threadNum); } + public void useDynamicPartition(boolean enable) { + assert ioConfigBuilder != null; + ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_dynamic_partition(ioConfigBuilder, enable); + } + public void setBatchSize(int batchSize) { assert ioConfigBuilder != null; ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_batch_size(ioConfigBuilder, batchSize); diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java index 38eeb8055..ef0857c6c 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOWriter.java @@ -5,6 +5,7 @@ package com.dmetasoul.lakesoul.lakesoul.io; import jnr.ffi.Pointer; +import jnr.ffi.Runtime; import org.apache.arrow.c.ArrowArray; import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.Data; @@ -13,6 +14,10 @@ import org.apache.arrow.vector.types.pojo.Schema; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; public class NativeIOWriter extends NativeIOBase implements AutoCloseable { @@ -31,6 +36,11 @@ public void setAuxSortColumns(Iterable auxSortColumns) { } } + public void setHashBucketNum(Integer hashBucketNum) { + ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_hash_bucket_num(ioConfigBuilder, hashBucketNum); + } + + public void setRowGroupRowNumber(int rowNum) { ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_set_max_row_group_size(ioConfigBuilder, rowNum); } @@ -63,19 +73,63 @@ public void write(VectorSchemaRoot batch) throws IOException { } } - public void flush() throws IOException { + public HashMap> flush() throws IOException { AtomicReference errMsg = new AtomicReference<>(); - BooleanCallback nativeBooleanCallback = new BooleanCallback((status, err) -> { - if (!status && err != null) { + AtomicReference lenResult = new AtomicReference<>(); + IntegerCallback nativeIntegerCallback = new IntegerCallback((len, err) -> { + if (len < 0 && err != null) { errMsg.set(err); } - }, boolReferenceManager); - nativeBooleanCallback.registerReferenceKey(); - libLakeSoulIO.flush_and_close_writer(writer, nativeBooleanCallback); + lenResult.set(len); + + }, intReferenceManager); + nativeIntegerCallback.registerReferenceKey(); + Pointer ptrResult = libLakeSoulIO.flush_and_close_writer(writer, nativeIntegerCallback); writer = null; if (errMsg.get() != null && !errMsg.get().isEmpty()) { throw new IOException("Native writer flush failed with error: " + errMsg.get()); } + + Integer len = lenResult.get(); + if (len != null && len > 0) { + int 
lenWithTail = len + 1; + Pointer buffer = fixedBuffer; + if (lenWithTail > fixedBuffer.size()) { + if (lenWithTail > mutableBuffer.size()) { + mutableBuffer = Runtime.getRuntime(libLakeSoulIO).getMemoryManager().allocateDirect(lenWithTail); + } + buffer = mutableBuffer; + } + AtomicReference exported = new AtomicReference<>(); + BooleanCallback nativeBooleanCallback = new BooleanCallback((status, err) -> { + if (!status && err != null) { + errMsg.set(err); + } + exported.set(status); + }, boolReferenceManager); + nativeBooleanCallback.registerReferenceKey(); + libLakeSoulIO.export_bytes_result(nativeBooleanCallback, ptrResult, len, buffer.address()); + + if (exported.get() != null && exported.get()) { + byte[] bytes = new byte[len]; + buffer.get(0, bytes, 0, len); + String decodedResult = new String(bytes); + String[] splits = decodedResult.split("\u0001"); + int partitionNum = Integer.parseInt(splits[0]); + if (partitionNum != splits.length - 1) { + throw new IOException("Dynamic Partitions Result [" + decodedResult + "] encode error: partition number mismatch " + partitionNum + "!=" + (splits.length - 1)); + } + HashMap> partitionDescAndFilesMap = new HashMap<>(); + for (int i = 1; i < splits.length; i++) { + String[] partitionDescAndFiles = splits[i].split("\u0002"); + List list = new ArrayList<>(Arrays.asList(partitionDescAndFiles).subList(1, partitionDescAndFiles.length)); + partitionDescAndFilesMap.put(partitionDescAndFiles[0], list); + + } + return partitionDescAndFilesMap; + } + } + return null; } public void abort() throws IOException { diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/LibLakeSoulIO.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/LibLakeSoulIO.java index d576c2188..60c2feba7 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/LibLakeSoulIO.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/jnr/LibLakeSoulIO.java @@ -18,14 +18,21 @@ public interface LibLakeSoulIO { Pointer tokio_runtime_builder_set_thread_num(Pointer builder, int thread_num); + Pointer create_tokio_runtime_from_builder(Pointer builder); Pointer new_lakesoul_io_config_builder(); Pointer lakesoul_config_builder_add_single_file(Pointer builder, String file); + Pointer lakesoul_config_builder_with_prefix(Pointer builder, String file); + + Pointer lakesoul_config_builder_set_hash_bucket_num(Pointer builder, int hash_bucket_num); + Pointer lakesoul_config_builder_add_single_primary_key(Pointer builder, String pk); + Pointer lakesoul_config_builder_add_single_range_partition(Pointer builder, String col); + Pointer lakesoul_config_builder_add_single_column(Pointer builder, String column); Pointer lakesoul_config_builder_add_single_aux_sort_column(Pointer builder, String column); @@ -42,6 +49,8 @@ public interface LibLakeSoulIO { Pointer lakesoul_config_builder_set_thread_num(Pointer builder, int thread_num); + Pointer lakesoul_config_builder_set_dynamic_partition(Pointer builder, boolean enable); + Pointer lakesoul_config_builder_set_batch_size(Pointer builder, int batch_size); Pointer lakesoul_config_builder_set_buffer_size(Pointer builder, int buffer_size); @@ -84,11 +93,14 @@ interface IntegerCallback { // type representing callback void free_lakesoul_reader(Pointer reader); - void flush_and_close_writer(Pointer writer, BooleanCallback callback); + Pointer flush_and_close_writer(Pointer writer, IntegerCallback callback); void 
abort_and_close_writer(Pointer writer, BooleanCallback callback); void free_tokio_runtime(Pointer runtime); + void export_bytes_result(BooleanCallback booleanCallback, Pointer bytes, Integer len, @LongLong long addr); + void rust_logger_init(); + } diff --git a/rust/Cargo.lock b/rust/Cargo.lock index b7717c920..5e4f45f0e 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1783,6 +1783,7 @@ dependencies = [ "arrow", "arrow-array", "arrow-buffer", + "arrow-cast", "arrow-schema", "async-trait", "atomic_refcell", @@ -1823,6 +1824,7 @@ name = "lakesoul-io-c" version = "2.5.0" dependencies = [ "arrow", + "bytes", "cbindgen", "datafusion-substrait", "env_logger 0.11.2", diff --git a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs index 109050467..76a11f435 100644 --- a/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs +++ b/rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs @@ -35,7 +35,7 @@ use datafusion::{ use futures::StreamExt; use lakesoul_io::datasource::file_format::{compute_project_column_indices, flatten_file_scan_config}; use lakesoul_io::datasource::physical_plan::MergeParquetExec; -use lakesoul_io::helpers::partition_desc_from_file_scan_config; +use lakesoul_io::helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, get_columnar_values, partition_desc_from_file_scan_config}; use lakesoul_io::lakesoul_io_config::LakeSoulIOConfig; use lakesoul_io::lakesoul_writer::{AsyncBatchWriter, MultiPartAsyncWriter}; use lakesoul_metadata::MetaDataClientRef; @@ -48,7 +48,7 @@ use tokio::task::JoinHandle; use tracing::debug; use crate::catalog::{commit_data, parse_table_info_partitions}; -use crate::lakesoul_table::helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, create_io_config_builder_from_table_info, get_columnar_values}; +use crate::lakesoul_table::helpers::create_io_config_builder_from_table_info; pub struct LakeSoulMetaDataParquetFormat { parquet_format: Arc, diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs index 2e049b189..c1d2707fc 100644 --- a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs +++ b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs @@ -8,7 +8,7 @@ use arrow::{array::{Array, ArrayRef, AsArray, StringBuilder}, compute::prep_null use arrow_cast::cast; use arrow_arith::boolean::and; -use datafusion::{common::{DFField, DFSchema}, error::DataFusionError, execution::context::ExecutionProps, logical_expr::Expr, physical_expr::create_physical_expr, scalar::ScalarValue}; +use datafusion::{common::{DFField, DFSchema}, error::DataFusionError, execution::context::ExecutionProps, logical_expr::Expr, physical_expr::create_physical_expr}; use lakesoul_metadata::MetaDataClientRef; use object_store::{path::Path, ObjectMeta, ObjectStore}; use tracing::{debug, trace}; @@ -35,46 +35,6 @@ pub(crate) fn create_io_config_builder_from_table_info(table_info: Arc>) -> datafusion::error::Result> { - range_partitions - .iter() - .map(|range_col| { - if let Some(array) = batch.column_by_name(&range_col) { - match ScalarValue::try_from_array(array, 0) { - Ok(scalar) => Ok((range_col.clone(), scalar)), - Err(e) => Err(e) - } - } else { - Err(datafusion::error::DataFusionError::External(format!("").into())) - } - }) - .collect::>>() -} - -pub fn columnar_values_to_sub_path(columnar_values: &Vec<(String, ScalarValue)>) -> String { - if 
columnar_values.is_empty() { - "/".to_string() - } else { - format!("/{}/", columnar_values - .iter() - .map(|(k, v)| format!("{}={}", k, v)) - .collect::>() - .join("/")) - } -} - -pub fn columnar_values_to_partition_desc(columnar_values: &Vec<(String, ScalarValue)>) -> String { - if columnar_values.is_empty() { - "-5".to_string() - } else { - columnar_values - .iter() - .map(|(k, v)| format!("{}={}", k, v)) - .collect::>() - .join(",") - } -} - pub async fn prune_partitions( all_partition_info: Vec, filters: &[Expr], diff --git a/rust/lakesoul-datafusion/src/test/hash_tests.rs b/rust/lakesoul-datafusion/src/test/hash_tests.rs index 65858bab2..ce3a26fb4 100644 --- a/rust/lakesoul-datafusion/src/test/hash_tests.rs +++ b/rust/lakesoul-datafusion/src/test/hash_tests.rs @@ -5,6 +5,18 @@ mod hash_tests { use lakesoul_io::hash_utils::{HashValue, HASH_SEED}; + #[test] + fn chrono_test() { + let date = chrono::NaiveDate::parse_from_str("0001-01-01", "%Y-%m-%d").unwrap(); + let datetime = date + .and_hms_opt(12, 12, 12) + .unwrap(); + let epoch_time = chrono::NaiveDateTime::from_timestamp_millis(0).unwrap(); + + println!("{}", datetime.signed_duration_since(epoch_time).num_days() as i32); + println!("{}", chrono::NaiveDate::from_num_days_from_ce_opt(719162).unwrap().format("%Y-%m-%d")); + } + #[test] fn hash_value_test() { // let hash = "321".hash_one(HASH_SEED) as i32; diff --git a/rust/lakesoul-io-c/Cargo.toml b/rust/lakesoul-io-c/Cargo.toml index a138866d9..3c71d088e 100644 --- a/rust/lakesoul-io-c/Cargo.toml +++ b/rust/lakesoul-io-c/Cargo.toml @@ -16,6 +16,7 @@ arrow = { workspace = true, features = ["ffi"] } tokio = { version = "1", features = ["full"] } serde_json = "1.0" serde = { version = "1.0", default-features = false, features = ["derive", "std"], optional = true } +bytes = { workspace = true } prost = "0.12.3" log = "0.4.20" env_logger = "0.11" @@ -28,7 +29,6 @@ datafusion-substrait = { workspace = true } datafusion-substrait = { workspace = true, features = ["protoc"] } - [features] hdfs = ["lakesoul-io/hdfs"] simd = ["lakesoul-io/simd"] diff --git a/rust/lakesoul-io-c/lakesoul_c_bindings.h b/rust/lakesoul-io-c/lakesoul_c_bindings.h index 23837a15f..043dd2f96 100644 --- a/rust/lakesoul-io-c/lakesoul_c_bindings.h +++ b/rust/lakesoul-io-c/lakesoul_c_bindings.h @@ -49,6 +49,10 @@ struct Writer { uint8_t private_[0]; }; +struct BytesResult { + uint8_t private_[0]; +}; + struct TokioRuntimeBuilder { uint8_t private_[0]; }; @@ -57,6 +61,8 @@ extern "C" { IOConfigBuilder *new_lakesoul_io_config_builder(); +IOConfigBuilder *lakesoul_config_builder_with_prefix(IOConfigBuilder *builder, const char *prefix); + IOConfigBuilder *lakesoul_config_builder_add_single_file(IOConfigBuilder *builder, const char *file); @@ -78,6 +84,9 @@ IOConfigBuilder *lakesoul_config_builder_set_schema(IOConfigBuilder *builder, IOConfigBuilder *lakesoul_config_builder_set_thread_num(IOConfigBuilder *builder, c_size_t thread_num); +IOConfigBuilder *lakesoul_config_builder_set_dynamic_partition(IOConfigBuilder *builder, + bool enable); + IOConfigBuilder *lakesoul_config_builder_set_batch_size(IOConfigBuilder *builder, c_size_t batch_size); @@ -87,6 +96,9 @@ IOConfigBuilder *lakesoul_config_builder_set_max_row_group_size(IOConfigBuilder IOConfigBuilder *lakesoul_config_builder_set_buffer_size(IOConfigBuilder *builder, c_size_t buffer_size); +IOConfigBuilder *lakesoul_config_builder_set_hash_bucket_num(IOConfigBuilder *builder, + c_size_t hash_bucket_num); + IOConfigBuilder 
*lakesoul_config_builder_set_object_store_option(IOConfigBuilder *builder, const char *key, const char *value); @@ -98,6 +110,9 @@ IOConfigBuilder *lakesoul_config_builder_add_files(IOConfigBuilder *builder, IOConfigBuilder *lakesoul_config_builder_add_single_primary_key(IOConfigBuilder *builder, const char *pk); +IOConfigBuilder *lakesoul_config_builder_add_single_range_partition(IOConfigBuilder *builder, + const char *col); + IOConfigBuilder *lakesoul_config_builder_add_merge_op(IOConfigBuilder *builder, const char *field, const char *merge_op); @@ -150,7 +165,12 @@ const char *write_record_batch_blocked(CResult *writer, c_ptrdiff_t schema_addr, c_ptrdiff_t array_addr); -void flush_and_close_writer(CResult *writer, ResultCallback callback); +void export_bytes_result(void (*callback)(bool, const char*), + CResult *bytes, + int32_t len, + c_ptrdiff_t addr); + +CResult *flush_and_close_writer(CResult *writer, I32ResultCallback callback); void abort_and_close_writer(CResult *writer, ResultCallback callback); diff --git a/rust/lakesoul-io-c/src/lib.rs b/rust/lakesoul-io-c/src/lib.rs index 3e560b402..562f3b5a5 100644 --- a/rust/lakesoul-io-c/src/lib.rs +++ b/rust/lakesoul-io-c/src/lib.rs @@ -7,11 +7,14 @@ extern crate core; use core::ffi::{c_ptrdiff_t, c_size_t}; -use std::ffi::{c_char, c_int, c_void, CStr, CString}; +use std::ffi::{c_char, c_int, c_uchar, c_void, CStr, CString}; +use std::io::Write; use std::ptr::NonNull; use std::slice; use std::sync::Arc; +use bytes::BufMut; + use arrow::array::Array; pub use arrow::array::StructArray; use arrow::datatypes::Schema; @@ -102,11 +105,28 @@ pub struct Writer { private: [u8; 0], } +#[repr(C)] +pub struct BytesResult { + private: [u8; 0], +} + + #[no_mangle] pub extern "C" fn new_lakesoul_io_config_builder() -> NonNull { convert_to_opaque(LakeSoulIOConfigBuilder::new()) } +#[no_mangle] +pub extern "C" fn lakesoul_config_builder_with_prefix( + builder: NonNull, + prefix: *const c_char, +) -> NonNull { + unsafe { + let prefix = CStr::from_ptr(prefix).to_str().unwrap().to_string(); + convert_to_opaque(from_opaque::(builder).with_prefix(prefix)) + } +} + #[no_mangle] pub extern "C" fn lakesoul_config_builder_add_single_file( builder: NonNull, @@ -189,6 +209,14 @@ pub extern "C" fn lakesoul_config_builder_set_thread_num( convert_to_opaque(from_opaque::(builder).with_thread_num(thread_num)) } +#[no_mangle] +pub extern "C" fn lakesoul_config_builder_set_dynamic_partition( + builder: NonNull, + enable: bool, +) -> NonNull { + convert_to_opaque(from_opaque::(builder).set_dynamic_partition(enable)) +} + #[no_mangle] pub extern "C" fn lakesoul_config_builder_set_batch_size( builder: NonNull, @@ -215,6 +243,15 @@ pub extern "C" fn lakesoul_config_builder_set_buffer_size( convert_to_opaque(from_opaque::(builder).with_prefetch_size(buffer_size)) } +#[no_mangle] +pub extern "C" fn lakesoul_config_builder_set_hash_bucket_num( + builder: NonNull, + hash_bucket_num: c_size_t, +) -> NonNull { + convert_to_opaque(from_opaque::(builder).with_hash_bucket_num(hash_bucket_num)) +} + + #[no_mangle] pub extern "C" fn lakesoul_config_builder_set_object_store_option( builder: NonNull, @@ -259,6 +296,18 @@ pub extern "C" fn lakesoul_config_builder_add_single_primary_key( } } +#[no_mangle] +pub extern "C" fn lakesoul_config_builder_add_single_range_partition( + builder: NonNull, + col: *const c_char, +) -> NonNull { + unsafe { + let col = CStr::from_ptr(col).to_str().unwrap().to_string(); + convert_to_opaque(from_opaque::(builder).with_range_partition(col)) + } +} + + 
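Note: taken together, the new config-builder entry points (prefix, range partitions, hash bucket number, dynamic partitioning) are driven from Java through NativeIOWriter. A usage sketch with placeholder prefix and column names; the constructor and the initializeWriter() call are assumed from the existing NativeIOWriter API and are not part of this diff:

    import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.types.pojo.Schema;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;

    public class DynamicPartitionWriteSketch {
        public static HashMap<String, List<String>> write(Schema schema, VectorSchemaRoot batch) throws Exception {
            NativeIOWriter writer = new NativeIOWriter(schema);      // assumed existing constructor
            writer.withPrefix("s3://lakesoul-bucket/tables/demo");   // table prefix instead of a single file
            writer.setRangePartitions(Arrays.asList("date"));        // range partition column(s)
            writer.setPrimaryKeys(Arrays.asList("id"));
            writer.setHashBucketNum(2);
            writer.useDynamicPartition(true);
            writer.initializeWriter();                               // assumed existing init call
            writer.write(batch);
            // With dynamic partitioning enabled, flush() now returns partitionDesc -> written files.
            return writer.flush();
        }
    }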
#[no_mangle] pub extern "C" fn lakesoul_config_builder_add_merge_op( builder: NonNull, @@ -673,21 +722,58 @@ pub extern "C" fn write_record_batch_blocked( } } +#[no_mangle] +pub extern "C" fn export_bytes_result( + callback: extern "C" fn(bool, *const c_char), + bytes: NonNull>, + len: i32, + addr: c_ptrdiff_t, +) { + let len = len as usize; + let bytes = unsafe { NonNull::new_unchecked(bytes.as_ref().ptr as *mut Vec).as_mut() }; + + if bytes.len() != len { + call_result_callback( + callback, + false, + CString::new("Size of buffer and result mismatch at export_bytes_result.") + .unwrap() + .into_raw(), + ); + return; + } + bytes.push(0u8); + bytes.shrink_to_fit(); + + let dst = unsafe { std::slice::from_raw_parts_mut(addr as *mut u8, len + 1) }; + let mut writer = dst.writer(); + let _ = writer.write_all(bytes.as_slice()); + + call_result_callback(callback, true, std::ptr::null()); +} + + // consumes the writer pointer // this writer cannot be used again #[no_mangle] -pub extern "C" fn flush_and_close_writer(writer: NonNull>, callback: ResultCallback) { +pub extern "C" fn flush_and_close_writer(writer: NonNull>, callback: I32ResultCallback) -> NonNull> { unsafe { let writer = from_opaque::(NonNull::new_unchecked(writer.as_ref().ptr)); let result = writer.flush_and_close(); match result { - Ok(_) => call_result_callback(callback, true, std::ptr::null()), - Err(e) => call_result_callback( - callback, - false, - CString::new(format!("{}", e).as_str()).unwrap().into_raw(), - ), + Ok(bytes) => { + call_i32_result_callback(callback, bytes.len() as i32, std::ptr::null()); + convert_to_nonnull(CResult::::new::>(bytes)) + } + Err(e) => { + call_i32_result_callback( + callback, + -1, + CString::new(format!("{}", e).as_str()).unwrap().into_raw(), + ); + convert_to_nonnull(CResult::::new::>(vec![])) + } } } } @@ -847,10 +933,26 @@ mod tests { static mut WRITER_FAILED: Option = None; #[no_mangle] - pub extern "C" fn writer_callback(status: bool, err: *const c_char) { + pub extern "C" fn writer_callback(status: i32, err: *const c_char) { unsafe { let mut writer_called = CALL_BACK_CV.0.lock().unwrap(); - if !status { + if status < 0 { + match err.as_ref() { + Some(e) => WRITER_FAILED = Some(CStr::from_ptr(e as *const c_char).to_str().unwrap().to_string()), + None => {} + } + WRITER_FINISHED = true; + } + *writer_called = true; + CALL_BACK_CV.1.notify_one(); + } + } + + #[no_mangle] + pub extern "C" fn result_callback(status: bool, err: *const c_char) { + unsafe { + let mut writer_called = CALL_BACK_CV.0.lock().unwrap(); + if !status { match err.as_ref() { Some(e) => WRITER_FAILED = Some(CStr::from_ptr(e as *const c_char).to_str().unwrap().to_string()), None => {} @@ -862,6 +964,7 @@ mod tests { } } + #[test] fn test_native_read_write() { let mut reader_config_builder = crate::new_lakesoul_io_config_builder(); @@ -972,7 +1075,7 @@ mod tests { writer, std::ptr::addr_of!(schema_ptr) as c_ptrdiff_t, std::ptr::addr_of!(array_ptr) as c_ptrdiff_t, - writer_callback, + result_callback, ); wait_callback(); @@ -1104,7 +1207,7 @@ mod tests { writer, std::ptr::addr_of!(schema_ptr) as c_ptrdiff_t, std::ptr::addr_of!(array_ptr) as c_ptrdiff_t, - writer_callback, + result_callback, ); wait_callback(); diff --git a/rust/lakesoul-io/Cargo.toml b/rust/lakesoul-io/Cargo.toml index b5b6e3662..8f311305f 100644 --- a/rust/lakesoul-io/Cargo.toml +++ b/rust/lakesoul-io/Cargo.toml @@ -20,6 +20,7 @@ arrow = { workspace = true, features = ["prettyprint"] } arrow-schema = { workspace = true, features = ["serde"] } arrow-array = { 
workspace = true, features = ["chrono-tz"] } arrow-buffer = { workspace = true } +arrow-cast = { workspace = true } parquet = { workspace = true, features = ["async", "arrow"] } futures = { workspace = true } datafusion-common = { workspace = true } @@ -36,7 +37,7 @@ serde_json = { workspace = true } tracing = "0.1.40" proto = { path = "../proto" } parking_lot = "0.12.1" - +rand = { workspace = true } half = { workspace = true } log = "0.4.20" anyhow = { workspace = true, features = [] } diff --git a/rust/lakesoul-io/src/helpers.rs b/rust/lakesoul-io/src/helpers.rs index 7f2b2ad6c..d50818c07 100644 --- a/rust/lakesoul-io/src/helpers.rs +++ b/rust/lakesoul-io/src/helpers.rs @@ -4,15 +4,25 @@ use std::{collections::HashMap, sync::Arc}; +use arrow_array::RecordBatch; use arrow_schema::{DataType, Schema, SchemaBuilder, SchemaRef}; use datafusion::{ - datasource::{file_format::FileFormat, listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, physical_plan::FileScanConfig}, execution::context::SessionState, logical_expr::col, physical_expr::{create_physical_expr, PhysicalSortExpr}, physical_plan::PhysicalExpr, physical_planner::create_physical_sort_expr, + datasource::{ + file_format::FileFormat, + listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, + physical_plan::FileScanConfig}, + execution::context::SessionState, + logical_expr::col, + physical_expr::{create_physical_expr, PhysicalSortExpr}, + physical_plan::PhysicalExpr, + physical_planner::create_physical_sort_expr }; -use datafusion_common::{DataFusionError, DFSchema, Result}; +use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue}; + use object_store::path::Path; use url::Url; -use crate::{lakesoul_io_config::LakeSoulIOConfig, transform::uniform_schema}; +use crate::{constant::{LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING}, lakesoul_io_config::LakeSoulIOConfig, transform::uniform_schema}; pub fn column_names_to_physical_sort_expr( columns: &[String], @@ -63,6 +73,61 @@ fn range_partition_to_partition_cols( .collect::>>() } +pub fn get_columnar_values(batch: &RecordBatch, range_partitions: Arc>) -> datafusion::error::Result> { + range_partitions + .iter() + .map(|range_col| { + if let Some(array) = batch.column_by_name(&range_col) { + match ScalarValue::try_from_array(array, 0) { + Ok(scalar) => Ok((range_col.clone(), scalar)), + Err(e) => Err(e) + } + } else { + Err(datafusion::error::DataFusionError::External(format!("").into())) + } + }) + .collect::>>() +} + +pub fn format_scalar_value(v: &ScalarValue) -> String { + match v { + ScalarValue::Date32(Some(days)) => + format!("{}", chrono::NaiveDate::from_num_days_from_ce_opt(*days + 719163).unwrap().format("%Y-%m-%d")), + ScalarValue::Null => LAKESOUL_NULL_STRING.to_string(), + ScalarValue::Utf8(Some(s)) => + if s.is_empty() { + LAKESOUL_EMPTY_STRING.to_string() + } else { + s.clone() + }, + other => other.to_string() + } +} + +pub fn columnar_values_to_sub_path(columnar_values: &Vec<(String, ScalarValue)>) -> String { + if columnar_values.is_empty() { + "/".to_string() + } else { + format!("/{}/", columnar_values + .iter() + .map(|(k, v)| format!("{}={}", k, format_scalar_value(v))) + .collect::>() + .join("/")) + } +} + +pub fn columnar_values_to_partition_desc(columnar_values: &Vec<(String, ScalarValue)>) -> String { + if columnar_values.is_empty() { + "-5".to_string() + } else { + columnar_values + .iter() + .map(|(k, v)| format!("{}={}", k, format_scalar_value(v))) + .collect::>() + .join(",") + } +} + pub fn 
partition_desc_from_file_scan_config( conf: &FileScanConfig ) -> Result<(String, HashMap)> { diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index d17fc998a..eb0756c1a 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -53,6 +53,7 @@ pub struct LakeSoulIOConfig { // range partitions column names pub(crate) range_partitions: Vec, // number of hash bucket + #[derivative(Default(value = "1"))] pub(crate) hash_bucket_num: usize, // selecting columns pub(crate) columns: Vec, @@ -92,6 +93,10 @@ pub struct LakeSoulIOConfig { // to be compatible with hadoop's fs.defaultFS pub(crate) default_fs: String, + + // if dynamic partition + #[derivative(Default(value = "false"))] + pub(crate) use_dynamic_partition: bool, } impl LakeSoulIOConfig { @@ -245,6 +250,11 @@ impl LakeSoulIOConfigBuilder { self } + pub fn set_dynamic_partition(mut self, enable: bool) -> Self { + self.config.use_dynamic_partition = enable; + self + } + pub fn build(self) -> LakeSoulIOConfig { self.config } @@ -260,6 +270,10 @@ impl LakeSoulIOConfigBuilder { pub fn aux_sort_cols_slice(&self) -> &[String] { self.config.aux_sort_cols_slice() } + + pub fn prefix(&self) -> &String { + &self.config.prefix + } } impl From for LakeSoulIOConfigBuilder { @@ -464,6 +478,13 @@ pub fn create_session_context_with_planner( register_object_store(&fs, config, &runtime)?; }; + if !config.prefix.is_empty() { + let prefix = config.prefix.clone(); + let normalized_prefix = register_object_store(&prefix, config, &runtime)?; + config.prefix = normalized_prefix; + } + + // register object store(s) for input/output files' path // and replace file names with default fs concatenated if exist let files = config.files.clone(); diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs index c71569b32..92c6320dd 100644 --- a/rust/lakesoul-io/src/lakesoul_writer.rs +++ b/rust/lakesoul-io/src/lakesoul_writer.rs @@ -2,7 +2,9 @@ // // SPDX-License-Identifier: Apache-2.0 -use crate::lakesoul_io_config::{create_session_context, IOSchema, LakeSoulIOConfig}; +use crate::helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, get_columnar_values}; +use crate::lakesoul_io_config::{create_session_context, IOSchema, LakeSoulIOConfig, LakeSoulIOConfigBuilder}; +use crate::repartition::RepartitionByRangeAndHashExec; use crate::transform::{uniform_record_batch, uniform_schema}; use arrow::compute::SortOptions; @@ -19,16 +21,18 @@ use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchReceiverStreamBuilder}; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream}; -use datafusion_common::DataFusionError; +use datafusion_common::{project_schema, DataFusionError}; use datafusion_common::DataFusionError::Internal; use object_store::path::Path; use object_store::{MultipartId, ObjectStore}; use parquet::arrow::ArrowWriter; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; +use rand::distributions::DistString; +use tracing::debug; use std::any::Any; use std::borrow::Borrow; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::fmt::{Debug, Formatter}; use std::io::ErrorKind::AddrInUse; use std::io::Write; @@ -46,9 +50,11 @@ use url::Url; pub trait AsyncBatchWriter { 
async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<()>; - async fn flush_and_close(self: Box) -> Result<()>; + async fn flush_and_close(self: Box) -> Result>; - async fn abort_and_close(self: Box) -> Result<()>; + async fn abort_and_close(self: Box) -> Result>; + + fn schema(&self) -> SchemaRef; } /// An async writer using object_store's multi-part upload feature for cloud storage. @@ -75,9 +81,21 @@ pub struct MultiPartAsyncWriter { /// Wrap the above async writer with a SortExec to /// sort the batches before write to async writer pub struct SortAsyncWriter { + schema: SchemaRef, sorter_sender: Sender>, _sort_exec: Arc, - join_handle: Option>>, + join_handle: Option>>>, + err: Option, +} + + +/// Wrap the above async writer with a RepartitionExec to +/// dynamic repartitioning the batches before write to async writer +pub struct PartitioningAsyncWriter { + schema: SchemaRef, + sorter_sender: Sender>, + _partitioning_exec: Arc, + join_handle: Option>>>, err: Option, } @@ -194,11 +212,24 @@ impl MultiPartAsyncWriter { let in_mem_buf = InMemBuf(Arc::new(AtomicRefCell::new(VecDeque::::with_capacity( 16 * 1024 * 1024, // 16kb )))); - let schema: SchemaRef = config.schema.0.clone(); + let schema = uniform_schema(config.schema.0.clone()); + + let schema_projection_excluding_range = + schema + .fields() + .iter() + .enumerate() + .filter_map(|(idx, field)| + match config.range_partitions.contains(field.name()) { + true => None, + false => Some(idx) + }) + .collect::>(); + let writer_schema = project_schema(&schema, Some(&schema_projection_excluding_range))?; let arrow_writer = ArrowWriter::try_new( in_mem_buf.clone(), - uniform_schema(schema.clone()), + writer_schema, Some( WriterProperties::builder() .set_max_row_group_size(config.max_row_group_size) @@ -211,7 +242,7 @@ impl MultiPartAsyncWriter { Ok(MultiPartAsyncWriter { in_mem_buf, task_context, - schema: uniform_schema(schema), + schema, writer: async_writer, multi_part_id: multipart_id, arrow_writer, @@ -280,7 +311,7 @@ impl AsyncBatchWriter for MultiPartAsyncWriter { MultiPartAsyncWriter::write_batch(batch, &mut self.arrow_writer, &mut self.in_mem_buf, &mut self.writer).await } - async fn flush_and_close(self: Box) -> Result<()> { + async fn flush_and_close(self: Box) -> Result> { // close arrow writer to flush remaining rows let mut this = *self; let arrow_writer = this.arrow_writer; @@ -296,15 +327,20 @@ impl AsyncBatchWriter for MultiPartAsyncWriter { // shutdown multi-part async writer to complete the upload this.writer.flush().await?; this.writer.shutdown().await?; - Ok(()) + Ok(vec![]) } - async fn abort_and_close(self: Box) -> Result<()> { + async fn abort_and_close(self: Box) -> Result> { let this = *self; this.object_store .abort_multipart(&this.path, &this.multi_part_id) .await - .map_err(DataFusionError::ObjectStore) + .map_err(DataFusionError::ObjectStore)?; + Ok(vec![]) + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() } } @@ -318,7 +354,7 @@ impl SortAsyncWriter { let schema = config.schema.0.clone(); let receiver_stream_builder = RecordBatchReceiverStream::builder(schema.clone(), 8); let tx = receiver_stream_builder.tx(); - let recv_exec = ReceiverStreamExec::new(receiver_stream_builder, schema); + let recv_exec = ReceiverStreamExec::new(receiver_stream_builder, schema.clone()); let sort_exprs: Vec = config .primary_keys @@ -377,7 +413,7 @@ impl SortAsyncWriter { let result = async_writer.abort_and_close().await; match result { Ok(_) => match e { - Internal(ref err_msg) if err_msg == "external 
abort" => Ok(()), + Internal(ref err_msg) if err_msg == "external abort" => Ok(vec![]), _ => Err(e), }, Err(abort_err) => Err(Internal(format!( @@ -387,11 +423,12 @@ impl SortAsyncWriter { } } else { async_writer.flush_and_close().await?; - Ok(()) + Ok(vec![]) } }); Ok(SortAsyncWriter { + schema, sorter_sender: tx, _sort_exec: exec_plan, join_handle: Some(join_handle), @@ -423,7 +460,7 @@ impl AsyncBatchWriter for SortAsyncWriter { } } - async fn flush_and_close(self: Box) -> Result<()> { + async fn flush_and_close(self: Box) -> Result> { if let Some(join_handle) = self.join_handle { let sender = self.sorter_sender; drop(sender); @@ -433,7 +470,7 @@ impl AsyncBatchWriter for SortAsyncWriter { } } - async fn abort_and_close(self: Box) -> Result<()> { + async fn abort_and_close(self: Box) -> Result> { if let Some(join_handle) = self.join_handle { let sender = self.sorter_sender; // send abort signal to the task @@ -445,9 +482,316 @@ impl AsyncBatchWriter for SortAsyncWriter { join_handle.await.map_err(|e| DataFusionError::External(Box::new(e)))? } else { // previous error has already aborted writer - Ok(()) + Ok(vec![]) + } + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + + +impl PartitioningAsyncWriter { + pub fn try_new( + task_context: Arc, + config: LakeSoulIOConfig, + runtime: Arc, + ) -> Result { + let _ = runtime.enter(); + let schema = config.schema.0.clone(); + let receiver_stream_builder = RecordBatchReceiverStream::builder(schema.clone(), 8); + let tx = receiver_stream_builder.tx(); + let recv_exec = ReceiverStreamExec::new(receiver_stream_builder, schema.clone()); + + let partitioning_exec = PartitioningAsyncWriter::get_partitioning_exec(recv_exec, config.clone())?; + + // launch one async task per *input* partition + let mut join_handles = vec![]; + + let write_id = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + + let partitioned_file_path_and_row_count = + Arc::new( + Mutex::new( + HashMap::, u64)>::new() + )); + for i in 0..partitioning_exec.output_partitioning().partition_count() { + let sink_task = tokio::spawn(Self::pull_and_sink( + partitioning_exec.clone(), + i, + task_context.clone(), + config.clone().into(), + Arc::new(config.range_partitions.clone()), + write_id.clone(), + partitioned_file_path_and_row_count.clone(), + )); + // // In a separate task, wait for each input to be done + // // (and pass along any errors, including panic!s) + join_handles.push(sink_task); + } + + let join_handle = tokio::spawn(Self::await_and_summary( + join_handles, + partitioned_file_path_and_row_count.clone(), + )); + + + Ok(Self { + schema, + sorter_sender: tx, + _partitioning_exec: partitioning_exec, + join_handle: Some(join_handle), + err: None, + }) + } + + fn get_partitioning_exec(input: ReceiverStreamExec, config: LakeSoulIOConfig) -> Result> { + let sort_exprs: Vec = config + .range_partitions + .iter() + .chain(config.primary_keys.iter()) + // add aux sort cols to sort expr + .chain(config.aux_sort_cols.iter()) + .map(|pk| { + let col = Column::new_with_schema(pk.as_str(), &config.schema.0)?; + Ok(PhysicalSortExpr { + expr: Arc::new(col), + options: SortOptions::default(), + }) + }) + .collect::>>()?; + if sort_exprs.is_empty() { + return Ok(Arc::new(input)); + } + + let sort_exec = Arc::new(SortExec::new(sort_exprs, Arc::new(input))); + + // see if we need to prune aux sort cols + let sort_exec: Arc = if config.aux_sort_cols.is_empty() { + sort_exec + } else { + let proj_expr: Vec<(Arc, String)> = config + .schema + .0 + 
.fields + .iter() + .filter_map(|f| { + if config.aux_sort_cols.contains(f.name()) { + // exclude aux sort cols + None + } else { + Some(col(f.name().as_str(), &config.schema.0).map(|e| (e, f.name().clone()))) + } + }) + .collect::, String)>>>()?; + Arc::new(ProjectionExec::try_new(proj_expr, sort_exec)?) + }; + + let exec_plan = if config.primary_keys.is_empty() && config.range_partitions.is_empty() { + sort_exec + } else { + let sorted_schema = sort_exec.schema(); + + let range_partitioning_expr: Vec> = config + .range_partitions + .iter() + .map(|col| { + let idx = sorted_schema.index_of(col.as_str())?; + Ok(Arc::new(Column::new(col.as_str(), idx)) as Arc) + }) + .collect::>>()?; + + let hash_partitioning_expr: Vec> = config + .primary_keys + .iter() + .map(|col| { + let idx = sorted_schema.index_of(col.as_str())?; + Ok(Arc::new(Column::new(col.as_str(), idx)) as Arc) + }) + .collect::>>()?; + let hash_partitioning = Partitioning::Hash(hash_partitioning_expr, config.hash_bucket_num); + + Arc::new(RepartitionByRangeAndHashExec::try_new(sort_exec, range_partitioning_expr, hash_partitioning)?) + }; + + Ok(exec_plan) + } + + async fn pull_and_sink( + input: Arc, + partition: usize, + context: Arc, + config_builder: LakeSoulIOConfigBuilder, + range_partitions: Arc>, + write_id: String, + partitioned_file_path_and_row_count: Arc, u64)>>>, + ) -> Result { + let mut data = input.execute(partition, context.clone())?; + let schema_projection_excluding_range = + data.schema() + .fields() + .iter() + .enumerate() + .filter_map(|(idx, field)| + match range_partitions.contains(field.name()) { + true => None, + false => Some(idx) + }) + .collect::>(); + + let mut err = None; + + let mut row_count = 0; + + let mut partitioned_writer = HashMap::>::new(); + let mut partitioned_file_path_and_row_count_locked = partitioned_file_path_and_row_count.lock().await; + while let Some(batch_result) = data.next().await { + match batch_result { + Ok(batch) => { + debug!("write record_batch with {} rows", batch.num_rows()); + let columnar_values = get_columnar_values(&batch, range_partitions.clone())?; + let partition_desc = columnar_values_to_partition_desc(&columnar_values); + let batch_excluding_range = batch.project(&schema_projection_excluding_range)?; + let file_absolute_path = format!("{}{}part-{}_{:0>4}.parquet", config_builder.prefix(), columnar_values_to_sub_path(&columnar_values), write_id, partition); + + if !partitioned_writer.contains_key(&partition_desc) { + let mut config = config_builder + .clone() + .with_files(vec![file_absolute_path]) + .build(); + + let writer = MultiPartAsyncWriter::try_new_with_context(&mut config, context.clone()).await?; + partitioned_writer.insert(partition_desc.clone(), Box::new(writer)); + } + + if let Some(async_writer) = partitioned_writer.get_mut(&partition_desc) { + row_count += batch_excluding_range.num_rows(); + async_writer.write_record_batch(batch_excluding_range).await?; + } + } + // received abort signal + Err(e) => { + err = Some(e); + break; + } + } + } + + if let Some(e) = err { + for (_, writer) in partitioned_writer.into_iter() { + match writer.abort_and_close().await { + Ok(_) => match e { + Internal(ref err_msg) if err_msg == "external abort" => (), + _ => return Err(e), + }, + Err(abort_err) => return Err(Internal(format!( + "Abort failed {:?}, previous error {:?}", + abort_err, e + ))), + } + } + Ok(row_count as u64) + } else { + for (partition_desc, writer) in partitioned_writer.into_iter() { + let file_absolute_path = writer.absolute_path(); + let 
num_rows = writer.nun_rows(); + if let Some(file_path_and_row_count) = + partitioned_file_path_and_row_count_locked.get_mut(&partition_desc) + { + file_path_and_row_count.0.push(file_absolute_path); + file_path_and_row_count.1 += num_rows; + } else { + partitioned_file_path_and_row_count_locked.insert( + partition_desc.clone(), + (vec![file_absolute_path], num_rows), + ); + } + writer.flush_and_close().await?; + } + Ok(row_count as u64) } } + + async fn await_and_summary( + join_handles: Vec>>, + partitioned_file_path_and_row_count: Arc, u64)>>>, + ) -> Result> { + let _ = + futures::future::join_all(join_handles) + .await + .iter() + .try_fold(0u64, |counter, result| match &result { + Ok(Ok(count)) => Ok(counter + count), + Ok(Err(e)) => Err(DataFusionError::Execution(format!("{}", e))), + Err(e) => Err(DataFusionError::Execution(format!("{}", e))), + })?; + let partitioned_file_path_and_row_count = partitioned_file_path_and_row_count.lock().await; + + let mut summary = format!("{}", partitioned_file_path_and_row_count.len()); + for (partition_desc, (files, _)) in partitioned_file_path_and_row_count.iter() { + summary += "\x01"; + summary += partition_desc.as_str(); + summary += "\x02"; + summary += files.join("\x02").as_str(); + } + Ok(summary.into_bytes()) + } +} + +#[async_trait] +impl AsyncBatchWriter for PartitioningAsyncWriter { + async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<()> { + // arrow_cast::pretty::print_batches(&[batch.clone()]); + if let Some(err) = &self.err { + return Err(Internal(format!("PartitioningAsyncWriter already failed with error {:?}", err))); + } + let send_result = self.sorter_sender.send(Ok(batch)).await; + match send_result { + Ok(_) => Ok(()), + // channel has been closed, indicating error happened during sort write + Err(e) => { + if let Some(join_handle) = self.join_handle.take() { + let result = join_handle.await.map_err(|e| DataFusionError::External(Box::new(e)))?; + self.err = result.err(); + Err(Internal(format!("Write to PartitioningAsyncWriter failed: {:?}", self.err))) + } else { + self.err = Some(DataFusionError::External(Box::new(e))); + Err(Internal(format!("Write to PartitioningAsyncWriter failed: {:?}", self.err))) + } + } + } + } + + async fn flush_and_close(self: Box) -> Result> { + if let Some(join_handle) = self.join_handle { + let sender = self.sorter_sender; + drop(sender); + join_handle.await.map_err(|e| DataFusionError::External(Box::new(e)))? + } else { + Err(Internal("PartitioningAsyncWriter has been aborted, cannot flush".to_string())) + } + } + + async fn abort_and_close(self: Box) -> Result> { + if let Some(join_handle) = self.join_handle { + let sender = self.sorter_sender; + // send abort signal to the task + sender + .send(Err(Internal("external abort".to_string()))) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + drop(sender); + join_handle.await.map_err(|e| DataFusionError::External(Box::new(e)))? 
+ } else { + // previous error has already aborted writer + Ok(vec![]) + } + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } } pub type SendableWriter = Box; @@ -479,18 +823,25 @@ impl SyncSendableMutableLakeSoulWriter { config.schema.0.clone() }; - let mut writer_config = config.clone(); - writer_config.schema = IOSchema(uniform_schema(writer_schema)); - let writer = MultiPartAsyncWriter::try_new(writer_config).await?; - let schema = writer.schema.clone(); - let writer: Box = if !config.primary_keys.is_empty() { + let mut writer_config = config.clone(); + let writer: Box = if config.use_dynamic_partition { + let task_context = create_session_context(&mut writer_config)?.task_ctx(); + Box::new(PartitioningAsyncWriter::try_new(task_context, config, runtime.clone())?) + } else if !config.primary_keys.is_empty() { // sort primary key table + + writer_config.schema = IOSchema(uniform_schema(writer_schema)); + let writer = MultiPartAsyncWriter::try_new(writer_config).await?; Box::new(SortAsyncWriter::try_new(writer, config, runtime.clone())?) } else { // else multipart + writer_config.schema = IOSchema(uniform_schema(writer_schema)); + let writer = MultiPartAsyncWriter::try_new(writer_config).await?; Box::new(writer) }; + let schema = writer.schema(); + Ok(SyncSendableMutableLakeSoulWriter { inner: Arc::new(Mutex::new(writer)), @@ -513,7 +864,7 @@ impl SyncSendableMutableLakeSoulWriter { }) } - pub fn flush_and_close(self) -> Result<()> { + pub fn flush_and_close(self) -> Result> { let inner_writer = match Arc::try_unwrap(self.inner) { Ok(inner) => inner, Err(_) => return Err(Internal("Cannot get ownership of inner writer".to_string())), @@ -525,7 +876,7 @@ impl SyncSendableMutableLakeSoulWriter { }) } - pub fn abort_and_close(self) -> Result<()> { + pub fn abort_and_close(self) -> Result> { let inner_writer = match Arc::try_unwrap(self.inner) { Ok(inner) => inner, Err(_) => return Err(Internal("Cannot get ownership of inner writer".to_string())), @@ -547,7 +898,7 @@ mod tests { use crate::lakesoul_io_config::LakeSoulIOConfigBuilder; use crate::lakesoul_reader::LakeSoulReader; use crate::lakesoul_writer::{ - AsyncBatchWriter, MultiPartAsyncWriter, SortAsyncWriter, SyncSendableMutableLakeSoulWriter, + AsyncBatchWriter, MultiPartAsyncWriter, SyncSendableMutableLakeSoulWriter, }; use arrow::array::{ArrayRef, Int64Array}; use arrow::record_batch::RecordBatch; @@ -559,6 +910,8 @@ mod tests { use std::sync::Arc; use tokio::runtime::Builder; + use super::SortAsyncWriter; + #[test] fn test_parquet_async_write() -> Result<()> { let runtime = Arc::new(Builder::new_multi_thread().enable_all().build().unwrap()); diff --git a/rust/lakesoul-metadata-c/src/lib.rs b/rust/lakesoul-metadata-c/src/lib.rs index 872edcbd0..a748c999c 100644 --- a/rust/lakesoul-metadata-c/src/lib.rs +++ b/rust/lakesoul-metadata-c/src/lib.rs @@ -63,8 +63,7 @@ pub type IntegerResultCallBack = extern "C" fn(i32, *const c_char); /// for jnr /// can use as_ptr instead of into_raw? 
-#[allow(unused)] -fn call_result_callback(callback: ResultCallback, status: bool, err: *const c_char) { +fn call_result_callback(callback: extern "C" fn(T, *const c_char), status: T, err: *const c_char) { callback(status, err); // release error string if !err.is_null() { @@ -152,8 +151,8 @@ pub extern "C" fn execute_insert( let result = runtime.block_on(async { lakesoul_metadata::execute_insert(client, prepared, insert_type, wrapper).await }); match result { - Ok(count) => callback(count, CString::new("").unwrap().into_raw()), - Err(e) => callback(-1, CString::new(e.to_string().as_str()).unwrap().into_raw()), + Ok(count) => call_result_callback(callback, count, std::ptr::null()), + Err(e) => call_result_callback(callback, -1, CString::new(e.to_string().as_str()).unwrap().into_raw()), } } @@ -174,8 +173,8 @@ pub extern "C" fn execute_update( lakesoul_metadata::execute_update(client, prepared, update_type, string_from_ptr(joined_string)).await }); match result { - Ok(count) => callback(count, CString::new("").unwrap().into_raw()), - Err(e) => callback(-1, CString::new(e.to_string().as_str()).unwrap().into_raw()), + Ok(count) => call_result_callback(callback, count, std::ptr::null()), + Err(e) => call_result_callback(callback, -1, CString::new(e.to_string().as_str()).unwrap().into_raw()), } } @@ -195,19 +194,23 @@ pub extern "C" fn execute_query_scalar( let result = runtime.block_on(async { lakesoul_metadata::execute_query_scalar(client, prepared, update_type, string_from_ptr(joined_string)).await }); - match result { - Ok(Some(result)) => callback( + let (result, err): (*mut c_char, *const c_char) = match result { + Ok(Some(result)) => ( CString::new(result.as_str()).unwrap().into_raw(), - CString::new("").unwrap().into_raw(), + std::ptr::null(), ), - Ok(None) => callback( - CString::new("").unwrap().into_raw(), + Ok(None) => ( CString::new("").unwrap().into_raw(), + std::ptr::null(), ), - Err(e) => callback( + Err(e) => ( CString::new("").unwrap().into_raw(), CString::new(e.to_string().as_str()).unwrap().into_raw(), ), + }; + call_result_callback(callback, result, err); + unsafe { + let _ = CString::from_raw(result); } } @@ -230,11 +233,11 @@ pub extern "C" fn execute_query( match result { Ok(u8_vec) => { let len = u8_vec.len(); - callback(len as i32, CString::new("").unwrap().into_raw()); + call_result_callback(callback, len as i32, std::ptr::null()); convert_to_nonnull(CResult::::new::>(u8_vec)) } Err(e) => { - callback(-1, CString::new(e.to_string().as_str()).unwrap().into_raw()); + call_result_callback(callback, -1, CString::new(e.to_string().as_str()).unwrap().into_raw()); convert_to_nonnull(CResult::::new::>(vec![])) } } @@ -251,7 +254,8 @@ pub extern "C" fn export_bytes_result( let bytes = unsafe { NonNull::new_unchecked(bytes.as_ref().ptr as *mut Vec).as_mut() }; if bytes.len() != len { - callback( + call_result_callback( + callback, false, CString::new("Size of buffer and result mismatch at export_bytes_result.") .unwrap() @@ -266,7 +270,7 @@ pub extern "C" fn export_bytes_result( let mut writer = dst.writer(); let _ = writer.write_all(bytes.as_slice()); - callback(true, CString::new("").unwrap().into_raw()); + call_result_callback(callback, true, std::ptr::null()); } #[no_mangle] @@ -284,8 +288,8 @@ pub extern "C" fn clean_meta_for_test( let client = unsafe { NonNull::new_unchecked(client.as_ref().ptr as *mut Client).as_ref() }; let result = runtime.block_on(async { lakesoul_metadata::clean_meta_for_test(client).await }); match result { - Ok(count) => callback(count, 
CString::new("").unwrap().into_raw()), - Err(e) => callback(-1, CString::new(e.to_string().as_str()).unwrap().into_raw()), + Ok(count) => call_result_callback(callback, count, std::ptr::null()), + Err(e) => call_result_callback(callback, -1, CString::new(e.to_string().as_str()).unwrap().into_raw()), } } @@ -318,11 +322,11 @@ pub extern "C" fn create_tokio_postgres_client( let result = match result { Ok(client) => { - callback(true, CString::new("").unwrap().into_raw()); + call_result_callback(callback, true, std::ptr::null()); CResult::::new(client) } Err(e) => { - callback(false, CString::new(e.to_string().as_str()).unwrap().into_raw()); + call_result_callback(callback, false, CString::new(e.to_string().as_str()).unwrap().into_raw()); CResult::::error(format!("{}", e).as_str()) } };
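For reference, the flush summary that PartitioningAsyncWriter::await_and_summary builds above has a simple byte layout: the partition count, then one '\x01'-prefixed entry per partition whose description and file paths are joined with '\x02'. A minimal sketch of how a consumer might decode those bytes is shown below; parse_flush_summary is a hypothetical helper derived only from the layout visible in this patch, not a function that exists in the codebase, and it assumes the partition descriptions and paths are valid UTF-8.

fn parse_flush_summary(summary: &[u8]) -> Result<Vec<(String, Vec<String>)>, String> {
    // SortAsyncWriter / MultiPartAsyncWriter return an empty Vec<u8>,
    // which we interpret here as "no partitioned files were written".
    if summary.is_empty() {
        return Ok(vec![]);
    }
    let text = String::from_utf8(summary.to_vec()).map_err(|e| e.to_string())?;
    let mut parts = text.split('\x01');
    // Leading segment: number of partitions written.
    let count: usize = parts
        .next()
        .ok_or_else(|| "empty summary".to_string())?
        .parse()
        .map_err(|e: std::num::ParseIntError| e.to_string())?;
    let mut result = Vec::with_capacity(count);
    for entry in parts {
        // '\x02' separates the partition description from its file paths,
        // and the file paths from each other.
        let mut fields = entry.split('\x02');
        let desc = fields.next().unwrap_or_default().to_string();
        let files: Vec<String> = fields.map(str::to_string).collect();
        result.push((desc, files));
    }
    if result.len() != count {
        return Err(format!("expected {} partitions, found {}", count, result.len()));
    }
    Ok(result)
}

Fed the bytes returned by SyncSendableMutableLakeSoulWriter::flush_and_close, this would yield one (partition_desc, file_paths) pair per range/hash partition that received data, which matches how await_and_summary accumulates partitioned_file_path_and_row_count; the row counts are folded into the per-task totals and are not part of the encoded summary.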