Skip to content

Commit

Permalink
chore: improve logs when buffering messages
Browse files Browse the repository at this point in the history
* Add the epoch of the buffered message to the buffered future message error
* Add some info logs in code paths related to buffering, restoring, and clearing future messages
  • Loading branch information
typfel authored and SimonThormeyer committed Jan 27, 2025
1 parent 0f47782 commit dd75e0d
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 26 deletions.
2 changes: 1 addition & 1 deletion crypto-ffi/src/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl From<RecursiveError> for CoreCryptoError {

match_heterogenous!(innermost => {
core_crypto::LeafError::ConversationAlreadyExists(id) => MlsError::ConversationAlreadyExists(id.clone()).into(),
core_crypto::mls::conversation::Error::BufferedFutureMessage => MlsError::BufferedFutureMessage.into(),
core_crypto::mls::conversation::Error::BufferedFutureMessage{..} => MlsError::BufferedFutureMessage.into(),
core_crypto::mls::conversation::Error::DuplicateMessage => MlsError::DuplicateMessage.into(),
core_crypto::mls::conversation::Error::MessageEpochTooOld => MlsError::MessageEpochTooOld.into(),
core_crypto::mls::conversation::Error::SelfCommitIgnored => MlsError::SelfCommitIgnored.into(),
Expand Down
4 changes: 2 additions & 2 deletions crypto/src/mls/buffer_external_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,12 @@ mod tests {
let decrypt = alice_central.context.decrypt_message(&id, msg1).await;
assert!(matches!(
decrypt.unwrap_err(),
mls::conversation::Error::BufferedFutureMessage
mls::conversation::Error::BufferedFutureMessage { .. }
));
let decrypt = alice_central.context.decrypt_message(&id, msg2).await;
assert!(matches!(
decrypt.unwrap_err(),
mls::conversation::Error::BufferedFutureMessage
mls::conversation::Error::BufferedFutureMessage { .. }
));
assert_eq!(alice_central.context.count_entities().await.pending_messages, 2);

Expand Down
25 changes: 10 additions & 15 deletions crypto/src/mls/conversation/buffer_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
//! Feel free to delete all of this when the issue is fixed on the DS side !
use super::{Error, Result};
use crate::obfuscate::Obfuscated;
use crate::{
context::CentralContext,
group_store::GroupStoreValue,
prelude::{
decrypt::MlsBufferedConversationDecryptMessage, Client, ConversationId, MlsConversation,
MlsConversationDecryptMessage,
},
prelude::{decrypt::MlsBufferedConversationDecryptMessage, Client, ConversationId, MlsConversation},
KeystoreError, RecursiveError,
};
use core_crypto_keystore::{
Expand All @@ -23,11 +21,7 @@ use openmls::prelude::{MlsMessageIn, MlsMessageInBody};
use tls_codec::Deserialize;

impl CentralContext {
pub(crate) async fn handle_future_message(
&self,
id: &ConversationId,
message: impl AsRef<[u8]>,
) -> Result<MlsConversationDecryptMessage> {
pub(crate) async fn handle_future_message(&self, id: &ConversationId, message: impl AsRef<[u8]>) -> Result<()> {
let keystore = self
.keystore()
.await
Expand All @@ -41,7 +35,7 @@ impl CentralContext {
.save::<MlsPendingMessage>(pending_msg)
.await
.map_err(KeystoreError::wrap("saving pending mls message"))?;
Err(Error::BufferedFutureMessage)
Ok(())
}

pub(crate) async fn restore_pending_messages(
Expand Down Expand Up @@ -78,7 +72,6 @@ impl MlsConversation {
is_rejoin: bool,
) -> Result<Option<Vec<MlsBufferedConversationDecryptMessage>>> {
// using the macro produces a clippy warning
info!("restore_pending_messages");
let result = async move {
let keystore = backend.keystore();
let group_id = self.id().as_slice();
Expand Down Expand Up @@ -124,6 +117,8 @@ impl MlsConversation {
// luckily for us that's the exact same order as the [ContentType] enum
pending_messages.sort_by(|(a, _), (b, _)| a.cmp(b));

info!(group_id = Obfuscated::from(&self.id); "Attempting to restore {} buffered messages", pending_messages.len());

let mut decrypted_messages = Vec::with_capacity(pending_messages.len());
for (_, m) in pending_messages {
let parent_conversation = match &self.parent_id {
Expand Down Expand Up @@ -238,10 +233,10 @@ mod tests {
.map(|m| m.to_bytes().unwrap());
for m in messages {
let decrypt = bob_central.context.decrypt_message(&id, m).await;
assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage));
assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage { .. }));
}
let decrypt = bob_central.context.decrypt_message(&id, app_msg).await;
assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage));
assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage { .. }));

// Bob should have buffered the messages
assert_eq!(bob_central.context.count_entities().await.pending_messages, 4);
Expand Down Expand Up @@ -369,10 +364,10 @@ mod tests {
.map(|m| m.to_bytes().unwrap());
for m in messages {
let decrypt = alice_central.context.decrypt_message(&id, m).await;
assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage));
assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage { .. }));
}
let decrypt = alice_central.context.decrypt_message(&id, app_msg).await;
assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage));
assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage { .. }));

// Alice should have buffered the messages
assert_eq!(alice_central.context.count_entities().await.pending_messages, 4);
Expand Down
2 changes: 1 addition & 1 deletion crypto/src/mls/conversation/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,7 @@ mod tests {

// fails when a commit is skipped
let out_of_order = bob_central.context.decrypt_message(&id, &commit2).await;
assert!(matches!(out_of_order.unwrap_err(), Error::BufferedFutureMessage));
assert!(matches!(out_of_order.unwrap_err(), Error::BufferedFutureMessage { .. }));

// works in the right order though
// NB: here 'commit2' has been buffered so it is also applied when we decrypt commit1
Expand Down
18 changes: 12 additions & 6 deletions crypto/src/mls/conversation/decrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ impl MlsConversation {
.restore_pending_messages(client, backend, parent_conv, false)
.await?
{
info!(group_id = Obfuscated::from(&self.id); "Clearing all buffered messages for conversation");
backend
.key_store()
.remove::<MlsPendingMessage, _>(self.id())
Expand Down Expand Up @@ -373,7 +374,9 @@ impl MlsConversation {
} else if msg_epoch == group_epoch + 1 {
// limit to next epoch otherwise if we were buffering a commit for epoch + 2
// we would fail when trying to decrypt it in [MlsCentral::commit_accepted]
Error::BufferedFutureMessage
Error::BufferedFutureMessage {
message_epoch: msg_epoch,
}
} else if msg_epoch < group_epoch {
match content_type {
ContentType::Application => Error::StaleMessage,
Expand Down Expand Up @@ -474,10 +477,13 @@ impl CentralContext {
)
.await;

let decrypt_message = match decrypt_message {
Err(Error::BufferedFutureMessage) => self.handle_future_message(id, message).await?,
_ => decrypt_message?,
};
if let Err(Error::BufferedFutureMessage { message_epoch }) = decrypt_message {
self.handle_future_message(id, message).await?;
info!(group_id = Obfuscated::from(id); "Buffered future message from epoch {message_epoch}");
return decrypt_message;
}

let decrypt_message = decrypt_message?;

if !decrypt_message.is_active {
self.wipe_conversation(id).await?;
Expand Down Expand Up @@ -1040,7 +1046,7 @@ mod tests {

// which Bob cannot decrypt because of Post CompromiseSecurity
let decrypt = bob_central.context.decrypt_message(&id, &encrypted).await;
assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage));
assert!(matches!(decrypt.unwrap_err(), Error::BufferedFutureMessage { .. }));

let decrypted_commit = bob_central
.context
Expand Down
2 changes: 1 addition & 1 deletion crypto/src/mls/conversation/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum Error {
#[error("Incoming message is from a prior epoch")]
StaleMessage,
#[error("Incoming message is for a future epoch. We will buffer it until the commit for that epoch arrives")]
BufferedFutureMessage,
BufferedFutureMessage { message_epoch: u64 },
#[error("Incoming message is from an epoch too far in the future to buffer.")]
UnbufferedFarFutureMessage,
#[error("The received commit is deemed stale and is from an older epoch.")]
Expand Down

0 comments on commit dd75e0d

Please sign in to comment.