Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add callback timeout config #21

Merged
merged 2 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors;
import com.vinted.flink.bigquery.model.config.Credentials;
import com.vinted.flink.bigquery.model.config.WriterSettings;
Expand Down Expand Up @@ -53,7 +51,7 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
.setExecutorProvider(executorProvider)
.build();

JsonStreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
} catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
.setMaxInflightBytes(this.writerSettings.getMaxInflightBytes())
.setMaxRetryDuration(this.writerSettings.getMaxRetryDuration())
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
.setChannelProvider(BigQueryWriteSettings.defaultTransportChannelProvider())
.setExecutorProvider(executorProvider)
.setLocation(table.getProject())
.setWriterSchema(protoSchema);

StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
} catch (IOException | Descriptors.DescriptorValidationException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ public interface BigQueryStreamWriter<T> extends AutoCloseable {
String getStreamName();

String getWriterId();

/**
 * Reports this writer's in-flight wait time.
 * NOTE(review): the unit (seconds) is inferred from the method name only, and the exact
 * semantics come from the wrapped BigQuery client writer — confirm against the client library.
 *
 * @return the in-flight wait value for this writer
 */
long getInflightWaitSeconds();
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
}
}

/**
 * Forwards to the wrapped writer and returns its in-flight wait value unchanged.
 *
 * @return the value reported by {@code writer.getInflightWaitSeconds()}
 */
@Override
public long getInflightWaitSeconds() {
    final long inflightWaitSeconds = writer.getInflightWaitSeconds();
    return inflightWaitSeconds;
}

@Override
public String getStreamName() {
return writer.getStreamName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public ApiFuture<AppendRowsResponse> append(Rows<A> data, long offset) {
return writer.append(prows, offset);
}

/**
 * Delegates to the underlying writer; no adjustment is applied to the value.
 *
 * @return the result of {@code writer.getInflightWaitSeconds()}
 */
@Override
public long getInflightWaitSeconds() {
    final long waitSeconds = writer.getInflightWaitSeconds();
    return waitSeconds;
}

@Override
public String getStreamName() {
return writer.getStreamName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public class BigQueryStreamMetrics {
private double batchSizeInMb = 0.0;
private long splitBatchCount = 0;

private int timeoutCount = 0;

/**
 * Creates a metrics holder for a single stream.
 *
 * @param streamName name stored in this instance's {@code streamName} field
 */
public BigQueryStreamMetrics(String streamName) {
this.streamName = streamName;
}
Expand Down Expand Up @@ -42,4 +44,12 @@ public double getBatchSizeInMb() {
/**
 * @return the current value of the split-batch counter
 */
public long getSplitBatchCount() {
    return this.splitBatchCount;
}

/**
 * @return the current value of the timeout counter
 */
public int getTimeoutCount() {
    return this.timeoutCount;
}

/**
 * Adds one to the timeout counter.
 * NOTE(review): this is a plain, non-atomic increment of an {@code int} field — confirm
 * that callers never invoke it concurrently.
 */
public void incrementTimeoutCount() {
    this.timeoutCount += 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class WriterSettings implements Serializable {
private Long maxInflightRequests;
private Long maxInflightBytes;
private Duration maxRetryDuration;

private Duration maxRequestWaitCallbackTime;
private Boolean enableConnectionPool;

public int getStreamsPerTable() {
Expand Down Expand Up @@ -69,6 +71,14 @@ public static WriterSettingsBuilder newBuilder() {
return new WriterSettingsBuilder();
}

/**
 * @return the configured maximum request callback wait time; may be {@code null} if it was never set
 */
public Duration getMaxRequestWaitCallbackTime() {
    return this.maxRequestWaitCallbackTime;
}

/**
 * Replaces the configured maximum request callback wait time.
 *
 * @param maxRequestWaitCallbackTime new value; stored as-is without validation
 */
public void setMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
}

public static final class WriterSettingsBuilder implements Serializable {
private int streamsPerTable = 1;
private int writerThreads = 1;
Expand All @@ -78,6 +88,7 @@ public static final class WriterSettingsBuilder implements Serializable {
private Long maxInflightRequests = 1000L;
private Long maxInflightBytes = 100L * 1024L * 1024L; // 100Mb.
private Duration maxRetryDuration = Duration.ofMinutes(5);
private Duration maxRequestWaitCallbackTime = Duration.ofMinutes(5);
private Boolean enableConnectionPool = false;

private WriterSettingsBuilder() {
Expand Down Expand Up @@ -123,6 +134,11 @@ public WriterSettingsBuilder withMaxRetryDuration(Duration maxRetryDuration) {
return this;
}

/**
 * Sets the maximum time to wait for a request callback.
 *
 * @param maxRequestWaitCallbackTime value to store in the builder; copied into the
 *                                   resulting settings by {@code build()}
 * @return this builder, for chaining
 */
public WriterSettingsBuilder withMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
    // Store the argument itself: the previous code assigned the builder's maxRetryDuration
    // field here, so the caller-supplied value was silently ignored.
    this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
    return this;
}

public WriterSettingsBuilder withEnableConnectionPool(Boolean enableConnectionPool) {
this.enableConnectionPool = enableConnectionPool;
return this;
Expand All @@ -139,6 +155,7 @@ public WriterSettings build() {
writerSettings.maxInflightRequests = this.maxInflightRequests;
writerSettings.retryPause = this.retryPause;
writerSettings.maxRetryDuration = this.maxRetryDuration;
writerSettings.maxRequestWaitCallbackTime = this.maxRequestWaitCallbackTime;
return writerSettings;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,22 @@ public BigQuerySinkWriter(

}

/**
 * Registers an {@code inflight_wait_seconds} gauge for the given writer.
 * The gauge lives under the {@code stream} / {@code writer_id} metric sub-groups and
 * reads {@code writer.getInflightWaitSeconds()} each time it is sampled.
 *
 * @param streamName value used for the {@code stream} group
 * @param writer     writer whose id names the {@code writer_id} group and whose wait time is reported
 */
private void registerInflightMetric(String streamName, BigQueryStreamWriter<A> writer) {
    var streamGroup = metricGroup.addGroup("stream", streamName);
    var writerGroup = streamGroup.addGroup("writer_id", writer.getWriterId());
    writerGroup.gauge("inflight_wait_seconds", writer::getInflightWaitSeconds);
}

/**
 * Returns the writer cached for the next indexed slot of {@code streamName}, creating it on demand.
 * The cache key is {@code "<streamName>-<index>"}, with the index taken from
 * {@code streamIndexIterator}. A newly created writer gets its in-flight metric
 * registered before it is cached.
 *
 * @param traceId    identifier included in trace logging
 * @param streamName stream name without index; passed unchanged to the client provider
 * @param table      destination table for a newly created writer
 * @return the cached or newly created writer for the selected slot
 */
protected final BigQueryStreamWriter<A> streamWriter(String traceId, String streamName, TableId table) {
    var streamWithIndex = String.format("%s-%s",streamName, streamIndexIterator.next());
    return streamMap.computeIfAbsent(streamWithIndex, name -> {
        logger.trace("Trace-id {} Stream not found {}. Creating new stream", traceId, streamWithIndex);
        // Stream name can't contain index
        var writer = this.clientProvider.getWriter(streamName, table, rowSerializer);
        // Register the metric before returning; an earlier direct return of the new writer
        // made this registration unreachable.
        registerInflightMetric(streamName, writer);
        return writer;
    });
}

Expand All @@ -91,6 +101,7 @@ protected final void recreateStreamWriter(String traceId, String streamName, Str
logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName);
}
newWriter = this.clientProvider.getWriter(streamName, table, rowSerializer);
registerInflightMetric(streamName, writer);
}
return newWriter;
});
Expand All @@ -111,6 +122,7 @@ public void write(Rows<A> rows, Context context) {
group.gauge("batch_count", metric::getBatchCount);
group.gauge("batch_size_mb", metric::getBatchSizeInMb);
group.gauge("split_batch_count", metric::getSplitBatchCount);
group.gauge("callback_timeouts", metric::getTimeoutCount);

return metric;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ public void onFailure(Throwable t) {
case UNKNOWN:
if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) {
logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage());
Optional.ofNullable(this.parent.metrics.get(rows.getStream()))
.ifPresent(BigQueryStreamMetrics::incrementTimeoutCount);
this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable());
retryWrite(t, retryCount - 1);
} else {
Expand Down
Loading