Skip to content

Commit

Permalink
Add more test
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Oct 18, 2024
1 parent 525ffca commit f452b2b
Show file tree
Hide file tree
Showing 7 changed files with 388 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ public void validate() {
"pathPrefix cannot start with '/',the style is 'xx/xxx/'.");
checkArgument(StringUtils.endsWith(pathPrefix, "/"),
"pathPrefix must end with '/',the style is 'xx/xxx/'.");
pathPrefix = pathPrefix.trim();
}
pathPrefix = StringUtils.trimToEmpty(pathPrefix);

if ("bytes".equalsIgnoreCase(formatType)) {
checkArgument(StringUtils.isNotEmpty(bytesFormatTypeSeparator),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ public Map<String, List<Record<GenericRecord>>> pollNeedFlushData() {
Map<String, List<Record<GenericRecord>>> flushData = new HashMap<>();
topicBatchContainer.forEach((topicName, batchContainer) -> {
if (batchContainer.needFlush()) {
flushData.put(topicName, batchContainer.poolNeedFlushRecords());
List<Record<GenericRecord>> records = batchContainer.poolNeedFlushRecords();
if (!records.isEmpty()) {
flushData.put(topicName, records);
}
}
});
return flushData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.io.jcloud.batch;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -74,6 +75,15 @@ public void testFlushByTime() throws InterruptedException {
assertTrue(batchContainer.isEmpty());
}

@Test
public void testPollData() throws InterruptedException {
batchContainer.add(createMockRecord(1));
assertFalse(batchContainer.needFlush());
List<Record<GenericRecord>> records = batchContainer.poolNeedFlushRecords();
assertEquals(1, records.size());
assertTrue(batchContainer.isEmpty());
}

Record<GenericRecord> createMockRecord(int size) {
Message msg = mock(Message.class);
when(msg.size()).thenReturn(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,25 @@ public void testFlushByTimout() throws InterruptedException {
long maxBatchTimeout = 1000;
BlendBatchManager blendBatchManger = new BlendBatchManager(1000,
1000, maxBatchTimeout, 1000);

blendBatchManger.add(getRecord("topic-0", 2));
blendBatchManger.add(getRecord("topic-1", 2));
assertEquals(2, blendBatchManger.getCurrentBatchSize(null));
assertEquals(4, blendBatchManger.getCurrentBatchBytes(null));
Thread.sleep(maxBatchTimeout + 100);

// Time out flush
Map<String, List<Record<GenericRecord>>> flushData = blendBatchManger.pollNeedFlushData();
assertEquals(2, flushData.size());
assertEquals(1, flushData.get("topic-0").size());
assertEquals(1, flushData.get("topic-1").size());
assertTrue(blendBatchManger.isEmpty());
assertEquals(0, blendBatchManger.getCurrentBatchSize(null));
assertEquals(0, blendBatchManger.getCurrentBatchBytes(null));

// Time out again
Thread.sleep(maxBatchTimeout + 100);
assertTrue(blendBatchManger.pollNeedFlushData().isEmpty());
}

Record<GenericRecord> getRecord(String topicName, int size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,38 +87,41 @@ public void testFlushByTimout() throws InterruptedException {
PartitionedBatchManager partitionedBatchManager = new PartitionedBatchManager(1000,
100, maxBatchTimeout, 1000);

// Add and assert status
// 1. Add and assert status
partitionedBatchManager.add(getRecord("topic-0", 2));
partitionedBatchManager.add(getRecord("topic-1", 101));

// First sleep maxBatchTimeout / 2
// 2. First sleep maxBatchTimeout / 2
Thread.sleep(maxBatchTimeout / 2);

// poll flush data, assert topic-1 data
// 3. Poll flush data, assert topic-1 data
Map<String, List<Record<GenericRecord>>> flushData = partitionedBatchManager.pollNeedFlushData();
assertEquals(1, flushData.size());
assertFalse(flushData.containsKey("topic-0"));
assertEquals(1, flushData.get("topic-1").size());

// write topic-1 data again
// 4. write topic-1 data again, assert not need flush
partitionedBatchManager.add(getRecord("topic-1", 2));
// Second sleep maxBatchTimeout / 2
Thread.sleep(maxBatchTimeout / 2 + 100);

// assert topic-0 message timeout
// 5. assert topic-0 message timeout
flushData = partitionedBatchManager.pollNeedFlushData();
assertEquals(1, flushData.size());
assertEquals(1, flushData.get("topic-0").size());
assertFalse(flushData.containsKey("topic-1"));

// Sleep assert can get topic-1 data
// 6. Sleep assert can get topic-1 data
Thread.sleep(maxBatchTimeout / 2 + 100);
flushData = partitionedBatchManager.pollNeedFlushData();
assertEquals(1, flushData.size());
assertFalse(flushData.containsKey("topic-0"));
assertEquals(1, flushData.get("topic-1").size());

assertTrue(partitionedBatchManager.isEmpty());

// Sleep and trigger timeout, and assert not data need flush
Thread.sleep(maxBatchTimeout + 100);
assertTrue(partitionedBatchManager.pollNeedFlushData().isEmpty());
}

Record<GenericRecord> getRecord(String topicName, int size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
Expand All @@ -57,7 +58,7 @@
/**
* Test for {@link CloudStorageGenericRecordSink}.
*/
public class CloudStorageGenericRecordSinkTest {
public class CloudStorageSinkBatchBlendTest {

private static final int PAYLOAD_BYTES = 100;

Expand All @@ -83,6 +84,7 @@ public void setup() throws Exception {
this.config.put("bucket", "just/a/test");
this.config.put("formatType", "bytes");
this.config.put("partitionerType", "default");
this.config.put("batchModel", "BLEND");

this.sink = spy(new CloudStorageGenericRecordSink());
this.mockSinkContext = mock(SinkContext.class);
Expand All @@ -97,7 +99,6 @@ public void setup() throws Exception {
Message mockMessage = mock(Message.class);
when(mockMessage.size()).thenReturn(PAYLOAD_BYTES);


GenericSchema<GenericRecord> schema = createTestSchema();
GenericRecord genericRecord = spy(createTestRecord(schema));
doReturn(new byte[]{0x1}).when(genericRecord).getSchemaVersion();
Expand Down Expand Up @@ -160,6 +161,33 @@ public void repeatedlyFlushOnMaxBatchBytesTest() throws Exception {
verifyRecordAck(100);
}

@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void repeatedlyFlushOnMultiConditionTest() throws Exception {
this.config.put("pendingQueueSize", 100); // accept high number of messages
this.config.put("batchTimeMs", 1000);
this.config.put("maxBatchBytes", 10 * PAYLOAD_BYTES);
this.config.put("batchSize", 5);
this.sink.open(this.config, this.mockSinkContext);

// Gen random message size
Message randomMessage = mock(Message.class);
when(randomMessage.size()).thenAnswer((Answer<Integer>) invocation -> {
int randomMultiplier = ThreadLocalRandom.current().nextInt(1, 6);
return PAYLOAD_BYTES * randomMultiplier;
});
when(mockRecord.getMessage()).thenReturn(Optional.of(randomMessage));

int numberOfRecords = 100;
for (int i = 0; i < numberOfRecords; i++) {
this.sink.write(mockRecord);
Thread.sleep(ThreadLocalRandom.current().nextInt(1, 500));
}
await().atMost(Duration.ofSeconds(60)).untilAsserted(
() -> verify(mockRecord, times(numberOfRecords)).ack()
);
}

@Test
public void testBatchCleanupWhenFlushCrashed() throws Exception {
this.config.put("pendingQueueSize", 1000);
Expand Down
Loading

0 comments on commit f452b2b

Please sign in to comment.