Skip to content

Commit

Permalink
[FLINK-37190] Make Kudu FlushMode configurable in Flink SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
ferenc-csaky committed Jan 21, 2025
1 parent f91b61a commit 7042dc3
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.apache.flink.connector.kudu.table.KuduCommonOptions.KUDU_MASTERS;
import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.IDENTIFIER;
import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_FLUSH_INTERVAL;
import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_FLUSH_MODE;
import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_HASH_COLS;
import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_HASH_PARTITION_NUMS;
import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_IGNORE_DUPLICATE;
Expand Down Expand Up @@ -81,6 +82,7 @@ public Set<ConfigOption<?>> optionalOptions() {
KUDU_MAX_BUFFER_SIZE,
KUDU_MAX_BUFFER_SIZE,
KUDU_OPERATION_TIMEOUT,
KUDU_FLUSH_MODE,
KUDU_FLUSH_INTERVAL,
KUDU_IGNORE_NOT_FOUND,
KUDU_IGNORE_DUPLICATE,
Expand All @@ -107,6 +109,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
final KuduWriterConfig.Builder configBuilder =
KuduWriterConfig.Builder.setMasters(config.get(KUDU_MASTERS))
.setOperationTimeout(config.get(KUDU_OPERATION_TIMEOUT).toMillis())
.setConsistency(config.get(KUDU_FLUSH_MODE))
.setFlushInterval((int) config.get(KUDU_FLUSH_INTERVAL).toMillis())
.setMaxBufferSize(config.get(KUDU_MAX_BUFFER_SIZE))
.setIgnoreNotFound(config.get(KUDU_IGNORE_NOT_FOUND))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import org.apache.kudu.client.SessionConfiguration;

import java.time.Duration;

/** Kudu table options. */
Expand Down Expand Up @@ -53,6 +55,12 @@ public class KuduDynamicTableOptions {
.defaultValue(1000)
.withDescription("kudu's max buffer size");

public static final ConfigOption<SessionConfiguration.FlushMode> KUDU_FLUSH_MODE =
ConfigOptions.key("kudu.flush-mode")
.enumType(SessionConfiguration.FlushMode.class)
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND)
.withDescription("kudu's data flush mode");

public static final ConfigOption<Duration> KUDU_FLUSH_INTERVAL =
ConfigOptions.key("kudu.flush-interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -207,11 +208,13 @@ public void testTableSink() {
properties.put("kudu.table", "TestTable12");
properties.put("kudu.ignore-not-found", "true");
properties.put("kudu.ignore-duplicate", "true");
properties.put("kudu.flush-mode", "auto_flush_sync");
properties.put("kudu.flush-interval", "10000");
properties.put("kudu.max-buffer-size", "10000");

KuduWriterConfig.Builder builder =
KuduWriterConfig.Builder.setMasters(kuduMasters)
.setConsistency(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
.setFlushInterval(10000)
.setMaxBufferSize(10000)
.setIgnoreDuplicate(true)
Expand Down

0 comments on commit 7042dc3

Please sign in to comment.