[Rust/Flink] Flink repartition pushdown #463

Merged · 7 commits · Apr 10, 2024
@@ -457,6 +457,7 @@ public boolean commitData(MetaInfo metaInfo, boolean changeSchema, CommitOp comm
         int readPartitionVersion = 0;
         if (readPartition != null) {
             readPartitionVersion = readPartition.getVersion();
+
         }

         int newVersion = curVersion + 1;
@@ -205,7 +205,7 @@ private void initialize() {
         final CompletableFuture<Boolean> future = new CompletableFuture<>();
         tokioPostgresClient = libLakeSoulMetaData.create_tokio_postgres_client(
                 new ReferencedBooleanCallback((bool, msg) -> {
-                    if (msg.isEmpty()) {
+                    if (msg == null || msg.isEmpty()) {
                         future.complete(bool);
                     } else {
                         System.err.println(msg);
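
The same one-line hardening recurs in every callback hunk in this file: the message handed back from the native (Rust) side can be null as well as empty, and only a non-empty message signals an error. A minimal runnable sketch of the guarded pattern, assuming only the JDK; the names guarded and CallbackGuardSketch are hypothetical stand-ins for the ReferencedBooleanCallback/ReferencedIntegerCallback wiring:

import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class CallbackGuardSketch {
    // The native layer reports (result, msg): msg may be null or "" on success,
    // or a non-empty error string on failure.
    static <T> BiConsumer<T, String> guarded(CompletableFuture<T> future) {
        return (result, msg) -> {
            if (msg == null || msg.isEmpty()) { // treat null exactly like "", as in this PR
                future.complete(result);
            } else {
                future.completeExceptionally(new SQLException(msg));
            }
        };
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        guarded(f).accept(42, null); // a null msg used to raise a NullPointerException
        System.out.println(f.get()); // prints 42
    }
}
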
@@ -236,7 +236,7 @@ public JniWrapper executeQuery(Integer queryType, List<String> params) {
         final CompletableFuture<Integer> queryFuture = new CompletableFuture<>();
         Pointer queryResult = getLibLakeSoulMetaData().execute_query(
                 new ReferencedIntegerCallback((result, msg) -> {
-                    if (msg.isEmpty()) {
+                    if (msg == null || msg.isEmpty()) {
                         queryFuture.complete(result);
                     } else {
                         queryFuture.completeExceptionally(new SQLException(msg));
@@ -262,7 +262,7 @@ public JniWrapper executeQuery(Integer queryType, List<String> params) {
         final CompletableFuture<Boolean> importFuture = new CompletableFuture<>();
         getLibLakeSoulMetaData().export_bytes_result(
                 new ReferencedBooleanCallback((result, msg) -> {
-                    if (msg.isEmpty()) {
+                    if (msg == null || msg.isEmpty()) {
                         importFuture.complete(result);
                     } else {
                         importFuture.completeExceptionally(new SQLException(msg));
@@ -348,7 +348,7 @@ else if (bytes.length < mutableBuffer.size()) {

         getLibLakeSoulMetaData().execute_insert(
                 new ReferencedIntegerCallback((result, msg) -> {
-                    if (msg.isEmpty()) {
+                    if (msg == null || msg.isEmpty()) {
                         future.complete(result);
                     } else {
                         future.completeExceptionally(new SQLException(msg));
@@ -395,7 +395,7 @@ public Integer executeUpdate(Integer updateType, List<String> params) {

         getLibLakeSoulMetaData().execute_update(
                 new ReferencedIntegerCallback((result, msg) -> {
-                    if (msg.isEmpty()) {
+                    if (msg == null || msg.isEmpty()) {
                         future.complete(result);
                     } else {
                         future.completeExceptionally(new SQLException(msg));
@@ -441,7 +441,7 @@ public List<String> executeQueryScalar(Integer queryScalarType, List<String> par

         getLibLakeSoulMetaData().execute_query_scalar(
                 new ReferencedStringCallback((result, msg) -> {
-                    if (msg.isEmpty()) {
+                    if (msg == null || msg.isEmpty()) {
                         future.complete(result);
                     } else {
                         future.completeExceptionally(new SQLException(msg));
@@ -512,7 +512,7 @@ public static int cleanMeta() {
         try {
             instance.getLibLakeSoulMetaData().clean_meta_for_test(
                     new ReferencedIntegerCallback((result, msg) -> {
-                        if (msg.isEmpty()) {
+                        if (msg == null || msg.isEmpty()) {
                             future.complete(result);
                         } else {
                             future.completeExceptionally(new SQLException(msg));
@@ -600,21 +600,4 @@ public List<SplitDesc> createSplitDescArray(String tableName, String namespace)
             unlockReadLock();
         }
     }
-
-    // void filter_rel(io.substrait.proto.Expression e) {
-    //     byte[] byteArray = e.toByteArray();
-    //     int length = byteArray.length;
-    //     Pointer buffer = fixedBuffer;
-    //     if (length < fixedBuffer.size())
-    //         fixedBuffer.put(0, byteArray, 0, length);
-    //     else if (length < mutableBuffer.size()) {
-    //         mutableBuffer.put(0, byteArray, 0, length);
-    //         buffer = mutableBuffer;
-    //     } else {
-    //         mutableBuffer = Runtime.getRuntime(libLakeSoulMetaData).getMemoryManager().allocateDirect(length);
-    //         mutableBuffer.put(0, byteArray, 0, length);
-    //         buffer = mutableBuffer;
-    //     }
-    //     getLibLakeSoulMetaData().call_rust(buffer.address(), length);
-    // }
 }
@@ -19,10 +19,7 @@
 import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.table.types.logical.RowType;

-import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.BUCKET_CHECK_INTERVAL;
-import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.BUCKET_PARALLELISM;
-import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.FILE_ROLLING_SIZE;
-import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.FILE_ROLLING_TIME;
+import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.*;

 public class LakeSoulMultiTableSinkStreamBuilder {

@@ -56,6 +53,7 @@ public DataStream<BinarySourceRecord> buildHashPartitionedCDCStream(DataStream<B
     }

     public DataStreamSink<BinarySourceRecord> buildLakeSoulDMLSink(DataStream<BinarySourceRecord> stream) {
+        context.conf.set(DYNAMIC_BUCKETING, false);
         LakeSoulRollingPolicyImpl rollingPolicy = new LakeSoulRollingPolicyImpl(
                 context.conf.getLong(FILE_ROLLING_SIZE), context.conf.getLong(FILE_ROLLING_TIME));
         OutputFileConfig fileNameConfig = OutputFileConfig.builder()
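
buildLakeSoulDMLSink now pins dynamic bucketing off before the rolling policy and sink are built. A sketch of how such a Flink ConfigOption is typically declared and overridden; the key string and default value below are assumptions, not the actual LakeSoulSinkOptions.DYNAMIC_BUCKETING definition:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class DynamicBucketingSketch {
    // Hypothetical declaration mirroring LakeSoulSinkOptions.DYNAMIC_BUCKETING.
    public static final ConfigOption<Boolean> DYNAMIC_BUCKETING =
            ConfigOptions.key("lakesoul.sink.dynamic_bucketing")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether the sink routes records to buckets dynamically.");

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The DML sink path pins the option off before building the sink, so the
        // single-table DML pipeline keeps its static, hash-partitioned bucket layout.
        conf.set(DYNAMIC_BUCKETING, false);
        System.out.println(conf.get(DYNAMIC_BUCKETING)); // false
    }
}
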
@@ -12,10 +12,7 @@
 import org.apache.flink.lakesoul.sink.committer.LakeSoulSinkCommitter;
 import org.apache.flink.lakesoul.sink.committer.LakeSoulSinkGlobalCommitter;
 import org.apache.flink.lakesoul.sink.state.*;
-import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter;
-import org.apache.flink.lakesoul.sink.writer.DefaultLakeSoulWriterBucketFactory;
-import org.apache.flink.lakesoul.sink.writer.LakeSoulWriterBucketFactory;
-import org.apache.flink.lakesoul.sink.writer.NativeBucketWriter;
+import org.apache.flink.lakesoul.sink.writer.*;
 import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
@@ -102,14 +99,14 @@ public LakeSoulSinkCommitter createCommitter() throws IOException {
     public SimpleVersionedSerializer<LakeSoulWriterBucketState> getWriterStateSerializer()
             throws IOException {
         return new LakeSoulWriterBucketStateSerializer(
-                NativeBucketWriter.NativePendingFileRecoverableSerializer.INSTANCE);
+                NativeParquetWriter.NativePendingFileRecoverableSerializer.INSTANCE);
     }

     @Override
     public SimpleVersionedSerializer<LakeSoulMultiTableSinkCommittable> getCommittableSerializer()
             throws IOException {
         return new LakeSoulSinkCommittableSerializer(
-                NativeBucketWriter.NativePendingFileRecoverableSerializer.INSTANCE);
+                NativeParquetWriter.NativePendingFileRecoverableSerializer.INSTANCE);
     }

     @Override
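
Both factory methods now hand out NativeParquetWriter's pending-file serializer, so writer state and committables written before a failover decode with the same format on restore. For reference, a minimal sketch of the SimpleVersionedSerializer contract those INSTANCE fields implement, using a placeholder String payload rather than the real NativePendingFileRecoverable layout:

import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class PendingFileSerializerSketch implements SimpleVersionedSerializer<String> {
    @Override
    public int getVersion() {
        return 1; // bump whenever the wire format changes
    }

    @Override
    public byte[] serialize(String obj) throws IOException {
        return obj.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(int version, byte[] serialized) throws IOException {
        if (version != 1) {
            throw new IOException("Unknown version " + version);
        }
        return new String(serialized, StandardCharsets.UTF_8);
    }
}
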
@@ -54,17 +54,13 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSin
         DBManager lakeSoulDBManager = new DBManager();
         for (LakeSoulMultiTableSinkCommittable committable : committables) {
             LOG.info("Commtting {}", committable);
-            if (committable.hasPendingFile()) {
-                assert committable.getPendingFiles() != null;
-                LOG.info("PendingFiles to commit {}", committable.getPendingFiles().size());
-                if (committable.getPendingFiles().isEmpty()) {
-                    continue;
-                }
+            for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : committable.getPendingFilesMap().entrySet()) {
+                List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = entry.getValue();

                 // pending files to commit
                 List<String> files = new ArrayList<>();
                 for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable :
-                        committable.getPendingFiles()) {
+                        pendingFiles) {
                     if (pendingFileRecoverable instanceof NativeParquetWriter.NativeWriterPendingFileRecoverable) {
                         NativeParquetWriter.NativeWriterPendingFileRecoverable recoverable =
                                 (NativeParquetWriter.NativeWriterPendingFileRecoverable) pendingFileRecoverable;
@@ -92,9 +88,10 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSin
                     dataFileOp.setFileExistCols(fileExistCols);
                     dataFileOpList.add(dataFileOp.build());
                 }
-                String partition = committable.getBucketId();
+                String partition = entry.getKey();
                 List<PartitionInfo> readPartitionInfoList = null;
+

                 TableNameId tableNameId =
                         lakeSoulDBManager.shortTableName(identity.tableId.table(), identity.tableId.schema());
                 if (identity.tableId.schema() == null){
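
The committer now walks the committable's bucket-to-pending-files map, so one committable can commit files for several partitions in a single pass, and the partition name comes from the map key rather than the committable's single bucketId. A simplified, runnable sketch of that loop shape; PendingFile and commitPartition are hypothetical stand-ins for the recoverable type and the metadata commit:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerPartitionCommitSketch {
    record PendingFile(String path) {}

    static void commitPartition(String partition, List<String> files) {
        System.out.println("commit " + partition + " -> " + files); // stands in for the DBManager call
    }

    public static void main(String[] args) {
        // One committable may now carry files for several partitions at once.
        Map<String, List<PendingFile>> pendingFilesMap = new HashMap<>();
        pendingFilesMap.put("date=2024-04-09", List.of(new PendingFile("a.parquet")));
        pendingFilesMap.put("date=2024-04-10", List.of(new PendingFile("b.parquet"), new PendingFile("c.parquet")));

        for (Map.Entry<String, List<PendingFile>> entry : pendingFilesMap.entrySet()) {
            String partition = entry.getKey(); // was committable.getBucketId()
            List<String> files = new ArrayList<>();
            for (PendingFile f : entry.getValue()) {
                files.add(f.path());
            }
            commitPartition(partition, files);
        }
    }
}
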
@@ -116,7 +116,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
         String dbType = this.conf.getString(SOURCE_DB_TYPE,"");

         for (Map.Entry<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> entry :
-                globalCommittable.getGroupedCommitables()
+                globalCommittable.getGroupedCommittable()
                         .entrySet()) {
             TableSchemaIdentity identity = entry.getKey().f0;
             List<LakeSoulMultiTableSinkCommittable> lakeSoulMultiTableSinkCommittable = entry.getValue();
@@ -13,8 +13,9 @@

 import javax.annotation.Nullable;
 import java.io.Serializable;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
+
+import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DYNAMIC_BUCKET;

 /**
  * Wrapper class for both type of committables in {@link LakeSoulMultiTablesSink}. One committable might be either
@@ -28,11 +29,12 @@ public class LakeSoulMultiTableSinkCommittable implements Serializable, Comparab

     private final String bucketId;

+    private final boolean dynamicBucketing;
+
     private final TableSchemaIdentity identity;
     private String sourcePartitionInfo;

-    @Nullable
-    private List<InProgressFileWriter.PendingFileRecoverable> pendingFiles;
+    private final Map<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFilesMap;

     @Nullable
     private final String commitId;
@@ -80,27 +82,80 @@ public LakeSoulMultiTableSinkCommittable(
             String dmlType,
             String sourcePartitionInfo
     ) {
+        this.dynamicBucketing = false;
         this.bucketId = bucketId;
         this.identity = identity;
-        this.pendingFiles = pendingFiles;
+        this.pendingFilesMap = new HashMap<>();
+        this.pendingFilesMap.put(bucketId, pendingFiles);
         this.creationTime = time;
         this.commitId = commitId;
         this.tsMs = tsMs;
         this.dmlType = dmlType;
         this.sourcePartitionInfo = sourcePartitionInfo;
     }

+    /**
+     * Constructor for {@link org.apache.flink.lakesoul.sink.state.LakeSoulSinkCommittableSerializer} to
+     * restore commitable states
+     */
+    public LakeSoulMultiTableSinkCommittable(
+            TableSchemaIdentity identity,
+            Map<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFilesMap,
+            long time,
+            @Nullable String commitId,
+            long tsMs,
+            String dmlType,
+            String sourcePartitionInfo
+    ) {
+        Preconditions.checkNotNull(pendingFilesMap);
+        this.dynamicBucketing = pendingFilesMap.keySet().size() != 1;
+        this.bucketId = this.dynamicBucketing ? DYNAMIC_BUCKET : pendingFilesMap.keySet().stream().findFirst().get();
+        this.identity = identity;
+        this.pendingFilesMap = pendingFilesMap;
+        this.creationTime = time;
+        this.commitId = commitId;
+        this.tsMs = tsMs;
+        this.dmlType = dmlType;
+        this.sourcePartitionInfo = sourcePartitionInfo;
+    }
+
+
     public long getTsMs() {
         return tsMs;
     }

     public boolean hasPendingFile() {
-        return pendingFiles != null;
+        if (dynamicBucketing) {
+            return !pendingFilesMap.isEmpty();
+        } else {
+            return hasPendingFile(bucketId);
+        }
+    }
+
+    public boolean hasPendingFile(String bucketId) {
+        return pendingFilesMap.containsKey(bucketId);
     }

     @Nullable
     public List<InProgressFileWriter.PendingFileRecoverable> getPendingFiles() {
-        return pendingFiles;
+        if (dynamicBucketing) {
+            List<InProgressFileWriter.PendingFileRecoverable> summary = new ArrayList<>();
+            for (List<InProgressFileWriter.PendingFileRecoverable> list : pendingFilesMap.values()) {
+                summary.addAll(list);
+            }
+            return summary;
+        } else {
+            return getPendingFiles(bucketId);
+        }
+    }
+
+    @Nullable
+    public List<InProgressFileWriter.PendingFileRecoverable> getPendingFiles(String bucketId) {
+        return pendingFilesMap.get(bucketId);
+    }
+
+    public Map<String, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFilesMap() {
+        return pendingFilesMap;
+    }
+
     @Override
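
The restore constructor infers the bucketing mode from the map it is given: exactly one key means static bucketing under that bucket id, anything else (including an empty map) falls back to the DYNAMIC_BUCKET sentinel. A sketch of that derivation; the sentinel's string value here is an assumption:

import java.util.List;
import java.util.Map;

public class BucketIdDerivationSketch {
    static final String DYNAMIC_BUCKET = "dynamic_bucket"; // assumed value of the LakeSoulSinkOptions constant

    static String deriveBucketId(Map<String, List<String>> pendingFilesMap) {
        boolean dynamicBucketing = pendingFilesMap.keySet().size() != 1;
        // Exactly one bucket: keep that bucket id. Otherwise: the dynamic sentinel.
        return dynamicBucketing
                ? DYNAMIC_BUCKET
                : pendingFilesMap.keySet().stream().findFirst().get();
    }

    public static void main(String[] args) {
        System.out.println(deriveBucketId(Map.of("date=2024-04-10", List.of("a.parquet"))));  // date=2024-04-10
        System.out.println(deriveBucketId(Map.of("date=2024-04-09", List.of("a.parquet"),
                "date=2024-04-10", List.of("b.parquet"))));                                    // dynamic_bucket
    }
}
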
@@ -125,9 +180,12 @@ public String toString() {
         return "LakeSoulMultiTableSinkCommittable{" +
                 "creationTime=" + creationTime +
                 ", bucketId='" + bucketId + '\'' +
+                ", dynamicBucketing=" + dynamicBucketing +
                 ", identity=" + identity +
+                ", pendingFilesMap=" + pendingFilesMap +
                 ", commitId='" + commitId + '\'' +
-                ", pendingFiles='" + pendingFiles + '\'' +
+                ", tsMs=" + tsMs +
+                ", dmlType='" + dmlType + '\'' +
                 '}';
     }

@@ -138,12 +196,15 @@ public String getCommitId() {

     public void merge(LakeSoulMultiTableSinkCommittable committable) {
         Preconditions.checkState(identity.equals(committable.getIdentity()));
-        Preconditions.checkState(bucketId.equals(committable.getBucketId()));
         Preconditions.checkState(creationTime == committable.getCreationTime());
-        if (hasPendingFile()) {
-            if (committable.hasPendingFile()) pendingFiles.addAll(committable.getPendingFiles());
-        } else {
-            if (committable.hasPendingFile()) pendingFiles = committable.getPendingFiles();
+
+        for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : committable.getPendingFilesMap().entrySet()) {
+            String bucketId = entry.getKey();
+            if (hasPendingFile(bucketId)) {
+                if (!entry.getValue().isEmpty()) pendingFilesMap.get(bucketId).addAll(entry.getValue());
+            } else {
+                if (!entry.getValue().isEmpty()) pendingFilesMap.put(bucketId, entry.getValue());
+            }
         }
         mergeSourcePartitionInfo(committable);
     }
@@ -167,6 +228,8 @@ private void mergeSourcePartitionInfo(LakeSoulMultiTableSinkCommittable committa
                 throw new RuntimeException(e);
             }
         }
+        Preconditions.checkState(bucketId.equals(committable.getBucketId()));
+
     }

     public String getDmlType() {
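
merge() now combines committables bucket by bucket: lists for a bucket both sides know are concatenated, and buckets only the other committable knows are adopted. A simplified sketch of those semantics with String payloads; note it copies into a fresh list via computeIfAbsent, whereas the put() above shares the incoming list instance:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeSketch {
    static void merge(Map<String, List<String>> mine, Map<String, List<String>> theirs) {
        for (Map.Entry<String, List<String>> entry : theirs.entrySet()) {
            if (entry.getValue().isEmpty()) continue; // nothing to carry over
            mine.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
                    .addAll(entry.getValue());
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> a = new HashMap<>();
        a.put("p=1", new ArrayList<>(List.of("x.parquet")));
        Map<String, List<String>> b = Map.of("p=1", List.of("y.parquet"), "p=2", List.of("z.parquet"));
        merge(a, b);
        System.out.println(a); // {p=1=[x.parquet, y.parquet], p=2=[z.parquet]} (order may vary)
    }
}
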
@@ -23,7 +23,7 @@ public class LakeSoulMultiTableSinkGlobalCommittable implements Serializable {

     static final long serialVersionUID = 42L;

-    private final Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommitables;
+    private final Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommittable;

     public LakeSoulMultiTableSinkGlobalCommittable(
             Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommitables) {
@@ -45,14 +45,14 @@ public LakeSoulMultiTableSinkGlobalCommittable(
             groupedCommitables.put(key, mergedCommittables);
         });

-        this.groupedCommitables = groupedCommitables;
+        this.groupedCommittable = groupedCommitables;
     }

     public static LakeSoulMultiTableSinkGlobalCommittable fromLakeSoulMultiTableSinkGlobalCommittable(
             List<LakeSoulMultiTableSinkGlobalCommittable> globalCommittables) {
         Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommitables =
                 new HashMap<>();
-        globalCommittables.forEach(globalCommittable -> globalCommittable.getGroupedCommitables().forEach(
+        globalCommittables.forEach(globalCommittable -> globalCommittable.getGroupedCommittable().forEach(
                 (key, value) -> groupedCommitables.computeIfAbsent(key, tuple2 -> new ArrayList<>()).addAll(value)));
         return new LakeSoulMultiTableSinkGlobalCommittable(groupedCommitables);
     }
@@ -68,12 +68,12 @@ public static LakeSoulMultiTableSinkGlobalCommittable fromLakeSoulMultiTableSink
     }


-    public Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> getGroupedCommitables() {
-        return groupedCommitables;
+    public Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> getGroupedCommittable() {
+        return groupedCommittable;
     }

     @Override
     public String toString() {
-        return "LakeSoulMultiTableSinkGlobalCommittable{" + "groupedCommitables=" + groupedCommitables + '}';
+        return "LakeSoulMultiTableSinkGlobalCommittable{" + "groupedCommitables=" + groupedCommittable + '}';
     }
 }
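
Beyond renaming the misspelled groupedCommitables field and getter, the grouping behavior is unchanged: fromLakeSoulMultiTableSinkGlobalCommittable buckets committables under a (table identity, bucket) key via computeIfAbsent and concatenates the lists. A sketch with flattened String keys standing in for the Tuple2 key:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupCommittablesSketch {
    public static void main(String[] args) {
        Map<String, List<String>> grouped = new HashMap<>();
        List<Map<String, List<String>>> incoming = List.of(
                Map.of("tableA|p=1", List.of("c1")),
                Map.of("tableA|p=1", List.of("c2"), "tableB|p=9", List.of("c3")));

        // Same shape as the forEach/computeIfAbsent merge in the class above.
        for (Map<String, List<String>> m : incoming) {
            m.forEach((key, value) ->
                    grouped.computeIfAbsent(key, k -> new ArrayList<>()).addAll(value));
        }
        System.out.println(grouped); // {tableA|p=1=[c1, c2], tableB|p=9=[c3]} (order may vary)
    }
}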