Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

publish / subscribe / unsubscribe methods return a notice future that waits publish (QoS0), PubAck (QoS1) and PubRec (QoS2) #851

Open
wants to merge 14 commits into
base: acked
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ To update your code simply remove `Key::ECC()` or `Key::RSA()` from the initiali
`rusttls-pemfile` to `2.0.0`, `async-tungstenite` to `0.24.0`, `ws_stream_tungstenite` to `0.12.0`
and `http` to `1.0.0`. This is a breaking change as types from some of these crates are part of
the public API.
- `publish` / `subscribe` / `unsubscribe` methods on `AsyncClient` and `Client` now return a `PkidPromise` which resolves into the identifier value chosen by the `EventLoop` when handling the packet.
- `publish` / `subscribe` / `unsubscribe` methods on `AsyncClient` and `Client` now return a `NoticeFuture` which is noticed after the packet is released (sent in QoS0, ACKed in QoS1, COMPed in QoS2).

### Deprecated

Expand Down
1 change: 1 addition & 0 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ bytes = "1.5"
log = "0.4"
flume = { version = "0.11", default-features = false, features = ["async"] }
thiserror = "1"
linked-hash-map = "0.5"

# Optional
# rustls
Expand Down
65 changes: 65 additions & 0 deletions rumqttc/examples/ack_notif_v5.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use tokio::task::{self, JoinSet};

use rumqttc::v5::{AsyncClient, MqttOptions, mqttbytes::QoS};
use std::error::Error;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
}
}
}
});

// Subscribe and wait for broker acknowledgement
client
.subscribe("hello/world", QoS::AtMostOnce)
.await
.unwrap()
.wait_async()
.await
.unwrap();

// Publish and spawn wait for notification
let mut set = JoinSet::new();

let future = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1024])
.await
.unwrap();
set.spawn(future.wait_async());

let future = client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 1024])
.await
.unwrap();
set.spawn(future.wait_async());

let future = client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 1024])
.await
.unwrap();
set.spawn(future.wait_async());

while let Some(res) = set.join_next().await {
println!("Acknoledged = {:?}", res?);
}

Ok(())
}
70 changes: 0 additions & 70 deletions rumqttc/examples/pkid_promise.rs

This file was deleted.

70 changes: 0 additions & 70 deletions rumqttc/examples/pkid_promise_v5.rs

This file was deleted.

22 changes: 16 additions & 6 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{framed::Network, Transport};
use crate::{Incoming, MqttState, NetworkOptions, Packet, Request, StateError};
use crate::{MqttOptions, Outgoing};
use crate::NoticeError;

use crate::framed::AsyncReadWrite;
use crate::mqttbytes::v4::*;
Expand Down Expand Up @@ -149,13 +150,22 @@ impl EventLoop {
Ok(inner) => inner?,
Err(_) => return Err(ConnectionError::NetworkTimeout),
};
// Last session might contain packets which aren't acked. If it's a new session, clear the pending packets.
if !connack.session_present {
for request in self.pending.drain(..) {
// If the request is a publish request, send an error to the future that is waiting for the ack.
if let Request::Publish(Some(tx), _) = request {
tx.error(NoticeError::SessionReset)
}
}
}
self.network = Some(network);

if self.keepalive_timeout.is_none() && !self.mqtt_options.keep_alive.is_zero() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.mqtt_options.keep_alive)));
}

return Ok(Event::Incoming(connack));
return Ok(Event::Incoming(Packet::ConnAck(connack)));
}

match self.select().await {
Expand Down Expand Up @@ -294,14 +304,14 @@ impl EventLoop {
async fn connect(
mqtt_options: &MqttOptions,
network_options: NetworkOptions,
) -> Result<(Network, Incoming), ConnectionError> {
) -> Result<(Network, ConnAck), ConnectionError> {
// connect to the broker
let mut network = network_connect(mqtt_options, network_options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(mqtt_options, &mut network).await?;
let connack = mqtt_connect(mqtt_options, &mut network).await?;

Ok((network, packet))
Ok((network, connack))
}

pub(crate) async fn socket_connect(
Expand Down Expand Up @@ -469,7 +479,7 @@ async fn network_connect(
async fn mqtt_connect(
options: &MqttOptions,
network: &mut Network,
) -> Result<Incoming, ConnectionError> {
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_session = options.clean_session();
let last_will = options.last_will();
Expand All @@ -486,7 +496,7 @@ async fn mqtt_connect(
// validate connack
match network.read().await? {
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
Ok(Packet::ConnAck(connack))
Ok(connack)
}
Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
packet => Err(ConnectionError::NotConnAck(packet)),
Expand Down
14 changes: 12 additions & 2 deletions rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ pub use proxy::{Proxy, ProxyAuth, ProxyType};

pub type Incoming = Packet;

use v5::mqttbytes::v5::{SubscribeReasonCode as V5SubscribeReasonCode, UnsubAckReason};
use v5::mqttbytes::v5::{SubscribeReasonCode as V5SubscribeReasonCode,
UnsubAckReason,
PubAckReason, PubRecReason, PubCompReason};

#[derive(Debug, thiserror::Error)]
pub enum NoticeError {
Expand All @@ -168,6 +170,14 @@ pub enum NoticeError {
V5Subscribe(V5SubscribeReasonCode),
#[error(" v5 Unsubscription Failure Reason: {0:?}")]
V5Unsubscribe(UnsubAckReason),
#[error(" v5 Publish Ack Failure Reason Code: {0:?}")]
V5PubAck(PubAckReason),
#[error(" v5 Publish Rec Failure Reason Code: {0:?}")]
V5PubRec(PubRecReason),
#[error(" v5 Publish Comp Failure Reason Code: {0:?}")]
V5PubComp(PubCompReason),
#[error(" Dropped due to session reconnect with previous state expire/lost")]
SessionReset,
}

impl From<oneshot::error::RecvError> for NoticeError {
Expand Down Expand Up @@ -245,7 +255,7 @@ pub enum Request {
PubAck(PubAck),
PubRec(PubRec),
PubComp(PubComp),
PubRel(Option<NoticeTx>, PubRel),
PubRel(PubRel),
PingReq(PingReq),
PingResp(PingResp),
Subscribe(Option<NoticeTx>, Subscribe),
Expand Down
Loading