Skip to content

Commit

Permalink
feat: add inlight time metric
Browse files Browse the repository at this point in the history
  • Loading branch information
gintarasm committed Mar 6, 2024
1 parent 672c917 commit 8d89378
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
.setExecutorProvider(executorProvider)
.setLocation(table.getProject())
.setWriterSchema(protoSchema);

StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
} catch (IOException | Descriptors.DescriptorValidationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ public interface BigQueryStreamWriter<T> extends AutoCloseable {
String getStreamName();

String getWriterId();

long getInflightWaitSeconds();
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
}
}

@Override
public long getInflightWaitSeconds() {
return writer.getInflightWaitSeconds();
}

@Override
public String getStreamName() {
return writer.getStreamName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
return writer.append(prows, offset);
}

@Override
public long getInflightWaitSeconds() {
return writer.getInflightWaitSeconds();
}

@Override
public String getStreamName() {
return writer.getStreamName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,22 @@ public BigQuerySinkWriter(

}

private void registerInflightMetric(String streamName, BigQueryStreamWriter<A> writer) {
var group = metricGroup
.addGroup("stream", streamName)
.addGroup("writer_id", writer.getWriterId());

group.gauge("inflight_wait_seconds", writer::getInflightWaitSeconds);
}

protected final BigQueryStreamWriter<A> streamWriter(String traceId, String streamName, TableId table) {
var streamWithIndex = String.format("%s-%s",streamName, streamIndexIterator.next());
return streamMap.computeIfAbsent(streamWithIndex, name -> {
logger.trace("Trace-id {} Stream not found {}. Creating new stream", traceId, streamWithIndex);
// Stream name can't contain index
return this.clientProvider.getWriter(streamName, table, rowSerializer);
var writer = this.clientProvider.getWriter(streamName, table, rowSerializer);
registerInflightMetric(streamName, writer);
return writer;
});
}

Expand All @@ -91,6 +101,7 @@ protected final void recreateStreamWriter(String traceId, String streamName, Str
logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName);
}
newWriter = this.clientProvider.getWriter(streamName, table, rowSerializer);
registerInflightMetric(streamName, writer);
}
return newWriter;
});
Expand Down

0 comments on commit 8d89378

Please sign in to comment.