Skip to content

Commit

Permalink
[ISSUE apache#8127] Optimize the metric calculation logic of the time…
Browse files Browse the repository at this point in the history
… wheel (apache#8128)

* Fix the metric of the time wheel was incorrectly calculated

* Fix the metric of the time wheel was incorrectly calculated

---------

Co-authored-by: wanghuaiyuan <wanghuaiyuan@xiaomi.com>
  • Loading branch information
3424672656 and wanghuaiyuan authored Mar 4, 2025
1 parent 33a185a commit 343ed4f
Showing 1 changed file with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1571,6 +1571,9 @@ public String getServiceName() {
public void run() {
setState(AbstractStateService.START);
TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");
//Mark different rounds
boolean isRound = true;
Map<String ,MessageExt> avoidDeleteLose = new HashMap<>();
while (!this.isStopped()) {
try {
setState(AbstractStateService.WAITING);
Expand All @@ -1587,9 +1590,18 @@ public void run() {
MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());
if (null != msgExt) {
if (needDelete(tr.getMagic()) && !needRoll(tr.getMagic())) {
//Clearing is performed once in each round.
//The deletion message is received first and the common message is received once
if (!isRound) {
isRound = true;
for (MessageExt messageExt: avoidDeleteLose.values()) {
addMetric(messageExt, 1);
}
avoidDeleteLose.clear();
}
if (msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY) != null && tr.getDeleteList() != null) {
//Execute metric plus one for messages that fail to be deleted
addMetric(msgExt, 1);

avoidDeleteLose.put(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY), msgExt);
tr.getDeleteList().add(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
}
tr.idempotentRelease();
Expand All @@ -1599,10 +1611,13 @@ public void run() {
if (null == uniqueKey) {
LOGGER.warn("No uniqueKey for msg:{}", msgExt);
}
//Mark ready for next round
if (isRound) {
isRound = false;
}
if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0
&& tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey))) {
//Normally, it cancels out with the +1 above
addMetric(msgExt, -1);
avoidDeleteLose.remove(uniqueKey);
doRes = true;
tr.idempotentRelease();
perfCounterTicks.getCounter("dequeue_delete").flow(1);
Expand Down

0 comments on commit 343ed4f

Please sign in to comment.