feat(connector): Add mqtt connector #15388
Conversation
Force-pushed from 5a5544d to c9a47a0
Hi @bakjos, thanks for your PR! Before rushing into the code, could you please discuss the motivation and some high-level ideas? (As mentioned in the PR template.)
e.g.,
- Why do you want MQTT connector? What do you want to use RisingWave for?
- How does the MQTT connector work? I think it may be different from message brokers like Kafka, and can lose messages. Is it problematic?
These discussions can help the community better understand the context of the PR and make it faster to get the PR reviewed and merged.
@xxchan
Generally looks good! Thanks for the contribution.
Force-pushed from 002b1ae to b8c2fc7
Thanks for your contribution. The common and source parts LGTM; the sink part may need another look.
let mut topics = HashSet::new();
if !topic.contains('#') && !topic.contains('+') {
    topics.insert(topic.clone());
}
What case does this deal with?
@tabVersion When the topic doesn't have wildcards, it will subscribe to one and only one topic, so we don't have to wait until a message is received.
})
}

async fn list_splits(&mut self) -> ConnectorResult<Vec<MqttSplit>> {
Split number (vec length) here stands for the max parallelism for each source streaming job.
Take the Kafka source as an example: each split conceptually matches a partition, so we can read multiple partitions in parallel. I am not sure what the equivalent structure is for MQTT.
MQTT doesn't have the concept of a partition, but it does support hierarchies, e.g.
/parent/1/alerts
/parent/2/alerts
and you can subscribe with /parent/+/alerts, which will produce 2 splits (one per topic).
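For context, here is a minimal, self-contained sketch (not the code from this PR) of how a single-level wildcard filter can map to several concrete topics, each of which would then become one split; the topic_matches helper and the example topic names are purely illustrative:

fn topic_matches(filter: &str, topic: &str) -> bool {
    // Standard MQTT matching rules: `+` matches exactly one level,
    // `#` matches the remainder of the hierarchy.
    let mut filter_parts = filter.split('/');
    let mut topic_parts = topic.split('/');
    loop {
        match (filter_parts.next(), topic_parts.next()) {
            (Some("#"), _) => return true,
            (Some("+"), Some(_)) => continue,
            (Some(f), Some(t)) if f == t => continue,
            (None, None) => return true,
            _ => return false,
        }
    }
}

fn main() {
    let filter = "/parent/+/alerts";
    let observed = ["/parent/1/alerts", "/parent/2/alerts", "/parent/1/status"];
    // Each matching concrete topic becomes one split.
    let splits: Vec<&str> = observed
        .iter()
        .copied()
        .filter(|t| topic_matches(filter, t))
        .collect();
    assert_eq!(splits, vec!["/parent/1/alerts", "/parent/2/alerts"]);
}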
@neverchanje The v5 protocol is backwards compatible with v3 and most brokers support both.
LGTM for the rest
(
    connector='mqtt',
    url='tcp://mqtt-server',
    topic='test',
The output topic is the same as the input topic, forming a loop. Is this intended?
It looks a bit weird to me. I recommend using separate topics for input and output, and writing a sink_check.py for the outputted data. Please search for sink_check.py for examples.
I took this from the NATS integration tests; it does table -> sink -> MQTT broker -> source, so every record inserted into the table ends up in the source table via MQTT, but I will add a sink_check.
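For reference, a rough sketch of what such a check could look like with the rumqttc client (this is not the repo's actual sink_check.py; the broker address, topic name, and expected record count are placeholders): subscribe to the sink's output topic and wait until the expected payloads arrive.

use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};

#[tokio::main]
async fn main() {
    // Placeholder broker; a real test would point at the docker-compose setup.
    let options = MqttOptions::new("sink-check-sketch", "localhost", 1883);
    let (client, mut eventloop) = AsyncClient::new(options, 10);

    client
        .subscribe("sink_output_topic", QoS::AtLeastOnce)
        .await
        .unwrap();

    // Placeholder count of records the sink is expected to publish.
    let expected = 8;
    let mut received = Vec::new();
    while received.len() < expected {
        if let Ok(Event::Incoming(Packet::Publish(publish))) = eventloop.poll().await {
            received.push(String::from_utf8_lossy(&publish.payload).into_owned());
        }
    }
    println!("sink check passed: {received:?}");
}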
@@ -998,6 +998,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
NATS_CONNECTOR => hashmap!(
    Format::Plain => vec![Encode::Json, Encode::Protobuf],
),
MQTT_CONNECTOR => hashmap!(
    Format::Plain => vec![Encode::Json, Encode::Bytes],
Is format plain encode bytes on the source tested?
Example query:
create table from_source (raw_bytes bytea) with (...) format plain encode bytes;
select raw_bytes from from_source limit 1;
select encode(raw_bytes, 'escape') from from_source limit 1; -- assuming the message is mostly ascii, for example json
I did, and this is the output
dev=> CREATE TABLE mqtt_source_table
(
value bytea
)
WITH (
connector='mqtt',
url='tcp://localhost',
topic= 'test',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE BYTES;
CREATE_TABLE
dev=> SELECT * FROM mqtt_source_table;
value
--------------------------------------------------------
\x7b226964223a352c226e616d65223a224172616d696e7461227d
\x7b226964223a342c226e616d65223a224a65727279227d
\x7b226964223a322c226e616d65223a22426f62227d
\x7b226964223a362c226e616d65223a22436c6f766572227d
\x7b226964223a332c226e616d65223a22546f6d227d
\x7b226964223a312c226e616d65223a22416c696365227d
\x7b226964223a382c226e616d65223a2257617665726c79227d
\x7b226964223a372c226e616d65223a22506f736579227d
(8 rows)
dev=> select encode(value, 'escape') from mqtt_source_table limit 1;
encode
----------------------------
{"id":5,"name":"Araminta"}
(1 row)
dev=>
Thanks for the contribution!
Might need to refine the sink implementation mentioned in the comments.
async fn write_chunk<'a>(
    &'a mut self,
    chunk: StreamChunk,
    _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
Here you didn't add any delivery future, which indicates that the chunk will have been delivered after the method returns. However, from the doc of AsyncClient::publish, IIUC the message has only been added to an event loop buffer after the method returns, and has not been received by the external queue. If so, this may cause data loss if a failure happens.
From the doc, it seems that you should poll the event loop to get notified of successful delivery. If so, a better implementation might be to implement a MqttLogSinker specifically for the MQTT sink, in which you use select! to poll both the upstream input from the log_reader and the delivery notifications from the MQTT client event loop. On delivery, you then truncate the log reader.
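To make the suggestion concrete, here is a heavily simplified sketch of that select!-based loop using rumqttc; an mpsc channel stands in for the log reader and a counter stands in for log truncation, since the actual LogReader / DeliveryFutureManager types are RisingWave internals, and the broker address and topic are placeholders.

use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let options = MqttOptions::new("sink-sketch", "localhost", 1883);
    let (client, mut eventloop) = AsyncClient::new(options, 10);
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(16);

    // Pretend upstream: in the real sinker these would be chunks from the log reader.
    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(format!("{{\"id\":{i}}}").into_bytes()).await.unwrap();
        }
    });

    let (mut in_flight, mut upstream_done) = (0u64, false);
    while !(upstream_done && in_flight == 0) {
        tokio::select! {
            // Upstream input: publish and remember that delivery is still pending.
            maybe_payload = rx.recv(), if !upstream_done => {
                match maybe_payload {
                    Some(payload) => {
                        client
                            .publish("sink_topic", QoS::AtLeastOnce, false, payload)
                            .await
                            .unwrap();
                        in_flight += 1;
                    }
                    None => upstream_done = true,
                }
            }
            // The event loop must keep being polled for the publishes to go out;
            // a PubAck confirms a QoS 1 publish, which is the point where the
            // log reader could be truncated.
            event = eventloop.poll() => {
                if let Ok(Event::Incoming(Packet::PubAck(_))) = event {
                    in_flight -= 1;
                }
            }
        }
    }
    println!("all messages acknowledged by the broker");
}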
LGTM for the sink part. Thanks for the clarification on the comment.
If there is no way to ensure the messages have been successfully delivered, you may mention in the release doc that the current MQTT sink only delivers each message on a best-effort basis and does not yet ensure that all messages will be delivered. Also, you may create an issue to track this so that we can fix it when the client is improved in the future.
LGTM, let's merge the PR.
@bakjos Unfortunately, the PR has some conflicts with the current main branch and it fails to pass CI at the compile stage. Could you please rebase onto the latest main branch and fix the compile errors before we merge it? Thank you.
Force-pushed from d319bf0 to 80ecdab
Force-pushed from 80ecdab to 09adecb
@wenym1 Done :)
Force-pushed from 09adecb to 98f4782
First of all, thanks @bakjos for this PR! This was the only thing holding us back from doing a PoC with RisingWave for an IoT use case vs Fluvio. One issue we have is that a few of the MQTT servers we use are stuck on v3; the best known example of this would be The Things Network (TTN): https://www.thethingsindustries.com/docs/integrations/mqtt/ I know there was mention of using a version flag and importing both versions of the client lib based on it. Not sure if you have any plans to implement that, or would this be best served as another PR?
@nemccarthy Not right now, but I might push a new PR to support v3
@nemccarthy Have you tried using the current client to connect to the TTN server?
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
MQTT is a popular protocol for receiving data from multiple IoT devices; it's a lightweight pub-sub protocol with features such as retaining the latest message and delivery guarantees (QoS). The main use case for me is to trigger alerts when certain events happen over a period of time (which I think can be fully implemented using RisingWave).
This PR implements the following features using the rumqtt library (a Rust-native library for MQTT).
Since the protocol doesn't keep track of offsets, a new consumer only receives messages from the moment it subscribes, plus any retained messages; therefore it doesn't make sense to save state or implement checkpoints.
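To illustrate the retained-message behaviour mentioned above, here is a small hypothetical producer sketch using the rumqttc client (the broker address, client id, and topic are placeholders):

use rumqttc::{AsyncClient, MqttOptions, QoS};

#[tokio::main]
async fn main() {
    let options = MqttOptions::new("producer-sketch", "localhost", 1883);
    let (client, mut eventloop) = AsyncClient::new(options, 10);
    // Keep the event loop running so the publishes actually reach the broker.
    tokio::spawn(async move {
        loop {
            let _ = eventloop.poll().await;
        }
    });

    // Only consumers already subscribed at this moment will see this message.
    client
        .publish("devices/1/alerts", QoS::AtLeastOnce, false, "transient alert")
        .await
        .unwrap();
    // This one is retained: a consumer subscribing later still receives it.
    // That is the only "history" the protocol offers; there are no offsets
    // to resume from, hence no source state or checkpoints in the connector.
    client
        .publish("devices/1/alerts", QoS::AtLeastOnce, true, "retained alert")
        .await
        .unwrap();

    // Give the event loop a moment to flush before exiting (sketch only).
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}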
Checklist
./risedev check (or alias ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.