Skip to content

Commit

Permalink
Use message id as file name by default
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Nov 4, 2024
1 parent 4b94112 commit 36bf3df
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 64 deletions.
4 changes: 2 additions & 2 deletions docs/aws-s3-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,9 @@ There are two types of partitioner:
- **PARTITION**: This is the default partitioning method based on Pulsar partitions. In other words, data is
partitioned according to the pre-existing partitions in Pulsar topics. For instance, a message for the
topic `public/default/my-topic-partition-0` would be directed to the
file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest message offset in this file.
file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest message ID (or the earliest message offset, when `partitionerUseIndexAsOffset` is enabled) in this file.
- **TIME**: Data is partitioned according to the time it was flushed. Using the previous message as an
example, if it was received on 2023-12-20, it would be directed
to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest message offset in
to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest message ID (or the earliest message offset, when `partitionerUseIndexAsOffset` is enabled) in
this file.
6 changes: 3 additions & 3 deletions docs/azure-blob-storage-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,9 @@ There are two types of partitioner:
- **PARTITION**: This is the default partitioning method based on Pulsar partitions. In other words, data is
partitioned according to the pre-existing partitions in Pulsar topics. For instance, a message for the
topic `public/default/my-topic-partition-0` would be directed to the
file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest message offset in this file.
file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest message ID (or the earliest message offset, when `partitionerUseIndexAsOffset` is enabled) in this file.
- **TIME**: Data is partitioned according to the time it was flushed. Using the previous message as an
example, if it was received on 2023-12-20, it would be directed
to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest message offset in
this file.
to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest message ID (or the earliest message offset, when `partitionerUseIndexAsOffset` is enabled) in
this file.
6 changes: 3 additions & 3 deletions docs/google-cloud-storage-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ There are two types of partitioner:
- **PARTITION**: This is the default partitioning method based on Pulsar partitions. In other words, data is
partitioned according to the pre-existing partitions in Pulsar topics. For instance, a message for the
topic `public/default/my-topic-partition-0` would be directed to the
file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest message offset in this file.
file `public/default/my-topic-partition-0/xxx.json`, where `xxx` signifies the earliest message ID (or the earliest message offset, when `partitionerUseIndexAsOffset` is enabled) in this file.
- **TIME**: Data is partitioned according to the time it was flushed. Using the previous message as an
example, if it was received on 2023-12-20, it would be directed
to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest message offset in
this file.
to `public/default/my-topic-partition-0/2023-12-20/xxx.json`, where `xxx` also denotes the earliest message ID (or the earliest message offset, when `partitionerUseIndexAsOffset` is enabled) in
this file.
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ public String generatePartitionedPath(String topic, String encodedPartition) {
return StringUtils.join(joinList, PATH_SEPARATOR);
}

protected long getMessageOffset(Record<T> record) {
protected String getFileName(Record<T> record) {
if (useIndexAsOffset && record.getMessage().isPresent()) {
final Message<T> message = record.getMessage().get();
// Use index added by org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present.
// Requires exposingBrokerEntryMetadataToClientEnabled=true on brokers.
if (message.hasIndex()) {
final Optional<Long> index = message.getIndex();
if (index.isPresent()) {
return index.get();
return String.valueOf(index.get());
} else {
LOGGER.warn("Found message {} with hasIndex=true but index is empty, using recordSequence",
message.getMessageId());
Expand All @@ -92,7 +92,9 @@ protected long getMessageOffset(Record<T> record) {
message.getMessageId());
}
}
return record.getRecordSequence()
.orElseThrow(() -> new RuntimeException("found empty recordSequence"));
return record.getMessage()
.map(msg -> msg.getMessageId().toString())
.orElseThrow(() -> new RuntimeException("found empty message"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public class SimplePartitioner<T> extends AbstractPartitioner<T> {

@Override
public String encodePartition(Record<T> sinkRecord) {
return Long.toString(getMessageOffset(sinkRecord));
return getFileName(sinkRecord);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public String encodePartition(Record<T> sinkRecord, long nowInMillis) {
String timeString = dateTimeFormatter.format(Instant.ofEpochMilli(parsed).atOffset(ZoneOffset.UTC));
final String result = timeString
+ PATH_SEPARATOR
+ getMessageOffset(sinkRecord);
+ getFileName(sinkRecord);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public String buildPartitionPath(Record<GenericRecord> message,
String encodePartition = partitioner.encodePartition(message, partitioningTimestamp);
String partitionedPath = partitioner.generatePartitionedPath(message.getTopicName().get(), encodePartition);
String path = sinkConfig.getPathPrefix() + partitionedPath + format.getExtension();
log.info("generate message[recordSequence={}] savePath: {}", message.getRecordSequence().get(), path);
log.info("generate message[messageId={}] savePath: {}", message.getMessage().get().getMessageId(), path);
return path;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.jcloud.partitioner;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Optional;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Record;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit tests for {@link AbstractPartitioner#getFileName(Record)}.
 */
public class AbstractPartitionerTest {

    /**
     * Checks that the file name produced for a record is the string form of its message ID,
     * and that two batch message IDs that differ only in their last constructor argument
     * (the batch index) yield different file names.
     *
     * <p>The mocked message also reports an index, yet the assertions expect the message-ID
     * string. NOTE(review): this presumably relies on {@code useIndexAsOffset} being off for a
     * partitioner that was never configured -- confirm against {@code AbstractPartitioner}.
     */
    @Test
    public void testGetMessageOffsetWithBatchMessage() {

        // Minimal concrete subclass: only getFileName is exercised, so encodePartition is a stub.
        AbstractPartitioner<Object> abstractPartitioner = new AbstractPartitioner<>() {
            @Override
            public String encodePartition(Record<Object> sinkRecord) {
                return null;
            }
        };

        BatchMessageIdImpl batchId1 = new BatchMessageIdImpl(12, 34, 1, 1);
        Record<Object> message1 = getMessageRecord(batchId1);
        String fileName1 = abstractPartitioner.getFileName(message1);

        BatchMessageIdImpl batchId2 = new BatchMessageIdImpl(12, 34, 1, 2);
        Record<Object> message2 = getMessageRecord(batchId2);
        String fileName2 = abstractPartitioner.getFileName(message2);

        // JUnit's contract is assertEquals(expected, actual); keeping that order makes
        // failure messages read correctly.
        Assert.assertEquals(batchId1.toString(), fileName1);
        Assert.assertEquals(batchId2.toString(), fileName2);
        Assert.assertNotEquals(fileName1, fileName2);

        MessageIdImpl id3 = new MessageIdImpl(12, 34, 1);
        Assert.assertEquals("12:34:1", abstractPartitioner.getFileName(getMessageRecord(id3)));
    }

    /**
     * Builds a mocked {@link Record} for topic {@code test} whose message carries the given
     * message ID.
     *
     * <p>The mocked message has a fixed publish time and reports an index of
     * {@code 11115506}; the record reports partition index {@code 1}.
     *
     * @param msgId the message ID the mocked message returns from {@code getMessageId()}
     * @return a mocked record wrapping the mocked message
     */
    public static Record<Object> getMessageRecord(MessageId msgId) {
        @SuppressWarnings("unchecked")
        Message<Object> message = mock(Message.class);
        when(message.getPublishTime()).thenReturn(1599578218610L);
        when(message.getMessageId()).thenReturn(msgId);
        when(message.hasIndex()).thenReturn(true);
        when(message.getIndex()).thenReturn(Optional.of(11115506L));

        String topic = TopicName.get("test").toString();
        @SuppressWarnings("unchecked")
        Record<Object> mockRecord = mock(Record.class);
        when(mockRecord.getTopicName()).thenReturn(Optional.of(topic));
        when(mockRecord.getPartitionIndex()).thenReturn(Optional.of(1));
        when(mockRecord.getMessage()).thenReturn(Optional.of(message));
        when(mockRecord.getPartitionId()).thenReturn(Optional.of(String.format("%s-%s", topic, 1)));
        return mockRecord;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Optional;
import junit.framework.TestCase;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Record;
Expand Down Expand Up @@ -53,6 +54,8 @@ public class PartitionerTest extends TestCase {
@Parameterized.Parameter(3)
public Record<Object> pulsarRecord;

private static final MessageId testMessageId = new MessageIdImpl(12, 34, 1);

@Parameterized.Parameters
public static Object[][] data() {
BlobStoreAbstractConfig blobStoreAbstractConfig = new BlobStoreAbstractConfig();
Expand Down Expand Up @@ -89,8 +92,8 @@ public static Object[][] data() {
return new Object[][]{
new Object[]{
simplePartitioner,
"3221225506",
"public/default/test" + Partitioner.PATH_SEPARATOR + "3221225506",
testMessageId.toString(),
"public/default/test" + Partitioner.PATH_SEPARATOR + testMessageId,
getTopic()
},
new Object[]{
Expand All @@ -101,44 +104,44 @@ public static Object[][] data() {
},
new Object[]{
dayPartitioner,
"2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506",
"public/default/test/2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506",
"2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId,
"public/default/test/2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId,
getTopic()
},
new Object[]{
hourPartitioner,
"2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506",
"public/default/test/2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506"
"2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId,
"public/default/test/2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId
, getTopic()
},
new Object[]{
simplePartitioner,
"3221225506",
"public/default/test-partition-1" + Partitioner.PATH_SEPARATOR + "3221225506",
testMessageId.toString(),
"public/default/test-partition-1" + Partitioner.PATH_SEPARATOR + testMessageId,
getPartitionedTopic()
},
new Object[]{
dayPartitioner,
"2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506",
"public/default/test-partition-1/2020-09-08" + Partitioner.PATH_SEPARATOR + "3221225506",
"2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId,
"public/default/test-partition-1/2020-09-08" + Partitioner.PATH_SEPARATOR + testMessageId,
getPartitionedTopic()
},
new Object[]{
hourPartitioner,
"2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506",
"public/default/test-partition-1/2020-09-08-12" + Partitioner.PATH_SEPARATOR + "3221225506"
"2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId,
"public/default/test-partition-1/2020-09-08-12" + Partitioner.PATH_SEPARATOR + testMessageId
, getPartitionedTopic()
},
new Object[]{
noPartitionNumberPartitioner,
"3221225506",
"public/default/test" + Partitioner.PATH_SEPARATOR + "3221225506",
testMessageId.toString(),
"public/default/test" + Partitioner.PATH_SEPARATOR + testMessageId,
getPartitionedTopic()
},
new Object[]{
numberPartitioner,
"2020-09-08-14" + Partitioner.PATH_SEPARATOR + "3221225506",
"public/default/test-partition-1/2020-09-08-14" + Partitioner.PATH_SEPARATOR + "3221225506",
"2020-09-08-14" + Partitioner.PATH_SEPARATOR + testMessageId,
"public/default/test-partition-1/2020-09-08-14" + Partitioner.PATH_SEPARATOR + testMessageId,
getPartitionedTopic()
},
};
Expand All @@ -148,22 +151,21 @@ public static Record<Object> getPartitionedTopic() {
@SuppressWarnings("unchecked")
Message<Object> mock = mock(Message.class);
when(mock.getPublishTime()).thenReturn(1599578218610L);
when(mock.getMessageId()).thenReturn(new MessageIdImpl(12, 34, 1));
when(mock.getMessageId()).thenReturn(testMessageId);
String topic = TopicName.get("test-partition-1").toString();
Record<Object> mockRecord = mock(Record.class);
when(mockRecord.getTopicName()).thenReturn(Optional.of(topic));
when(mockRecord.getPartitionIndex()).thenReturn(Optional.of(1));
when(mockRecord.getMessage()).thenReturn(Optional.of(mock));
when(mockRecord.getPartitionId()).thenReturn(Optional.of(String.format("%s-%s", topic, 1)));
when(mockRecord.getRecordSequence()).thenReturn(Optional.of(3221225506L));
return mockRecord;
}

public static Record<Object> getTopic() {
@SuppressWarnings("unchecked")
Message<Object> mock = mock(Message.class);
when(mock.getPublishTime()).thenReturn(1599578218610L);
when(mock.getMessageId()).thenReturn(new MessageIdImpl(12, 34, 1));
when(mock.getMessageId()).thenReturn(testMessageId);
when(mock.hasIndex()).thenReturn(true);
when(mock.getIndex()).thenReturn(Optional.of(11115506L));

Expand All @@ -173,7 +175,6 @@ public static Record<Object> getTopic() {
when(mockRecord.getPartitionIndex()).thenReturn(Optional.of(1));
when(mockRecord.getMessage()).thenReturn(Optional.of(mock));
when(mockRecord.getPartitionId()).thenReturn(Optional.of(String.format("%s-%s", topic, 1)));
when(mockRecord.getRecordSequence()).thenReturn(Optional.of(3221225506L));
return mockRecord;
}

Expand Down
Loading

0 comments on commit 36bf3df

Please sign in to comment.