diff --git a/Cargo.toml b/Cargo.toml index 441c88a..c22d884 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ prost = "0.12" prost-types = "0.12" protobuf = { version = "3.3" } rand = "0.8.5" -up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "a30d3655ab13f8d97815280d718f4891f693ed2d" } +up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "c705ac97602ad6917a93d23651e8a504ec7bb718" } zenoh = { version = "0.10.1-rc", features = ["unstable"]} [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 3366035..bf323dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,11 +17,11 @@ pub mod utransport; use protobuf::{Enum, Message}; use std::{ collections::HashMap, - sync::{atomic::AtomicU64, Arc, Mutex}, + sync::{Arc, Mutex}, }; use up_rust::{ - UAttributes, UAuthority, UCode, UEntity, UMessage, UPayloadFormat, UPriority, UResourceBuilder, - UStatus, UUri, + ComparableListener, UAttributes, UAuthority, UCode, UEntity, UListener, UPayloadFormat, + UPriority, UResourceBuilder, UStatus, UUri, }; use zenoh::{ config::Config, @@ -31,23 +31,22 @@ use zenoh::{ subscriber::Subscriber, }; -pub type UtransportListener = Box) + Send + Sync + 'static>; - const UATTRIBUTE_VERSION: u8 = 1; -pub struct ZenohListener {} +type SubscriberMap = Arc>>>; +type QueryableMap = Arc>>>; +type QueryMap = Arc>>; +type RpcCallbackMap = Arc>>>; pub struct UPClientZenoh { session: Arc, // Able to unregister Subscriber - subscriber_map: Arc>>>, + subscriber_map: SubscriberMap, // Able to unregister Queryable - queryable_map: Arc>>>, + queryable_map: QueryableMap, // Save the reqid to be able to send back response - query_map: Arc>>, + query_map: QueryMap, // Save the callback for RPC response - rpc_callback_map: Arc>>>, - // Used to identify different callback - callback_counter: AtomicU64, + rpc_callback_map: RpcCallbackMap, // Source UUri in RPC source_uuri: UUri, } @@ -117,7 +116,6 @@ impl UPClientZenoh { queryable_map: Arc::new(Mutex::new(HashMap::new())), query_map: Arc::new(Mutex::new(HashMap::new())), rpc_callback_map: Arc::new(Mutex::new(HashMap::new())), - callback_counter: AtomicU64::new(0), source_uuri, }) } diff --git a/src/rpc.rs b/src/rpc.rs index 03f3544..8361093 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -44,7 +44,7 @@ impl RpcClient for UPClientZenoh { // Create UAttributes and put into Zenoh user attachment let uattributes = UAttributes::request( - UUIDBuilder::new().build(), + UUIDBuilder::build(), topic, self.get_response_uuri(), options.clone(), diff --git a/src/utransport.rs b/src/utransport.rs index ba081ff..ff050f8 100644 --- a/src/utransport.rs +++ b/src/utransport.rs @@ -11,15 +11,13 @@ // Contributors: // ZettaScale Zenoh Team, // -use crate::{UPClientZenoh, UtransportListener}; +use crate::UPClientZenoh; +use async_std::task::block_on; use async_trait::async_trait; -use std::{ - sync::{atomic::Ordering, Arc}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; use up_rust::{ - Data, UAttributes, UAttributesValidators, UCode, UMessage, UMessageType, UPayload, - UPayloadFormat, UStatus, UTransport, UUri, UriValidator, + ComparableListener, Data, UAttributes, UAttributesValidators, UCode, UListener, UMessage, + UMessageType, UPayload, UPayloadFormat, UStatus, UTransport, UUri, UriValidator, }; use zenoh::{ prelude::{r#async::*, Sample}, @@ -102,12 +100,11 @@ impl UPClientZenoh { log::error!("{msg}"); UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) })?; - let hashmap_key = UPClientZenoh::to_zenoh_key_string(&source_uuri)?; let resp_callback = self .rpc_callback_map .lock() .unwrap() - .get(&hashmap_key) + .get(&source_uuri) .ok_or_else(|| { let msg = "Unable to get callback".to_string(); log::error!("{msg}"); @@ -115,52 +112,59 @@ impl UPClientZenoh { })? .clone(); let zenoh_callback = move |reply: Reply| { - let msg = match reply.sample { + match reply.sample { Ok(sample) => { // Get the encoding of UPayload let Some(encoding) = UPClientZenoh::to_upayload_format(&sample.encoding) else { let msg = "Unable to get the encoding".to_string(); log::error!("{msg}"); - resp_callback(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + resp_callback.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; }; // Get UAttribute from the attachment let Some(attachment) = sample.attachment() else { let msg = "Unable to get the attachment".to_string(); log::error!("{msg}"); - resp_callback(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + resp_callback.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; }; let u_attribute = match UPClientZenoh::attachment_to_uattributes(attachment) { Ok(uattr) => uattr, Err(e) => { - let msg = - format!("Unable to transform attachment to UAttributes: {e:?}"); + let msg = format!("Transform attachment to UAttributes failed: {e:?}"); log::error!("{msg}"); - resp_callback(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + resp_callback + .on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; } }; // Create UMessage - Ok(UMessage { - attributes: Some(u_attribute).into(), - payload: Some(UPayload { - length: Some(0), - format: encoding.into(), - data: Some(Data::Value(sample.payload.contiguous().to_vec())), + block_on( + resp_callback.on_receive(UMessage { + attributes: Some(u_attribute).into(), + payload: Some(UPayload { + length: Some(0), + format: encoding.into(), + data: Some(Data::Value(sample.payload.contiguous().to_vec())), + ..Default::default() + }) + .into(), ..Default::default() - }) - .into(), - ..Default::default() - }) + }), + ); } Err(e) => { let msg = format!("Error while parsing Zenoh reply: {e:?}"); log::error!("{msg}"); - Err(UStatus::fail_with_code(UCode::INTERNAL, msg)) + block_on(resp_callback.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg))); } - }; - resp_callback(msg); + } }; // Send query @@ -249,25 +253,19 @@ impl UPClientZenoh { async fn register_publish_notification_listener( &self, topic: &UUri, - listener: Arc, - ) -> Result { + listener: Arc, + ) -> Result<(), UStatus> { // Get Zenoh key let zenoh_key = UPClientZenoh::to_zenoh_key_string(topic)?; - // Generate listener string for users to delete - let hashmap_key = format!( - "{}_{:X}", - zenoh_key, - self.callback_counter.fetch_add(1, Ordering::SeqCst) - ); - // Setup callback + let listener_cloned = listener.clone(); let callback = move |sample: Sample| { // Get the UAttribute from Zenoh user attachment let Some(attachment) = sample.attachment() else { let msg = "Unable to get attachment"; log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on(listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg))); return; }; let u_attribute = match UPClientZenoh::attachment_to_uattributes(attachment) { @@ -275,7 +273,9 @@ impl UPClientZenoh { Err(e) => { let msg = format!("Unable to transform attachment to UAttributes: {e:?}"); log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; } }; @@ -283,7 +283,7 @@ impl UPClientZenoh { let Some(encoding) = UPClientZenoh::to_upayload_format(&sample.encoding) else { let msg = "Unable to get payload encoding"; log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on(listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg))); return; }; let u_payload = UPayload { @@ -298,7 +298,7 @@ impl UPClientZenoh { payload: Some(u_payload).into(), ..Default::default() }; - listener(Ok(msg)); + block_on(listener_cloned.on_receive(msg)); }; // Create Zenoh subscriber @@ -309,42 +309,36 @@ impl UPClientZenoh { .res() .await { - self.subscriber_map - .lock() - .unwrap() - .insert(hashmap_key.clone(), subscriber); + self.subscriber_map.lock().unwrap().insert( + (topic.clone(), ComparableListener::new(listener)), + subscriber, + ); } else { let msg = "Unable to register callback with Zenoh"; log::error!("{msg}"); return Err(UStatus::fail_with_code(UCode::INTERNAL, msg)); } - Ok(hashmap_key) + Ok(()) } async fn register_request_listener( &self, topic: &UUri, - listener: Arc, - ) -> Result { + listener: Arc, + ) -> Result<(), UStatus> { // Get Zenoh key let zenoh_key = UPClientZenoh::to_zenoh_key_string(topic)?; - // Generate listener string for users to delete - let hashmap_key = format!( - "{}_{:X}", - zenoh_key, - self.callback_counter.fetch_add(1, Ordering::SeqCst) - ); - // Setup callback + let listener_cloned = listener.clone(); let query_map = self.query_map.clone(); let callback = move |query: Query| { // Create UAttribute from Zenoh user attachment let Some(attachment) = query.attachment() else { let msg = "Unable to get attachment".to_string(); log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on(listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg))); return; }; let u_attribute = match UPClientZenoh::attachment_to_uattributes(attachment) { @@ -352,7 +346,9 @@ impl UPClientZenoh { Err(e) => { let msg = format!("Unable to transform user attachment to UAttributes: {e:?}"); log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; } }; @@ -362,7 +358,9 @@ impl UPClientZenoh { let Some(encoding) = UPClientZenoh::to_upayload_format(&value.encoding) else { let msg = "Unable to get payload encoding".to_string(); log::error!("{msg}"); - listener(Err(UStatus::fail_with_code(UCode::INTERNAL, msg))); + block_on( + listener_cloned.on_error(UStatus::fail_with_code(UCode::INTERNAL, msg)), + ); return; }; UPayload { @@ -389,7 +387,7 @@ impl UPClientZenoh { .lock() .unwrap() .insert(u_attribute.id.to_string(), query); - listener(Ok(msg)); + block_on(listener_cloned.on_receive(msg)); }; // Create Zenoh queryable @@ -400,34 +398,25 @@ impl UPClientZenoh { .res() .await { - self.queryable_map - .lock() - .unwrap() - .insert(hashmap_key.clone(), queryable); + self.queryable_map.lock().unwrap().insert( + (topic.clone(), ComparableListener::new(listener)), + queryable, + ); } else { let msg = "Unable to register callback with Zenoh".to_string(); log::error!("{msg}"); return Err(UStatus::fail_with_code(UCode::INTERNAL, msg)); } - Ok(hashmap_key) + Ok(()) } - fn register_response_listener( - &self, - topic: &UUri, - listener: Arc, - ) -> Result { - // Get Zenoh key - let zenoh_key = UPClientZenoh::to_zenoh_key_string(topic)?; - + fn register_response_listener(&self, topic: &UUri, listener: Arc) { // Store the response callback (Will be used in send_request) self.rpc_callback_map .lock() .unwrap() - .insert(zenoh_key.clone(), listener); - - Ok(zenoh_key) + .insert(topic.clone(), listener); } } @@ -527,24 +516,19 @@ impl UTransport for UPClientZenoh { async fn register_listener( &self, topic: UUri, - listener: Box) + Send + Sync + 'static>, - ) -> Result { - let listener = Arc::new(listener); + listener: Arc, + ) -> Result<(), UStatus> { if topic.authority.is_some() && topic.entity.is_none() && topic.resource.is_none() { // This is special UUri which means we need to register for all of Publish, Notification, Request, and Response // RPC response - let mut listener_str = self.register_response_listener(&topic, listener.clone())?; + self.register_response_listener(&topic, listener.clone()); // RPC request - listener_str += "&"; - listener_str += &self - .register_request_listener(&topic, listener.clone()) + self.register_request_listener(&topic, listener.clone()) .await?; // Publish & Notification - listener_str += "&"; - listener_str += &self - .register_publish_notification_listener(&topic, listener.clone()) + self.register_publish_notification_listener(&topic, listener.clone()) .await?; - Ok(listener_str) + Ok(()) } else { // Do the validation UriValidator::validate(&topic) @@ -552,7 +536,8 @@ impl UTransport for UPClientZenoh { if UriValidator::is_rpc_response(&topic) { // RPC response - self.register_response_listener(&topic, listener.clone()) + self.register_response_listener(&topic, listener.clone()); + Ok(()) } else if UriValidator::is_rpc_method(&topic) { // RPC request self.register_request_listener(&topic, listener.clone()) @@ -565,22 +550,20 @@ impl UTransport for UPClientZenoh { } } - async fn unregister_listener(&self, topic: UUri, listener: &str) -> Result<(), UStatus> { - let mut pub_listener_str: Option<&str> = None; - let mut req_listener_str: Option<&str> = None; - let mut resp_listener_str: Option<&str> = None; + async fn unregister_listener( + &self, + topic: UUri, + listener: Arc, + ) -> Result<(), UStatus> { + let mut remove_pub_listener = false; + let mut remove_req_listener = false; + let mut remove_resp_listener = false; if topic.authority.is_some() && topic.entity.is_none() && topic.resource.is_none() { // This is special UUri which means we need to unregister all listeners - let listener_vec = listener.split('&').collect::>(); - if listener_vec.len() != 3 { - let msg = "Invalid listener string".to_string(); - log::error!("{msg}"); - return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)); - } - resp_listener_str = Some(listener_vec[0]); - req_listener_str = Some(listener_vec[1]); - pub_listener_str = Some(listener_vec[2]); + remove_pub_listener = true; + remove_req_listener = true; + remove_resp_listener = true; } else { // Do the validation UriValidator::validate(&topic).map_err(|_| { @@ -590,20 +573,20 @@ impl UTransport for UPClientZenoh { })?; if UriValidator::is_rpc_response(&topic) { - resp_listener_str = Some(listener); + remove_resp_listener = true; } else if UriValidator::is_rpc_method(&topic) { - req_listener_str = Some(listener); + remove_req_listener = true; } else { - pub_listener_str = Some(listener); + remove_pub_listener = true; } } - if let Some(listener) = resp_listener_str { + if remove_resp_listener { // RPC response if self .rpc_callback_map .lock() .unwrap() - .remove(listener) + .remove(&topic) .is_none() { let msg = "RPC response callback doesn't exist".to_string(); @@ -611,13 +594,13 @@ impl UTransport for UPClientZenoh { return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)); } } - if let Some(listener) = req_listener_str { + if remove_req_listener { // RPC request if self .queryable_map .lock() .unwrap() - .remove(listener) + .remove(&(topic.clone(), ComparableListener::new(listener.clone()))) .is_none() { let msg = "RPC request listener doesn't exist".to_string(); @@ -625,13 +608,13 @@ impl UTransport for UPClientZenoh { return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)); } } - if let Some(listener) = pub_listener_str { + if remove_pub_listener { // Normal publish if self .subscriber_map .lock() .unwrap() - .remove(listener) + .remove(&(topic.clone(), ComparableListener::new(listener.clone()))) .is_none() { let msg = "Publish listener doesn't exist".to_string(); diff --git a/tests/publish.rs b/tests/publish.rs index b002913..f2b970c 100644 --- a/tests/publish.rs +++ b/tests/publish.rs @@ -14,111 +14,127 @@ pub mod test_lib; use async_std::task; +use async_trait::async_trait; use std::{ sync::{Arc, Mutex}, time, }; -use up_rust::{Data, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUIDBuilder}; +use test_case::test_case; +use up_rust::{ + Data, UListener, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUIDBuilder, + UUri, +}; +struct PublishNotificationListener { + recv_data: Arc>, +} +impl PublishNotificationListener { + fn new() -> Self { + PublishNotificationListener { + recv_data: Arc::new(Mutex::new(String::new())), + } + } + fn get_recv_data(&self) -> String { + self.recv_data.lock().unwrap().clone() + } +} +#[async_trait] +impl UListener for PublishNotificationListener { + async fn on_receive(&self, msg: UMessage) { + if let Data::Value(v) = msg.payload.unwrap().data.unwrap() { + let value = v.into_iter().map(|c| c as char).collect::(); + *self.recv_data.lock().unwrap() = value; + } else { + panic!("The message should be Data::Value type."); + } + } + async fn on_error(&self, err: UStatus) { + panic!("Internal Error: {err:?}"); + } +} + +#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_utransport_uuri(Some(0), 0, 0); "Normal UUri")] +#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_special_uuri(0); "Special UUri")] #[async_std::test] -async fn test_publish_and_subscribe() { +async fn test_publish_and_subscribe(publish_uuri: UUri, listen_uuri: UUri) { test_lib::before_test(); // Initialization let target_data = String::from("Hello World!"); - let upclient = test_lib::create_up_client_zenoh().await.unwrap(); - let uuri = test_lib::create_utransport_uuri(0); - let verified_data = Arc::new(Mutex::new(String::new())); + let upclient_send = test_lib::create_up_client_zenoh(0, 0).await.unwrap(); + let upclient_recv = test_lib::create_up_client_zenoh(1, 1).await.unwrap(); // Register the listener - let verified_data_cloned = verified_data.clone(); - let listener = move |result: Result| match result { - Ok(msg) => { - if let Data::Value(v) = msg.payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - *verified_data_cloned.lock().unwrap() = value; - } else { - panic!("The message should be Data::Value type."); - } - } - Err(ustatus) => panic!("Internal Error: {ustatus:?}"), - }; - let listener_string = upclient - .register_listener(uuri.clone(), Box::new(listener)) + let pub_listener = Arc::new(PublishNotificationListener::new()); + upclient_recv + .register_listener(listen_uuri.clone(), pub_listener.clone()) .await .unwrap(); + // Waiting for listener to take effect + task::sleep(time::Duration::from_millis(1000)).await; // Send UMessage - let umessage = UMessageBuilder::publish(uuri.clone()) - .with_message_id(UUIDBuilder::new().build()) + let umessage = UMessageBuilder::publish(publish_uuri.clone()) + .with_message_id(UUIDBuilder::build()) .build_with_payload( target_data.as_bytes().to_vec().into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT, ) .unwrap(); - upclient.send(umessage).await.unwrap(); + upclient_send.send(umessage).await.unwrap(); // Waiting for the subscriber to receive data task::sleep(time::Duration::from_millis(1000)).await; // Compare the result - assert_eq!(*verified_data.lock().unwrap(), target_data); + assert_eq!(pub_listener.get_recv_data(), target_data); // Cleanup - upclient - .unregister_listener(uuri.clone(), &listener_string) + upclient_recv + .unregister_listener(listen_uuri.clone(), pub_listener) .await .unwrap(); } +#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_utransport_uuri(Some(1), 1, 1), test_lib::create_utransport_uuri(Some(1), 1, 1); "Normal UUri")] +#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0), test_lib::create_utransport_uuri(Some(1), 1, 1), test_lib::create_special_uuri(1); "Special UUri")] #[async_std::test] -async fn test_notification_and_subscribe() { +async fn test_notification_and_subscribe(origin_uuri: UUri, dst_uuri: UUri, listen_uuri: UUri) { test_lib::before_test(); // Initialization let target_data = String::from("Hello World!"); - let upclient = test_lib::create_up_client_zenoh().await.unwrap(); - let src_uuri = test_lib::create_utransport_uuri(0); - let dst_uuri = test_lib::create_utransport_uuri(1); - let verified_data = Arc::new(Mutex::new(String::new())); + let upclient_notify = test_lib::create_up_client_zenoh(0, 0).await.unwrap(); + let upclient_recv = test_lib::create_up_client_zenoh(1, 1).await.unwrap(); // Register the listener - let verified_data_cloned = verified_data.clone(); - let listener = move |result: Result| match result { - Ok(msg) => { - if let Data::Value(v) = msg.payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - *verified_data_cloned.lock().unwrap() = value; - } else { - panic!("The message should be Data::Value type."); - } - } - Err(ustatus) => panic!("Internal Error: {ustatus:?}"), - }; - let listener_string = upclient - .register_listener(dst_uuri.clone(), Box::new(listener)) + let notification_listener = Arc::new(PublishNotificationListener::new()); + upclient_recv + .register_listener(listen_uuri.clone(), notification_listener.clone()) .await .unwrap(); + // Waiting for listener to take effect + task::sleep(time::Duration::from_millis(1000)).await; // Send UMessage - let umessage = UMessageBuilder::notification(src_uuri.clone(), dst_uuri.clone()) - .with_message_id(UUIDBuilder::new().build()) + let umessage = UMessageBuilder::notification(origin_uuri.clone(), dst_uuri.clone()) + .with_message_id(UUIDBuilder::build()) .build_with_payload( target_data.as_bytes().to_vec().into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT, ) .unwrap(); - upclient.send(umessage).await.unwrap(); + upclient_notify.send(umessage).await.unwrap(); // Waiting for the subscriber to receive data task::sleep(time::Duration::from_millis(1000)).await; // Compare the result - assert_eq!(*verified_data.lock().unwrap(), target_data); + assert_eq!(notification_listener.get_recv_data(), target_data); // Cleanup - upclient - .unregister_listener(dst_uuri.clone(), &listener_string) + upclient_recv + .unregister_listener(listen_uuri.clone(), notification_listener) .await .unwrap(); } diff --git a/tests/register.rs b/tests/register.rs index b3f44a4..e48406e 100644 --- a/tests/register.rs +++ b/tests/register.rs @@ -13,109 +13,45 @@ // pub mod test_lib; -use up_rust::{UCode, UStatus, UTransport}; +use std::sync::Arc; -#[async_std::test] -async fn test_utransport_register_and_unregister() { - test_lib::before_test(); - - // Initialization - let upclient = test_lib::create_up_client_zenoh().await.unwrap(); - let uuri = test_lib::create_utransport_uuri(0); - - // Compare the return string - let listener_string = upclient - .register_listener(uuri.clone(), Box::new(|_| {})) - .await - .unwrap(); - assert_eq!(listener_string, "upl/0100162e04d20100_0"); - - // Able to ungister - upclient - .unregister_listener(uuri.clone(), &listener_string) - .await - .unwrap(); +use async_trait::async_trait; +use test_case::test_case; +use up_rust::{UListener, UMessage, UStatus, UTransport, UUri}; - // Unable to ungister - let result = upclient - .unregister_listener(uuri.clone(), &listener_string) - .await; - assert_eq!( - result, - Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "Publish listener doesn't exist" - )) - ); +struct FooListener; +#[async_trait] +impl UListener for FooListener { + async fn on_receive(&self, _msg: UMessage) {} + async fn on_error(&self, _err: UStatus) {} } +#[test_case(test_lib::create_utransport_uuri(Some(0), 0, 0); "Publish / Notification register_listener")] +#[test_case(test_lib::create_rpcserver_uuri(Some(0), 0); "RPC register_listener")] +#[test_case(test_lib::create_special_uuri(0); "Special UUri register_listener")] #[async_std::test] -async fn test_rpcserver_register_and_unregister() { +async fn test_register_and_unregister(uuri: UUri) { test_lib::before_test(); // Initialization - let upclient = test_lib::create_up_client_zenoh().await.unwrap(); - let uuri = test_lib::create_rpcserver_uuri(); - - // Compare the return string - let listener_string = upclient - .register_listener(uuri.clone(), Box::new(|_| {})) - .await - .unwrap(); - assert_eq!(listener_string, "upl/0100162e04d20100_0"); + let upclient = test_lib::create_up_client_zenoh(0, 0).await.unwrap(); + let foo_listener = Arc::new(FooListener); - // Able to ungister + // Register the listener upclient - .unregister_listener(uuri.clone(), &listener_string) - .await - .unwrap(); - - // Unable to ungister - let result = upclient - .unregister_listener(uuri.clone(), &listener_string) - .await; - assert_eq!( - result, - Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "RPC request listener doesn't exist" - )) - ); -} - -#[async_std::test] -async fn test_utransport_special_uuri_register_and_unregister() { - test_lib::before_test(); - - // Initialization - let upclient = test_lib::create_up_client_zenoh().await.unwrap(); - let uuri = test_lib::create_special_uuri(); - - // Compare the return string - let listener_string = upclient - .register_listener(uuri.clone(), Box::new(|_| {})) + .register_listener(uuri.clone(), foo_listener.clone()) .await .unwrap(); - assert_eq!( - listener_string, - "upr/060102030a0b0c/**&upr/060102030a0b0c/**_0&upr/060102030a0b0c/**_1" - ); // Able to ungister upclient - .unregister_listener(uuri.clone(), &listener_string) + .unregister_listener(uuri.clone(), foo_listener.clone()) .await .unwrap(); // Unable to ungister let result = upclient - .unregister_listener(uuri.clone(), &listener_string) + .unregister_listener(uuri.clone(), foo_listener.clone()) .await; - assert_eq!( - result, - Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "RPC response callback doesn't exist" - )) - ); + assert!(result.is_err()); } diff --git a/tests/rpc.rs b/tests/rpc.rs index ddb665d..27cdd36 100644 --- a/tests/rpc.rs +++ b/tests/rpc.rs @@ -14,62 +14,114 @@ pub mod test_lib; use async_std::task::{self, block_on}; -use std::{sync::Arc, time}; +use async_trait::async_trait; +use std::{ + sync::{Arc, Mutex}, + time, +}; +use test_case::test_case; +use up_client_zenoh::UPClientZenoh; use up_rust::{ - CallOptions, Data, RpcClient, UMessage, UMessageBuilder, UPayload, UPayloadFormat, UStatus, - UTransport, UUIDBuilder, + CallOptions, Data, RpcClient, UListener, UMessage, UMessageBuilder, UPayload, UPayloadFormat, + UStatus, UTransport, UUIDBuilder, UUri, }; +struct RequestListener { + up_client: Arc, + request_data: String, + response_data: String, +} +impl RequestListener { + fn new(up_client: Arc, request_data: String, response_data: String) -> Self { + RequestListener { + up_client, + request_data, + response_data, + } + } +} +#[async_trait] +impl UListener for RequestListener { + async fn on_receive(&self, msg: UMessage) { + let UMessage { + attributes, + payload, + .. + } = msg; + // Check the payload of request + if let Data::Value(v) = payload.unwrap().data.unwrap() { + let value = v.into_iter().map(|c| c as char).collect::(); + assert_eq!(self.request_data, value); + } else { + panic!("The message should be Data::Value type."); + } + // Send back result + let umessage = UMessageBuilder::response_for_request(&attributes) + .with_message_id(UUIDBuilder::build()) + .build_with_payload( + self.response_data.as_bytes().to_vec().into(), + UPayloadFormat::UPAYLOAD_FORMAT_TEXT, + ) + .unwrap(); + block_on(self.up_client.send(umessage)).unwrap(); + } + async fn on_error(&self, err: UStatus) { + panic!("Internal Error: {err:?}"); + } +} + +struct ResponseListener { + response_data: Arc>, +} +impl ResponseListener { + fn new() -> Self { + ResponseListener { + response_data: Arc::new(Mutex::new(String::new())), + } + } + fn get_response_data(&self) -> String { + self.response_data.lock().unwrap().clone() + } +} +#[async_trait] +impl UListener for ResponseListener { + async fn on_receive(&self, msg: UMessage) { + let UMessage { payload, .. } = msg; + // Check the response data + if let Data::Value(v) = payload.unwrap().data.unwrap() { + let value = v.into_iter().map(|c| c as char).collect::(); + *self.response_data.lock().unwrap() = value; + } else { + panic!("The message should be Data::Value type."); + } + } + async fn on_error(&self, _err: UStatus) { + //panic!("Internal Error: {err:?}"); + } +} + +#[test_case(test_lib::create_rpcserver_uuri(Some(1), 1), test_lib::create_rpcserver_uuri(Some(1), 1); "Normal RPC UUri")] +#[test_case(test_lib::create_rpcserver_uuri(Some(1), 1), test_lib::create_special_uuri(1); "Special listen UUri")] #[async_std::test] -async fn test_rpc_server_client() { +async fn test_rpc_server_client(dst_uuri: UUri, listen_uuri: UUri) { test_lib::before_test(); // Initialization - let upclient_client = test_lib::create_up_client_zenoh().await.unwrap(); - let upclient_server = Arc::new(test_lib::create_up_client_zenoh().await.unwrap()); + let upclient_client = test_lib::create_up_client_zenoh(0, 0).await.unwrap(); + let upclient_server = Arc::new(test_lib::create_up_client_zenoh(1, 1).await.unwrap()); let request_data = String::from("This is the request data"); let response_data = String::from("This is the response data"); - let dst_uuri = test_lib::create_rpcserver_uuri(); // Setup RpcServer callback - let upclient_server_cloned = upclient_server.clone(); - let response_data_cloned = response_data.clone(); - let request_data_cloned = request_data.clone(); - let callback = move |result: Result| { - match result { - Ok(msg) => { - let UMessage { - attributes, - payload, - .. - } = msg; - // Check the payload of request - if let Data::Value(v) = payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - assert_eq!(request_data_cloned, value); - } else { - panic!("The message should be Data::Value type."); - } - // Send back result - let umessage = UMessageBuilder::response_for_request(&attributes) - .with_message_id(UUIDBuilder::new().build()) - .build_with_payload( - response_data_cloned.as_bytes().to_vec().into(), - UPayloadFormat::UPAYLOAD_FORMAT_TEXT, - ) - .unwrap(); - block_on(upclient_server_cloned.send(umessage)).unwrap(); - } - Err(ustatus) => { - panic!("Internal Error: {ustatus:?}"); - } - } - }; - let listener_string = upclient_server - .register_listener(dst_uuri.clone(), Box::new(callback)) + let request_listener = Arc::new(RequestListener::new( + upclient_server.clone(), + request_data.clone(), + response_data.clone(), + )); + upclient_server + .register_listener(listen_uuri.clone(), request_listener.clone()) .await .unwrap(); - // Need some time for queryable to run task::sleep(time::Duration::from_millis(1000)).await; @@ -103,32 +155,16 @@ async fn test_rpc_server_client() { // Send Request with send { // Register Response callback - let callback = move |result: Result| { - match result { - Ok(msg) => { - let UMessage { payload, .. } = msg; - // Check the response data - if let Data::Value(v) = payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - assert_eq!(response_data, value); - } else { - panic!("The message should be Data::Value type."); - } - } - Err(ustatus) => { - panic!("Internal Error: {ustatus:?}"); - } - } - }; let response_uuri = upclient_client.get_response_uuri(); + let response_listener = Arc::new(ResponseListener::new()); upclient_client - .register_listener(response_uuri.clone(), Box::new(callback)) + .register_listener(response_uuri.clone(), response_listener.clone()) .await .unwrap(); // Send request - let umessage = UMessageBuilder::request(dst_uuri.clone(), response_uuri, 3000) - .with_message_id(UUIDBuilder::new().build()) + let umessage = UMessageBuilder::request(dst_uuri.clone(), response_uuri.clone(), 1000) + .with_message_id(UUIDBuilder::build()) .build_with_payload( request_data.as_bytes().to_vec().into(), UPayloadFormat::UPAYLOAD_FORMAT_TEXT, @@ -137,12 +173,21 @@ async fn test_rpc_server_client() { upclient_client.send(umessage).await.unwrap(); // Waiting for the callback to process data - task::sleep(time::Duration::from_millis(1000)).await; + task::sleep(time::Duration::from_millis(5000)).await; + + // Compare the result + assert_eq!(response_listener.get_response_data(), response_data); + + // Cleanup + upclient_client + .unregister_listener(response_uuri.clone(), response_listener.clone()) + .await + .unwrap(); } // Cleanup upclient_server - .unregister_listener(dst_uuri.clone(), &listener_string) + .unregister_listener(listen_uuri.clone(), request_listener.clone()) .await .unwrap(); } diff --git a/tests/special_uuri.rs b/tests/special_uuri.rs deleted file mode 100644 index b1e6678..0000000 --- a/tests/special_uuri.rs +++ /dev/null @@ -1,143 +0,0 @@ -// -// Copyright (c) 2024 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -pub mod test_lib; - -use async_std::task::{self, block_on}; -use std::{sync::Arc, time}; -use up_rust::{ - CallOptions, Data, RpcClient, UMessage, UMessageBuilder, UMessageType, UPayload, - UPayloadFormat, UStatus, UTransport, UUIDBuilder, -}; - -#[async_std::test] -async fn test_register_listener_with_special_uuri() { - test_lib::before_test(); - - // Initialization - let upclient1 = Arc::new(test_lib::create_up_client_zenoh().await.unwrap()); - let upclient1_clone = upclient1.clone(); - let upclient2 = test_lib::create_up_client_zenoh().await.unwrap(); - let publish_data = String::from("Hello World!"); - let request_data = String::from("This is the request data"); - let response_data = String::from("This is the request data"); - - // Register the listener - let publish_data_cloned = publish_data.clone(); - let request_data_cloned = request_data.clone(); - let response_data_cloned = response_data.clone(); - let listener_uuri = test_lib::create_special_uuri(); - let listener = move |result: Result| match result { - Ok(msg) => { - let UMessage { - attributes, - payload, - .. - } = msg; - let value = if let Data::Value(v) = payload.clone().unwrap().data.unwrap() { - v.into_iter().map(|c| c as char).collect::() - } else { - panic!("The message should be Data::Value type."); - }; - match attributes.type_.enum_value().unwrap() { - UMessageType::UMESSAGE_TYPE_PUBLISH => { - assert_eq!(publish_data_cloned, value); - } - UMessageType::UMESSAGE_TYPE_NOTIFICATION => { - panic!("Notification type"); - } - UMessageType::UMESSAGE_TYPE_REQUEST => { - assert_eq!(request_data_cloned, value); - // Send back result - let umessage = UMessageBuilder::response_for_request(&attributes) - .with_message_id(UUIDBuilder::new().build()) - .build_with_payload( - response_data_cloned.as_bytes().to_vec().into(), - UPayloadFormat::UPAYLOAD_FORMAT_TEXT, - ) - .unwrap(); - block_on(upclient1_clone.send(umessage)).unwrap(); - } - UMessageType::UMESSAGE_TYPE_RESPONSE => { - panic!("Response type"); - } - UMessageType::UMESSAGE_TYPE_UNSPECIFIED => { - panic!("Unknown type"); - } - } - } - Err(ustatus) => panic!("Internal Error: {ustatus:?}"), - }; - let listener_string = upclient1 - .register_listener(listener_uuri.clone(), Box::new(listener)) - .await - .unwrap(); - - // Send Publish - { - // Initialization - let mut publish_uuri = test_lib::create_utransport_uuri(0); - publish_uuri.authority = Some(test_lib::create_authority()).into(); - - // Send Publish data - let umessage = UMessageBuilder::publish(publish_uuri) - .with_message_id(UUIDBuilder::new().build()) - .build_with_payload( - publish_data.as_bytes().to_vec().into(), - UPayloadFormat::UPAYLOAD_FORMAT_TEXT, - ) - .unwrap(); - upclient2.send(umessage).await.unwrap(); - - // Waiting for the subscriber to receive data - task::sleep(time::Duration::from_millis(1000)).await; - } - - // Send Request - { - // Initialization - let mut request_uuri = test_lib::create_rpcserver_uuri(); - request_uuri.authority = Some(test_lib::create_authority()).into(); - - // RpcClient: Send Request data - let payload = UPayload { - format: UPayloadFormat::UPAYLOAD_FORMAT_TEXT.into(), - data: Some(Data::Value(request_data.as_bytes().to_vec())), - ..Default::default() - }; - let result = upclient2 - .invoke_method( - request_uuri, - payload, - CallOptions { - ttl: 1000, - ..Default::default() - }, - ) - .await; - - // Process the result - if let Data::Value(v) = result.unwrap().payload.unwrap().data.unwrap() { - let value = v.into_iter().map(|c| c as char).collect::(); - assert_eq!(response_data, value); - } else { - panic!("Failed to get result from invoke_method."); - } - } - - // Cleanup - upclient1 - .unregister_listener(listener_uuri, &listener_string) - .await - .unwrap(); -} diff --git a/tests/test_lib.rs b/tests/test_lib.rs index 95a6342..7bf056f 100644 --- a/tests/test_lib.rs +++ b/tests/test_lib.rs @@ -17,82 +17,85 @@ use up_rust::{Number, UAuthority, UEntity, UResource, UResourceBuilder, UStatus, use zenoh::config::Config; static INIT: Once = Once::new(); +static AUTH_NAME: &str = "auth_name"; +static ENTITY_NAME: &str = "entity_name"; +static RESOURCE_NAME: &str = "resource_name"; +static INSTANCE_NAME: &str = "instance_name"; +static MESSAGE_NAME: &str = "message_name"; pub fn before_test() { INIT.call_once(env_logger::init); } -/// # Errors -/// Will return `Err` if unable to create `UPClientZenoh` -pub async fn create_up_client_zenoh() -> Result { - let uauthority = UAuthority { - name: Some("MyAuthName".to_string()), - number: Some(Number::Id(vec![1, 2, 3, 4])), +#[allow(clippy::must_use_candidate)] +pub fn create_authority(idx: u8) -> UAuthority { + UAuthority { + name: Some(format!("{AUTH_NAME}{idx}")), + number: Some(Number::Id(vec![1, 2, 3, 4 + idx])), ..Default::default() - }; - let uentity = UEntity { - name: "default.entity".to_string(), - id: Some(u32::from(rand::random::())), + } +} + +#[allow(clippy::must_use_candidate)] +pub fn create_entity(idx: u32) -> UEntity { + UEntity { + name: format!("{ENTITY_NAME}{idx}"), + id: Some(idx), version_major: Some(1), version_minor: None, ..Default::default() - }; + } +} + +#[allow(clippy::must_use_candidate)] +pub fn create_resource(idx: u32) -> UResource { + UResource { + name: format!("{RESOURCE_NAME}{idx}"), + instance: Some(format!("{INSTANCE_NAME}{idx}")), + message: Some(format!("{MESSAGE_NAME}{idx}")), + id: Some(idx), + ..Default::default() + } +} + +/// # Errors +/// Will return `Err` if unable to create `UPClientZenoh` +pub async fn create_up_client_zenoh( + auth_idx: u8, + entity_idx: u32, +) -> Result { + let uauthority = create_authority(auth_idx); + let uentity = create_entity(entity_idx); UPClientZenoh::new(Config::default(), uauthority, uentity).await } #[allow(clippy::must_use_candidate)] -pub fn create_utransport_uuri(index: u8) -> UUri { - if index == 1 { - UUri { - entity: Some(UEntity { - name: "entity1".to_string(), - version_major: Some(1), - id: Some(1111), - ..Default::default() - }) - .into(), - resource: Some(UResource { - name: "name1".to_string(), - instance: Some("instance1".to_string()), - message: Some("message1".to_string()), - id: Some(1111), - ..Default::default() - }) - .into(), - ..Default::default() - } - } else { - UUri { - entity: Some(UEntity { - name: "body.access".to_string(), - version_major: Some(1), - id: Some(1234), - ..Default::default() - }) - .into(), - resource: Some(UResource { - name: "door".to_string(), - instance: Some("front_left".to_string()), - message: Some("Door".to_string()), - id: Some(5678), - ..Default::default() - }) - .into(), - ..Default::default() - } +pub fn create_utransport_uuri(auth_idx: Option, entity_idx: u32, resource_idx: u32) -> UUri { + UUri { + authority: if let Some(idx) = auth_idx { + // Remote UUri + Some(create_authority(idx)).into() + } else { + // Local UUri + None.into() + }, + entity: Some(create_entity(entity_idx)).into(), + resource: Some(create_resource(resource_idx)).into(), + ..Default::default() } } #[allow(clippy::must_use_candidate)] -pub fn create_rpcserver_uuri() -> UUri { +pub fn create_rpcserver_uuri(auth_idx: Option, entity_idx: u32) -> UUri { UUri { - entity: Some(UEntity { - name: "test_rpc.app".to_string(), - version_major: Some(1), - id: Some(1234), - ..Default::default() - }) - .into(), + authority: if let Some(idx) = auth_idx { + // Remote UUri + Some(create_authority(idx)).into() + } else { + // Local UUri + None.into() + }, + entity: Some(create_entity(entity_idx)).into(), resource: Some(UResourceBuilder::for_rpc_request( Some("SimpleTest".to_string()), Some(5678), @@ -103,18 +106,9 @@ pub fn create_rpcserver_uuri() -> UUri { } #[allow(clippy::must_use_candidate)] -pub fn create_authority() -> UAuthority { - UAuthority { - name: Some("UAuthName".to_string()), - number: Some(Number::Id(vec![1, 2, 3, 10, 11, 12])), - ..Default::default() - } -} - -#[allow(clippy::must_use_candidate)] -pub fn create_special_uuri() -> UUri { +pub fn create_special_uuri(idx: u8) -> UUri { UUri { - authority: Some(create_authority()).into(), + authority: Some(create_authority(idx)).into(), ..Default::default() } }