diff --git a/cpp/include/raft/comms/detail/std_comms.hpp b/cpp/include/raft/comms/detail/std_comms.hpp index cb1accc95e..c5d64f6a29 100644 --- a/cpp/include/raft/comms/detail/std_comms.hpp +++ b/cpp/include/raft/comms/detail/std_comms.hpp @@ -307,13 +307,16 @@ class std_comms : public comms_iface { bool restart = false; // resets the timeout when any progress was made if (worker->isProgressThreadRunning()) { - // Wait for a UCXX progress thread roundtrip + // Wait for a UCXX progress thread roundtrip, prevent waiting for longer + // than 10ms for each operation, will retry in next iteration. ucxx::utils::CallbackNotifier callbackNotifierPre{}; - worker->registerGenericPre([&callbackNotifierPre]() { callbackNotifierPre.set(); }); + worker->registerGenericPre([&callbackNotifierPre]() { callbackNotifierPre.set(); }, + 10000000 /* 10ms */); callbackNotifierPre.wait(); ucxx::utils::CallbackNotifier callbackNotifierPost{}; - worker->registerGenericPost([&callbackNotifierPost]() { callbackNotifierPost.set(); }); + worker->registerGenericPost([&callbackNotifierPost]() { callbackNotifierPost.set(); }, + 10000000 /* 10ms */); callbackNotifierPost.wait(); } else { // Causes UCXX to progress through the send/recv message queue