Skip to content

Commit

Permalink
fix: make handle_unilateral non-async
Browse files Browse the repository at this point in the history
The check for `unsolicited.is_full()`
at the beginning of `handle_unilateral`
is not sufficient if the function is called
from multiple threads parallel.
This normally should not happen,
but not guaranteed.

Instead of checking if the channel is full in advance,
use `tr_send` and ignore the error
if the channel happens to be full
when we try to send into it.

We also ignore the error when the channel
is closed instead of panic
because the library should never panic.
  • Loading branch information
link2xt committed Jan 28, 2025
1 parent ddbf1e9 commit 2c2309c
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1434,7 +1434,7 @@ impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
}

if let Some(unsolicited) = unsolicited.clone() {
handle_unilateral(response, unsolicited).await;
handle_unilateral(response, unsolicited);
}

if let Some(res) = self.stream.next().await {
Expand Down
2 changes: 1 addition & 1 deletion src/extensions/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub(crate) async fn parse_id<T: Stream<Item = io::Result<ResponseData>> + Unpin>
})
}
_ => {
handle_unilateral(resp, unsolicited.clone()).await;
handle_unilateral(resp, unsolicited.clone());
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/extensions/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Handle<T> {
// continuation, wait for it
}
Response::Done { .. } => {
handle_unilateral(resp, sender.clone()).await;
handle_unilateral(resp, sender.clone());
}
_ => return Ok(IdleResponse::NewData(resp)),
}
Expand Down Expand Up @@ -203,10 +203,10 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Handle<T> {
.into());
}
}
handle_unilateral(res, self.session.unsolicited_responses_tx.clone()).await;
handle_unilateral(res, self.session.unsolicited_responses_tx.clone());
}
_ => {
handle_unilateral(res, self.session.unsolicited_responses_tx.clone()).await;
handle_unilateral(res, self.session.unsolicited_responses_tx.clone());
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/extensions/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub(crate) async fn parse_get_quota<T: Stream<Item = io::Result<ResponseData>> +
match resp.parsed() {
Response::Quota(q) => quota = Some(q.clone().into()),
_ => {
handle_unilateral(resp, unsolicited.clone()).await;
handle_unilateral(resp, unsolicited.clone());
}
}
}
Expand Down Expand Up @@ -65,7 +65,7 @@ pub(crate) async fn parse_get_quota_root<T: Stream<Item = io::Result<ResponseDat
quotas.push(q.clone().into());
}
_ => {
handle_unilateral(resp, unsolicited.clone()).await;
handle_unilateral(resp, unsolicited.clone());
}
}
}
Expand Down
52 changes: 17 additions & 35 deletions src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(crate) fn parse_names<T: Stream<Item = io::Result<ResponseData>> + Unpin + S
Some(Ok(name))
}
_ => {
handle_unilateral(resp, unsolicited).await;
handle_unilateral(resp, unsolicited);
None
}
},
Expand Down Expand Up @@ -79,7 +79,7 @@ pub(crate) fn parse_fetches<T: Stream<Item = io::Result<ResponseData>> + Unpin +
Ok(resp) => match resp.parsed() {
Response::Fetch(..) => Some(Ok(Fetch::new(resp))),
_ => {
handle_unilateral(resp, unsolicited).await;
handle_unilateral(resp, unsolicited);
None
}
},
Expand Down Expand Up @@ -157,7 +157,7 @@ pub(crate) async fn parse_status<T: Stream<Item = io::Result<ResponseData>> + Un
}
}
_ => {
handle_unilateral(resp, unsolicited.clone()).await;
handle_unilateral(resp, unsolicited.clone());
}
}
}
Expand All @@ -182,7 +182,7 @@ pub(crate) fn parse_expunge<T: Stream<Item = io::Result<ResponseData>> + Unpin +
Ok(resp) => match resp.parsed() {
Response::Expunge(id) => Some(Ok(*id)),
_ => {
handle_unilateral(resp, unsolicited).await;
handle_unilateral(resp, unsolicited);
None
}
},
Expand Down Expand Up @@ -213,7 +213,7 @@ pub(crate) async fn parse_capabilities<T: Stream<Item = io::Result<ResponseData>
}
}
_ => {
handle_unilateral(resp, unsolicited.clone()).await;
handle_unilateral(resp, unsolicited.clone());
}
}
}
Expand All @@ -232,7 +232,7 @@ pub(crate) async fn parse_noop<T: Stream<Item = io::Result<ResponseData>> + Unpi
.await
{
let resp = resp?;
handle_unilateral(resp, unsolicited.clone()).await;
handle_unilateral(resp, unsolicited.clone());
}

Ok(())
Expand Down Expand Up @@ -338,7 +338,7 @@ pub(crate) async fn parse_mailbox<T: Stream<Item = io::Result<ResponseData>> + U
}
}
Response::MailboxData(m) => match m {
MailboxDatum::Status { .. } => handle_unilateral(resp, unsolicited.clone()).await,
MailboxDatum::Status { .. } => handle_unilateral(resp, unsolicited.clone()),
MailboxDatum::Exists(e) => {
mailbox.exists = *e;
}
Expand All @@ -358,7 +358,7 @@ pub(crate) async fn parse_mailbox<T: Stream<Item = io::Result<ResponseData>> + U
_ => {}
},
_ => {
handle_unilateral(resp, unsolicited.clone()).await;
handle_unilateral(resp, unsolicited.clone());
}
}
}
Expand Down Expand Up @@ -386,7 +386,7 @@ pub(crate) async fn parse_ids<T: Stream<Item = io::Result<ResponseData>> + Unpin
}
}
_ => {
handle_unilateral(resp, unsolicited.clone()).await;
handle_unilateral(resp, unsolicited.clone());
}
}
}
Expand Down Expand Up @@ -421,7 +421,7 @@ pub(crate) async fn parse_metadata<T: Stream<Item = io::Result<ResponseData>> +
// [Unsolicited METADATA Response without Values](https://datatracker.ietf.org/doc/html/rfc5464.html#section-4.4.2),
// they go to unsolicited channel with other unsolicited responses.
_ => {
handle_unilateral(resp, unsolicited.clone()).await;
handle_unilateral(resp, unsolicited.clone());
}
}
}
Expand All @@ -430,48 +430,30 @@ pub(crate) async fn parse_metadata<T: Stream<Item = io::Result<ResponseData>> +

// check if this is simply a unilateral server response
// (see Section 7 of RFC 3501):
pub(crate) async fn handle_unilateral(
pub(crate) fn handle_unilateral(
res: ResponseData,
unsolicited: channel::Sender<UnsolicitedResponse>,
) {
// ignore these if they are not being consumed
if unsolicited.is_full() {
return;
}

match res.parsed() {
Response::MailboxData(MailboxDatum::Status { mailbox, status }) => {
unsolicited
.send(UnsolicitedResponse::Status {
.try_send(UnsolicitedResponse::Status {
mailbox: (mailbox.as_ref()).into(),
attributes: status.to_vec(),
})
.await
.expect("Channel closed unexpectedly");
.ok();
}
Response::MailboxData(MailboxDatum::Recent(n)) => {
unsolicited
.send(UnsolicitedResponse::Recent(*n))
.await
.expect("Channel closed unexpectedly");
unsolicited.try_send(UnsolicitedResponse::Recent(*n)).ok();
}
Response::MailboxData(MailboxDatum::Exists(n)) => {
unsolicited
.send(UnsolicitedResponse::Exists(*n))
.await
.expect("Channel closed unexpectedly");
unsolicited.try_send(UnsolicitedResponse::Exists(*n)).ok();
}
Response::Expunge(n) => {
unsolicited
.send(UnsolicitedResponse::Expunge(*n))
.await
.expect("Channel closed unexpectedly");
unsolicited.try_send(UnsolicitedResponse::Expunge(*n)).ok();
}
_ => {
unsolicited
.send(UnsolicitedResponse::Other(res))
.await
.expect("Channel closed unexpectedly");
unsolicited.try_send(UnsolicitedResponse::Other(res)).ok();
}
}
}
Expand Down

0 comments on commit 2c2309c

Please sign in to comment.