-
Notifications
You must be signed in to change notification settings - Fork 268
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(rumqttc): Replace Vec with FixedBitSet for QoS 2 packet trac… #869
Changes from 1 commit
a9eee8e
36f3372
138d0b1
009f0a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
use crate::{Event, Incoming, Outgoing, Request}; | ||
|
||
use crate::mqttbytes::v4::*; | ||
use crate::mqttbytes::{self, *}; | ||
use crate::mqttbytes::*; | ||
use fixedbitset::FixedBitSet; | ||
use std::collections::VecDeque; | ||
use std::{io, time::Instant}; | ||
|
||
|
@@ -28,7 +29,7 @@ pub enum StateError { | |
#[error("A Subscribe packet must contain atleast one filter")] | ||
EmptySubscription, | ||
#[error("Mqtt serialization/deserialization error: {0}")] | ||
Deserialization(#[from] mqttbytes::Error), | ||
Deserialization(#[from] Error), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have placed things this way to make it clear where that particular error is trickling down from. |
||
#[error("Connection closed by peer abruptly")] | ||
ConnectionAborted, | ||
} | ||
|
@@ -62,9 +63,9 @@ pub struct MqttState { | |
/// Outgoing QoS 1, 2 publishes which aren't acked yet | ||
pub(crate) outgoing_pub: Vec<Option<Publish>>, | ||
/// Packet ids of released QoS 2 publishes | ||
pub(crate) outgoing_rel: Vec<Option<u16>>, | ||
pub(crate) outgoing_rel: FixedBitSet, | ||
/// Packet ids on incoming QoS 2 publishes | ||
pub(crate) incoming_pub: Vec<Option<u16>>, | ||
pub(crate) incoming_pub: FixedBitSet, | ||
/// Last collision due to broker not acking in order | ||
pub collision: Option<Publish>, | ||
/// Buffered incoming packets | ||
|
@@ -89,8 +90,8 @@ impl MqttState { | |
max_inflight, | ||
// index 0 is wasted as 0 is not a valid packet id | ||
outgoing_pub: vec![None; max_inflight as usize + 1], | ||
outgoing_rel: vec![None; max_inflight as usize + 1], | ||
incoming_pub: vec![None; u16::MAX as usize + 1], | ||
outgoing_rel: FixedBitSet::with_capacity(max_inflight as usize + 1), | ||
incoming_pub: FixedBitSet::with_capacity(u16::MAX as usize + 1), | ||
collision: None, | ||
// TODO: Optimize these sizes later | ||
events: VecDeque::with_capacity(100), | ||
|
@@ -113,17 +114,14 @@ impl MqttState { | |
} | ||
|
||
// remove and collect pending releases | ||
for rel in self.outgoing_rel.iter_mut() { | ||
if let Some(pkid) = rel.take() { | ||
let request = Request::PubRel(PubRel::new(pkid)); | ||
pending.push(request); | ||
} | ||
for pkid in self.outgoing_rel.ones() { | ||
let request = Request::PubRel(PubRel::new(pkid as u16)); | ||
pending.push(request); | ||
} | ||
self.outgoing_rel.clear(); | ||
|
||
// remove packed ids of incoming qos2 publishes | ||
for id in self.incoming_pub.iter_mut() { | ||
id.take(); | ||
} | ||
// remove packet ids of incoming qos2 publishes | ||
self.incoming_pub.clear(); | ||
|
||
self.await_pingresp = false; | ||
self.collision_ping_count = 0; | ||
|
@@ -210,7 +208,7 @@ impl MqttState { | |
} | ||
QoS::ExactlyOnce => { | ||
let pkid = publish.pkid; | ||
self.incoming_pub[pkid as usize] = Some(pkid); | ||
self.incoming_pub.insert(pkid as usize); | ||
|
||
if !self.manual_acks { | ||
let pubrec = PubRec::new(pkid); | ||
|
@@ -261,7 +259,7 @@ impl MqttState { | |
} | ||
|
||
// NOTE: Inflight - 1 for qos2 in comp | ||
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid); | ||
self.outgoing_rel.insert(pubrec.pkid as usize); | ||
let pubrel = PubRel { pkid: pubrec.pkid }; | ||
let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid)); | ||
self.events.push_back(event); | ||
|
@@ -270,16 +268,12 @@ impl MqttState { | |
} | ||
|
||
fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<Option<Packet>, StateError> { | ||
let publish = self | ||
.incoming_pub | ||
.get_mut(pubrel.pkid as usize) | ||
.ok_or(StateError::Unsolicited(pubrel.pkid))?; | ||
|
||
if publish.take().is_none() { | ||
if !self.incoming_pub.contains(pubrel.pkid as usize) { | ||
error!("Unsolicited pubrel packet: {:?}", pubrel.pkid); | ||
return Err(StateError::Unsolicited(pubrel.pkid)); | ||
} | ||
|
||
self.incoming_pub.set(pubrel.pkid as usize, false); | ||
let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid)); | ||
let pubcomp = PubComp { pkid: pubrel.pkid }; | ||
self.events.push_back(event); | ||
|
@@ -288,17 +282,12 @@ impl MqttState { | |
} | ||
|
||
fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result<Option<Packet>, StateError> { | ||
if self | ||
.outgoing_rel | ||
.get_mut(pubcomp.pkid as usize) | ||
.ok_or(StateError::Unsolicited(pubcomp.pkid))? | ||
.take() | ||
.is_none() | ||
{ | ||
if !self.outgoing_rel.contains(pubcomp.pkid as usize) { | ||
error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid); | ||
return Err(StateError::Unsolicited(pubcomp.pkid)); | ||
} | ||
|
||
self.outgoing_rel.set(pubcomp.pkid as usize, false); | ||
self.inflight -= 1; | ||
let packet = self.check_collision(pubcomp.pkid).map(|publish| { | ||
let event = Event::Outgoing(Outgoing::Publish(publish.pkid)); | ||
|
@@ -486,7 +475,7 @@ impl MqttState { | |
_ => pubrel, | ||
}; | ||
|
||
self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid); | ||
self.outgoing_rel.insert(pubrel.pkid as usize); | ||
self.inflight += 1; | ||
Ok(pubrel) | ||
} | ||
|
@@ -610,10 +599,8 @@ mod test { | |
mqtt.handle_incoming_publish(&publish2).unwrap(); | ||
mqtt.handle_incoming_publish(&publish3).unwrap(); | ||
|
||
let pkid = mqtt.incoming_pub[3].unwrap(); | ||
|
||
// only qos2 publish should be add to queue | ||
assert_eq!(pkid, 3); | ||
assert!(mqtt.incoming_pub.contains(3)); | ||
} | ||
|
||
#[test] | ||
|
@@ -656,8 +643,7 @@ mod test { | |
mqtt.handle_incoming_publish(&publish2).unwrap(); | ||
mqtt.handle_incoming_publish(&publish3).unwrap(); | ||
|
||
let pkid = mqtt.incoming_pub[3].unwrap(); | ||
assert_eq!(pkid, 3); | ||
assert!(mqtt.incoming_pub.contains(3)); | ||
|
||
assert!(mqtt.events.is_empty()); | ||
} | ||
|
@@ -725,7 +711,7 @@ mod test { | |
assert_eq!(backup.unwrap().pkid, 1); | ||
|
||
// check if the qos2 element's release pkid is 2 | ||
assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2); | ||
assert!(mqtt.outgoing_rel.contains(2)); | ||
} | ||
|
||
#[test] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change seems to be unrelated to the core of this PR