Skip to content

Commit

Permalink
AMQP 1.0 message format (#120)
Browse files Browse the repository at this point in the history
*  AMQP 1.0
  • Loading branch information
wolf4ood authored Apr 11, 2022
1 parent 0720cf7 commit 100b040
Show file tree
Hide file tree
Showing 45 changed files with 3,331 additions and 123 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/target
Cargo.lock
.idea
.idea
.DS_Store
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ dashmap = "4.0.2"
[dev-dependencies]
tracing-subscriber = "0.3.1"
fake = { version = "2.4", features=['derive']}
chrono = "0.4.19"
86 changes: 86 additions & 0 deletions examples/complex_message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::sync::Arc;

use chrono::Utc;
use futures::StreamExt;
use rabbitmq_stream_client::{
types::{ByteCapacity, Message, OffsetSpecification},
Environment,
};
use tokio::sync::Barrier;
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();

tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
let environment = Environment::builder()
.host("localhost")
.port(5552)
.build()
.await?;

let stream_name = "complex_message";
let _ = environment.delete_stream(stream_name).await;
let message_count = 1;
environment
.stream_creator()
.max_length(ByteCapacity::GB(2))
.create(stream_name)
.await?;

let producer = environment.producer().build(stream_name).await?;

let barrier = Arc::new(Barrier::new(message_count + 1));
for i in 0..message_count {
let producer_cloned = producer.clone();

let barrier_cloned = barrier.clone();
tokio::task::spawn(async move {
let message = Message::builder()
.message_annotations()
.insert("test", i as i32)
.message_builder()
.application_properties()
.insert("test", i as i32)
.message_builder()
.properties()
.content_encoding("application/json")
.absolute_expiry_time(Utc::now())
.message_builder()
.body(format!("message{}", i))
.build();
producer_cloned.send_with_confirm(message).await.unwrap();

barrier_cloned.wait().await;
});
}

barrier.wait().await;

producer.close().await?;

let mut consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.build(stream_name)
.await
.unwrap();

for _ in 0..message_count {
let delivery = consumer.next().await.unwrap()?;
info!(
"Got message: {:#?} with offset: {}",
delivery.message(),
delivery.offset(),
);
}

consumer.handle().close().await.unwrap();

environment.delete_stream(stream_name).await?;
Ok(())
}
14 changes: 10 additions & 4 deletions protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ edition = "2018"
license = "Apache-2.0 OR MPL-2.0"


[features]
wasm-bindgen = ["uuid/wasm-bindgen"]

[dependencies]
byteorder = "1"
ntex-amqp-codec = "= 0.7.2"
ntex-bytes = "0.1"

ordered-float = "2.10.0"
uuid = "0.8.2"
chrono = "0.4.19"
num_enum = "0.5.7"
derive_more = "0.99"

[dev-dependencies]
fake = { version = "2.4", features=['derive']}
fake = { version = "2.4", features=['derive', 'chrono','uuid']}
rand = "0.8"
pretty_assertions = "1.2.0"
32 changes: 20 additions & 12 deletions protocol/src/codec/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use byteorder::ByteOrder;

use crate::error::IncompleteError;
use crate::message::Message;
use crate::types::PublishedMessage;
use crate::types::PublishingError;
Expand All @@ -12,42 +13,42 @@ use super::Decoder;

impl Decoder for i8 {
fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
read_i8(input)
read_i8(input).map(Ok)?
}
}

impl Decoder for i32 {
fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
read_i32(input)
read_i32(input).map(Ok)?
}
}

impl Decoder for u8 {
fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
read_u8(input)
read_u8(input).map(Ok)?
}
}

impl Decoder for u16 {
fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
read_u16(input)
read_u16(input).map(Ok)?
}
}

impl Decoder for u32 {
fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
read_u32(input)
read_u32(input).map(Ok)?
}
}

impl Decoder for u64 {
fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
read_u64(input)
read_u64(input).map(Ok)?
}
}
impl Decoder for i64 {
fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
read_i64(input)
read_i64(input).map(Ok)?
}
}

Expand Down Expand Up @@ -199,37 +200,44 @@ impl Decoder for Vec<PublishingError> {
}
}

pub fn check_len(input: &[u8], size: usize) -> Result<(), DecodeError> {
pub fn check_len(input: &[u8], size: usize) -> Result<(), IncompleteError> {
if input.len() < size {
return Err(DecodeError::Incomplete(size));
return Err(IncompleteError(size));
}
Ok(())
}

macro_rules! reader {
( $fn:ident, $size:expr, $ret:ty) => {
#[allow(unused)]
pub fn $fn(input: &[u8]) -> Result<(&[u8], $ret), crate::error::DecodeError> {
pub fn $fn(input: &[u8]) -> Result<(&[u8], $ret), IncompleteError> {
check_len(input, $size)?;
let x = byteorder::BigEndian::$fn(input);
Ok((&input[$size..], x))
}
};
}

pub fn read_u8(input: &[u8]) -> Result<(&[u8], u8), DecodeError> {
pub fn read_u8(input: &[u8]) -> Result<(&[u8], u8), IncompleteError> {
check_len(input, 1)?;
Ok((&input[1..], input[0]))
}

pub fn read_i8(input: &[u8]) -> Result<(&[u8], i8), DecodeError> {
pub fn read_i8(input: &[u8]) -> Result<(&[u8], i8), IncompleteError> {
check_len(input, 1)?;
Ok((&input[1..], input[0] as i8))
}

pub fn read_exact(input: &[u8], len: usize) -> Result<(&[u8], &[u8]), IncompleteError> {
check_len(input, len)?;
Ok((&input[len..], &input[..len]))
}

reader!(read_i16, 2, i16);
reader!(read_u16, 2, u16);
reader!(read_u32, 4, u32);
reader!(read_i32, 4, i32);
reader!(read_u64, 8, u64);
reader!(read_i64, 8, i64);
reader!(read_f32, 4, f32);
reader!(read_f64, 8, f64);
6 changes: 1 addition & 5 deletions protocol/src/commands/deliver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,11 @@ mod tests {
use fake::{Dummy, Faker};

use crate::{commands::tests::command_encode_decode_test, message::InternalMessage};
use ntex_amqp_codec::Message as AmpqMessage;

use super::{DeliverCommand, Message};
impl Dummy<Faker> for Message {
fn dummy_with_rng<R: rand::Rng + ?Sized>(_config: &Faker, _rng: &mut R) -> Self {
Message::new(InternalMessage {
message: AmpqMessage::default(),
publishing_id: None,
})
Message::new(InternalMessage::default())
}
}

Expand Down
12 changes: 11 additions & 1 deletion protocol/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::string::FromUtf8Error;

#[derive(Debug)]
pub enum DecodeError {
Incomplete(usize),
Incomplete(IncompleteError),
Utf8Error(FromUtf8Error),
UnknownResponseCode(u16),
UnsupportedResponseType(u16),
MismatchSize(usize),
MessageParse(String),
InvalidFormatCode(u8),
Empty,
}

Expand All @@ -28,3 +29,12 @@ impl From<FromUtf8Error> for DecodeError {
DecodeError::Utf8Error(err)
}
}

impl From<IncompleteError> for DecodeError {
fn from(err: IncompleteError) -> Self {
DecodeError::Incomplete(err)
}
}

#[derive(Debug)]
pub struct IncompleteError(pub usize);
2 changes: 2 additions & 0 deletions protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ mod response;
pub mod types;
pub use request::*;
pub use response::*;

pub mod utils;
100 changes: 0 additions & 100 deletions protocol/src/message.rs

This file was deleted.

Loading

0 comments on commit 100b040

Please sign in to comment.