From 4b9b3db49caa3bdba25713b2d0697b42955a36ba Mon Sep 17 00:00:00 2001 From: Alexander Polyakov Date: Fri, 17 Jan 2025 13:10:27 +0300 Subject: [PATCH] [k2] add missing RPC bultins (#1211) * rpc_tl_pending_queries_count * rpc_tl_query_result_synchronously * set_fail_rpc_on_int32_overflow --- builtin-functions/kphp-light/rpc.txt | 13 +++++----- runtime-light/stdlib/rpc/rpc-api.cpp | 12 ++++------ runtime-light/stdlib/rpc/rpc-api.h | 36 +++++++++++++++++++++------- runtime-light/stdlib/rpc/rpc-state.h | 1 + runtime-light/tl/tl-builtins.cpp | 10 ++++++-- 5 files changed, 47 insertions(+), 25 deletions(-) diff --git a/builtin-functions/kphp-light/rpc.txt b/builtin-functions/kphp-light/rpc.txt index 53a1965a40..e0d41571e5 100644 --- a/builtin-functions/kphp-light/rpc.txt +++ b/builtin-functions/kphp-light/rpc.txt @@ -74,6 +74,9 @@ function store_string ($v ::: string) ::: bool; function store_double ($v ::: float) ::: bool; function store_float ($v ::: float) ::: bool; +function rpc_tl_pending_queries_count (): int; +function set_fail_rpc_on_int32_overflow ($fail_rpc ::: bool): bool; + // === Rpc Old API ================================================================================= /** @kphp-extern-func-info interruptible */ @@ -82,6 +85,9 @@ function rpc_tl_query (\RpcConnection $rpc_conn, $arr ::: array, $timeout ::: fl /** @kphp-extern-func-info interruptible */ function rpc_tl_query_result ($query_ids ::: array) ::: mixed[][]; +/** @kphp-extern-func-info interruptible */ +function rpc_tl_query_result_synchronously ($query_ids ::: array) ::: mixed[][]; + /** @kphp-extern-func-info interruptible generate-stub */ function typed_rpc_tl_query (\RpcConnection $connection, @tl\RpcFunction[] $query_functions, $timeout ::: float = -1.0, $ignore_answer ::: bool = false, \KphpRpcRequestsExtraInfo $requests_extra_info = null, $need_responses_extra_info ::: bool = false) ::: int[]; @@ -101,19 +107,12 @@ function rpc_queue_empty ($queue_id ::: int) ::: bool; function rpc_queue_next ($queue_id ::: int, $timeout ::: float = -1.0) ::: int | false; /** @kphp-extern-func-info interruptible generate-stub */ function rpc_queue_push ($queue_id ::: int, $request_ids ::: mixed) ::: int; -/** @kphp-extern-func-info interruptible generate-stub */ -function rpc_tl_pending_queries_count () ::: int; - -function rpc_tl_query_result_synchronously ($query_ids ::: array) ::: mixed[][]; /** @kphp-extern-func-info interruptible generate-stub */ function store_error ($error_code ::: int, $error_text ::: string) ::: bool; /** @kphp-extern-func-info interruptible generate-stub */ function store_finish() ::: bool; -/** @kphp-extern-func-info interruptible generate-stub */ -function set_fail_rpc_on_int32_overflow ($fail_rpc ::: bool) ::: bool; - /** @kphp-extern-func-info interruptible generate-stub */ function rpc_tl_query_one (\RpcConnection $rpc_conn, $arr ::: mixed, $timeout ::: float = -1.0) ::: int; diff --git a/runtime-light/stdlib/rpc/rpc-api.cpp b/runtime-light/stdlib/rpc/rpc-api.cpp index cffae0d4a5..77999e3f48 100644 --- a/runtime-light/stdlib/rpc/rpc-api.cpp +++ b/runtime-light/stdlib/rpc/rpc-api.cpp @@ -122,7 +122,7 @@ task_t rpc_send_impl(string actor, double timeout, bool ignore_ans auto comp_query{co_await f$component_client_send_request(actor, request_buf)}; if (comp_query.is_null()) { php_warning("can't send rpc query to %s", actor.c_str()); - co_return RpcQueryInfo{.id = RPC_INVALID_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; + co_return RpcQueryInfo{.id = kphp::rpc::INVALID_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; } // create response extra info @@ -154,7 +154,7 @@ task_t rpc_send_impl(string actor, double timeout, bool ignore_ans const auto waiter_fork_id{co_await start_fork_t{static_cast>(std::move(waiter_task)), start_fork_t::execution::self}}; if (ignore_answer) { - co_return RpcQueryInfo{.id = RPC_IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; + co_return RpcQueryInfo{.id = kphp::rpc::IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp}; } rpc_ctx.response_waiter_forks.emplace(query_id, waiter_fork_id); co_return RpcQueryInfo{.id = query_id, .request_size = request_size, .timestamp = timestamp}; @@ -209,7 +209,7 @@ task_t typed_rpc_tl_query_one_impl(string actor, const RpcRequest } task_t> rpc_tl_query_result_one_impl(int64_t query_id) noexcept { - if (query_id < RPC_VALID_QUERY_ID_RANGE_START) { + if (query_id < kphp::rpc::VALID_QUERY_ID_RANGE_START) { co_return make_fetch_error(string{"wrong query_id"}, TL_ERROR_WRONG_QUERY_ID); } @@ -256,7 +256,7 @@ task_t> rpc_tl_query_result_one_impl(int64_t query_id) noexcept { } task_t> typed_rpc_tl_query_result_one_impl(int64_t query_id, const RpcErrorFactory &error_factory) noexcept { - if (query_id < RPC_VALID_QUERY_ID_RANGE_START) { + if (query_id < kphp::rpc::VALID_QUERY_ID_RANGE_START) { co_return error_factory.make_error(string{"wrong query_id"}, TL_ERROR_WRONG_QUERY_ID); } @@ -392,10 +392,6 @@ task_t>> f$rpc_fetch_responses(array query_ids) noex // === Rpc Misc ================================================================================== -void f$rpc_clean() noexcept { - RpcInstanceState::get().rpc_buffer.clean(); -} - // === Misc ======================================================================================= bool is_int32_overflow(int64_t v) noexcept { diff --git a/runtime-light/stdlib/rpc/rpc-api.h b/runtime-light/stdlib/rpc/rpc-api.h index 4fac53b8d7..512618490b 100644 --- a/runtime-light/stdlib/rpc/rpc-api.h +++ b/runtime-light/stdlib/rpc/rpc-api.h @@ -10,18 +10,23 @@ #include "runtime-common/core/runtime-core.h" #include "runtime-light/coroutine/task.h" #include "runtime-light/stdlib/rpc/rpc-extra-info.h" +#include "runtime-light/stdlib/rpc/rpc-state.h" #include "runtime-light/stdlib/rpc/rpc-tl-error.h" #include "runtime-light/stdlib/rpc/rpc-tl-function.h" #include "runtime-light/stdlib/rpc/rpc-tl-kphp-request.h" -inline constexpr int64_t RPC_VALID_QUERY_ID_RANGE_START = 0; -inline constexpr int64_t RPC_INVALID_QUERY_ID = -1; -inline constexpr int64_t RPC_IGNORED_ANSWER_QUERY_ID = -2; +namespace kphp::rpc { + +inline constexpr int64_t VALID_QUERY_ID_RANGE_START = 0; +inline constexpr int64_t INVALID_QUERY_ID = -1; +inline constexpr int64_t IGNORED_ANSWER_QUERY_ID = -2; + +} // namespace kphp::rpc namespace rpc_impl_ { struct RpcQueryInfo { - int64_t id{RPC_INVALID_QUERY_ID}; + int64_t id{kphp::rpc::INVALID_QUERY_ID}; size_t request_size{0}; double timestamp{0.0}; }; @@ -112,13 +117,17 @@ f$typed_rpc_tl_query_result_synchronously(array query_ids) noexcept co_return co_await f$rpc_fetch_typed_responses_synchronously(std::move(query_ids)); } +inline task_t>> f$rpc_tl_query_result_synchronously(array query_ids) noexcept { + co_return co_await f$rpc_fetch_responses(std::move(query_ids)); +} + template -task_t>> f$rpc_tl_query_result(const array &) { - php_critical_error("call to unsupported function"); +task_t>> f$rpc_tl_query_result_synchronously(array query_ids) noexcept { + co_return co_await f$rpc_tl_query_result_synchronously(array::convert_from(query_ids)); } template -array> f$rpc_tl_query_result_synchronously(const array &) { +task_t>> f$rpc_tl_query_result(const array &) { php_critical_error("call to unsupported function"); } @@ -129,7 +138,18 @@ inline task_t> f$rpc_tl_query(const class_instance response_waiter_forks; unordered_map> response_fetcher_instances; unordered_map> rpc_responses_extra_info; diff --git a/runtime-light/tl/tl-builtins.cpp b/runtime-light/tl/tl-builtins.cpp index 16e2a33fee..76ba0f21f4 100644 --- a/runtime-light/tl/tl-builtins.cpp +++ b/runtime-light/tl/tl-builtins.cpp @@ -5,6 +5,7 @@ #include "runtime-light/tl/tl-builtins.h" #include "common/php-functions.h" +#include "runtime-light/stdlib/rpc/rpc-state.h" void register_tl_storers_table_and_fetcher(const array &gen$ht, tl_fetch_wrapper_ptr gen$t_ReqResult_fetch) { auto &rpc_mutable_image_state{RpcImageState::get_mutable()}; @@ -81,8 +82,13 @@ void t_Int::typed_fetch_to(t_Int::PhpType &out) { int32_t t_Int::prepare_int_for_storing(int64_t v) { auto v32 = static_cast(v); - if (is_int32_overflow(v)) { - // TODO + if (is_int32_overflow(v)) [[unlikely]] { + if (RpcInstanceState::get().fail_rpc_on_int32_overflow) { + CurrentTlQuery::get().raise_storing_error("Got int32 overflow with value '%" PRIi64 "'. Serialization will fail.", v); + } else { + php_warning("Got int32 overflow on storing %s: the value '%" PRIi64 "' will be casted to '%d'. Serialization will succeed.", + CurrentTlQuery::get().get_current_tl_function_name().c_str(), v, v32); + } } return v32; }