Skip to content

Commit

Permalink
[k2] add missing RPC bultins (#1211)
Browse files Browse the repository at this point in the history
* rpc_tl_pending_queries_count
* rpc_tl_query_result_synchronously
* set_fail_rpc_on_int32_overflow
  • Loading branch information
apolyakov authored Jan 17, 2025
1 parent a41879a commit 4b9b3db
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 25 deletions.
13 changes: 6 additions & 7 deletions builtin-functions/kphp-light/rpc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ function store_string ($v ::: string) ::: bool;
function store_double ($v ::: float) ::: bool;
function store_float ($v ::: float) ::: bool;

function rpc_tl_pending_queries_count (): int;
function set_fail_rpc_on_int32_overflow ($fail_rpc ::: bool): bool;

// === Rpc Old API =================================================================================

/** @kphp-extern-func-info interruptible */
Expand All @@ -82,6 +85,9 @@ function rpc_tl_query (\RpcConnection $rpc_conn, $arr ::: array, $timeout ::: fl
/** @kphp-extern-func-info interruptible */
function rpc_tl_query_result ($query_ids ::: array) ::: mixed[][];

/** @kphp-extern-func-info interruptible */
function rpc_tl_query_result_synchronously ($query_ids ::: array) ::: mixed[][];

/** @kphp-extern-func-info interruptible generate-stub */
function typed_rpc_tl_query (\RpcConnection $connection, @tl\RpcFunction[] $query_functions, $timeout ::: float = -1.0, $ignore_answer ::: bool = false, \KphpRpcRequestsExtraInfo $requests_extra_info = null, $need_responses_extra_info ::: bool = false) ::: int[];

Expand All @@ -101,19 +107,12 @@ function rpc_queue_empty ($queue_id ::: int) ::: bool;
function rpc_queue_next ($queue_id ::: int, $timeout ::: float = -1.0) ::: int | false;
/** @kphp-extern-func-info interruptible generate-stub */
function rpc_queue_push ($queue_id ::: int, $request_ids ::: mixed) ::: int;
/** @kphp-extern-func-info interruptible generate-stub */
function rpc_tl_pending_queries_count () ::: int;

function rpc_tl_query_result_synchronously ($query_ids ::: array) ::: mixed[][];

/** @kphp-extern-func-info interruptible generate-stub */
function store_error ($error_code ::: int, $error_text ::: string) ::: bool;
/** @kphp-extern-func-info interruptible generate-stub */
function store_finish() ::: bool;

/** @kphp-extern-func-info interruptible generate-stub */
function set_fail_rpc_on_int32_overflow ($fail_rpc ::: bool) ::: bool;

/** @kphp-extern-func-info interruptible generate-stub */
function rpc_tl_query_one (\RpcConnection $rpc_conn, $arr ::: mixed, $timeout ::: float = -1.0) ::: int;

Expand Down
12 changes: 4 additions & 8 deletions runtime-light/stdlib/rpc/rpc-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ task_t<RpcQueryInfo> rpc_send_impl(string actor, double timeout, bool ignore_ans
auto comp_query{co_await f$component_client_send_request(actor, request_buf)};
if (comp_query.is_null()) {
php_warning("can't send rpc query to %s", actor.c_str());
co_return RpcQueryInfo{.id = RPC_INVALID_QUERY_ID, .request_size = request_size, .timestamp = timestamp};
co_return RpcQueryInfo{.id = kphp::rpc::INVALID_QUERY_ID, .request_size = request_size, .timestamp = timestamp};
}

// create response extra info
Expand Down Expand Up @@ -154,7 +154,7 @@ task_t<RpcQueryInfo> rpc_send_impl(string actor, double timeout, bool ignore_ans
const auto waiter_fork_id{co_await start_fork_t{static_cast<task_t<void>>(std::move(waiter_task)), start_fork_t::execution::self}};

if (ignore_answer) {
co_return RpcQueryInfo{.id = RPC_IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp};
co_return RpcQueryInfo{.id = kphp::rpc::IGNORED_ANSWER_QUERY_ID, .request_size = request_size, .timestamp = timestamp};
}
rpc_ctx.response_waiter_forks.emplace(query_id, waiter_fork_id);
co_return RpcQueryInfo{.id = query_id, .request_size = request_size, .timestamp = timestamp};
Expand Down Expand Up @@ -209,7 +209,7 @@ task_t<RpcQueryInfo> typed_rpc_tl_query_one_impl(string actor, const RpcRequest
}

task_t<array<mixed>> rpc_tl_query_result_one_impl(int64_t query_id) noexcept {
if (query_id < RPC_VALID_QUERY_ID_RANGE_START) {
if (query_id < kphp::rpc::VALID_QUERY_ID_RANGE_START) {
co_return make_fetch_error(string{"wrong query_id"}, TL_ERROR_WRONG_QUERY_ID);
}

Expand Down Expand Up @@ -256,7 +256,7 @@ task_t<array<mixed>> rpc_tl_query_result_one_impl(int64_t query_id) noexcept {
}

task_t<class_instance<C$VK$TL$RpcResponse>> typed_rpc_tl_query_result_one_impl(int64_t query_id, const RpcErrorFactory &error_factory) noexcept {
if (query_id < RPC_VALID_QUERY_ID_RANGE_START) {
if (query_id < kphp::rpc::VALID_QUERY_ID_RANGE_START) {
co_return error_factory.make_error(string{"wrong query_id"}, TL_ERROR_WRONG_QUERY_ID);
}

Expand Down Expand Up @@ -392,10 +392,6 @@ task_t<array<array<mixed>>> f$rpc_fetch_responses(array<int64_t> query_ids) noex

// === Rpc Misc ==================================================================================

void f$rpc_clean() noexcept {
RpcInstanceState::get().rpc_buffer.clean();
}

// === Misc =======================================================================================

bool is_int32_overflow(int64_t v) noexcept {
Expand Down
36 changes: 28 additions & 8 deletions runtime-light/stdlib/rpc/rpc-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,23 @@
#include "runtime-common/core/runtime-core.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/rpc/rpc-extra-info.h"
#include "runtime-light/stdlib/rpc/rpc-state.h"
#include "runtime-light/stdlib/rpc/rpc-tl-error.h"
#include "runtime-light/stdlib/rpc/rpc-tl-function.h"
#include "runtime-light/stdlib/rpc/rpc-tl-kphp-request.h"

inline constexpr int64_t RPC_VALID_QUERY_ID_RANGE_START = 0;
inline constexpr int64_t RPC_INVALID_QUERY_ID = -1;
inline constexpr int64_t RPC_IGNORED_ANSWER_QUERY_ID = -2;
namespace kphp::rpc {

inline constexpr int64_t VALID_QUERY_ID_RANGE_START = 0;
inline constexpr int64_t INVALID_QUERY_ID = -1;
inline constexpr int64_t IGNORED_ANSWER_QUERY_ID = -2;

} // namespace kphp::rpc

namespace rpc_impl_ {

struct RpcQueryInfo {
int64_t id{RPC_INVALID_QUERY_ID};
int64_t id{kphp::rpc::INVALID_QUERY_ID};
size_t request_size{0};
double timestamp{0.0};
};
Expand Down Expand Up @@ -112,13 +117,17 @@ f$typed_rpc_tl_query_result_synchronously(array<query_id_t> query_ids) noexcept
co_return co_await f$rpc_fetch_typed_responses_synchronously(std::move(query_ids));
}

inline task_t<array<array<mixed>>> f$rpc_tl_query_result_synchronously(array<int64_t> query_ids) noexcept {
co_return co_await f$rpc_fetch_responses(std::move(query_ids));
}

template<class T>
task_t<array<array<mixed>>> f$rpc_tl_query_result(const array<T> &) {
php_critical_error("call to unsupported function");
task_t<array<array<mixed>>> f$rpc_tl_query_result_synchronously(array<T> query_ids) noexcept {
co_return co_await f$rpc_tl_query_result_synchronously(array<int64_t>::convert_from(query_ids));
}

template<class T>
array<array<mixed>> f$rpc_tl_query_result_synchronously(const array<T> &) {
task_t<array<array<mixed>>> f$rpc_tl_query_result(const array<T> &) {
php_critical_error("call to unsupported function");
}

Expand All @@ -129,7 +138,18 @@ inline task_t<array<int64_t>> f$rpc_tl_query(const class_instance<C$RpcConnectio

// === Rpc Misc ===================================================================================

void f$rpc_clean() noexcept;
inline void f$rpc_clean() noexcept {
RpcInstanceState::get().rpc_buffer.clean();
}

inline int64_t f$rpc_tl_pending_queries_count() noexcept {
return RpcInstanceState::get().response_waiter_forks.size();
}

inline bool f$set_fail_rpc_on_int32_overflow(bool fail_rpc) noexcept {
RpcInstanceState::get().fail_rpc_on_int32_overflow = fail_rpc;
return true;
}

// === Misc =======================================================================================

Expand Down
1 change: 1 addition & 0 deletions runtime-light/stdlib/rpc/rpc-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct RpcInstanceState final : private vk::not_copyable {
tl::TLBuffer rpc_buffer;
int64_t current_query_id{0};
CurrentTlQuery current_query{};
bool fail_rpc_on_int32_overflow{};
unordered_map<int64_t, int64_t> response_waiter_forks;
unordered_map<int64_t, class_instance<RpcTlQuery>> response_fetcher_instances;
unordered_map<int64_t, std::pair<rpc_response_extra_info_status_t, rpc_response_extra_info_t>> rpc_responses_extra_info;
Expand Down
10 changes: 8 additions & 2 deletions runtime-light/tl/tl-builtins.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "runtime-light/tl/tl-builtins.h"

#include "common/php-functions.h"
#include "runtime-light/stdlib/rpc/rpc-state.h"

void register_tl_storers_table_and_fetcher(const array<tl_storer_ptr> &gen$ht, tl_fetch_wrapper_ptr gen$t_ReqResult_fetch) {
auto &rpc_mutable_image_state{RpcImageState::get_mutable()};
Expand Down Expand Up @@ -81,8 +82,13 @@ void t_Int::typed_fetch_to(t_Int::PhpType &out) {

int32_t t_Int::prepare_int_for_storing(int64_t v) {
auto v32 = static_cast<int32_t>(v);
if (is_int32_overflow(v)) {
// TODO
if (is_int32_overflow(v)) [[unlikely]] {
if (RpcInstanceState::get().fail_rpc_on_int32_overflow) {
CurrentTlQuery::get().raise_storing_error("Got int32 overflow with value '%" PRIi64 "'. Serialization will fail.", v);
} else {
php_warning("Got int32 overflow on storing %s: the value '%" PRIi64 "' will be casted to '%d'. Serialization will succeed.",
CurrentTlQuery::get().get_current_tl_function_name().c_str(), v, v32);
}
}
return v32;
}
Expand Down

0 comments on commit 4b9b3db

Please sign in to comment.