Skip to content

Commit

Permalink
Use lock for sync_wait completion (#272)
Browse files Browse the repository at this point in the history
* Use lock for sync_wait completion

* release/acquire memory ordering has a race condition
* also reproduced on seq_cst
* requiring a lock around the std::condition_variable to properly and always wake up the waiting sync_wait thread, this is necessary for correctness over speed

Closes #270
  • Loading branch information
jbaldwin authored Aug 2, 2024
1 parent b698069 commit ee02dc8
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 4 deletions.
2 changes: 1 addition & 1 deletion include/coro/sync_wait.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ auto sync_wait(awaitable_type&& a) -> decltype(auto)
// For non-trivial types (or possibly types that don't fit in a register)
// the compiler will end up calling the ~return_type() when the promise
// is destructed at the end of sync_wait(). This causes the return_type
// object to also be destructed causingn the final return/move from
// object to also be destructed causing the final return/move from
// sync_wait() to be a 'use after free' bug. To work around this the result
// must be moved off the promise object before the promise is destructed.
// Other solutions could be heap allocating the return_type but that has
Expand Down
9 changes: 6 additions & 3 deletions src/sync_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@ sync_wait_event::sync_wait_event(bool initially_set) : m_set(initially_set)

auto sync_wait_event::set() noexcept -> void
{
m_set.exchange(true, std::memory_order::release);
// issue-270 100~ task's on a thread_pool within sync_wait(when_all(tasks)) can cause a deadlock/hang if using
// release/acquire or even seq_cst.
m_set.exchange(true, std::memory_order::seq_cst);
std::unique_lock<std::mutex> lk{m_mutex};
m_cv.notify_all();
}

auto sync_wait_event::reset() noexcept -> void
{
m_set.exchange(false, std::memory_order::release);
m_set.exchange(false, std::memory_order::seq_cst);
}

auto sync_wait_event::wait() noexcept -> void
{
std::unique_lock<std::mutex> lk{m_mutex};
m_cv.wait(lk, [this] { return m_set.load(std::memory_order::acquire); });
m_cv.wait(lk, [this] { return m_set.load(std::memory_order::seq_cst); });
}

} // namespace coro::detail
46 changes: 46 additions & 0 deletions test/test_sync_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <coro/coro.hpp>

#include <iostream>
#include <random>
#include <unordered_set>

TEST_CASE("sync_wait simple integer return", "[sync_wait]")
{
Expand Down Expand Up @@ -62,3 +64,47 @@ TEST_CASE("sync_wait task that throws", "[sync_wait]")

REQUIRE_THROWS(coro::sync_wait(f()));
}

TEST_CASE("sync_wait very rarely hangs issue-270", "[sync_wait]")
{
coro::thread_pool tp{};

const int ITERATIONS = 100;

std::unordered_set<int> data{};
data.reserve(ITERATIONS);

std::random_device dev;
std::mt19937 rng(dev());
std::uniform_int_distribution<std::mt19937::result_type> dist(0, ITERATIONS);

for (int i = 0; i < ITERATIONS; ++i)
{
data.insert(dist(rng));
}

std::atomic<int> count{0};

auto make_task = [&](int i) -> coro::task<void>
{
co_await tp.schedule();

if (data.find(i) != data.end())
{
count.fetch_add(1);
}

co_return;
};

std::vector<coro::task<void>> tasks{};
tasks.reserve(ITERATIONS);
for (int i = 0; i < ITERATIONS; ++i)
{
tasks.emplace_back(make_task(i));
}

coro::sync_wait(coro::when_all(std::move(tasks)));

REQUIRE(count > 0);
}

0 comments on commit ee02dc8

Please sign in to comment.