Merge pull request #16 from vinted/fix/handle-idle-streams
fix: handle idle streams
gintarasm authored Feb 7, 2024
2 parents 1eb5b22 + 4552d3b commit 32df104
Showing 7 changed files with 91 additions and 100 deletions.
@@ -18,6 +18,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -75,6 +76,27 @@ protected final StreamT streamWriter(String traceId, String streamName, TableId
});
}

protected final void recreateAllStreamWriters(String traceId, String streamName, TableId table) {
logger.info("Trace-id {} Closing all writers for {}", traceId, streamName);
try {
flush(true);
streamMap.replaceAll((key, writer) -> {
var newWriter = writer;
if (key.startsWith(streamName)) {
try {
writer.close();
} catch (Exception e) {
logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName);
}
newWriter = this.clientProvider.getWriter(streamName, table);
}
return newWriter;
});
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public void write(Rows<A> rows, Context context) {
numRecordsOutCounter.inc(rows.getData().size());
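A minimal sketch of the recovery pattern the new recreateAllStreamWriters method applies: flush pending data, then swap out every writer whose key belongs to the affected stream while leaving other streams untouched. The WriterPool class, writer factory, and key scheme below are illustrative stand-ins, not types from this repository:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class WriterPool {
    // Writers keyed by a string starting with the stream name, mirroring the prefix match above.
    private final Map<String, AutoCloseable> writers = new ConcurrentHashMap<>();
    private final Supplier<AutoCloseable> writerFactory;

    WriterPool(Supplier<AutoCloseable> writerFactory) {
        this.writerFactory = writerFactory;
    }

    void recreateAll(String streamName) {
        writers.replaceAll((key, writer) -> {
            if (!key.startsWith(streamName)) {
                return writer; // writers for other streams stay as they are
            }
            try {
                writer.close(); // best effort: an idle connection may already be gone server-side
            } catch (Exception ignored) {
                // mirrors the trace-level log in the real method
            }
            return writerFactory.get(); // open a fresh writer for this key
        });
    }
}

In the sink itself the same swap runs only after flush(true), so in-flight appends settle before their writers are closed.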
@@ -34,10 +34,6 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size));
var writer = streamWriter(traceId, rows.getStream(), rows.getTable());

if (writer.isClosed() || writer.isUserClosed()) {
logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
}

logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());

try {
@@ -28,11 +28,6 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
var rowArray = new JSONArray();
rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row)))));
var writer = streamWriter(traceId, rows.getStream(), rows.getTable());

if (writer.isClosed() || writer.isUserClosed()) {
logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
}

logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());

try {
@@ -38,11 +38,9 @@ protected ApiFuture<AppendRowsResponse> append(String traceId, Rows<A> rows) {
numBytesOutCounter.inc(size);
numRecordsOutCounter.inc(rows.getData().size());
Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size));

var writer = streamWriter(traceId, rows.getStream(), rows.getTable());

if (writer.isClosed() || writer.isUserClosed()) {
logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
}
logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
try {
return writer.append(prows);
@@ -17,31 +17,40 @@

import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.function.Function;

public abstract class BigQueryDefaultSinkWriter<A, StreamT extends AutoCloseable>
extends BigQuerySinkWriter<A, StreamT> {
private static final Logger logger = LoggerFactory.getLogger(BigQueryDefaultSinkWriter.class);

private final Phaser inflightRequestCount = new Phaser(1);

private volatile AppendException appendAsyncException = null;

public BigQueryDefaultSinkWriter(
Sink.InitContext sinkInitContext,
RowValueSerializer<A> rowSerializer,
ClientProvider<StreamT> clientProvider,
ExecutorProvider executorProvider) {

super(sinkInitContext, rowSerializer, clientProvider, executorProvider);

var metricGroup = this.sinkInitContext.metricGroup();
var group = metricGroup
.addGroup("BigQuery")
.addGroup("DefaultSinkWriter")
.addGroup("inflight_requests");

group.gauge("count", this.inflightRequestCount::getRegisteredParties);
}

private void checkAsyncException() {
// reset this exception since we could close the writer later on
AppendException e = appendAsyncException;
if (e != null) {
appendAsyncException = null;
logger.error("Throwing non recoverable exception", e);
var error = e.getError();
var errorRows = e.<A>getRows();
var errorTraceId = e.getTraceId();
var status = Status.fromThrowable(error);
logger.error(this.createLogMessage("Non recoverable BigQuery stream AppendException for:", errorTraceId, status, error, errorRows, 0), error);
throw e;
}
}
@@ -63,9 +72,7 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
var errorRows = exception.<A>getRows();
var errorTraceId = exception.getTraceId();
var status = Status.fromThrowable(error);
Function<String, String> createLogMessage = (title) ->
this.createLogMessage(title, errorTraceId, status, error, errorRows, retryCount);
logger.error(createLogMessage.apply("Non recoverable BigQuery stream AppendException for:"), error);
logger.error(this.createLogMessage("Non recoverable BigQuery stream AppendException for:", errorTraceId, status, error, errorRows, retryCount), error);
throw error;
} catch (Throwable t) {
logger.error("Trace-id: {} Non recoverable BigQuery stream error for: {}. Retry count: {}", traceId, t.getMessage(), retryCount);
@@ -89,8 +96,10 @@ public void close() {

@Override
public void flush(boolean endOfInput) {
inflightRequestCount.arriveAndAwaitAdvance();
logger.info("Flushing BigQuery writer {} data. Inflight request count {}", this.sinkInitContext.getSubtaskId(), inflightRequestCount.getRegisteredParties());
checkAsyncException();
inflightRequestCount.arriveAndAwaitAdvance();
logger.info("BigQuery writer {} data flushed. Inflight request count {}", this.sinkInitContext.getSubtaskId(), inflightRequestCount.getRegisteredParties());
}
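The reordered flush leans on the Phaser that tracks in-flight requests: surface any asynchronous append failure first, then block until every outstanding callback has arrived, and only then log that the data was flushed. A hedged sketch of that tracking pattern follows; the register/arriveAndDeregister call sites are assumed to wrap each asynchronous append, since they sit outside this diff:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;

class InflightTracker {
    // One pre-registered party represents the flushing thread, matching `new Phaser(1)` above.
    private final Phaser inflight = new Phaser(1);

    <T> CompletableFuture<T> track(CompletableFuture<T> append) {
        inflight.register(); // one extra party per in-flight append
        return append.whenComplete((result, error) -> inflight.arriveAndDeregister());
    }

    void awaitAll() {
        inflight.arriveAndAwaitAdvance(); // returns once every tracked append has completed
    }
}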

static class AppendCallBack<A> implements ApiFutureCallback<AppendRowsResponse> {
@@ -125,10 +134,14 @@ public void onFailure(Throwable t) {
case CANCELLED:
case FAILED_PRECONDITION:
case DEADLINE_EXCEEDED:
case UNAVAILABLE:
doPauseBeforeRetry();
retryWrite(t, retryCount - 1);
break;
case UNAVAILABLE: {
this.parent.recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
retryWrite(t, retryCount - 1);
break;
}
case INVALID_ARGUMENT:
if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
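This new UNAVAILABLE branch is the core of the idle-stream fix: a connection that sat idle and was dropped server-side surfaces as Status.UNAVAILABLE on the next append, so the callback now rebuilds the stream writers before retrying instead of only pausing. A small sketch of that classification, using just the gRPC calls already visible in this hunk (the helper class and method names are ours):

import io.grpc.Status;

final class AppendFailureClassifier {
    // True when the failed append should trigger recreateAllStreamWriters before the retry;
    // the other retryable statuses above keep the plain pause-and-retry path.
    static boolean requiresWriterRecreation(Throwable error) {
        return Status.fromThrowable(error).getCode() == Status.Code.UNAVAILABLE;
    }
}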
108 changes: 30 additions & 78 deletions src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
@@ -1,8 +1,6 @@
package com.vinted.flink.bigquery;

import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.protobuf.Int64Value;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
import com.vinted.flink.bigquery.util.FlinkTest;
@@ -22,7 +20,7 @@
import java.util.function.Function;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;


@@ -36,26 +34,10 @@ public void shouldAppendRows(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runn
mockClientProvider.givenSuccessfulAppend();

runner.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 0)
givenRow(1)
))));

verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(0L));
}

@Test
public void shouldFlushRowsWhenExactlyOnceDeliveryEnabled(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenSuccessfulAppend();

runner.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 1)
))));

verify(mockClientProvider.getClient(), times(1)).flushRows(
FlushRowsRequest.newBuilder()
.setWriteStream(stream)
.setOffset(Int64Value.of(1))
.build()
);
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
}

@Test
@@ -66,12 +48,12 @@ public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 1)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any(), anyLong());
verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any());
}

@Test
@@ -83,106 +65,77 @@ public void shouldRetryOnTimeoutException(@FlinkTest.FlinkParam FlinkTest.Pipeli
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 1)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any(), anyLong());
}


@Test
public void shouldDoNothingWhenFullBatchWasAlreadyAppended(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenAppendingExistingOffset(16, 4, stream);

runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(4, 4)
))));


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
}

@Test
public void shouldSplitBatchWhenAppendingBatchWhereNotAllRowsAreAppended(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenAppendingExistingOffset(4, 2, stream);

runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(6, 2)
))));


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(2L));
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(4L));
verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any());
}

@Test
public void shouldFailAndNotRetryWhenFailedWithOutOfRange(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.OUT_OF_RANGE);
public void shouldFailAndNotRetryWhenAppendingFailedWithAlreadyExistsWithoutOffsetInformation(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.ALREADY_EXISTS);

assertThatThrownBy(() -> {
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 0)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
}

@Test
public void shouldFailAndNotRetryWhenAppendingFailedWithAlreadyExistsWithoutOffsetInformation(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.ALREADY_EXISTS);
public void shouldFailAndNotRetryWhenAppendingFailedWithInvalidArgument(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.INVALID_ARGUMENT);

assertThatThrownBy(() -> {
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 0)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
}

@Test
public void shouldFailAndNotRetryWhenAppendingFailedWithInvalidArgument(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.INVALID_ARGUMENT);
public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenStreamIsFinalized(stream);

assertThatThrownBy(() -> {
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 0)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
}

@Test
public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenStreamIsFinalized(stream);
public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
mockClientProvider.givenFailingAppendWithStatus(Status.UNAVAILABLE);
mockClientProvider.givenRetryCount(2);

assertThatThrownBy(() -> {
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(1, 0)
givenRow(1)
))));
}).isInstanceOf(JobExecutionException.class);


verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
verify(mockClientProvider.getMockJsonWriter(), times(2)).append(any());
assertThat(mockClientProvider.getNumOfCreatedWriter()).isEqualTo(3);
}

@Test
@@ -192,20 +145,19 @@ public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkTest.FlinkParam
runner
.withRetryCount(0)
.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
givenRowWithOffset(6, 4)
givenRow(6)
))));


verify(mockClientProvider.getMockJsonWriter(), times(2)).append(any(), eq(4L));
verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(7L));
verify(mockClientProvider.getMockJsonWriter(), times(3)).append(any());
}
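The test above exercises the message-size split path flagged by incSplitCount earlier: one oversized batch of six rows ends up as three append calls. A rough sketch of such a split using the Rows API visible in this file; the halving strategy and helper name are illustrative, and the sink's actual splitting logic (collapsed in the diff above) may differ:

import com.vinted.flink.bigquery.model.Rows;

import java.util.List;

final class BatchSplitter {
    // Splits an oversized batch into two halves that can be appended and retried independently.
    // The -1 offset matches the at-least-once delivery mode used by these tests.
    static <A> List<Rows<A>> splitInHalf(Rows<A> rows) {
        var data = List.copyOf(rows.getData());
        var middle = data.size() / 2;
        return List.of(
                new Rows<>(data.subList(0, middle), -1, rows.getStream(), rows.getTable()),
                new Rows<>(data.subList(middle, data.size()), -1, rows.getStream(), rows.getTable()));
    }
}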

private Rows<String> givenRowWithOffset(int count, int offset) {
private Rows<String> givenRow(int count) {
var data = new ArrayList<String>(count);
IntStream.rangeClosed(1, count)
.forEach(i -> data.add("{\"value\": " + i + "}"));

return new Rows<>(data, offset, stream, testTable);
return new Rows<>(data, -1, stream, testTable);
}

private Function<StreamExecutionEnvironment, DataStream<Rows<String>>> pipeline(List<Rows<String>> data) {
@@ -215,7 +167,7 @@ private Function<StreamExecutionEnvironment, DataStream<Rows<String>>> pipeline(
private Function<StreamExecutionEnvironment, DataStreamSink<Rows<String>>> withBigQuerySink(MockJsonClientProvider mockClientProvider, Function<StreamExecutionEnvironment, DataStream<Rows<String>>> pipeline) {
var sink = BigQueryStreamSink.<String>newJson()
.withClientProvider(mockClientProvider)
.withDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.withRowValueSerializer((JsonRowValueSerializer<String>) String::getBytes)
.build();

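For reference, the builder chain the updated tests use, wired into a plain Flink job. Only the BigQueryStreamSink builder calls and the Rows constructor are taken from this diff; the environment setup and sinkTo wiring are generic Flink boilerplate, the clientProvider/stream/table parameters are placeholders, and the library import paths are assumed to follow the repository layout:

import com.google.cloud.bigquery.TableId;
import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;

public class SinkWiringSketch {
    static <W> DataStreamSink<Rows<String>> attach(StreamExecutionEnvironment env,
                                                   ClientProvider<W> clientProvider, // a real ClientProvider in production, mocked in tests
                                                   String stream,
                                                   TableId table) {
        var sink = BigQueryStreamSink.<String>newJson()
                .withClientProvider(clientProvider)
                .withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // offsets are ignored (-1) in this mode
                .withRowValueSerializer((JsonRowValueSerializer<String>) String::getBytes)
                .build();

        return env.fromCollection(List.of(new Rows<>(List.of("{\"value\": 1}"), -1, stream, table)))
                .sinkTo(sink);
    }
}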