Skip to content

Commit

Permalink
add wait queue
Browse files Browse the repository at this point in the history
  • Loading branch information
astrophysik committed Aug 14, 2024
1 parent f8139d0 commit 345fffa
Show file tree
Hide file tree
Showing 12 changed files with 280 additions and 7 deletions.
11 changes: 11 additions & 0 deletions builtin-functions/kphp-light/functions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ function get_hash_of_class (object $klass) ::: int;

function strlen ($str ::: string) ::: int;

function rand() ::: int;

// === Fork =======================================================================================

/** @kphp-extern-func-info interruptible cpp_template_call */
Expand All @@ -87,6 +89,15 @@ function sched_yield() ::: void;
/** @kphp-extern-func-info interruptible */
function sched_yield_sleep($timeout_ns ::: int) ::: void;

function wait_queue_create (array<future<any> | false> $request_ids = []) ::: future_queue<^1[*][*]>;

function wait_queue_push (future_queue<any> &$queue_id, future<any> | false $request_ids) ::: void;

function wait_queue_empty (future_queue<any> $queue_id) ::: bool;

/** @kphp-extern-func-info interruptible */
function wait_queue_next (future_queue<any> $queue_id, $timeout ::: float = -1.0) ::: future<^1[*]> | false;

// === Rpc ========================================================================================

/** @kphp-tl-class */
Expand Down
2 changes: 1 addition & 1 deletion runtime-core/memory-resource/resource_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class resource_allocator {
}

static constexpr size_t max_value_type_size() {
return 128U;
return 256U;
}

friend inline bool operator==(const resource_allocator &lhs, const resource_allocator &rhs) noexcept {
Expand Down
39 changes: 39 additions & 0 deletions runtime-light/coroutine/awaitable.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "runtime-light/stdlib/fork/fork-context.h"
#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/utils/context.h"
#include "runtime-light/stdlib/fork/wait-queue-context.h"

template<class T>
concept Awaitable = requires(T awaitable, std::coroutine_handle<> coro) {
Expand Down Expand Up @@ -434,5 +435,43 @@ class wait_with_timeout_t {
}
};

// ================================================================================================

class wait_queue_next_t {
int64_t queue_id;
wait_for_timer_t timer_awaiter;

public:
explicit wait_queue_next_t(int64_t queue_id_, std::chrono::nanoseconds timeout_) noexcept
: queue_id(queue_id_)
, timer_awaiter(timeout_) {}

bool await_ready() const noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) {
return queue.val()->empty() || queue.val()->has_ready_fork();
}
return true;
}

void await_suspend(std::coroutine_handle<> coro) noexcept {
auto queue = WaitQueueContext::get().get_queue(queue_id);
queue.val()->push_coro_handle(coro);
timer_awaiter.await_suspend(coro);
}

Optional<int64_t> await_resume() noexcept {
auto queue = WaitQueueContext::get().get_queue(queue_id);
queue.val()->pop_coro_handle();
auto ready_fork = queue.val()->pop_fork();
if (ready_fork.has_value()) {
timer_awaiter.cancel();
// todo set here info about prev fork_id
return ForkComponentContext::get().push_fork(std::move(ready_fork.val().second));
} else {
return {};
}
}
};

template<class T>
wait_with_timeout_t(T &&, std::chrono::nanoseconds) -> wait_with_timeout_t<T>;
33 changes: 33 additions & 0 deletions runtime-light/stdlib/fork/fork-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/wait-queue-context.h"

task_t<void> f$sched_yield() noexcept {
co_await wait_for_reschedule_t{};
Expand All @@ -22,3 +23,35 @@ task_t<void> f$sched_yield_sleep(int64_t duration_ns) noexcept {
}
co_await wait_for_timer_t{std::chrono::nanoseconds{static_cast<uint64_t>(duration_ns)}};
}

int64_t f$wait_queue_create(array<Optional<int64_t>> fork_ids) noexcept {
return WaitQueueContext::get().create_queue(fork_ids);
}

void f$wait_queue_push(int64_t queue_id, Optional<int64_t> fork_id) noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value() && fork_id.has_value()) {
auto task = ForkComponentContext::get().pop_fork(fork_id.val());
if (task.has_value()) {
php_debug("push fork %ld, to queue %ld", fork_id.val(), queue_id);
queue.val()->push_fork(fork_id.val(), std::move(task.val()));
}
}
}

bool f$wait_queue_empty(int64_t queue_id) noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) {
return queue.val()->empty();
}
return false;
}

task_t<Optional<int64_t>> f$wait_queue_next(int64_t queue_id, double timeout) noexcept {
if (WaitQueueContext::get().get_queue(queue_id).has_value()) {
if (timeout < 0.0) {
timeout = fork_api_impl_::WAIT_FORK_MAX_TIMEOUT;
}
const auto timeout_ns{std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::duration<double>{timeout})};
co_return co_await wait_queue_next_t{queue_id, timeout_ns};
}
co_return Optional<int64_t>{};
}
9 changes: 8 additions & 1 deletion runtime-light/stdlib/fork/fork-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/fork-context.h"

namespace fork_api_impl_ {

Expand Down Expand Up @@ -45,3 +44,11 @@ requires(is_optional<T>::value) task_t<T> f$wait(Optional<int64_t> fork_id_opt,
task_t<void> f$sched_yield() noexcept;

task_t<void> f$sched_yield_sleep(int64_t duration_ns) noexcept;

int64_t f$wait_queue_create(array<Optional<int64_t>> fork_ids) noexcept;

void f$wait_queue_push(int64_t queue, Optional<int64_t> fork_id) noexcept;

bool f$wait_queue_empty(int64_t queue_id) noexcept;

task_t<Optional<int64_t>> f$wait_queue_next(int64_t queue, double timeout = -1.0) noexcept;
29 changes: 29 additions & 0 deletions runtime-light/stdlib/fork/wait-queue-context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "runtime-light/stdlib/fork/wait-queue-context.h"

#include "runtime-light/component/component.h"
#include "runtime-light/stdlib/fork/fork-context.h"

WaitQueueContext &WaitQueueContext::get() noexcept {
return ForkComponentContext::get().wait_queue_context;
}

int64_t WaitQueueContext::create_queue(const array<Optional<int64_t>> &fork_ids) noexcept {
auto &memory_resource{get_component_context()->runtime_allocator.memory_resource};
unordered_map<int64_t, task_t<fork_result>> forks(unordered_map<int64_t, task_t<fork_result>>::allocator_type{memory_resource});
std::for_each(fork_ids.begin(), fork_ids.end(), [&forks](const auto &it) {
Optional<int64_t> fork_id = it.get_value();
if (fork_id.has_value()) {
if (auto task = ForkComponentContext::get().pop_fork(fork_id.val()); task.has_value()) {
forks[fork_id.val()] = std::move(task.val());
}
}
});
int64_t queue_id{++next_wait_queue_id};
wait_queues.emplace(queue_id, WaitQueue(memory_resource, std::move(forks)));
php_debug("WaitQueueContext: create queue %ld with %ld forks", queue_id, fork_ids.size().size);
return queue_id;
}
37 changes: 37 additions & 0 deletions runtime-light/stdlib/fork/wait-queue-context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <cstdint>

#include "runtime-core/core-types/decl/optional.h"
#include "runtime-core/memory-resource/resource_allocator.h"
#include "runtime-core/memory-resource/unsynchronized_pool_resource.h"
#include "runtime-light/stdlib/fork/wait-queue.h"
#include "runtime-light/utils/concepts.h"

class WaitQueueContext {
template<hashable Key, typename Value>
using unordered_map = memory_resource::stl::unordered_map<Key, Value, memory_resource::unsynchronized_pool_resource>;

unordered_map<int64_t, WaitQueue> wait_queues;
static constexpr auto WAIT_QUEUE_INIT_ID = 0;
int64_t next_wait_queue_id{WAIT_QUEUE_INIT_ID};

public:
explicit WaitQueueContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept
: wait_queues(unordered_map<int64_t, WaitQueue>::allocator_type{memory_resource}) {}

static WaitQueueContext &get() noexcept;

int64_t create_queue(const array<Optional<int64_t>> &fork_ids) noexcept;

Optional<WaitQueue *> get_queue(int64_t queue_id) noexcept {
if (auto it = wait_queues.find(queue_id); it != wait_queues.end()) {
return &it->second;
}
return {};
}
};
18 changes: 18 additions & 0 deletions runtime-light/stdlib/fork/wait-queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "runtime-light/stdlib/fork/wait-queue.h"

#include "runtime-light/component/component.h"

void WaitQueue::resume_awaited_handles_if_empty() {
if (forks.empty()) {
while (!awaited_handles.empty()) {
auto handle = awaited_handles.front();
awaited_handles.pop_front();
CoroutineScheduler::get().suspend({handle, WaitEvent::Rechedule{}});
}
}
}

96 changes: 96 additions & 0 deletions runtime-light/stdlib/fork/wait-queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <algorithm>
#include <coroutine>
#include <cstdint>

#include "runtime-core/memory-resource/resource_allocator.h"
#include "runtime-core/memory-resource/unsynchronized_pool_resource.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/utils/concepts.h"

class WaitQueue {
template<hashable Key, typename Value>
using unordered_map = memory_resource::stl::unordered_map<Key, Value, memory_resource::unsynchronized_pool_resource>;

template<typename T>
using deque = memory_resource::stl::deque<T, memory_resource::unsynchronized_pool_resource>;

unordered_map<int64_t, task_t<fork_result>> forks;
unordered_map<int64_t, task_t<fork_result>::awaiter_t> fork_awaiters;
deque<std::coroutine_handle<>> awaited_handles;

void resume_awaited_handles_if_empty();

public:
WaitQueue(const WaitQueue &) = delete;
WaitQueue &operator=(const WaitQueue &) = delete;
WaitQueue &operator=(WaitQueue &&) = delete;

WaitQueue(WaitQueue &&other) noexcept
: forks(std::move(other.forks))
, fork_awaiters(std::move(other.fork_awaiters))
, awaited_handles(std::move(other.awaited_handles)) {}

explicit WaitQueue(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_map<int64_t, task_t<fork_result>> &&forks_) noexcept
: forks(std::move(forks_))
, fork_awaiters(unordered_map<int64_t, task_t<fork_result>::awaiter_t>::allocator_type{memory_resource})
, awaited_handles(deque<std::coroutine_handle<>>::allocator_type{memory_resource}) {
for (auto &fork : forks) {
fork_awaiters.emplace(fork.first, std::addressof(fork.second));
}
}

void push_fork(int64_t fork_id, task_t<fork_result> &&fork) noexcept {
auto [fork_it, fork_success] = forks.emplace(fork_id, std::move(fork));
auto [awaiter_id, awaiter_success] = fork_awaiters.emplace(fork_id, std::addressof(fork_it->second));
if (!awaited_handles.empty()) {
awaiter_id->second.await_suspend(awaited_handles.front());
}
}

Optional<std::pair<int64_t, task_t<fork_result>>> pop_fork() noexcept {
auto it = std::find_if(forks.begin(), forks.end(), [](std::pair<const int64_t, task_t<fork_result>> &fork) { return fork.second.done(); });
if (it != forks.end()) {
auto fork = std::move(*it);
forks.erase(fork.first);
fork_awaiters.erase(fork.first);
resume_awaited_handles_if_empty();
return fork;
} else {
return {};
}
}

void push_coro_handle(std::coroutine_handle<> coro) noexcept {
if (awaited_handles.empty()) {
std::for_each(fork_awaiters.begin(), fork_awaiters.end(), [coro](auto &awaiter) { awaiter.second.await_suspend(coro); });
}
awaited_handles.push_back(coro);
}

void pop_coro_handle() noexcept {
if (!awaited_handles.empty()) {
awaited_handles.pop_front();
}
std::coroutine_handle<> next_awaiter = awaited_handles.empty() ? std::noop_coroutine() : awaited_handles.front();
std::for_each(fork_awaiters.begin(), fork_awaiters.end(), [&](auto &awaiter) { awaiter.second.await_suspend(next_awaiter); });
}

bool has_ready_fork() const noexcept {
return std::any_of(forks.begin(), forks.end(), [](const auto &fork) { return fork.second.done(); });
}

size_t size() const noexcept {
return forks.size();
}

bool empty() const noexcept {
return forks.empty();
}
};
3 changes: 2 additions & 1 deletion runtime-light/stdlib/interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

int64_t f$rand() {
std::random_device rd;
int64_t dice_roll = rd();
std::uniform_int_distribution<> dis(0, rd.max());
int64_t dice_roll = dis(rd);
return dice_roll;
}
2 changes: 2 additions & 0 deletions runtime-light/stdlib/stdlib.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ prepend(
superglobals.cpp
fork/fork-api.cpp
fork/fork-context.cpp
fork/wait-queue-context.cpp
fork/wait-queue.cpp
rpc/rpc-api.cpp
rpc/rpc-context.cpp
rpc/rpc-extra-headers.cpp
Expand Down
8 changes: 4 additions & 4 deletions tests/phpt/fork/001_basic.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@ok k2_skip
@ok
<?php
require_once 'kphp_tester_include.php';

Expand Down Expand Up @@ -90,15 +90,15 @@ function hash4($n) {



$id2 = fork(hash2(100));
$id = fork(hash3(100));
$id2 = fork(hash2(10));
$id = fork(hash3(10));
wait($id);
wait($id2);

print "{$res}\n";

# $id3 = fork(hash4(100));
$qid = hash4(100);
$qid = hash4(10);
while (true) {
$t = wait_queue_next ($qid);
if (!$t) {
Expand Down

0 comments on commit 345fffa

Please sign in to comment.