Skip to content

Commit

Permalink
[ISSUE apache#8239] Fix the issue of potential message loss after a c…
Browse files Browse the repository at this point in the history
…rash under synchronous disk flushing configuration. (apache#8240)
  • Loading branch information
RongtongJin authored Jun 7, 2024
1 parent 4be8fd4 commit d60198f
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ public long getConfirmOffset() {
} else if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
return this.confirmOffset;
} else {
return getMaxOffset();
return this.defaultMessageStore.isSyncDiskFlush() ? getFlushedWhere() : getMaxOffset();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2814,7 +2814,11 @@ public long behind() {
}

public boolean isCommitLogAvailable() {
return this.reputFromOffset < DefaultMessageStore.this.getConfirmOffset();
return this.reputFromOffset < getReputEndOffset();
}

/**
 * Returns the commit log offset that bounds how far reput may dispatch.
 *
 * @return the commit log's max offset when {@code readUnCommitted} is enabled in the
 *         message store config; otherwise the commit log's confirm offset
 */
protected long getReputEndOffset() {
    final boolean readUnCommitted = DefaultMessageStore.this.getMessageStoreConfig().isReadUnCommitted();
    if (readUnCommitted) {
        // Dispatch is allowed to run up to everything that has been appended.
        return DefaultMessageStore.this.commitLog.getMaxOffset();
    }
    return DefaultMessageStore.this.commitLog.getConfirmOffset();
}

public void doReput() {
Expand All @@ -2834,12 +2838,12 @@ public void doReput() {
try {
this.reputFromOffset = result.getStartOffset();

for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
for (int readSize = 0; readSize < result.getSize() && reputFromOffset < getReputEndOffset() && doNext; ) {
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

if (reputFromOffset + size > DefaultMessageStore.this.getConfirmOffset()) {
if (reputFromOffset + size > getReputEndOffset()) {
doNext = false;
break;
}
Expand Down Expand Up @@ -3127,7 +3131,7 @@ public void doReput() {
try {
this.reputFromOffset = result.getStartOffset();

for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
for (int readSize = 0; readSize < result.getSize() && reputFromOffset < getReputEndOffset() && doNext; ) {
ByteBuffer byteBuffer = result.getByteBuffer();

int totalSize = preCheckMessageAndReturnSize(byteBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,12 @@ public class MessageStoreConfig {

private int topicQueueLockNum = 32;

/**
* If readUnCommitted is true, consume queue dispatch is bounded by the commit log's max offset instead of the confirmOffset,
* so clients may read uncommitted messages, for example when the reput offset exceeds the flushed offset under synchronous disk flushing.
*/
private boolean readUnCommitted = false;

/**
* Returns the current value of the {@code enabledAppendPropCRC} setting.
*
* @return the value of the {@code enabledAppendPropCRC} field
*/
public boolean isEnabledAppendPropCRC() {
return enabledAppendPropCRC;
}
Expand Down Expand Up @@ -672,7 +678,6 @@ public void setForceVerifyPropCRC(boolean forceVerifyPropCRC) {
this.forceVerifyPropCRC = forceVerifyPropCRC;
}


public String getStorePathCommitLog() {
if (storePathCommitLog == null) {
return storePathRootDir + File.separator + "commitlog";
Expand Down Expand Up @@ -1819,4 +1824,12 @@ public int getTopicQueueLockNum() {
/**
* Sets the {@code topicQueueLockNum} setting.
*
* @param topicQueueLockNum the new value; stored as-is, no validation is performed here
*/
public void setTopicQueueLockNum(int topicQueueLockNum) {
this.topicQueueLockNum = topicQueueLockNum;
}

/**
* Returns whether reading uncommitted messages is enabled.
*
* @return the value of the {@code readUnCommitted} field
*/
public boolean isReadUnCommitted() {
return readUnCommitted;
}

/**
* Enables or disables reading uncommitted messages.
*
* @param readUnCommitted the new value of the {@code readUnCommitted} field
*/
public void setReadUnCommitted(boolean readUnCommitted) {
this.readUnCommitted = readUnCommitted;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.store;

import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.UUID;
import java.io.File;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;

/**
 * Checks how far consume queue dispatch (reput) advances relative to the flushed offset.
 *
 * <p>Two stores are built, one with {@code SYNC_FLUSH} and one with {@code ASYNC_FLUSH}. In both, the
 * commit log's flush manager is replaced by a mock that reports {@code PUT_OK} without flushing, so
 * the flushed offset stays at 0 for the whole test. The sync store is then expected to dispatch
 * nothing, while the async store is expected to dispatch every appended message.
 */
public class ReputMessageServiceTest {
    /** Number of messages each test appends. */
    private static final int MESSAGE_COUNT = 10;
    /** Upper bound, in milliseconds, on how long a test waits for consume queue dispatch. */
    private static final long DISPATCH_WAIT_MILLIS = 3000;
    /** Pause, in milliseconds, between checks while polling for dispatch to finish. */
    private static final long DISPATCH_POLL_MILLIS = 50;

    private DefaultMessageStore syncFlushMessageStore;
    private DefaultMessageStore asyncFlushMessageStore;
    private final String topic = "FooBar";
    private final String tmpdir = System.getProperty("java.io.tmpdir");
    // A fresh UUID-named directory per test instance keeps runs isolated from each other.
    private final String storePathRootParentDir = (StringUtils.endsWith(tmpdir, File.separator) ? tmpdir : tmpdir + File.separator) + UUID.randomUUID();
    private SocketAddress bornHost;
    private SocketAddress storeHost;

    /**
     * Creates, loads and starts one store per flush type under a clean temporary directory.
     */
    @Before
    public void init() throws Exception {
        File file = new File(storePathRootParentDir);
        UtilAll.deleteFile(file);
        syncFlushMessageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
        asyncFlushMessageStore = buildMessageStore(FlushDiskType.ASYNC_FLUSH);
        assertTrue(syncFlushMessageStore.load());
        assertTrue(asyncFlushMessageStore.load());
        syncFlushMessageStore.start();
        asyncFlushMessageStore.start();
        storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
        bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
    }

    /**
     * Builds a store for the given flush type whose commit log uses a mocked flush manager.
     *
     * @param flushDiskType flush mode to configure; also used as the sub-directory name
     * @return a store that has been constructed but not yet loaded or started
     */
    private DefaultMessageStore buildMessageStore(FlushDiskType flushDiskType) throws Exception {
        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
        messageStoreConfig.setHaListenPort(0);
        messageStoreConfig.setFlushDiskType(flushDiskType);
        messageStoreConfig.setStorePathRootDir(storePathRootParentDir + File.separator + flushDiskType);
        messageStoreConfig.setStorePathCommitLog(storePathRootParentDir + File.separator + flushDiskType + File.separator + "commitlog");
        BrokerConfig brokerConfig = new BrokerConfig();
        brokerConfig.setLongPollingEnable(false);
        DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, mock(BrokerStatsManager.class), null, brokerConfig, null);
        // Mock flush disk service: every flush request completes with PUT_OK but nothing is flushed,
        // which is what keeps getFlushedWhere() at 0 in the tests below.
        Field field = CommitLog.class.getDeclaredField("flushManager");
        field.setAccessible(true);
        FlushManager flushManager = mock(FlushManager.class);
        CompletableFuture<PutMessageStatus> completableFuture = new CompletableFuture<>();
        completableFuture.complete(PutMessageStatus.PUT_OK);
        when(flushManager.handleDiskFlush(any(AppendMessageResult.class), any(MessageExt.class))).thenReturn(completableFuture);
        field.set(messageStore.getCommitLog(), flushManager);
        return messageStore;
    }

    /**
     * With synchronous flushing and nothing flushed, no message may reach the consume queue.
     */
    @Test
    public void testReputEndOffset_whenSyncFlush() throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            assertEquals(PutMessageStatus.PUT_OK, syncFlushMessageStore.putMessage(buildMessage()).getPutMessageStatus());
        }
        assertEquals(1580, syncFlushMessageStore.getMaxPhyOffset());
        assertEquals(0, syncFlushMessageStore.getCommitLog().getFlushedWhere());
        // This test asserts that dispatch does NOT happen, so it has to wait out the full window;
        // polling cannot shorten a negative check.
        Thread.sleep(DISPATCH_WAIT_MILLIS);
        assertEquals(0, syncFlushMessageStore.getCommitLog().getFlushedWhere());
        assertEquals(0, syncFlushMessageStore.getMaxOffsetInQueue(topic, 0));
        GetMessageResult getMessageResult = syncFlushMessageStore.getMessage("testGroup", topic, 0, 0, 32, null);
        assertEquals(GetMessageStatus.NO_MESSAGE_IN_QUEUE, getMessageResult.getStatus());
    }

    /**
     * With asynchronous flushing, every appended message is dispatched even though nothing is flushed.
     */
    @Test
    public void testReputEndOffset_whenAsyncFlush() throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            assertEquals(PutMessageStatus.PUT_OK, asyncFlushMessageStore.putMessage(buildMessage()).getPutMessageStatus());
        }
        assertEquals(1580, asyncFlushMessageStore.getMaxPhyOffset());
        assertEquals(0, asyncFlushMessageStore.getCommitLog().getFlushedWhere());
        // Wait for cq dispatch, but return as soon as all messages are visible instead of always
        // sleeping for the whole window.
        awaitQueueOffset(asyncFlushMessageStore, MESSAGE_COUNT);
        assertEquals(0, asyncFlushMessageStore.getCommitLog().getFlushedWhere());
        assertEquals(MESSAGE_COUNT, asyncFlushMessageStore.getMaxOffsetInQueue(topic, 0));
        GetMessageResult getMessageResult = asyncFlushMessageStore.getMessage("testGroup", topic, 0, 0, 32, null);
        try {
            assertEquals(MESSAGE_COUNT, getMessageResult.getMessageCount());
        } finally {
            // Hand back the buffers referenced by the fetched messages before the store is torn down.
            getMessageResult.release();
        }
    }

    /**
     * Polls queue 0 of the test topic until its max offset reaches {@code expected} or
     * {@link #DISPATCH_WAIT_MILLIS} elapses. Returns silently on timeout; the caller's assertions
     * report the failure.
     */
    private void awaitQueueOffset(DefaultMessageStore store, long expected) throws InterruptedException {
        // nanoTime is monotonic, so wall-clock adjustments cannot stretch or cut the wait.
        final long deadlineNanos = System.nanoTime() + DISPATCH_WAIT_MILLIS * 1_000_000L;
        while (store.getMaxOffsetInQueue(topic, 0) < expected && System.nanoTime() - deadlineNanos < 0) {
            Thread.sleep(DISPATCH_POLL_MILLIS);
        }
    }

    /**
     * Builds a small message for queue 0 of the test topic.
     */
    private MessageExtBrokerInner buildMessage() {
        MessageExtBrokerInner msg = new MessageExtBrokerInner();
        msg.setTopic(topic);
        msg.setTags("TAG1");
        // Explicit charset: the tests assert an exact physical offset, so the encoded body length
        // must not depend on the platform default.
        msg.setBody("Once, there was a chance for me!".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        msg.setKeys(String.valueOf(System.currentTimeMillis()));
        msg.setQueueId(0);
        msg.setSysFlag(0);
        msg.setBornTimestamp(System.currentTimeMillis());
        msg.setStoreHost(storeHost);
        msg.setBornHost(bornHost);
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        return msg;
    }

    /**
     * Shuts down and destroys both stores and removes the temporary directory. Each step runs even
     * if an earlier one throws, so a failure in one store cannot leak the other store or the files.
     */
    @After
    public void destroy() throws Exception {
        try {
            shutdownAndDestroy(syncFlushMessageStore);
        } finally {
            try {
                shutdownAndDestroy(asyncFlushMessageStore);
            } finally {
                File file = new File(storePathRootParentDir);
                UtilAll.deleteFile(file);
            }
        }
    }

    /**
     * Shuts down and then destroys the store; a {@code null} store (failed setup) is ignored.
     */
    private static void shutdownAndDestroy(DefaultMessageStore store) {
        if (store == null) {
            return;
        }
        try {
            store.shutdown();
        } finally {
            store.destroy();
        }
    }
}

0 comments on commit d60198f

Please sign in to comment.