Skip to content

Commit

Permalink
added new example resume sender
Browse files Browse the repository at this point in the history
  • Loading branch information
minghuaw committed Feb 23, 2024
1 parent 33a6d44 commit 4a383cd
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 23 deletions.
81 changes: 81 additions & 0 deletions examples/event_hubs/src/bin/resume_sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::env;

use dotenv::dotenv;
use fe2o3_amqp::{
sasl_profile::SaslProfile,
types::{
messaging::{Message, Properties},
primitives::Binary,
},
Connection, Sender, Session,
};

#[tokio::main]
async fn main() {
dotenv().ok();
env_logger::init();

let hostname = env::var("HOST_NAME").unwrap();
let sa_key_name = env::var("SHARED_ACCESS_KEY_NAME").unwrap();
let sa_key_value = env::var("SHARED_ACCESS_KEY_VALUE").unwrap();
let event_hub_name = env::var("EVENT_HUB_NAME").unwrap();

let url = format!("amqps://{}", hostname);
let mut connection = Connection::builder()
.container_id("rust-connection-1")
.alt_tls_establishment(true) // EventHubs uses alternative TLS establishment
.sasl_profile(SaslProfile::Plain {
username: sa_key_name,
password: sa_key_value,
})
.open(&url[..])
.await
.unwrap();
let mut session = Session::begin(&mut connection).await.unwrap();
let mut sender = Sender::attach(&mut session, "rust-simple-sender", event_hub_name)
.await
.unwrap();

// Message will be randomly distributed to different partitions
for i in 0..3 {
// All of the Microsoft AMQP clients represent the event body as an uninterpreted bag of bytes.
// A message builder can be used to specify the type of body section
let data = format!("Message {}", i).into_bytes();
let message = Message::builder()
.properties(
Properties::builder()
.group_id(String::from("send_to_event_hub"))
.build(),
)
.data(Binary::from(data))
.build();
let outcome = sender.send(message).await.unwrap();
outcome.accepted_or_else(|outcome| outcome).unwrap();
}

let detached = sender.detach().await.unwrap();

// Close the old session and create a new one
session.close().await.unwrap();
let mut session = Session::begin(&mut connection).await.unwrap();

let mut sender = detached.resume_on_session(&session).await.unwrap();

for i in 10..13 {
let data = format!("Message {}", i).into_bytes();
let message = Message::builder()
.properties(
Properties::builder()
.group_id(String::from("send_to_event_hub"))
.build(),
)
.data(Binary::from(data))
.build();
let outcome = sender.send(message).await.unwrap();
outcome.accepted_or_else(|outcome| outcome).unwrap();
}

sender.close().await.unwrap();
session.end().await.unwrap();
connection.close().await.unwrap();
}
23 changes: 0 additions & 23 deletions examples/event_hubs/src/bin/simple_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use fe2o3_amqp::{
#[tokio::main]
async fn main() {
dotenv().ok();
env_logger::init();

let hostname = env::var("HOST_NAME").unwrap();
let sa_key_name = env::var("SHARED_ACCESS_KEY_NAME").unwrap();
Expand Down Expand Up @@ -53,28 +52,6 @@ async fn main() {
outcome.accepted_or_else(|outcome| outcome).unwrap();
}

let detached = sender.detach().await.unwrap();

// Close the old session and create a new one
session.close().await.unwrap();
let mut session = Session::begin(&mut connection).await.unwrap();

let mut sender = detached.resume_on_session(&session).await.unwrap();

for i in 10..13 {
let data = format!("Message {}", i).into_bytes();
let message = Message::builder()
.properties(
Properties::builder()
.group_id(String::from("send_to_event_hub"))
.build(),
)
.data(Binary::from(data))
.build();
let outcome = sender.send(message).await.unwrap();
outcome.accepted_or_else(|outcome| outcome).unwrap();
}

sender.close().await.unwrap();
session.end().await.unwrap();
connection.close().await.unwrap();
Expand Down

0 comments on commit 4a383cd

Please sign in to comment.