diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java index 0590b7a..ff4193a 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java @@ -3,10 +3,8 @@ import com.google.api.gax.core.FixedExecutorProvider; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; -import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1.*; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; -import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.protobuf.Descriptors; import com.vinted.flink.bigquery.model.config.Credentials; import com.vinted.flink.bigquery.model.config.WriterSettings; @@ -53,7 +51,7 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa .setEnableConnectionPool(this.writerSettings.getEnableConnectionPool()) .setExecutorProvider(executorProvider) .build(); - + JsonStreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime()); return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer); } catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) { throw new RuntimeException(e); diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java index 85b1759..b4a827a 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java @@ -51,10 +51,11 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, 
RowVa .setMaxInflightBytes(this.writerSettings.getMaxInflightBytes()) .setMaxRetryDuration(this.writerSettings.getMaxRetryDuration()) .setEnableConnectionPool(this.writerSettings.getEnableConnectionPool()) + .setChannelProvider(BigQueryWriteSettings.defaultTransportChannelProvider()) .setExecutorProvider(executorProvider) .setLocation(table.getProject()) .setWriterSchema(protoSchema); - + StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime()); return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build()); } catch (IOException | Descriptors.DescriptorValidationException e) { throw new RuntimeException(e); diff --git a/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java b/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java index 7a3223e..d49c87c 100644 --- a/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java +++ b/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java @@ -8,6 +8,8 @@ public class BigQueryStreamMetrics { private double batchSizeInMb = 0.0; private long splitBatchCount = 0; + private int timeoutCount = 0; + public BigQueryStreamMetrics(String streamName) { this.streamName = streamName; } @@ -42,4 +44,12 @@ public double getBatchSizeInMb() { public long getSplitBatchCount() { return splitBatchCount; } + + public int getTimeoutCount() { + return timeoutCount; + } + + public void incrementTimeoutCount() { + this.timeoutCount++; + } } diff --git a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java index e3d3ecb..72673cc 100644 --- a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java +++ b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java @@ -16,6 +16,8 @@ public class WriterSettings implements Serializable { private Long maxInflightRequests; private Long maxInflightBytes; private 
Duration maxRetryDuration; + + private Duration maxRequestWaitCallbackTime; private Boolean enableConnectionPool; public int getStreamsPerTable() { @@ -69,6 +71,14 @@ public static WriterSettingsBuilder newBuilder() { return new WriterSettingsBuilder(); } + public Duration getMaxRequestWaitCallbackTime() { + return maxRequestWaitCallbackTime; + } + + public void setMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) { + this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime; + } + public static final class WriterSettingsBuilder implements Serializable { private int streamsPerTable = 1; private int writerThreads = 1; @@ -78,6 +88,7 @@ public static final class WriterSettingsBuilder implements Serializable { private Long maxInflightRequests = 1000L; private Long maxInflightBytes = 100L * 1024L * 1024L; // 100Mb. private Duration maxRetryDuration = Duration.ofMinutes(5); + private Duration maxRequestWaitCallbackTime = Duration.ofMinutes(5); private Boolean enableConnectionPool = false; private WriterSettingsBuilder() { @@ -123,6 +134,11 @@ public WriterSettingsBuilder withMaxRetryDuration(Duration maxRetryDuration) { return this; } + public WriterSettingsBuilder withMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) { + this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime; + return this; + } + public WriterSettingsBuilder withEnableConnectionPool(Boolean enableConnectionPool) { this.enableConnectionPool = enableConnectionPool; return this; @@ -139,6 +155,7 @@ public WriterSettings build() { writerSettings.maxInflightRequests = this.maxInflightRequests; writerSettings.retryPause = this.retryPause; writerSettings.maxRetryDuration = this.maxRetryDuration; + writerSettings.maxRequestWaitCallbackTime = this.maxRequestWaitCallbackTime; return writerSettings; } } diff --git a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java index 35bdc17..c8bf70e 100644 --- 
a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java @@ -111,6 +111,7 @@ public void write(Rows rows, Context context) { group.gauge("batch_count", metric::getBatchCount); group.gauge("batch_size_mb", metric::getBatchSizeInMb); group.gauge("split_batch_count", metric::getSplitBatchCount); + group.gauge("callback_timeouts", metric::getTimeoutCount); return metric; }); diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java index a7de3a0..d35a744 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java @@ -191,6 +191,8 @@ public void onFailure(Throwable t) { case UNKNOWN: if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage()); + Optional.ofNullable(this.parent.metrics.get(rows.getStream())) + .ifPresent(BigQueryStreamMetrics::incrementTimeoutCount); this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable()); retryWrite(t, retryCount - 1); } else {