From 7042dc3f4b199a6cac8fb222a7b88967cec38c52 Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Tue, 21 Jan 2025 12:32:24 +0100 Subject: [PATCH] [FLINK-37190] Make Kudu `FlushMode` configurable in Flink SQL --- .../connector/kudu/table/KuduDynamicTableFactory.java | 3 +++ .../connector/kudu/table/KuduDynamicTableOptions.java | 8 ++++++++ .../connector/kudu/table/KuduDynamicTableFactoryTest.java | 3 +++ 3 files changed, 14 insertions(+) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java index a03cc2c..efd238a 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java @@ -42,6 +42,7 @@ import static org.apache.flink.connector.kudu.table.KuduCommonOptions.KUDU_MASTERS; import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.IDENTIFIER; import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_FLUSH_INTERVAL; +import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_FLUSH_MODE; import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_HASH_COLS; import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_HASH_PARTITION_NUMS; import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_IGNORE_DUPLICATE; @@ -81,6 +82,7 @@ public Set<ConfigOption<?>> optionalOptions() { KUDU_MAX_BUFFER_SIZE, KUDU_MAX_BUFFER_SIZE, KUDU_OPERATION_TIMEOUT, + KUDU_FLUSH_MODE, KUDU_FLUSH_INTERVAL, KUDU_IGNORE_NOT_FOUND, KUDU_IGNORE_DUPLICATE, @@ -107,6 +109,7 @@ public DynamicTableSink createDynamicTableSink(Context context) { final KuduWriterConfig.Builder configBuilder = KuduWriterConfig.Builder.setMasters(config.get(KUDU_MASTERS)) 
.setOperationTimeout(config.get(KUDU_OPERATION_TIMEOUT).toMillis()) + .setConsistency(config.get(KUDU_FLUSH_MODE)) .setFlushInterval((int) config.get(KUDU_FLUSH_INTERVAL).toMillis()) .setMaxBufferSize(config.get(KUDU_MAX_BUFFER_SIZE)) .setIgnoreNotFound(config.get(KUDU_IGNORE_NOT_FOUND)) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableOptions.java b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableOptions.java index 9b4f11b..11de5de 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableOptions.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableOptions.java @@ -21,6 +21,8 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.kudu.client.SessionConfiguration; + import java.time.Duration; /** Kudu table options. */ @@ -53,6 +55,12 @@ public class KuduDynamicTableOptions { .defaultValue(1000) .withDescription("kudu's max buffer size"); + public static final ConfigOption<SessionConfiguration.FlushMode> KUDU_FLUSH_MODE = + ConfigOptions.key("kudu.flush-mode") + .enumType(SessionConfiguration.FlushMode.class) + .defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) + .withDescription("kudu's data flush mode"); + public static final ConfigOption<Duration> KUDU_FLUSH_INTERVAL = ConfigOptions.key("kudu.flush-interval") .durationType() diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactoryTest.java index 46abe1a..cadf2ca 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactoryTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactoryTest.java @@ -39,6 +39,7 @@ import 
org.apache.kudu.client.KuduScanner; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.SessionConfiguration; import org.apache.kudu.shaded.com.google.common.collect.Lists; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -207,11 +208,13 @@ public void testTableSink() { properties.put("kudu.table", "TestTable12"); properties.put("kudu.ignore-not-found", "true"); properties.put("kudu.ignore-duplicate", "true"); + properties.put("kudu.flush-mode", "auto_flush_sync"); properties.put("kudu.flush-interval", "10000"); properties.put("kudu.max-buffer-size", "10000"); KuduWriterConfig.Builder builder = KuduWriterConfig.Builder.setMasters(kuduMasters) + .setConsistency(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC) .setFlushInterval(10000) .setMaxBufferSize(10000) .setIgnoreDuplicate(true)