diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java
index 0590b7a..ff4193a 100644
--- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java
+++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryJsonClientProvider.java
@@ -3,10 +3,8 @@
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
-import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
-import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors;
import com.vinted.flink.bigquery.model.config.Credentials;
import com.vinted.flink.bigquery.model.config.WriterSettings;
@@ -53,7 +51,7 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
.setExecutorProvider(executorProvider)
.build();
-
+ JsonStreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
} catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
throw new RuntimeException(e);
diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java
index 85b1759..d4e2ec5 100644
--- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java
+++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java
@@ -51,10 +51,12 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa
.setMaxInflightBytes(this.writerSettings.getMaxInflightBytes())
.setMaxRetryDuration(this.writerSettings.getMaxRetryDuration())
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
+ .setChannelProvider(BigQueryWriteSettings.defaultTransportChannelProvider())
.setExecutorProvider(executorProvider)
.setLocation(table.getProject())
.setWriterSchema(protoSchema);
+ StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
} catch (IOException | Descriptors.DescriptorValidationException e) {
throw new RuntimeException(e);
diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java
index 9485adb..0b934cb 100644
--- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java
@@ -11,5 +11,7 @@ public interface BigQueryStreamWriter extends AutoCloseable {
String getStreamName();
String getWriterId();
+
+ long getInflightWaitSeconds();
boolean isClosed();
}
diff --git a/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java
index c26983b..652cc4a 100644
--- a/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java
@@ -42,6 +42,11 @@ public ApiFuture append(Rows data, long offset) {
}
}
+ @Override
+ public long getInflightWaitSeconds() {
+ return writer.getInflightWaitSeconds(); // pure delegation to the wrapped writer; nothing is computed or cached here
+ }
+
@Override
public String getStreamName() {
return writer.getStreamName();
diff --git a/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java
index f651476..978b492 100644
--- a/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java
@@ -38,6 +38,11 @@ public ApiFuture append(Rows data, long offset) {
return writer.append(prows, offset);
}
+ @Override
+ public long getInflightWaitSeconds() {
+ return writer.getInflightWaitSeconds(); // forwards to the wrapped writer without adding any logic of its own
+ }
+
@Override
public String getStreamName() {
return writer.getStreamName();
diff --git a/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java b/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java
index 7a3223e..d49c87c 100644
--- a/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java
+++ b/src/main/java/com/vinted/flink/bigquery/metric/BigQueryStreamMetrics.java
@@ -8,6 +8,8 @@ public class BigQueryStreamMetrics {
private double batchSizeInMb = 0.0;
private long splitBatchCount = 0;
+ private int timeoutCount = 0;
+
public BigQueryStreamMetrics(String streamName) {
this.streamName = streamName;
}
@@ -42,4 +44,12 @@ public double getBatchSizeInMb() {
public long getSplitBatchCount() {
return splitBatchCount;
}
+
+ public int getTimeoutCount() { // current value of the timeout counter kept by this metrics object
+ return timeoutCount;
+ }
+
+ public void incrementTimeoutCount() { // adds one to the timeout counter
+ this.timeoutCount++; // NOTE(review): ++ on a plain int is not atomic — confirm this is never invoked from several threads at once
+ }
}
diff --git a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java
index e3d3ecb..72673cc 100644
--- a/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java
+++ b/src/main/java/com/vinted/flink/bigquery/model/config/WriterSettings.java
@@ -16,6 +16,8 @@ public class WriterSettings implements Serializable {
private Long maxInflightRequests;
private Long maxInflightBytes;
private Duration maxRetryDuration;
+
+ private Duration maxRequestWaitCallbackTime;
private Boolean enableConnectionPool;
public int getStreamsPerTable() {
@@ -69,6 +71,14 @@ public static WriterSettingsBuilder newBuilder() {
return new WriterSettingsBuilder();
}
+ public Duration getMaxRequestWaitCallbackTime() { // read by the client providers and handed to setMaxRequestCallbackWaitTime
+ return maxRequestWaitCallbackTime;
+ }
+
+ public void setMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) { // stores the value as given; no validation, null included
+ this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
+ }
+
public static final class WriterSettingsBuilder implements Serializable {
private int streamsPerTable = 1;
private int writerThreads = 1;
@@ -78,6 +88,7 @@ public static final class WriterSettingsBuilder implements Serializable {
private Long maxInflightRequests = 1000L;
private Long maxInflightBytes = 100L * 1024L * 1024L; // 100Mb.
private Duration maxRetryDuration = Duration.ofMinutes(5);
+ private Duration maxRequestWaitCallbackTime = Duration.ofMinutes(5);
private Boolean enableConnectionPool = false;
private WriterSettingsBuilder() {
@@ -123,6 +134,11 @@ public WriterSettingsBuilder withMaxRetryDuration(Duration maxRetryDuration) {
return this;
}
+ public WriterSettingsBuilder withMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) { // sets the callback wait limit; returns this builder for chaining
+ this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime; // keep the caller's value; it must not be taken from the maxRetryDuration field
+ return this;
+ }
+
public WriterSettingsBuilder withEnableConnectionPool(Boolean enableConnectionPool) {
this.enableConnectionPool = enableConnectionPool;
return this;
@@ -139,6 +155,7 @@ public WriterSettings build() {
writerSettings.maxInflightRequests = this.maxInflightRequests;
writerSettings.retryPause = this.retryPause;
writerSettings.maxRetryDuration = this.maxRetryDuration;
+ writerSettings.maxRequestWaitCallbackTime = this.maxRequestWaitCallbackTime;
return writerSettings;
}
}
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java
index 35bdc17..b7702ab 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java
@@ -69,12 +69,22 @@ public BigQuerySinkWriter(
}
+ private void registerInflightMetric(String streamName, BigQueryStreamWriter writer) {
+ // Nest the gauge under a "stream" group and, below it, a "writer_id" group.
+ var streamGroup = metricGroup.addGroup("stream", streamName);
+ var writerGroup = streamGroup.addGroup("writer_id", writer.getWriterId());
+ // The gauge value is supplied by the given writer's inflight-wait accessor.
+ writerGroup.gauge("inflight_wait_seconds", writer::getInflightWaitSeconds);
+ }
+
protected final BigQueryStreamWriter streamWriter(String traceId, String streamName, TableId table) {
var streamWithIndex = String.format("%s-%s",streamName, streamIndexIterator.next());
return streamMap.computeIfAbsent(streamWithIndex, name -> {
logger.trace("Trace-id {} Stream not found {}. Creating new stream", traceId, streamWithIndex);
// Stream name can't contain index
- return this.clientProvider.getWriter(streamName, table, rowSerializer);
+ var writer = this.clientProvider.getWriter(streamName, table, rowSerializer);
+ registerInflightMetric(streamName, writer); // runs only when a writer is created for this key, not when one is already mapped
+ return writer;
});
}
@@ -91,6 +101,7 @@ protected final void recreateStreamWriter(String traceId, String streamName, Str
logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName);
}
newWriter = this.clientProvider.getWriter(streamName, table, rowSerializer);
+ registerInflightMetric(streamName, newWriter); // the gauge must track the replacement writer, not the instance being replaced
}
return newWriter;
});
@@ -111,6 +122,7 @@ public void write(Rows rows, Context context) {
group.gauge("batch_count", metric::getBatchCount);
group.gauge("batch_size_mb", metric::getBatchSizeInMb);
group.gauge("split_batch_count", metric::getSplitBatchCount);
+ group.gauge("callback_timeouts", metric::getTimeoutCount);
return metric;
});
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
index a7de3a0..d35a744 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
@@ -191,6 +191,8 @@ public void onFailure(Throwable t) {
case UNKNOWN:
if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) {
logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage());
+ Optional.ofNullable(this.parent.metrics.get(rows.getStream()))
+ .ifPresent(BigQueryStreamMetrics::incrementTimeoutCount);
this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable());
retryWrite(t, retryCount - 1);
} else {