Skip to content

Commit

Permalink
[ISSUE apache#8804] clean offset when remove group offset
Browse files Browse the repository at this point in the history
  • Loading branch information
leizhiyuan authored Oct 10, 2024
1 parent c826373 commit e75554d
Showing 1 changed file with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.offset;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -110,4 +111,25 @@ public ConcurrentHashMap<String, Long> getLmqOffsetTable() {
public void setLmqOffsetTable(ConcurrentHashMap<String, Long> lmqOffsetTable) {
this.lmqOffsetTable = lmqOffsetTable;
}

@Override
public void removeOffset(String group) {
if (!MixAll.isLmq(group)) {
super.removeOffset(group);
return;
}
Iterator<Map.Entry<String, Long>> it = this.lmqOffsetTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, Long> next = it.next();
String topicAtGroup = next.getKey();
if (topicAtGroup.contains(group)) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && group.equals(arrays[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("clean lmq group offset {}", topicAtGroup);
}
}
}
}
}

0 comments on commit e75554d

Please sign in to comment.