From f8139d05e67f55fa0533b5eab09c8356f1ce2db4 Mon Sep 17 00:00:00 2001 From: Alexander Polyakov Date: Tue, 13 Aug 2024 17:06:20 +0300 Subject: [PATCH] Add cancellations (#1055) * add cancellable awaitables * add start fork policy * make RPC use forks --- compiler/code-gen/vertex-compiler.cpp | 4 +- runtime-core/utils/hash.h | 13 + runtime-light/coroutine/awaitable.h | 369 +++++++++++++++++++---- runtime-light/coroutine/task.h | 9 + runtime-light/scheduler/scheduler.cpp | 64 ++-- runtime-light/scheduler/scheduler.h | 99 +++++- runtime-light/stdlib/fork/fork-api.h | 22 +- runtime-light/stdlib/fork/fork-context.h | 47 +-- runtime-light/stdlib/rpc/rpc-api.cpp | 148 ++++----- runtime-light/stdlib/rpc/rpc-context.cpp | 6 - runtime-light/stdlib/rpc/rpc-context.h | 12 +- runtime-light/stdlib/timer/timer.h | 3 +- runtime-light/streams/interface.cpp | 2 +- runtime-light/streams/interface.h | 1 + 14 files changed, 581 insertions(+), 218 deletions(-) create mode 100644 runtime-core/utils/hash.h diff --git a/compiler/code-gen/vertex-compiler.cpp b/compiler/code-gen/vertex-compiler.cpp index 0c8b517ac6..b13f5b062f 100644 --- a/compiler/code-gen/vertex-compiler.cpp +++ b/compiler/code-gen/vertex-compiler.cpp @@ -849,7 +849,7 @@ void compile_func_call(VertexAdaptor root, CodeGenerator &W, func_ if (mode == func_call_mode::fork_call) { if (func->is_interruptible) { - W << "(co_await start_fork_and_reschedule_t{" << FunctionName(func); + W << "(co_await start_fork_t{" << FunctionName(func); } else { W << FunctionForkName(func); } @@ -883,7 +883,7 @@ void compile_func_call(VertexAdaptor root, CodeGenerator &W, func_ W << ")"; if (func->is_interruptible) { if (mode == func_call_mode::fork_call) { - W << "})"; + W << ", start_fork_t::execution::fork})"; } else if (func->is_k2_fork) { // k2 fork's return type is 'task_t' so we need to unpack actual result from fork_result W << ").get_result<" << TypeName(tinf::get_type(root)) << ">()"; } else { diff --git a/runtime-core/utils/hash.h b/runtime-core/utils/hash.h new file mode 100644 index 0000000000..8df06cee7d --- /dev/null +++ b/runtime-core/utils/hash.h @@ -0,0 +1,13 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include +#include + +// from boost +// see https://www.boost.org/doc/libs/1_55_0/doc/html/hash/reference.html#boost.hash_combine +template +void hash_combine(size_t &seed, const T &v) noexcept { + seed ^= std::hash{}(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2); +} diff --git a/runtime-light/coroutine/awaitable.h b/runtime-light/coroutine/awaitable.h index f468796117..c27ab80eb3 100644 --- a/runtime-light/coroutine/awaitable.h +++ b/runtime-light/coroutine/awaitable.h @@ -9,159 +9,316 @@ #include #include #include +#include +#include #include -#include "runtime-core/core-types/decl/optional.h" #include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/component/component.h" #include "runtime-light/coroutine/task.h" -#include "runtime-light/stdlib/fork/fork-context.h" -#include "runtime-light/stdlib/fork/fork.h" #include "runtime-light/header.h" #include "runtime-light/scheduler/scheduler.h" +#include "runtime-light/stdlib/fork/fork-context.h" +#include "runtime-light/stdlib/fork/fork.h" #include "runtime-light/utils/context.h" template -concept Awaitable = requires(T && awaitable, std::coroutine_handle<> coro) { +concept Awaitable = requires(T awaitable, std::coroutine_handle<> coro) { { awaitable.await_ready() } noexcept -> std::convertible_to; { awaitable.await_suspend(coro) } noexcept; { awaitable.await_resume() } noexcept; }; template -concept CancellableAwaitable = Awaitable && requires(T && awaitable) { +concept CancellableAwaitable = Awaitable && requires(T awaitable) { + // semantics: returns 'true' if it's safe to call 'await_resume', 'false' otherwise. + { awaitable.resumable() } noexcept -> std::convertible_to; + // semantics: following resume of the awaitable should not have any effect on the awaiter. + // semantics (optional): stop useless calculations. { awaitable.cancel() } noexcept -> std::same_as; }; // === Awaitables ================================================================================= +// +// ***Important*** +// Below awaitables are not supposed to be co_awaited on more than once. + +namespace awaitable_impl_ { + +enum class State : uint8_t { Init, Suspend, Ready, End }; + +} // namespace awaitable_impl_ class wait_for_update_t { uint64_t stream_d; - SuspendToken suspend_token_; + SuspendToken suspend_token; + awaitable_impl_::State state{awaitable_impl_::State::Init}; public: explicit wait_for_update_t(uint64_t stream_d_) noexcept : stream_d(stream_d_) - , suspend_token_(std::noop_coroutine(), WaitEvent::UpdateOnStream{.stream_d = stream_d}) {} + , suspend_token(std::noop_coroutine(), WaitEvent::UpdateOnStream{.stream_d = stream_d}) {} - bool await_ready() const noexcept { - return get_component_context()->stream_updated(stream_d); + wait_for_update_t(wait_for_update_t &&other) noexcept + : stream_d(std::exchange(other.stream_d, INVALID_PLATFORM_DESCRIPTOR)) + , suspend_token(std::exchange(other.suspend_token, std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{}))) + , state(std::exchange(other.state, awaitable_impl_::State::End)) {} + + wait_for_update_t(const wait_for_update_t &) = delete; + wait_for_update_t &operator=(const wait_for_update_t &) = delete; + wait_for_update_t &operator=(wait_for_update_t &&) = delete; + + ~wait_for_update_t() { + if (state == awaitable_impl_::State::Suspend) { + cancel(); + } + } + + bool await_ready() noexcept { + php_assert(state == awaitable_impl_::State::Init); + state = get_component_context()->stream_updated(stream_d) ? awaitable_impl_::State::Ready : awaitable_impl_::State::Init; + return state == awaitable_impl_::State::Ready; } void await_suspend(std::coroutine_handle<> coro) noexcept { - suspend_token_.first = coro; - CoroutineScheduler::get().suspend(suspend_token_); + state = awaitable_impl_::State::Suspend; + suspend_token.first = coro; + CoroutineScheduler::get().suspend(suspend_token); } - constexpr void await_resume() const noexcept {} + constexpr void await_resume() noexcept { + state = awaitable_impl_::State::End; + } - void cancel() const noexcept { - CoroutineScheduler::get().cancel(suspend_token_); + bool resumable() const noexcept { + return state == awaitable_impl_::State::Ready || (state == awaitable_impl_::State::Suspend && !CoroutineScheduler::get().contains(suspend_token)); + } + + void cancel() noexcept { + state = awaitable_impl_::State::End; + CoroutineScheduler::get().cancel(suspend_token); } }; // ================================================================================================ class wait_for_incoming_stream_t { - SuspendToken suspend_token_{std::noop_coroutine(), WaitEvent::IncomingStream{}}; + SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::IncomingStream{}}; + awaitable_impl_::State state{awaitable_impl_::State::Init}; public: - bool await_ready() const noexcept { - return !get_component_context()->incoming_streams().empty(); + wait_for_incoming_stream_t() noexcept = default; + + wait_for_incoming_stream_t(wait_for_incoming_stream_t &&other) noexcept + : suspend_token(std::exchange(other.suspend_token, std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{}))) + , state(std::exchange(other.state, awaitable_impl_::State::End)) {} + + wait_for_incoming_stream_t(const wait_for_incoming_stream_t &) = delete; + wait_for_incoming_stream_t &operator=(const wait_for_incoming_stream_t &) = delete; + wait_for_incoming_stream_t &operator=(wait_for_incoming_stream_t &&) = delete; + + ~wait_for_incoming_stream_t() { + if (state == awaitable_impl_::State::Suspend) { + cancel(); + } + } + + bool await_ready() noexcept { + php_assert(state == awaitable_impl_::State::Init); + state = !get_component_context()->incoming_streams().empty() ? awaitable_impl_::State::Ready : awaitable_impl_::State::Init; + return state == awaitable_impl_::State::Ready; } void await_suspend(std::coroutine_handle<> coro) noexcept { - suspend_token_.first = coro; - CoroutineScheduler::get().suspend(suspend_token_); + state = awaitable_impl_::State::Suspend; + suspend_token.first = coro; + CoroutineScheduler::get().suspend(suspend_token); } - uint64_t await_resume() const noexcept { + uint64_t await_resume() noexcept { + state = awaitable_impl_::State::End; const auto incoming_stream_d{get_component_context()->take_incoming_stream()}; php_assert(incoming_stream_d != INVALID_PLATFORM_DESCRIPTOR); return incoming_stream_d; } - void cancel() const noexcept { - CoroutineScheduler::get().cancel(suspend_token_); + bool resumable() const noexcept { + return state == awaitable_impl_::State::Ready || (state == awaitable_impl_::State::Suspend && !CoroutineScheduler::get().contains(suspend_token)); + } + + void cancel() noexcept { + state = awaitable_impl_::State::End; + CoroutineScheduler::get().cancel(suspend_token); } }; // ================================================================================================ class wait_for_reschedule_t { - SuspendToken suspend_token_{std::noop_coroutine(), WaitEvent::Rechedule{}}; + SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::Rechedule{}}; + awaitable_impl_::State state{awaitable_impl_::State::Init}; public: + wait_for_reschedule_t() noexcept = default; + + wait_for_reschedule_t(wait_for_reschedule_t &&other) noexcept + : suspend_token(std::exchange(other.suspend_token, std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{}))) + , state(std::exchange(other.state, awaitable_impl_::State::End)) {} + + wait_for_reschedule_t(const wait_for_reschedule_t &) = delete; + wait_for_reschedule_t &operator=(const wait_for_reschedule_t &) = delete; + wait_for_reschedule_t &operator=(wait_for_reschedule_t &&) = delete; + + ~wait_for_reschedule_t() { + if (state == awaitable_impl_::State::Suspend) { + cancel(); + } + } + constexpr bool await_ready() const noexcept { + php_assert(state == awaitable_impl_::State::Init); return false; } void await_suspend(std::coroutine_handle<> coro) noexcept { - suspend_token_.first = coro; - CoroutineScheduler::get().suspend(suspend_token_); + state = awaitable_impl_::State::Suspend; + suspend_token.first = coro; + CoroutineScheduler::get().suspend(suspend_token); } - constexpr void await_resume() const noexcept {} + constexpr void await_resume() noexcept { + state = awaitable_impl_::State::End; + } - void cancel() const noexcept { - CoroutineScheduler::get().cancel(suspend_token_); + bool resumable() const noexcept { + return state == awaitable_impl_::State::Suspend && !CoroutineScheduler::get().contains(suspend_token); + } + + void cancel() noexcept { + state = awaitable_impl_::State::End; + CoroutineScheduler::get().cancel(suspend_token); } }; // ================================================================================================ class wait_for_timer_t { - uint64_t timer_d{}; - SuspendToken suspend_token_; + std::chrono::nanoseconds duration; + uint64_t timer_d{INVALID_PLATFORM_DESCRIPTOR}; + SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::Rechedule{}}; + awaitable_impl_::State state{awaitable_impl_::State::Init}; public: - explicit wait_for_timer_t(std::chrono::nanoseconds duration) noexcept - : timer_d(get_component_context()->set_timer(duration)) - , suspend_token_(std::noop_coroutine(), WaitEvent::UpdateOnTimer{.timer_d = timer_d}) {} + explicit wait_for_timer_t(std::chrono::nanoseconds duration_) noexcept + : duration(duration_) {} + + wait_for_timer_t(wait_for_timer_t &&other) noexcept + : duration(std::exchange(other.duration, std::chrono::nanoseconds{0})) + , timer_d(std::exchange(other.timer_d, INVALID_PLATFORM_DESCRIPTOR)) + , suspend_token(std::exchange(other.suspend_token, std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{}))) + , state(std::exchange(other.state, awaitable_impl_::State::End)) {} + + wait_for_timer_t(const wait_for_timer_t &) = delete; + wait_for_timer_t &operator=(const wait_for_timer_t &) = delete; + wait_for_timer_t &operator=(wait_for_timer_t &&) = delete; + + ~wait_for_timer_t() { + if (state == awaitable_impl_::State::Suspend) { + cancel(); + } + if (timer_d != INVALID_PLATFORM_DESCRIPTOR) { + get_component_context()->release_stream(timer_d); + } + } - bool await_ready() const noexcept { - TimePoint tp{}; - return timer_d == INVALID_PLATFORM_DESCRIPTOR || get_platform_context()->get_timer_status(timer_d, std::addressof(tp)) == TimerStatus::TimerStatusElapsed; + constexpr bool await_ready() const noexcept { + php_assert(state == awaitable_impl_::State::Init); + return false; } void await_suspend(std::coroutine_handle<> coro) noexcept { - suspend_token_.first = coro; - CoroutineScheduler::get().suspend(suspend_token_); + state = awaitable_impl_::State::Suspend; + timer_d = get_component_context()->set_timer(duration); + if (timer_d != INVALID_PLATFORM_DESCRIPTOR) { + suspend_token = std::make_pair(coro, WaitEvent::UpdateOnTimer{.timer_d = timer_d}); + } + CoroutineScheduler::get().suspend(suspend_token); + } + + constexpr void await_resume() noexcept { + state = awaitable_impl_::State::End; } - void await_resume() const noexcept { - get_component_context()->release_stream(timer_d); + bool resumable() const noexcept { + return state == awaitable_impl_::State::Suspend && !CoroutineScheduler::get().contains(suspend_token); } - void cancel() const noexcept { - get_component_context()->release_stream(timer_d); - CoroutineScheduler::get().cancel(suspend_token_); + void cancel() noexcept { + state = awaitable_impl_::State::End; + CoroutineScheduler::get().cancel(suspend_token); } }; // ================================================================================================ -class start_fork_and_reschedule_t { +class start_fork_t { +public: + /** + * Fork start policy: + * 1. self: create fork, suspend fork coroutine, continue current coroutine; + * 2. fork: create fork, suspend current coroutine, continue fork coroutine. + */ + enum class execution : uint8_t { self, fork }; + +private: + execution exec_policy; std::coroutine_handle<> fork_coro; int64_t fork_id{}; - SuspendToken suspend_token_{std::noop_coroutine(), WaitEvent::Rechedule{}}; + SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::Rechedule{}}; public: - explicit start_fork_and_reschedule_t(task_t &&task_) noexcept - : fork_coro(task_.get_handle()) + explicit start_fork_t(task_t &&task_, execution exec_policy_) noexcept + : exec_policy(exec_policy_) + , fork_coro(task_.get_handle()) , fork_id(ForkComponentContext::get().push_fork(std::move(task_))) {} + start_fork_t(start_fork_t &&other) noexcept + : exec_policy(other.exec_policy) + , fork_coro(std::exchange(other.fork_coro, std::noop_coroutine())) + , fork_id(std::exchange(other.fork_id, INVALID_FORK_ID)) + , suspend_token(std::exchange(other.suspend_token, std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{}))) {} + + start_fork_t(const start_fork_t &) = delete; + start_fork_t &operator=(const start_fork_t &) = delete; + start_fork_t &operator=(start_fork_t &&) = delete; + ~start_fork_t() = default; + constexpr bool await_ready() const noexcept { return false; } std::coroutine_handle<> await_suspend(std::coroutine_handle<> current_coro) noexcept { - suspend_token_.first = current_coro; - CoroutineScheduler::get().suspend(suspend_token_); - return fork_coro; + std::coroutine_handle<> continuation{}; + switch (exec_policy) { + case execution::fork: { + suspend_token.first = current_coro; + continuation = fork_coro; + break; + } + case execution::self: { + suspend_token.first = fork_coro; + continuation = current_coro; + break; + } + } + CoroutineScheduler::get().suspend(suspend_token); + // reset fork_coro and suspend_token to guarantee that the same fork will be started only once + fork_coro = std::noop_coroutine(); + suspend_token = std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{}); + return continuation; } - int64_t await_resume() const noexcept { + constexpr int64_t await_resume() const noexcept { return fork_id; } }; @@ -170,32 +327,112 @@ class start_fork_and_reschedule_t { template class wait_fork_t { - task_t task; - wait_for_timer_t timer_awaiter; + int64_t fork_id; + task_t fork_task; task_t::awaiter_t fork_awaiter; public: - wait_fork_t(task_t &&task_, std::chrono::nanoseconds timeout_) noexcept - : task(std::move(task_)) - , timer_awaiter(timeout_) - , fork_awaiter(std::addressof(task)) {} + explicit wait_fork_t(int64_t fork_id_) noexcept + : fork_id(fork_id_) + , fork_task(ForkComponentContext::get().pop_fork(fork_id)) + , fork_awaiter(std::addressof(fork_task)) {} + + wait_fork_t(wait_fork_t &&other) noexcept + : fork_id(std::exchange(other.fork_id, INVALID_FORK_ID)) + , fork_task(std::move(other.fork_task)) + , fork_awaiter(std::addressof(fork_task)) {} + + wait_fork_t(const wait_fork_t &) = delete; + wait_fork_t &operator=(const wait_fork_t &) = delete; + wait_fork_t &operator=(wait_fork_t &&) = delete; + ~wait_fork_t() = default; bool await_ready() const noexcept { - return task.done(); + return fork_awaiter.resumable(); } - void await_suspend(std::coroutine_handle<> coro) noexcept { + constexpr void await_suspend(std::coroutine_handle<> coro) noexcept { fork_awaiter.await_suspend(coro); - timer_awaiter.await_suspend(coro); } - Optional await_resume() noexcept { - if (task.done()) { - timer_awaiter.cancel(); - return {fork_awaiter.await_resume().get_result()}; + T await_resume() noexcept { + return fork_awaiter.await_resume().get_result(); + } + + constexpr bool resumable() const noexcept { + return fork_awaiter.resumable(); + } + + constexpr void cancel() const noexcept { + fork_awaiter.cancel(); + } +}; + +// ================================================================================================ + +template +class wait_with_timeout_t { + T awaitable; + wait_for_timer_t timer_awaitable; + + static_assert(CancellableAwaitable); + static_assert(std::is_void_v{}))>); + using awaitable_suspend_t = decltype(awaitable.await_suspend(std::coroutine_handle<>{})); + using await_suspend_return_t = awaitable_suspend_t; + using awaitable_resume_t = decltype(awaitable.await_resume()); + using await_resume_return_t = std::conditional_t, void, std::optional>; + +public: + wait_with_timeout_t(T &&awaitable_, std::chrono::nanoseconds timeout) noexcept + : awaitable(std::move(awaitable_)) + , timer_awaitable(timeout) {} + + wait_with_timeout_t(wait_with_timeout_t &&other) noexcept + : awaitable(std::move(other.awaitable)) + , timer_awaitable(std::move(other.timer_awaitable)) {} + + wait_with_timeout_t(const wait_with_timeout_t &) = delete; + wait_with_timeout_t &operator=(const wait_with_timeout_t &) = delete; + wait_with_timeout_t &operator=(wait_with_timeout_t &&) = delete; + ~wait_with_timeout_t() = default; + + constexpr bool await_ready() const noexcept { + return awaitable.await_ready() || timer_awaitable.await_ready(); + } + + // according to C++ standard, there can be 3 possible cases: + // 1. await_suspend returns void; + // 2. await_suspend returns bool; + // 3. await_suspend returns std::coroutine_handle<>. + // we must guarantee that 'co_await wait_with_timeout_t{awaitable, timeout}' behaves like 'co_await awaitable' except + // it may cancel 'co_await awaitable' if the timeout has elapsed. + await_suspend_return_t await_suspend(std::coroutine_handle<> coro) noexcept { + // as we don't rely on coroutine scheduler implementation, let's always suspend awaitable first. in case of some smart scheduler + // it won't have any effect, but it will have an effect if our scheduler is quite simple. + if constexpr (std::is_void_v) { + awaitable.await_suspend(coro); + timer_awaitable.await_suspend(coro); } else { - fork_awaiter.cancel(); - return {}; + const auto awaitable_suspend_res{awaitable.await_suspend(coro)}; + timer_awaitable.await_suspend(coro); + return awaitable_suspend_res; + } + } + + await_resume_return_t await_resume() noexcept { + if (awaitable.resumable()) { + timer_awaitable.cancel(); + if constexpr (!std::is_void_v) { + return awaitable.await_resume(); + } + } else { + awaitable.cancel(); + if constexpr (!std::is_void_v) { + return std::nullopt; + } } } }; + +template +wait_with_timeout_t(T &&, std::chrono::nanoseconds) -> wait_with_timeout_t; diff --git a/runtime-light/coroutine/task.h b/runtime-light/coroutine/task.h index b9c13b7cf6..ae368e9b63 100644 --- a/runtime-light/coroutine/task.h +++ b/runtime-light/coroutine/task.h @@ -50,6 +50,11 @@ struct task_base_t { void *handle_address{nullptr}; }; +/** + * Please, read following documents before trying to understand what's going on here: + * 1. C++20 coroutines — https://en.cppreference.com/w/cpp/language/coroutines + * 2. C++ coroutines: understanding symmetric stransfer — https://lewissbaker.github.io/2020/05/11/understanding_symmetric_transfer + */ template struct task_t : public task_base_t { template F> @@ -202,6 +207,10 @@ struct task_t : public task_base_t { return task->get_result(); } + bool resumable() const noexcept { + return task->done(); + } + void cancel() const noexcept { task->get_handle().promise().next = nullptr; } diff --git a/runtime-light/scheduler/scheduler.cpp b/runtime-light/scheduler/scheduler.cpp index c0bbe95a20..491d1d3d1b 100644 --- a/runtime-light/scheduler/scheduler.cpp +++ b/runtime-light/scheduler/scheduler.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include "runtime-light/component/component.h" @@ -20,32 +21,35 @@ SimpleCoroutineScheduler &SimpleCoroutineScheduler::get() noexcept { } ScheduleStatus SimpleCoroutineScheduler::scheduleOnNoEvent() noexcept { - if (yield_coros.empty()) { + if (yield_tokens.empty()) { return ScheduleStatus::Skipped; } - const auto coro{yield_coros.front()}; - yield_coros.pop_front(); - coro.resume(); + const auto token{yield_tokens.front()}; + yield_tokens.pop_front(); + suspend_tokens.erase(token); + token.first.resume(); return ScheduleStatus::Resumed; } ScheduleStatus SimpleCoroutineScheduler::scheduleOnIncomingStream() noexcept { - if (awaiting_for_stream_coros.empty()) { + if (awaiting_for_stream_tokens.empty()) { return ScheduleStatus::Skipped; } - const auto coro{awaiting_for_stream_coros.front()}; - awaiting_for_stream_coros.pop_front(); - coro.resume(); + const auto token{awaiting_for_stream_tokens.front()}; + awaiting_for_stream_tokens.pop_front(); + suspend_tokens.erase(token); + token.first.resume(); return ScheduleStatus::Resumed; } ScheduleStatus SimpleCoroutineScheduler::scheduleOnStreamUpdate(uint64_t stream_d) noexcept { if (stream_d == INVALID_PLATFORM_DESCRIPTOR) { return ScheduleStatus::Error; - } else if (const auto it_coro{awaiting_for_update_coros.find(stream_d)}; it_coro != awaiting_for_update_coros.cend()) { - const auto coro{it_coro->second}; - awaiting_for_update_coros.erase(it_coro); - coro.resume(); + } else if (const auto it_token{awaiting_for_update_tokens.find(stream_d)}; it_token != awaiting_for_update_tokens.cend()) { + const auto token{it_token->second}; + awaiting_for_update_tokens.erase(it_token); + suspend_tokens.erase(token); + token.first.resume(); return ScheduleStatus::Resumed; } else { return ScheduleStatus::Skipped; @@ -58,7 +62,7 @@ ScheduleStatus SimpleCoroutineScheduler::scheduleOnYield() noexcept { ScheduleStatus SimpleCoroutineScheduler::schedule(ScheduleEvent::EventT event) noexcept { return std::visit( - [this](auto &&event) { + [this](auto &&event) noexcept { using event_t = std::remove_cvref_t; if constexpr (std::is_same_v) { return scheduleOnNoEvent(); @@ -78,47 +82,53 @@ ScheduleStatus SimpleCoroutineScheduler::schedule(ScheduleEvent::EventT event) n } void SimpleCoroutineScheduler::suspend(SuspendToken token) noexcept { - const auto [coro, event]{token}; + suspend_tokens.emplace(token); std::visit( - [this, coro](auto &&event) { + [this, token](auto &&event) noexcept { using event_t = std::remove_cvref_t; if constexpr (std::is_same_v) { - yield_coros.push_back(coro); + yield_tokens.push_back(token); } else if constexpr (std::is_same_v) { - awaiting_for_stream_coros.push_back(coro); + awaiting_for_stream_tokens.push_back(token); } else if constexpr (std::is_same_v) { if (event.stream_d == INVALID_PLATFORM_DESCRIPTOR) { return; } - awaiting_for_update_coros.emplace(event.stream_d, coro); + awaiting_for_update_tokens.emplace(event.stream_d, token); } else if constexpr (std::is_same_v) { if (event.timer_d == INVALID_PLATFORM_DESCRIPTOR) { return; } - awaiting_for_update_coros.emplace(event.timer_d, coro); + awaiting_for_update_tokens.emplace(event.timer_d, token); } else { static_assert(false, "non-exhaustive visitor"); } }, - event); + token.second); } void SimpleCoroutineScheduler::cancel(SuspendToken token) noexcept { - const auto [coro, event]{token}; + suspend_tokens.erase(token); std::visit( - [this, coro](auto &&event) { + [this, token](auto &&event) noexcept { using event_t = std::remove_cvref_t; if constexpr (std::is_same_v) { - yield_coros.erase(std::find(yield_coros.cbegin(), yield_coros.cend(), coro)); + const auto it_token{std::find(yield_tokens.cbegin(), yield_tokens.cend(), token)}; + if (it_token != yield_tokens.cend()) { + yield_tokens.erase(it_token); + } } else if constexpr (std::is_same_v) { - awaiting_for_stream_coros.erase(std::find(awaiting_for_stream_coros.cbegin(), awaiting_for_stream_coros.cend(), coro)); + const auto it_token{std::find(awaiting_for_stream_tokens.cbegin(), awaiting_for_stream_tokens.cend(), token)}; + if (it_token != awaiting_for_stream_tokens.cend()) { + awaiting_for_stream_tokens.erase(it_token); + } } else if constexpr (std::is_same_v) { - awaiting_for_update_coros.erase(event.stream_d); + awaiting_for_update_tokens.erase(event.stream_d); } else if constexpr (std::is_same_v) { - awaiting_for_update_coros.erase(event.timer_d); + awaiting_for_update_tokens.erase(event.timer_d); } else { static_assert(false, "non-exhaustive visitor"); } }, - event); + token.second); } diff --git a/runtime-light/scheduler/scheduler.h b/runtime-light/scheduler/scheduler.h index c03723bf8f..89b96cba16 100644 --- a/runtime-light/scheduler/scheduler.h +++ b/runtime-light/scheduler/scheduler.h @@ -6,12 +6,15 @@ #include #include +#include #include +#include #include #include #include "runtime-core/memory-resource/resource_allocator.h" #include "runtime-core/memory-resource/unsynchronized_pool_resource.h" +#include "runtime-core/utils/hash.h" #include "runtime-light/utils/concepts.h" /** @@ -55,27 +58,84 @@ enum class ScheduleStatus : uint8_t { Resumed, Skipped, Error }; */ namespace WaitEvent { -struct Rechedule {}; +struct Rechedule { + static constexpr size_t HASH_VALUE = 4001; + bool operator==([[maybe_unused]] const Rechedule &other) const noexcept { + return true; + } +}; -struct IncomingStream {}; +struct IncomingStream { + static constexpr size_t HASH_VALUE = 4003; + bool operator==([[maybe_unused]] const IncomingStream &other) const noexcept { + return true; + } +}; struct UpdateOnStream { uint64_t stream_d{}; + + bool operator==(const UpdateOnStream &other) const noexcept { + return stream_d == other.stream_d; + } }; struct UpdateOnTimer { uint64_t timer_d{}; + + bool operator==(const UpdateOnTimer &other) const noexcept { + return timer_d == other.timer_d; + } }; using EventT = std::variant; }; // namespace WaitEvent +// std::hash specializations for WaitEvent types + +template<> +struct std::hash { + size_t operator()([[maybe_unused]] const WaitEvent::Rechedule &v) const noexcept { + return WaitEvent::Rechedule::HASH_VALUE; + } +}; + +template<> +struct std::hash { + size_t operator()([[maybe_unused]] const WaitEvent::IncomingStream &v) const noexcept { + return WaitEvent::IncomingStream::HASH_VALUE; + } +}; + +template<> +struct std::hash { + size_t operator()(const WaitEvent::UpdateOnStream &v) const noexcept { + return v.stream_d; + } +}; + +template<> +struct std::hash { + size_t operator()(const WaitEvent::UpdateOnTimer &v) const noexcept { + return v.timer_d; + } +}; + /** * SuspendToken type that binds an event and a coroutine waiting for that event. */ using SuspendToken = std::pair, WaitEvent::EventT>; +template<> +struct std::hash { + size_t operator()(const SuspendToken &token) const noexcept { + size_t suspend_token_hash{std::hash>{}(token.first)}; + hash_combine(suspend_token_hash, token.second); + return suspend_token_hash; + } +}; + /** * Coroutine scheduler concept. * @@ -84,8 +144,9 @@ using SuspendToken = std::pair, WaitEvent::EventT>; * 2. have static `get` function that returns a reference to scheduler instance; * 3. have `done` method that returns whether scheduler's scheduled all coroutines; * 4. have `schedule` method that takes an event and schedules coroutines for execution; - * 5. have `suspend` method that suspends specified coroutine; - * 6. have `cancel` method that cancels specified SuspendToken. + * 5. have `contains` method returns whether specified SuspendToken is in schedule queue; + * 6. have `suspend` method that suspends specified coroutine; + * 7. have `cancel` method that cancels specified SuspendToken. */ template concept CoroutineSchedulerConcept = std::constructible_from @@ -93,22 +154,29 @@ concept CoroutineSchedulerConcept = std::constructible_from std::same_as; { s.done() } noexcept -> std::convertible_to; { s.schedule(schedule_event) } noexcept -> std::same_as; + { s.contains(token) } noexcept -> std::convertible_to; { s.suspend(token) } noexcept -> std::same_as; { s.cancel(token) } noexcept -> std::same_as; }; // === SimpleCoroutineScheduler =================================================================== +// This scheduler doesn't support waiting for the same event from multiple coroutines. +// We need to finalize our decision whether we allow to do it from PHP code or not. class SimpleCoroutineScheduler { template using unordered_map = memory_resource::stl::unordered_map; - template - using deque = memory_resource::stl::deque; + template + using unordered_set = memory_resource::stl::unordered_set; - deque> yield_coros; - deque> awaiting_for_stream_coros; - unordered_map> awaiting_for_update_coros; + template + using deque = memory_resource::stl::deque; + + deque yield_tokens; + deque awaiting_for_stream_tokens; + unordered_map awaiting_for_update_tokens; + unordered_set suspend_tokens; ScheduleStatus scheduleOnNoEvent() noexcept; ScheduleStatus scheduleOnIncomingStream() noexcept; @@ -117,17 +185,22 @@ class SimpleCoroutineScheduler { public: explicit SimpleCoroutineScheduler(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept - : yield_coros(deque>::allocator_type{memory_resource}) - , awaiting_for_stream_coros(deque>::allocator_type{memory_resource}) - , awaiting_for_update_coros(unordered_map>::allocator_type{memory_resource}) {} + : yield_tokens(deque::allocator_type{memory_resource}) + , awaiting_for_stream_tokens(deque::allocator_type{memory_resource}) + , awaiting_for_update_tokens(unordered_map::allocator_type{memory_resource}) + , suspend_tokens(unordered_set::allocator_type{memory_resource}) {} static SimpleCoroutineScheduler &get() noexcept; bool done() const noexcept { - return yield_coros.empty() && awaiting_for_stream_coros.empty() && awaiting_for_update_coros.empty(); + return suspend_tokens.empty(); } ScheduleStatus schedule(ScheduleEvent::EventT) noexcept; + + bool contains(SuspendToken token) const noexcept { + return suspend_tokens.contains(token); + } void suspend(SuspendToken) noexcept; void cancel(SuspendToken) noexcept; }; diff --git a/runtime-light/stdlib/fork/fork-api.h b/runtime-light/stdlib/fork/fork-api.h index 9ccb268e64..99b3fa2167 100644 --- a/runtime-light/stdlib/fork/fork-api.h +++ b/runtime-light/stdlib/fork/fork-api.h @@ -15,24 +15,26 @@ namespace fork_api_impl_ { -constexpr double WAIT_FORK_MAX_TIMEOUT = 86400.0; +constexpr double MAX_TIMEOUT_S = 86400.0; +constexpr double DEFAULT_TIMEOUT_S = MAX_TIMEOUT_S; +constexpr auto MAX_TIMEOUT_NS = std::chrono::duration_cast(std::chrono::duration{MAX_TIMEOUT_S}); +constexpr auto DEFAULT_TIMEOUT_NS = std::chrono::duration_cast(std::chrono::duration{DEFAULT_TIMEOUT_S}); } // namespace fork_api_impl_ -constexpr int64_t INVALID_FORK_ID = -1; - template requires(is_optional::value) task_t f$wait(int64_t fork_id, double timeout = -1.0) noexcept { - if (timeout < 0.0) { - timeout = fork_api_impl_::WAIT_FORK_MAX_TIMEOUT; - } - auto task_opt{ForkComponentContext::get().pop_fork(fork_id)}; - if (!task_opt.has_value()) { + auto &fork_ctx{ForkComponentContext::get()}; + if (!fork_ctx.contains(fork_id)) { php_warning("can't find fork %" PRId64, fork_id); co_return T{}; } - const auto timeout_ns{std::chrono::duration_cast(std::chrono::duration{timeout})}; - co_return co_await wait_fork_t>{*std::move(task_opt), timeout_ns}; + // normalize timeout + const auto timeout_ns{timeout > 0 && timeout <= fork_api_impl_::MAX_TIMEOUT_S + ? std::chrono::duration_cast(std::chrono::duration{timeout}) + : fork_api_impl_::DEFAULT_TIMEOUT_NS}; + auto result_opt{co_await wait_with_timeout_t{wait_fork_t>{fork_id}, timeout_ns}}; + co_return result_opt.has_value() ? T{std::move(result_opt.value())} : T{}; } template diff --git a/runtime-light/stdlib/fork/fork-context.h b/runtime-light/stdlib/fork/fork-context.h index 79bd0f6f24..57c67ec1af 100644 --- a/runtime-light/stdlib/fork/fork-context.h +++ b/runtime-light/stdlib/fork/fork-context.h @@ -5,7 +5,7 @@ #pragma once #include -#include +#include #include "runtime-core/memory-resource/unsynchronized_pool_resource.h" #include "runtime-core/utils/kphp-assert-core.h" @@ -13,35 +13,42 @@ #include "runtime-light/stdlib/fork/fork.h" #include "runtime-light/utils/concepts.h" +constexpr int64_t INVALID_FORK_ID = -1; + class ForkComponentContext { template using unordered_map = memory_resource::stl::unordered_map; - static constexpr auto FORK_ID_INIT = 1; + static constexpr auto FORK_ID_INIT = 0; + + unordered_map> forks; + int64_t next_fork_id{FORK_ID_INIT + 1}; + + int64_t push_fork(task_t &&task) noexcept { + return forks.emplace(next_fork_id, std::move(task)), next_fork_id++; + } + + task_t pop_fork(int64_t fork_id) noexcept { + const auto it_fork{forks.find(fork_id)}; + if (it_fork == forks.end()) { + php_critical_error("can't find fork %" PRId64, fork_id); + } + auto fork{std::move(it_fork->second)}; + forks.erase(it_fork); + return fork; + } - unordered_map> forks_; - int64_t next_fork_id_{FORK_ID_INIT}; + friend class start_fork_t; + template + friend class wait_fork_t; public: explicit ForkComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept - : forks_(unordered_map>::allocator_type{memory_resource}) {} + : forks(unordered_map>::allocator_type{memory_resource}) {} static ForkComponentContext &get() noexcept; - int64_t push_fork(task_t &&task) noexcept { - const auto fork_id{next_fork_id_++}; - forks_.emplace(fork_id, std::move(task)); - php_debug("ForkComponentContext: push fork %" PRId64, fork_id); - return fork_id; - } - - std::optional> pop_fork(int64_t fork_id) noexcept { - if (const auto it_fork{forks_.find(fork_id)}; it_fork != forks_.cend()) { - php_debug("ForkComponentContext: pop fork %" PRId64, fork_id); - auto fork{std::move(it_fork->second)}; - forks_.erase(it_fork); - return {std::move(fork)}; - } - return std::nullopt; + bool contains(int64_t fork_id) const noexcept { + return forks.contains(fork_id); } }; diff --git a/runtime-light/stdlib/rpc/rpc-api.cpp b/runtime-light/stdlib/rpc/rpc-api.cpp index 1a7462e82b..3fd36bbeae 100644 --- a/runtime-light/stdlib/rpc/rpc-api.cpp +++ b/runtime-light/stdlib/rpc/rpc-api.cpp @@ -7,21 +7,28 @@ #include #include #include -#include +#include #include #include "common/algorithms/find.h" #include "common/rpc-error-codes.h" +#include "runtime-core/runtime-core.h" #include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/allocator/allocator.h" +#include "runtime-light/coroutine/awaitable.h" +#include "runtime-light/coroutine/task.h" #include "runtime-light/stdlib/rpc/rpc-context.h" #include "runtime-light/stdlib/rpc/rpc-extra-headers.h" +#include "runtime-light/stdlib/rpc/rpc-extra-info.h" #include "runtime-light/streams/interface.h" #include "runtime-light/tl/tl-core.h" namespace rpc_impl_ { -constexpr int32_t MAX_TIMEOUT_S = 86400; +constexpr double MAX_TIMEOUT_S = 86400.0; +constexpr double DEFAULT_TIMEOUT_S = 0.3; +constexpr auto MAX_TIMEOUT_NS = std::chrono::duration_cast(std::chrono::duration{MAX_TIMEOUT_S}); +constexpr auto DEFAULT_TIMEOUT_NS = std::chrono::duration_cast(std::chrono::duration{DEFAULT_TIMEOUT_S}); mixed mixed_array_get_value(const mixed &arr, const string &str_key, int64_t num_key) noexcept { if (!arr.is_array()) { @@ -89,42 +96,65 @@ class_instance store_function(const mixed &tl_object) noexcept { return rpc_tl_query; } -task_t rpc_send_impl(string actor, double timeout, bool ignore_answer) noexcept { - if (timeout <= 0 || timeout > MAX_TIMEOUT_S) { // TODO: handle timeouts - // timeout = conn.get()->timeout_ms * 0.001; - } - - auto &rpc_ctx = RpcComponentContext::get(); +task_t rpc_send_impl(string actor, double timeout, bool ignore_answer, bool collect_responses_extra_info) noexcept { + auto &rpc_ctx{RpcComponentContext::get()}; + // prepare RPC request string request_buf{}; size_t request_size{rpc_ctx.rpc_buffer.size()}; - // 'request_buf' will look like this: // [ RpcExtraHeaders (optional) ] [ payload ] if (const auto [opt_new_extra_header, cur_extra_header_size]{regularize_extra_headers(rpc_ctx.rpc_buffer.data(), ignore_answer)}; opt_new_extra_header) { const auto new_extra_header{opt_new_extra_header.value()}; - const auto new_extra_header_size{sizeof(std::decay_t)}; + const auto new_extra_header_size{sizeof(std::remove_cvref_t)}; request_size = request_size - cur_extra_header_size + new_extra_header_size; - request_buf.append(reinterpret_cast(&new_extra_header), new_extra_header_size); + request_buf.reserve_at_least(request_size); + request_buf.append(reinterpret_cast(std::addressof(new_extra_header)), new_extra_header_size); request_buf.append(rpc_ctx.rpc_buffer.data() + cur_extra_header_size, rpc_ctx.rpc_buffer.size() - cur_extra_header_size); } else { request_buf.append(rpc_ctx.rpc_buffer.data(), request_size); } - - // get timestamp before co_await to also count the time we were waiting for runtime to resume this coroutine + // send RPC request + const auto query_id{rpc_ctx.current_query_id++}; const auto timestamp{std::chrono::duration{std::chrono::system_clock::now().time_since_epoch()}.count()}; - auto comp_query{co_await f$component_client_send_query(actor, request_buf)}; if (comp_query.is_null()) { - php_error("could not send rpc query to %s", actor.c_str()); + php_error("can't send rpc query to %s", actor.c_str()); co_return RpcQueryInfo{.id = RPC_INVALID_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; } - if (ignore_answer) { // TODO: wait for answer in a separate coroutine and keep returning RPC_IGNORED_ANSWER_QUERY_ID + // create response extra info + if (collect_responses_extra_info) { + rpc_ctx.rpc_responses_extra_info.emplace(query_id, std::make_pair(rpc_response_extra_info_status_t::NOT_READY, rpc_response_extra_info_t{0, timestamp})); + } + // normalize timeout + const auto timeout_ns{timeout > 0 && timeout <= MAX_TIMEOUT_S ? std::chrono::duration_cast(std::chrono::duration{timeout}) + : DEFAULT_TIMEOUT_NS}; + // create fork to wait for RPC response. we need to do it even if 'ignore_answer' is 'true' to make sure + // that the stream will not be closed too early. otherwise, platform may even not send RPC request + auto waiter_task{[](int64_t query_id, auto comp_query, std::chrono::nanoseconds timeout, bool collect_responses_extra_info) noexcept -> task_t { + auto fetch_task{f$component_client_get_result(std::move(comp_query))}; + const auto response{(co_await wait_with_timeout_t{task_t::awaiter_t{std::addressof(fetch_task)}, timeout}).value_or(string{})}; + // update response extra info if needed + if (collect_responses_extra_info) { + auto &extra_info_map{RpcComponentContext::get().rpc_responses_extra_info}; + if (const auto it_extra_info{extra_info_map.find(query_id)}; it_extra_info != extra_info_map.end()) { + const auto timestamp{std::chrono::duration{std::chrono::system_clock::now().time_since_epoch()}.count()}; + it_extra_info->second.second = std::make_tuple(response.size(), timestamp - std::get<1>(it_extra_info->second.second)); + it_extra_info->second.first = rpc_response_extra_info_status_t::READY; + } else { + php_warning("can't find extra info for RPC query %" PRId64, query_id); + } + } + co_return response; + }(query_id, std::move(comp_query), timeout_ns, collect_responses_extra_info)}; + // start waiter fork + const auto waiter_fork_id{co_await start_fork_t{std::move(waiter_task), start_fork_t::execution::self}}; + + if (ignore_answer) { co_return RpcQueryInfo{.id = RPC_IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; } - const auto query_id{rpc_ctx.current_query_id++}; - rpc_ctx.pending_component_queries.emplace(query_id, std::move(comp_query)); + rpc_ctx.response_waiter_forks.emplace(query_id, waiter_fork_id); co_return RpcQueryInfo{.id = query_id, .request_size = request_size, .timestamp = timestamp}; } @@ -142,15 +172,10 @@ task_t rpc_tl_query_one_impl(string actor, mixed tl_object, double co_return RpcQueryInfo{}; } - const auto query_info{co_await rpc_send_impl(actor, timeout, ignore_answer)}; + const auto query_info{co_await rpc_send_impl(actor, timeout, ignore_answer, collect_resp_extra_info)}; if (!ignore_answer) { - rpc_ctx.pending_rpc_queries.emplace(query_info.id, std::move(rpc_tl_query)); - } - if (collect_resp_extra_info) { - rpc_ctx.rpc_responses_extra_info.emplace(query_info.id, - std::make_pair(rpc_response_extra_info_status_t::NOT_READY, rpc_response_extra_info_t{0, query_info.timestamp})); + rpc_ctx.response_fetcher_instances.emplace(query_info.id, std::move(rpc_tl_query)); } - co_return query_info; } @@ -170,19 +195,14 @@ task_t typed_rpc_tl_query_one_impl(string actor, const RpcRequest co_return RpcQueryInfo{}; } - const auto query_info{co_await rpc_send_impl(actor, timeout, ignore_answer)}; + const auto query_info{co_await rpc_send_impl(actor, timeout, ignore_answer, collect_responses_extra_info)}; if (!ignore_answer) { auto rpc_tl_query{make_instance()}; rpc_tl_query.get()->result_fetcher = std::move(fetcher); rpc_tl_query.get()->tl_function_name = rpc_request.tl_function_name(); - rpc_ctx.pending_rpc_queries.emplace(query_info.id, std::move(rpc_tl_query)); - } - if (collect_responses_extra_info) { - rpc_ctx.rpc_responses_extra_info.emplace(query_info.id, - std::make_pair(rpc_response_extra_info_status_t::NOT_READY, rpc_response_extra_info_t{0, query_info.timestamp})); + rpc_ctx.response_fetcher_instances.emplace(query_info.id, std::move(rpc_tl_query)); } - co_return query_info; } @@ -193,22 +213,22 @@ task_t> rpc_tl_query_result_one_impl(int64_t query_id) noexcept { auto &rpc_ctx{RpcComponentContext::get()}; class_instance rpc_query{}; - class_instance component_query{}; + int64_t response_waiter_fork_id{INVALID_FORK_ID}; { - const auto it_rpc_query{rpc_ctx.pending_rpc_queries.find(query_id)}; - const auto it_component_query{rpc_ctx.pending_component_queries.find(query_id)}; + const auto it_rpc_query{rpc_ctx.response_fetcher_instances.find(query_id)}; + const auto it_response_fetcher_fork_id{rpc_ctx.response_waiter_forks.find(query_id)}; - vk::final_action finalizer{[&rpc_ctx, it_rpc_query, it_component_query]() { - rpc_ctx.pending_rpc_queries.erase(it_rpc_query); - rpc_ctx.pending_component_queries.erase(it_component_query); + vk::final_action finalizer{[&rpc_ctx, it_rpc_query, it_response_fetcher_fork_id]() { + rpc_ctx.response_fetcher_instances.erase(it_rpc_query); + rpc_ctx.response_waiter_forks.erase(it_response_fetcher_fork_id); }}; - if (it_rpc_query == rpc_ctx.pending_rpc_queries.end() || it_component_query == rpc_ctx.pending_component_queries.end()) { + if (it_rpc_query == rpc_ctx.response_fetcher_instances.end() || it_response_fetcher_fork_id == rpc_ctx.response_waiter_forks.end()) { co_return make_fetch_error(string{"unexpectedly could not find query in pending queries"}, TL_ERROR_INTERNAL); } rpc_query = std::move(it_rpc_query->second); - component_query = std::move(it_component_query->second); + response_waiter_fork_id = it_response_fetcher_fork_id->second; } if (rpc_query.is_null()) { @@ -220,20 +240,16 @@ task_t> rpc_tl_query_result_one_impl(int64_t query_id) noexcept { if (rpc_query.get()->result_fetcher->is_typed) { co_return make_fetch_error(string{"can't get untyped result from typed TL query. Use consistent API for that"}, TL_ERROR_INTERNAL); } - - const auto data{co_await f$component_client_get_result(component_query)}; - - // TODO: subscribe to rpc response event? - // update rpc response extra info - if (const auto it_response_extra_info{rpc_ctx.rpc_responses_extra_info.find(query_id)}; it_response_extra_info != rpc_ctx.rpc_responses_extra_info.end()) { - const auto timestamp{std::chrono::duration{std::chrono::system_clock::now().time_since_epoch()}.count()}; - it_response_extra_info->second.second = {data.size(), timestamp - std::get<1>(it_response_extra_info->second.second)}; - it_response_extra_info->second.first = rpc_response_extra_info_status_t::READY; + if (response_waiter_fork_id == INVALID_FORK_ID) { + co_return make_fetch_error(string{"can't find fetcher fork"}, TL_ERROR_INTERNAL); } + const auto data{(co_await wait_with_timeout_t{wait_fork_t{response_waiter_fork_id}, MAX_TIMEOUT_NS}).value()}; + if (data.empty()) { + co_return make_fetch_error(string{"rpc response timeout"}, TL_ERROR_QUERY_TIMEOUT); + } rpc_ctx.rpc_buffer.clean(); rpc_ctx.rpc_buffer.store_bytes(data.c_str(), data.size()); - co_return fetch_function_untyped(rpc_query); } @@ -244,22 +260,22 @@ task_t> typed_rpc_tl_query_result_one_impl(i auto &rpc_ctx{RpcComponentContext::get()}; class_instance rpc_query{}; - class_instance component_query{}; + int64_t response_waiter_fork_id{INVALID_FORK_ID}; { - const auto it_rpc_query{rpc_ctx.pending_rpc_queries.find(query_id)}; - const auto it_component_query{rpc_ctx.pending_component_queries.find(query_id)}; + const auto it_rpc_query{rpc_ctx.response_fetcher_instances.find(query_id)}; + const auto it_response_fetcher_fork_id{rpc_ctx.response_waiter_forks.find(query_id)}; - vk::final_action finalizer{[&rpc_ctx, it_rpc_query, it_component_query]() { - rpc_ctx.pending_rpc_queries.erase(it_rpc_query); - rpc_ctx.pending_component_queries.erase(it_component_query); + vk::final_action finalizer{[&rpc_ctx, it_rpc_query, it_response_fetcher_fork_id]() { + rpc_ctx.response_fetcher_instances.erase(it_rpc_query); + rpc_ctx.response_waiter_forks.erase(it_response_fetcher_fork_id); }}; - if (it_rpc_query == rpc_ctx.pending_rpc_queries.end() || it_component_query == rpc_ctx.pending_component_queries.end()) { + if (it_rpc_query == rpc_ctx.response_fetcher_instances.end() || it_response_fetcher_fork_id == rpc_ctx.response_waiter_forks.end()) { co_return error_factory.make_error(string{"unexpectedly could not find query in pending queries"}, TL_ERROR_INTERNAL); } rpc_query = std::move(it_rpc_query->second); - component_query = std::move(it_component_query->second); + response_waiter_fork_id = it_response_fetcher_fork_id->second; } if (rpc_query.is_null()) { @@ -271,20 +287,16 @@ task_t> typed_rpc_tl_query_result_one_impl(i if (!rpc_query.get()->result_fetcher->is_typed) { co_return error_factory.make_error(string{"can't get typed result from untyped TL query. Use consistent API for that"}, TL_ERROR_INTERNAL); } - - const auto data{co_await f$component_client_get_result(component_query)}; - - // TODO: subscribe to rpc response event? - // update rpc response extra info - if (const auto it_response_extra_info{rpc_ctx.rpc_responses_extra_info.find(query_id)}; it_response_extra_info != rpc_ctx.rpc_responses_extra_info.end()) { - const auto timestamp{std::chrono::duration{std::chrono::system_clock::now().time_since_epoch()}.count()}; - it_response_extra_info->second.second = {data.size(), timestamp - std::get<1>(it_response_extra_info->second.second)}; - it_response_extra_info->second.first = rpc_response_extra_info_status_t::READY; + if (response_waiter_fork_id == INVALID_FORK_ID) { + co_return error_factory.make_error(string{"can't find fetcher fork"}, TL_ERROR_INTERNAL); } + const auto data{(co_await wait_with_timeout_t{wait_fork_t{response_waiter_fork_id}, MAX_TIMEOUT_NS}).value()}; + if (data.empty()) { + co_return error_factory.make_error(string{"rpc response timeout"}, TL_ERROR_QUERY_TIMEOUT); + } rpc_ctx.rpc_buffer.clean(); rpc_ctx.rpc_buffer.store_bytes(data.c_str(), data.size()); - co_return fetch_function_typed(rpc_query, error_factory); } diff --git a/runtime-light/stdlib/rpc/rpc-context.cpp b/runtime-light/stdlib/rpc/rpc-context.cpp index 14b305d2bd..0c73fcd547 100644 --- a/runtime-light/stdlib/rpc/rpc-context.cpp +++ b/runtime-light/stdlib/rpc/rpc-context.cpp @@ -8,12 +8,6 @@ #include "runtime-light/component/image.h" #include "runtime-light/utils/context.h" -RpcComponentContext::RpcComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) - : current_query() - , pending_component_queries(unordered_map>::allocator_type{memory_resource}) - , pending_rpc_queries(unordered_map>::allocator_type{memory_resource}) - , rpc_responses_extra_info(unordered_map>::allocator_type{memory_resource}) {} - RpcComponentContext &RpcComponentContext::get() noexcept { return get_component_context()->rpc_component_context; } diff --git a/runtime-light/stdlib/rpc/rpc-context.h b/runtime-light/stdlib/rpc/rpc-context.h index b2e9d7fcf0..a2fcd7e962 100644 --- a/runtime-light/stdlib/rpc/rpc-context.h +++ b/runtime-light/stdlib/rpc/rpc-context.h @@ -13,7 +13,6 @@ #include "runtime-light/stdlib/rpc/rpc-extra-info.h" #include "runtime-light/stdlib/rpc/rpc-tl-defs.h" #include "runtime-light/stdlib/rpc/rpc-tl-query.h" -#include "runtime-light/streams/component-stream.h" #include "runtime-light/tl/tl-core.h" struct RpcComponentContext final : private vk::not_copyable { @@ -23,11 +22,16 @@ struct RpcComponentContext final : private vk::not_copyable { tl::TLBuffer rpc_buffer; int64_t current_query_id{0}; CurrentTlQuery current_query; - unordered_map> pending_component_queries; - unordered_map> pending_rpc_queries; + unordered_map response_waiter_forks; + unordered_map> response_fetcher_instances; unordered_map> rpc_responses_extra_info; - explicit RpcComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource); + explicit RpcComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept + : current_query() + , response_waiter_forks(unordered_map::allocator_type{memory_resource}) + , response_fetcher_instances(unordered_map>::allocator_type{memory_resource}) + , rpc_responses_extra_info( + unordered_map>::allocator_type{memory_resource}) {} static RpcComponentContext &get() noexcept; }; diff --git a/runtime-light/stdlib/timer/timer.h b/runtime-light/stdlib/timer/timer.h index fb34dd28d3..b771d05fcf 100644 --- a/runtime-light/stdlib/timer/timer.h +++ b/runtime-light/stdlib/timer/timer.h @@ -25,5 +25,6 @@ task_t f$set_timer(int64_t timeout_ms, T &&on_timer_callback) noexcept { co_return 0; }}; // TODO: someone should pop that fork from ForkComponentContext since it will stay there unless we perform f$wait on fork const auto duration_ms{std::chrono::milliseconds{static_cast(timeout_ms)}}; - co_await start_fork_and_reschedule_t(fork_f(std::chrono::duration_cast(duration_ms), std::forward(on_timer_callback))); + co_await start_fork_t{fork_f(std::chrono::duration_cast(duration_ms), std::forward(on_timer_callback)), + start_fork_t::execution::fork}; } diff --git a/runtime-light/streams/interface.cpp b/runtime-light/streams/interface.cpp index 0bf69dea59..bde0b4667d 100644 --- a/runtime-light/streams/interface.cpp +++ b/runtime-light/streams/interface.cpp @@ -46,9 +46,9 @@ task_t f$component_client_get_result(class_instance qu const auto [buffer, size]{co_await read_all_from_stream(stream_d)}; string result{buffer, static_cast(size)}; get_platform_context()->allocator.free(buffer); + php_debug("read %d bytes from stream %" PRIu64, size, stream_d); get_component_context()->release_stream(stream_d); query.get()->stream_d = INVALID_PLATFORM_DESCRIPTOR; - php_debug("read %d bytes from stream %" PRIu64, size, stream_d); co_return result; } diff --git a/runtime-light/streams/interface.h b/runtime-light/streams/interface.h index 3890e6930b..51915bef5f 100644 --- a/runtime-light/streams/interface.h +++ b/runtime-light/streams/interface.h @@ -6,6 +6,7 @@ #include +#include "runtime-core/runtime-core.h" #include "runtime-light/coroutine/task.h" #include "runtime-light/streams/component-stream.h"