diff --git a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java
index e2dadf2..110c0a9 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/BigQuerySinkWriter.java
@@ -18,6 +18,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -75,6 +76,27 @@ protected final StreamT streamWriter(String traceId, String streamName, TableId
         });
     }
 
+    protected final void recreateAllStreamWriters(String traceId, String streamName, TableId table) {
+        logger.info("Trace-id {} Closing all writers for {}", traceId, streamName);
+        try {
+            flush(true);
+            streamMap.replaceAll((key, writer) -> {
+                var newWriter = writer;
+                if (key.startsWith(streamName)) {
+                    try {
+                        writer.close();
+                    } catch (Exception e) {
+                        logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName);
+                    }
+                    newWriter = this.clientProvider.getWriter(streamName, table);
+                }
+                return newWriter;
+            });
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public void write(Rows rows, Context context) {
         numRecordsOutCounter.inc(rows.getData().size());
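The new `recreateAllStreamWriters` helper is a close-and-replace pass over the cached writers, keyed by stream-name prefix. A minimal, self-contained sketch of the same pattern; `Writer` and the `Supplier` factory are hypothetical stand-ins for the real `StreamT` writer and `clientProvider.getWriter(streamName, table)`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch of the close-and-replace recovery used by recreateAllStreamWriters.
// `Writer` is a hypothetical stand-in for the BigQuery stream writer type.
class WriterCache {
    interface Writer extends AutoCloseable {}

    private final Map<String, Writer> streamMap = new HashMap<>();

    void recreateAll(String streamName, Supplier<Writer> factory) {
        streamMap.replaceAll((key, writer) -> {
            if (!key.startsWith(streamName)) {
                return writer; // unrelated stream: keep the existing writer
            }
            try {
                writer.close(); // best effort; a failed close must not abort recovery
            } catch (Exception ignored) {
            }
            return factory.get(); // a fresh writer takes over the cache slot
        });
    }
}
```

Calling `flush(true)` before the swap appears intended to drain in-flight appends, so that no pending callback races against a writer that is about to be closed.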
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java
index a1ad7a0..a45ec4b 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/buffered/BigQueryProtoBufferedSinkWriter.java
@@ -34,10 +34,6 @@ protected ApiFuture append(String traceId, Rows rows) {
         Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size));
         var writer = streamWriter(traceId, rows.getStream(), rows.getTable());
-        if (writer.isClosed() || writer.isUserClosed()) {
-            logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
-        }
-
         logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
         try {
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java
index 47f3d73..fb3b408 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultJsonSinkWriter.java
@@ -28,11 +28,6 @@ protected ApiFuture append(String traceId, Rows rows) {
         var rowArray = new JSONArray();
         rows.getData().forEach(row -> rowArray.put(new JSONObject(new String(rowSerializer.serialize(row)))));
         var writer = streamWriter(traceId, rows.getStream(), rows.getTable());
-
-        if (writer.isClosed() || writer.isUserClosed()) {
-            logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
-        }
-
         logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
         try {
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
index 6786546..aa156ed 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultProtoSinkWriter.java
@@ -38,11 +38,9 @@ protected ApiFuture append(String traceId, Rows rows) {
         numBytesOutCounter.inc(size);
         numRecordsOutCounter.inc(rows.getData().size());
         Optional.ofNullable(metrics.get(rows.getStream())).ifPresent(s -> s.updateSize(size));
+
         var writer = streamWriter(traceId, rows.getStream(), rows.getTable());
-        if (writer.isClosed() || writer.isUserClosed()) {
-            logger.warn("Trace-id {}, StreamWrite is closed. Recreating stream for {}", traceId, rows.getStream());
-        }
         logger.trace("Trace-id {}, Writing rows stream {} to steamWriter for {} writer id {}", traceId, rows.getStream(), writer.getStreamName(), writer.getWriterId());
         try {
             return writer.append(prows);
diff --git a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
index 5dd41ce..fe9763d 100644
--- a/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
+++ b/src/main/java/com/vinted/flink/bigquery/sink/defaultStream/BigQueryDefaultSinkWriter.java
@@ -17,14 +17,12 @@
 import java.util.Optional;
 import java.util.concurrent.Phaser;
-import java.util.function.Function;
 
 public abstract class BigQueryDefaultSinkWriter extends BigQuerySinkWriter {
     private static final Logger logger = LoggerFactory.getLogger(BigQueryDefaultSinkWriter.class);
 
     private final Phaser inflightRequestCount = new Phaser(1);
-
     private volatile AppendException appendAsyncException = null;
 
     public BigQueryDefaultSinkWriter(
@@ -32,8 +30,15 @@ public BigQueryDefaultSinkWriter(
             RowValueSerializer rowSerializer,
             ClientProvider clientProvider,
             ExecutorProvider executorProvider) {
-
         super(sinkInitContext, rowSerializer, clientProvider, executorProvider);
+
+        var metricGroup = this.sinkInitContext.metricGroup();
+        var group = metricGroup
+                .addGroup("BigQuery")
+                .addGroup("DefaultSinkWriter")
+                .addGroup("inflight_requests");
+
+        group.gauge("count", this.inflightRequestCount::getRegisteredParties);
     }
 
     private void checkAsyncException() {
@@ -41,7 +46,11 @@
         AppendException e = appendAsyncException;
         if (e != null) {
             appendAsyncException = null;
-            logger.error("Throwing non recoverable exception", e);
+            var error = e.getError();
+            var errorRows = e.getRows();
+            var errorTraceId = e.getTraceId();
+            var status = Status.fromThrowable(error);
+            logger.error(this.createLogMessage("Non recoverable BigQuery stream AppendException for:", errorTraceId, status, error, errorRows, 0), error);
             throw e;
         }
     }
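The constructor hunk above registers the Phaser's registered-party count as a Flink gauge. A compact sketch of that wiring against the standard `org.apache.flink.metrics.MetricGroup` API (the class and method names here are illustrative, not part of the codebase):

```java
import java.util.concurrent.Phaser;

import org.apache.flink.metrics.MetricGroup;

// Sketch: expose in-flight append requests (registered Phaser parties) as
// BigQuery.DefaultSinkWriter.inflight_requests.count
class InflightGauge {
    private final Phaser inflightRequestCount = new Phaser(1); // party #1 is the writer itself

    void register(MetricGroup metricGroup) {
        metricGroup
                .addGroup("BigQuery")
                .addGroup("DefaultSinkWriter")
                .addGroup("inflight_requests")
                .gauge("count", inflightRequestCount::getRegisteredParties);
    }
}
```

Since the Phaser is constructed with one party for the writer itself, an idle writer should report 1 rather than 0.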
@@ -63,9 +72,7 @@ protected void writeWithRetry(String traceId, Rows rows, int retryCount) thro
             var errorRows = exception.getRows();
             var errorTraceId = exception.getTraceId();
             var status = Status.fromThrowable(error);
-            Function createLogMessage = (title) ->
-                    this.createLogMessage(title, errorTraceId, status, error, errorRows, retryCount);
-            logger.error(createLogMessage.apply("Non recoverable BigQuery stream AppendException for:"), error);
+            logger.error(this.createLogMessage("Non recoverable BigQuery stream AppendException for:", errorTraceId, status, error, errorRows, retryCount), error);
             throw error;
         } catch (Throwable t) {
             logger.error("Trace-id: {} Non recoverable BigQuery stream error for: {}. Retry count: {}", traceId, t.getMessage(), retryCount);
@@ -89,8 +96,10 @@ public void close() {
 
     @Override
     public void flush(boolean endOfInput) {
-        inflightRequestCount.arriveAndAwaitAdvance();
+        logger.info("Flushing BigQuery writer {} data. Inflight request count {}", this.sinkInitContext.getSubtaskId(), inflightRequestCount.getRegisteredParties());
         checkAsyncException();
+        inflightRequestCount.arriveAndAwaitAdvance();
+        logger.info("BigQuery writer {} data flushed. Inflight request count {}", this.sinkInitContext.getSubtaskId(), inflightRequestCount.getRegisteredParties());
     }
 
     static class AppendCallBack implements ApiFutureCallback {
@@ -125,10 +134,14 @@ public void onFailure(Throwable t) {
                 case CANCELLED:
                 case FAILED_PRECONDITION:
                 case DEADLINE_EXCEEDED:
-                case UNAVAILABLE:
                     doPauseBeforeRetry();
                     retryWrite(t, retryCount - 1);
                     break;
+                case UNAVAILABLE: {
+                    this.parent.recreateAllStreamWriters(traceId, rows.getStream(), rows.getTable());
+                    retryWrite(t, retryCount - 1);
+                    break;
+                }
                 case INVALID_ARGUMENT:
                     if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
                         Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
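For orientation, this is the lifecycle the reworked `flush(boolean)` logs around: each asynchronous append registers a party on the Phaser and deregisters it from its completion callback, so `arriveAndAwaitAdvance()` blocks until every outstanding request has finished. A minimal sketch of that protocol (assumed semantics, heavily simplified from the class above):

```java
import java.util.concurrent.Phaser;

// Minimal sketch of Phaser-based in-flight tracking for async appends.
class InflightTracker {
    private final Phaser inflight = new Phaser(1); // the writer itself is party #1

    void beforeAppend() {
        inflight.register(); // one party per outstanding request
    }

    void onAppendComplete() { // invoked from the success or failure callback
        inflight.arriveAndDeregister();
    }

    void flush() {
        // Arrive as the writer's own party and wait for every outstanding
        // request to arrive; returns once the phase advances.
        inflight.arriveAndAwaitAdvance();
    }
}
```

Two behavioural notes on the hunks above: `checkAsyncException()` now runs before the blocking wait, so an already-recorded asynchronous failure surfaces immediately, and the new `case UNAVAILABLE` branch swaps in fresh writers via `recreateAllStreamWriters` instead of retrying on a potentially broken connection.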
diff --git a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
index 96a4dd6..2166d3c 100644
--- a/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
+++ b/src/test/java/com/vinted/flink/bigquery/BigQueryDefaultSinkTest.java
@@ -1,8 +1,6 @@
 package com.vinted.flink.bigquery;
 
 import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
-import com.google.protobuf.Int64Value;
 import com.vinted.flink.bigquery.model.Rows;
 import com.vinted.flink.bigquery.serializer.JsonRowValueSerializer;
 import com.vinted.flink.bigquery.util.FlinkTest;
@@ -22,7 +20,7 @@
 import java.util.function.Function;
 import java.util.stream.IntStream;
 
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.*;
 import static org.mockito.Mockito.*;
 
@@ -36,26 +34,10 @@ public void shouldAppendRows(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runn
         mockClientProvider.givenSuccessfulAppend();
 
         runner.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
-                givenRowWithOffset(1, 0)
+                givenRow(1)
         ))));
 
-        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(0L));
-    }
-
-    @Test
-    public void shouldFlushRowsWhenExactlyOnceDeliveryEnabled(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
-        mockClientProvider.givenSuccessfulAppend();
-
-        runner.runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
-                givenRowWithOffset(1, 1)
-        ))));
-
-        verify(mockClientProvider.getClient(), times(1)).flushRows(
-                FlushRowsRequest.newBuilder()
-                        .setWriteStream(stream)
-                        .setOffset(Int64Value.of(1))
-                        .build()
-        );
+        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
     }
 
     @Test
@@ -66,12 +48,12 @@ public void shouldRetryAppendWhenFailingWithInternalError(@FlinkTest.FlinkParam
             runner
                     .withRetryCount(0)
                     .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
-                            givenRowWithOffset(1, 1)
+                            givenRow(1)
                     ))));
         }).isInstanceOf(JobExecutionException.class);
 
-        verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any(), anyLong());
+        verify(mockClientProvider.getMockJsonWriter(), times(5)).append(any());
     }
 
     @Test
@@ -83,106 +65,77 @@ public void shouldRetryOnTimeoutException(@FlinkTest.FlinkParam FlinkTest.Pipeli
             runner
                     .withRetryCount(0)
                     .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
-                            givenRowWithOffset(1, 1)
+                            givenRow(1)
                     ))));
         }).isInstanceOf(JobExecutionException.class);
 
-        verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any(), anyLong());
-    }
-
-
-    @Test
-    public void shouldDoNothingWhenFullBatchWasAlreadyAppended(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
-        mockClientProvider.givenAppendingExistingOffset(16, 4, stream);
-
-        runner
-                .withRetryCount(0)
-                .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
-                        givenRowWithOffset(4, 4)
-                ))));
-
-
-        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
-    }
-
-    @Test
-    public void shouldSplitBatchWhenAppendingBatchWhereNotAllRowsAreAppended(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
-        mockClientProvider.givenAppendingExistingOffset(4, 2, stream);
-
-        runner
-                .withRetryCount(0)
-                .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
-                        givenRowWithOffset(6, 2)
-                ))));
-
-
-        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(2L));
-        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(4L));
+        verify(mockClientProvider.getMockJsonWriter(), times(6)).append(any());
     }
 
     @Test
-    public void shouldFailAndNotRetryWhenFailedWithOutOfRange(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
-        mockClientProvider.givenFailingAppendWithStatus(Status.OUT_OF_RANGE);
+    public void shouldFailAndNotRetryWhenAppendingFailedWithAlreadyExistsWithoutOffsetInformation(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
+        mockClientProvider.givenFailingAppendWithStatus(Status.ALREADY_EXISTS);
 
         assertThatThrownBy(() -> {
             runner
                     .withRetryCount(0)
                     .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
-                            givenRowWithOffset(1, 0)
+                            givenRow(1)
                     ))));
         }).isInstanceOf(JobExecutionException.class);
 
-        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
+        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
     }
 
     @Test
-    public void shouldFailAndNotRetryWhenAppendingFailedWithAlreadyExistsWithoutOffsetInformation(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
-        mockClientProvider.givenFailingAppendWithStatus(Status.ALREADY_EXISTS);
+    public void shouldFailAndNotRetryWhenAppendingFailedWithInvalidArgument(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
+        mockClientProvider.givenFailingAppendWithStatus(Status.INVALID_ARGUMENT);
 
         assertThatThrownBy(() -> {
             runner
                     .withRetryCount(0)
                     .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
-                            givenRowWithOffset(1, 0)
+                            givenRow(1)
                     ))));
         }).isInstanceOf(JobExecutionException.class);
 
-        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
+        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
     }
 
     @Test
-    public void shouldFailAndNotRetryWhenAppendingFailedWithInvalidArgument(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
-        mockClientProvider.givenFailingAppendWithStatus(Status.INVALID_ARGUMENT);
+    public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
+        mockClientProvider.givenStreamIsFinalized(stream);
 
         assertThatThrownBy(() -> {
             runner
                     .withRetryCount(0)
                     .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
-                            givenRowWithOffset(1, 0)
+                            givenRow(1)
                     ))));
         }).isInstanceOf(JobExecutionException.class);
 
-        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
+        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any());
    }
 
     @Test
-    public void shouldFailAndNotRetryWhenAppendingToFinalizedStream(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
-        mockClientProvider.givenStreamIsFinalized(stream);
+    public void shouldRecreateWriterAndRetryWhenAppendFailedWithUnavailable(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockJsonClientProvider mockClientProvider) throws Exception {
+        mockClientProvider.givenFailingAppendWithStatus(Status.UNAVAILABLE);
+        mockClientProvider.givenRetryCount(2);
 
         assertThatThrownBy(() -> {
             runner
                     .withRetryCount(0)
                     .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
-                            givenRowWithOffset(1, 0)
+                            givenRow(1)
                     ))));
         }).isInstanceOf(JobExecutionException.class);
-
-        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), anyLong());
+        verify(mockClientProvider.getMockJsonWriter(), times(2)).append(any());
+        assertThat(mockClientProvider.getNumOfCreatedWriter()).isEqualTo(3);
     }
 
     @Test
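The failure tests above all go through `givenFailingAppendWithStatus`. A hypothetical illustration of how such a stub can be built with Mockito and a failed `ApiFuture` (the real helper lives in MockJsonClientProvider, below, and may differ in detail):

```java
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import io.grpc.Status;
import io.grpc.StatusException;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

// Hypothetical sketch: make every append(...) resolve to a failed future
// carrying the given gRPC status, so the sink's callback sees onFailure.
final class FailingAppendStub {
    static void stub(JsonStreamWriter writer, Status status) throws Exception {
        when(writer.append(any()))
                .thenReturn(ApiFutures.immediateFailedFuture(new StatusException(status)));
    }
}
```

`Status.fromThrowable` in the sink then recovers the code from the `StatusException`, which is what drives the `switch` in `AppendCallBack.onFailure`.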
@@ -192,20 +145,19 @@ public void shouldSplitTheBatchWhenAppendingTooLargeBatch(@FlinkTest.FlinkParam
         runner
                 .withRetryCount(0)
                 .runWithCustomSink(withBigQuerySink(mockClientProvider, pipeline(List.of(
-                        givenRowWithOffset(6, 4)
+                        givenRow(6)
                 ))));
 
-        verify(mockClientProvider.getMockJsonWriter(), times(2)).append(any(), eq(4L));
-        verify(mockClientProvider.getMockJsonWriter(), times(1)).append(any(), eq(7L));
+        verify(mockClientProvider.getMockJsonWriter(), times(3)).append(any());
     }
 
-    private Rows<String> givenRowWithOffset(int count, int offset) {
+    private Rows<String> givenRow(int count) {
         var data = new ArrayList<String>(count);
         IntStream.rangeClosed(1, count)
                 .forEach(i -> data.add("{\"value\": " + i + "}"));
-        return new Rows<>(data, offset, stream, testTable);
+        return new Rows<>(data, -1, stream, testTable);
     }
 
     private Function>> pipeline(List> data) {
@@ -215,7 +167,7 @@ private Function>> withBigQuerySink(MockJsonClientProvider mockClientProvider, Function>> pipeline) {
         var sink = BigQueryStreamSink.newJson()
                 .withClientProvider(mockClientProvider)
-                .withDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+                .withDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                 .withRowValueSerializer((JsonRowValueSerializer<String>) String::getBytes)
                 .build();
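On `shouldSplitTheBatchWhenAppendingTooLargeBatch` expecting `times(3)`: the six-row batch fails once as oversized and is then appended as two three-row halves. A hedged sketch of the halving strategy this implies (the sink's actual split logic may differ; `IllegalArgumentException` stands in for the gRPC INVALID_ARGUMENT case):

```java
import java.util.List;
import java.util.function.Consumer;

// Sketch: recursively halve a batch that is rejected as too large.
// A 6-row batch that fails once and whose halves succeed yields 3 append calls.
final class BatchSplitter {
    static <T> void appendWithSplit(List<T> batch, Consumer<List<T>> append) {
        try {
            append.accept(batch); // stand-in for writer.append(...)
        } catch (IllegalArgumentException tooLarge) {
            if (batch.size() <= 1) {
                throw tooLarge; // a single row that is still too large is fatal
            }
            int mid = batch.size() / 2;
            appendWithSplit(batch.subList(0, mid), append);
            appendWithSplit(batch.subList(mid, batch.size()), append);
        }
    }
}
```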
diff --git a/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java b/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java
index 8bddc1b..077270a 100644
--- a/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java
+++ b/src/test/java/com/vinted/flink/bigquery/util/MockJsonClientProvider.java
@@ -13,11 +13,20 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockJsonClientProvider implements ClientProvider<JsonStreamWriter>, Serializable {
     private static BigQueryWriteClient mockClient = Mockito.mock(BigQueryWriteClient.class);
     private static JsonStreamWriter writer = Mockito.mock(JsonStreamWriter.class);
+    private static AtomicInteger numOfCreatedWriters = new AtomicInteger(0);
+
+    private int retryCount = 5;
+
+
+    public void givenRetryCount(int count) {
+        this.retryCount = count;
+    }
 
     public void givenStreamDoesNotExist(String streamName) {
         Mockito.doThrow(new RuntimeException(new StatusException(Status.NOT_FOUND)))
                 .when(MockJsonClientProvider.mockClient).getWriteStream(streamName);
@@ -97,9 +106,14 @@ public void givenAppendingTooLargeBatch() throws Descriptors.DescriptorValidatio
                 .thenReturn(createAppendRowsResponse());
     }
 
+    public int getNumOfCreatedWriter() {
+        return numOfCreatedWriters.get();
+    }
+
     public static void reset() {
         Mockito.reset(MockJsonClientProvider.mockClient);
         Mockito.reset(MockJsonClientProvider.writer);
+        MockJsonClientProvider.numOfCreatedWriters.set(0);
     }
 
     private static Exceptions.StreamFinalizedException createFinalizedStreamException() {
@@ -147,12 +161,13 @@ public BigQueryWriteClient getClient() {
 
     @Override
     public JsonStreamWriter getWriter(String streamName, TableId table) {
+        numOfCreatedWriters.incrementAndGet();
         return MockJsonClientProvider.writer;
     }
 
     @Override
     public WriterSettings writeSettings() {
-        return WriterSettings.newBuilder().build();
+        return WriterSettings.newBuilder().withRetryCount(retryCount).build();
     }
 }
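The writer counter is deliberately static: the provider is `Serializable` and gets copied into the Flink job graph, so each task ends up with its own deserialized instance, and only a JVM-wide counter observes every copy in the in-process test cluster (which is also why `reset()` clears it between tests). A condensed sketch of that design, with hypothetical names:

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a serializable factory whose creation count survives Flink's
// serialization of the provider into the job graph. An instance field would be
// copied per deserialized task; a static AtomicInteger is shared JVM-wide.
class CountingWriterFactory implements Serializable {
    private static final AtomicInteger createdWriters = new AtomicInteger(0);

    Object getWriter() {
        createdWriters.incrementAndGet();
        return new Object(); // stand-in for the mocked JsonStreamWriter
    }

    static int createdCount() {
        return createdWriters.get();
    }

    static void reset() { // call between tests, mirroring MockJsonClientProvider.reset()
        createdWriters.set(0);
    }
}
```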