Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rumqttc: resume session only if CONNACK with session present 1 #864

Merged
merged 13 commits into from
May 21, 2024
1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Validate filters while creating subscription requests.
* Make v4::Connect::write return correct value
* Ordering of `State.events` related to `QoS > 0` publishes
* Resume session only if broker sends `CONNACK` with `session_present == 1`.

### Security

Expand Down
20 changes: 12 additions & 8 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,18 +149,24 @@ impl EventLoop {
Ok(inner) => inner?,
Err(_) => return Err(ConnectionError::NetworkTimeout),
};
// Last session might contain packets which aren't acked. If it's a new session, clear the pending packets.
if !connack.session_present {
self.pending.clear();
}
self.network = Some(network);

if self.keepalive_timeout.is_none() && !self.mqtt_options.keep_alive.is_zero() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.mqtt_options.keep_alive)));
}

return Ok(Event::Incoming(connack));
return Ok(Event::Incoming(Packet::ConnAck(connack)));
}

match self.select().await {
Ok(v) => Ok(v),
Err(e) => {
// MQTT requires that packets pending acknowledgement should be republished on session resume.
// Move pending messages from state to eventloop.
self.clean();
Err(e)
}
Expand Down Expand Up @@ -294,14 +300,14 @@ impl EventLoop {
async fn connect(
mqtt_options: &MqttOptions,
network_options: NetworkOptions,
) -> Result<(Network, Incoming), ConnectionError> {
) -> Result<(Network, ConnAck), ConnectionError> {
// connect to the broker
let mut network = network_connect(mqtt_options, network_options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(mqtt_options, &mut network).await?;
let connack = mqtt_connect(mqtt_options, &mut network).await?;

Ok((network, packet))
Ok((network, connack))
}

pub(crate) async fn socket_connect(
Expand Down Expand Up @@ -469,7 +475,7 @@ async fn network_connect(
async fn mqtt_connect(
options: &MqttOptions,
network: &mut Network,
) -> Result<Incoming, ConnectionError> {
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_session = options.clean_session();
let last_will = options.last_will();
Expand All @@ -485,9 +491,7 @@ async fn mqtt_connect(

// validate connack
match network.read().await? {
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
Ok(Packet::ConnAck(connack))
}
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => Ok(connack),
Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
packet => Err(ConnectionError::NotConnAck(packet)),
}
Expand Down
27 changes: 14 additions & 13 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,25 @@ impl EventLoop {
connect(&mut self.options),
)
.await??;
// Last session might contain packets which aren't acked. If it's a new session, clear the pending packets.
if !connack.session_present {
self.pending.clear();
}
self.network = Some(network);

if self.keepalive_timeout.is_none() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.options.keep_alive)));
}

self.state.handle_incoming_packet(connack)?;
self.state
.handle_incoming_packet(Incoming::ConnAck(connack))?;
}

match self.select().await {
Ok(v) => Ok(v),
Err(e) => {
// MQTT requires that packets pending acknowledgement should be republished on session resume.
// Move pending messages from state to eventloop.
self.clean();
Err(e)
}
Expand Down Expand Up @@ -263,19 +270,14 @@ impl EventLoop {
/// the stream.
/// This function (for convenience) includes internal delays for users to perform internal sleeps
/// between re-connections so that cancel semantics can be used during this sleep
async fn connect(options: &mut MqttOptions) -> Result<(Network, Incoming), ConnectionError> {
async fn connect(options: &mut MqttOptions) -> Result<(Network, ConnAck), ConnectionError> {
// connect to the broker
let mut network = network_connect(options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(options, &mut network).await?;

// Last session might contain packets which aren't acked. MQTT says these packets should be
// republished in the next session
// move pending messages from state to eventloop
// let pending = self.state.clean();
// self.pending = pending.into_iter();
Ok((network, packet))
let connack = mqtt_connect(options, &mut network).await?;

Ok((network, connack))
}

async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionError> {
Expand Down Expand Up @@ -387,7 +389,7 @@ async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionErr
async fn mqtt_connect(
options: &mut MqttOptions,
network: &mut Network,
) -> Result<Incoming, ConnectionError> {
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_start = options.clean_start();
let client_id = options.client_id();
Expand All @@ -406,7 +408,6 @@ async fn mqtt_connect(
// validate connack
match network.read().await? {
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
// Override local keep_alive value if set by server.
if let Some(props) = &connack.properties {
if let Some(keep_alive) = props.server_keep_alive {
options.keep_alive = Duration::from_secs(keep_alive as u64);
Expand All @@ -418,7 +419,7 @@ async fn mqtt_connect(
options.set_session_expiry_interval(props.session_expiry_interval);
}
}
Ok(Packet::ConnAck(connack))
Ok(connack)
}
Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
packet => Err(ConnectionError::NotConnAck(Box::new(packet))),
Expand Down
9 changes: 6 additions & 3 deletions rumqttc/tests/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Broker {

impl Broker {
/// Create a new broker which accepts 1 mqtt connection
pub async fn new(port: u16, connack: u8) -> Broker {
pub async fn new(port: u16, connack: u8, session_saved: bool) -> Broker {
let addr = format!("127.0.0.1:{port}");
let listener = TcpListener::bind(&addr).await.unwrap();

Expand All @@ -32,9 +32,12 @@ impl Broker {
framed.readb(&mut incoming).await.unwrap();

match incoming.pop_front().unwrap() {
Packet::Connect(_) => {
Packet::Connect(connect) => {
let connack = match connack {
0 => ConnAck::new(ConnectReturnCode::Success, false),
0 => ConnAck::new(
ConnectReturnCode::Success,
!connect.clean_session && session_saved,
),
1 => ConnAck::new(ConnectReturnCode::BadUserNamePassword, false),
_ => {
return Broker {
Expand Down
38 changes: 21 additions & 17 deletions rumqttc/tests/reliability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn _tick(
#[tokio::test]
async fn connection_should_timeout_on_time() {
task::spawn(async move {
let _broker = Broker::new(1880, 3).await;
let _broker = Broker::new(1880, 3, false).await;
time::sleep(Duration::from_secs(10)).await;
});

Expand Down Expand Up @@ -125,7 +125,7 @@ async fn idle_connection_triggers_pings_on_time() {
run(&mut eventloop, false).await.unwrap();
});

let mut broker = Broker::new(1885, 0).await;
let mut broker = Broker::new(1885, 0, false).await;
let mut count = 0;
let mut start = Instant::now();

Expand Down Expand Up @@ -169,7 +169,7 @@ async fn some_outgoing_and_no_incoming_should_trigger_pings_on_time() {
run(&mut eventloop, false).await.unwrap();
});

let mut broker = Broker::new(1886, 0).await;
let mut broker = Broker::new(1886, 0, false).await;
let mut count = 0;
let mut start = Instant::now();

Expand Down Expand Up @@ -204,7 +204,7 @@ async fn some_incoming_and_no_outgoing_should_trigger_pings_on_time() {
run(&mut eventloop, false).await.unwrap();
});

let mut broker = Broker::new(2000, 0).await;
let mut broker = Broker::new(2000, 0, false).await;
let mut count = 0;

// Start sending qos 0 publishes to the client. This triggers
Expand Down Expand Up @@ -238,7 +238,7 @@ async fn detects_halfopen_connections_in_the_second_ping_request() {

// A broker which consumes packets but doesn't reply
task::spawn(async move {
let mut broker = Broker::new(2001, 0).await;
let mut broker = Broker::new(2001, 0, false).await;
broker.blackhole().await;
});

Expand Down Expand Up @@ -279,7 +279,7 @@ async fn requests_are_blocked_after_max_inflight_queue_size() {
run(&mut eventloop, false).await.unwrap();
});

let mut broker = Broker::new(1887, 0).await;
let mut broker = Broker::new(1887, 0, false).await;
for i in 1..=10 {
let packet = broker.read_publish().await;

Expand All @@ -306,7 +306,7 @@ async fn requests_are_recovered_after_inflight_queue_size_falls_below_max() {
run(&mut eventloop, true).await.unwrap();
});

let mut broker = Broker::new(1888, 0).await;
let mut broker = Broker::new(1888, 0, false).await;

// packet 1, 2, and 3
assert!(broker.read_publish().await.is_some());
Expand Down Expand Up @@ -341,7 +341,7 @@ async fn packet_id_collisions_are_detected_and_flow_control_is_applied() {
});

task::spawn(async move {
let mut broker = Broker::new(1891, 0).await;
let mut broker = Broker::new(1891, 0, false).await;

// read all incoming packets first
for i in 1..=4 {
Expand Down Expand Up @@ -449,8 +449,8 @@ async fn next_poll_after_connect_failure_reconnects() {
let options = MqttOptions::new("dummy", "127.0.0.1", 3000);

task::spawn(async move {
let _broker = Broker::new(3000, 1).await;
let _broker = Broker::new(3000, 0).await;
let _broker = Broker::new(3000, 1, false).await;
let _broker = Broker::new(3000, 0, false).await;
time::sleep(Duration::from_secs(15)).await;
});

Expand All @@ -474,7 +474,9 @@ async fn next_poll_after_connect_failure_reconnects() {
#[tokio::test]
async fn reconnection_resumes_from_the_previous_state() {
let mut options = MqttOptions::new("dummy", "127.0.0.1", 3001);
options.set_keep_alive(Duration::from_secs(5));
options
.set_keep_alive(Duration::from_secs(5))
.set_clean_session(false);

// start sending qos0 publishes. Makes sure that there is out activity but no in activity
let (client, mut eventloop) = AsyncClient::new(options, 5);
Expand All @@ -489,7 +491,7 @@ async fn reconnection_resumes_from_the_previous_state() {
});

// broker connection 1
let mut broker = Broker::new(3001, 0).await;
let mut broker = Broker::new(3001, 0, false).await;
for i in 1..=2 {
let packet = broker.read_publish().await.unwrap();
assert_eq!(i, packet.payload[0]);
Expand All @@ -503,7 +505,7 @@ async fn reconnection_resumes_from_the_previous_state() {
// a block around broker with {} is closing the connection as expected

// broker connection 2
let mut broker = Broker::new(3001, 0).await;
let mut broker = Broker::new(3001, 0, true).await;
for i in 3..=4 {
let packet = broker.read_publish().await.unwrap();
assert_eq!(i, packet.payload[0]);
Expand All @@ -514,7 +516,9 @@ async fn reconnection_resumes_from_the_previous_state() {
#[tokio::test]
async fn reconnection_resends_unacked_packets_from_the_previous_connection_first() {
let mut options = MqttOptions::new("dummy", "127.0.0.1", 3002);
options.set_keep_alive(Duration::from_secs(5));
options
.set_keep_alive(Duration::from_secs(5))
.set_clean_session(false);

// start sending qos0 publishes. this makes sure that there is
// outgoing activity but no incoming activity
Expand All @@ -530,14 +534,14 @@ async fn reconnection_resends_unacked_packets_from_the_previous_connection_first
});

// broker connection 1. receive but don't ack
let mut broker = Broker::new(3002, 0).await;
let mut broker = Broker::new(3002, 0, false).await;
for i in 1..=2 {
let packet = broker.read_publish().await.unwrap();
assert_eq!(i, packet.payload[0]);
}

// broker connection 2 receives from scratch
let mut broker = Broker::new(3002, 0).await;
let mut broker = Broker::new(3002, 0, true).await;
for i in 1..=6 {
let packet = broker.read_publish().await.unwrap();
assert_eq!(i, packet.payload[0]);
Expand All @@ -559,7 +563,7 @@ async fn state_is_being_cleaned_properly_and_pending_request_calculated_properly
});

task::spawn(async move {
let mut broker = Broker::new(3004, 0).await;
let mut broker = Broker::new(3004, 0, false).await;
while (broker.read_packet().await).is_some() {
time::sleep(Duration::from_secs_f64(0.5)).await;
}
Expand Down