diff --git a/src/main/java/com/vinted/flink/bigquery/metric/AsyncBigQueryStreamMetrics.java b/src/main/java/com/vinted/flink/bigquery/metric/AsyncBigQueryStreamMetrics.java
new file mode 100644
index 0000000..f77a856
--- /dev/null
+++ b/src/main/java/com/vinted/flink/bigquery/metric/AsyncBigQueryStreamMetrics.java
@@ -0,0 +1,55 @@
+package com.vinted.flink.bigquery.metric;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AsyncBigQueryStreamMetrics {
+
+    private long batchCount = 0;
+    private double batchSizeInMb = 0.0;
+    private long splitBatchCount = 0;
+    private int timeoutCount = 0;
+    private final AtomicInteger inflightRequests = new AtomicInteger(0);
+
+    public void incSplitCount() {
+        splitBatchCount += 1;
+    }
+    public void updateSize(long sizeInBytes) {
+        batchSizeInMb = sizeInBytes / 1000000.0;
+    }
+
+    public long getBatchCount() {
+        return batchCount;
+    }
+
+    public void setBatchCount(long batchCount) {
+        this.batchCount = batchCount;
+    }
+
+    public double getBatchSizeInMb() {
+        return batchSizeInMb;
+    }
+
+    public long getSplitBatchCount() {
+        return splitBatchCount;
+    }
+
+    public int getTimeoutCount() {
+        return timeoutCount;
+    }
+
+    public void incrementTimeoutCount() {
+        this.timeoutCount++;
+    }
+
+    public int getInflightRequests() {
+        return inflightRequests.get();
+    }
+
+    public void incrementInflightRequests() {
+        this.inflightRequests.incrementAndGet();
+    }
+
+    public void decrementInflightRequests() {
+        this.inflightRequests.decrementAndGet();
+    }
+}
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java
index 1506db2..2d1413b 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java
@@ -1,7 +1,10 @@
 package com.vinted.flink.bigquery.sink.async;
 
 import com.vinted.flink.bigquery.model.Rows;
+import com.vinted.flink.bigquery.serializer.RowValueSerializer;
+import com.vinted.flink.bigquery.sink.ExecutorProvider;
 import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
@@ -9,26 +12,31 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Executors;
 
 public class AsyncBigQuerySink<A> extends AsyncSinkBase<Rows<A>, StreamRequest> {
     private final AsyncClientProvider provider;
     private final RateLimitingStrategy strategy;
+    private final ExecutorProvider executorProvider;
 
     public static <A> AsyncBigQuerySinkBuilder<A> builder() {
         return new AsyncBigQuerySinkBuilder<>();
     }
 
-    protected AsyncBigQuerySink(AsyncClientProvider provider, RateLimitingStrategy rateLimitingStrategy, ElementConverter<Rows<A>, StreamRequest> elementConverter, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) {
+    protected AsyncBigQuerySink(ExecutorProvider executorProvider, AsyncClientProvider provider, RateLimitingStrategy rateLimitingStrategy, ElementConverter<Rows<A>, StreamRequest> elementConverter, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) {
         super(elementConverter, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes);
+        this.executorProvider = executorProvider;
         this.provider = provider;
         this.strategy = rateLimitingStrategy;
     }
 
     @Override
     public StatefulSinkWriter<Rows<A>, BufferedRequestState<StreamRequest>> createWriter(InitContext initContext) throws IOException {
-        return new AsyncBigQuerySinkWriter<>(provider, this.getElementConverter(), initContext,
+        return new AsyncBigQuerySinkWriter<>(executorProvider, provider, this.getElementConverter(), initContext,
                 AsyncSinkWriterConfiguration.builder()
                         .setMaxBatchSize(getMaxBatchSize())
                         .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
@@ -44,7 +52,7 @@
 
     @Override
     public StatefulSinkWriter<Rows<A>, BufferedRequestState<StreamRequest>> restoreWriter(InitContext initContext, Collection<BufferedRequestState<StreamRequest>> collection) throws IOException {
-        return new AsyncBigQuerySinkWriter<>(provider, this.getElementConverter(), initContext,
+        return new AsyncBigQuerySinkWriter<>(executorProvider, provider, this.getElementConverter(), initContext,
                 AsyncSinkWriterConfiguration.builder()
                         .setMaxBatchSize(getMaxBatchSize())
                         .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
@@ -62,4 +70,82 @@ public StatefulSinkWriter<Rows<A>, BufferedRequestState<StreamRequest>> restoreW
     public SimpleVersionedSerializer<BufferedRequestState<StreamRequest>> getWriterStateSerializer() {
         return new StreamRequestSerializer();
     }
+
+    public static class AsyncBigQuerySinkBuilder<A> extends AsyncSinkBaseBuilder<Rows<A>, StreamRequest, AsyncBigQuerySinkBuilder<A>> {
+        private static final int DEFAULT_MAX_BATCH_SIZE = 1;
+        private static final int DEFAULT_IN_FLIGHT_REQUESTS = 4;
+        private static final int DEFAULT_MAX_BUFFERED_REQUESTS = DEFAULT_MAX_BATCH_SIZE + 1;
+        private static final int DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 500000000; //500MB
+
+        private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = Duration.ofSeconds(10).toMillis();
+
+        private static final long DEFAULT_MAX_RECORD_SIZE_IN_BYTES = 10000000;
+        private AsyncClientProvider provider;
+
+        private RowValueSerializer<A> serializer;
+
+        private RateLimitingStrategy strategy = null;
+
+        private ExecutorProvider executorProvider = () -> Executors.newFixedThreadPool(4);
+
+        public AsyncBigQuerySinkBuilder<A> setClientProvider(AsyncClientProvider provider) {
+            this.provider = provider;
+            return this;
+        }
+
+        public AsyncBigQuerySinkBuilder<A> setRowSerializer(RowValueSerializer<A> serializer) {
+            this.serializer = serializer;
+            return this;
+        }
+
+        public AsyncBigQuerySinkBuilder<A> setRateLimitStrategy(RateLimitingStrategy strategy) {
+            this.strategy = strategy;
+            return this;
+        }
+
+        public AsyncBigQuerySinkBuilder<A> setExecutorProvider(ExecutorProvider executorProvider) {
+            this.executorProvider = executorProvider;
+            return this;
+        }
+
+        @Override
+        public AsyncSinkBase<Rows<A>, StreamRequest> build() {
+            if (getMaxBatchSize() == null) {
+                setMaxBatchSize(DEFAULT_MAX_BATCH_SIZE);
+            }
+
+            if (getMaxInFlightRequests() == null) {
+                setMaxInFlightRequests(DEFAULT_IN_FLIGHT_REQUESTS);
+            }
+
+            if (getMaxBufferedRequests() == null) {
+                setMaxBufferedRequests(DEFAULT_MAX_BUFFERED_REQUESTS);
+            }
+
+            if (getMaxBatchSizeInBytes() == null) {
+                setMaxBatchSizeInBytes(DEFAULT_MAX_BATCH_SIZE_IN_BYTES);
+            }
+
+            if (getMaxTimeInBufferMS() == null) {
+                setMaxTimeInBufferMS(DEFAULT_MAX_TIME_IN_BUFFER_MS);
+            }
+
+            if (getMaxRecordSizeInBytes() == null) {
+                setMaxRecordSizeInBytes(DEFAULT_MAX_RECORD_SIZE_IN_BYTES);
+            }
+
+            return new AsyncBigQuerySink<>(
+                    this.executorProvider,
+                    this.provider,
+                    this.strategy,
+                    new ProtoElementConverter<>(this.serializer, this.provider.writeSettings().getRetryCount()),
+                    getMaxBatchSize(),
+                    getMaxInFlightRequests(),
+                    getMaxBufferedRequests(),
+                    getMaxBatchSizeInBytes(),
+                    getMaxTimeInBufferMS(),
+                    getMaxRecordSizeInBytes()
+            );
+        }
+    }
 }
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java
deleted file mode 100644
index 4346582..0000000
--- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkBuilder.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.vinted.flink.bigquery.sink.async;
-
-import com.vinted.flink.bigquery.model.Rows;
-import com.vinted.flink.bigquery.serializer.RowValueSerializer;
-import org.apache.flink.connector.base.sink.AsyncSinkBase;
-import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
-import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
-
-import java.time.Duration;
-
-public class AsyncBigQuerySinkBuilder<A> extends AsyncSinkBaseBuilder<Rows<A>, StreamRequest, AsyncBigQuerySinkBuilder<A>> {
-    private static final int DEFAULT_MAX_BATCH_SIZE = 1;
-    private static final int DEFAULT_IN_FLIGHT_REQUESTS = 4;
-    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = DEFAULT_MAX_BATCH_SIZE + 1;
-    private static final int DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 500000000; //500MB
-
-    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = Duration.ofSeconds(10).toMillis();
-
-    private static final long DEFAULT_MAX_RECORD_SIZE_IN_BYTES = 10000000;
-    private AsyncClientProvider provider;
-
-    private RowValueSerializer<A> serializer;
-
-    private RateLimitingStrategy strategy = null;
-
-    public AsyncBigQuerySinkBuilder<A> setClientProvider(AsyncClientProvider provider) {
-        this.provider = provider;
-        return this;
-    }
-
-    public AsyncBigQuerySinkBuilder<A> setRowSerializer(RowValueSerializer<A> serializer) {
-        this.serializer = serializer;
-        return this;
-    }
-
-    public AsyncBigQuerySinkBuilder<A> setRateLimitStrategy(RateLimitingStrategy strategy) {
-        this.strategy = strategy;
-        return this;
-    }
-
-    @Override
-    public AsyncSinkBase<Rows<A>, StreamRequest> build() {
-        if (getMaxBatchSize() == null) {
-            setMaxBatchSize(DEFAULT_MAX_BATCH_SIZE);
-        }
-
-        if (getMaxInFlightRequests() == null) {
-            setMaxInFlightRequests(DEFAULT_IN_FLIGHT_REQUESTS);
-        }
-
-        if (getMaxBufferedRequests() == null) {
-            setMaxBufferedRequests(DEFAULT_MAX_BUFFERED_REQUESTS);
-        }
-
-        if (getMaxBatchSizeInBytes() == null) {
-            setMaxBatchSizeInBytes(DEFAULT_MAX_BATCH_SIZE_IN_BYTES);
-        }
-
-        if (getMaxTimeInBufferMS() == null) {
-            setMaxTimeInBufferMS(DEFAULT_MAX_TIME_IN_BUFFER_MS);
-        }
-
-        if (getMaxRecordSizeInBytes() == null) {
-            setMaxRecordSizeInBytes(DEFAULT_MAX_RECORD_SIZE_IN_BYTES);
-        }
-
-        return new AsyncBigQuerySink<>(
-                this.provider,
-                this.strategy,
-                new ProtoElementConverter<>(this.serializer, this.provider.writeSettings().getRetryCount()),
-                getMaxBatchSize(),
-                getMaxInFlightRequests(),
-                getMaxBufferedRequests(),
-                getMaxBatchSizeInBytes(),
-                getMaxTimeInBufferMS(),
-                getMaxRecordSizeInBytes()
-        );
-    }
-}
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java
index b6649a5..e5809cc 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySinkWriter.java
@@ -1,30 +1,27 @@
 package com.vinted.flink.bigquery.sink.async;
 
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutureCallback;
-import com.google.api.core.ApiFutures;
 import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.storage.v1.*;
-import com.vinted.flink.bigquery.client.BigQueryStreamWriter;
+import com.google.cloud.bigquery.storage.v1.Exceptions;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.StreamWriter;
+import com.vinted.flink.bigquery.metric.AsyncBigQueryStreamMetrics;
 import com.vinted.flink.bigquery.metric.BigQueryStreamMetrics;
 import com.vinted.flink.bigquery.model.Rows;
-import com.vinted.flink.bigquery.sink.AppendException;
-import com.vinted.flink.bigquery.sink.defaultStream.BigQueryDefaultSinkWriter;
+import com.vinted.flink.bigquery.sink.ExecutorProvider;
 import io.grpc.Status;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -34,19 +31,15 @@ public class AsyncBigQuerySinkWriter<A> extends AsyncSinkWriter<Rows<A>, StreamR
 
     private final SinkWriterMetricGroup metricGroup;
 
-    private final transient Map<String, BigQueryStreamMetrics> metrics = new HashMap<>();
+    private final transient Map<String, AsyncBigQueryStreamMetrics> metrics = new HashMap<>();
 
     private final Executor appendExecutor;
-    private final Executor waitExecutor = Executors.newSingleThreadExecutor();
-
     protected transient Map<String, StreamWriter> streamMap = new ConcurrentHashMap<>();
-    protected BigQueryWriteClient client;
-
-    public AsyncBigQuerySinkWriter(AsyncClientProvider clientProvider, ElementConverter<Rows<A>, StreamRequest> elementConverter, Sink.InitContext context, AsyncSinkWriterConfiguration configuration, Collection<BufferedRequestState<StreamRequest>> bufferedRequestStates) {
+    public AsyncBigQuerySinkWriter(ExecutorProvider executorProvider, AsyncClientProvider clientProvider, ElementConverter<Rows<A>, StreamRequest> elementConverter, Sink.InitContext context, AsyncSinkWriterConfiguration configuration, Collection<BufferedRequestState<StreamRequest>> bufferedRequestStates) {
         super(elementConverter, context, configuration, bufferedRequestStates);
-        appendExecutor = Executors.newFixedThreadPool(4);
+        appendExecutor = executorProvider.create();
         this.metricGroup = context.metricGroup();
         this.clientProvider = clientProvider;
     }
@@ -61,15 +54,15 @@ private void registerInflightMetric(StreamWriter writer) {
 
     private void registerAppendMetrics(StreamRequest request) {
         metrics.computeIfAbsent(request.getStream(), s -> {
-            var metric = new BigQueryStreamMetrics(request.getStream());
+            var metric = new AsyncBigQueryStreamMetrics();
             var group = metricGroup
                     .addGroup("table", request.getTable().getTable())
                     .addGroup("stream", request.getStream());
-            group.gauge("stream_offset", (Gauge<Long>) metric::getOffset);
group.gauge("batch_count", metric::getBatchCount); group.gauge("batch_size_mb", metric::getBatchSizeInMb); group.gauge("split_batch_count", metric::getSplitBatchCount); group.gauge("callback_timeouts", metric::getTimeoutCount); + group.gauge("inflight_requests", metric::getInflightRequests); return metric; }); @@ -122,10 +115,16 @@ protected void submitRequestEntries(List list, Consumer { s.updateSize(request.getData().getSerializedSize()); s.setBatchCount(request.getData().getSerializedRowsCount()); + s.incrementInflightRequests(); }); writer.append(request.getData()).get(); return List.of(); } catch (Throwable t) { + Optional.ofNullable(metrics.get(request.getStream())).ifPresent(s -> { + s.updateSize(request.getData().getSerializedSize()); + s.setBatchCount(request.getData().getSerializedRowsCount()); + s.decrementInflightRequests(); + }); logger.error("Trace-id {}, StreamWriter failed to append {}", traceId, t.getMessage()); var status = Status.fromThrowable(t); switch (status.getCode()) { @@ -135,7 +134,7 @@ protected void submitRequestEntries(List list, Consumer list, Consumer { + Optional.ofNullable(this.metrics.get(request.getStream())) + .ifPresent(AsyncBigQueryStreamMetrics::decrementInflightRequests); + this.getFatalExceptionCons().accept(new AsyncWriterException(traceId, Status.fromThrowable(e).getCode(), e)); + return List.of(); + }).whenCompleteAsync((v, e) -> { + Optional.ofNullable(this.metrics.get(request.getStream())) + .ifPresent(AsyncBigQueryStreamMetrics::decrementInflightRequests); }, appendExecutor); }).collect(Collectors.toList()); @@ -201,5 +205,23 @@ protected long getSizeInBytes(StreamRequest StreamRequest) { return StreamRequest.getData().getSerializedSize(); } + @Override + public void close() { + logger.info("Closing BigQuery write stream"); + try { + flush(true); + streamMap.values().forEach(stream -> { + try { + stream.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + } + } diff --git a/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java b/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java index e6a694d..ffc84de 100644 --- a/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java +++ b/src/test/java/com/vinted/flink/bigquery/AsyncBigQuerySinkTest.java @@ -76,7 +76,7 @@ public void shouldFailAndNotRetryWhenUnknownErrorReceived(@FlinkTest.FlinkParam }).isInstanceOf(JobExecutionException.class); - verify(mockClientProvider.getMockProtoWriter(), times(1)).append(any()); + verify(mockClientProvider.getMockProtoWriter(), times(2)).append(any()); } @Test