From 100b04034cac789e1ff7b3ae72d6116f108ddbef Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Mon, 11 Apr 2022 10:03:55 +0200 Subject: [PATCH] AMQP 1.0 message format (#120) * AMQP 1.0 --- .gitignore | 3 +- Cargo.toml | 1 + examples/complex_message.rs | 86 +++ protocol/Cargo.toml | 14 +- protocol/src/codec/decoder.rs | 32 +- protocol/src/commands/deliver.rs | 6 +- protocol/src/error.rs | 12 +- protocol/src/lib.rs | 2 + protocol/src/message.rs | 100 --- protocol/src/message/amqp/body.rs | 89 +++ protocol/src/message/amqp/codec/constants.rs | 88 +++ protocol/src/message/amqp/codec/decoder.rs | 1 + protocol/src/message/amqp/codec/mod.rs | 27 + protocol/src/message/amqp/definitions.rs | 0 protocol/src/message/amqp/error.rs | 43 ++ .../src/message/amqp/fixtures/empty_message | Bin 0 -> 5 bytes .../amqp/fixtures/header_amqpvalue_message | Bin 0 -> 47 bytes .../message/amqp/fixtures/message_body_250 | Bin 0 -> 255 bytes .../message/amqp/fixtures/message_body_700 | Bin 0 -> 708 bytes .../message_random_application_properties_300 | Bin 0 -> 1532 bytes .../message_random_application_properties_500 | Bin 0 -> 3313 bytes ...ndom_application_properties_properties_900 | Bin 0 -> 3009 bytes .../amqp/fixtures/static_test_message_compare | Bin 0 -> 151 bytes protocol/src/message/amqp/header.rs | 115 +++ protocol/src/message/amqp/message.rs | 509 +++++++++++++ protocol/src/message/amqp/mod.rs | 62 ++ protocol/src/message/amqp/properties.rs | 140 ++++ protocol/src/message/amqp/section.rs | 62 ++ .../src/message/amqp/types/annotations.rs | 85 +++ .../src/message/amqp/types/definitions.rs | 19 + protocol/src/message/amqp/types/descriptor.rs | 53 ++ protocol/src/message/amqp/types/mod.rs | 78 ++ .../src/message/amqp/types/primitives/list.rs | 199 +++++ .../src/message/amqp/types/primitives/map.rs | 186 +++++ .../src/message/amqp/types/primitives/mod.rs | 9 + .../message/amqp/types/primitives/simple.rs | 707 ++++++++++++++++++ .../message/amqp/types/primitives/value.rs | 235 ++++++ protocol/src/message/amqp/types/symbol.rs | 91 +++ protocol/src/message/builder.rs | 176 +++++ protocol/src/message/mod.rs | 145 ++++ protocol/src/message/types.rs | 0 protocol/src/utils.rs | 25 + src/consumer.rs | 1 + src/lib.rs | 5 + tests/integration/producer_test.rs | 48 ++ 45 files changed, 3331 insertions(+), 123 deletions(-) create mode 100644 examples/complex_message.rs delete mode 100644 protocol/src/message.rs create mode 100644 protocol/src/message/amqp/body.rs create mode 100644 protocol/src/message/amqp/codec/constants.rs create mode 100644 protocol/src/message/amqp/codec/decoder.rs create mode 100644 protocol/src/message/amqp/codec/mod.rs create mode 100644 protocol/src/message/amqp/definitions.rs create mode 100644 protocol/src/message/amqp/error.rs create mode 100644 protocol/src/message/amqp/fixtures/empty_message create mode 100644 protocol/src/message/amqp/fixtures/header_amqpvalue_message create mode 100644 protocol/src/message/amqp/fixtures/message_body_250 create mode 100644 protocol/src/message/amqp/fixtures/message_body_700 create mode 100644 protocol/src/message/amqp/fixtures/message_random_application_properties_300 create mode 100644 protocol/src/message/amqp/fixtures/message_random_application_properties_500 create mode 100644 protocol/src/message/amqp/fixtures/message_random_application_properties_properties_900 create mode 100644 protocol/src/message/amqp/fixtures/static_test_message_compare create mode 100644 protocol/src/message/amqp/header.rs create mode 100644 protocol/src/message/amqp/message.rs create mode 100644 protocol/src/message/amqp/mod.rs create mode 100644 protocol/src/message/amqp/properties.rs create mode 100644 protocol/src/message/amqp/section.rs create mode 100644 protocol/src/message/amqp/types/annotations.rs create mode 100644 protocol/src/message/amqp/types/definitions.rs create mode 100644 protocol/src/message/amqp/types/descriptor.rs create mode 100644 protocol/src/message/amqp/types/mod.rs create mode 100644 protocol/src/message/amqp/types/primitives/list.rs create mode 100644 protocol/src/message/amqp/types/primitives/map.rs create mode 100644 protocol/src/message/amqp/types/primitives/mod.rs create mode 100644 protocol/src/message/amqp/types/primitives/simple.rs create mode 100644 protocol/src/message/amqp/types/primitives/value.rs create mode 100644 protocol/src/message/amqp/types/symbol.rs create mode 100644 protocol/src/message/builder.rs create mode 100644 protocol/src/message/mod.rs create mode 100644 protocol/src/message/types.rs create mode 100644 protocol/src/utils.rs diff --git a/.gitignore b/.gitignore index 408b8a5..30fc9a3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target Cargo.lock -.idea \ No newline at end of file +.idea +.DS_Store diff --git a/Cargo.toml b/Cargo.toml index 773e62a..ab32c32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,4 @@ dashmap = "4.0.2" [dev-dependencies] tracing-subscriber = "0.3.1" fake = { version = "2.4", features=['derive']} +chrono = "0.4.19" diff --git a/examples/complex_message.rs b/examples/complex_message.rs new file mode 100644 index 0000000..2a840d7 --- /dev/null +++ b/examples/complex_message.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use chrono::Utc; +use futures::StreamExt; +use rabbitmq_stream_client::{ + types::{ByteCapacity, Message, OffsetSpecification}, + Environment, +}; +use tokio::sync::Barrier; +use tracing::{info, Level}; +use tracing_subscriber::FmtSubscriber; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .finish(); + + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + let environment = Environment::builder() + .host("localhost") + .port(5552) + .build() + .await?; + + let stream_name = "complex_message"; + let _ = environment.delete_stream(stream_name).await; + let message_count = 1; + environment + .stream_creator() + .max_length(ByteCapacity::GB(2)) + .create(stream_name) + .await?; + + let producer = environment.producer().build(stream_name).await?; + + let barrier = Arc::new(Barrier::new(message_count + 1)); + for i in 0..message_count { + let producer_cloned = producer.clone(); + + let barrier_cloned = barrier.clone(); + tokio::task::spawn(async move { + let message = Message::builder() + .message_annotations() + .insert("test", i as i32) + .message_builder() + .application_properties() + .insert("test", i as i32) + .message_builder() + .properties() + .content_encoding("application/json") + .absolute_expiry_time(Utc::now()) + .message_builder() + .body(format!("message{}", i)) + .build(); + producer_cloned.send_with_confirm(message).await.unwrap(); + + barrier_cloned.wait().await; + }); + } + + barrier.wait().await; + + producer.close().await?; + + let mut consumer = environment + .consumer() + .offset(OffsetSpecification::First) + .build(stream_name) + .await + .unwrap(); + + for _ in 0..message_count { + let delivery = consumer.next().await.unwrap()?; + info!( + "Got message: {:#?} with offset: {}", + delivery.message(), + delivery.offset(), + ); + } + + consumer.handle().close().await.unwrap(); + + environment.delete_stream(stream_name).await?; + Ok(()) +} diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index 83f4a68..797c458 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -6,12 +6,18 @@ edition = "2018" license = "Apache-2.0 OR MPL-2.0" +[features] +wasm-bindgen = ["uuid/wasm-bindgen"] + [dependencies] byteorder = "1" -ntex-amqp-codec = "= 0.7.2" -ntex-bytes = "0.1" - +ordered-float = "2.10.0" +uuid = "0.8.2" +chrono = "0.4.19" +num_enum = "0.5.7" +derive_more = "0.99" [dev-dependencies] -fake = { version = "2.4", features=['derive']} +fake = { version = "2.4", features=['derive', 'chrono','uuid']} rand = "0.8" +pretty_assertions = "1.2.0" diff --git a/protocol/src/codec/decoder.rs b/protocol/src/codec/decoder.rs index 53e98f1..64bb905 100644 --- a/protocol/src/codec/decoder.rs +++ b/protocol/src/codec/decoder.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use byteorder::ByteOrder; +use crate::error::IncompleteError; use crate::message::Message; use crate::types::PublishedMessage; use crate::types::PublishingError; @@ -12,42 +13,42 @@ use super::Decoder; impl Decoder for i8 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_i8(input) + read_i8(input).map(Ok)? } } impl Decoder for i32 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_i32(input) + read_i32(input).map(Ok)? } } impl Decoder for u8 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_u8(input) + read_u8(input).map(Ok)? } } impl Decoder for u16 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_u16(input) + read_u16(input).map(Ok)? } } impl Decoder for u32 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_u32(input) + read_u32(input).map(Ok)? } } impl Decoder for u64 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_u64(input) + read_u64(input).map(Ok)? } } impl Decoder for i64 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_i64(input) + read_i64(input).map(Ok)? } } @@ -199,9 +200,9 @@ impl Decoder for Vec { } } -pub fn check_len(input: &[u8], size: usize) -> Result<(), DecodeError> { +pub fn check_len(input: &[u8], size: usize) -> Result<(), IncompleteError> { if input.len() < size { - return Err(DecodeError::Incomplete(size)); + return Err(IncompleteError(size)); } Ok(()) } @@ -209,7 +210,7 @@ pub fn check_len(input: &[u8], size: usize) -> Result<(), DecodeError> { macro_rules! reader { ( $fn:ident, $size:expr, $ret:ty) => { #[allow(unused)] - pub fn $fn(input: &[u8]) -> Result<(&[u8], $ret), crate::error::DecodeError> { + pub fn $fn(input: &[u8]) -> Result<(&[u8], $ret), IncompleteError> { check_len(input, $size)?; let x = byteorder::BigEndian::$fn(input); Ok((&input[$size..], x)) @@ -217,19 +218,26 @@ macro_rules! reader { }; } -pub fn read_u8(input: &[u8]) -> Result<(&[u8], u8), DecodeError> { +pub fn read_u8(input: &[u8]) -> Result<(&[u8], u8), IncompleteError> { check_len(input, 1)?; Ok((&input[1..], input[0])) } -pub fn read_i8(input: &[u8]) -> Result<(&[u8], i8), DecodeError> { +pub fn read_i8(input: &[u8]) -> Result<(&[u8], i8), IncompleteError> { check_len(input, 1)?; Ok((&input[1..], input[0] as i8)) } +pub fn read_exact(input: &[u8], len: usize) -> Result<(&[u8], &[u8]), IncompleteError> { + check_len(input, len)?; + Ok((&input[len..], &input[..len])) +} + reader!(read_i16, 2, i16); reader!(read_u16, 2, u16); reader!(read_u32, 4, u32); reader!(read_i32, 4, i32); reader!(read_u64, 8, u64); reader!(read_i64, 8, i64); +reader!(read_f32, 4, f32); +reader!(read_f64, 8, f64); diff --git a/protocol/src/commands/deliver.rs b/protocol/src/commands/deliver.rs index 6b5a357..c4a5e77 100644 --- a/protocol/src/commands/deliver.rs +++ b/protocol/src/commands/deliver.rs @@ -160,15 +160,11 @@ mod tests { use fake::{Dummy, Faker}; use crate::{commands::tests::command_encode_decode_test, message::InternalMessage}; - use ntex_amqp_codec::Message as AmpqMessage; use super::{DeliverCommand, Message}; impl Dummy for Message { fn dummy_with_rng(_config: &Faker, _rng: &mut R) -> Self { - Message::new(InternalMessage { - message: AmpqMessage::default(), - publishing_id: None, - }) + Message::new(InternalMessage::default()) } } diff --git a/protocol/src/error.rs b/protocol/src/error.rs index 8351816..bc10ed2 100644 --- a/protocol/src/error.rs +++ b/protocol/src/error.rs @@ -2,12 +2,13 @@ use std::string::FromUtf8Error; #[derive(Debug)] pub enum DecodeError { - Incomplete(usize), + Incomplete(IncompleteError), Utf8Error(FromUtf8Error), UnknownResponseCode(u16), UnsupportedResponseType(u16), MismatchSize(usize), MessageParse(String), + InvalidFormatCode(u8), Empty, } @@ -28,3 +29,12 @@ impl From for DecodeError { DecodeError::Utf8Error(err) } } + +impl From for DecodeError { + fn from(err: IncompleteError) -> Self { + DecodeError::Incomplete(err) + } +} + +#[derive(Debug)] +pub struct IncompleteError(pub usize); diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index f98ed25..f806431 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -8,3 +8,5 @@ mod response; pub mod types; pub use request::*; pub use response::*; + +pub mod utils; diff --git a/protocol/src/message.rs b/protocol/src/message.rs deleted file mode 100644 index f32e929..0000000 --- a/protocol/src/message.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::sync::Arc; - -use ntex_amqp_codec::{Encode, Message as AmpqMessage}; -use ntex_bytes::BytesMut; - -use crate::{ - codec::{Decoder, Encoder}, - error::DecodeError, -}; - -#[derive(Debug, PartialEq, Clone)] -pub struct Message(Arc); - -#[derive(Debug, PartialEq)] -pub struct InternalMessage { - pub(crate) publishing_id: Option, - pub(crate) message: AmpqMessage, -} - -unsafe impl Send for Message {} -unsafe impl Sync for Message {} - -impl Encoder for Message { - fn encoded_size(&self) -> u32 { - self.0.message.encoded_size() as u32 - } - - fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), crate::error::EncodeError> { - let mut buf = BytesMut::with_capacity(self.encoded_size() as usize); - - ntex_amqp_codec::Encode::encode(&self.0.message, &mut buf); - - writer.write_all(&buf)?; - - Ok(()) - } -} - -impl Message { - #[cfg(test)] - pub(crate) fn new(internal: InternalMessage) -> Message { - Message(Arc::new(internal)) - } - pub fn builder() -> MessageBuilder { - MessageBuilder(InternalMessage { - message: AmpqMessage::default(), - publishing_id: None, - }) - } - - pub fn data(&self) -> Option<&[u8]> { - self.0.message.body().data().map(|data| data.as_ref()) - } - - /// Get a reference to the message's publishing id. - pub fn publishing_id(&self) -> Option<&u64> { - self.0.publishing_id.as_ref() - } -} - -impl Decoder for Message { - fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - ntex_amqp_codec::Decode::decode(input) - .map_err(|err| DecodeError::MessageParse(err.to_string())) - .map(|message| { - ( - message.0, - Message(Arc::new(InternalMessage { - publishing_id: None, - message: message.1, - })), - ) - }) - } -} - -pub struct MessageBuilder(InternalMessage); - -impl MessageBuilder { - pub fn body(mut self, data: impl Into>) -> Self { - self.0 - .message - .set_body(|body| body.set_data(data.into().into())); - self - } - - pub fn publising_id(mut self, publishing_id: u64) -> Self { - self.0.publishing_id = Some(publishing_id); - self - } - pub fn build(self) -> Message { - Message(Arc::new(self.0)) - } -} - -impl From for Vec { - fn from(message: Message) -> Self { - vec![message] - } -} diff --git a/protocol/src/message/amqp/body.rs b/protocol/src/message/amqp/body.rs new file mode 100644 index 0000000..0694ea5 --- /dev/null +++ b/protocol/src/message/amqp/body.rs @@ -0,0 +1,89 @@ +use crate::message::amqp::codec::AmqpEncoder; + +use super::{ + codec::constants::{MESSAGE_BODY_DATA, MESSAGE_BODY_SEQUENCE, MESSAGE_BODY_VALUE}, + types::{List, Value}, + AmqpEncodeError, +}; + +#[derive(Debug, Clone, Default, PartialEq)] +pub struct MessageBody { + pub data: Vec>, + pub sequence: Vec, + pub value: Option, +} + +impl MessageBody { + pub fn data(&self) -> Option<&Vec> { + self.data.get(0) + } + + pub fn value(&self) -> Option<&Value> { + self.value.as_ref() + } + + #[cfg(test)] + pub fn set_value(&mut self, value: impl Into) -> &mut Self { + self.value = Some(value.into()); + self + } + + pub fn set_data(&mut self, data: impl Into>) -> &mut Self { + self.data.clear(); + self.data.push(data.into()); + self + } +} + +impl AmqpEncoder for MessageBody { + fn encoded_size(&self) -> u32 { + let mut size = self.data.iter().fold(0, |a, d| { + a + d.encoded_size() + MESSAGE_BODY_DATA.encoded_size() + }); + size += self.sequence.iter().fold(0, |a, seq| { + a + seq.encoded_size() + MESSAGE_BODY_SEQUENCE.encoded_size() + }); + + if let Some(ref val) = self.value { + size + val.encoded_size() + MESSAGE_BODY_VALUE.encoded_size() + } else { + size + } + } + + fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), AmqpEncodeError> { + for data in &self.data { + MESSAGE_BODY_DATA.encode(writer)?; + data.encode(writer)?; + } + + for sequence in &self.sequence { + MESSAGE_BODY_SEQUENCE.encode(writer)?; + sequence.encode(writer)?; + } + if let Some(ref val) = self.value { + MESSAGE_BODY_VALUE.encode(writer)?; + val.encode(writer)?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use fake::{Dummy, Fake, Faker}; + + use crate::message::amqp::Value; + + use super::MessageBody; + impl Dummy for MessageBody { + fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { + MessageBody { + data: config.fake_with_rng(rng), + sequence: config.fake_with_rng(rng), + value: Some(Value::Simple(config.fake_with_rng(rng))), + } + } + } +} diff --git a/protocol/src/message/amqp/codec/constants.rs b/protocol/src/message/amqp/codec/constants.rs new file mode 100644 index 0000000..6dd4b8b --- /dev/null +++ b/protocol/src/message/amqp/codec/constants.rs @@ -0,0 +1,88 @@ +use std::convert::TryInto; + +use crate::{ + codec::decoder::read_u8, + message::amqp::{ + error::{AmqpDecodeError, AmqpEncodeError}, + types::Descriptor, + }, +}; + +pub const MESSAGE_HEADER: Descriptor = Descriptor::Ulong(112); +pub const MESSAGE_DELIVERY_ANNOTATIONS: Descriptor = Descriptor::Ulong(113); +pub const MESSAGE_ANNOTATIONS: Descriptor = Descriptor::Ulong(114); +pub const MESSAGE_PROPERTIES: Descriptor = Descriptor::Ulong(115); +pub const MESSAGE_APPLICATION_PROPERTIES: Descriptor = Descriptor::Ulong(116); +pub const MESSAGE_BODY_DATA: Descriptor = Descriptor::Ulong(117); +pub const MESSAGE_BODY_SEQUENCE: Descriptor = Descriptor::Ulong(118); +pub const MESSAGE_BODY_VALUE: Descriptor = Descriptor::Ulong(119); +pub const MESSAGE_FOOTER: Descriptor = Descriptor::Ulong(120); + +use byteorder::WriteBytesExt; +use num_enum::{IntoPrimitive, TryFromPrimitive}; + +use super::{AmqpDecoder, AmqpEncoder}; + +#[derive(Debug, TryFromPrimitive, Clone, Copy, IntoPrimitive)] +#[repr(u8)] +pub enum TypeCode { + Described = 0x00, + Null = 0x40, + Boolean = 0x56, + BooleanTrue = 0x41, + BooleanFalse = 0x42, + UInt0 = 0x43, + ULong0 = 0x44, + UByte = 0x50, + UShort = 0x60, + UInt = 0x70, + ULong = 0x80, + Byte = 0x51, + Short = 0x61, + Int = 0x71, + Long = 0x81, + UIntSmall = 0x52, + ULongSmall = 0x53, + IntSmall = 0x54, + LongSmall = 0x55, + Float = 0x72, + Double = 0x82, + Char = 0x73, + Timestamp = 0x83, + Uuid = 0x98, + Binary8 = 0xa0, + Binary32 = 0xb0, + String8 = 0xa1, + String32 = 0xb1, + Symbol8 = 0xa3, + Symbol32 = 0xb3, + List0 = 0x45, + List8 = 0xc0, + List32 = 0xd0, + Map8 = 0xc1, + Map32 = 0xd1, + Array8 = 0xe0, + Array32 = 0xf0, +} + +impl AmqpEncoder for TypeCode { + fn encoded_size(&self) -> u32 { + 1 + } + + fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), AmqpEncodeError> { + let code: u8 = (*self).into(); + writer.write_u8(code).map(Ok)? + } +} + +impl AmqpDecoder for TypeCode { + fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpDecodeError> { + let (input, code) = read_u8(input)?; + Ok(( + input, + code.try_into() + .map_err(|_| AmqpDecodeError::InvalidTypeCode(code))?, + )) + } +} diff --git a/protocol/src/message/amqp/codec/decoder.rs b/protocol/src/message/amqp/codec/decoder.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/protocol/src/message/amqp/codec/decoder.rs @@ -0,0 +1 @@ + diff --git a/protocol/src/message/amqp/codec/mod.rs b/protocol/src/message/amqp/codec/mod.rs new file mode 100644 index 0000000..1379a3a --- /dev/null +++ b/protocol/src/message/amqp/codec/mod.rs @@ -0,0 +1,27 @@ +use std::{any::type_name, io::Write}; + +use self::constants::TypeCode; + +use super::error::{AmqpDecodeError, AmqpEncodeError}; + +pub mod constants; +mod decoder; + +pub trait AmqpEncoder { + fn encoded_size(&self) -> u32; + fn encode(&self, writer: &mut impl Write) -> Result<(), AmqpEncodeError>; +} + +pub trait AmqpDecoder +where + Self: Sized, +{ + fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpDecodeError>; + + fn invalid_type_code(code: TypeCode) -> AmqpDecodeError { + AmqpDecodeError::InvalidTypeCodeFor { + target: type_name::().to_string(), + code, + } + } +} diff --git a/protocol/src/message/amqp/definitions.rs b/protocol/src/message/amqp/definitions.rs new file mode 100644 index 0000000..e69de29 diff --git a/protocol/src/message/amqp/error.rs b/protocol/src/message/amqp/error.rs new file mode 100644 index 0000000..1d73ce9 --- /dev/null +++ b/protocol/src/message/amqp/error.rs @@ -0,0 +1,43 @@ +use std::string::FromUtf8Error; + +use crate::error::IncompleteError; + +use super::codec::constants::TypeCode; + +#[derive(Debug)] +pub enum AmqpEncodeError { + Io(std::io::Error), +} + +impl From for AmqpEncodeError { + fn from(err: std::io::Error) -> Self { + AmqpEncodeError::Io(err) + } +} + +#[derive(Debug)] +pub enum AmqpDecodeError { + InvalidTypeCode(u8), + InvalidTypeCodeFor { target: String, code: TypeCode }, + MessageParse(String), + Incomplete(IncompleteError), + Utf8Error(FromUtf8Error), + UuidError(uuid::Error), +} + +impl From for AmqpDecodeError { + fn from(err: IncompleteError) -> Self { + AmqpDecodeError::Incomplete(err) + } +} + +impl From for AmqpDecodeError { + fn from(err: FromUtf8Error) -> Self { + AmqpDecodeError::Utf8Error(err) + } +} +impl From for AmqpDecodeError { + fn from(err: uuid::Error) -> Self { + AmqpDecodeError::UuidError(err) + } +} diff --git a/protocol/src/message/amqp/fixtures/empty_message b/protocol/src/message/amqp/fixtures/empty_message new file mode 100644 index 0000000000000000000000000000000000000000..0b06f60d88c75042744bacf5d0e79e6f885dccf7 GIT binary patch literal 5 McmZPwE?vL?00Zy=X#fBK literal 0 HcmV?d00001 diff --git a/protocol/src/message/amqp/fixtures/header_amqpvalue_message b/protocol/src/message/amqp/fixtures/header_amqpvalue_message new file mode 100644 index 0000000000000000000000000000000000000000..36639cb6b564eb69d883e0aa737f02fbbb3d53c4 GIT binary patch literal 47 zcmZPwF1WzJz`zH@td0RG4vqy342(Jq!KDk>GEx&$Qi~XZ%NKGc<`x!&CFYc-0s#46 B46y(J literal 0 HcmV?d00001 diff --git a/protocol/src/message/amqp/fixtures/message_body_250 b/protocol/src/message/amqp/fixtures/message_body_250 new file mode 100644 index 0000000000000000000000000000000000000000..d4585c642809be798136aa2a70327dadcd1acc06 GIT binary patch literal 255 zcmV~$Np^!E007WlEkzLtGB}~x1=AoZgVWP^nx3<({{L0%zkk(-KUO`t8-xp}w0p8H(>3h?np%7V2pnr2F24cy*PNY_+yg0))2L|Ms zYyq_?n%!#WUhD8$t_&{7K#r5ZrDxPAfqN2;u`?oxQkf3rPSi&A(XlAWUgJg^o&aki zG0l2{YhfJDJREyBS{TzO6a-B(@0FTV}g=v9~1CYn4t`wL0BWL1kwf+C~nN zqwt5FF3?*aw=hqXu(kZOY_29>m-zJuV|Zz{-}a7&I6AUOfHRLZ$a&Pra-V9ijT^|m zpVSX*Z}^0bNO!1!PA#5nBp$mJUUCht$MNn`?;Ep9I%d&nL`4v+p2^W*SK>2{ue4LV z#L=^X{aF0v-ClLf)%O4_iM*qjozp|f0y{%tw=K@3(wIb^-HD<-SfgZsk!YxcgL3Ne zLpez-+UxMp_A-yMp+XxdA2pnFB{pfiYt-be^JrbO3#2u?QLYjOpO8|jZ(7h)+twBcj`;XQ#|2s4*FfVQ)Ni|ARoccD^8&<_7%?{gxLv{(OZM~)e`a5 zu57N#T)dlmL+`2xcPx_MKAR4lqRVJ%i9P@*U#jxD!^<#hV7#2-&Vq*v3S;`_z7gwg zfbW2r?6$|WuFzQ1(xy8cWJ1PC~@r zX?O5debtXZe11GW{rz}6zI*?W`U6P|(CSG9Sm7OVF&C|p#9HD<1UTalt? zlbxgk9^jrQ;AqlnqFG7(yh|d#PLmS&O|r2&OZx2U3YeQ!5yrZx2sT-NZnqmHLHUHG zu;Xy-fIiDt-r{b|1~MY5cd|N{P!C*DTkRPvk+v4)@o#;+q z<{js+vu0;dWO+McW8*LCg7;4^e&ViaofC*azy(b)i4WQ3-D}iGr?G3;uUZb$_bd(jg0chxE3%&-pr1Y zZc`2iwAcrzzYgCU?Q_syo78dH_ihqk&uGddpmw@gYCc5MtXplbfl__{SMr#307`|} zA3nzc_7%KO-sdo$NX&}4Wpvj`Y0k>s+G@k9w7hWqbYz)^Wo>g!-H<6%1k)@zgQ}{Z zuU-+2NtlmfPp2gG{B>}P_HCM^ZyRfPE3ddmWqh_P**nDa=hB%=U zNUd5~Ui0J@Q?gXXqbP=7 z=WVpEu}P?&WTu}8n{YVlrQA^^uEi6578=I0F>HlJ(gto6L8e4szW*geAXOtB@m>}f z?YcgLX9JWjyIpm9%D|aq=UO8dCK-EY@%q-dWAg9PF>~Z z?bLe}P-x;Sq}k<8SI#SP6^9ND9QYa3#{0miFsg00UjsFbkb>)8&H^A$_P`S-`;i_d?W zKR(`{o-mcgK&{$S8w+q!7-QB{$6iylFIJ>t z>?lL4!#)ejB#aRUX@}t~+=}e$%24A_)#Cf=rYBCBY46hoGxP+}vK_|hD|U#u=M<2Z zeyfh`Dsx*wG?X}RY^TT*@#;HYGl;t#Udd(JwnJk`XGRoRFSdiTTzY>WUP^(7ppNwD ru=t`lr(1cDav0XHWhBV{eH0Ny`QuCDI$ zoImvZpFe;8<-h*?`SZ_z`*-BJVTT|1oUI3Hr+_Ql+xpAI;fOJCsVM`hsso(A;B6(u1`L^$&Lh`;3FQ4N9wyclw)#)tmuH^QfP^fF%yJjk~ zCn^lC8PCuJ>$udx{3g(ef--jb*l{4;v*!t4SI-lIXG*RRa)K854>P(99|kw_j!+ic0;k5 zed$XJDMWiDJUR)Mvuy0K71tN z=uO?=lMi#oGYS0}$!a;Rs~=n@Y6CvhDC)Bg_Hpb9Pb=pfl%U`-;GY+sY*jClS)Z zTM-r#?$Js%FLlznFi!2Rhi-ImTnX8o=t-`z*NJ!A$Fn~dys%XIV&sJZJ>^@QENaRU6nVszXl}C zA17Djz^g>j0I<`+Rd37VC-ud$Wdgo9W-9Wed8Zn)y&I~({dX6nt)tZCzw8YiDkx0% z68o`j2hT<4BkkH>n*<6SZLzAl=$((g!|bhXXlfLC!ddQUcnvMnYq434@{Ut}#nFE} z*!Wf>4U^gM+W_^2@}dy#&caw@%hvtHTeB!kJ`tkbW`!6?Jigg&t8T%lrw2dQ;!CR*&c5Qpdl?d#KY{28OW+45}-D7Oqc17A-bihYXI<4cH z-o!VOJg`P!`s)OPdSrH2yWs&~Io9WtY8mUB`@0=iobKAAIGxl%VrN^d`Sz^mYQB6E zr+-cR?8c2E?z$YW-F6%r*R4ca!H%#0{l?Adb(w~j{3{v0JC4(??!bzo)( ztjw{Pf^k}mt3gBEw^`_0oy4hilg_GSt-{;RPV%ua*RJ*+K;iY48Xt0j@bUK}c$WLL zniMicPvURo=e}hzLy~Aze_}_pR)5)BS;a=x86iu{vX8#+)cpElQQ26Vd2MNn>d|(a z`MU6e5dquPYjjEd%07$cCPn@6*?GePjb_UKLh!$8-;am^qmU8QcN|2Q$gcCg)i>LB zBCxL`fXp8l~F7g@>509+%IKZTJf4(o9{(_H!ZKBmNq`!C+GvyvX01EgZc{! z-IV*BEGN#QmiXcnd%@(!6U}XacIr&eLm*|EMaM57pOV}p7t_mQ*8<14=6D}Fb()?w z4Vo2aMXk49Ux4R|HD%`Mi1O-OKj-CcbC%K7WNrrNRSdwHoR!4)Rz~?;pdHw^m7^Qr z-Asn1CYFt%mTKU!i;A54G|ValtAxXsg0Jv@t+p2S=uNkp-S@=QKl*H>D`)%KbNc({(MaV|#Q#LJ%C>Vs4nuVZUkRP^rT z1O;G&UL$wW%%+)QrM9+^zb1+9_?ithnI4(Uaq1qXvszxig@LJ^5BvHCKaw7P`+#S` z7!BD^0^aO=#?FdlOuoU{R>arxcq|^IdMojnN+YNK5F;Kf;Zg6h3u}!zmm*B`HwI?O zUU?Yd>*N@_foVs@7KeUQ)@Uv*oE}arOKcH6TShyRyF2D2L=cw~?)I`t#$<5Q!Z5UB z{*C%Ly_<~!ZKu|)W!+_9xpHf1HNDEZcx+}vesHFix8y#0pS{3Ne?LVtzsB11X`k9J z%pPyC*!-U5xto}e)|H?85wo%-hB0?zR%X+@?b-6H<+5Gm2rpa3Q3t+fkna05&JW?j zYHM<~x#1JvYT(Dqvv=Qc0|9$TdwP%0sFwm&98Sc&Sl({S5N+*V7*6{ENz>ErC+82n z{^QS|fBHA5g4m#MrOvQ`Mdv-%+fx?wS@F5X{`QuY?BkMlS!dS~B<}%bUD60>h8{P5 znJm|XS=fGyew&MJ^Uqr|Y`KMFE=#oz>rM}FH_n(B?cM7Bevg$qW ztkkzI%i;R{K*2W{q{oR5K^wer3XtB%(RpM5%{}>P-&#a_WZtG{^(>gwc;vSsgba71 z*PGv8w+nbfn7D<7bOM=C7dHFUooaf*$zJz>wyUfpQE9rK9vu4+dt)m*j_3L z{r7ize>x|NMYBaW9olEA-t)2bDO>_5BY&@3v|D;g|9OQ`wI7pFfBBv&D<>+EL z*Vc0Lbx(}Kvw74$gYLhcFKuP|iuQkfef{FMp9lHr-@m?o_802YcPoPV^?3lf+8Y&b z`l)KyNsUW;Dyxy=DF_`|n!ZoW7F{mNUe#9>6K*<}A*m049@H1+x0ct+doh+0%}>H! z&|sv+af_E3(} zz86RBWEFc)ep`4-IGoN$v|3wvyz1>j7+Wn8Cui}_DAct)Yr&v$8RViK8q^iQ8PaP~ zL8+|g_|$M`nvaE=WXPM^TVuV_p5Ds(%O_w4+2QIivvK-bDb^F7g`pWzca2G6TGLK) zTT$0=RrRRK{rlPzAky@XslMQb)jj=$S{*#VQWz6*nyo-y3ABxuF%2@M z-TZYAv(0;^k{ICiFsh8jQc&-|ki|6&0T#_2;$m={*PLp{?dBR<+12l-T8yC%p9j4o zh;mFNk6VXDEZifw>FcD!*(HWkf{4UMq~%A}n`Y8XGJ zGQZzZ{8(0X=e|{8*@B``2e{3iA7VpWZZUOpR^&l6k8g$T{C)wLx-p;KLd?=FAUbMU zx|jQIyt%HHBqg-=Iojn&ZS)3i96G6z)v0%NpSD@f@&t0sYjp34xegvA*%pe72Ry2> zX33vrz|r=uYIZ4dsauI*e#CtO`=Co1mBeGDvXwakR#~fvtiT0^+*}o7aH=5)sYp^B z^`joaGTx7I zU#EFACN?w*TOQnpZ?B(b`);>MWoHd$g!oZ_-0NxG8!=gIL$^wG-f4uRcI8`F>REp> z6Fi&@nH@3X);-4wZ|=n>H=iq|ZKqJyqHeS+lv0uD<3P&aG#Uc+;)WK%qT;soL-|> zH`QUk3#&82Gq^5A^UYHSnM-Py@T}5TK*`By!4?f+)I1EVgFkhbvFmgG&T)$n^PBtA z5WUzht#3!=9ri10#&rnTQDL?%4}U+h!9#yVCdfhG&U302U(xZm|C!<>16r(I_^w#} zz*A(vm~MTh@rI0jea*{!(cx=Bo)?~3ru@`c{dO6rcp16VJ!WGhX5x@|^B^ti)jCXIqD^$>)N`Y0 zonSeh1NXxzQ29uMAFY^Pai=w|;G-UXS&`sJg2__Hdf_)PRrX zuwE*UR&zieoBBNBow-HrD{Xv*+sEJAS2I=|2)yCv0YJy)IAz^&9)o{SvoTk&>BhF> z_bU7V03+#nMvEw{R58xACokjD>We>7Nvx!Wi!<1bh8`Y{pa&gf*%I25q69ONePO6h z9BC@MU9--HX!Yn+n#8=4Vt7jm&JWPzHP>M2x3;j(M%mvYEPNI1M;GRLcHHVr{W;7> zbXB9fk8#-J+j`}RC<7ms;OovrBN!B*VdL}M&#m@SZgm@4hi)!Q6&$YTi2yG}?(uA@ zd!OxZoQxL8>N8U=#V7kIjJ1WTsjQm`;4*t`wWC~}r)8V-zEIc@S!l{n4;!I;MX{${8p{LjgEE23l$37R#uvo@IpjP2Kd|*#C zc}~-O)k(V$RT27FVmm5NT=_H64iF=AGwp XCD9S8`e7=!i6HdSRhQb+Qj1F#g2_, + pub first_acquirer: bool, + pub delivery_count: u32, +} + +impl Default for Header { + fn default() -> Self { + Self { + durable: Default::default(), + priority: 4, + ttl: Default::default(), + first_acquirer: Default::default(), + delivery_count: 0, + } + } +} + +impl Header { + fn content_size(&self) -> u32 { + self.durable.encoded_size() + + self.priority.encoded_size() + + self.ttl.encoded_size() + + self.first_acquirer.encoded_size() + + self.delivery_count.encoded_size() + } +} + +impl AmqpEncoder for Header { + fn encoded_size(&self) -> u32 { + let size = self.content_size() + MESSAGE_HEADER.encoded_size(); + let fixed = if size > u8::MAX as u32 { 9 } else { 3 }; + fixed + size + } + + fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), AmqpEncodeError> { + MESSAGE_HEADER.encode(writer)?; + + let content_size = self.content_size(); + + if content_size + MESSAGE_HEADER.encoded_size() > u8::MAX as u32 { + TypeCode::List8.encode(writer)?; + writer.write_u32::(content_size + 4)?; + writer.write_u32::(5)?; + } else { + TypeCode::List8.encode(writer)?; + writer.write_u8((content_size + 4) as u8)?; + writer.write_u8(5)?; + } + self.durable.encode(writer)?; + self.priority.encode(writer)?; + self.ttl.encode(writer)?; + self.first_acquirer.encode(writer)?; + self.delivery_count.encode(writer)?; + Ok(()) + } +} + +impl AmqpDecoder for Header { + fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpDecodeError> { + match Descriptor::decode(input)? { + (input, MESSAGE_HEADER) => { + let header = Header::default(); + List::decode_with_fields(input, list_decoder_header, header) + } + (_, descriptor) => Err(AmqpDecodeError::MessageParse(format!( + "Invalid descriptor for header {:?}", + descriptor + ))), + } + } +} + +list_decoder!(Header, list_decoder_header, + { + 0 => { durable, Boolean, false}, + 1 => { priority, u8, 4}, + 2 => { ttl, u32, None, true}, + 3 => { first_acquirer, Boolean, false}, + 4 => { delivery_count, UInt, 4} + } +); + +#[cfg(test)] +mod tests { + use crate::message::amqp::tests::type_encode_decode_test_fuzzy; + + use super::Header; + + #[test] + fn test_header_encode_decode() { + type_encode_decode_test_fuzzy::
() + } +} diff --git a/protocol/src/message/amqp/message.rs b/protocol/src/message/amqp/message.rs new file mode 100644 index 0000000..ce6434e --- /dev/null +++ b/protocol/src/message/amqp/message.rs @@ -0,0 +1,509 @@ +use super::body::MessageBody; +use super::codec::constants::{ + MESSAGE_ANNOTATIONS, MESSAGE_APPLICATION_PROPERTIES, MESSAGE_DELIVERY_ANNOTATIONS, + MESSAGE_FOOTER, +}; +use super::codec::AmqpEncoder; +use super::error::AmqpEncodeError; +use super::header::Header; +use super::properties::Properties; +use super::section::MessageSection; +use super::types::{ApplicationProperties, DeliveryAnnotations, Footer, MessageAnnotations}; +use super::{AmqpDecodeError, AmqpDecoder}; + +#[cfg(test)] +use fake::Fake; + +#[derive(Debug, Clone, Default, PartialEq)] +#[cfg_attr(test, derive(fake::Dummy))] +pub struct Message { + header: Option
, + delivery_annotations: Option, + message_annotations: Option, + properties: Option, + application_properties: Option, + footer: Option