From 36bf3dfd36dc2c72b0661973bbf87d6c490eb8bf Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 4 Nov 2024 17:22:15 +0800 Subject: [PATCH] Use message id as file name by default --- docs/aws-s3-sink.md | 4 +- docs/azure-blob-storage-sink.md | 6 +- docs/google-cloud-storage-sink.md | 6 +- .../partitioner/AbstractPartitioner.java | 10 ++- .../jcloud/partitioner/SimplePartitioner.java | 2 +- .../jcloud/partitioner/TimePartitioner.java | 2 +- .../io/jcloud/sink/BlobStoreAbstractSink.java | 2 +- .../partitioner/AbstractPartitionerTest.java | 78 +++++++++++++++++++ .../jcloud/partitioner/PartitionerTest.java | 41 +++++----- .../SliceTopicPartitionPartitionerTest.java | 37 ++++----- .../sink/CloudStorageSinkBatchBlendTest.java | 2 + .../CloudStorageSinkBatchPartitionedTest.java | 27 ++++--- 12 files changed, 153 insertions(+), 64 deletions(-) create mode 100644 src/test/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitionerTest.java diff --git a/docs/aws-s3-sink.md b/docs/aws-s3-sink.md index 89dea0d7..a8ed7958 100644 --- a/docs/aws-s3-sink.md +++ b/docs/aws-s3-sink.md @@ -236,9 +236,9 @@ There are two types of partitioner: - **PARTITION**: This is the default partitioning method based on Pulsar partitions. In other words, data is partitioned according to the pre-existing partitions in Pulsar topics. For instance, a message for the topic `public/default/my-topic-partition-0` would be directed to the - file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest message offset in this file. + file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest messageId/offset(Enable config: `partitionerUseIndexAsOffset`) in this file. - **TIME**: Data is partitioned according to the time it was flushed. Using the previous message as an example, if it was received on 2023-12-20, it would be directed - to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest message offset in + to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest messageId/offset(Enable config: `partitionerUseIndexAsOffset`) in this file. diff --git a/docs/azure-blob-storage-sink.md b/docs/azure-blob-storage-sink.md index 704dddf7..fed20b54 100644 --- a/docs/azure-blob-storage-sink.md +++ b/docs/azure-blob-storage-sink.md @@ -223,9 +223,9 @@ There are two types of partitioner: - **PARTITION**: This is the default partitioning method based on Pulsar partitions. In other words, data is partitioned according to the pre-existing partitions in Pulsar topics. For instance, a message for the topic `public/default/my-topic-partition-0` would be directed to the - file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest message offset in this file. + file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest messageId/offset(Enable config: `partitionerUseIndexAsOffset`) in this file. - **TIME**: Data is partitioned according to the time it was flushed. Using the previous message as an example, if it was received on 2023-12-20, it would be directed - to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest message offset in - this file. \ No newline at end of file + to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest messageId/offset(Enable config: `partitionerUseIndexAsOffset`) in + this file. diff --git a/docs/google-cloud-storage-sink.md b/docs/google-cloud-storage-sink.md index 043e52e4..abb29a3f 100644 --- a/docs/google-cloud-storage-sink.md +++ b/docs/google-cloud-storage-sink.md @@ -222,9 +222,9 @@ There are two types of partitioner: - **PARTITION**: This is the default partitioning method based on Pulsar partitions. In other words, data is partitioned according to the pre-existing partitions in Pulsar topics. For instance, a message for the topic `public/default/my-topic-partition-0` would be directed to the - file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest message offset in this file. + file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest messageId/offset(Enable config: `partitionerUseIndexAsOffset`) in this file. - **TIME**: Data is partitioned according to the time it was flushed. Using the previous message as an example, if it was received on 2023-12-20, it would be directed - to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest message offset in - this file. \ No newline at end of file + to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest messageId/offset(Enable config: `partitionerUseIndexAsOffset`) in + this file. diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java index 0b95f309..a3f6c81d 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java @@ -73,7 +73,7 @@ public String generatePartitionedPath(String topic, String encodedPartition) { return StringUtils.join(joinList, PATH_SEPARATOR); } - protected long getMessageOffset(Record record) { + protected String getFileName(Record record) { if (useIndexAsOffset && record.getMessage().isPresent()) { final Message message = record.getMessage().get(); // Use index added by org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present. @@ -81,7 +81,7 @@ protected long getMessageOffset(Record record) { if (message.hasIndex()) { final Optional index = message.getIndex(); if (index.isPresent()) { - return index.get(); + return String.valueOf(index.get()); } else { LOGGER.warn("Found message {} with hasIndex=true but index is empty, using recordSequence", message.getMessageId()); @@ -92,7 +92,9 @@ protected long getMessageOffset(Record record) { message.getMessageId()); } } - return record.getRecordSequence() - .orElseThrow(() -> new RuntimeException("found empty recordSequence")); + return record.getMessage() + .map(msg -> msg.getMessageId().toString()) + .orElseThrow(() -> new RuntimeException("found empty message")); } + } diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/SimplePartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/SimplePartitioner.java index 4fd9d61d..aaecf35d 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/SimplePartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/SimplePartitioner.java @@ -29,6 +29,6 @@ public class SimplePartitioner extends AbstractPartitioner { @Override public String encodePartition(Record sinkRecord) { - return Long.toString(getMessageOffset(sinkRecord)); + return getFileName(sinkRecord); } } diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java index 5803545b..f2d54a4c 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/TimePartitioner.java @@ -96,7 +96,7 @@ public String encodePartition(Record sinkRecord, long nowInMillis) { String timeString = dateTimeFormatter.format(Instant.ofEpochMilli(parsed).atOffset(ZoneOffset.UTC)); final String result = timeString + PATH_SEPARATOR - + getMessageOffset(sinkRecord); + + getFileName(sinkRecord); return result; } diff --git a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java index 36d3e837..d7e73a27 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/sink/BlobStoreAbstractSink.java @@ -224,7 +224,7 @@ public String buildPartitionPath(Record message, String encodePartition = partitioner.encodePartition(message, partitioningTimestamp); String partitionedPath = partitioner.generatePartitionedPath(message.getTopicName().get(), encodePartition); String path = sinkConfig.getPathPrefix() + partitionedPath + format.getExtension(); - log.info("generate message[recordSequence={}] savePath: {}", message.getRecordSequence().get(), path); + log.info("generate message[messageId={}] savePath: {}", message.getMessage().get().getMessageId(), path); return path; } } diff --git a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitionerTest.java b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitionerTest.java new file mode 100644 index 00000000..e9a957a1 --- /dev/null +++ b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitionerTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.jcloud.partitioner; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.util.Optional; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.functions.api.Record; +import org.junit.Assert; +import org.junit.Test; + +public class AbstractPartitionerTest { + + @Test + public void testGetMessageOffsetWithBatchMessage() { + + AbstractPartitioner abstractPartitioner = new AbstractPartitioner<>() { + @Override + public String encodePartition(Record sinkRecord) { + return null; + } + }; + + BatchMessageIdImpl batchId1 = new BatchMessageIdImpl(12, 34, 1, 1); + Record message1 = getMessageRecord(batchId1); + String fileName1 = abstractPartitioner.getFileName(message1); + + BatchMessageIdImpl batchId2 = new BatchMessageIdImpl(12, 34, 1, 2); + Record message2 = getMessageRecord(batchId2); + String fileName2 = abstractPartitioner.getFileName(message2); + + Assert.assertEquals(fileName1, batchId1.toString()); + Assert.assertEquals(fileName2, batchId2.toString()); + Assert.assertNotEquals(fileName1, fileName2); + + MessageIdImpl id3 = new MessageIdImpl(12, 34, 1); + Assert.assertEquals(abstractPartitioner.getFileName(getMessageRecord(id3)), "12:34:1"); + } + + public static Record getMessageRecord(MessageId msgId) { + @SuppressWarnings("unchecked") + Message mock = mock(Message.class); + when(mock.getPublishTime()).thenReturn(1599578218610L); + when(mock.getMessageId()).thenReturn(msgId); + when(mock.hasIndex()).thenReturn(true); + when(mock.getIndex()).thenReturn(Optional.of(11115506L)); + + String topic = TopicName.get("test").toString(); + Record mockRecord = mock(Record.class); + when(mockRecord.getTopicName()).thenReturn(Optional.of(topic)); + when(mockRecord.getPartitionIndex()).thenReturn(Optional.of(1)); + when(mockRecord.getMessage()).thenReturn(Optional.of(mock)); + when(mockRecord.getPartitionId()).thenReturn(Optional.of(String.format("%s-%s", topic, 1))); + return mockRecord; + } + +} \ No newline at end of file diff --git a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTest.java b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTest.java index 21719ba9..849ab981 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/PartitionerTest.java @@ -25,6 +25,7 @@ import java.util.Optional; import junit.framework.TestCase; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Record; @@ -53,6 +54,8 @@ public class PartitionerTest extends TestCase { @Parameterized.Parameter(3) public Record pulsarRecord; + private static final MessageId testMessageId = new MessageIdImpl(12, 34, 1); + @Parameterized.Parameters public static Object[][] data() { BlobStoreAbstractConfig blobStoreAbstractConfig = new BlobStoreAbstractConfig(); @@ -89,8 +92,8 @@ public static Object[][] data() { return new Object[][]{ new Object[]{ simplePartitioner, - "3221225506", - "public/default/test" + Partitioner.PATH_SEPARATOR + "3221225506", + testMessageId.toString(), + "public/default/test" + Partitioner.PATH_SEPARATOR + testMessageId, getTopic() }, new Object[]{ @@ -101,44 +104,44 @@ public static Object[][] data() { }, new Object[]{ dayPartitioner, - "2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506", - "public/default/test/2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506", + "2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId, + "public/default/test/2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId, getTopic() }, new Object[]{ hourPartitioner, - "2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506", - "public/default/test/2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506" + "2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId, + "public/default/test/2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId , getTopic() }, new Object[]{ simplePartitioner, - "3221225506", - "public/default/test-partition-1" + Partitioner.PATH_SEPARATOR + "3221225506", + testMessageId.toString(), + "public/default/test-partition-1" + Partitioner.PATH_SEPARATOR + testMessageId, getPartitionedTopic() }, new Object[]{ dayPartitioner, - "2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506", - "public/default/test-partition-1/2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506", + "2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId, + "public/default/test-partition-1/2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId, getPartitionedTopic() }, new Object[]{ hourPartitioner, - "2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506", - "public/default/test-partition-1/2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506" + "2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId, + "public/default/test-partition-1/2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId , getPartitionedTopic() }, new Object[]{ noPartitionNumberPartitioner, - "3221225506", - "public/default/test" + Partitioner.PATH_SEPARATOR + "3221225506", + testMessageId.toString(), + "public/default/test" + Partitioner.PATH_SEPARATOR + testMessageId, getPartitionedTopic() }, new Object[]{ numberPartitioner, - "2020-09-08-14" + Partitioner.PATH_SEPARATOR + "3221225506", - "public/default/test-partition-1/2020-09-08-14" + Partitioner.PATH_SEPARATOR + "3221225506", + "2020-09-08-14" + Partitioner.PATH_SEPARATOR + testMessageId, + "public/default/test-partition-1/2020-09-08-14" + Partitioner.PATH_SEPARATOR + testMessageId, getPartitionedTopic() }, }; @@ -148,14 +151,13 @@ public static Record getPartitionedTopic() { @SuppressWarnings("unchecked") Message mock = mock(Message.class); when(mock.getPublishTime()).thenReturn(1599578218610L); - when(mock.getMessageId()).thenReturn(new MessageIdImpl(12, 34, 1)); + when(mock.getMessageId()).thenReturn(testMessageId); String topic = TopicName.get("test-partition-1").toString(); Record mockRecord = mock(Record.class); when(mockRecord.getTopicName()).thenReturn(Optional.of(topic)); when(mockRecord.getPartitionIndex()).thenReturn(Optional.of(1)); when(mockRecord.getMessage()).thenReturn(Optional.of(mock)); when(mockRecord.getPartitionId()).thenReturn(Optional.of(String.format("%s-%s", topic, 1))); - when(mockRecord.getRecordSequence()).thenReturn(Optional.of(3221225506L)); return mockRecord; } @@ -163,7 +165,7 @@ public static Record getTopic() { @SuppressWarnings("unchecked") Message mock = mock(Message.class); when(mock.getPublishTime()).thenReturn(1599578218610L); - when(mock.getMessageId()).thenReturn(new MessageIdImpl(12, 34, 1)); + when(mock.getMessageId()).thenReturn(testMessageId); when(mock.hasIndex()).thenReturn(true); when(mock.getIndex()).thenReturn(Optional.of(11115506L)); @@ -173,7 +175,6 @@ public static Record getTopic() { when(mockRecord.getPartitionIndex()).thenReturn(Optional.of(1)); when(mockRecord.getMessage()).thenReturn(Optional.of(mock)); when(mockRecord.getPartitionId()).thenReturn(Optional.of(String.format("%s-%s", topic, 1))); - when(mockRecord.getRecordSequence()).thenReturn(Optional.of(3221225506L)); return mockRecord; } diff --git a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/SliceTopicPartitionPartitionerTest.java b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/SliceTopicPartitionPartitionerTest.java index cd33bf29..a6baaad2 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/SliceTopicPartitionPartitionerTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/SliceTopicPartitionPartitionerTest.java @@ -25,6 +25,7 @@ import java.util.Optional; import junit.framework.TestCase; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Record; @@ -53,6 +54,8 @@ public class SliceTopicPartitionPartitionerTest extends TestCase { @Parameterized.Parameter(3) public Record pulsarRecord; + private static final MessageId testMessageId = new MessageIdImpl(12, 34, 1); + @Parameterized.Parameters public static Object[][] data() { BlobStoreAbstractConfig blobStoreAbstractConfig = new BlobStoreAbstractConfig(); @@ -81,44 +84,44 @@ public static Object[][] data() { return new Object[][]{ new Object[]{ simplePartitioner, - "3221225506", - "public/default/test" + Partitioner.PATH_SEPARATOR + "3221225506", + testMessageId.toString(), + "public/default/test" + Partitioner.PATH_SEPARATOR + testMessageId, getTopic() }, new Object[]{ dayPartitioner, - "2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506", - "public/default/test/2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506", + "2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId, + "public/default/test/2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId, getTopic() }, new Object[]{ hourPartitioner, - "2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506", - "public/default/test/2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506" + "2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId, + "public/default/test/2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId , getTopic() }, new Object[]{ simplePartitioner, - "3221225506", - "public/default/test/1" + Partitioner.PATH_SEPARATOR + "3221225506", + testMessageId.toString(), + "public/default/test/1" + Partitioner.PATH_SEPARATOR + testMessageId, getPartitionedTopic() }, new Object[]{ dayPartitioner, - "2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506", - "public/default/test/1/2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506", + "2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId, + "public/default/test/1/2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId, getPartitionedTopic() }, new Object[]{ hourPartitioner, - "2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506", - "public/default/test/1/2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506" + "2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId, + "public/default/test/1/2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId , getPartitionedTopic() }, new Object[]{ noPartitionNumberPartitioner, - "3221225506", - "public/default/test" + Partitioner.PATH_SEPARATOR + "3221225506", + testMessageId.toString(), + "public/default/test" + Partitioner.PATH_SEPARATOR + testMessageId, getPartitionedTopic() }, }; @@ -128,14 +131,13 @@ public static Record getPartitionedTopic() { @SuppressWarnings("unchecked") Message mock = mock(Message.class); when(mock.getPublishTime()).thenReturn(1599578218610L); - when(mock.getMessageId()).thenReturn(new MessageIdImpl(12, 34, 1)); + when(mock.getMessageId()).thenReturn(testMessageId); String topic = TopicName.get("test-partition-1").toString(); Record mockRecord = mock(Record.class); when(mockRecord.getTopicName()).thenReturn(Optional.of(topic)); when(mockRecord.getPartitionIndex()).thenReturn(Optional.of(1)); when(mockRecord.getMessage()).thenReturn(Optional.of(mock)); when(mockRecord.getPartitionId()).thenReturn(Optional.of(String.format("%s-%s", topic, 1))); - when(mockRecord.getRecordSequence()).thenReturn(Optional.of(3221225506L)); return mockRecord; } @@ -143,14 +145,13 @@ public static Record getTopic() { @SuppressWarnings("unchecked") Message mock = mock(Message.class); when(mock.getPublishTime()).thenReturn(1599578218610L); - when(mock.getMessageId()).thenReturn(new MessageIdImpl(12, 34, 1)); + when(mock.getMessageId()).thenReturn(testMessageId); String topic = TopicName.get("test").toString(); Record mockRecord = mock(Record.class); when(mockRecord.getTopicName()).thenReturn(Optional.of(topic)); when(mockRecord.getPartitionIndex()).thenReturn(Optional.of(1)); when(mockRecord.getMessage()).thenReturn(Optional.of(mock)); when(mockRecord.getPartitionId()).thenReturn(Optional.of(String.format("%s-%s", topic, 1))); - when(mockRecord.getRecordSequence()).thenReturn(Optional.of(3221225506L)); return mockRecord; } diff --git a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchBlendTest.java b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchBlendTest.java index 8f22dc14..2275f220 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchBlendTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchBlendTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; import org.apache.pulsar.client.api.schema.SchemaBuilder; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.SinkContext; @@ -98,6 +99,7 @@ public void setup() throws Exception { Message mockMessage = mock(Message.class); when(mockMessage.size()).thenReturn(PAYLOAD_BYTES); + when(mockMessage.getMessageId()).thenReturn(new MessageIdImpl(12, 34, 1)); GenericSchema schema = createTestSchema(); GenericRecord genericRecord = spy(createTestRecord(schema)); diff --git a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchPartitionedTest.java b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchPartitionedTest.java index e27fb936..1689145c 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchPartitionedTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkBatchPartitionedTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; import org.apache.pulsar.client.api.schema.SchemaBuilder; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.SinkContext; @@ -103,20 +104,23 @@ public void setup() throws Exception { Message mockMessage = mock(Message.class); when(mockMessage.size()).thenReturn(PAYLOAD_BYTES); + when(mockMessage.getMessageId()).thenReturn(new MessageIdImpl(12, 11, 1)); + + Message mockMessage2 = mock(Message.class); + when(mockMessage2.size()).thenReturn(PAYLOAD_BYTES); + when(mockMessage2.getMessageId()).thenReturn(new MessageIdImpl(12, 34, 1)); this.mockRecordTopic1 = mock(Record.class); when(mockRecordTopic1.getTopicName()).thenReturn(Optional.of("topic-1")); when(mockRecordTopic1.getValue()).thenReturn(genericRecord); when(mockRecordTopic1.getSchema()).thenAnswer((Answer) invocationOnMock -> schema); when(mockRecordTopic1.getMessage()).thenReturn(Optional.of(mockMessage)); - when(mockRecordTopic1.getRecordSequence()).thenReturn(Optional.of(100L)); this.mockRecordTopic2 = mock(Record.class); when(mockRecordTopic2.getTopicName()).thenReturn(Optional.of("topic-2")); when(mockRecordTopic2.getValue()).thenReturn(genericRecord); when(mockRecordTopic2.getSchema()).thenAnswer((Answer) invocationOnMock -> schema); - when(mockRecordTopic2.getMessage()).thenReturn(Optional.of(mockMessage)); - when(mockRecordTopic2.getRecordSequence()).thenReturn(Optional.of(200L)); + when(mockRecordTopic2.getMessage()).thenReturn(Optional.of(mockMessage2)); } @After @@ -168,30 +172,30 @@ public void flushOnTimeOutTests() throws Exception { } await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(10)).untilAsserted( () -> verify(mockBlobWriter, times(1)) - .uploadBlob(eq("public/default/topic-1/100.raw"), any(ByteBuffer.class)) + .uploadBlob(eq("public/default/topic-1/12:11:1.raw"), any(ByteBuffer.class)) ); - verify(mockBlobWriter, never()).uploadBlob(eq("public/default/topic-2/200.raw"), any(ByteBuffer.class)); + verify(mockBlobWriter, never()).uploadBlob(eq("public/default/topic-2/12:34:1.raw"), any(ByteBuffer.class)); // 3. Write 2 message for topic-1 again and assert not message need flush(no timeout) for (int i = 0; i < 2; i++) { sink.write(mockRecordTopic1); } clearInvocations(mockBlobWriter); - verify(mockBlobWriter, never()).uploadBlob(eq("public/default/topic-1/100.raw"), any(ByteBuffer.class)); + verify(mockBlobWriter, never()).uploadBlob(eq("public/default/topic-1/12:11:1.raw"), any(ByteBuffer.class)); // 4. Second sleep maxBatchTimeout / 2 again, and assert topic-2 data need flush // and topic-1 no need flush(no timeout) Thread.sleep(maxBatchTimeout / 2 + 100); await().atMost(Duration.ofSeconds(10)).pollInterval(Duration.ofMillis(10)).untilAsserted( () -> verify(mockBlobWriter, times(1)) - .uploadBlob(eq("public/default/topic-2/200.raw"), any(ByteBuffer.class)) + .uploadBlob(eq("public/default/topic-2/12:34:1.raw"), any(ByteBuffer.class)) ); - verify(mockBlobWriter, never()).uploadBlob(eq("public/default/topic-1/100.raw"), any(ByteBuffer.class)); + verify(mockBlobWriter, never()).uploadBlob(eq("public/default/topic-1/12:11:1.raw"), any(ByteBuffer.class)); // 5. Assert for topic-1 flush data step-3 write data. await().atMost(Duration.ofSeconds(10)).untilAsserted( () -> verify(mockBlobWriter, times(1)) - .uploadBlob(eq("public/default/topic-1/100.raw"), any(ByteBuffer.class)) + .uploadBlob(eq("public/default/topic-1/12:11:1.raw"), any(ByteBuffer.class)) ); // 6. Assert all message has been ack @@ -238,6 +242,7 @@ public void repeatedlyFlushOnMultiConditionTest() throws Exception { int randomMultiplier = ThreadLocalRandom.current().nextInt(1, 6); return PAYLOAD_BYTES * randomMultiplier; }); + when(randomMessage.getMessageId()).thenReturn(new MessageIdImpl(12, 34, 1)); when(mockRecordTopic1.getMessage()).thenReturn(Optional.of(randomMessage)); when(mockRecordTopic2.getMessage()).thenReturn(Optional.of(randomMessage)); @@ -308,7 +313,7 @@ private void verifySinkFlush() throws Exception { await().atMost(Duration.ofSeconds(10)).untilAsserted( () -> verify(mockBlobWriter, times(1)) - .uploadBlob(eq("public/default/topic-2/200.raw"), any(ByteBuffer.class)) + .uploadBlob(eq("public/default/topic-2/12:34:1.raw"), any(ByteBuffer.class)) ); await().atMost(Duration.ofSeconds(30)).untilAsserted( () -> verify(mockRecordTopic2, times(5)).ack() @@ -317,7 +322,7 @@ private void verifySinkFlush() throws Exception { this.sink.write(mockRecordTopic1); await().atMost(Duration.ofSeconds(10)).untilAsserted( () -> verify(mockBlobWriter, times(1)) - .uploadBlob(eq("public/default/topic-1/100.raw"), any(ByteBuffer.class)) + .uploadBlob(eq("public/default/topic-1/12:11:1.raw"), any(ByteBuffer.class)) ); await().atMost(Duration.ofSeconds(30)).untilAsserted( () -> verify(mockRecordTopic1, times(5)).ack()