Skip to content

Commit

Permalink
[ISSUE apache#9213] Fix get the earliest time error when data is clea…
Browse files Browse the repository at this point in the history
…n up in tiered storage (apache#9214)

* [ISSUE apache#9213] Fix get the earliest time error when data is clean up in tiered storag
  • Loading branch information
lizhimins authored Mar 3, 2025
1 parent 66d4a26 commit 53fdc4a
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
public class TieredMessageStore extends AbstractPluginMessageStore {

protected static final Logger log = LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
protected static final long MIN_STORE_TIME = -1L;

protected final String brokerName;
protected final MessageStore defaultStore;
Expand Down Expand Up @@ -310,24 +311,21 @@ public long getEarliestMessageTime(String topic, int queueId) {
return getEarliestMessageTimeAsync(topic, queueId).join();
}

/**
* In the original design, getting the earliest time of the first message
* would generate two RPC requests. However, using the timestamp stored in the metadata
* avoids these requests, although this approach might introduce some level of inaccuracy.
*/
@Override
public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, int queueId) {
long nextEarliestMessageTime = next.getEarliestMessageTime(topic, queueId);
long finalNextEarliestMessageTime = nextEarliestMessageTime > 0 ? nextEarliestMessageTime : Long.MAX_VALUE;
Stopwatch stopwatch = Stopwatch.createStarted();
long localMinTime = next.getEarliestMessageTime(topic, queueId);
return fetcher.getEarliestMessageTimeAsync(topic, queueId)
.thenApply(time -> {
Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_EARLIEST_MESSAGE_TIME)
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
.build();
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
if (time < 0) {
log.debug("GetEarliestMessageTimeAsync failed, try to get earliest message time from next store: topic: {}, queue: {}",
topic, queueId);
return finalNextEarliestMessageTime != Long.MAX_VALUE ? finalNextEarliestMessageTime : -1;
.thenApply(remoteMinTime -> {
if (localMinTime > MIN_STORE_TIME && remoteMinTime > MIN_STORE_TIME) {
return Math.min(localMinTime, remoteMinTime);
}
return Math.min(finalNextEarliestMessageTime, time);
return localMinTime > MIN_STORE_TIME ? localMinTime :
(remoteMinTime > MIN_STORE_TIME ? remoteMinTime : MIN_STORE_TIME);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,14 +375,7 @@ public CompletableFuture<GetMessageResult> getMessageAsync(
@Override
public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, int queueId) {
FlatMessageFile flatFile = flatFileStore.getFlatFile(new MessageQueue(topic, brokerName, queueId));
if (flatFile == null) {
return CompletableFuture.completedFuture(-1L);
}

// read from timestamp to timestamp + length
int length = MessageFormatUtil.STORE_TIMESTAMP_POSITION + 8;
return flatFile.getCommitLogAsync(flatFile.getCommitLogMinOffset(), length)
.thenApply(MessageFormatUtil::getStoreTimeStamp);
return CompletableFuture.completedFuture(flatFile == null ? -1L : flatFile.getMinStoreTimestamp());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,20 @@ public AppendResult appendConsumeQueue(DispatchRequest request) {
return consumeQueue.append(buffer, request.getStoreTimestamp());
}



@Override
public void release() {

}

@Override
public long getMinStoreTimestamp() {
return commitLog.getMinTimestamp();
long minStoreTime = -1L;
if (Long.MAX_VALUE != commitLog.getMinTimestamp()) {
minStoreTime = Math.max(minStoreTime, commitLog.getMinTimestamp());
}
if (Long.MAX_VALUE != consumeQueue.getMinTimestamp()) {
minStoreTime = Math.max(minStoreTime, consumeQueue.getMinTimestamp());
}
return minStoreTime;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public void testGetMinOffsetInQueue() {
@Test
public void testGetEarliestMessageTimeAsync() {
when(fetcher.getEarliestMessageTimeAsync(anyString(), anyInt())).thenReturn(CompletableFuture.completedFuture(1L));
Assert.assertEquals(1, (long) currentStore.getEarliestMessageTimeAsync(mq.getTopic(), mq.getQueueId()).join());
Assert.assertEquals(0, (long) currentStore.getEarliestMessageTimeAsync(mq.getTopic(), mq.getQueueId()).join());

when(fetcher.getEarliestMessageTimeAsync(anyString(), anyInt())).thenReturn(CompletableFuture.completedFuture(-1L));
when(defaultStore.getEarliestMessageTime(anyString(), anyInt())).thenReturn(2L);
Expand Down

0 comments on commit 53fdc4a

Please sign in to comment.