Skip to content

Commit

Permalink
Added handling of the case where we're within a Tokio Runtime or not (#…
Browse files Browse the repository at this point in the history
…50)

* Added handling of the case where we're within a Tokio Runtime or not based on a check
* if we're within a Tokio Runtime use the current handle
* if not, use a dedicated Tokio Runtime we have (e.g. we're within an async-std executor)

* Reorganize the crate path

Signed-off-by: ChenYing Kuo <evshary@gmail.com>

---------

Signed-off-by: ChenYing Kuo <evshary@gmail.com>
Co-authored-by: ChenYing Kuo <evshary@gmail.com>
  • Loading branch information
PLeVasseur and evshary authored Jun 26, 2024
1 parent 4607454 commit b977153
Showing 1 changed file with 40 additions and 11 deletions.
51 changes: 40 additions & 11 deletions src/utransport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@
********************************************************************************/
use crate::{MessageFlag, UPClientZenoh, CB_RUNTIME};
use async_trait::async_trait;
use std::{sync::Arc, time::Duration};
use tokio::{runtime::Handle, task};
use lazy_static::lazy_static;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{
runtime::{Handle, Runtime},
task,
};
use up_rust::{
ComparableListener, UAttributes, UAttributesValidators, UCode, UListener, UMessage,
UMessageType, UStatus, UTransport, UUri,
Expand All @@ -24,20 +31,42 @@ use zenoh::{
queryable::Query,
};

lazy_static! {
static ref TOKIO_RUNTIME: Mutex<Runtime> = Mutex::new(Runtime::new().unwrap());
}

#[inline]
fn invoke_block_callback(listener: &Arc<dyn UListener>, resp_msg: Result<UMessage, &str>) {
match resp_msg {
Ok(umsg) => {
task::block_in_place(|| {
Handle::current().block_on(listener.on_receive(umsg));
});
}
Ok(umsg) => match Handle::try_current() {
Ok(handle) => {
task::block_in_place(|| {
handle.block_on(listener.on_receive(umsg));
});
}
Err(_) => {
TOKIO_RUNTIME
.lock()
.unwrap()
.block_on(listener.on_receive(umsg));
}
},
Err(err_msg) => {
log::error!("{err_msg}");
task::block_in_place(|| {
Handle::current()
.block_on(listener.on_error(UStatus::fail_with_code(UCode::INTERNAL, err_msg)));
});
match Handle::try_current() {
Ok(handle) => {
task::block_in_place(|| {
handle.block_on(
listener.on_error(UStatus::fail_with_code(UCode::INTERNAL, err_msg)),
);
});
}
Err(_) => {
TOKIO_RUNTIME.lock().unwrap().block_on(
listener.on_error(UStatus::fail_with_code(UCode::INTERNAL, err_msg)),
);
}
}
}
}
}
Expand Down

0 comments on commit b977153

Please sign in to comment.