Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/async writer #23

Merged
merged 5 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.vinted.flink.bigquery.metric;

import java.util.concurrent.atomic.AtomicInteger;

public class AsyncBigQueryStreamMetrics {

private long batchCount = 0;
private double batchSizeInMb = 0.0;
private long splitBatchCount = 0;
private int timeoutCount = 0;
private final AtomicInteger inflightRequests = new AtomicInteger(0);

public void incSplitCount() {
splitBatchCount += 1;
}
public void updateSize(long sizeInBytes) {
batchSizeInMb = sizeInBytes / 1000000.0;
}

public long getBatchCount() {
return batchCount;
}

public void setBatchCount(long batchCount) {
this.batchCount = batchCount;
}

public double getBatchSizeInMb() {
return batchSizeInMb;
}

public long getSplitBatchCount() {
return splitBatchCount;
}

public int getTimeoutCount() {
return timeoutCount;
}

public void incrementTimeoutCount() {
this.timeoutCount++;
}

public int getInflightRequests() {
return inflightRequests.get();
}

public void incrementInflightRequests() {
this.inflightRequests.incrementAndGet();
}

public void decrementInflightRequests() {
this.inflightRequests.decrementAndGet();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package com.vinted.flink.bigquery.sink.async;

import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import com.vinted.flink.bigquery.sink.ExecutorProvider;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;

public class AsyncBigQuerySink<A> extends AsyncSinkBase<Rows<A>, StreamRequest> {
private final AsyncClientProvider provider;
private final RateLimitingStrategy strategy;

private final ExecutorProvider executorProvider;

public static <A> AsyncBigQuerySinkBuilder<A> builder() {
return new AsyncBigQuerySinkBuilder<>();
}

protected AsyncBigQuerySink(ExecutorProvider executorProvider, AsyncClientProvider provider, RateLimitingStrategy rateLimitingStrategy, ElementConverter<Rows<A>, StreamRequest> elementConverter, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) {
super(elementConverter, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes);
this.executorProvider = executorProvider;
this.provider = provider;
this.strategy = rateLimitingStrategy;
}

@Override
public StatefulSinkWriter<Rows<A>, BufferedRequestState<StreamRequest>> createWriter(InitContext initContext) throws IOException {
return new AsyncBigQuerySinkWriter<>(executorProvider, provider, this.getElementConverter(), initContext,
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(getMaxBatchSize())
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
.setMaxInFlightRequests(getMaxInFlightRequests())
.setMaxBufferedRequests(getMaxBufferedRequests())
.setMaxTimeInBufferMS(getMaxTimeInBufferMS())
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
.setRateLimitingStrategy(strategy)
.build(),
List.of()
);
}

@Override
public StatefulSinkWriter<Rows<A>, BufferedRequestState<StreamRequest>> restoreWriter(InitContext initContext, Collection<BufferedRequestState<StreamRequest>> collection) throws IOException {
return new AsyncBigQuerySinkWriter<>(executorProvider, provider, this.getElementConverter(), initContext,
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(getMaxBatchSize())
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
.setMaxInFlightRequests(getMaxInFlightRequests())
.setMaxBufferedRequests(getMaxBufferedRequests())
.setMaxTimeInBufferMS(getMaxTimeInBufferMS())
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
.setRateLimitingStrategy(strategy)
.build(),
collection
);
}

@Override
public SimpleVersionedSerializer<BufferedRequestState<StreamRequest>> getWriterStateSerializer() {
return new StreamRequestSerializer();
}

static public class AsyncBigQuerySinkBuilder<A> extends AsyncSinkBaseBuilder<Rows<A>, StreamRequest, AsyncBigQuerySinkBuilder<A>> {
private static final int DEFAULT_MAX_BATCH_SIZE = 1;
private static final int DEFAULT_IN_FLIGHT_REQUESTS = 4;
private static final int DEFAULT_MAX_BUFFERED_REQUESTS = DEFAULT_MAX_BATCH_SIZE + 1;
private static final int DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 500000000; //500MB

private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = Duration.ofSeconds(10).toMillis();

private static final long DEFAULT_MAX_RECORD_SIZE_IN_BYTES = 10000000;
private AsyncClientProvider provider;

private RowValueSerializer<A> serializer;

private RateLimitingStrategy strategy = null;

private ExecutorProvider executorProvider = () -> Executors.newFixedThreadPool(4);

public AsyncBigQuerySinkBuilder<A> setClientProvider(AsyncClientProvider provider) {
this.provider = provider;
return this;
}

public AsyncBigQuerySinkBuilder<A> setRowSerializer(RowValueSerializer<A> serializer) {
this.serializer = serializer;
return this;
}

public AsyncBigQuerySinkBuilder<A> setRateLimitStrategy(RateLimitingStrategy strategy) {
this.strategy = strategy;
return this;
}

public AsyncBigQuerySinkBuilder<A> setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
return this;
}

@Override
public AsyncSinkBase<Rows<A>, StreamRequest> build() {
if (getMaxBatchSize() == null) {
setMaxBatchSize(DEFAULT_MAX_BATCH_SIZE);
}

if (getMaxInFlightRequests() == null) {
setMaxInFlightRequests(DEFAULT_IN_FLIGHT_REQUESTS);
}

if (getMaxBufferedRequests() == null) {
setMaxBufferedRequests(DEFAULT_MAX_BUFFERED_REQUESTS);
}

if (getMaxBatchSizeInBytes() == null) {
setMaxBatchSizeInBytes(DEFAULT_MAX_BATCH_SIZE_IN_BYTES);
}

if (getMaxTimeInBufferMS() == null) {
setMaxTimeInBufferMS(DEFAULT_MAX_TIME_IN_BUFFER_MS);
}

if (getMaxRecordSizeInBytes() == null) {
setMaxRecordSizeInBytes(DEFAULT_MAX_RECORD_SIZE_IN_BYTES);
}

return new AsyncBigQuerySink<>(
this.executorProvider,
this.provider,
this.strategy,
new ProtoElementConverter<>(this.serializer, this.provider.writeSettings().getRetryCount()),
getMaxBatchSize(),
getMaxInFlightRequests(),
getMaxBufferedRequests(),
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes()
);
}
}
}
Loading
Loading