Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #669] Support RocketMQ as storage of mqtt broker #687

Merged
merged 9 commits into from
Jul 21, 2021
22 changes: 16 additions & 6 deletions rocketmq-iot-bridge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ limitations under the License.
<name>rocketmq-iot-bridge ${project.version}</name>

<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tools has depends on client module.

<artifactId>rocketmq-tools</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down Expand Up @@ -53,12 +58,6 @@ limitations under the License.
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand All @@ -74,7 +73,10 @@ limitations under the License.
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

<rocketmq.version>4.8.0</rocketmq.version>
</properties>

<build>
<plugins>
<plugin>
Expand Down Expand Up @@ -209,6 +211,14 @@ limitations under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import org.apache.rocketmq.iot.common.configuration.MQTTBridgeConfiguration;
import org.apache.rocketmq.iot.common.config.MqttBridgeConfig;
import org.apache.rocketmq.iot.common.data.Message;
import org.apache.rocketmq.iot.connection.client.ClientManager;
import org.apache.rocketmq.iot.protocol.mqtt.handler.MessageDispatcher;
Expand All @@ -38,36 +38,61 @@
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttDisconnectMessageHandler;
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttMessageForwarder;
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttPingreqMessageHandler;
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttPublishMessageHandler;
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttSubscribeMessageHandler;
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttUnsubscribeMessagHandler;
import org.apache.rocketmq.iot.storage.message.MessageStore;
import org.apache.rocketmq.iot.storage.rocketmq.PublishProducer;
import org.apache.rocketmq.iot.storage.rocketmq.RocketMQPublishProducer;
import org.apache.rocketmq.iot.storage.rocketmq.RocketMQSubscribeConsumer;
import org.apache.rocketmq.iot.storage.rocketmq.SubscribeConsumer;
import org.apache.rocketmq.iot.storage.subscription.SubscriptionStore;
import org.apache.rocketmq.iot.storage.subscription.impl.InMemorySubscriptionStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MQTTBridge {
private Logger logger = LoggerFactory.getLogger(MQTTBridge.class);

private MqttBridgeConfig bridgeConfig;

private ServerBootstrap serverBootstrap;
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;

private MessageDispatcher messageDispatcher;
private SubscriptionStore subscriptionStore;
private ClientManager clientManager;
private MqttConnectionHandler connectionHandler;
private Logger logger = LoggerFactory.getLogger(MQTTBridge.class);
private MessageStore messageStore;
private PublishProducer publishProducer;
private SubscribeConsumer subscribeConsumer;

public MQTTBridge() {
init();
}

private void init() {
bossGroup = new NioEventLoopGroup(MQTTBridgeConfiguration.threadNumOfBossGroup());
workerGroup = new NioEventLoopGroup(MQTTBridgeConfiguration.threadNumOfWorkerGroup());
this.bridgeConfig = new MqttBridgeConfig();

subscriptionStore = new InMemorySubscriptionStore();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default InMemory subscription?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscription related with consumer logic. I'll finish it at consumer part.

if (bridgeConfig.isEnableRocketMQStore()) {
this.publishProducer = new RocketMQPublishProducer(bridgeConfig);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If disable, where init publishProducer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default store is rmq, but the old design stored in memory(simple mode) could be keeped.
User could choose if turn on the rmq store, developer could use simple mode to test basic feature.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the memory mode or rmq storage mode be the default? I don't think the default mode is memory mode, which is not a production-available mode. If we could reach a consensus, we should change the enable logic for the default choice.

this.subscribeConsumer = new RocketMQSubscribeConsumer(bridgeConfig, subscriptionStore);
}

clientManager = new ClientManagerImpl();
messageDispatcher = new MessageDispatcher(clientManager);
connectionHandler = new MqttConnectionHandler(clientManager, subscriptionStore, subscribeConsumer);
registerMessageHandlers();

bossGroup = new NioEventLoopGroup(bridgeConfig.getBossGroupThreadNum());
workerGroup = new NioEventLoopGroup(bridgeConfig.getWorkerGroupThreadNum());
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.localAddress(MQTTBridgeConfiguration.port())
.localAddress(bridgeConfig.getBrokerPort())
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, MQTTBridgeConfiguration.socketBacklog())
.option(ChannelOption.SO_BACKLOG, bridgeConfig.getSocketBacklogSize())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
Expand All @@ -78,29 +103,35 @@ private void init() {
pipeline.addLast("connection-manager", connectionHandler);
}
});
subscriptionStore = new InMemorySubscriptionStore();
clientManager = new ClientManagerImpl();
messageDispatcher = new MessageDispatcher(clientManager);
connectionHandler = new MqttConnectionHandler(clientManager, subscriptionStore);
registerMessageHandlers();

}

private void registerMessageHandlers() {
messageDispatcher.registerHandler(Message.Type.MQTT_CONNECT, new MqttConnectMessageHandler(clientManager));
messageDispatcher.registerHandler(Message.Type.MQTT_DISCONNECT, new MqttDisconnectMessageHandler(clientManager));
messageDispatcher.registerHandler(Message.Type.MQTT_PUBLISH, new MqttMessageForwarder(subscriptionStore));
if (bridgeConfig.isEnableRocketMQStore()) {
messageDispatcher.registerHandler(Message.Type.MQTT_PUBLISH, new MqttPublishMessageHandler(messageStore, publishProducer));
// TODO: mqtt cluster inner forwarder, need management of offset and client
} else {
messageDispatcher.registerHandler(Message.Type.MQTT_PUBLISH, new MqttMessageForwarder(subscriptionStore));
}
// TODO qos 1/2 PUBLISH
// TODO qos 1: PUBACK
// TODO qos 2: PUBREC
// TODO qos 2: PUBREL
// TODO qos 2: PUBCOMP
messageDispatcher.registerHandler(Message.Type.MQTT_PINGREQ, new MqttPingreqMessageHandler());
messageDispatcher.registerHandler(Message.Type.MQTT_SUBSCRIBE, new MqttSubscribeMessageHandler(subscriptionStore));
messageDispatcher.registerHandler(Message.Type.MQTT_UNSUBSCRIBE, new MqttUnsubscribeMessagHandler(subscriptionStore));
messageDispatcher.registerHandler(Message.Type.MQTT_SUBSCRIBE, new MqttSubscribeMessageHandler(subscriptionStore, subscribeConsumer));
messageDispatcher.registerHandler(Message.Type.MQTT_UNSUBSCRIBE, new MqttUnsubscribeMessagHandler(subscriptionStore, subscribeConsumer));
}

public void start() {
logger.info("start the MQTTServer with config " + bridgeConfig);
try {
if (bridgeConfig.isEnableRocketMQStore()) {
publishProducer.start();
subscribeConsumer.start();
}
ChannelFuture channelFuture = serverBootstrap.bind().sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
Expand All @@ -109,12 +140,15 @@ public void start() {
logger.info("shutdown the MQTTServer");
shutdown();
}

}

public void shutdown() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
if (bridgeConfig.isEnableRocketMQStore()) {
publishProducer.shutdown();
subscribeConsumer.shutdown();
}
}

public static void main(String [] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.iot.common.config;

import java.util.Properties;

import static org.apache.rocketmq.iot.common.configuration.MQTTBridgeConfiguration.*;

public class MqttBridgeConfig {
private Properties properties;

private String brokerHost;
private int brokerPort;
private int bossGroupThreadNum;
private int workerGroupThreadNum;
private int socketBacklogSize;

private boolean enableRocketMQStore;
private String rmqNamesrvAddr;
private String rmqProductGroup;
private String rmqConsumerGroup;
private int rmqConsumerPullNums;
private String rmqAccessKey;
private String rmqSecretKey;

public MqttBridgeConfig() {
initConfig();
}

public MqttBridgeConfig(Properties properties) {
this.properties = properties;
}

public void initConfig() {
this.brokerHost = System.getProperty(MQTT_BROKER_HOST, MQTT_BROKER_HOST_DEFAULT);
this.brokerPort = Integer.parseInt(System.getProperty(MQTT_BROKER_PORT, MQTT_BROKER_PORT_DEFAULT));

this.bossGroupThreadNum = Integer.parseInt(System.getProperty(MQTT_SERVER_BOSS_GROUP_THREAD_NUM,
MQTT_SERVER_BOSS_GROUP_THREAD_NUM_DEFAULT));
this.workerGroupThreadNum = Integer.parseInt(System.getProperty(MQTT_SERVER_WORKER_GROUP_THREAD_NUM,
MQTT_SERVER_WORKER_GROUP_THREAD_NUM_DEFAULT));
this.socketBacklogSize = Integer.parseInt(System.getProperty(MQTT_SERVER_SOCKET_BACKLOG_SIZE,
MQTT_SERVER_SOCKET_BACKLOG_SIZE_DEFAULT));

this.enableRocketMQStore = Boolean.parseBoolean(System.getProperty(MQTT_ROCKETMQ_STORE_ENABLED, MQTT_ROCKETMQ_STORE_ENABLED_DEFAULT));
if (enableRocketMQStore) {
this.rmqNamesrvAddr = System.getProperty(MQTT_ROCKETMQ_NAMESRVADDR, MQTT_ROCKETMQ_NAMESRVADDR_DEFAULT);
this.rmqProductGroup = System.getProperty(MQTT_ROCKETMQ_PRODUCER_GROUP, MQTT_ROCKETMQ_PRODUCER_GROUP_DEFAULT);
this.rmqConsumerGroup = System.getProperty(MQTT_ROCKETMQ_CONSUMER_GROUP, MQTT_ROCKETMQ_CONSUMER_GROUP_DEFAULT);
this.rmqConsumerPullNums = Integer.parseInt(System.getProperty(MQTT_ROKECTMQ_CONSUMER_PULL_NUMS,
MQTT_ROKECTMQ_CONSUMER_PULL_NUMS_DEFAULT));

this.rmqAccessKey = System.getProperty(MQTT_ROCKETMQ_ACCESSKEY, MQTT_ROCKETMQ_ACCESSKEY_DEFAULT);
this.rmqSecretKey = System.getProperty(MQTT_ROCKETMQ_SECRETKEY, MQTT_ROCKETMQ_SECRETKEY_DEFAULT);
}

}

public String getBrokerHost() {
return brokerHost;
}

public int getBrokerPort() {
return brokerPort;
}

public int getBossGroupThreadNum() {
return bossGroupThreadNum;
}

public int getWorkerGroupThreadNum() {
return workerGroupThreadNum;
}

public int getSocketBacklogSize() {
return socketBacklogSize;
}

public boolean isEnableRocketMQStore() {
return enableRocketMQStore;
}

public String getRmqAccessKey() {
return rmqAccessKey;
}

public String getRmqSecretKey() {
return rmqSecretKey;
}

public String getRmqNamesrvAddr() {
return rmqNamesrvAddr;
}

public String getRmqProductGroup() {
return rmqProductGroup;
}

public String getRmqConsumerGroup() {
return rmqConsumerGroup;
}

public int getRmqConsumerPullNums() {
return rmqConsumerPullNums;
}

@Override public String toString() {
return "MqttBridgeConfig{" +
"brokerHost='" + brokerHost + '\'' +
", brokerPort=" + brokerPort +
", bossGroupThreadNum=" + bossGroupThreadNum +
", workerGroupThreadNum=" + workerGroupThreadNum +
", socketBacklogSize=" + socketBacklogSize +
", enableRocketMQStore=" + enableRocketMQStore +
", rmqNamesrvAddr='" + rmqNamesrvAddr + '\'' +
", rmqProductGroup='" + rmqProductGroup + '\'' +
", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
", rmqConsumerPullNums='" + rmqConsumerPullNums + '\'' +
", rmqAccessKey='" + rmqAccessKey + '\'' +
", rmqSecretKey='" + rmqSecretKey + '\'' +
'}';
}
}
Loading