Skip to content

Commit

Permalink
Split TL and RPC stuff (#1060)
Browse files Browse the repository at this point in the history
Split TL and RPC introducing TLBuffer instead of RpcBuffer
  • Loading branch information
mkornaukhov03 authored Aug 13, 2024
1 parent 9123ffa commit 152e60b
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 111 deletions.
121 changes: 12 additions & 109 deletions runtime-light/stdlib/rpc/rpc-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include "runtime-light/stdlib/rpc/rpc-api.h"

#include <array>
#include <chrono>
#include <cstddef>
#include <cstdint>
Expand All @@ -15,26 +14,15 @@
#include "common/rpc-error-codes.h"
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/allocator/allocator.h"
#include "runtime-light/stdlib/rpc/rpc-buffer.h"
#include "runtime-light/stdlib/rpc/rpc-context.h"
#include "runtime-light/stdlib/rpc/rpc-extra-headers.h"
#include "runtime-light/streams/interface.h"
#include "runtime-light/tl/tl-core.h"

namespace rpc_impl_ {

constexpr int32_t MAX_TIMEOUT_S = 86400;

constexpr auto SMALL_STRING_SIZE_LEN = 1;
constexpr auto MEIDUM_STRING_SIZE_LEN = 3;
constexpr auto LARGE_STRING_SIZE_LEN = 7;

constexpr uint64_t SMALL_STRING_MAX_LEN = 253;
constexpr uint64_t MEDIUM_STRING_MAX_LEN = (static_cast<uint64_t>(1) << 24) - 1;
[[maybe_unused]] constexpr uint64_t LARGE_STRING_MAX_LEN = (static_cast<uint64_t>(1) << 56) - 1;

constexpr uint8_t LARGE_STRING_MAGIC = 0xff;
constexpr uint8_t MEDIUM_STRING_MAGIC = 0xfe;

mixed mixed_array_get_value(const mixed &arr, const string &str_key, int64_t num_key) noexcept {
if (!arr.is_array()) {
return {};
Expand Down Expand Up @@ -244,7 +232,7 @@ task_t<array<mixed>> rpc_tl_query_result_one_impl(int64_t query_id) noexcept {
}

rpc_ctx.rpc_buffer.clean();
rpc_ctx.rpc_buffer.store(data.c_str(), data.size());
rpc_ctx.rpc_buffer.store_bytes(data.c_str(), data.size());

co_return fetch_function_untyped(rpc_query);
}
Expand Down Expand Up @@ -295,7 +283,7 @@ task_t<class_instance<C$VK$TL$RpcResponse>> typed_rpc_tl_query_result_one_impl(i
}

rpc_ctx.rpc_buffer.clean();
rpc_ctx.rpc_buffer.store(data.c_str(), data.size());
rpc_ctx.rpc_buffer.store_bytes(data.c_str(), data.size());

co_return fetch_function_typed(rpc_query, error_factory);
}
Expand All @@ -308,53 +296,27 @@ bool f$store_int(int64_t v) noexcept {
if (unlikely(is_int32_overflow(v))) {
php_warning("Got int32 overflow on storing '%" PRIi64 "', the value will be casted to '%d'", v, static_cast<int32_t>(v));
}
RpcComponentContext::get().rpc_buffer.store_trivial(static_cast<int32_t>(v));
RpcComponentContext::get().rpc_buffer.store_trivial<int32_t>(v);
return true;
}

bool f$store_long(int64_t v) noexcept {
RpcComponentContext::get().rpc_buffer.store_trivial(v);
RpcComponentContext::get().rpc_buffer.store_trivial<int64_t>(v);
return true;
}

bool f$store_float(double v) noexcept {
RpcComponentContext::get().rpc_buffer.store_trivial(static_cast<float>(v));
RpcComponentContext::get().rpc_buffer.store_trivial<float>(v);
return true;
}

bool f$store_double(double v) noexcept {
RpcComponentContext::get().rpc_buffer.store_trivial(v);
RpcComponentContext::get().rpc_buffer.store_trivial<double>(v);
return true;
}

bool f$store_string(const string &v) noexcept {
auto &rpc_buf{RpcComponentContext::get().rpc_buffer};

uint8_t size_len{};
uint64_t string_len{v.size()};
if (string_len <= rpc_impl_::SMALL_STRING_MAX_LEN) {
size_len = rpc_impl_::SMALL_STRING_SIZE_LEN;
rpc_buf.store_trivial(static_cast<uint8_t>(string_len));
} else if (string_len <= rpc_impl_::MEDIUM_STRING_MAX_LEN) {
size_len = rpc_impl_::MEIDUM_STRING_SIZE_LEN + 1;
rpc_buf.store_trivial(static_cast<uint8_t>(rpc_impl_::MEDIUM_STRING_MAGIC));
rpc_buf.store_trivial(static_cast<uint8_t>(string_len & 0xff));
rpc_buf.store_trivial(static_cast<uint8_t>((string_len >> 8) & 0xff));
rpc_buf.store_trivial(static_cast<uint8_t>((string_len >> 16) & 0xff));
} else {
php_warning("large strings aren't supported");
size_len = rpc_impl_::SMALL_STRING_SIZE_LEN;
string_len = 0;
rpc_buf.store_trivial(static_cast<uint8_t>(string_len));
}
rpc_buf.store(v.c_str(), string_len);

const auto total_len{size_len + string_len};
const auto total_len_with_padding{(total_len + 3) & ~static_cast<string::size_type>(3)};
const auto padding{total_len_with_padding - total_len};

std::array padding_array{'\0', '\0', '\0', '\0'};
rpc_buf.store(padding_array.data(), padding);
RpcComponentContext::get().rpc_buffer.store_string(std::string_view{v.c_str(), v.size()});
return true;
}

Expand All @@ -373,71 +335,12 @@ double f$fetch_double() noexcept {
}

double f$fetch_float() noexcept {
return static_cast<double>(RpcComponentContext::get().rpc_buffer.fetch_trivial<float>().value_or(0.0));
return static_cast<double>(RpcComponentContext::get().rpc_buffer.fetch_trivial<float>().value_or(0));
}

string f$fetch_string() noexcept {
auto &rpc_buf{RpcComponentContext::get().rpc_buffer};

uint8_t first_byte{};
if (const auto opt_first_byte{rpc_buf.fetch_trivial<uint8_t>()}; opt_first_byte) {
first_byte = opt_first_byte.value();
} else {
return {}; // TODO: error handling
}

uint8_t size_len{};
uint64_t string_len{};
switch (first_byte) {
case rpc_impl_::LARGE_STRING_MAGIC: {
if (rpc_buf.remaining() < rpc_impl_::LARGE_STRING_SIZE_LEN) {
return {}; // TODO: error handling
}
size_len = rpc_impl_::LARGE_STRING_SIZE_LEN + 1;
const auto first{static_cast<uint64_t>(rpc_buf.fetch_trivial<uint8_t>().value())};
const auto second{static_cast<uint64_t>(rpc_buf.fetch_trivial<uint8_t>().value()) << 8};
const auto third{static_cast<uint64_t>(rpc_buf.fetch_trivial<uint8_t>().value()) << 16};
const auto fourth{static_cast<uint64_t>(rpc_buf.fetch_trivial<uint8_t>().value()) << 24};
const auto fifth{static_cast<uint64_t>(rpc_buf.fetch_trivial<uint8_t>().value()) << 32};
const auto sixth{static_cast<uint64_t>(rpc_buf.fetch_trivial<uint8_t>().value()) << 40};
const auto seventh{static_cast<uint64_t>(rpc_buf.fetch_trivial<uint8_t>().value()) << 48};
string_len = first | second | third | fourth | fifth | sixth | seventh;

const auto total_len_with_padding{(size_len + string_len + 3) & ~static_cast<uint64_t>(3)};
rpc_buf.adjust(total_len_with_padding - size_len);
php_warning("large strings aren't supported");
return {};
}
case rpc_impl_::MEDIUM_STRING_MAGIC: {
if (rpc_buf.remaining() < rpc_impl_::MEIDUM_STRING_SIZE_LEN) {
return {}; // TODO: error handling
}
size_len = rpc_impl_::MEIDUM_STRING_SIZE_LEN + 1;
const auto first{static_cast<uint64_t>(rpc_buf.fetch_trivial<uint8_t>().value())};
const auto second{static_cast<uint64_t>(rpc_buf.fetch_trivial<uint8_t>().value()) << 8};
const auto third{static_cast<uint64_t>(rpc_buf.fetch_trivial<uint8_t>().value()) << 16};
string_len = first | second | third;

if (string_len <= rpc_impl_::SMALL_STRING_MAX_LEN) {
php_warning("long string's length is less than 254");
}
break;
}
default: {
size_len = rpc_impl_::SMALL_STRING_SIZE_LEN;
string_len = static_cast<uint64_t>(first_byte);
break;
}
}

const auto total_len_with_padding{(size_len + string_len + 3) & ~static_cast<uint64_t>(3)};
if (rpc_buf.remaining() < total_len_with_padding - size_len) {
return {}; // TODO: error handling
}

string res{rpc_buf.data() + rpc_buf.pos(), static_cast<string::size_type>(string_len)};
rpc_buf.adjust(total_len_with_padding - size_len);
return res;
const std::string_view str{RpcComponentContext::get().rpc_buffer.fetch_string()};
return {str.data(), static_cast<string::size_type>(str.length())};
}

// === Rpc Query ==================================================================================
Expand Down Expand Up @@ -489,7 +392,7 @@ bool is_int32_overflow(int64_t v) noexcept {
}

void store_raw_vector_double(const array<double> &vector) noexcept { // TODO: didn't we forget vector's length?
RpcComponentContext::get().rpc_buffer.store(reinterpret_cast<const char *>(vector.get_const_vector_pointer()), sizeof(double) * vector.count());
RpcComponentContext::get().rpc_buffer.store_bytes(reinterpret_cast<const char *>(vector.get_const_vector_pointer()), sizeof(double) * vector.count());
}

void fetch_raw_vector_double(array<double> &vector, int64_t num_elems) noexcept {
Expand Down
4 changes: 2 additions & 2 deletions runtime-light/stdlib/rpc/rpc-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@
#include "common/mixin/not_copyable.h"
#include "runtime-core/memory-resource/resource_allocator.h"
#include "runtime-core/runtime-core.h"
#include "runtime-light/stdlib/rpc/rpc-buffer.h"
#include "runtime-light/stdlib/rpc/rpc-extra-info.h"
#include "runtime-light/stdlib/rpc/rpc-tl-defs.h"
#include "runtime-light/stdlib/rpc/rpc-tl-query.h"
#include "runtime-light/streams/component-stream.h"
#include "runtime-light/tl/tl-core.h"

struct RpcComponentContext final : private vk::not_copyable {
template<typename Key, typename Value>
using unordered_map = memory_resource::stl::unordered_map<Key, Value, memory_resource::unsynchronized_pool_resource>;

RpcBuffer rpc_buffer;
tl::TLBuffer rpc_buffer;
int64_t current_query_id{0};
CurrentTlQuery current_query;
unordered_map<int64_t, class_instance<C$ComponentQuery>> pending_component_queries;
Expand Down
97 changes: 97 additions & 0 deletions runtime-light/tl/tl-core.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "runtime-light/tl/tl-core.h"

namespace tl {
void TLBuffer::store_string(std::string_view str) noexcept {
const char *str_buf{str.data()};
size_t str_len{str.size()};
uint8_t size_len{};
if (str_len <= SMALL_STRING_MAX_LEN) {
size_len = SMALL_STRING_SIZE_LEN;
store_trivial<uint8_t>(str_len);
} else if (str_len <= MEDIUM_STRING_MAX_LEN) {
size_len = MEDIUM_STRING_SIZE_LEN + 1;
store_trivial<uint8_t>(MEDIUM_STRING_MAGIC);
store_trivial<uint8_t>(str_len & 0xff);
store_trivial<uint8_t>((str_len >> 8) & 0xff);
store_trivial<uint8_t>((str_len >> 16) & 0xff);
} else {
php_warning("large strings aren't supported");
size_len = SMALL_STRING_SIZE_LEN;
str_len = 0;
store_trivial<uint8_t>(str_len);
}
store_bytes(str_buf, str_len);

const auto total_len{size_len + str_len};
const auto total_len_with_padding{(total_len + 3) & ~static_cast<string::size_type>(3)};
const auto padding{total_len_with_padding - total_len};

std::array padding_array{'\0', '\0', '\0', '\0'};
store_bytes(padding_array.data(), padding);
}

std::string_view TLBuffer::fetch_string() noexcept {
uint8_t first_byte{};
if (const auto opt_first_byte{fetch_trivial<uint8_t>()}; opt_first_byte) {
first_byte = opt_first_byte.value();
} else {
return {}; // TODO: error handling
}

uint8_t size_len{};
uint64_t string_len{};
switch (first_byte) {
case LARGE_STRING_MAGIC: {
if (remaining() < LARGE_STRING_SIZE_LEN) {
return {}; // TODO: error handling
}
size_len = LARGE_STRING_SIZE_LEN + 1;
const auto first{static_cast<uint64_t>(fetch_trivial<uint8_t>().value())};
const auto second{static_cast<uint64_t>(fetch_trivial<uint8_t>().value()) << 8};
const auto third{static_cast<uint64_t>(fetch_trivial<uint8_t>().value()) << 16};
const auto fourth{static_cast<uint64_t>(fetch_trivial<uint8_t>().value()) << 24};
const auto fifth{static_cast<uint64_t>(fetch_trivial<uint8_t>().value()) << 32};
const auto sixth{static_cast<uint64_t>(fetch_trivial<uint8_t>().value()) << 40};
const auto seventh{static_cast<uint64_t>(fetch_trivial<uint8_t>().value()) << 48};
string_len = first | second | third | fourth | fifth | sixth | seventh;

const auto total_len_with_padding{(size_len + string_len + 3) & ~static_cast<uint64_t>(3)};
adjust(total_len_with_padding - size_len);
php_warning("large strings aren't supported");
return {};
}
case MEDIUM_STRING_MAGIC: {
if (remaining() < MEDIUM_STRING_SIZE_LEN) {
return {}; // TODO: error handling
}
size_len = MEDIUM_STRING_SIZE_LEN + 1;
const auto first{static_cast<uint64_t>(fetch_trivial<uint8_t>().value())};
const auto second{static_cast<uint64_t>(fetch_trivial<uint8_t>().value()) << 8};
const auto third{static_cast<uint64_t>(fetch_trivial<uint8_t>().value()) << 16};
string_len = first | second | third;

if (string_len <= SMALL_STRING_MAX_LEN) {
php_warning("long string's length is less than 254");
}
break;
}
default: {
size_len = SMALL_STRING_SIZE_LEN;
string_len = static_cast<uint64_t>(first_byte);
break;
}
}
const auto total_len_with_padding{(size_len + string_len + 3) & ~static_cast<uint64_t>(3)};
if (m_remaining < total_len_with_padding - size_len) {
return {}; // TODO: error handling
}

std::string_view response{data() + m_pos, static_cast<size_t>(string_len)};
adjust(total_len_with_padding - size_len);
return response;
}
} // namespace tl
Loading

0 comments on commit 152e60b

Please sign in to comment.