[Flink] fix partition filter #535

Merged · 3 commits · Sep 2, 2024
@@ -79,6 +79,22 @@ public void logicTest() throws ExecutionException, InterruptedException {
"[+I[1, Bob, 1995-10-01, true, 10.01, A, 1.85, 3, 1, 89, 100.11, [1, -81], [18, 67, 112, -105], 1990-01-07T10:10, 1995-10-01T07:10:00Z]]");
}

+@Test
+public void partitionTest() throws ExecutionException, InterruptedException {
+    TableEnvironment createTableEnv = TestUtils.createTableEnv(BATCH_TYPE);
+    createLakeSoulSourceTableWithDateType(createTableEnv);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 65536; i++) {
+        sb.append("0");
+    }
+    String testSql = String.format("select * from type_info where zone='%s' or zone='A'", sb.toString());
+    StreamTableEnvironment tEnvs = TestUtils.createStreamTableEnv(BATCH_TYPE);
+    List<Row> rows = CollectionUtil.iteratorToList(tEnvs.executeSql(testSql).collect());
+    rows.sort(Comparator.comparing(Row::toString));
+    assertThat(rows.toString()).isEqualTo(
+            "[+I[1, Bob, 1995-10-01, true, 10.01, A, 1.85, 3, 1, 89, 100.11, [1, -81], [18, 67, 112, -105], 1990-01-07T10:10, 1995-10-01T07:10:00Z]]");
+}

@Test
public void cmpTest() throws ExecutionException, InterruptedException {
TableEnvironment createTableEnv = TestUtils.createTableEnv(BATCH_TYPE);
@@ -110,7 +126,10 @@ private void createLakeSoulSourceTableWithDateType(TableEnvironment tEnvs)
" country VARBINARY, " +
" createTime TIMESTAMP, " +
" modifyTime TIMESTAMP_LTZ " +
-") WITH (" +
+") " +
+"PARTITIONED BY (`zone`,`country`,`money`)" +
+
+"WITH (" +
" 'connector'='lakesoul'," +
" 'hashBucketNum'='2'," +
" 'path'='" + getTempDirUri("/lakeSource/type") +
@@ -69,8 +69,8 @@ public class SubstraitUtil {

private static final LibLakeSoulIO LIB;

-private static final Pointer BUFFER1;
-private static final Pointer BUFFER2;
+private static Pointer BUFFER1;
+private static Pointer BUFFER2;

private static final NativeIOBase NATIVE_IO_BASE;

@@ -204,12 +204,9 @@ public static List<PartitionInfo> applyPartitionFilters(List<PartitionInfo> allP
JniWrapper jniWrapper = JniWrapper.newBuilder().addAllPartitionInfo(allPartitionInfo).build();

byte[] jniBytes = jniWrapper.toByteArray();
-BUFFER1.put(0, jniBytes, 0, jniBytes.length);
-BUFFER1.putByte(jniBytes.length, (byte) 0);

byte[] filterBytes = partitionFilter.toByteArray();
-BUFFER2.put(0, filterBytes, 0, filterBytes.length);
-BUFFER2.putByte(filterBytes.length, (byte) 0);
+tryPutBuffer1(jniBytes);
+tryPutBuffer2(filterBytes);

try {
final CompletableFuture<Integer> filterFuture = new CompletableFuture<>();
@@ -267,6 +264,22 @@ public static List<PartitionInfo> applyPartitionFilters(List<PartitionInfo> allP
return resultPartitionInfo;
}

+private static void tryPutBuffer1(byte[] bytes) {
+    while (BUFFER1.size() < bytes.length + 1) {
+        BUFFER1 = Runtime.getRuntime(LIB).getMemoryManager().allocateDirect(BUFFER1.size() * 2);
+    }
+    BUFFER1.put(0, bytes, 0, bytes.length);
+    BUFFER1.putByte(bytes.length, (byte) 0);
+}
+
+private static void tryPutBuffer2(byte[] bytes) {
+    while (BUFFER2.size() < bytes.length + 1) {
+        BUFFER2 = Runtime.getRuntime(LIB).getMemoryManager().allocateDirect(BUFFER2.size() * 2);
+    }
+    BUFFER2.put(0, bytes, 0, bytes.length);
+    BUFFER2.putByte(bytes.length, (byte) 0);
+}


public static FieldReference arrowFieldToSubstraitField(Field field) {
return FieldReference
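The tryPutBuffer helpers above replace direct writes into fixed-size native buffers: BUFFER1 and BUFFER2 drop their final modifier so they can be reallocated, and each write first doubles the buffer until the payload plus a trailing NUL byte fits. A minimal sketch of the same grow-until-fits pattern in Rust (illustrative only; the PR implements it in Java over jnr-ffi Pointers, and put_with_nul is a hypothetical name):

    // Grow `buf` geometrically until `bytes` plus a C-style NUL terminator fits,
    // then write the payload, mirroring the Java tryPutBuffer helpers.
    fn put_with_nul(buf: &mut Vec<u8>, bytes: &[u8]) {
        let needed = bytes.len() + 1; // payload + NUL terminator
        if buf.len() < needed {
            let mut new_len = buf.len().max(1);
            while new_len < needed {
                new_len *= 2;
            }
            buf.resize(new_len, 0);
        }
        buf[..bytes.len()].copy_from_slice(bytes);
        buf[bytes.len()] = 0;
    }

Doubling keeps the number of reallocations logarithmic in the payload size, which is presumably what lets the new partitionTest push its 65536-character partition value through the filter path without overflowing the buffer.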
1 change: 1 addition & 0 deletions rust/Cargo.lock


4 changes: 1 addition & 3 deletions rust/lakesoul-io/Cargo.toml
@@ -43,7 +43,7 @@ log = "0.4.20"
anyhow = { workspace = true, features = [] }
prost = { workspace = true }
env_logger = "0.11"

+hex = "0.4"

[features]
hdfs = ["dep:hdrs"]
@@ -56,8 +56,6 @@ datafusion-substrait = { workspace = true }
datafusion-substrait = { workspace = true, features = ["protoc"] }




[dev-dependencies]
tempfile = "3.3.0"
comfy-table = "6.0"
2 changes: 2 additions & 0 deletions rust/lakesoul-io/src/constant.rs
@@ -14,6 +14,8 @@ use lazy_static::lazy_static;
pub static LAKESOUL_TIMEZONE: &str = "UTC";
pub static LAKESOUL_NULL_STRING: &str = "__L@KE$OUL_NULL__";
pub static LAKESOUL_EMPTY_STRING: &str = "__L@KE$OUL_EMPTY_STRING__";
+pub static LAKESOUL_EQ: &str = "__L@KE$OUL_EQ__";
+pub static LAKESOUL_COMMA: &str = "__L@KE$OUL_COMMA__";

pub static DATE32_FORMAT: &str = "%Y-%m-%d";
pub static FLINK_TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.9f";
2 changes: 1 addition & 1 deletion rust/lakesoul-io/src/datasource/file_format.rs
@@ -183,7 +183,7 @@ pub async fn flatten_file_scan_config(
let mut builder = SchemaBuilder::new();
// O(nm), n = number of fields, m = number of partition columns
for field in file_schema.fields() {
-if !partition_schema.field_with_name(field.name()).is_ok() {
+if partition_schema.field_with_name(field.name()).is_err() {
builder.push(field.clone());
}
}
4 changes: 2 additions & 2 deletions rust/lakesoul-io/src/datasource/listing.rs
@@ -74,12 +74,12 @@ impl LakeSoulListingTable {
let mut builder = SchemaBuilder::from(target_schema.fields());
// O(n^2), n is the number of fields in file_schema and config.partition_schema
for field in file_schema.fields() {
-if !target_schema.field_with_name(field.name()).is_ok() {
+if target_schema.field_with_name(field.name()).is_err() {
builder.try_merge(field)?;
}
}
for field in config.partition_schema().fields() {
-if !target_schema.field_with_name(field.name()).is_ok() {
+if target_schema.field_with_name(field.name()).is_err() {
builder.try_merge(field)?;
}
}
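Both is_err() rewrites in this file (and the one in file_format.rs above) express the same membership test: Schema::field_with_name returns a Result, so a field is missing exactly when the lookup errs, and is_err() reads more directly than !(...).is_ok(). A standalone sketch of the merge pattern, assuming only arrow-schema (merge_missing_fields is a hypothetical name; the real code uses try_merge, which can fail on conflicting types):

    use std::sync::Arc;
    use arrow_schema::{Schema, SchemaBuilder, SchemaRef};

    // Append fields from `extra` that `target` does not already define.
    fn merge_missing_fields(target: &Schema, extra: &Schema) -> SchemaRef {
        let mut builder = SchemaBuilder::from(target.fields());
        for field in extra.fields() {
            if target.field_with_name(field.name()).is_err() {
                builder.push(field.clone());
            }
        }
        Arc::new(builder.finish())
    }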
22 changes: 17 additions & 5 deletions rust/lakesoul-io/src/helpers.rs
@@ -6,6 +6,7 @@ use std::{collections::HashMap, sync::Arc};

use arrow::datatypes::UInt32Type;
use arrow_array::{RecordBatch, UInt32Array};
+use arrow_buffer::i256;
use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit};
use chrono::{DateTime, Duration};
use datafusion::{
@@ -31,9 +31,7 @@ use url::Url;

use crate::{
constant::{
-DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING,
-TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT,
-TIMESTAMP_SECOND_FORMAT,
+DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_COMMA, LAKESOUL_EMPTY_STRING, LAKESOUL_EQ, LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT
},
filter::parser::Parser,
lakesoul_io_config::LakeSoulIOConfig,
@@ -123,7 +122,7 @@ pub fn format_scalar_value(v: &ScalarValue) -> String {
if s.is_empty() {
LAKESOUL_EMPTY_STRING.to_string()
} else {
-s.clone()
+s.replace('=', LAKESOUL_EQ).replace(',', LAKESOUL_COMMA)
}
}
ScalarValue::TimestampSecond(Some(s), _) => {
@@ -166,6 +165,14 @@ pub fn format_scalar_value(v: &ScalarValue) -> String {
.format(TIMESTAMP_NANOSECOND_FORMAT)
)
}
+ScalarValue::Decimal128(Some(s), _, _) => format!("{}", s),
+ScalarValue::Decimal256(Some(s), _, _) => format!("{}", s),
+ScalarValue::Binary(e)
+| ScalarValue::FixedSizeBinary(_, e)
+| ScalarValue::LargeBinary(e) => match e {
+    Some(bytes) => hex::encode(bytes),
+    None => LAKESOUL_NULL_STRING.to_string(),
+}
other => other.to_string(),
}
}
@@ -180,7 +187,7 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue>
if val.eq(LAKESOUL_EMPTY_STRING) {
Ok(ScalarValue::Utf8(Some("".to_string())))
} else {
-Ok(ScalarValue::Utf8(Some(val.to_string())))
+Ok(ScalarValue::Utf8(Some(val.replace(LAKESOUL_EQ, "=").replace(LAKESOUL_COMMA, ","))))
}
}
DataType::Timestamp(unit, timezone) => match unit {
@@ -238,6 +245,11 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue>
Ok(ScalarValue::TimestampNanosecond(Some(nanosecs), timezone.clone()))
}
},
+DataType::Decimal128(p, s) => Ok(ScalarValue::Decimal128(Some(val.parse::<i128>().unwrap()), *p, *s)),
+DataType::Decimal256(p, s) => Ok(ScalarValue::Decimal256(Some(i256::from_string(val).unwrap()), *p, *s)),
+DataType::Binary => Ok(ScalarValue::Binary(Some(hex::decode(val).unwrap()))),
+DataType::FixedSizeBinary(size) => Ok(ScalarValue::FixedSizeBinary(*size, Some(hex::decode(val).unwrap()))),
+DataType::LargeBinary => Ok(ScalarValue::LargeBinary(Some(hex::decode(val).unwrap()))),
_ => ScalarValue::try_from_string(val.to_string(), data_type),
}
}
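The helpers.rs changes make partition values round-trip safely through their string encoding: literal '=' and ',' in a string value are escaped with the LAKESOUL_EQ and LAKESOUL_COMMA sentinels (presumably because partition descriptions are serialized as comma-separated key=value pairs), and decimal and binary values gain explicit formatting, with binaries traveling as hex strings via the newly added hex crate. A self-contained round-trip sketch under those assumptions (constants copied from constant.rs; the encode/decode names are hypothetical):

    const LAKESOUL_EQ: &str = "__L@KE$OUL_EQ__";
    const LAKESOUL_COMMA: &str = "__L@KE$OUL_COMMA__";

    // Escape the two separator characters; neither sentinel contains '=' or ',',
    // so the two replacements are independent and the mapping is reversible.
    fn encode_partition_string(s: &str) -> String {
        s.replace('=', LAKESOUL_EQ).replace(',', LAKESOUL_COMMA)
    }

    fn decode_partition_string(s: &str) -> String {
        s.replace(LAKESOUL_EQ, "=").replace(LAKESOUL_COMMA, ",")
    }

    fn main() {
        let value = "zone=A,B";
        let encoded = encode_partition_string(value);
        assert!(!encoded.contains('=') && !encoded.contains(','));
        assert_eq!(decode_partition_string(&encoded), value);

        // Binary partition values travel as hex strings (hex = "0.4").
        let bytes = vec![0x12u8, 0x43, 0x70, 0x97];
        assert_eq!(hex::decode(hex::encode(&bytes)).unwrap(), bytes);
    }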
20 changes: 13 additions & 7 deletions rust/lakesoul-io/src/transform.rs
@@ -19,7 +19,7 @@ use crate::constant::{
ARROW_CAST_OPTIONS, FLINK_TIMESTAMP_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING,
TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT,
};
-use crate::helpers::{date_str_to_epoch_days, timestamp_str_to_unix_time};
+use crate::helpers::{date_str_to_epoch_days, into_scalar_value, timestamp_str_to_unix_time};

/// adjust time zone to UTC
pub fn uniform_field(orig_field: &FieldRef) -> FieldRef {
@@ -315,12 +315,18 @@ pub fn make_default_array(datatype: &DataType, value: &String, num_rows: usize)
.map_err(|e| External(Box::new(e)))?;
num_rows
])),
-_ => {
-    println!(
-        "make_default_array() datatype not match, datatype={:?}, value={:?}",
-        datatype, value
-    );
-    new_null_array(datatype, num_rows)
+data_type => {
+    match into_scalar_value(value, data_type) {
+        Ok(scalar) => scalar.to_array_of_size(num_rows)?,
+        Err(_) => {
+            println!(
+                "make_default_array() datatype not match, datatype={:?}, value={:?}",
+                datatype, value
+            );
+            new_null_array(datatype, num_rows)
+        }
+    }
 }
})
}
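With this change the fallback arm of make_default_array first tries to parse the stored default through into_scalar_value and broadcast it to a constant column, so the newly supported decimal and binary partition types get real values instead of nulls; only an unparseable value falls back to the old log-and-null behavior. A trimmed standalone sketch of that shape (default_column is a hypothetical name, and ScalarValue::try_from_string stands in for the crate-private into_scalar_value):

    use arrow_array::{new_null_array, ArrayRef};
    use arrow_schema::DataType;
    use datafusion_common::ScalarValue;

    // Build a column of `num_rows` copies of `value`, or nulls if it cannot be parsed.
    fn default_column(datatype: &DataType, value: &str, num_rows: usize) -> ArrayRef {
        match ScalarValue::try_from_string(value.to_string(), datatype)
            .and_then(|scalar| scalar.to_array_of_size(num_rows))
        {
            Ok(array) => array,
            Err(_) => {
                // Same behavior as before the PR: log the mismatch and fill with nulls.
                println!("default value {:?} does not parse as {:?}", value, datatype);
                new_null_array(datatype, num_rows)
            }
        }
    }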