diff --git a/.gitignore b/.gitignore index 408b8a5..30fc9a3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target Cargo.lock -.idea \ No newline at end of file +.idea +.DS_Store diff --git a/Cargo.toml b/Cargo.toml index 773e62a..ab32c32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,4 @@ dashmap = "4.0.2" [dev-dependencies] tracing-subscriber = "0.3.1" fake = { version = "2.4", features=['derive']} +chrono = "0.4.19" diff --git a/examples/complex_message.rs b/examples/complex_message.rs new file mode 100644 index 0000000..2a840d7 --- /dev/null +++ b/examples/complex_message.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use chrono::Utc; +use futures::StreamExt; +use rabbitmq_stream_client::{ + types::{ByteCapacity, Message, OffsetSpecification}, + Environment, +}; +use tokio::sync::Barrier; +use tracing::{info, Level}; +use tracing_subscriber::FmtSubscriber; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .finish(); + + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + let environment = Environment::builder() + .host("localhost") + .port(5552) + .build() + .await?; + + let stream_name = "complex_message"; + let _ = environment.delete_stream(stream_name).await; + let message_count = 1; + environment + .stream_creator() + .max_length(ByteCapacity::GB(2)) + .create(stream_name) + .await?; + + let producer = environment.producer().build(stream_name).await?; + + let barrier = Arc::new(Barrier::new(message_count + 1)); + for i in 0..message_count { + let producer_cloned = producer.clone(); + + let barrier_cloned = barrier.clone(); + tokio::task::spawn(async move { + let message = Message::builder() + .message_annotations() + .insert("test", i as i32) + .message_builder() + .application_properties() + .insert("test", i as i32) + .message_builder() + .properties() + .content_encoding("application/json") + .absolute_expiry_time(Utc::now()) + .message_builder() + .body(format!("message{}", i)) + .build(); + producer_cloned.send_with_confirm(message).await.unwrap(); + + barrier_cloned.wait().await; + }); + } + + barrier.wait().await; + + producer.close().await?; + + let mut consumer = environment + .consumer() + .offset(OffsetSpecification::First) + .build(stream_name) + .await + .unwrap(); + + for _ in 0..message_count { + let delivery = consumer.next().await.unwrap()?; + info!( + "Got message: {:#?} with offset: {}", + delivery.message(), + delivery.offset(), + ); + } + + consumer.handle().close().await.unwrap(); + + environment.delete_stream(stream_name).await?; + Ok(()) +} diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index 83f4a68..797c458 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -6,12 +6,18 @@ edition = "2018" license = "Apache-2.0 OR MPL-2.0" +[features] +wasm-bindgen = ["uuid/wasm-bindgen"] + [dependencies] byteorder = "1" -ntex-amqp-codec = "= 0.7.2" -ntex-bytes = "0.1" - +ordered-float = "2.10.0" +uuid = "0.8.2" +chrono = "0.4.19" +num_enum = "0.5.7" +derive_more = "0.99" [dev-dependencies] -fake = { version = "2.4", features=['derive']} +fake = { version = "2.4", features=['derive', 'chrono','uuid']} rand = "0.8" +pretty_assertions = "1.2.0" diff --git a/protocol/src/codec/decoder.rs b/protocol/src/codec/decoder.rs index 53e98f1..64bb905 100644 --- a/protocol/src/codec/decoder.rs +++ b/protocol/src/codec/decoder.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use byteorder::ByteOrder; +use crate::error::IncompleteError; use crate::message::Message; use crate::types::PublishedMessage; use crate::types::PublishingError; @@ -12,42 +13,42 @@ use super::Decoder; impl Decoder for i8 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_i8(input) + read_i8(input).map(Ok)? } } impl Decoder for i32 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_i32(input) + read_i32(input).map(Ok)? } } impl Decoder for u8 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_u8(input) + read_u8(input).map(Ok)? } } impl Decoder for u16 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_u16(input) + read_u16(input).map(Ok)? } } impl Decoder for u32 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_u32(input) + read_u32(input).map(Ok)? } } impl Decoder for u64 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_u64(input) + read_u64(input).map(Ok)? } } impl Decoder for i64 { fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - read_i64(input) + read_i64(input).map(Ok)? } } @@ -199,9 +200,9 @@ impl Decoder for Vec { } } -pub fn check_len(input: &[u8], size: usize) -> Result<(), DecodeError> { +pub fn check_len(input: &[u8], size: usize) -> Result<(), IncompleteError> { if input.len() < size { - return Err(DecodeError::Incomplete(size)); + return Err(IncompleteError(size)); } Ok(()) } @@ -209,7 +210,7 @@ pub fn check_len(input: &[u8], size: usize) -> Result<(), DecodeError> { macro_rules! reader { ( $fn:ident, $size:expr, $ret:ty) => { #[allow(unused)] - pub fn $fn(input: &[u8]) -> Result<(&[u8], $ret), crate::error::DecodeError> { + pub fn $fn(input: &[u8]) -> Result<(&[u8], $ret), IncompleteError> { check_len(input, $size)?; let x = byteorder::BigEndian::$fn(input); Ok((&input[$size..], x)) @@ -217,19 +218,26 @@ macro_rules! reader { }; } -pub fn read_u8(input: &[u8]) -> Result<(&[u8], u8), DecodeError> { +pub fn read_u8(input: &[u8]) -> Result<(&[u8], u8), IncompleteError> { check_len(input, 1)?; Ok((&input[1..], input[0])) } -pub fn read_i8(input: &[u8]) -> Result<(&[u8], i8), DecodeError> { +pub fn read_i8(input: &[u8]) -> Result<(&[u8], i8), IncompleteError> { check_len(input, 1)?; Ok((&input[1..], input[0] as i8)) } +pub fn read_exact(input: &[u8], len: usize) -> Result<(&[u8], &[u8]), IncompleteError> { + check_len(input, len)?; + Ok((&input[len..], &input[..len])) +} + reader!(read_i16, 2, i16); reader!(read_u16, 2, u16); reader!(read_u32, 4, u32); reader!(read_i32, 4, i32); reader!(read_u64, 8, u64); reader!(read_i64, 8, i64); +reader!(read_f32, 4, f32); +reader!(read_f64, 8, f64); diff --git a/protocol/src/commands/deliver.rs b/protocol/src/commands/deliver.rs index 6b5a357..c4a5e77 100644 --- a/protocol/src/commands/deliver.rs +++ b/protocol/src/commands/deliver.rs @@ -160,15 +160,11 @@ mod tests { use fake::{Dummy, Faker}; use crate::{commands::tests::command_encode_decode_test, message::InternalMessage}; - use ntex_amqp_codec::Message as AmpqMessage; use super::{DeliverCommand, Message}; impl Dummy for Message { fn dummy_with_rng(_config: &Faker, _rng: &mut R) -> Self { - Message::new(InternalMessage { - message: AmpqMessage::default(), - publishing_id: None, - }) + Message::new(InternalMessage::default()) } } diff --git a/protocol/src/error.rs b/protocol/src/error.rs index 8351816..bc10ed2 100644 --- a/protocol/src/error.rs +++ b/protocol/src/error.rs @@ -2,12 +2,13 @@ use std::string::FromUtf8Error; #[derive(Debug)] pub enum DecodeError { - Incomplete(usize), + Incomplete(IncompleteError), Utf8Error(FromUtf8Error), UnknownResponseCode(u16), UnsupportedResponseType(u16), MismatchSize(usize), MessageParse(String), + InvalidFormatCode(u8), Empty, } @@ -28,3 +29,12 @@ impl From for DecodeError { DecodeError::Utf8Error(err) } } + +impl From for DecodeError { + fn from(err: IncompleteError) -> Self { + DecodeError::Incomplete(err) + } +} + +#[derive(Debug)] +pub struct IncompleteError(pub usize); diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index f98ed25..f806431 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -8,3 +8,5 @@ mod response; pub mod types; pub use request::*; pub use response::*; + +pub mod utils; diff --git a/protocol/src/message.rs b/protocol/src/message.rs deleted file mode 100644 index f32e929..0000000 --- a/protocol/src/message.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::sync::Arc; - -use ntex_amqp_codec::{Encode, Message as AmpqMessage}; -use ntex_bytes::BytesMut; - -use crate::{ - codec::{Decoder, Encoder}, - error::DecodeError, -}; - -#[derive(Debug, PartialEq, Clone)] -pub struct Message(Arc); - -#[derive(Debug, PartialEq)] -pub struct InternalMessage { - pub(crate) publishing_id: Option, - pub(crate) message: AmpqMessage, -} - -unsafe impl Send for Message {} -unsafe impl Sync for Message {} - -impl Encoder for Message { - fn encoded_size(&self) -> u32 { - self.0.message.encoded_size() as u32 - } - - fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), crate::error::EncodeError> { - let mut buf = BytesMut::with_capacity(self.encoded_size() as usize); - - ntex_amqp_codec::Encode::encode(&self.0.message, &mut buf); - - writer.write_all(&buf)?; - - Ok(()) - } -} - -impl Message { - #[cfg(test)] - pub(crate) fn new(internal: InternalMessage) -> Message { - Message(Arc::new(internal)) - } - pub fn builder() -> MessageBuilder { - MessageBuilder(InternalMessage { - message: AmpqMessage::default(), - publishing_id: None, - }) - } - - pub fn data(&self) -> Option<&[u8]> { - self.0.message.body().data().map(|data| data.as_ref()) - } - - /// Get a reference to the message's publishing id. - pub fn publishing_id(&self) -> Option<&u64> { - self.0.publishing_id.as_ref() - } -} - -impl Decoder for Message { - fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> { - ntex_amqp_codec::Decode::decode(input) - .map_err(|err| DecodeError::MessageParse(err.to_string())) - .map(|message| { - ( - message.0, - Message(Arc::new(InternalMessage { - publishing_id: None, - message: message.1, - })), - ) - }) - } -} - -pub struct MessageBuilder(InternalMessage); - -impl MessageBuilder { - pub fn body(mut self, data: impl Into>) -> Self { - self.0 - .message - .set_body(|body| body.set_data(data.into().into())); - self - } - - pub fn publising_id(mut self, publishing_id: u64) -> Self { - self.0.publishing_id = Some(publishing_id); - self - } - pub fn build(self) -> Message { - Message(Arc::new(self.0)) - } -} - -impl From for Vec { - fn from(message: Message) -> Self { - vec![message] - } -} diff --git a/protocol/src/message/amqp/body.rs b/protocol/src/message/amqp/body.rs new file mode 100644 index 0000000..0694ea5 --- /dev/null +++ b/protocol/src/message/amqp/body.rs @@ -0,0 +1,89 @@ +use crate::message::amqp::codec::AmqpEncoder; + +use super::{ + codec::constants::{MESSAGE_BODY_DATA, MESSAGE_BODY_SEQUENCE, MESSAGE_BODY_VALUE}, + types::{List, Value}, + AmqpEncodeError, +}; + +#[derive(Debug, Clone, Default, PartialEq)] +pub struct MessageBody { + pub data: Vec>, + pub sequence: Vec, + pub value: Option, +} + +impl MessageBody { + pub fn data(&self) -> Option<&Vec> { + self.data.get(0) + } + + pub fn value(&self) -> Option<&Value> { + self.value.as_ref() + } + + #[cfg(test)] + pub fn set_value(&mut self, value: impl Into) -> &mut Self { + self.value = Some(value.into()); + self + } + + pub fn set_data(&mut self, data: impl Into>) -> &mut Self { + self.data.clear(); + self.data.push(data.into()); + self + } +} + +impl AmqpEncoder for MessageBody { + fn encoded_size(&self) -> u32 { + let mut size = self.data.iter().fold(0, |a, d| { + a + d.encoded_size() + MESSAGE_BODY_DATA.encoded_size() + }); + size += self.sequence.iter().fold(0, |a, seq| { + a + seq.encoded_size() + MESSAGE_BODY_SEQUENCE.encoded_size() + }); + + if let Some(ref val) = self.value { + size + val.encoded_size() + MESSAGE_BODY_VALUE.encoded_size() + } else { + size + } + } + + fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), AmqpEncodeError> { + for data in &self.data { + MESSAGE_BODY_DATA.encode(writer)?; + data.encode(writer)?; + } + + for sequence in &self.sequence { + MESSAGE_BODY_SEQUENCE.encode(writer)?; + sequence.encode(writer)?; + } + if let Some(ref val) = self.value { + MESSAGE_BODY_VALUE.encode(writer)?; + val.encode(writer)?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use fake::{Dummy, Fake, Faker}; + + use crate::message::amqp::Value; + + use super::MessageBody; + impl Dummy for MessageBody { + fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { + MessageBody { + data: config.fake_with_rng(rng), + sequence: config.fake_with_rng(rng), + value: Some(Value::Simple(config.fake_with_rng(rng))), + } + } + } +} diff --git a/protocol/src/message/amqp/codec/constants.rs b/protocol/src/message/amqp/codec/constants.rs new file mode 100644 index 0000000..6dd4b8b --- /dev/null +++ b/protocol/src/message/amqp/codec/constants.rs @@ -0,0 +1,88 @@ +use std::convert::TryInto; + +use crate::{ + codec::decoder::read_u8, + message::amqp::{ + error::{AmqpDecodeError, AmqpEncodeError}, + types::Descriptor, + }, +}; + +pub const MESSAGE_HEADER: Descriptor = Descriptor::Ulong(112); +pub const MESSAGE_DELIVERY_ANNOTATIONS: Descriptor = Descriptor::Ulong(113); +pub const MESSAGE_ANNOTATIONS: Descriptor = Descriptor::Ulong(114); +pub const MESSAGE_PROPERTIES: Descriptor = Descriptor::Ulong(115); +pub const MESSAGE_APPLICATION_PROPERTIES: Descriptor = Descriptor::Ulong(116); +pub const MESSAGE_BODY_DATA: Descriptor = Descriptor::Ulong(117); +pub const MESSAGE_BODY_SEQUENCE: Descriptor = Descriptor::Ulong(118); +pub const MESSAGE_BODY_VALUE: Descriptor = Descriptor::Ulong(119); +pub const MESSAGE_FOOTER: Descriptor = Descriptor::Ulong(120); + +use byteorder::WriteBytesExt; +use num_enum::{IntoPrimitive, TryFromPrimitive}; + +use super::{AmqpDecoder, AmqpEncoder}; + +#[derive(Debug, TryFromPrimitive, Clone, Copy, IntoPrimitive)] +#[repr(u8)] +pub enum TypeCode { + Described = 0x00, + Null = 0x40, + Boolean = 0x56, + BooleanTrue = 0x41, + BooleanFalse = 0x42, + UInt0 = 0x43, + ULong0 = 0x44, + UByte = 0x50, + UShort = 0x60, + UInt = 0x70, + ULong = 0x80, + Byte = 0x51, + Short = 0x61, + Int = 0x71, + Long = 0x81, + UIntSmall = 0x52, + ULongSmall = 0x53, + IntSmall = 0x54, + LongSmall = 0x55, + Float = 0x72, + Double = 0x82, + Char = 0x73, + Timestamp = 0x83, + Uuid = 0x98, + Binary8 = 0xa0, + Binary32 = 0xb0, + String8 = 0xa1, + String32 = 0xb1, + Symbol8 = 0xa3, + Symbol32 = 0xb3, + List0 = 0x45, + List8 = 0xc0, + List32 = 0xd0, + Map8 = 0xc1, + Map32 = 0xd1, + Array8 = 0xe0, + Array32 = 0xf0, +} + +impl AmqpEncoder for TypeCode { + fn encoded_size(&self) -> u32 { + 1 + } + + fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), AmqpEncodeError> { + let code: u8 = (*self).into(); + writer.write_u8(code).map(Ok)? + } +} + +impl AmqpDecoder for TypeCode { + fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpDecodeError> { + let (input, code) = read_u8(input)?; + Ok(( + input, + code.try_into() + .map_err(|_| AmqpDecodeError::InvalidTypeCode(code))?, + )) + } +} diff --git a/protocol/src/message/amqp/codec/decoder.rs b/protocol/src/message/amqp/codec/decoder.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/protocol/src/message/amqp/codec/decoder.rs @@ -0,0 +1 @@ + diff --git a/protocol/src/message/amqp/codec/mod.rs b/protocol/src/message/amqp/codec/mod.rs new file mode 100644 index 0000000..1379a3a --- /dev/null +++ b/protocol/src/message/amqp/codec/mod.rs @@ -0,0 +1,27 @@ +use std::{any::type_name, io::Write}; + +use self::constants::TypeCode; + +use super::error::{AmqpDecodeError, AmqpEncodeError}; + +pub mod constants; +mod decoder; + +pub trait AmqpEncoder { + fn encoded_size(&self) -> u32; + fn encode(&self, writer: &mut impl Write) -> Result<(), AmqpEncodeError>; +} + +pub trait AmqpDecoder +where + Self: Sized, +{ + fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpDecodeError>; + + fn invalid_type_code(code: TypeCode) -> AmqpDecodeError { + AmqpDecodeError::InvalidTypeCodeFor { + target: type_name::().to_string(), + code, + } + } +} diff --git a/protocol/src/message/amqp/definitions.rs b/protocol/src/message/amqp/definitions.rs new file mode 100644 index 0000000..e69de29 diff --git a/protocol/src/message/amqp/error.rs b/protocol/src/message/amqp/error.rs new file mode 100644 index 0000000..1d73ce9 --- /dev/null +++ b/protocol/src/message/amqp/error.rs @@ -0,0 +1,43 @@ +use std::string::FromUtf8Error; + +use crate::error::IncompleteError; + +use super::codec::constants::TypeCode; + +#[derive(Debug)] +pub enum AmqpEncodeError { + Io(std::io::Error), +} + +impl From for AmqpEncodeError { + fn from(err: std::io::Error) -> Self { + AmqpEncodeError::Io(err) + } +} + +#[derive(Debug)] +pub enum AmqpDecodeError { + InvalidTypeCode(u8), + InvalidTypeCodeFor { target: String, code: TypeCode }, + MessageParse(String), + Incomplete(IncompleteError), + Utf8Error(FromUtf8Error), + UuidError(uuid::Error), +} + +impl From for AmqpDecodeError { + fn from(err: IncompleteError) -> Self { + AmqpDecodeError::Incomplete(err) + } +} + +impl From for AmqpDecodeError { + fn from(err: FromUtf8Error) -> Self { + AmqpDecodeError::Utf8Error(err) + } +} +impl From for AmqpDecodeError { + fn from(err: uuid::Error) -> Self { + AmqpDecodeError::UuidError(err) + } +} diff --git a/protocol/src/message/amqp/fixtures/empty_message b/protocol/src/message/amqp/fixtures/empty_message new file mode 100644 index 0000000..0b06f60 Binary files /dev/null and b/protocol/src/message/amqp/fixtures/empty_message differ diff --git a/protocol/src/message/amqp/fixtures/header_amqpvalue_message b/protocol/src/message/amqp/fixtures/header_amqpvalue_message new file mode 100644 index 0000000..36639cb Binary files /dev/null and b/protocol/src/message/amqp/fixtures/header_amqpvalue_message differ diff --git a/protocol/src/message/amqp/fixtures/message_body_250 b/protocol/src/message/amqp/fixtures/message_body_250 new file mode 100644 index 0000000..d4585c6 Binary files /dev/null and b/protocol/src/message/amqp/fixtures/message_body_250 differ diff --git a/protocol/src/message/amqp/fixtures/message_body_700 b/protocol/src/message/amqp/fixtures/message_body_700 new file mode 100644 index 0000000..5e51a9c Binary files /dev/null and b/protocol/src/message/amqp/fixtures/message_body_700 differ diff --git a/protocol/src/message/amqp/fixtures/message_random_application_properties_300 b/protocol/src/message/amqp/fixtures/message_random_application_properties_300 new file mode 100644 index 0000000..980d714 Binary files /dev/null and b/protocol/src/message/amqp/fixtures/message_random_application_properties_300 differ diff --git a/protocol/src/message/amqp/fixtures/message_random_application_properties_500 b/protocol/src/message/amqp/fixtures/message_random_application_properties_500 new file mode 100644 index 0000000..110ca77 Binary files /dev/null and b/protocol/src/message/amqp/fixtures/message_random_application_properties_500 differ diff --git a/protocol/src/message/amqp/fixtures/message_random_application_properties_properties_900 b/protocol/src/message/amqp/fixtures/message_random_application_properties_properties_900 new file mode 100644 index 0000000..44f7855 Binary files /dev/null and b/protocol/src/message/amqp/fixtures/message_random_application_properties_properties_900 differ diff --git a/protocol/src/message/amqp/fixtures/static_test_message_compare b/protocol/src/message/amqp/fixtures/static_test_message_compare new file mode 100644 index 0000000..2fc584a Binary files /dev/null and b/protocol/src/message/amqp/fixtures/static_test_message_compare differ diff --git a/protocol/src/message/amqp/header.rs b/protocol/src/message/amqp/header.rs new file mode 100644 index 0000000..0b5d317 --- /dev/null +++ b/protocol/src/message/amqp/header.rs @@ -0,0 +1,115 @@ +use byteorder::{BigEndian, WriteBytesExt}; + +use crate::message::amqp::{ + codec::constants::MESSAGE_HEADER, + types::{Descriptor, UInt}, +}; + +use super::{ + codec::constants::TypeCode, + types::Milliseconds, + types::{list_decoder, Boolean, List}, + AmqpDecodeError, AmqpDecoder, AmqpEncodeError, AmqpEncoder, +}; +#[cfg(test)] +use fake::Fake; + +/// Header of the message +#[derive(Clone, Debug, PartialEq)] +#[cfg_attr(test, derive(fake::Dummy))] +pub struct Header { + pub durable: bool, + pub priority: u8, + pub ttl: Option, + pub first_acquirer: bool, + pub delivery_count: u32, +} + +impl Default for Header { + fn default() -> Self { + Self { + durable: Default::default(), + priority: 4, + ttl: Default::default(), + first_acquirer: Default::default(), + delivery_count: 0, + } + } +} + +impl Header { + fn content_size(&self) -> u32 { + self.durable.encoded_size() + + self.priority.encoded_size() + + self.ttl.encoded_size() + + self.first_acquirer.encoded_size() + + self.delivery_count.encoded_size() + } +} + +impl AmqpEncoder for Header { + fn encoded_size(&self) -> u32 { + let size = self.content_size() + MESSAGE_HEADER.encoded_size(); + let fixed = if size > u8::MAX as u32 { 9 } else { 3 }; + fixed + size + } + + fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), AmqpEncodeError> { + MESSAGE_HEADER.encode(writer)?; + + let content_size = self.content_size(); + + if content_size + MESSAGE_HEADER.encoded_size() > u8::MAX as u32 { + TypeCode::List8.encode(writer)?; + writer.write_u32::(content_size + 4)?; + writer.write_u32::(5)?; + } else { + TypeCode::List8.encode(writer)?; + writer.write_u8((content_size + 4) as u8)?; + writer.write_u8(5)?; + } + self.durable.encode(writer)?; + self.priority.encode(writer)?; + self.ttl.encode(writer)?; + self.first_acquirer.encode(writer)?; + self.delivery_count.encode(writer)?; + Ok(()) + } +} + +impl AmqpDecoder for Header { + fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpDecodeError> { + match Descriptor::decode(input)? { + (input, MESSAGE_HEADER) => { + let header = Header::default(); + List::decode_with_fields(input, list_decoder_header, header) + } + (_, descriptor) => Err(AmqpDecodeError::MessageParse(format!( + "Invalid descriptor for header {:?}", + descriptor + ))), + } + } +} + +list_decoder!(Header, list_decoder_header, + { + 0 => { durable, Boolean, false}, + 1 => { priority, u8, 4}, + 2 => { ttl, u32, None, true}, + 3 => { first_acquirer, Boolean, false}, + 4 => { delivery_count, UInt, 4} + } +); + +#[cfg(test)] +mod tests { + use crate::message::amqp::tests::type_encode_decode_test_fuzzy; + + use super::Header; + + #[test] + fn test_header_encode_decode() { + type_encode_decode_test_fuzzy::
() + } +} diff --git a/protocol/src/message/amqp/message.rs b/protocol/src/message/amqp/message.rs new file mode 100644 index 0000000..ce6434e --- /dev/null +++ b/protocol/src/message/amqp/message.rs @@ -0,0 +1,509 @@ +use super::body::MessageBody; +use super::codec::constants::{ + MESSAGE_ANNOTATIONS, MESSAGE_APPLICATION_PROPERTIES, MESSAGE_DELIVERY_ANNOTATIONS, + MESSAGE_FOOTER, +}; +use super::codec::AmqpEncoder; +use super::error::AmqpEncodeError; +use super::header::Header; +use super::properties::Properties; +use super::section::MessageSection; +use super::types::{ApplicationProperties, DeliveryAnnotations, Footer, MessageAnnotations}; +use super::{AmqpDecodeError, AmqpDecoder}; + +#[cfg(test)] +use fake::Fake; + +#[derive(Debug, Clone, Default, PartialEq)] +#[cfg_attr(test, derive(fake::Dummy))] +pub struct Message { + header: Option
, + delivery_annotations: Option, + message_annotations: Option, + properties: Option, + application_properties: Option, + footer: Option