Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RAII for subscription, client, and service #463

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions rclrs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use rosidl_runtime_rs::Message;
use crate::{
error::{RclReturnCode, ToResult},
rcl_bindings::*,
MessageCow, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
Expand Down Expand Up @@ -76,14 +76,18 @@ where
pub(crate) handle: Arc<ClientHandle>,
requests: Mutex<HashMap<RequestId, RequestValue<T::Response>>>,
futures: Arc<Mutex<HashMap<RequestId, oneshot::Sender<T::Response>>>>,
/// Ensure the parent node remains alive as long as the subscription is held.
/// This implementation will change in the future.
#[allow(unused)]
node: Arc<Node>,
}

impl<T> Client<T>
where
T: rosidl_runtime_rs::Service,
{
/// Creates a new client.
pub(crate) fn new(node_handle: Arc<NodeHandle>, topic: &str) -> Result<Self, RclrsError>
pub(crate) fn new(node: &Arc<Node>, topic: &str) -> Result<Self, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
// [`Node::create_client`], see the struct's documentation for the rationale
where
Expand All @@ -102,7 +106,7 @@ where
let client_options = unsafe { rcl_client_get_default_options() };

{
let rcl_node = node_handle.rcl_node.lock().unwrap();
let rcl_node = node.handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();

// SAFETY:
Expand All @@ -126,7 +130,7 @@ where

let handle = Arc::new(ClientHandle {
rcl_client: Mutex::new(rcl_client),
node_handle,
node_handle: Arc::clone(&node.handle),
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Expand All @@ -136,6 +140,7 @@ where
futures: Arc::new(Mutex::new(
HashMap::<RequestId, oneshot::Sender<T::Response>>::new(),
)),
node: Arc::clone(node),
})
}

Expand Down
21 changes: 6 additions & 15 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ impl Node {
///
/// [1]: crate::Client
// TODO: make client's lifetime depend on node's lifetime
pub fn create_client<T>(&self, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
pub fn create_client<T>(self: &Arc<Self>, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
where
T: rosidl_runtime_rs::Service,
{
let client = Arc::new(Client::<T>::new(Arc::clone(&self.handle), topic)?);
let client = Arc::new(Client::<T>::new(self, topic)?);
{ self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
Ok(client)
}
Expand Down Expand Up @@ -292,19 +292,15 @@ impl Node {
///
/// [1]: crate::Service
pub fn create_service<T, F>(
&self,
self: &Arc<Self>,
topic: &str,
callback: F,
) -> Result<Arc<Service<T>>, RclrsError>
where
T: rosidl_runtime_rs::Service,
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
{
let service = Arc::new(Service::<T>::new(
Arc::clone(&self.handle),
topic,
callback,
)?);
let service = Arc::new(Service::<T>::new(self, topic, callback)?);
{ self.services_mtx.lock().unwrap() }
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
Ok(service)
Expand All @@ -314,20 +310,15 @@ impl Node {
///
/// [1]: crate::Subscription
pub fn create_subscription<T, Args>(
&self,
self: &Arc<Self>,
topic: &str,
qos: QoSProfile,
callback: impl SubscriptionCallback<T, Args>,
) -> Result<Arc<Subscription<T>>, RclrsError>
where
T: Message,
{
let subscription = Arc::new(Subscription::<T>::new(
Arc::clone(&self.handle),
topic,
qos,
callback,
)?);
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
{ self.subscriptions_mtx.lock() }
.unwrap()
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
Expand Down
2 changes: 1 addition & 1 deletion rclrs/src/parameter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ impl ParameterInterface {
}
}

pub(crate) fn create_services(&self, node: &Node) -> Result<(), RclrsError> {
pub(crate) fn create_services(&self, node: &Arc<Node>) -> Result<(), RclrsError> {
*self.services.lock().unwrap() =
Some(ParameterService::new(node, self.parameter_map.clone())?);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion rclrs/src/parameter/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ fn set_parameters_atomically(

impl ParameterService {
pub(crate) fn new(
node: &Node,
node: &Arc<Node>,
parameter_map: Arc<Mutex<ParameterMap>>,
) -> Result<Self, RclrsError> {
let fqn = node.fully_qualified_name();
Expand Down
17 changes: 9 additions & 8 deletions rclrs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use rosidl_runtime_rs::Message;
use crate::{
error::{RclReturnCode, ToResult},
rcl_bindings::*,
MessageCow, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
Expand Down Expand Up @@ -73,18 +73,18 @@ where
pub(crate) handle: Arc<ServiceHandle>,
/// The callback function that runs when a request was received.
pub callback: Mutex<ServiceCallback<T::Request, T::Response>>,
/// Ensure the parent node remains alive as long as the subscription is held.
/// This implementation will change in the future.
#[allow(unused)]
node: Arc<Node>,
}

impl<T> Service<T>
where
T: rosidl_runtime_rs::Service,
{
/// Creates a new service.
pub(crate) fn new<F>(
node_handle: Arc<NodeHandle>,
topic: &str,
callback: F,
) -> Result<Self, RclrsError>
pub(crate) fn new<F>(node: &Arc<Node>, topic: &str, callback: F) -> Result<Self, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
// [`Node::create_service`], see the struct's documentation for the rationale
where
Expand All @@ -104,7 +104,7 @@ where
let service_options = unsafe { rcl_service_get_default_options() };

{
let rcl_node = node_handle.rcl_node.lock().unwrap();
let rcl_node = node.handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
unsafe {
// SAFETY:
Expand All @@ -127,12 +127,13 @@ where

let handle = Arc::new(ServiceHandle {
rcl_service: Mutex::new(rcl_service),
node_handle,
node_handle: Arc::clone(&node.handle),
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Ok(Self {
handle,
node: Arc::clone(node),
callback: Mutex::new(Box::new(callback)),
})
}
Expand Down
50 changes: 46 additions & 4 deletions rclrs/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
error::{RclReturnCode, ToResult},
qos::QoSProfile,
rcl_bindings::*,
NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
};

mod callback;
Expand Down Expand Up @@ -84,6 +84,10 @@ where
pub(crate) handle: Arc<SubscriptionHandle>,
/// The callback function that runs when a message was received.
pub callback: Mutex<AnySubscriptionCallback<T>>,
/// Ensure the parent node remains alive as long as the subscription is held.
/// This implementation will change in the future.
#[allow(unused)]
node: Arc<Node>,
message: PhantomData<T>,
}

Expand All @@ -93,7 +97,7 @@ where
{
/// Creates a new subscription.
pub(crate) fn new<Args>(
node_handle: Arc<NodeHandle>,
node: &Arc<Node>,
topic: &str,
qos: QoSProfile,
callback: impl SubscriptionCallback<T, Args>,
Expand All @@ -117,7 +121,7 @@ where
subscription_options.qos = qos.into();

{
let rcl_node = node_handle.rcl_node.lock().unwrap();
let rcl_node = node.handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
unsafe {
// SAFETY:
Expand All @@ -139,13 +143,14 @@ where

let handle = Arc::new(SubscriptionHandle {
rcl_subscription: Mutex::new(rcl_subscription),
node_handle,
node_handle: Arc::clone(&node.handle),
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Ok(Self {
handle,
callback: Mutex::new(callback.into_callback()),
node: Arc::clone(node),
message: PhantomData,
})
}
Expand Down Expand Up @@ -396,4 +401,41 @@ mod tests {
);
Ok(())
}

#[test]
fn test_node_subscription_raii() {
use crate::*;
use std::sync::atomic::Ordering;

let mut executor = Context::default().create_basic_executor();

let triggered = Arc::new(AtomicBool::new(false));
let inner_triggered = Arc::clone(&triggered);
let callback = move |_: msg::Empty| {
inner_triggered.store(true, Ordering::Release);
};

let (_subscription, publisher) = {
let node = executor
.create_node(&format!("test_node_subscription_raii_{}", line!()))
.unwrap();

let qos = QoSProfile::default().keep_all().reliable();
let subscription = node
.create_subscription::<msg::Empty, _>("test_topic", qos, callback)
.unwrap();
let publisher = node
.create_publisher::<msg::Empty>("test_topic", qos)
.unwrap();

(subscription, publisher)
};

publisher.publish(msg::Empty::default()).unwrap();
let start_time = std::time::Instant::now();
while !triggered.load(Ordering::Acquire) {
assert!(executor.spin(SpinOptions::spin_once()).is_empty());
assert!(start_time.elapsed() < std::time::Duration::from_secs(10));
}
}
}