From 2e40b6dd8187f403819d845ca56e87150fde6db8 Mon Sep 17 00:00:00 2001 From: qipingluo Date: Thu, 4 Jan 2024 11:36:36 +0800 Subject: [PATCH] only readable message queues are allowed to be assigned to push consumer --- .../client/java/impl/consumer/PushConsumerImpl.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java index 295367ace..fbf0b5d0a 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java @@ -34,6 +34,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -389,6 +390,14 @@ public void onSuccess(Assignments latest) { } log.info("Attention!!! acquired empty assignments from remote, but existed assignments" + " is not empty, topic={}, clientId={}", topic, clientId); + } else { + List newAssignmentList = new ArrayList<>(latest.getAssignmentList().size()); + for (Assignment assignment : latest.getAssignmentList()) { + if (assignment.getMessageQueue().getPermission().isReadable()) { + newAssignmentList.add(assignment); + } + } + latest = new Assignments(newAssignmentList); } if (!latest.equals(existed)) {