diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java index 295367ace..fbf0b5d0a 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java @@ -34,6 +34,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -389,6 +390,14 @@ public void onSuccess(Assignments latest) { } log.info("Attention!!! acquired empty assignments from remote, but existed assignments" + " is not empty, topic={}, clientId={}", topic, clientId); + } else { + List newAssignmentList = new ArrayList<>(latest.getAssignmentList().size()); + for (Assignment assignment : latest.getAssignmentList()) { + if (assignment.getMessageQueue().getPermission().isReadable()) { + newAssignmentList.add(assignment); + } + } + latest = new Assignments(newAssignmentList); } if (!latest.equals(existed)) {