diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java
index 9dfba2f71..3f6578e88 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulOneSplitRecordsReader.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.BATCH_SIZE;
+import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.LIMIT;
 
 public class LakeSoulOneSplitRecordsReader implements RecordsWithSplitIds<RowData>, AutoCloseable {
@@ -79,6 +80,9 @@ public class LakeSoulOneSplitRecordsReader implements RecordsWithSplitIds<RowData>, AutoCloseable {
 
+    private long limit = Long.MAX_VALUE;
+
+    private long totalRead = 0;
+
@@ ... @@ public RowData nextRecordFromSplit() {
+            if (totalRead >= this.limit) {
+                this.reader.close();
+                LOG.info("Reach limit condition {}", split);
+                return null;
+            }
             if (curRecordIdx >= currentVCR.getRowCount()) {
                 if (this.reader.hasNext()) {
                     this.currentVCR = this.reader.nextResultVectorSchemaRoot();
@@ -262,6 +274,7 @@ public RowData nextRecordFromSplit() {
                 rd = this.curArrowReaderRequestedSchema.read(rowId);
                 // change rowkind if needed
                 rd.setRowKind(rk);
+                totalRead++;
                 return rd;
             }
         }
@@ -274,6 +287,7 @@ public Set<String> finishedSplits() {
 
     @Override
     public void close() throws Exception {
+        LOG.info("Close reader split {}, read num {}", splitId, totalRead);
        if (this.currentVCR != null) {
             this.currentVCR.close();
             this.currentVCR = null;
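The reader change above is the core of the feature: `totalRead` counts every row handed to Flink, and `nextRecordFromSplit()` returns null (end of split) as soon as the count reaches the pushed-down limit, closing the native reader early instead of draining the whole split. The same counting pattern in isolation, as a runnable sketch (`LimitedIterator` and everything in it is illustrative, not LakeSoul or Flink API):

```java
// Illustrative only: a generic iterator that enforces a row limit the same
// way the reader above does, with a running count checked before each record.
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class LimitedIterator<T> implements Iterator<T> {
    private final Iterator<T> inner;
    private final long limit;
    private long totalRead = 0;

    LimitedIterator(Iterator<T> inner, long limit) {
        this.inner = inner;
        this.limit = limit;
    }

    @Override
    public boolean hasNext() {
        // Stop early once the limit is reached, even if the source has more rows.
        return totalRead < limit && inner.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        totalRead++; // count only records actually emitted
        return inner.next();
    }

    public static void main(String[] args) {
        Iterator<Integer> limited = new LimitedIterator<>(List.of(1, 2, 3, 4).iterator(), 2);
        limited.forEachRemaining(System.out::println); // prints 1 and 2 only
    }
}
```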
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java
index dfa7f99fb..4bd47cd6b 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/arrow/LakeSoulArrowSource.java
@@ -31,20 +31,72 @@ public static LakeSoulArrowSource create(
         TableInfo tableInfo = DataOperation.dbManager().getTableInfoByNameAndNamespace(tableName, tableNamespace);
         RowType tableRowType = ArrowUtils.fromArrowSchema(Schema.fromJSON(tableInfo.getTableSchema()));
         DBUtil.TablePartitionKeys tablePartitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
+        boolean isBounded = conf.getBoolean("IS_BOUNDED", false);
         return new LakeSoulArrowSource(
                 tableInfo,
                 tableId,
                 conf.toMap(),
+                isBounded,
                 tableRowType,
                 tablePartitionKeys.primaryKeys,
                 tablePartitionKeys.rangeKeys
         );
     }
 
+    public static LakeSoulArrowSource create(
+            String tableNamespace,
+            String tableName,
+            Configuration conf,
+            List<Map<String, String>> remainingPartitions
+    ) throws IOException {
+        TableId tableId = new TableId(LakeSoulCatalog.CATALOG_NAME, tableNamespace, tableName);
+        TableInfo tableInfo = DataOperation.dbManager().getTableInfoByNameAndNamespace(tableName, tableNamespace);
+        RowType tableRowType = ArrowUtils.fromArrowSchema(Schema.fromJSON(tableInfo.getTableSchema()));
+        DBUtil.TablePartitionKeys tablePartitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
+        boolean isBounded = conf.getBoolean("IS_BOUNDED", false);
+        return new LakeSoulArrowSource(
+                tableInfo,
+                tableId,
+                conf.toMap(),
+                isBounded,
+                tableRowType,
+                tablePartitionKeys.primaryKeys,
+                tablePartitionKeys.rangeKeys,
+                remainingPartitions
+        );
+    }
+
+    LakeSoulArrowSource(
+            TableInfo tableInfo,
+            TableId tableId,
+            Map<String, String> optionParams,
+            boolean isBounded,
+            RowType tableRowType,
+            List<String> pkColumns,
+            List<String> partitionColumns,
+            List<Map<String, String>> remainingPartitions
+    ) {
+        super(
+                tableId,
+                tableRowType,
+                tableRowType,
+                tableRowType,
+                isBounded,
+                pkColumns,
+                partitionColumns,
+                optionParams,
+                remainingPartitions,
+                null,
+                null
+        );
+        this.encodedTableInfo = tableInfo.toByteArray();
+    }
+
     LakeSoulArrowSource(
             TableInfo tableInfo,
             TableId tableId,
             Map<String, String> optionParams,
+            boolean isBounded,
             RowType tableRowType,
             List<String> pkColumns,
             List<String> partitionColumns
@@ -54,7 +106,7 @@ public static LakeSoulArrowSource create(
                 tableRowType,
                 tableRowType,
                 tableRowType,
-                false,
+                isBounded,
                 pkColumns,
                 partitionColumns,
                 optionParams,
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java
index 2e3cd4cd4..033c0f52b 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java
@@ -25,6 +25,7 @@
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
 import org.apache.flink.table.expressions.ResolvedExpression;
@@ -47,7 +48,7 @@ public class LakeSoulTableSource implements
         SupportsFilterPushDown,
         SupportsProjectionPushDown,
         ScanTableSource,
-        SupportsRowLevelModificationScan {
+        SupportsRowLevelModificationScan, SupportsLimitPushDown {
 
     private static final Logger LOG = LoggerFactory.getLogger(LakeSoulTableSource.class);
@@ -351,4 +352,9 @@ public RowLevelModificationScanContext applyRowLevelModificationScan(
     public LakeSoulRowLevelModificationScanContext getModificationContext() {
         return modificationContext;
     }
+
+    @Override
+    public void applyLimit(long limit) {
+        this.optionParams.put(LakeSoulSinkOptions.LIMIT.key(), String.valueOf(limit));
+    }
 }
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java
index e031f222d..7a0a0f770 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java
@@ -11,7 +11,7 @@
 
 import java.time.Duration;
 
-public class LakeSoulSinkOptions {
+public class LakeSoulSinkOptions {
 
     public static final String FACTORY_IDENTIFIER = "lakesoul";
@@ -231,6 +231,11 @@ public class LakeSoulSinkOptions {
             .booleanType()
             .defaultValue(false)
             .withDescription("If true, lakesoul sink will auto change sink table's schema");
+    public static final ConfigOption<Long> LIMIT = ConfigOptions
+            .key("lakesoul.limit")
+            .longType()
+            .defaultValue(Long.MAX_VALUE)
+            .withDescription("Maximum number of records to read from the source (used by LIMIT push down)");
 }
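`SupportsLimitPushDown` is the Flink planner ability behind this change: for a query ending in `LIMIT n`, the planner calls `applyLimit(n)` on the source during optimization, and the source forwards the value to its runtime readers, here via the new `lakesoul.limit` option. The pushdown is an I/O optimization; the planner still enforces the limit on the result, so over-reading would only cost performance, not correctness. A minimal sketch of the contract (only the `SupportsLimitPushDown` and `ScanTableSource` interfaces are Flink API; `SketchSource` and its wiring are illustrative):

```java
// Sketch of the SupportsLimitPushDown contract; the interfaces and method
// signatures are real Flink APIs, the class itself is a stand-in.
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;

class SketchSource implements ScanTableSource, SupportsLimitPushDown {

    // Long.MAX_VALUE doubles as the "no limit" sentinel, mirroring the
    // default of the lakesoul.limit option above.
    private long limit = Long.MAX_VALUE;

    @Override
    public void applyLimit(long limit) {
        // Called by the planner when the query ends in LIMIT <n>.
        this.limit = limit;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // A real source would hand `limit` to its split readers here,
        // e.g. through reader options, as LakeSoulTableSource does.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public DynamicTableSource copy() {
        SketchSource copy = new SketchSource();
        copy.limit = this.limit;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "SketchSource(limit=" + limit + ")";
    }
}
```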
diff --git a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java
index 6045904ce..7a5430eaa 100644
--- a/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java
+++ b/lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java
@@ -284,6 +284,28 @@ public void testDeletePartitionOnlySQL() throws ExecutionException, InterruptedException {
         TestUtils.checkEqualInAnyOrder(results, new String[]{"+I[2, Alice, 80]", "+I[4, Bob, 110]"});
     }
 
+    @Test
+    public void testSelectWithLimit() throws ExecutionException, InterruptedException {
+        TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
+        createLakeSoulSourceTableUserWithRange(tEnv);
+        tEnv.executeSql(
+                        "INSERT INTO user_info_1 VALUES (2, 'Alice', 80),(3, 'Jack', 75),(3, 'Amy', 95),(4, 'Bob', 110)")
+                .await();
+        String testSelect = "select * from user_info_1 limit 2";
+        List<Row> result = CollectionUtil.iteratorToList(tEnv.executeSql(testSelect).collect());
+        assert result.size() == 2;
+        String testSelect1 = "select * from user_info_1 limit 9";
+        List<Row> result1 = CollectionUtil.iteratorToList(tEnv.executeSql(testSelect1).collect());
+        assert result1.size() == 4;
+        StreamTableEnvironment streamEnv = TestUtils.createStreamTableEnv(BATCH_TYPE);
+        TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect);
+        List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
+        assert results.size() == 2;
+        TableImpl flinkTable1 = (TableImpl) streamEnv.sqlQuery(testSelect1);
+        List<Row> result2 = CollectionUtil.iteratorToList(flinkTable1.execute().collect());
+        assert result2.size() == 4;
+    }
+
     @Test
     public void testDeleteAllPartitionedDataExactlySQL() throws ExecutionException, InterruptedException {
         TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
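From the user's side nothing changes except performance: a plain `LIMIT` clause now stops the scan early, and a limit larger than the table (the `limit 9` case above) still returns every row. A minimal batch-mode usage sketch (assumes a LakeSoul catalog with the populated `user_info_1` table from the test already registered):

```java
// Hypothetical end-to-end usage; catalog and table setup are assumed to exist.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LimitPushDownExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        // With this patch, the LIMIT is also pushed into the LakeSoul reader
        // (as lakesoul.limit), so the scan stops after about 2 records instead
        // of reading the whole table before Flink trims the result.
        tEnv.executeSql("SELECT * FROM user_info_1 LIMIT 2").print();
    }
}
```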