Skip to content

Commit

Permalink
Merge pull request #26 from minghuaw/txn-experiment
Browse files Browse the repository at this point in the history
1. Added types defined for transactions.
2. Changed existing types to include transaction types
3. Fixed bugs where Sender or Receiver may double detach while dropping
4. Added variants to `LinkState`
  • Loading branch information
minghuaw authored May 3, 2022
2 parents 07fcf93 + 9de4886 commit 2c006d7
Show file tree
Hide file tree
Showing 50 changed files with 1,682 additions and 1,338 deletions.
2 changes: 1 addition & 1 deletion examples/protocol_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fe2o3-amqp = { version = "0.0.7", path = "../../fe2o3-amqp", features = ["acceptor"] }
fe2o3-amqp = { version = "0.0.8", path = "../../fe2o3-amqp", features = ["acceptor"] }
tokio = { version = "1.11.0", features = ["net", "macros", "rt-multi-thread", "time"] }
tracing = "0.1.31"
tracing-subscriber = "0.3.9"
Expand Down
27 changes: 1 addition & 26 deletions examples/protocol_test/src/bin/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,29 +77,4 @@ async fn main() {

let _handle = tokio::spawn(connection_main(connection));
}
}

// use tokio::net::TcpListener;
// use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};

// #[tokio::main]
// async fn main() {
// let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
// let connection_acceptor = ConnectionAcceptor::new("example-listener");

// while let Ok((stream, addr)) = tcp_listener.accept().await {
// let mut connection = connection_acceptor.accept(stream).await.unwrap();
// let handle = tokio::spawn(async move {
// let session_acceptor = SessionAcceptor::new();
// while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
// let handle = tokio::spawn(async move {
// let link_acceptor = LinkAcceptor::new();
// match link_acceptor.accept(&mut session).await.unwrap() {
// LinkEndpoint::Sender(sender) => { },
// LinkEndpoint::Receiver(recver) => { },
// }
// });
// }
// });
// }
// }
}
21 changes: 20 additions & 1 deletion fe2o3-amqp-types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fe2o3-amqp-types"
version = "0.0.8"
version = "0.0.12"
edition = "2021"
description = "Implementation of AMQP1.0 data types"
license = "MIT/Apache-2.0"
Expand All @@ -12,6 +12,25 @@ readme = "Readme.md"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[features]

default = [
"primitive",
"transport",
"messaging",
"security",
]

primitive = []
transport = ["primitive"]
messaging = ["primitive", "transport"]
transaction = ["primitive", "messaging"]
security = ["primitive"]

[dependencies]
# serde_amqp = { version = "0.0.1", path = "../serde_amqp", features = ["derive"] }
serde_amqp = { version = "^0.0.6", features = ["derive"] }
Expand Down
4 changes: 4 additions & 0 deletions fe2o3-amqp-types/Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## 0.0.9 ~ 0.0.12

1. Added types defined in transactions

## 0.0.7

1. Updated dependency `serde_amqp` to `"^0.0.5"`.
Expand Down
46 changes: 45 additions & 1 deletion fe2o3-amqp-types/src/definitions/error_cond.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,35 @@ use serde::{de, ser};

use serde_amqp::primitives::Symbol;

#[cfg(feature = "transaction")]
use crate::transaction::TransactionError;

use super::{AmqpError, ConnectionError, LinkError, SessionError};

/// Archetype error-condition
#[derive(Debug, Clone, PartialEq)]
#[allow(missing_docs)]
pub enum ErrorCondition {
/// 2.8.15 AMQP Error
AmqpError(AmqpError),

/// 2.8.16 Connection Error
ConnectionError(ConnectionError),

/// 2.8.17 Session Error
SessionError(SessionError),

/// 2.8.18 Link Error
LinkError(LinkError),

/// Customized error condition (**NOT** recommended)
///
/// This variant may get removed in the future
Custom(Symbol),

/// 4.5.8 Transaction Error
#[cfg_attr(docsrs, doc(cfg(feature = "transaction")))]
#[cfg(feature = "transaction")]
TransactionError(TransactionError),
}

impl ser::Serialize for ErrorCondition {
Expand All @@ -27,6 +46,9 @@ impl ser::Serialize for ErrorCondition {
Self::SessionError(err) => err.serialize(serializer),
Self::LinkError(err) => err.serialize(serializer),
Self::Custom(err) => err.serialize(serializer),

#[cfg(feature = "transaction")]
Self::TransactionError(err) => err.serialize(serializer),
}
}
}
Expand Down Expand Up @@ -94,6 +116,11 @@ impl<'de> de::Deserialize<'de> for ErrorCondition {
Ok(val) => return Ok(ErrorCondition::LinkError(val)),
Err(e) => e,
};
#[cfg(feature = "transaction")]
let v = match TransactionError::try_from(v) {
Ok(val) => return Ok(ErrorCondition::TransactionError(val)),
Err(e) => e,
};
Ok(ErrorCondition::Custom(Symbol::from(v)))
}
}
Expand All @@ -116,4 +143,21 @@ mod tests {
let deserialized: ErrorCondition = from_slice(&buf).unwrap();
assert_eq!(expected, deserialized);
}

#[cfg(feature = "transaction")]
#[test]
fn test_transaction_error_condition() {
use serde_amqp::to_vec;

use crate::transaction::TransactionError;

let err = TransactionError::Timeout;
let buf = to_vec(&err).unwrap();
let err: ErrorCondition = from_slice(&buf).unwrap();

match err {
ErrorCondition::TransactionError(_) => {}
_ => panic!(),
}
}
}
10 changes: 5 additions & 5 deletions fe2o3-amqp-types/src/definitions/link_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ impl From<LinkError> for ErrorCondition {
impl From<&LinkError> for Symbol {
fn from(value: &LinkError) -> Self {
let val = match value {
&LinkError::DetachForced => "amqp:link:detach-forced",
&LinkError::TransferLimitExceeded => "amqp:link:transfer-limit-exceeded",
&LinkError::MessageSizeExceeded => "amqp:link:message-size-exceeded",
&LinkError::Redirect => "amqp:link:redirect",
&LinkError::Stolen => "amqp:link:stolen",
LinkError::DetachForced => "amqp:link:detach-forced",
LinkError::TransferLimitExceeded => "amqp:link:transfer-limit-exceeded",
LinkError::MessageSizeExceeded => "amqp:link:message-size-exceeded",
LinkError::Redirect => "amqp:link:redirect",
LinkError::Stolen => "amqp:link:stolen",
};
Symbol::from(val)
}
Expand Down
8 changes: 4 additions & 4 deletions fe2o3-amqp-types/src/definitions/session_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ impl From<SessionError> for ErrorCondition {
impl From<&SessionError> for Symbol {
fn from(value: &SessionError) -> Self {
let val = match value {
&SessionError::WindowViolation => "amqp:session:window-violation",
&SessionError::ErrantLink => "amqp:session:errant-link",
&SessionError::HandleInUse => "amqp:session:handle-in-use",
&SessionError::UnattachedHandle => "amqp:session:unattached-handle",
SessionError::WindowViolation => "amqp:session:window-violation",
SessionError::ErrantLink => "amqp:session:errant-link",
SessionError::HandleInUse => "amqp:session:handle-in-use",
SessionError::UnattachedHandle => "amqp:session:unattached-handle",
};
Symbol::from(val)
}
Expand Down
107 changes: 105 additions & 2 deletions fe2o3-amqp-types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,113 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(missing_docs, missing_debug_implementations)]

//! Implements AMQP1.0 data types as defined in the [specification](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html).
//! Implements AMQP1.0 data types as defined in the core [specification](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html).
//!
//! # Feature flags
//!
//! Please note that `Performative` will require both `"transport"` and `"messaging"` feature flags
//! enabled.
//!
//! - `"primitive"`: enables the primitive types defined in part 1.6 in the core specification.
//! - `"transport"`: enables most of the types defined in part 2.4, 2.5, and 2.8 of the core specifiction.
//! - `"messaging"`: enables the types defined in part 2.7 and part 3 defined in the core specification
//! - `"transaction"`: enables the types defined in part 4.5 of the core specification
//! - `"security"`: enables the types defined in part 5 of the core specifiction.
//!
//! ```toml
//! default = [
//! "primitive",
//! "transport",
//! "messaging",
//! "security",
//! ]
//! ```
// List of archetypes:
//
// 1. "frame"
// 2. "error-condition"
// 3. "section"
// 4. "message-id"
// 5. "address"
// 6. "delivery-state"
// 7. "outcome"
// 8. "source"
// 9. "target"
// 10. "distribution-mode"
// 11. "lifetime-policy"
// 12. "txn-id"
// 13. "txn-capability"
// 14. "sasl-frame"
// 15.
//
// List of restricted types:
//
// 1. "role"
// 2. "sender-settle-mode"
// 3. "receiver-settle-mode"
// 4. "handle"
// 5. "seconds"
// 6. "milliseconds"
// 7. "delivery-tag"
// 8. "delivery-number"
// 9. "transfer-number"
// 10. "sequence-no"
// 11. "message-format"
// 12. "ietf-language-tag"
// 13. "fields"
// 14. "amqp-error"
// 15. "connection-error"
// 16. "session-error"
// 17. "link-error"
// 18. "delivery-annotations"
// 19. "message-annotations"
// 20. "application-properties"
// 21. "data"
// 22. "amqp-sequence"
// 23. "amqp-value"
// 24. "footer"
// 25. "annotations"
// 26. "message-id-ulong"
// 27. "message-id-uuid"
// 28. "message-id-binary"
// 31. "message-id-string"
// 32. "address-string"
// 33. "terminus-durability"
// 34. "terminus-expiry-policy"
// 35. "std-dist-mode"
// 36. "filter-set"
// 37. "node-properties"
// 38. "transaction-id"
// 39. "txn-capability"
// 40. "transaction-error"
// 41. "sasl-code"
//

#[cfg_attr(docsrs, doc(cfg(feature = "primitive")))]
#[cfg(feature = "primitive")]
pub mod primitives;

#[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
#[cfg(feature = "transport")]
pub mod definitions;

#[cfg_attr(docsrs, doc(cfg(feature = "messaging")))]
#[cfg(feature = "messaging")]
pub mod messaging;

#[cfg_attr(docsrs, doc(cfg(all(feature = "transport", feature = "messaging"))))]
#[cfg(all(feature = "transport", feature = "messaging"))]
pub mod performatives;
pub mod primitives;

#[cfg_attr(docsrs, doc(cfg(feature = "security")))]
#[cfg(feature = "security")]
pub mod sasl;

#[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
#[cfg(feature = "transport")]
pub mod states;

#[cfg_attr(docsrs, doc(cfg(feature = "transaction")))]
#[cfg(feature = "transaction")]
pub mod transaction;
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ impl ser::Serialize for DeliveryState {
DeliveryState::Released(value) => value.serialize(serializer),
DeliveryState::Modified(value) => value.serialize(serializer),
DeliveryState::Received(value) => value.serialize(serializer),

#[cfg(feature = "transaction")]
DeliveryState::Declared(value) => value.serialize(serializer),

#[cfg(feature = "transaction")]
DeliveryState::TransactionalState(value) => value.serialize(serializer),
}
}
}
Expand All @@ -26,6 +32,12 @@ enum Field {
Released,
Modified,
Received,

#[cfg(feature = "transaction")]
Declared,

#[cfg(feature = "transaction")]
TransactionalState,
}

struct FieldVisitor {}
Expand All @@ -47,6 +59,13 @@ impl<'de> de::Visitor<'de> for FieldVisitor {
"amqp:released:list" => Field::Released,
"amqp:modified:list" => Field::Modified,
"amqp:received:list" => Field::Received,

#[cfg(feature = "transaction")]
"amqp:declared:list" => Field::Declared,

#[cfg(feature = "transaction")]
"amqp:transactional-state:list" => Field::TransactionalState,

_ => return Err(de::Error::custom("Wrong symbol value for descriptor")),
};

Expand All @@ -61,8 +80,15 @@ impl<'de> de::Visitor<'de> for FieldVisitor {
0x0000_0000_0000_0023 => Field::Received,
0x0000_0000_0000_0024 => Field::Accepted,
0x0000_0000_0000_0025 => Field::Rejected,
0x000_0000_0000_0026 => Field::Released,
0x0000_0000_0000_0026 => Field::Released,
0x0000_0000_0000_0027 => Field::Modified,

#[cfg(feature = "transaction")]
0x0000_0000_0000_0033 => Field::Declared,

#[cfg(feature = "transaction")]
0x0000_0000_0000_0034 => Field::TransactionalState,

_ => {
return Err(de::Error::custom(format!(
"Wrong code value for descriptor, found {:#x?}",
Expand Down Expand Up @@ -119,6 +145,18 @@ impl<'de> de::Visitor<'de> for Visitor {
let value = variant.newtype_variant()?;
Ok(DeliveryState::Received(value))
}

#[cfg(feature = "transaction")]
Field::Declared => {
let value = variant.newtype_variant()?;
Ok(DeliveryState::Declared(value))
}

#[cfg(feature = "transaction")]
Field::TransactionalState => {
let value = variant.newtype_variant()?;
Ok(DeliveryState::TransactionalState(value))
}
}
}
}
Expand All @@ -128,7 +166,7 @@ impl<'de> de::Deserialize<'de> for DeliveryState {
where
D: serde::Deserializer<'de>,
{
const VARIANTS: &'static [&'static str] = &[
const VARIANTS: &[&str] = &[
"amqp:accepted:list",
"amqp:rejected:list",
"amqp:released:list",
Expand Down
Loading

0 comments on commit 2c006d7

Please sign in to comment.