-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #23 from vinted/feat/async-writer
Feat/async writer
- Loading branch information
Showing
12 changed files
with
1,005 additions
and
85 deletions.
There are no files selected for viewing
55 changes: 55 additions & 0 deletions
55
src/main/java/com/vinted/flink/bigquery/metric/AsyncBigQueryStreamMetrics.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package com.vinted.flink.bigquery.metric; | ||
|
||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
public class AsyncBigQueryStreamMetrics { | ||
|
||
private long batchCount = 0; | ||
private double batchSizeInMb = 0.0; | ||
private long splitBatchCount = 0; | ||
private int timeoutCount = 0; | ||
private final AtomicInteger inflightRequests = new AtomicInteger(0); | ||
|
||
public void incSplitCount() { | ||
splitBatchCount += 1; | ||
} | ||
public void updateSize(long sizeInBytes) { | ||
batchSizeInMb = sizeInBytes / 1000000.0; | ||
} | ||
|
||
public long getBatchCount() { | ||
return batchCount; | ||
} | ||
|
||
public void setBatchCount(long batchCount) { | ||
this.batchCount = batchCount; | ||
} | ||
|
||
public double getBatchSizeInMb() { | ||
return batchSizeInMb; | ||
} | ||
|
||
public long getSplitBatchCount() { | ||
return splitBatchCount; | ||
} | ||
|
||
public int getTimeoutCount() { | ||
return timeoutCount; | ||
} | ||
|
||
public void incrementTimeoutCount() { | ||
this.timeoutCount++; | ||
} | ||
|
||
public int getInflightRequests() { | ||
return inflightRequests.get(); | ||
} | ||
|
||
public void incrementInflightRequests() { | ||
this.inflightRequests.incrementAndGet(); | ||
} | ||
|
||
public void decrementInflightRequests() { | ||
this.inflightRequests.decrementAndGet(); | ||
} | ||
} |
151 changes: 151 additions & 0 deletions
151
src/main/java/com/vinted/flink/bigquery/sink/async/AsyncBigQuerySink.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package com.vinted.flink.bigquery.sink.async; | ||
|
||
import com.vinted.flink.bigquery.model.Rows; | ||
import com.vinted.flink.bigquery.serializer.RowValueSerializer; | ||
import com.vinted.flink.bigquery.sink.ExecutorProvider; | ||
import org.apache.flink.connector.base.sink.AsyncSinkBase; | ||
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; | ||
import org.apache.flink.connector.base.sink.writer.BufferedRequestState; | ||
import org.apache.flink.connector.base.sink.writer.ElementConverter; | ||
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; | ||
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy; | ||
import org.apache.flink.core.io.SimpleVersionedSerializer; | ||
|
||
import java.io.IOException; | ||
import java.time.Duration; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.concurrent.Executors; | ||
|
||
public class AsyncBigQuerySink<A> extends AsyncSinkBase<Rows<A>, StreamRequest> { | ||
private final AsyncClientProvider provider; | ||
private final RateLimitingStrategy strategy; | ||
|
||
private final ExecutorProvider executorProvider; | ||
|
||
public static <A> AsyncBigQuerySinkBuilder<A> builder() { | ||
return new AsyncBigQuerySinkBuilder<>(); | ||
} | ||
|
||
protected AsyncBigQuerySink(ExecutorProvider executorProvider, AsyncClientProvider provider, RateLimitingStrategy rateLimitingStrategy, ElementConverter<Rows<A>, StreamRequest> elementConverter, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) { | ||
super(elementConverter, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes); | ||
this.executorProvider = executorProvider; | ||
this.provider = provider; | ||
this.strategy = rateLimitingStrategy; | ||
} | ||
|
||
@Override | ||
public StatefulSinkWriter<Rows<A>, BufferedRequestState<StreamRequest>> createWriter(InitContext initContext) throws IOException { | ||
return new AsyncBigQuerySinkWriter<>(executorProvider, provider, this.getElementConverter(), initContext, | ||
AsyncSinkWriterConfiguration.builder() | ||
.setMaxBatchSize(getMaxBatchSize()) | ||
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes()) | ||
.setMaxInFlightRequests(getMaxInFlightRequests()) | ||
.setMaxBufferedRequests(getMaxBufferedRequests()) | ||
.setMaxTimeInBufferMS(getMaxTimeInBufferMS()) | ||
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes()) | ||
.setRateLimitingStrategy(strategy) | ||
.build(), | ||
List.of() | ||
); | ||
} | ||
|
||
@Override | ||
public StatefulSinkWriter<Rows<A>, BufferedRequestState<StreamRequest>> restoreWriter(InitContext initContext, Collection<BufferedRequestState<StreamRequest>> collection) throws IOException { | ||
return new AsyncBigQuerySinkWriter<>(executorProvider, provider, this.getElementConverter(), initContext, | ||
AsyncSinkWriterConfiguration.builder() | ||
.setMaxBatchSize(getMaxBatchSize()) | ||
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes()) | ||
.setMaxInFlightRequests(getMaxInFlightRequests()) | ||
.setMaxBufferedRequests(getMaxBufferedRequests()) | ||
.setMaxTimeInBufferMS(getMaxTimeInBufferMS()) | ||
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes()) | ||
.setRateLimitingStrategy(strategy) | ||
.build(), | ||
collection | ||
); | ||
} | ||
|
||
@Override | ||
public SimpleVersionedSerializer<BufferedRequestState<StreamRequest>> getWriterStateSerializer() { | ||
return new StreamRequestSerializer(); | ||
} | ||
|
||
static public class AsyncBigQuerySinkBuilder<A> extends AsyncSinkBaseBuilder<Rows<A>, StreamRequest, AsyncBigQuerySinkBuilder<A>> { | ||
private static final int DEFAULT_MAX_BATCH_SIZE = 1; | ||
private static final int DEFAULT_IN_FLIGHT_REQUESTS = 4; | ||
private static final int DEFAULT_MAX_BUFFERED_REQUESTS = DEFAULT_MAX_BATCH_SIZE + 1; | ||
private static final int DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 500000000; //500MB | ||
|
||
private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = Duration.ofSeconds(10).toMillis(); | ||
|
||
private static final long DEFAULT_MAX_RECORD_SIZE_IN_BYTES = 10000000; | ||
private AsyncClientProvider provider; | ||
|
||
private RowValueSerializer<A> serializer; | ||
|
||
private RateLimitingStrategy strategy = null; | ||
|
||
private ExecutorProvider executorProvider = () -> Executors.newFixedThreadPool(4); | ||
|
||
public AsyncBigQuerySinkBuilder<A> setClientProvider(AsyncClientProvider provider) { | ||
this.provider = provider; | ||
return this; | ||
} | ||
|
||
public AsyncBigQuerySinkBuilder<A> setRowSerializer(RowValueSerializer<A> serializer) { | ||
this.serializer = serializer; | ||
return this; | ||
} | ||
|
||
public AsyncBigQuerySinkBuilder<A> setRateLimitStrategy(RateLimitingStrategy strategy) { | ||
this.strategy = strategy; | ||
return this; | ||
} | ||
|
||
public AsyncBigQuerySinkBuilder<A> setExecutorProvider(ExecutorProvider executorProvider) { | ||
this.executorProvider = executorProvider; | ||
return this; | ||
} | ||
|
||
@Override | ||
public AsyncSinkBase<Rows<A>, StreamRequest> build() { | ||
if (getMaxBatchSize() == null) { | ||
setMaxBatchSize(DEFAULT_MAX_BATCH_SIZE); | ||
} | ||
|
||
if (getMaxInFlightRequests() == null) { | ||
setMaxInFlightRequests(DEFAULT_IN_FLIGHT_REQUESTS); | ||
} | ||
|
||
if (getMaxBufferedRequests() == null) { | ||
setMaxBufferedRequests(DEFAULT_MAX_BUFFERED_REQUESTS); | ||
} | ||
|
||
if (getMaxBatchSizeInBytes() == null) { | ||
setMaxBatchSizeInBytes(DEFAULT_MAX_BATCH_SIZE_IN_BYTES); | ||
} | ||
|
||
if (getMaxTimeInBufferMS() == null) { | ||
setMaxTimeInBufferMS(DEFAULT_MAX_TIME_IN_BUFFER_MS); | ||
} | ||
|
||
if (getMaxRecordSizeInBytes() == null) { | ||
setMaxRecordSizeInBytes(DEFAULT_MAX_RECORD_SIZE_IN_BYTES); | ||
} | ||
|
||
return new AsyncBigQuerySink<>( | ||
this.executorProvider, | ||
this.provider, | ||
this.strategy, | ||
new ProtoElementConverter<>(this.serializer, this.provider.writeSettings().getRetryCount()), | ||
getMaxBatchSize(), | ||
getMaxInFlightRequests(), | ||
getMaxBufferedRequests(), | ||
getMaxBatchSizeInBytes(), | ||
getMaxTimeInBufferMS(), | ||
getMaxRecordSizeInBytes() | ||
); | ||
} | ||
} | ||
} |
Oops, something went wrong.