diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 4287ce78ab0..d6af7b84e79 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -1571,6 +1571,9 @@ public String getServiceName() { public void run() { setState(AbstractStateService.START); TimerMessageStore.LOGGER.info(this.getServiceName() + " service start"); + //Mark different rounds + boolean isRound = true; + Map avoidDeleteLose = new HashMap<>(); while (!this.isStopped()) { try { setState(AbstractStateService.WAITING); @@ -1587,9 +1590,18 @@ public void run() { MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy()); if (null != msgExt) { if (needDelete(tr.getMagic()) && !needRoll(tr.getMagic())) { + //Clearing is performed once in each round. + //The deletion message is received first and the common message is received once + if (!isRound) { + isRound = true; + for (MessageExt messageExt: avoidDeleteLose.values()) { + addMetric(messageExt, 1); + } + avoidDeleteLose.clear(); + } if (msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY) != null && tr.getDeleteList() != null) { - //Execute metric plus one for messages that fail to be deleted - addMetric(msgExt, 1); + + avoidDeleteLose.put(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY), msgExt); tr.getDeleteList().add(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)); } tr.idempotentRelease(); @@ -1599,10 +1611,13 @@ public void run() { if (null == uniqueKey) { LOGGER.warn("No uniqueKey for msg:{}", msgExt); } + //Mark ready for next round + if (isRound) { + isRound = false; + } if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0 && tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey))) { - //Normally, it cancels out with the +1 above - addMetric(msgExt, -1); + avoidDeleteLose.remove(uniqueKey); doRes = true; tr.idempotentRelease(); perfCounterTicks.getCounter("dequeue_delete").flow(1);