Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes requested against bytebeamio#864 #1

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Validate filters while creating subscription requests.
* Make v4::Connect::write return correct value
* Ordering of `State.events` related to `QoS > 0` publishes
* Resume v5 session only if broker CONNACK with Session Present 1.
* Resume session only if broker sends `CONNACK` with `session_present == 1`.

### Security

Expand Down
20 changes: 12 additions & 8 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,18 +149,24 @@ impl EventLoop {
Ok(inner) => inner?,
Err(_) => return Err(ConnectionError::NetworkTimeout),
};
// Last session might contain packets which aren't acked. If it's a new session, clear the pending packets.
if !connack.session_present {
self.pending.clear();
}
self.network = Some(network);

if self.keepalive_timeout.is_none() && !self.mqtt_options.keep_alive.is_zero() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.mqtt_options.keep_alive)));
}

return Ok(Event::Incoming(connack));
return Ok(Event::Incoming(Packet::ConnAck(connack)));
}

match self.select().await {
Ok(v) => Ok(v),
Err(e) => {
// MQTT requires that packets pending acknowledgement should be republished on session resume.
// Move pending messages from state to eventloop.
self.clean();
Err(e)
}
Expand Down Expand Up @@ -294,14 +300,14 @@ impl EventLoop {
async fn connect(
mqtt_options: &MqttOptions,
network_options: NetworkOptions,
) -> Result<(Network, Incoming), ConnectionError> {
) -> Result<(Network, ConnAck), ConnectionError> {
// connect to the broker
let mut network = network_connect(mqtt_options, network_options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(mqtt_options, &mut network).await?;
let connack = mqtt_connect(mqtt_options, &mut network).await?;

Ok((network, packet))
Ok((network, connack))
}

pub(crate) async fn socket_connect(
Expand Down Expand Up @@ -469,7 +475,7 @@ async fn network_connect(
async fn mqtt_connect(
options: &MqttOptions,
network: &mut Network,
) -> Result<Incoming, ConnectionError> {
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_session = options.clean_session();
let last_will = options.last_will();
Expand All @@ -485,9 +491,7 @@ async fn mqtt_connect(

// validate connack
match network.read().await? {
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
Ok(Packet::ConnAck(connack))
}
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => Ok(connack),
Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
packet => Err(ConnectionError::NotConnAck(packet)),
}
Expand Down
40 changes: 15 additions & 25 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,28 @@ impl EventLoop {
if self.network.is_none() {
let (network, connack) = time::timeout(
Duration::from_secs(self.options.connection_timeout()),
connect(&mut self.pending, &mut self.options),
connect(&mut self.options),
)
.await??;
// Last session might contain packets which aren't acked. If it's a new session, clear the pending packets.
if !connack.session_present {
self.pending.clear();
}
self.network = Some(network);

if self.keepalive_timeout.is_none() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.options.keep_alive)));
}

self.state.handle_incoming_packet(connack)?;
self.state
.handle_incoming_packet(Incoming::ConnAck(connack))?;
}

match self.select().await {
Ok(v) => Ok(v),
Err(e) => {
// MQTT requires that packets pending acknowledgement should be republished on session resume.
// Move pending messages from state to eventloop.
self.clean();
Err(e)
}
Expand Down Expand Up @@ -263,19 +270,14 @@ impl EventLoop {
/// the stream.
/// This function (for convenience) includes internal delays for users to perform internal sleeps
/// between re-connections so that cancel semantics can be used during this sleep
async fn connect(pending: &mut VecDeque<Request>, options: &mut MqttOptions) -> Result<(Network, Incoming), ConnectionError> {
async fn connect(options: &mut MqttOptions) -> Result<(Network, ConnAck), ConnectionError> {
// connect to the broker
let mut network = network_connect(options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(pending, options, &mut network).await?;

// Last session might contain packets which aren't acked. MQTT says these packets should be
// republished in the next session if session is resumed
// move pending messages from state to eventloop
// let pending = self.state.clean();
// self.pending = pending.into_iter();
Ok((network, packet))
let connack = mqtt_connect(options, &mut network).await?;

Ok((network, connack))
}

async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionError> {
Expand Down Expand Up @@ -385,10 +387,9 @@ async fn network_connect(options: &MqttOptions) -> Result<Network, ConnectionErr
}

async fn mqtt_connect(
pending: &mut VecDeque<Request>,
options: &mut MqttOptions,
network: &mut Network,
) -> Result<Incoming, ConnectionError> {
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_start = options.clean_start();
let client_id = options.client_id();
Expand All @@ -407,24 +408,13 @@ async fn mqtt_connect(
// validate connack
match network.read().await? {
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
// Check if it's a new session
if !connack.session_present {
// If it's a new session, clear the pendings
pending.clear();
}
// Override local settings if set by server.
if let Some(props) = &connack.properties {
// Override local keep_alive value if set by server.
if let Some(keep_alive) = props.server_keep_alive {
options.keep_alive = Duration::from_secs(keep_alive as u64);
}
network.set_max_outgoing_size(props.max_packet_size);
// Override local session_expiry_interval value if set by server.
if (props.session_expiry_interval).is_some() {
options.set_session_expiry_interval(props.session_expiry_interval);
}
}
Ok(Packet::ConnAck(connack))
Ok(connack)
}
Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
packet => Err(ConnectionError::NotConnAck(Box::new(packet))),
Expand Down