diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java index b4a827a..d4e2ec5 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryProtoClientProvider.java @@ -55,6 +55,7 @@ public BigQueryStreamWriter getWriter(String streamName, TableId table, RowVa .setExecutorProvider(executorProvider) .setLocation(table.getProject()) .setWriterSchema(protoSchema); + StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime()); return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build()); } catch (IOException | Descriptors.DescriptorValidationException e) { diff --git a/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java index 9485adb..0b934cb 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/client/BigQueryStreamWriter.java @@ -11,5 +11,7 @@ public interface BigQueryStreamWriter extends AutoCloseable { String getStreamName(); String getWriterId(); + + long getInflightWaitSeconds(); boolean isClosed(); } diff --git a/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java index c26983b..652cc4a 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/client/JsonStreamWriter.java @@ -42,6 +42,11 @@ public ApiFuture append(Rows data, long offset) { } } + @Override + public long getInflightWaitSeconds() { + return writer.getInflightWaitSeconds(); + } + @Override public String getStreamName() { return writer.getStreamName(); diff --git a/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java b/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java index f651476..978b492 100644 --- a/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/client/ProtoStreamWriter.java @@ -38,6 +38,11 @@ public ApiFuture append(Rows data, long offset) { return writer.append(prows, offset); } + @Override + public long getInflightWaitSeconds() { + return writer.getInflightWaitSeconds(); + } + @Override public String getStreamName() { return writer.getStreamName(); diff --git a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java index c8bf70e..b7702ab 100644 --- a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java +++ b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java @@ -69,12 +69,22 @@ public BigQuerySinkWriter( } + private void registerInflightMetric(String streamName, BigQueryStreamWriter writer) { + var group = metricGroup + .addGroup("stream", streamName) + .addGroup("writer_id", writer.getWriterId()); + + group.gauge("inflight_wait_seconds", writer::getInflightWaitSeconds); + } + protected final BigQueryStreamWriter streamWriter(String traceId, String streamName, TableId table) { var streamWithIndex = String.format("%s-%s",streamName, streamIndexIterator.next()); return streamMap.computeIfAbsent(streamWithIndex, name -> { logger.trace("Trace-id {} Stream not found {}. Creating new stream", traceId, streamWithIndex); // Stream name can't contain index - return this.clientProvider.getWriter(streamName, table, rowSerializer); + var writer = this.clientProvider.getWriter(streamName, table, rowSerializer); + registerInflightMetric(streamName, writer); + return writer; }); } @@ -91,6 +101,7 @@ protected final void recreateStreamWriter(String traceId, String streamName, Str logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName); } newWriter = this.clientProvider.getWriter(streamName, table, rowSerializer); + registerInflightMetric(streamName, writer); } return newWriter; });