Skip to content

Commit b7850a5

Browse files
Tan-JiaLiangxiekunyuan
and
xiekunyuan
authored
[FLINK-35233] Fix lookup cache reuse RowData object problem (#47)
* fix: convertToReusedRow() is now returned by default, and the result returned is a reused object. If lookup.cache is enabled, the result encapsulated in the reused object will be cached externally, resulting in all cached values being the same object * [FLINK-35233] Fix lookup cache reuse RowData object problem --------- Co-authored-by: xiekunyuan <xiekunyuan@meizu.com>
1 parent 8ffab20 commit b7850a5

File tree

2 files changed

+33
-8
lines changed

2 files changed

+33
-8
lines changed

flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java

+32-7
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import org.apache.hadoop.hbase.TableNotFoundException;
4949
import org.apache.hadoop.hbase.util.Bytes;
5050
import org.junit.jupiter.api.Test;
51+
import org.junit.jupiter.params.ParameterizedTest;
52+
import org.junit.jupiter.params.provider.EnumSource;
5153

5254
import java.io.IOException;
5355
import java.util.ArrayList;
@@ -591,14 +593,16 @@ void testTableSourceSinkWithDDL() throws Exception {
591593
assertThat(result).isEqualTo(expected);
592594
}
593595

594-
@Test
595-
void testHBaseLookupTableSource() {
596-
verifyHBaseLookupJoin(false);
596+
@ParameterizedTest
597+
@EnumSource(Caching.class)
598+
void testHBaseLookupTableSource(Caching caching) {
599+
verifyHBaseLookupJoin(caching, false);
597600
}
598601

599-
@Test
600-
void testHBaseAsyncLookupTableSource() {
601-
verifyHBaseLookupJoin(true);
602+
@ParameterizedTest
603+
@EnumSource(Caching.class)
604+
void testHBaseAsyncLookupTableSource(Caching caching) {
605+
verifyHBaseLookupJoin(caching, true);
602606
}
603607

604608
@Test
@@ -661,10 +665,22 @@ void testHBaseSinkFunctionTableExistence() throws Exception {
661665
sinkFunction.close();
662666
}
663667

664-
private void verifyHBaseLookupJoin(boolean async) {
668+
private void verifyHBaseLookupJoin(Caching caching, boolean async) {
665669
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
666670
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);
667671

672+
String cacheOptions = "";
673+
if (caching == Caching.ENABLE_CACHE) {
674+
cacheOptions =
675+
","
676+
+ String.join(
677+
",",
678+
Arrays.asList(
679+
"'lookup.cache' = 'PARTIAL'",
680+
"'lookup.partial-cache.max-rows' = '1000'",
681+
"'lookup.partial-cache.expire-after-write' = '10min'"));
682+
}
683+
668684
tEnv.executeSql(
669685
"CREATE TABLE "
670686
+ TEST_TABLE_1
@@ -686,6 +702,7 @@ private void verifyHBaseLookupJoin(boolean async) {
686702
+ " 'zookeeper.quorum' = '"
687703
+ getZookeeperQuorum()
688704
+ "'"
705+
+ cacheOptions
689706
+ ")");
690707

691708
// prepare a source table
@@ -722,6 +739,8 @@ private void verifyHBaseLookupJoin(boolean async) {
722739
.collect(Collectors.toList());
723740

724741
List<String> expected = new ArrayList<>();
742+
expected.add(
743+
"+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]");
725744
expected.add(
726745
"+I[1, 1, 10, Hello-1, 100, 1.01, false, Welt-1, 2019-08-18T19:00, 2019-08-18, 19:00, 12345678.0001]");
727746
expected.add(
@@ -750,6 +769,12 @@ private void verifyHBaseLookupJoin(boolean async) {
750769
testData.add(Row.of(2, 2L, "Hello"));
751770
testData.add(Row.of(3, 2L, "Hello world"));
752771
testData.add(Row.of(3, 3L, "Hello world!"));
772+
testData.add(Row.of(1, 1L, "Hi")); // lookup one more time
773+
}
774+
775+
private enum Caching {
776+
ENABLE_CACHE,
777+
DISABLE_CACHE
753778
}
754779

755780
// ------------------------------- Utilities -------------------------------------------------

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public Collection<RowData> lookup(RowData keyRow) throws IOException {
9797
if (get != null) {
9898
Result result = table.get(get);
9999
if (!result.isEmpty()) {
100-
return Collections.singletonList(serde.convertToReusedRow(result));
100+
return Collections.singletonList(serde.convertToNewRow(result));
101101
}
102102
}
103103
break;

0 commit comments

Comments
 (0)