[Flink] Fix non-primary key table's sink parallelism (lakesoul-io#433)
* Support concurrent writing of non-primary key tables

Signed-off-by: ChenYunHey <1908166778@qq.com>

* fix flink tests

Signed-off-by: ChenYunHey <1908166778@qq.com>

* fix flink tests

Signed-off-by: ChenYunHey <1908166778@qq.com>

---------

Signed-off-by: ChenYunHey <1908166778@qq.com>
ChenYunHey authored and F-PHantam committed Apr 9, 2024
1 parent 450903a commit 29cdb66
Showing 3 changed files with 21 additions and 13 deletions.
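In short, this commit makes the custom hash repartitioning and the pinned sink parallelism conditional on the table actually having primary keys; a table without primary keys now sinks with the planner's default parallelism instead. The sketch below illustrates the same branching using only plain Flink DataStream APIs; BucketHashPartitioner, the inline key selector, print(), and the parallelism values are simplified stand-ins for the HashPartitioner, LakeSoulKeyGen#getRePartitionHash, LakeSoulMultiTablesSink, and HASH_BUCKET_NUM seen in the diff, not the actual LakeSoul implementation.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkParallelismSketch {

    // Stand-in for HashPartitioner: route a precomputed hash to one of the buckets.
    static class BucketHashPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            return Math.floorMod(key, numPartitions);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // default parallelism used when no primary key pins the sink

        DataStream<String> rows = env.fromElements("a,1", "b,2", "a,3");
        List<String> primaryKeyList = Arrays.asList("id"); // an empty list models a non-primary-key table
        int bucketParallelism = 3;                          // stands in for the configured hash bucket number

        // Stand-in for LakeSoulKeyGen#getRePartitionHash: hash the key column of a record.
        KeySelector<String, Integer> keyHash = new KeySelector<String, Integer>() {
            @Override
            public Integer getKey(String value) {
                return value.split(",")[0].hashCode();
            }
        };

        if (!primaryKeyList.isEmpty()) {
            // Primary-key table: same key -> same bucket writer, parallelism pinned to the bucket count.
            rows.partitionCustom(new BucketHashPartitioner(), keyHash)
                .print()
                .setParallelism(bucketParallelism);
        } else {
            // Non-primary-key table: no explicit parallelism; the planner rebalances to the default.
            rows.print();
        }

        env.execute("sink-parallelism-sketch");
    }
}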
Changed file 1 of 3
@@ -115,14 +115,11 @@ private DataStreamSink<?> createStreamingSink(DataStream<RowData> dataStream, Co
int bucketParallelism = flinkConf.getInteger(HASH_BUCKET_NUM);
//rowData key tools
RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType();
LakeSoulKeyGen keyGen = new LakeSoulKeyGen(rowType, primaryKeyList.toArray(new String[0]));
//bucket file name config
OutputFileConfig fileNameConfig = OutputFileConfig.builder().withPartSuffix(".parquet").build();
//file rolling rule
LakeSoulRollingPolicyImpl rollingPolicy = new LakeSoulRollingPolicyImpl(flinkConf.getLong(FILE_ROLLING_SIZE),
flinkConf.getLong(FILE_ROLLING_TIME));
//redistribution by partitionKey
dataStream = dataStream.partitionCustom(new HashPartitioner(), keyGen::getRePartitionHash);
//rowData sink fileSystem Task
LakeSoulMultiTablesSink<RowData> sink = LakeSoulMultiTablesSink.forOneTableBulkFormat(path,
new TableSchemaIdentity(new TableId(io.debezium.relational.TableId.parse(summaryName)), rowType,
@@ -132,7 +129,14 @@ private DataStreamSink<?> createStreamingSink(DataStream<RowData> dataStream, Co
), flinkConf)
.withBucketCheckInterval(flinkConf.getLong(BUCKET_CHECK_INTERVAL)).withRollingPolicy(rollingPolicy)
.withOutputFileConfig(fileNameConfig).build();
return dataStream.sinkTo(sink).setParallelism(bucketParallelism);
if (!primaryKeyList.isEmpty()) {
//redistribution by partitionKey
LakeSoulKeyGen keyGen = new LakeSoulKeyGen(rowType, primaryKeyList.toArray(new String[0]));
dataStream = dataStream.partitionCustom(new HashPartitioner(), keyGen::getRePartitionHash);
return dataStream.sinkTo(sink).setParallelism(bucketParallelism);
} else {
return dataStream.sinkTo(sink);
}
}

@Override
Changed file 2 of 3
@@ -108,18 +108,18 @@ public void testLakeSoulTableSinkWithParallelismInBatch() {
" \"type\" : \"Sink: Writer\",\n" +
" \"pact\" : \"Operator\",\n" +
" \"contents\" : \"Sink: Writer\",\n" +
" \"parallelism\" : 3,\n" +
" \"parallelism\" : 2,\n" +
" \"predecessors\" : [ {\n" +
" \"id\" : ,\n" +
" \"ship_strategy\" : \"CUSTOM\",\n" +
" \"ship_strategy\" : \"REBALANCE\",\n" +
" \"side\" : \"second\"\n" +
" } ]\n" +
" }, {\n" +
" \"id\" : ,\n" +
" \"type\" : \"Sink: Committer\",\n" +
" \"pact\" : \"Operator\",\n" +
" \"contents\" : \"Sink: Committer\",\n" +
" \"parallelism\" : 3,\n" +
" \"parallelism\" : 2,\n" +
" \"predecessors\" : [ {\n" +
" \"id\" : ,\n" +
" \"ship_strategy\" : \"FORWARD\",\n" +
@@ -187,18 +187,18 @@ public void testLakeSoulTableSinkWithParallelismInStreaming() {
" \"type\" : \"Sink: Writer\",\n" +
" \"pact\" : \"Operator\",\n" +
" \"contents\" : \"Sink: Writer\",\n" +
" \"parallelism\" : 3,\n" +
" \"parallelism\" : 2,\n" +
" \"predecessors\" : [ {\n" +
" \"id\" : ,\n" +
" \"ship_strategy\" : \"CUSTOM\",\n" +
" \"ship_strategy\" : \"REBALANCE\",\n" +
" \"side\" : \"second\"\n" +
" } ]\n" +
" }, {\n" +
" \"id\" : ,\n" +
" \"type\" : \"Sink: Committer\",\n" +
" \"pact\" : \"Operator\",\n" +
" \"contents\" : \"Sink: Committer\",\n" +
" \"parallelism\" : 3,\n" +
" \"parallelism\" : 2,\n" +
" \"predecessors\" : [ {\n" +
" \"id\" : ,\n" +
" \"ship_strategy\" : \"FORWARD\",\n" +
@@ -251,7 +251,6 @@ private void testLakeSoulTableSinkWithParallelismBase(
"insert into test_table select 1, 1", ExplainDetail.JSON_EXECUTION_PLAN);
String plan = replaceFlinkVersion(replaceNodeIdInOperator(replaceExecNodeId(replaceStreamNodeId(replaceStageId(actual)))));
System.out.println(plan);

assertEquals(expected, plan);

tEnv.executeSql("drop database db1 cascade");
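The expected plans above change accordingly: the test table has no primary key, so the edge into "Sink: Writer" is no longer CUSTOM and the writer no longer takes the configured hash-bucket parallelism of 3; the tests now expect a REBALANCE edge and a parallelism of 2 (presumably the test environment's default). For reference, a minimal sketch of how such a JSON execution plan can be produced with explainSql and the same ExplainDetail the test uses; the datagen/blackhole tables here are made-up examples, not the actual LakeSoul test tables.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainPlanSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql("CREATE TABLE src (a INT, b INT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE snk (a INT, b INT) WITH ('connector' = 'blackhole')");

        // Returns the operator graph as JSON; the parallelism and ship_strategy fields in it
        // are what the assertions above compare after node ids and the Flink version are scrubbed.
        String plan = tEnv.explainSql("INSERT INTO snk SELECT a, b FROM src",
                ExplainDetail.JSON_EXECUTION_PLAN);
        System.out.println(plan);
    }
}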
Changed file 3 of 3
@@ -5,6 +5,7 @@
package org.apache.flink.lakesoul.test.schema;

import com.alibaba.fastjson.JSON;
import com.amazonaws.services.dynamodbv2.xspec.S;
import com.dmetasoul.lakesoul.meta.DBConfig;
import com.dmetasoul.lakesoul.meta.DBManager;
import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
@@ -131,7 +132,7 @@ public void testFromBigIntToTinyIntWithAllowance() throws IOException, Execution
"[+I[a, BIGINT, true, null, null, null]]",
"[+I[a, TINYINT, true, null, null, null]]",
"[+I[10000000000], +I[20000000000]]",
"[+I[null], +I[null], +I[3], +I[4]]"
"[+I[3], +I[4], +I[null], +I[null]]"
);
}

@@ -452,21 +453,25 @@ void testSchemaMigration(CatalogTable beforeTable,
testCatalog.setCurrentTable(beforeTable);
testEnv.executeSql(beforeInsertSql).await();
List<Row> desc_test_sink_before = CollectionUtil.iteratorToList(validateEnv.executeSql("desc test_sink").collect());
desc_test_sink_before.sort(Comparator.comparing(Row::toString));
if (beforeExpectedDescription != null)
assertThat(desc_test_sink_before.toString()).isEqualTo(beforeExpectedDescription);
List<Row> select_test_sink_before = CollectionUtil.iteratorToList(validateEnv.executeSql("select * from test_sink").collect());

select_test_sink_before.sort(Comparator.comparing(Row::toString));

if (beforeExpectedValue != null)
assertThat(select_test_sink_before.toString()).isEqualTo(beforeExpectedValue);
validateEnv.executeSql("select * from test_sink").print();

testCatalog.setCurrentTable(afterTable);
testEnv.executeSql(afterInsertSql).await();
List<Row> desc_test_sink_after = CollectionUtil.iteratorToList(validateEnv.executeSql("desc test_sink").collect());
desc_test_sink_after.sort(Comparator.comparing(Row::toString));
if (afterExpectedDescription != null)
assertThat(desc_test_sink_after.toString()).isEqualTo(afterExpectedDescription);
List<Row> select_test_sink_after = CollectionUtil.iteratorToList(validateEnv.executeSql("select * from test_sink").collect());
select_test_sink_before.sort(Comparator.comparing(Row::toString));
select_test_sink_after.sort(Comparator.comparing(Row::toString));
if (afterExpectedValue != null)
assertThat(select_test_sink_after.toString()).isEqualTo(afterExpectedValue);
validateEnv.executeSql("select * from test_sink").print();
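This last change makes the result comparison order-insensitive: the collected description and row lists are sorted by their string form before being compared, and the expected literal for the TINYINT case is reordered to match, since "+I[3]" sorts before "+I[null]". A tiny standalone illustration of that ordering, with made-up row values:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.types.Row;

public class RowOrderingSketch {
    public static void main(String[] args) {
        // Made-up rows standing in for what the test collects from "select * from test_sink".
        List<Row> rows = new ArrayList<>(Arrays.asList(Row.of(20000000000L), Row.of(10000000000L)));

        // Same trick as the test: sort by the rows' string form so the comparison does not
        // depend on the order in which parallel sink subtasks emitted them.
        rows.sort(Comparator.comparing(Row::toString));

        // Prints [+I[10000000000], +I[20000000000]], matching the style of the expected literals above.
        System.out.println(rows);
    }
}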
