[Rust] add substrait for flink and be compatible with other engines
Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

add flink expression to substrait

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

add more functions

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

add more tests

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

add base schema for namedscan, substrait type to arrow type

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

compatibility

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

switch to java8

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

before applying cargo fix

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

cargo clippy && cargo fmt

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>

fix ci

Signed-off-by: mag1c1an1 <mag1cian@icloud.com>
mag1c1an1 committed Mar 21, 2024
1 parent 6ef0076 commit 71031ca
Showing 40 changed files with 2,001 additions and 499 deletions.
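
Before the file-by-file diff, a brief orientation: this commit moves LakeSoul's Flink filter pushdown from a string-rendered Parquet FilterPredicate to a Substrait Plan that is serialized as protobuf on the Java side and handed to the native Rust reader, which is expected to decode it. The snippet below is an illustrative sketch rather than code from this commit; it assumes only the io.substrait:core dependency added in the poms and shows the serialization contract that the new addFilterProto path relies on.

import com.google.protobuf.InvalidProtocolBufferException;
import io.substrait.proto.Plan;

public class SubstraitPlanContract {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // Stand-in plan; in the commit the real plan comes from SubstraitUtil.toPlan(...)
        // applied to the Flink ResolvedExpression filters in LakeSoulTableSource.applyFilters.
        Plan plan = Plan.newBuilder().build();

        // Java side: serialize the plan (these are the bytes addFilterProto copies off-heap).
        byte[] bytes = plan.toByteArray();

        // Native side (conceptually): decode the same protobuf message from the raw bytes.
        Plan decoded = Plan.parseFrom(bytes);
        System.out.println("round-trip ok: " + plan.equals(decoded));
    }
}
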
14 changes: 14 additions & 0 deletions lakesoul-flink/pom.xml
@@ -457,6 +457,18 @@ SPDX-License-Identifier: Apache-2.0
<version>3.3.2</version>
<scope>${local.scope}</scope>
</dependency>
<dependency>
<groupId>io.substrait</groupId>
<artifactId>core</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
</exclusions>
<version>0.28.0</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
@@ -592,6 +604,8 @@ SPDX-License-Identifier: Apache-2.0
<include>com.google.code.gson:gson</include>
<include>dev.failsafe:failsafe</include>
<include>com.google.protobuf:protobuf-java</include>
<!--substrait-->
<include>io.substrait:core</include>
</includes>
<excludes>
<exclude>org.apache.logging.log4j:*</exclude>

@@ -235,6 +235,7 @@ public DynamicTableSource copy() {
lsts.projectedFields = this.projectedFields;
lsts.remainingPartitions = this.remainingPartitions;
lsts.filter = this.filter;
lsts.filter = this.filter;
return lsts;
}

@@ -6,6 +6,7 @@

import com.dmetasoul.lakesoul.LakeSoulArrowReader;
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
import io.substrait.proto.Plan;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -73,7 +74,8 @@ public class LakeSoulOneSplitRecordsReader implements RecordsWithSplitIds<RowDat
// arrow batch -> row, with requested schema
private ArrowReader curArrowReaderRequestedSchema;

private final FilterPredicate filter;
private final FilterPredicate _filterPredicate;
private final Plan filter;

public LakeSoulOneSplitRecordsReader(Configuration conf,
LakeSoulSplit split,
@@ -82,7 +84,8 @@ public LakeSoulOneSplitRecordsReader(Configuration conf,
List<String> pkColumns,
boolean isStreaming,
String cdcColumn,
FilterPredicate filter)
FilterPredicate _filterPredicate,
Plan filter)
throws Exception {
this.split = split;
this.skipRecords = split.getSkipRecord();
@@ -94,6 +97,7 @@ public LakeSoulOneSplitRecordsReader(Configuration conf,
this.isStreaming = isStreaming;
this.cdcColumn = cdcColumn;
this.finishedSplit = Collections.singleton(splitId);
this._filterPredicate = _filterPredicate;
this.filter = filter;
initializeReader();
recoverFromSkipRecord();
@@ -129,7 +133,11 @@ private void initializeReader() throws IOException {
}

if (filter != null) {
reader.addFilter(filter.toString());
reader.addFilterProto(this.filter);
}

if (_filterPredicate != null) {
reader.addFilter(_filterPredicate.toString());
}

LOG.info("Initializing reader for split {}, pk={}, partitions={}," +

@@ -9,6 +9,7 @@
import com.dmetasoul.lakesoul.meta.DataOperation;
import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import io.substrait.proto.Plan;
import org.apache.flink.api.connector.source.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -43,7 +44,9 @@ public class LakeSoulSource implements Source<RowData, LakeSoulSplit, LakeSoulPe
List<Map<String, String>> remainingPartitions;

@Nullable
FilterPredicate filter;
FilterPredicate _filterPredicate;
@Nullable
Plan filter;

public LakeSoulSource(TableId tableId,
RowType rowType,
@@ -52,15 +55,18 @@ public LakeSoulSource(TableId tableId,
List<String> pkColumns,
Map<String, String> optionParams,
@Nullable List<Map<String, String>> remainingPartitions,
@Nullable FilterPredicate filter) {
@Nullable FilterPredicate _filterPredicate,
@Nullable Plan filter) {
this.tableId = tableId;
this.rowType = rowType;
this.rowTypeWithPk = rowTypeWithPk;
this.isStreaming = isStreaming;
this.pkColumns = pkColumns;
this.optionParams = optionParams;
this.remainingPartitions = remainingPartitions;
this._filterPredicate = _filterPredicate;
this.filter = filter;

}

@Override
@@ -83,6 +89,7 @@ public SourceReader<RowData, LakeSoulSplit> createReader(SourceReaderContext rea
this.pkColumns,
this.isStreaming,
this.optionParams.getOrDefault(LakeSoulSinkOptions.CDC_CHANGE_COLUMN, ""),
this._filterPredicate,
this.filter),
new LakeSoulRecordEmitter(),
readerContext.getConfiguration(),

@@ -4,6 +4,7 @@

package org.apache.flink.lakesoul.source;

import io.substrait.proto.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -39,7 +40,8 @@ public class LakeSoulSplitReader implements SplitReader<RowData, LakeSoulSplit>

String cdcColumn;

FilterPredicate filter;
FilterPredicate _filterPredicate;
Plan filter;

private LakeSoulOneSplitRecordsReader lastSplitReader;

@@ -49,14 +51,16 @@ public LakeSoulSplitReader(Configuration conf,
List<String> pkColumns,
boolean isStreaming,
String cdcColumn,
FilterPredicate filter) {
FilterPredicate _filterPredicate,
Plan filter) {
this.conf = conf;
this.splits = new ArrayDeque<>();
this.rowType = rowType;
this.rowTypeWithPk = rowTypeWithPk;
this.pkColumns = pkColumns;
this.isStreaming = isStreaming;
this.cdcColumn = cdcColumn;
this._filterPredicate = _filterPredicate;
this.filter = filter;
}

@@ -72,7 +76,9 @@ public RecordsWithSplitIds<RowData> fetch() throws IOException {
this.pkColumns,
this.isStreaming,
this.cdcColumn,
this.filter);
this._filterPredicate,
this.filter
);
return lastSplitReader;
} catch (Exception e) {
throw new IOException(e);

@@ -9,9 +9,10 @@
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import io.substrait.proto.Plan;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.lakesoul.source.LakeSoulSource;
import org.apache.flink.lakesoul.source.ParquetFilters;
import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.TableId;
import org.apache.flink.table.connector.ChangelogMode;
@@ -33,6 +34,7 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -60,7 +62,10 @@ public class LakeSoulTableSource

protected List<Map<String, String>> remainingPartitions;

protected FilterPredicate filter;
// TODO remove this, now used for debug
protected FilterPredicate _filterPredicate;
// TODO merge
protected io.substrait.proto.Plan filter;

public LakeSoulTableSource(TableId tableId,
RowType rowType,
@@ -84,6 +89,7 @@ public DynamicTableSource copy() {
lsts.projectedFields = this.projectedFields;
lsts.remainingPartitions = this.remainingPartitions;
lsts.filter = this.filter;
lsts._filterPredicate = this._filterPredicate;
return lsts;
}

@@ -104,19 +110,28 @@ public Result applyFilters(List<ResolvedExpression> filters) {
DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
Set<String> partitionCols = new HashSet<>(partitionKeys.rangeKeys);
for (ResolvedExpression filter : filters) {
if (ParquetFilters.filterContainsPartitionColumn(filter, partitionCols)) {
if (SubstraitUtil.filterContainsPartitionColumn(filter, partitionCols)) {
remainingFilters.add(filter);
} else {
nonPartitionFilters.add(filter);
}
}
// find acceptable non partition filters
Tuple2<Result, FilterPredicate> filterPushDownResult = ParquetFilters.toParquetFilter(nonPartitionFilters,
remainingFilters);
// Tuple2<Result, FilterPredicate> filterPushDownRes = ParquetFilters.toParquetFilter(nonPartitionFilters,
// remainingFilters);
Tuple2<Result, Plan> filterPushDownResult = null;
try {
filterPushDownResult = SubstraitUtil.toPlan(nonPartitionFilters,
remainingFilters, tableInfo.getTableName(), tableInfo.getTableSchema());
} catch (IOException e) {
throw new RuntimeException(e);
}
this.filter = filterPushDownResult.f1;
// this.filterStr = filterPushDownRes.f1;
LOG.info("Applied filters to native io: {}, accepted {}, remaining {}", this.filter,
filterPushDownResult.f0.getAcceptedFilters(),
filterPushDownResult.f0.getRemainingFilters());
// LOG.info("FilterPlan: {}", this.filterPlan);
return filterPushDownResult.f0;
}

@@ -215,6 +230,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
this.pkColumns,
this.optionParams,
this.remainingPartitions,
this._filterPredicate,
this.filter));
}

24 changes: 24 additions & 0 deletions native-io/lakesoul-io-java/pom.xml
@@ -26,6 +26,8 @@ SPDX-License-Identifier: Apache-2.0
<maven.compiler.target>8</maven.compiler.target>
<arrow.version>12.0.0</arrow.version>
<scalatest.version>3.1.0</scalatest.version>
<substrait.version>0.28.0</substrait.version>
<flink.version>1.17.1</flink.version>
</properties>


@@ -84,6 +86,28 @@ SPDX-License-Identifier: Apache-2.0
<version>2.2.16</version>
</dependency>

<dependency>
<groupId>io.substrait</groupId>
<artifactId>core</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
</exclusions>
<version>${substrait.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>${local.scope}</scope>
</dependency>



<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>

@@ -4,14 +4,16 @@

package com.dmetasoul.lakesoul.lakesoul.io;

import com.dmetasoul.lakesoul.lakesoul.io.jnr.LibLakeSoulIO;
import io.substrait.proto.Plan;
import jnr.ffi.Pointer;
import jnr.ffi.Runtime;
import jnr.ffi.byref.IntByReference;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@@ -34,6 +36,18 @@ public void addFilter(String filter) {
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_filter(ioConfigBuilder, filter);
}

/**
 * Pushes a Substrait filter plan down to the native reader. Usually called at most once per reader.
 *
 * @param plan a Substrait {@link Plan} carrying the filter expression
 */
public void addFilterProto(Plan plan) {
byte[] bytes = plan.toByteArray();
Pointer buf = Runtime.getRuntime(libLakeSoulIO).getMemoryManager().allocateDirect(bytes.length);
buf.put(0, bytes, 0, bytes.length);
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_filter_proto(ioConfigBuilder, buf.address(), bytes.length);
}

public void addMergeOps(Map<String, String> mergeOps) {
for (Map.Entry<String, String> entry : mergeOps.entrySet()) {
ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_merge_op(ioConfigBuilder, entry.getKey(), entry.getValue());

@@ -32,6 +32,8 @@ public interface LibLakeSoulIO {

Pointer lakesoul_config_builder_add_filter(Pointer builder, String filter);

Pointer lakesoul_config_builder_add_filter_proto(Pointer builder, @LongLong long proto_addr, int len);

Pointer lakesoul_config_builder_add_merge_op(Pointer builder, String field, String mergeOp);

Pointer lakesoul_config_builder_set_schema(Pointer builder, @LongLong long schemaAddr);
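
The two hunks above define the Java/native boundary for the new pushdown: NativeIOBase.addFilterProto serializes the plan into off-heap memory, and LibLakeSoulIO.lakesoul_config_builder_add_filter_proto receives it as an (address, length) pair. Below is a minimal, self-contained sketch of that handoff pattern, for illustration only; it allocates the buffer with jnr-ffi the same way the new method does but stops short of calling the native library.

import io.substrait.proto.Plan;
import jnr.ffi.Pointer;
import jnr.ffi.Runtime;

public class FilterProtoHandoff {
    public static void main(String[] args) {
        // Stand-in for the plan returned by SubstraitUtil.toPlan(...).
        Plan plan = Plan.newBuilder().build();
        byte[] bytes = plan.toByteArray();

        // Copy the serialized plan into off-heap memory, mirroring addFilterProto.
        // (Math.max guards the empty stand-in plan; a real filter plan would not be empty.)
        Runtime runtime = Runtime.getSystemRuntime();
        Pointer buf = runtime.getMemoryManager().allocateDirect(Math.max(bytes.length, 1));
        buf.put(0, bytes, 0, bytes.length);

        // These two values are what cross the FFI boundary into
        // lakesoul_config_builder_add_filter_proto(builder, proto_addr, len).
        long protoAddr = buf.address();
        int len = bytes.length;
        System.out.println("proto_addr=" + protoAddr + ", len=" + len);
    }
}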
