-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #669] Support RocketMQ as storage of mqtt broker #687
Changes from all commits
9873930
6e00a74
655f1bf
1f7459b
abd9978
e4dca11
5770837
2000e67
232aaf9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ | |
import io.netty.channel.socket.nio.NioServerSocketChannel; | ||
import io.netty.handler.codec.mqtt.MqttDecoder; | ||
import io.netty.handler.codec.mqtt.MqttEncoder; | ||
import org.apache.rocketmq.iot.common.configuration.MQTTBridgeConfiguration; | ||
import org.apache.rocketmq.iot.common.config.MqttBridgeConfig; | ||
import org.apache.rocketmq.iot.common.data.Message; | ||
import org.apache.rocketmq.iot.connection.client.ClientManager; | ||
import org.apache.rocketmq.iot.protocol.mqtt.handler.MessageDispatcher; | ||
|
@@ -38,36 +38,61 @@ | |
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttDisconnectMessageHandler; | ||
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttMessageForwarder; | ||
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttPingreqMessageHandler; | ||
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttPublishMessageHandler; | ||
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttSubscribeMessageHandler; | ||
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttUnsubscribeMessagHandler; | ||
import org.apache.rocketmq.iot.storage.message.MessageStore; | ||
import org.apache.rocketmq.iot.storage.rocketmq.PublishProducer; | ||
import org.apache.rocketmq.iot.storage.rocketmq.RocketMQPublishProducer; | ||
import org.apache.rocketmq.iot.storage.rocketmq.RocketMQSubscribeConsumer; | ||
import org.apache.rocketmq.iot.storage.rocketmq.SubscribeConsumer; | ||
import org.apache.rocketmq.iot.storage.subscription.SubscriptionStore; | ||
import org.apache.rocketmq.iot.storage.subscription.impl.InMemorySubscriptionStore; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class MQTTBridge { | ||
private Logger logger = LoggerFactory.getLogger(MQTTBridge.class); | ||
|
||
private MqttBridgeConfig bridgeConfig; | ||
|
||
private ServerBootstrap serverBootstrap; | ||
private NioEventLoopGroup bossGroup; | ||
private NioEventLoopGroup workerGroup; | ||
|
||
private MessageDispatcher messageDispatcher; | ||
private SubscriptionStore subscriptionStore; | ||
private ClientManager clientManager; | ||
private MqttConnectionHandler connectionHandler; | ||
private Logger logger = LoggerFactory.getLogger(MQTTBridge.class); | ||
private MessageStore messageStore; | ||
private PublishProducer publishProducer; | ||
private SubscribeConsumer subscribeConsumer; | ||
|
||
public MQTTBridge() { | ||
init(); | ||
} | ||
|
||
private void init() { | ||
bossGroup = new NioEventLoopGroup(MQTTBridgeConfiguration.threadNumOfBossGroup()); | ||
workerGroup = new NioEventLoopGroup(MQTTBridgeConfiguration.threadNumOfWorkerGroup()); | ||
this.bridgeConfig = new MqttBridgeConfig(); | ||
|
||
subscriptionStore = new InMemorySubscriptionStore(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. default InMemory subscription? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. subscription related with consumer logic. I'll finish it at consumer part. |
||
if (bridgeConfig.isEnableRocketMQStore()) { | ||
this.publishProducer = new RocketMQPublishProducer(bridgeConfig); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If disable, where init publishProducer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The default store is rmq, but the old design stored in memory(simple mode) could be keeped. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will the memory mode or rmq storage mode be the default? I don't think the default mode is memory mode, which is not a production-available mode. If we could reach a consensus, we should change the enable logic for the default choice. |
||
this.subscribeConsumer = new RocketMQSubscribeConsumer(bridgeConfig, subscriptionStore); | ||
} | ||
|
||
clientManager = new ClientManagerImpl(); | ||
messageDispatcher = new MessageDispatcher(clientManager); | ||
connectionHandler = new MqttConnectionHandler(clientManager, subscriptionStore, subscribeConsumer); | ||
registerMessageHandlers(); | ||
|
||
bossGroup = new NioEventLoopGroup(bridgeConfig.getBossGroupThreadNum()); | ||
workerGroup = new NioEventLoopGroup(bridgeConfig.getWorkerGroupThreadNum()); | ||
serverBootstrap = new ServerBootstrap(); | ||
serverBootstrap.group(bossGroup, workerGroup) | ||
.localAddress(MQTTBridgeConfiguration.port()) | ||
.localAddress(bridgeConfig.getBrokerPort()) | ||
.channel(NioServerSocketChannel.class) | ||
.option(ChannelOption.SO_BACKLOG, MQTTBridgeConfiguration.socketBacklog()) | ||
.option(ChannelOption.SO_BACKLOG, bridgeConfig.getSocketBacklogSize()) | ||
.childHandler(new ChannelInitializer<SocketChannel>() { | ||
@Override protected void initChannel(SocketChannel ch) throws Exception { | ||
ChannelPipeline pipeline = ch.pipeline(); | ||
|
@@ -78,29 +103,35 @@ private void init() { | |
pipeline.addLast("connection-manager", connectionHandler); | ||
} | ||
}); | ||
subscriptionStore = new InMemorySubscriptionStore(); | ||
clientManager = new ClientManagerImpl(); | ||
messageDispatcher = new MessageDispatcher(clientManager); | ||
connectionHandler = new MqttConnectionHandler(clientManager, subscriptionStore); | ||
registerMessageHandlers(); | ||
|
||
} | ||
|
||
private void registerMessageHandlers() { | ||
messageDispatcher.registerHandler(Message.Type.MQTT_CONNECT, new MqttConnectMessageHandler(clientManager)); | ||
messageDispatcher.registerHandler(Message.Type.MQTT_DISCONNECT, new MqttDisconnectMessageHandler(clientManager)); | ||
messageDispatcher.registerHandler(Message.Type.MQTT_PUBLISH, new MqttMessageForwarder(subscriptionStore)); | ||
if (bridgeConfig.isEnableRocketMQStore()) { | ||
messageDispatcher.registerHandler(Message.Type.MQTT_PUBLISH, new MqttPublishMessageHandler(messageStore, publishProducer)); | ||
// TODO: mqtt cluster inner forwarder, need management of offset and client | ||
} else { | ||
messageDispatcher.registerHandler(Message.Type.MQTT_PUBLISH, new MqttMessageForwarder(subscriptionStore)); | ||
} | ||
// TODO qos 1/2 PUBLISH | ||
// TODO qos 1: PUBACK | ||
// TODO qos 2: PUBREC | ||
// TODO qos 2: PUBREL | ||
// TODO qos 2: PUBCOMP | ||
messageDispatcher.registerHandler(Message.Type.MQTT_PINGREQ, new MqttPingreqMessageHandler()); | ||
messageDispatcher.registerHandler(Message.Type.MQTT_SUBSCRIBE, new MqttSubscribeMessageHandler(subscriptionStore)); | ||
messageDispatcher.registerHandler(Message.Type.MQTT_UNSUBSCRIBE, new MqttUnsubscribeMessagHandler(subscriptionStore)); | ||
messageDispatcher.registerHandler(Message.Type.MQTT_SUBSCRIBE, new MqttSubscribeMessageHandler(subscriptionStore, subscribeConsumer)); | ||
messageDispatcher.registerHandler(Message.Type.MQTT_UNSUBSCRIBE, new MqttUnsubscribeMessagHandler(subscriptionStore, subscribeConsumer)); | ||
} | ||
|
||
public void start() { | ||
logger.info("start the MQTTServer with config " + bridgeConfig); | ||
try { | ||
if (bridgeConfig.isEnableRocketMQStore()) { | ||
publishProducer.start(); | ||
subscribeConsumer.start(); | ||
} | ||
ChannelFuture channelFuture = serverBootstrap.bind().sync(); | ||
channelFuture.channel().closeFuture().sync(); | ||
} catch (Exception e) { | ||
|
@@ -109,12 +140,15 @@ public void start() { | |
logger.info("shutdown the MQTTServer"); | ||
shutdown(); | ||
} | ||
|
||
} | ||
|
||
public void shutdown() { | ||
bossGroup.shutdownGracefully(); | ||
workerGroup.shutdownGracefully(); | ||
if (bridgeConfig.isEnableRocketMQStore()) { | ||
publishProducer.shutdown(); | ||
subscribeConsumer.shutdown(); | ||
} | ||
} | ||
|
||
public static void main(String [] args) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.rocketmq.iot.common.config; | ||
|
||
import java.util.Properties; | ||
|
||
import static org.apache.rocketmq.iot.common.configuration.MQTTBridgeConfiguration.*; | ||
|
||
public class MqttBridgeConfig { | ||
private Properties properties; | ||
|
||
private String brokerHost; | ||
private int brokerPort; | ||
private int bossGroupThreadNum; | ||
private int workerGroupThreadNum; | ||
private int socketBacklogSize; | ||
|
||
private boolean enableRocketMQStore; | ||
private String rmqNamesrvAddr; | ||
private String rmqProductGroup; | ||
private String rmqConsumerGroup; | ||
private int rmqConsumerPullNums; | ||
private String rmqAccessKey; | ||
private String rmqSecretKey; | ||
|
||
public MqttBridgeConfig() { | ||
initConfig(); | ||
} | ||
|
||
public MqttBridgeConfig(Properties properties) { | ||
this.properties = properties; | ||
} | ||
|
||
public void initConfig() { | ||
this.brokerHost = System.getProperty(MQTT_BROKER_HOST, MQTT_BROKER_HOST_DEFAULT); | ||
this.brokerPort = Integer.parseInt(System.getProperty(MQTT_BROKER_PORT, MQTT_BROKER_PORT_DEFAULT)); | ||
|
||
this.bossGroupThreadNum = Integer.parseInt(System.getProperty(MQTT_SERVER_BOSS_GROUP_THREAD_NUM, | ||
MQTT_SERVER_BOSS_GROUP_THREAD_NUM_DEFAULT)); | ||
this.workerGroupThreadNum = Integer.parseInt(System.getProperty(MQTT_SERVER_WORKER_GROUP_THREAD_NUM, | ||
MQTT_SERVER_WORKER_GROUP_THREAD_NUM_DEFAULT)); | ||
this.socketBacklogSize = Integer.parseInt(System.getProperty(MQTT_SERVER_SOCKET_BACKLOG_SIZE, | ||
MQTT_SERVER_SOCKET_BACKLOG_SIZE_DEFAULT)); | ||
|
||
this.enableRocketMQStore = Boolean.parseBoolean(System.getProperty(MQTT_ROCKETMQ_STORE_ENABLED, MQTT_ROCKETMQ_STORE_ENABLED_DEFAULT)); | ||
if (enableRocketMQStore) { | ||
this.rmqNamesrvAddr = System.getProperty(MQTT_ROCKETMQ_NAMESRVADDR, MQTT_ROCKETMQ_NAMESRVADDR_DEFAULT); | ||
this.rmqProductGroup = System.getProperty(MQTT_ROCKETMQ_PRODUCER_GROUP, MQTT_ROCKETMQ_PRODUCER_GROUP_DEFAULT); | ||
this.rmqConsumerGroup = System.getProperty(MQTT_ROCKETMQ_CONSUMER_GROUP, MQTT_ROCKETMQ_CONSUMER_GROUP_DEFAULT); | ||
this.rmqConsumerPullNums = Integer.parseInt(System.getProperty(MQTT_ROKECTMQ_CONSUMER_PULL_NUMS, | ||
MQTT_ROKECTMQ_CONSUMER_PULL_NUMS_DEFAULT)); | ||
|
||
this.rmqAccessKey = System.getProperty(MQTT_ROCKETMQ_ACCESSKEY, MQTT_ROCKETMQ_ACCESSKEY_DEFAULT); | ||
this.rmqSecretKey = System.getProperty(MQTT_ROCKETMQ_SECRETKEY, MQTT_ROCKETMQ_SECRETKEY_DEFAULT); | ||
} | ||
|
||
} | ||
|
||
public String getBrokerHost() { | ||
return brokerHost; | ||
} | ||
|
||
public int getBrokerPort() { | ||
return brokerPort; | ||
} | ||
|
||
public int getBossGroupThreadNum() { | ||
return bossGroupThreadNum; | ||
} | ||
|
||
public int getWorkerGroupThreadNum() { | ||
return workerGroupThreadNum; | ||
} | ||
|
||
public int getSocketBacklogSize() { | ||
return socketBacklogSize; | ||
} | ||
|
||
public boolean isEnableRocketMQStore() { | ||
return enableRocketMQStore; | ||
} | ||
|
||
public String getRmqAccessKey() { | ||
return rmqAccessKey; | ||
} | ||
|
||
public String getRmqSecretKey() { | ||
return rmqSecretKey; | ||
} | ||
|
||
public String getRmqNamesrvAddr() { | ||
return rmqNamesrvAddr; | ||
} | ||
|
||
public String getRmqProductGroup() { | ||
return rmqProductGroup; | ||
} | ||
|
||
public String getRmqConsumerGroup() { | ||
return rmqConsumerGroup; | ||
} | ||
|
||
public int getRmqConsumerPullNums() { | ||
return rmqConsumerPullNums; | ||
} | ||
|
||
@Override public String toString() { | ||
return "MqttBridgeConfig{" + | ||
"brokerHost='" + brokerHost + '\'' + | ||
", brokerPort=" + brokerPort + | ||
", bossGroupThreadNum=" + bossGroupThreadNum + | ||
", workerGroupThreadNum=" + workerGroupThreadNum + | ||
", socketBacklogSize=" + socketBacklogSize + | ||
", enableRocketMQStore=" + enableRocketMQStore + | ||
", rmqNamesrvAddr='" + rmqNamesrvAddr + '\'' + | ||
", rmqProductGroup='" + rmqProductGroup + '\'' + | ||
", rmqConsumerGroup='" + rmqConsumerGroup + '\'' + | ||
", rmqConsumerPullNums='" + rmqConsumerPullNums + '\'' + | ||
", rmqAccessKey='" + rmqAccessKey + '\'' + | ||
", rmqSecretKey='" + rmqSecretKey + '\'' + | ||
'}'; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tools has depends on client module.