From e8f0a1c7ff2c6162653761f514c6a1f187f7eb5d Mon Sep 17 00:00:00 2001
From: Baodi Shi
Date: Mon, 4 Nov 2024 15:20:18 +0800
Subject: [PATCH] Fix duplication file name for batch message

---
 .../partitioner/AbstractPartitioner.java      | 35 +++++++-
 .../partitioner/AbstractPartitionerTest.java  | 88 +++++++++++++++++++
 2 files changed, 121 insertions(+), 2 deletions(-)
 create mode 100644 src/test/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitionerTest.java

diff --git a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java
index 0b95f309..2c739d83 100644
--- a/src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java
+++ b/src/main/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitioner.java
@@ -23,6 +23,8 @@
 import java.util.Optional;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig;
@@ -92,7 +94,36 @@ protected long getMessageOffset(Record record) {
                         message.getMessageId());
             }
         }
-        return record.getRecordSequence()
-                .orElseThrow(() -> new RuntimeException("found empty recordSequence"));
+        return record.getMessage()
+                .map(msg -> getSequenceId(msg.getMessageId()))
+                .orElseThrow(() -> new RuntimeException("found empty message"));
     }
+
+    /**
+     * Packs the ledger id, entry id and batch index of a message id into a single {@code long}
+     * that {@link #getMessageOffset(Record)} uses as the message offset.
+     *
+     * <p>When the batch index is {@code -1} the result is {@code (ledgerId << 28) | entryId}.
+     * Otherwise the result is {@code (ledgerId << 36) | (entryId << 8) | (batchIndex & 0xFF)},
+     * so that messages sharing a ledger id and entry id still get distinct values.
+     *
+     * <p>NOTE(review): only the low 8 bits of the batch index are kept, so two batch indexes
+     * that differ by a multiple of 256 within the same entry produce the same value. Entry ids
+     * of 2^28 or more also spill into the ledger-id bits. Confirm these limits are acceptable
+     * for the configured producer batch sizes.
+     *
+     * @param messageId the message id to encode; must be a {@link MessageIdAdv}
+     * @return the packed sequence id
+     * @throws ClassCastException if {@code messageId} is not a {@link MessageIdAdv}
+     */
+    public static final long getSequenceId(MessageId messageId) {
+        MessageIdAdv msgId = (MessageIdAdv) messageId;
+        long ledgerId = msgId.getLedgerId();
+        long entryId = msgId.getEntryId();
+        int batchIndex = msgId.getBatchIndex();
+        return batchIndex == -1
+                ? (ledgerId << 28) | entryId
+                : (ledgerId << 36) | (entryId << 8) | (batchIndex & 0xFF);
+    }
+
 }
diff --git a/src/test/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitionerTest.java b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitionerTest.java
new file mode 100644
index 00000000..bc3535f7
--- /dev/null
+++ b/src/test/java/org/apache/pulsar/io/jcloud/partitioner/AbstractPartitionerTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.jcloud.partitioner;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.util.Optional;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.functions.api.Record;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the message offset computed by {@link AbstractPartitioner}.
+ */
+public class AbstractPartitionerTest {
+
+    @Test
+    public void testGetMessageOffsetWithBatchMessage() {
+
+        AbstractPartitioner abstractPartitioner = new AbstractPartitioner<>() {
+            @Override
+            public String encodePartition(Record sinkRecord) {
+                return null;
+            }
+        };
+
+        // Same ledger (12) and entry (34), different batch index (1 vs 2):
+        // the two offsets must differ.
+        BatchMessageIdImpl batchId1 = new BatchMessageIdImpl(12, 34, 1, 1);
+        Record message1 = getMessageRecord(batchId1);
+
+        BatchMessageIdImpl batchId2 = new BatchMessageIdImpl(12, 34, 1, 2);
+        Record message2 = getMessageRecord(batchId2);
+
+        Assert.assertNotEquals(abstractPartitioner.getMessageOffset(message1),
+                abstractPartitioner.getMessageOffset(message2));
+
+        // Non-batch message id: expected offset is (12 << 28) | 34.
+        MessageIdImpl id3 = new MessageIdImpl(12, 34, 1);
+        Assert.assertEquals(3221225506L, abstractPartitioner.getMessageOffset(getMessageRecord(id3)));
+    }
+
+    /**
+     * Builds a mocked {@link Record} whose mocked {@link Message} reports the given message id.
+     *
+     * @param msgId the message id returned by the mocked message
+     * @return the mocked record
+     */
+    public static Record getMessageRecord(MessageId msgId) {
+        @SuppressWarnings("unchecked")
+        Message mock = mock(Message.class);
+        when(mock.getPublishTime()).thenReturn(1599578218610L);
+        when(mock.getMessageId()).thenReturn(msgId);
+        when(mock.hasIndex()).thenReturn(true);
+        when(mock.getIndex()).thenReturn(Optional.of(11115506L));
+
+        String topic = TopicName.get("test").toString();
+        Record mockRecord = mock(Record.class);
+        when(mockRecord.getTopicName()).thenReturn(Optional.of(topic));
+        when(mockRecord.getPartitionIndex()).thenReturn(Optional.of(1));
+        when(mockRecord.getMessage()).thenReturn(Optional.of(mock));
+        when(mockRecord.getPartitionId()).thenReturn(Optional.of(String.format("%s-%s", topic, 1)));
+        when(mockRecord.getRecordSequence()).thenReturn(Optional.of(3221225506L));
+        return mockRecord;
+    }
+
+}