diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 0e3ede871c3..f1c935d00b7 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -62,6 +62,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { protected static final Logger log = LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME); + protected static final long MIN_STORE_TIME = -1L; protected final String brokerName; protected final MessageStore defaultStore; @@ -310,24 +311,21 @@ public long getEarliestMessageTime(String topic, int queueId) { return getEarliestMessageTimeAsync(topic, queueId).join(); } + /** + * In the original design, getting the earliest time of the first message + * would generate two RPC requests. However, using the timestamp stored in the metadata + * avoids these requests, although this approach might introduce some level of inaccuracy. + */ @Override public CompletableFuture getEarliestMessageTimeAsync(String topic, int queueId) { - long nextEarliestMessageTime = next.getEarliestMessageTime(topic, queueId); - long finalNextEarliestMessageTime = nextEarliestMessageTime > 0 ? nextEarliestMessageTime : Long.MAX_VALUE; - Stopwatch stopwatch = Stopwatch.createStarted(); + long localMinTime = next.getEarliestMessageTime(topic, queueId); return fetcher.getEarliestMessageTimeAsync(topic, queueId) - .thenApply(time -> { - Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder() - .put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_EARLIEST_MESSAGE_TIME) - .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic) - .build(); - TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); - if (time < 0) { - log.debug("GetEarliestMessageTimeAsync failed, try to get earliest message time from next store: topic: {}, queue: {}", - topic, queueId); - return finalNextEarliestMessageTime != Long.MAX_VALUE ? finalNextEarliestMessageTime : -1; + .thenApply(remoteMinTime -> { + if (localMinTime > MIN_STORE_TIME && remoteMinTime > MIN_STORE_TIME) { + return Math.min(localMinTime, remoteMinTime); } - return Math.min(finalNextEarliestMessageTime, time); + return localMinTime > MIN_STORE_TIME ? localMinTime : + (remoteMinTime > MIN_STORE_TIME ? remoteMinTime : MIN_STORE_TIME); }); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java index bc347bd5b47..9e5ab01d3b8 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java @@ -375,14 +375,7 @@ public CompletableFuture getMessageAsync( @Override public CompletableFuture getEarliestMessageTimeAsync(String topic, int queueId) { FlatMessageFile flatFile = flatFileStore.getFlatFile(new MessageQueue(topic, brokerName, queueId)); - if (flatFile == null) { - return CompletableFuture.completedFuture(-1L); - } - - // read from timestamp to timestamp + length - int length = MessageFormatUtil.STORE_TIMESTAMP_POSITION + 8; - return flatFile.getCommitLogAsync(flatFile.getCommitLogMinOffset(), length) - .thenApply(MessageFormatUtil::getStoreTimeStamp); + return CompletableFuture.completedFuture(flatFile == null ? -1L : flatFile.getMinStoreTimestamp()); } @Override diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java index 4510a8a1271..ade37149d68 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java @@ -178,16 +178,20 @@ public AppendResult appendConsumeQueue(DispatchRequest request) { return consumeQueue.append(buffer, request.getStoreTimestamp()); } - - @Override public void release() { - } @Override public long getMinStoreTimestamp() { - return commitLog.getMinTimestamp(); + long minStoreTime = -1L; + if (Long.MAX_VALUE != commitLog.getMinTimestamp()) { + minStoreTime = Math.max(minStoreTime, commitLog.getMinTimestamp()); + } + if (Long.MAX_VALUE != consumeQueue.getMinTimestamp()) { + minStoreTime = Math.max(minStoreTime, consumeQueue.getMinTimestamp()); + } + return minStoreTime; } @Override diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java index 2f395584829..bb259ae811a 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java @@ -246,7 +246,7 @@ public void testGetMinOffsetInQueue() { @Test public void testGetEarliestMessageTimeAsync() { when(fetcher.getEarliestMessageTimeAsync(anyString(), anyInt())).thenReturn(CompletableFuture.completedFuture(1L)); - Assert.assertEquals(1, (long) currentStore.getEarliestMessageTimeAsync(mq.getTopic(), mq.getQueueId()).join()); + Assert.assertEquals(0, (long) currentStore.getEarliestMessageTimeAsync(mq.getTopic(), mq.getQueueId()).join()); when(fetcher.getEarliestMessageTimeAsync(anyString(), anyInt())).thenReturn(CompletableFuture.completedFuture(-1L)); when(defaultStore.getEarliestMessageTime(anyString(), anyInt())).thenReturn(2L);