-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #669] Support RocketMQ as storage of mqtt broker #687
Conversation
@@ -26,6 +26,16 @@ limitations under the License. | |||
<name>rocketmq-iot-bridge ${project.version}</name> | |||
|
|||
<dependencies> | |||
<dependency> | |||
<groupId>org.apache.rocketmq</groupId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tools has depends on client module.
workerGroup = new NioEventLoopGroup(MQTTBridgeConfiguration.threadNumOfWorkerGroup()); | ||
this.bridgeConfig = new MqttBridgeConfig(); | ||
|
||
subscriptionStore = new InMemorySubscriptionStore(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default InMemory subscription?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
subscription related with consumer logic. I'll finish it at consumer part.
|
||
subscriptionStore = new InMemorySubscriptionStore(); | ||
if (bridgeConfig.isEnableRocketMQStore()) { | ||
this.publishProducer = new RocketMQPublishProducer(bridgeConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If disable, where init publishProducer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default store is rmq, but the old design stored in memory(simple mode) could be keeped.
User could choose if turn on the rmq store, developer could use simple mode to test basic feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will the memory mode or rmq storage mode be the default? I don't think the default mode is memory mode, which is not a production-available mode. If we could reach a consensus, we should change the enable logic for the default choice.
if (bridgeConfig.isEnableRocketMQStore()) { | ||
messageDispatcher.registerHandler(Message.Type.MQTT_PUBLISH, new MqttPublishMessageHandler(messageStore, publishProducer)); | ||
} else { | ||
// TODO: mqtt cluster inner forwarder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to see implementation while not too many todo :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cluster mode need management of consumer offset and client, I'll finish it at consumer part.
ChannelFuture channelFuture = serverBootstrap.bind().sync(); | ||
channelFuture.channel().closeFuture().sync(); | ||
logger.info("start the MQTTServer success."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better print all necesarry configuration when you starting server. Only success or ok is too little significant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll fix it~
log.error("send msg to rocketMQ failed. clientId:" + client.getId(), e); | ||
} | ||
|
||
// TODO: qos1, qos2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
qos1 need consumer response.
qos2 means Exactly once, it's difficult to implemet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, we could postpone to support QoS 2
Add some necessary tests, making this is a relatively complete commit. I will merge this pr. |
I'm working on the branch with consumer part.When I finish the first version of consumer, I will add necessary tests~ |
…RIBE and UNSUBSCRIBE protcol
ping~ |
@maixiaohai I will merge it. pls go on :-0 |
…#687) * Config refactor, add some rocketmq config * Add rmq publish producer * Fix NPE start with memory mode, remove useless dependency * Non-standard mqtt request return directly * Add rmq subscribe consumer and related pull task, complete mqtt SUBSCRIBE and UNSUBSCRIBE protcol * Fix subTopic not remove when disconnect * Fix connection management, fix consume delay to make pull task to queue level * Fix ut not pass when root topic removed from subsription * Fix integration test not pass when not use rmq subscribe consumer Co-authored-by: zhangxu16 <zhangxu16@xiaomi.com>
What is the purpose of the change
issue #669
Brief changelog
Verifying this change
XXXX
Follow this checklist to help us incorporate your contribution quickly and easily. Notice,
it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR
.[ISSUE #123] Fix UnknownException when host config not exist
. Each commit in the pull request should have a meaningful subject line and body.mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle
to make sure basic checks pass. Runmvn clean install -DskipITs
to make sure unit-test pass. Runmvn clean test-compile failsafe:integration-test
to make sure integration-test pass.