diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 3f90b67ec99..f5ff3179bf0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -307,7 +307,12 @@ public synchronized void start() throws MQClientException { log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup()); - operateAfterRunning(); + try { + operateAfterRunning(); + } catch (Exception e) { + shutdown(); + throw e; + } break; case RUNNING: diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index c92cadf5057..4eccba8e8d4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -1006,10 +1006,16 @@ public synchronized void start() throws MQClientException { break; } - this.updateTopicSubscribeInfoWhenSubscriptionChanged(); - this.mQClientFactory.checkClientInBroker(); - if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) { - this.mQClientFactory.rebalanceImmediately(); + try { + this.updateTopicSubscribeInfoWhenSubscriptionChanged(); + this.mQClientFactory.checkClientInBroker(); + if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) { + this.mQClientFactory.rebalanceImmediately(); + } + } catch (Exception e) { + log.warn("Start the consumer {} fail.", this.defaultMQPushConsumer.getConsumerGroup(), e); + shutdown(); + throw e; } }