From f452b2bff99e5076c1ead3e56523477d3bdfd783 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 18 Oct 2024 11:36:13 +0800 Subject: [PATCH] Add more test --- .../io/jcloud/BlobStoreAbstractConfig.java | 2 +- .../jcloud/batch/PartitionedBatchManager.java | 5 +- .../io/jcloud/batch/BatchContainerTest.java | 10 + .../io/jcloud/batch/BlendBatchMangerTest.java | 7 + .../batch/PartitionedBatchManagerTest.java | 17 +- ...va => CloudStorageSinkBatchBlendTest.java} | 32 +- .../CloudStorageSinkBatchPartitionedTest.java | 326 ++++++++++++++++++ 7 files changed, 388 insertions(+), 11 deletions(-) rename src/test/java/org/apache/pulsar/io/jcloud/sink/{CloudStorageGenericRecordSinkTest.java => CloudStorageSinkBatchBlendTest.java} (86%) create mode 100644 src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchPartitionedTest.java diff --git a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java index 84460cfc..4b4f4a4f 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java @@ -160,8 +160,8 @@ public void validate() { "pathPrefix cannot start with '/',the style is 'xx/xxx/'."); checkArgument(StringUtils.endsWith(pathPrefix, "/"), "pathPrefix must end with '/',the style is 'xx/xxx/'."); - pathPrefix = pathPrefix.trim(); } + pathPrefix = StringUtils.trimToEmpty(pathPrefix); if ("bytes".equalsIgnoreCase(formatType)) { checkArgument(StringUtils.isNotEmpty(bytesFormatTypeSeparator), diff --git a/src/main/java/org/apache/pulsar/io/jcloud/batch/PartitionedBatchManager.java b/src/main/java/org/apache/pulsar/io/jcloud/batch/PartitionedBatchManager.java index d25e41c8..af379bf0 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/batch/PartitionedBatchManager.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/batch/PartitionedBatchManager.java @@ -93,7 +93,10 @@ public Map>> pollNeedFlushData() { Map>> flushData = new HashMap<>(); topicBatchContainer.forEach((topicName, batchContainer) -> { if (batchContainer.needFlush()) { - flushData.put(topicName, batchContainer.poolNeedFlushRecords()); + List> records = batchContainer.poolNeedFlushRecords(); + if (!records.isEmpty()) { + flushData.put(topicName, records); + } } }); return flushData; diff --git a/src/test/java/org/apache/pulsar/io/jcloud/batch/BatchContainerTest.java b/src/test/java/org/apache/pulsar/io/jcloud/batch/BatchContainerTest.java index 2611fa55..df0c5578 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/batch/BatchContainerTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/batch/BatchContainerTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.io.jcloud.batch; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -74,6 +75,15 @@ public void testFlushByTime() throws InterruptedException { assertTrue(batchContainer.isEmpty()); } + @Test + public void testPollData() throws InterruptedException { + batchContainer.add(createMockRecord(1)); + assertFalse(batchContainer.needFlush()); + List> records = batchContainer.poolNeedFlushRecords(); + assertEquals(1, records.size()); + assertTrue(batchContainer.isEmpty()); + } + Record createMockRecord(int size) { Message msg = mock(Message.class); when(msg.size()).thenReturn(size); diff --git a/src/test/java/org/apache/pulsar/io/jcloud/batch/BlendBatchMangerTest.java b/src/test/java/org/apache/pulsar/io/jcloud/batch/BlendBatchMangerTest.java index 44afc250..fbe0eddc 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/batch/BlendBatchMangerTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/batch/BlendBatchMangerTest.java @@ -79,11 +79,14 @@ public void testFlushByTimout() throws InterruptedException { long maxBatchTimeout = 1000; BlendBatchManager blendBatchManger = new BlendBatchManager(1000, 1000, maxBatchTimeout, 1000); + blendBatchManger.add(getRecord("topic-0", 2)); blendBatchManger.add(getRecord("topic-1", 2)); assertEquals(2, blendBatchManger.getCurrentBatchSize(null)); assertEquals(4, blendBatchManger.getCurrentBatchBytes(null)); Thread.sleep(maxBatchTimeout + 100); + + // Time out flush Map>> flushData = blendBatchManger.pollNeedFlushData(); assertEquals(2, flushData.size()); assertEquals(1, flushData.get("topic-0").size()); @@ -91,6 +94,10 @@ public void testFlushByTimout() throws InterruptedException { assertTrue(blendBatchManger.isEmpty()); assertEquals(0, blendBatchManger.getCurrentBatchSize(null)); assertEquals(0, blendBatchManger.getCurrentBatchBytes(null)); + + // Time out again + Thread.sleep(maxBatchTimeout + 100); + assertTrue(blendBatchManger.pollNeedFlushData().isEmpty()); } Record getRecord(String topicName, int size) { diff --git a/src/test/java/org/apache/pulsar/io/jcloud/batch/PartitionedBatchManagerTest.java b/src/test/java/org/apache/pulsar/io/jcloud/batch/PartitionedBatchManagerTest.java index 04a36e23..f7d7b408 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/batch/PartitionedBatchManagerTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/batch/PartitionedBatchManagerTest.java @@ -87,38 +87,41 @@ public void testFlushByTimout() throws InterruptedException { PartitionedBatchManager partitionedBatchManager = new PartitionedBatchManager(1000, 100, maxBatchTimeout, 1000); - // Add and assert status + // 1. Add and assert status partitionedBatchManager.add(getRecord("topic-0", 2)); partitionedBatchManager.add(getRecord("topic-1", 101)); - // First sleep maxBatchTimeout / 2 + // 2. First sleep maxBatchTimeout / 2 Thread.sleep(maxBatchTimeout / 2); - // poll flush data, assert topic-1 data + // 3. Poll flush data, assert topic-1 data Map>> flushData = partitionedBatchManager.pollNeedFlushData(); assertEquals(1, flushData.size()); assertFalse(flushData.containsKey("topic-0")); assertEquals(1, flushData.get("topic-1").size()); - // write topic-1 data again + // 4. write topic-1 data again, assert not need flush partitionedBatchManager.add(getRecord("topic-1", 2)); // Second sleep maxBatchTimeout / 2 Thread.sleep(maxBatchTimeout / 2 + 100); - // assert topic-0 message timeout + // 5. assert topic-0 message timeout flushData = partitionedBatchManager.pollNeedFlushData(); assertEquals(1, flushData.size()); assertEquals(1, flushData.get("topic-0").size()); assertFalse(flushData.containsKey("topic-1")); - // Sleep assert can get topic-1 data + // 6. Sleep assert can get topic-1 data Thread.sleep(maxBatchTimeout / 2 + 100); flushData = partitionedBatchManager.pollNeedFlushData(); assertEquals(1, flushData.size()); assertFalse(flushData.containsKey("topic-0")); assertEquals(1, flushData.get("topic-1").size()); - assertTrue(partitionedBatchManager.isEmpty()); + + // Sleep and trigger timeout, and assert not data need flush + Thread.sleep(maxBatchTimeout + 100); + assertTrue(partitionedBatchManager.pollNeedFlushData().isEmpty()); } Record getRecord(String topicName, int size) { diff --git a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchBlendTest.java similarity index 86% rename from src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java rename to src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchBlendTest.java index fab1a810..8f22dc14 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageGenericRecordSinkTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchBlendTest.java @@ -35,6 +35,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericRecord; @@ -57,7 +58,7 @@ /** * Test for {@link CloudStorageGenericRecordSink}. */ -public class CloudStorageGenericRecordSinkTest { +public class CloudStorageSinkBatchBlendTest { private static final int PAYLOAD_BYTES = 100; @@ -83,6 +84,7 @@ public void setup() throws Exception { this.config.put("bucket", "just/a/test"); this.config.put("formatType", "bytes"); this.config.put("partitionerType", "default"); + this.config.put("batchModel", "BLEND"); this.sink = spy(new CloudStorageGenericRecordSink()); this.mockSinkContext = mock(SinkContext.class); @@ -97,7 +99,6 @@ public void setup() throws Exception { Message mockMessage = mock(Message.class); when(mockMessage.size()).thenReturn(PAYLOAD_BYTES); - GenericSchema schema = createTestSchema(); GenericRecord genericRecord = spy(createTestRecord(schema)); doReturn(new byte[]{0x1}).when(genericRecord).getSchemaVersion(); @@ -160,6 +161,33 @@ public void repeatedlyFlushOnMaxBatchBytesTest() throws Exception { verifyRecordAck(100); } + @Test + @SuppressWarnings({"unchecked", "rawtypes"}) + public void repeatedlyFlushOnMultiConditionTest() throws Exception { + this.config.put("pendingQueueSize", 100); // accept high number of messages + this.config.put("batchTimeMs", 1000); + this.config.put("maxBatchBytes", 10 * PAYLOAD_BYTES); + this.config.put("batchSize", 5); + this.sink.open(this.config, this.mockSinkContext); + + // Gen random message size + Message randomMessage = mock(Message.class); + when(randomMessage.size()).thenAnswer((Answer) invocation -> { + int randomMultiplier = ThreadLocalRandom.current().nextInt(1, 6); + return PAYLOAD_BYTES * randomMultiplier; + }); + when(mockRecord.getMessage()).thenReturn(Optional.of(randomMessage)); + + int numberOfRecords = 100; + for (int i = 0; i < numberOfRecords; i++) { + this.sink.write(mockRecord); + Thread.sleep(ThreadLocalRandom.current().nextInt(1, 500)); + } + await().atMost(Duration.ofSeconds(60)).untilAsserted( + () -> verify(mockRecord, times(numberOfRecords)).ack() + ); + } + @Test public void testBatchCleanupWhenFlushCrashed() throws Exception { this.config.put("pendingQueueSize", 1000); diff --git a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchPartitionedTest.java b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchPartitionedTest.java new file mode 100644 index 00000000..e27fb936 --- /dev/null +++ b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchPartitionedTest.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.jcloud.sink; + +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; +import org.apache.pulsar.client.api.schema.SchemaBuilder; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.jcloud.format.Format; +import org.apache.pulsar.io.jcloud.writer.BlobWriter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.stubbing.Answer; + +/** + * Test for {@link CloudStorageGenericRecordSink}. + */ +public class CloudStorageSinkBatchPartitionedTest { + + private static final int PAYLOAD_BYTES = 100; + + @Mock + private SinkContext mockSinkContext; + + @Mock + private BlobWriter mockBlobWriter; + + @Mock + private Record mockRecordTopic1; + + @Mock + private Record mockRecordTopic2; + + private Map config; + + private CloudStorageGenericRecordSink sink; + + @Before + @SuppressWarnings({"unchecked", "rawtypes"}) + public void setup() throws Exception { + //initialize required parameters + this.config = new HashMap<>(); + this.config.put("provider", "google-cloud-storage"); + this.config.put("bucket", "just/a/test"); + this.config.put("formatType", "bytes"); + this.config.put("partitionerType", "PARTITION"); + this.config.put("batchModel", "PARTITIONED"); + + this.sink = spy(new CloudStorageGenericRecordSink()); + this.mockSinkContext = mock(SinkContext.class); + this.mockBlobWriter = mock(BlobWriter.class); + + doReturn(mockBlobWriter).when(sink).initBlobWriter(any(CloudStorageSinkConfig.class)); + doReturn(ByteBuffer.wrap(new byte[]{0x0})).when(sink).bindValue(any(Iterator.class), any(Format.class)); + + RecordSchemaBuilder schemaBuilder = SchemaBuilder.record("test"); + schemaBuilder.field("a").type(SchemaType.INT32).optional().defaultValue(null); + GenericSchema schema = Schema.generic(schemaBuilder.build(SchemaType.JSON)); + GenericRecord genericRecord = spy(schema.newRecordBuilder().set("a", 1).build()); + doReturn(new byte[]{0x1}).when(genericRecord).getSchemaVersion(); + + Message mockMessage = mock(Message.class); + when(mockMessage.size()).thenReturn(PAYLOAD_BYTES); + + this.mockRecordTopic1 = mock(Record.class); + when(mockRecordTopic1.getTopicName()).thenReturn(Optional.of("topic-1")); + when(mockRecordTopic1.getValue()).thenReturn(genericRecord); + when(mockRecordTopic1.getSchema()).thenAnswer((Answer) invocationOnMock -> schema); + when(mockRecordTopic1.getMessage()).thenReturn(Optional.of(mockMessage)); + when(mockRecordTopic1.getRecordSequence()).thenReturn(Optional.of(100L)); + + this.mockRecordTopic2 = mock(Record.class); + when(mockRecordTopic2.getTopicName()).thenReturn(Optional.of("topic-2")); + when(mockRecordTopic2.getValue()).thenReturn(genericRecord); + when(mockRecordTopic2.getSchema()).thenAnswer((Answer) invocationOnMock -> schema); + when(mockRecordTopic2.getMessage()).thenReturn(Optional.of(mockMessage)); + when(mockRecordTopic2.getRecordSequence()).thenReturn(Optional.of(200L)); + } + + @After + public void tearDown() throws Exception { + this.sink.close(); + } + + @Test + public void flushOnMaxBatchBytesTest() throws Exception { + this.config.put("batchTimeMs", 60000); // set high batchTimeMs to prevent scheduled flush + this.config.put("batchSize", 1000); // set high batchSize to prevent flush + this.config.put("maxBatchBytes", 5 * PAYLOAD_BYTES); // force flush after 500 bytes + verifySinkFlush(); + } + + @Test + public void flushOnBatchSizeTests() throws Exception { + this.config.put("batchTimeMs", 60000); // set high batchTimeMs to prevent scheduled flush + this.config.put("maxBatchBytes", 10000); // set high maxBatchBytes to prevent flush + this.config.put("batchSize", 5); // force flush after 5 messages + + verifySinkFlush(); + } + + @Test + public void flushOnTimeOutTests() throws Exception { + long maxBatchTimeout = 2000; + this.config.put("batchTimeMs", maxBatchTimeout); // set high batchTimeMs to prevent scheduled flush + this.config.put("maxBatchBytes", 100000); // set high maxBatchBytes to prevent flush + this.config.put("batchSize", 10); // force flush after 5 messages + + this.sink.open(this.config, this.mockSinkContext); + + // 0. Write 2 data for each topic + for (int i = 0; i < 2; i++) { + sink.write(mockRecordTopic1); + } + for (int i = 0; i < 2; i++) { + sink.write(mockRecordTopic2); + } + + // 1. First sleep maxBatchTimeout / 2, and not data need flush + Thread.sleep(maxBatchTimeout / 2); + verify(mockBlobWriter, never()).uploadBlob(any(String.class), any(ByteBuffer.class)); + + // 2. Write 8 for topic-1 and to trigger flush + for (int i = 0; i < 8; i++) { + sink.write(mockRecordTopic1); + } + await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(10)).untilAsserted( + () -> verify(mockBlobWriter, times(1)) + .uploadBlob(eq("public/default/topic-1/100.raw"), any(ByteBuffer.class)) + ); + verify(mockBlobWriter, never()).uploadBlob(eq("public/default/topic-2/200.raw"), any(ByteBuffer.class)); + + // 3. Write 2 message for topic-1 again and assert not message need flush(no timeout) + for (int i = 0; i < 2; i++) { + sink.write(mockRecordTopic1); + } + clearInvocations(mockBlobWriter); + verify(mockBlobWriter, never()).uploadBlob(eq("public/default/topic-1/100.raw"), any(ByteBuffer.class)); + + // 4. Second sleep maxBatchTimeout / 2 again, and assert topic-2 data need flush + // and topic-1 no need flush(no timeout) + Thread.sleep(maxBatchTimeout / 2 + 100); + await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(10)).untilAsserted( + () -> verify(mockBlobWriter, times(1)) + .uploadBlob(eq("public/default/topic-2/200.raw"), any(ByteBuffer.class)) + ); + verify(mockBlobWriter, never()).uploadBlob(eq("public/default/topic-1/100.raw"), any(ByteBuffer.class)); + + // 5. Assert for topic-1 flush data step-3 write data. + await().atMost(Duration.ofSeconds(10)).untilAsserted( + () -> verify(mockBlobWriter, times(1)) + .uploadBlob(eq("public/default/topic-1/100.raw"), any(ByteBuffer.class)) + ); + + // 6. Assert all message has been ack + await().atMost(Duration.ofSeconds(10)).untilAsserted( + () -> verify(mockRecordTopic1, times(12)).ack() + ); + await().atMost(Duration.ofSeconds(10)).untilAsserted( + () -> verify(mockRecordTopic2, times(2)).ack() + ); + } + + @Test + public void repeatedlyFlushOnBatchSizeTest() throws Exception { + this.config.put("pendingQueueSize", 1000); // accept high number of messages + this.config.put("batchTimeMs", 60000); // set high batchTimeMs to prevent scheduled flush + this.config.put("maxBatchBytes", 100000); // set high maxBatchBytes to prevent flush + this.config.put("batchSize", 5); // force flush after 5 messages + + verifyRecordAck(100, -1); + } + + @Test + public void repeatedlyFlushOnMaxBatchBytesTest() throws Exception { + this.config.put("pendingQueueSize", 1000); // accept high number of messages + this.config.put("batchTimeMs", 60000); // set high batchTimeMs to prevent scheduled flush + this.config.put("maxBatchBytes", 5 * PAYLOAD_BYTES); // force flush after 500 bytes + this.config.put("batchSize", 1000); // set high batchSize to prevent flush + + verifyRecordAck(100, -1); + } + + @Test + @SuppressWarnings({"unchecked", "rawtypes"}) + public void repeatedlyFlushOnMultiConditionTest() throws Exception { + this.config.put("pendingQueueSize", 100); + this.config.put("batchTimeMs", 1000); + this.config.put("maxBatchBytes", 10 * PAYLOAD_BYTES); + this.config.put("batchSize", 5); + this.sink.open(this.config, this.mockSinkContext); + + // Gen random message size + Message randomMessage = mock(Message.class); + when(randomMessage.size()).thenAnswer((Answer) invocation -> { + int randomMultiplier = ThreadLocalRandom.current().nextInt(1, 6); + return PAYLOAD_BYTES * randomMultiplier; + }); + when(mockRecordTopic1.getMessage()).thenReturn(Optional.of(randomMessage)); + when(mockRecordTopic2.getMessage()).thenReturn(Optional.of(randomMessage)); + + int numberOfRecords = 100; + for (int i = 0; i < numberOfRecords; i++) { + this.sink.write(mockRecordTopic1); + this.sink.write(mockRecordTopic2); + Thread.sleep(ThreadLocalRandom.current().nextInt(1, 500)); + } + await().atMost(Duration.ofSeconds(60)).untilAsserted( + () -> verify(mockRecordTopic1, times(numberOfRecords)).ack() + ); + await().atMost(Duration.ofSeconds(60)).untilAsserted( + () -> verify(mockRecordTopic2, times(numberOfRecords)).ack() + ); + } + + @Test + public void testBatchCleanupWhenFlushCrashed() throws Exception { + this.config.put("pendingQueueSize", 1000); + this.config.put("batchTimeMs", 1000); + this.config.put("maxBatchBytes", 5 * PAYLOAD_BYTES); + this.config.put("batchSize", 1); + + this.sink.open(this.config, this.mockSinkContext); + when(mockRecordTopic1.getSchema()).thenThrow(new OutOfMemoryError()); + when(mockRecordTopic2.getSchema()).thenThrow(new OutOfMemoryError()); + this.sink.write(mockRecordTopic1); + this.sink.write(mockRecordTopic2); + await().atMost(Duration.ofSeconds(10)).untilAsserted( + () -> { + Assert.assertEquals(0, this.sink.batchManager.getCurrentBatchBytes("topic-1")); + Assert.assertEquals(0, this.sink.batchManager.getCurrentBatchSize("topic-1")); + Assert.assertEquals(0, this.sink.batchManager.getCurrentBatchBytes("topic-2")); + Assert.assertEquals(0, this.sink.batchManager.getCurrentBatchSize("topic-2")); + } + ); + } + + private void verifyRecordAck(int numberOfRecords, long sleepMillis) throws Exception { + this.sink.open(this.config, this.mockSinkContext); + for (int i = 0; i < numberOfRecords; i++) { + this.sink.write(mockRecordTopic1); + this.sink.write(mockRecordTopic2); + if (sleepMillis > 0) { + Thread.sleep(sleepMillis); + } + } + await().atMost(Duration.ofSeconds(30)).untilAsserted( + () -> verify(mockRecordTopic1, times(numberOfRecords)).ack() + ); + await().atMost(Duration.ofSeconds(30)).untilAsserted( + () -> verify(mockRecordTopic2, times(numberOfRecords)).ack() + ); + } + + private void verifySinkFlush() throws Exception { + this.sink.open(this.config, this.mockSinkContext); + + for (int i = 0; i < 4; i++) { + this.sink.write(mockRecordTopic1); + } + verify(mockBlobWriter, never()).uploadBlob(any(String.class), any(ByteBuffer.class)); + + for (int i = 0; i < 5; i++) { + this.sink.write(mockRecordTopic2); + } + + await().atMost(Duration.ofSeconds(10)).untilAsserted( + () -> verify(mockBlobWriter, times(1)) + .uploadBlob(eq("public/default/topic-2/200.raw"), any(ByteBuffer.class)) + ); + await().atMost(Duration.ofSeconds(30)).untilAsserted( + () -> verify(mockRecordTopic2, times(5)).ack() + ); + + this.sink.write(mockRecordTopic1); + await().atMost(Duration.ofSeconds(10)).untilAsserted( + () -> verify(mockBlobWriter, times(1)) + .uploadBlob(eq("public/default/topic-1/100.raw"), any(ByteBuffer.class)) + ); + await().atMost(Duration.ofSeconds(30)).untilAsserted( + () -> verify(mockRecordTopic1, times(5)).ack() + ); + } +}