feat(connector): Add mqtt connector #15388

Merged (12 commits), Mar 11, 2024

Conversation

bakjos
Contributor

@bakjos bakjos commented Mar 2, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

MQTT is a popular protocol for receiving data from multiple IoT devices. It is a lightweight pub-sub protocol with features such as retaining the latest message and delivery guarantees (QoS). My main use case is triggering alerts when certain events happen over a period of time, which I think can be fully implemented using RisingWave.

This PR implements the following features using the rumqtt library (a Rust-native library for MQTT):

  • MQTT v5
  • QoS settings, defaulting to 0 (at most once)
  • Support for plain TCP and SSL, including client authentication
  • Retain option for sinks
  • Data splits when using wildcards. MQTT does not support splits natively, so the connector subscribes to the topic using the wildcard and, for each message received, checks whether the message's topic is already known; if it is not, the topic is added to the splits.
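
The split discovery described in the last bullet can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the connector's actual code: `TopicTracker`, `observe`, and `splits` are hypothetical names, and the real implementation receives topics from the rumqttc event loop rather than from direct calls.

```rust
use std::collections::HashSet;

/// Hypothetical helper: tracks the concrete topics seen so far.
/// Each distinct topic corresponds to one split.
struct TopicTracker {
    topics: HashSet<String>,
}

impl TopicTracker {
    /// A filter without wildcards names exactly one topic, so it can be
    /// registered up front without waiting for a message to arrive.
    fn new(filter: &str) -> Self {
        let mut topics = HashSet::new();
        if !filter.contains('#') && !filter.contains('+') {
            topics.insert(filter.to_string());
        }
        Self { topics }
    }

    /// Records the topic of an incoming message.
    /// Returns true if the topic had not been seen before.
    fn observe(&mut self, topic: &str) -> bool {
        self.topics.insert(topic.to_string())
    }

    /// Returns the known topics in sorted order, one per split.
    fn splits(&self) -> Vec<String> {
        let mut v: Vec<String> = self.topics.iter().cloned().collect();
        v.sort();
        v
    }
}

fn main() {
    // With a wildcard filter, no split is known until messages arrive.
    let mut tracker = TopicTracker::new("/parent/+/alerts");
    tracker.observe("/parent/1/alerts");
    tracker.observe("/parent/2/alerts");
    tracker.observe("/parent/1/alerts"); // duplicate, ignored
    println!("{:?}", tracker.splits());
}
```

A set is used here because the same topic is observed on every message, and only the first observation should create a split.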

Since the protocol doesn't keep track of offsets, a new consumer receives messages only from the moment it subscribes, plus any retained messages. It therefore doesn't make sense to save state or implement checkpoints.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • All checks passed in ./risedev check (or alias, ./risedev c)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@bakjos bakjos requested a review from a team as a code owner March 2, 2024 02:36
@bakjos bakjos force-pushed the bakjos/mqtt_connector branch 7 times, most recently from 5a5544d to c9a47a0 Compare March 3, 2024 18:10
Member

@xxchan xxchan left a comment

Hi @bakjos, thanks for your PR! Before rushing into the code, could you please discuss the motivation and some high-level ideas? (As mentioned in the PR template)

e.g.,

  • Why do you want MQTT connector? What do you want to use RisingWave for?
  • How does the MQTT connector work? I think it may be different from message brokers like Kafka, and can lose messages. Is it problematic?

These discussions can help the community better understand the context of the PR and make it faster to get the PR reviewed and merged.

@fuyufjh fuyufjh changed the title feat(stream): Add mqtt connector feat(connector): Add mqtt connector Mar 4, 2024
@bakjos
Contributor Author

bakjos commented Mar 4, 2024


@xxchan
Thanks for the quick response. I tried to improve the summary of the PR; let me know if you think it is enough or if more details are needed.

Member

@fuyufjh fuyufjh left a comment

Generally looks good! Thanks for the contribution.

@bakjos bakjos force-pushed the bakjos/mqtt_connector branch from 002b1ae to b8c2fc7 Compare March 5, 2024 14:48
@bakjos bakjos requested a review from fuyufjh March 5, 2024 15:28
Contributor

@tabVersion tabVersion left a comment

Thanks for your contribution. The common and source parts LGTM; the sink part may need another look.

Comment on lines +51 to +54
let mut topics = HashSet::new();
if !topic.contains('#') && !topic.contains('+') {
    topics.insert(topic.clone());
}
Contributor

What case does this deal with?

Contributor Author

@tabVersion When the topic has no wildcards, the connector subscribes to exactly one topic, so we don't have to wait until a message is received to know the split.

})
}

async fn list_splits(&mut self) -> ConnectorResult<Vec<MqttSplit>> {
Contributor

The split number (vec length) here stands for the max parallelism of each source streaming job.
Take the Kafka source as an example: each split corresponds conceptually to a partition, so we can read multiple partitions in parallel. I am not sure what the equivalent structure is in MQTT.

Contributor Author

MQTT doesn't have the concept of a partition, but it does support topic hierarchies such as /parent/1/alerts and /parent/2/alerts. You can subscribe with /parent/+/alerts, which will yield 2 splits (one per topic).
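
To illustrate how a single-level wildcard covers both topics in that example, here is a small sketch of MQTT topic filter matching. It is a simplified stand-alone function written for illustration (the name `topic_matches` is hypothetical, and matching is normally done by the broker, not the connector). It ignores spec details such as `$`-prefixed topics and filter validation.

```rust
/// Returns true if `topic` matches the MQTT topic filter `filter`.
/// `+` matches exactly one level; `#` matches all remaining levels.
/// Simplified: assumes `filter` is well-formed (`#` only as the last level).
fn topic_matches(filter: &str, topic: &str) -> bool {
    let mut f = filter.split('/');
    let mut t = topic.split('/');
    loop {
        match (f.next(), t.next()) {
            // Multi-level wildcard: everything from here on matches.
            (Some("#"), _) => return true,
            // Single-level wildcard: any one level matches.
            (Some("+"), Some(_)) => continue,
            // Literal levels must be identical.
            (Some(a), Some(b)) if a == b => continue,
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal level.
            _ => return false,
        }
    }
}

fn main() {
    let filter = "/parent/+/alerts";
    let seen = ["/parent/1/alerts", "/parent/2/alerts", "/parent/1/status"];
    // Count how many of the observed topics fall under the filter.
    let matching = seen.iter().filter(|t| topic_matches(filter, t)).count();
    println!("{matching} topics match {filter}");
}
```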

@bakjos
Contributor Author

bakjos commented Mar 5, 2024

@neverchanje MQTT v5 is backwards compatible with v3, and most brokers support both. The rumqttc APIs for the two versions are quite similar, so if v3 is needed, a new field with the version can be added instead of creating a new connector.

@bakjos bakjos requested review from tabVersion and neverchanje March 5, 2024 20:42
Member

@fuyufjh fuyufjh left a comment

LGTM for the rest

(
connector='mqtt',
url='tcp://mqtt-server',
topic= 'test',
Member

The output topic is the same as the input topic, forming a loop. Is this intended?

It looks a bit weird to me. I recommend using separate topics for input and output, and writing a sink_check.py for the output data. Please search for sink_check.py for examples.

Contributor Author

I took this from the NATS integration tests. The flow is table -> sink -> MQTT broker -> source, so every record inserted into the table ends up in the source table via MQTT. I will add a sink_check, though.

@@ -998,6 +998,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
NATS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf],
),
MQTT_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Bytes],
Contributor

@xiangjinwu xiangjinwu Mar 6, 2024

Has format plain encode bytes been tested on the source?

Example query:

create table from_source (raw_bytes bytea) with (...) format plain encode bytes;
select raw_bytes from from_source limit 1;
select encode(raw_bytes, 'escape') from from_source limit 1; -- assuming the message is mostly ascii, for example json

Contributor Author

@bakjos bakjos Mar 6, 2024

I did, and this is the output


dev=> CREATE TABLE mqtt_source_table
 (
  value bytea
 )
 WITH (
   connector='mqtt',
   url='tcp://localhost',
   topic= 'test',
   qos = 'at_least_once',
  ) FORMAT PLAIN ENCODE BYTES;

CREATE_TABLE

dev=> SELECT * FROM mqtt_source_table;
                         value
--------------------------------------------------------
 \x7b226964223a352c226e616d65223a224172616d696e7461227d
 \x7b226964223a342c226e616d65223a224a65727279227d
 \x7b226964223a322c226e616d65223a22426f62227d
 \x7b226964223a362c226e616d65223a22436c6f766572227d
 \x7b226964223a332c226e616d65223a22546f6d227d
 \x7b226964223a312c226e616d65223a22416c696365227d
 \x7b226964223a382c226e616d65223a2257617665726c79227d
 \x7b226964223a372c226e616d65223a22506f736579227d
(8 rows)

dev=> select encode(value, 'escape') from mqtt_source_table limit 1;
           encode
----------------------------
 {"id":5,"name":"Araminta"}
(1 row)

dev=>
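
For readers who want to check the rows by hand, the hex-formatted bytea values above are simply the UTF-8 bytes of the JSON payloads. The following stand-alone sketch decodes the first row; `decode_bytea_hex` is a hypothetical helper written for this illustration and is not part of RisingWave or the connector.

```rust
/// Decodes a hex-formatted bytea literal (`\x` followed by hex digits)
/// into raw bytes. Returns None if the prefix is missing, the digit count
/// is odd, or a non-hex character is present.
fn decode_bytea_hex(s: &str) -> Option<Vec<u8>> {
    let hex = s.strip_prefix("\\x")?;
    if hex.len() % 2 != 0 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    // Every pair of hex digits encodes one byte.
    (0..hex.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&hex[i..i + 2], 16).ok())
        .collect()
}

fn main() {
    // First row of the SELECT output above.
    let row = r"\x7b226964223a352c226e616d65223a224172616d696e7461227d";
    let bytes = decode_bytea_hex(row).expect("valid hex literal");
    let text = String::from_utf8(bytes).expect("payload is UTF-8");
    println!("{text}"); // prints {"id":5,"name":"Araminta"}
}
```

The decoded text agrees with the result of the `encode(value, 'escape')` query shown above.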

Contributor

@wenym1 wenym1 left a comment

Thanks for the contribution!

Might need to refine the sink implementation mentioned in the comments.

async fn write_chunk<'a>(
    &'a mut self,
    chunk: StreamChunk,
    _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
Contributor

Here you didn't add any delivery future, which indicates that the chunk has been delivered by the time the method returns. However, from the doc of AsyncClient::publish, IIUC, after the method returns the message has only been added to an event loop buffer and has not yet been received by the external queue. If so, this may cause data loss if a failure happens.

From the doc, it seems that you should poll the event loop to get notified of successful delivery. If so, a better implementation might be to implement a MqttLogSinker specifically for the MQTT sink, in which you use select! to poll both the upstream input from log_reader and the delivery notifications from the MQTT client event loop. On delivery, you then truncate the log reader.

Contributor Author

@wenym1 This is not possible to implement at the moment, because there is currently no way to map a published message to the pkid returned by the event loop. There is an issue already reported in rumqtt, and an RFC to add a future that resolves when the message is actually published.
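
For context, here is a rough sketch of the bookkeeping the reviewer's suggestion would need if the client did report the packet id of each publish. Per the reply above, rumqttc does not currently make that mapping available, so everything here is hypothetical: `DeliveryTracker`, `on_publish`, and `on_ack` are illustrative names, and the packet ids and log offsets are made-up values. It uses only the standard library and does not touch the real sink or log reader APIs.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical bookkeeping for at-least-once delivery: remembers which log
/// offset each in-flight packet id carries, and reports how far the log
/// could be truncated once acknowledgements arrive.
struct DeliveryTracker {
    /// Log offset -> whether it has been acknowledged, ordered by offset.
    pending: BTreeMap<u64, bool>,
    /// In-flight packet id -> log offset.
    by_pkid: HashMap<u16, u64>,
}

impl DeliveryTracker {
    fn new() -> Self {
        Self { pending: BTreeMap::new(), by_pkid: HashMap::new() }
    }

    /// Called when a message for `offset` is handed to the client
    /// and (hypothetically) assigned `pkid`.
    fn on_publish(&mut self, pkid: u16, offset: u64) {
        self.pending.insert(offset, false);
        self.by_pkid.insert(pkid, offset);
    }

    /// Called when the event loop reports an acknowledgement for `pkid`.
    /// Returns the highest offset up to which every message is acknowledged,
    /// or None if the oldest pending message is still in flight.
    fn on_ack(&mut self, pkid: u16) -> Option<u64> {
        let offset = self.by_pkid.remove(&pkid)?;
        if let Some(acked) = self.pending.get_mut(&offset) {
            *acked = true;
        }
        // Drain the acknowledged prefix; stop at the first unacked offset.
        let mut truncate_to = None;
        while let Some((&off, &acked)) = self.pending.first_key_value() {
            if !acked {
                break;
            }
            self.pending.remove(&off);
            truncate_to = Some(off);
        }
        truncate_to
    }
}

fn main() {
    let mut tracker = DeliveryTracker::new();
    tracker.on_publish(1, 100);
    tracker.on_publish(2, 101);
    // Acks may arrive out of order; truncation waits for the oldest one.
    println!("{:?}", tracker.on_ack(2));
    println!("{:?}", tracker.on_ack(1));
}
```

Truncation only advances over a contiguous acknowledged prefix, so an unacknowledged older message keeps later ones in the log until it is confirmed.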

@bakjos bakjos requested a review from wenym1 March 6, 2024 14:39
Contributor

@wenym1 wenym1 left a comment

LGTM for the sink part. Thanks for the clarification on the comment.

If there is no way to ensure the messages have been successfully delivered, you may mention in the release doc that the current MQTT sink only delivers each message on a best-effort basis and does not yet ensure that all messages will be delivered. You may also create an issue to track it, so that we can fix it when the client is improved in the future.

Contributor

@tabVersion tabVersion left a comment

LGTM, let's merge the pr

@wenym1 wenym1 added this pull request to the merge queue Mar 11, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Mar 11, 2024
@wenym1
Contributor

wenym1 commented Mar 11, 2024

@bakjos Unfortunately, the PR has some conflicts with the current main branch and it fails CI at the compile stage. Could you please rebase onto the latest main branch and fix the compile error before we merge it? Thank you.

@bakjos bakjos force-pushed the bakjos/mqtt_connector branch from d319bf0 to 80ecdab Compare March 11, 2024 13:12
@bakjos bakjos force-pushed the bakjos/mqtt_connector branch from 80ecdab to 09adecb Compare March 11, 2024 13:13
@bakjos
Contributor Author

bakjos commented Mar 11, 2024

@wenym1 Done :)

@bakjos bakjos force-pushed the bakjos/mqtt_connector branch from 09adecb to 98f4782 Compare March 11, 2024 13:48
@wenym1 wenym1 added this pull request to the merge queue Mar 11, 2024
Merged via the queue into risingwavelabs:main with commit a543896 Mar 11, 2024
30 checks passed
@nemccarthy

First of all thanks @bakjos for this PR! This was the only thing holding us back from doing a poc with RisingWave for an IoT use case vs Fluvio.

One issue we have is that a few of the MQTT servers we use are stuck on v3. The best-known example of this would be The Things Network (TTN): https://www.thethingsindustries.com/docs/integrations/mqtt/

I know there was mention of using a version flag and importing both versions of the client lib based on this. Do you have any plans to implement this, or would it be best served as another PR?

@bakjos
Contributor Author

bakjos commented Mar 12, 2024

@nemccarthy Not right now, but I might push a new PR to support v3

@bakjos
Contributor Author

bakjos commented Mar 13, 2024

@nemccarthy Have you tried using the current client to connect to the TTN server?

Labels
ci/run-connector-node-integration-tests user-facing-changes Contains changes that are visible to users
8 participants