[Flink/NativeIO] Update async writer interface && add mem_limit option #533

Merged
Changes from all commits
@@ -47,10 +47,12 @@ public LakeSoulPartitionReader(Configuration conf, RowType schema, List<String>
this.primaryKeys = primaryKeys;
this.schema = schema;
this.capacity = conf.getInteger(LakeSoulOptions.LAKESOUL_NATIVE_IO_BATCH_SIZE);
- this.conf = new Configuration(conf);;
+ this.conf = new Configuration(conf);
this.awaitTimeout = 10000;
this.curPartitionId = -1;
}

/**
* Opens the reader with given partitions.
*
@@ -80,7 +82,7 @@ public RowData read(RowData reuse) throws IOException {
}

private Optional<RowData> nextRecord() throws IOException {
if (curArrowReader == null || curRecordId >= currentVSR.getRowCount()) {
curArrowReader = nextBatch();
}
if (curArrowReader == null) return Optional.empty();
@@ -109,7 +111,7 @@ private void recreateInnerReaderForSinglePartition(int partitionIndex) throws IOException
}
nativeIOReader = new NativeIOReader();
LakeSoulPartition partition = partitions.get(partitionIndex);
- for (Path path: partition.getPaths()) {
+ for (Path path : partition.getPaths()) {
nativeIOReader.addFile(FlinkUtil.makeQualifiedPath(path).toString());
}

@@ -121,7 +123,7 @@ private void recreateInnerReaderForSinglePartition(int partitionIndex) throws IOException
nativeIOReader.setPrimaryKeys(primaryKeys);
}
Schema arrowSchema = ArrowUtils.toArrowSchema(schema);
- FlinkUtil.setFSConfigs(conf, nativeIOReader);
+ FlinkUtil.setIOConfigs(conf, nativeIOReader);
nativeIOReader.setSchema(arrowSchema);
nativeIOReader.setBatchSize(capacity);
nativeIOReader.initializeReader();
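Worth noting at the end of this reader hunk: the rename from setFSConfigs to setIOConfigs suggests the helper now forwards general IO settings to the native layer (plausibly including the memory-limit option from this PR's title, whose key is not visible in this diff), not only filesystem configs. Below is a minimal sketch of the setup order used above, assuming the same classes and an in-scope RowType schema and List<String> primaryKeys; the file path is hypothetical:

```java
// Sketch of the native reader setup sequence, mirroring the code above.
Configuration conf = new Configuration();
conf.setInteger(LakeSoulOptions.LAKESOUL_NATIVE_IO_BATCH_SIZE, 1024);

NativeIOReader nativeIOReader = new NativeIOReader();
nativeIOReader.addFile("s3://bucket/table/part-00000.parquet"); // hypothetical path
if (!primaryKeys.isEmpty()) {
    nativeIOReader.setPrimaryKeys(primaryKeys); // as in the hunk above
}
FlinkUtil.setIOConfigs(conf, nativeIOReader);   // renamed from setFSConfigs in this PR
nativeIOReader.setSchema(ArrowUtils.toArrowSchema(schema));
nativeIOReader.setBatchSize(conf.getInteger(LakeSoulOptions.LAKESOUL_NATIVE_IO_BATCH_SIZE));
nativeIOReader.initializeReader();              // configure first, initialize last
```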
@@ -77,6 +77,13 @@ public DataStreamSink<BinarySourceRecord> buildLakeSoulDMLSink(DataStream<BinarySourceRecord> stream)

public static DataStreamSink<LakeSoulArrowWrapper> buildArrowSink(Context context,
DataStream<LakeSoulArrowWrapper> stream) {
return buildArrowSink(context, stream, 1);
}

public static DataStreamSink<LakeSoulArrowWrapper> buildArrowSink(Context context,
DataStream<LakeSoulArrowWrapper> stream,
int parallelism
) {
if (!context.conf.contains(AUTO_SCHEMA_CHANGE)) {
context.conf.set(AUTO_SCHEMA_CHANGE, true);
}
@@ -92,7 +99,7 @@ public static DataStreamSink<LakeSoulArrowWrapper> buildArrowSink(Context context,
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(fileNameConfig)
.build();
return stream.sinkTo(sink).name("LakeSoul MultiTable Arrow Sink");
return stream.sinkTo(sink).name("LakeSoul MultiTable Arrow Sink").setParallelism(parallelism);
}
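The new three-argument overload threads an explicit sink parallelism through to setParallelism, while the original two-argument entry point keeps its old behavior by delegating with parallelism 1. A minimal usage sketch, assuming an in-scope sink Context named context and an upstream DataStream<LakeSoulArrowWrapper> named arrowStream:

```java
// `context` and `arrowStream` are assumed to exist in the surrounding job code.
DataStreamSink<LakeSoulArrowWrapper> sink =
        buildArrowSink(context, arrowStream, 4); // sink runs with parallelism 4
DataStreamSink<LakeSoulArrowWrapper> single =
        buildArrowSink(context, arrowStream);    // delegates with parallelism 1
```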

public DataStreamSink<BinarySourceRecord> printStream(DataStream<BinarySourceRecord> stream, String name) {
@@ -25,6 +25,7 @@
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -87,6 +88,7 @@ public Optional<Committer<LakeSoulMultiTableSinkCommittable>> createCommitter()
@Override
public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSinkCommittable> committables)
throws IOException, InterruptedException {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis()) + " org.apache.flink.api.connector.sink.Committer.commit: " + committables);
return Collections.emptyList();
}
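The added System.out.println bypasses the logging framework, and a new SimpleDateFormat is built on every commit. If the trace is meant to stay, a hedged alternative is the class's SLF4J logger, which stamps entries itself (this assumes a LOG field like the neighboring committer classes define; none is shown in this hunk):

```java
// Hypothetical LOG field (not shown in this diff); the logging backend adds
// its own timestamp, so SimpleDateFormat becomes unnecessary.
LOG.info("org.apache.flink.api.connector.sink.Committer.commit: {}", committables);
```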

@@ -129,7 +129,7 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSinkCommittable> committables)
if (LOG.isInfoEnabled()) {
String fileOpStr = dataFileOpList.stream()
.map(op -> String.format("%s,%s,%d,%s", op.getPath(), op.getFileOp(), op.getSize(),
- op.getFileExistCols())).collect(Collectors.joining("\n\t"));
+ "op.getFileExistCols()")).collect(Collectors.joining("\n\t"));
LOG.info("Commit to LakeSoul: Table={}, TableId={}, Partition={}, Files:\n\t{}, " +
"CommitOp={}, Timestamp={}, UUID={}", identity.tableId.identifier(),
tableNameId.getTableId(), partition, fileOpStr, dataCommitInfo.getCommitOp(),
@@ -141,7 +141,6 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
StructType sparkSchema = ArrowUtils.fromArrowSchema(msgSchema);

TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(tableName, tableNamespace);
LOG.info("Committing: {}, {}, {}, {} {}", tableNamespace, tableName, isCdc, msgSchema, tableInfo);
if (tableInfo == null) {
if (!conf.getBoolean(AUTO_SCHEMA_CHANGE)) {
throw new SuppressRestartsException(
@@ -33,6 +33,8 @@ public class LakeSoulWriterBucketState {

private final Map<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverableMap;

private final int restartTimes;

public LakeSoulWriterBucketState(
TableSchemaIdentity identity,
String bucketId,
@@ -44,12 +46,22 @@ public LakeSoulWriterBucketState(
this.bucketPath = bucketPath;
this.pendingFileRecoverableMap = new HashMap<>();
this.pendingFileRecoverableMap.put(bucketId, pendingFileRecoverableList);
restartTimes = 0;
}

public LakeSoulWriterBucketState(
TableSchemaIdentity identity,
Path bucketPath,
HashMap<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverableMap
) {
this(identity, bucketPath, pendingFileRecoverableMap, 0);
}

public LakeSoulWriterBucketState(
TableSchemaIdentity identity,
Path bucketPath,
HashMap<String, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverableMap,
int restartTimes
) {
this.identity = identity;
Optional<Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>>> first = pendingFileRecoverableMap.entrySet().stream().findFirst();
@@ -61,6 +73,7 @@ public LakeSoulWriterBucketState(
this.bucketPath = bucketPath;

this.pendingFileRecoverableMap = pendingFileRecoverableMap;
this.restartTimes = restartTimes;
}

public String getBucketId() {
@@ -71,6 +84,10 @@ public Path getBucketPath() {
return bucketPath;
}

public int getRestartTimes() {
return restartTimes;
}
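The constructor chain added above keeps old call sites working: the map-based constructor now delegates with a default of zero, so any state built without an explicit count reports getRestartTimes() == 0. A small sketch, assuming an in-scope TableSchemaIdentity identity and Path bucketPath:

```java
// Legacy-style construction (no restartTimes argument) defaults the counter to 0.
HashMap<String, List<InProgressFileWriter.PendingFileRecoverable>> pending = new HashMap<>();
LakeSoulWriterBucketState legacy =
        new LakeSoulWriterBucketState(identity, bucketPath, pending);
assert legacy.getRestartTimes() == 0;

// A restore path can carry the counter explicitly through the 4-arg constructor.
LakeSoulWriterBucketState restored =
        new LakeSoulWriterBucketState(identity, bucketPath, pending, 3);
assert restored.getRestartTimes() == 3;
```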

@Override
public String toString() {

@@ -67,6 +67,7 @@ public LakeSoulWriterBucketState deserialize(int version, byte[] serialized) throws IOException {
private void serialize(LakeSoulWriterBucketState state, DataOutputView dataOutputView)
throws IOException {
// dataOutputView.writeUTF(state.getBucketId());
dataOutputView.writeInt(state.getRestartTimes());
dataOutputView.writeUTF(state.getBucketPath().toString());

SimpleVersionedSerialization.writeVersionAndSerialize(
@@ -102,6 +103,7 @@ private LakeSoulWriterBucketState internalDeserialize(
throws IOException {

// String bucketId = dataInputView.readUTF();
int restartTimes = dataInputView.readInt();
String bucketPathStr = dataInputView.readUTF();

TableSchemaIdentity identity = SimpleVersionedSerialization.readVersionAndDeSerialize(
@@ -123,7 +125,9 @@ private LakeSoulWriterBucketState internalDeserialize(
return new LakeSoulWriterBucketState(
identity,
new Path(bucketPathStr),
- pendingFileRecoverableMap);
+ pendingFileRecoverableMap,
+ restartTimes
+ );
}
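These two hunks rely on a symmetry requirement: restartTimes is written first in serialize, so it must be read first in internalDeserialize. Because the wire layout gains a leading int, state serialized by an older build would be misread on restore unless the serializer version is bumped and branched on; that versioning is outside the lines shown here and is worth double-checking. A minimal round-trip sketch using Flink's in-memory views:

```java
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class BucketStateLayoutSketch {
    public static void main(String[] args) throws Exception {
        // Field order on write...
        DataOutputSerializer out = new DataOutputSerializer(256);
        out.writeInt(3);                  // restartTimes is written first
        out.writeUTF("/tmp/bucket-path"); // then the bucket path

        // ...must match field order on read.
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        int restartTimes = in.readInt();  // 3
        String bucketPath = in.readUTF(); // "/tmp/bucket-path"
        System.out.println(restartTimes + " " + bucketPath);
    }
}
```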

private void validateMagicNumber(DataInputView in) throws IOException {
@@ -12,8 +12,6 @@
import org.apache.flink.lakesoul.sink.LakeSoulMultiTablesSink;
import org.apache.flink.lakesoul.sink.state.LakeSoulMultiTableSinkCommittable;
import org.apache.flink.lakesoul.sink.state.LakeSoulWriterBucketState;
- import org.apache.flink.lakesoul.sink.writer.arrow.LakeSoulArrowWriterBucket;
- import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.TableSchemaIdentity;
import org.apache.flink.metrics.Counter;
@@ -116,7 +114,7 @@ public void initializeState(List<LakeSoulWriterBucketState> bucketStates) throws IOException {
bucketFactory.restoreBucket(
subTaskId,
state.getIdentity(),
- creator.createBucketWriter(),
+ creator.createBucketWriter(getSubTaskId()),
rollingPolicy,
state,
outputFileConfig);
@@ -222,12 +220,16 @@ protected LakeSoulWriterBucket getOrCreateBucketForBucketId(
LakeSoulWriterBucket bucket = activeBuckets.get(Tuple2.of(identity, bucketId));
if (bucket == null) {
final Path bucketPath = creator.tableLocation;
- BucketWriter<RowData, String> bucketWriter = creator.createBucketWriter();
+ BucketWriter<RowData, String> bucketWriter = creator.createBucketWriter(getSubTaskId());
bucket =
bucketFactory.getNewBucket(
subTaskId,
creator.identity,
- bucketId, bucketPath, bucketWriter, rollingPolicy, outputFileConfig);
+ bucketId,
+ bucketPath,
+ bucketWriter,
+ rollingPolicy,
+ outputFileConfig);
activeBuckets.put(Tuple2.of(identity, bucketId), bucket);
LOG.info("Create new bucket {}, {}, {}",
identity, bucketId, bucketPath);