Skip to content

Commit

Permalink
Fix disconnect packet handling
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 16, 2022
1 parent 55afff5 commit d63dffa
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 12 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changes

## [0.8.9] - 2022-09-16

* v3: Send disconnect packet on sink close

* v3: Treat disconnect packet as error on client side

## [0.8.8] - 2022-08-22

* Allow to get inner io stream and codec for negotiated clients
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.8.8"
version = "0.8.9"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
2 changes: 1 addition & 1 deletion src/v3/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ where
ClientError::Disconnected(None)
})?;

let shared = Rc::new(MqttShared::new(io.get_ref(), codec, max_send, pool));
let shared = Rc::new(MqttShared::new(io.get_ref(), codec, max_send, true, pool));

match packet {
codec::Packet::ConnectAck { session_present, return_code } => {
Expand Down
7 changes: 3 additions & 4 deletions src/v3/client/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum ControlMessage<E> {
/// Unhandled publish packet
Publish(Publish),
/// Disconnect packet
#[doc(hidden)]
#[deprecated(since = "0.8.9", note = "Disconnect is not allowed on client side")]
Disconnect(Disconnect),
/// Connection closed
Closed(Closed),
Expand All @@ -26,10 +28,6 @@ impl<E> ControlMessage<E> {
ControlMessage::Publish(Publish(pkt))
}

pub(super) fn dis() -> Self {
ControlMessage::Disconnect(Disconnect)
}

pub(super) fn closed(is_error: bool) -> Self {
ControlMessage::Closed(Closed::new(is_error))
}
Expand All @@ -46,6 +44,7 @@ impl<E> ControlMessage<E> {
ControlMessage::PeerGone(PeerGone(err))
}

/// Initiate clean disconnect
pub fn disconnect(&self) -> ControlResult {
ControlResult { result: ControlResultKind::Disconnect }
}
Expand Down
18 changes: 12 additions & 6 deletions src/v3/client/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,15 @@ where
DispatchItem::Item(codec::Packet::PingRequest) => {
Either::Right(Either::Left(Ready::Ok(Some(codec::Packet::PingResponse))))
}
DispatchItem::Item(codec::Packet::Disconnect) => Either::Right(Either::Right(
ControlResponse::new(ControlMessage::dis(), &self.inner),
)),
DispatchItem::Item(codec::Packet::Disconnect) => {
Either::Right(Either::Left(Ready::Err(
ProtocolError::Unexpected(
packet_type::DISCONNECT,
"Disconnect packet is not allowed",
)
.into(),
)))
}
DispatchItem::Item(codec::Packet::SubscribeAck { packet_id, status }) => {
if let Err(e) = self.sink.pkt_ack(Ack::Subscribe { packet_id, status }) {
Either::Right(Either::Left(Ready::Err(MqttError::Protocol(e))))
Expand All @@ -159,7 +165,7 @@ where
Either::Right(Either::Left(Ready::Err(
ProtocolError::Unexpected(
packet_type::SUBSCRIBE,
"Subscribe packet is not supported",
"Subscribe packet is not allowed",
)
.into(),
)))
Expand All @@ -168,7 +174,7 @@ where
Either::Right(Either::Left(Ready::Err(
ProtocolError::Unexpected(
packet_type::UNSUBSCRIBE,
"Unsubscribe packet is not supported",
"Unsubscribe packet is not allowed",
)
.into(),
)))
Expand Down Expand Up @@ -302,7 +308,7 @@ where
ControlResultKind::Unsubscribe(_) => unreachable!(),
ControlResultKind::Disconnect => {
this.inner.sink.close();
Some(codec::Packet::Disconnect)
None
}
ControlResultKind::Closed | ControlResultKind::Nothing => None,
},
Expand Down
2 changes: 2 additions & 0 deletions src/v3/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ where
io.clone(),
mqtt::Codec::default().max_size(self.max_size),
16,
false,
self.pool.clone(),
));
let mut timeout = Deadline::new(self.handshake_timeout);
Expand Down Expand Up @@ -318,6 +319,7 @@ where
io.get_ref(),
mqtt::Codec::default().max_size(self.max_size),
16,
false,
self.pool.clone(),
));

Expand Down
1 change: 1 addition & 0 deletions src/v3/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ where
io.get_ref(),
mqtt::Codec::default().max_size(self.max_size),
16,
false,
self.pool.clone(),
));
let handshake_timeout = self.handshake_timeout;
Expand Down
3 changes: 3 additions & 0 deletions src/v3/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct MqttShared {
pub(super) inflight_idx: Cell<u16>,
pub(super) pool: Rc<MqttSinkPool>,
pub(super) codec: codec::Codec,
pub(super) client: bool,
}

pub(super) struct MqttSharedQueues {
Expand All @@ -57,12 +58,14 @@ impl MqttShared {
io: IoRef,
codec: codec::Codec,
cap: usize,
client: bool,
pool: Rc<MqttSinkPool>,
) -> Self {
Self {
io,
pool,
codec,
client,
cap: Cell::new(cap),
queues: RefCell::new(MqttSharedQueues {
inflight: HashMap::default(),
Expand Down
3 changes: 3 additions & 0 deletions src/v3/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ impl MqttSink {
#[inline]
/// Close mqtt connection
pub fn close(&self) {
if self.0.client {
let _ = self.0.io.encode(codec::Packet::Disconnect, &self.0.codec);
}
self.0.io.close();
self.0.with_queues(|q| {
q.inflight.clear();
Expand Down

0 comments on commit d63dffa

Please sign in to comment.