Skip to content

Commit

Permalink
added confdata binlog update timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
DrDet committed Dec 5, 2023
1 parent 7fefaf4 commit 865c916
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 7 deletions.
70 changes: 63 additions & 7 deletions server/confdata-binlog-replay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cmath>
#include <forward_list>
#include <map>
#include <optional>
#include <string_view>

#include "common/binlog/binlog-replayer.h"
Expand All @@ -36,6 +37,7 @@ struct {
size_t memory_limit{2u * 1024u * 1024u * 1024u};
double soft_oom_threshold_ratio = 0.85;
double hard_oom_threshold_ratio = 0.95;
double confdata_update_timeout_sec = 0.3;
std::unique_ptr<re2::RE2> key_blacklist_pattern;
std::forward_list<vk::string_view> force_ignore_prefixes;
std::unordered_set<vk::string_view> predefined_wildcards;
Expand All @@ -50,6 +52,7 @@ class ConfdataBinlogReplayer : vk::binlog::replayer {
enum class OperationStatus {
no_update,
throttled_out,
timed_out,
blacklisted,
ttl_update_only,
full_update
Expand Down Expand Up @@ -170,6 +173,7 @@ class ConfdataBinlogReplayer : vk::binlog::replayer {
for (int i = 0; i < nrecords; i++) {
if (index_offset[i] >= 0) {
auto res = store_element(reinterpret_cast<const entry_type &>(index_binary_data[index_offset[i]]));
assert(res != OperationStatus::timed_out);
if (res == OperationStatus::throttled_out) {
ret_code = -1;
raise_confdata_oom_error("Can't read confdata snapshot on start");
Expand Down Expand Up @@ -280,7 +284,7 @@ class ConfdataBinlogReplayer : vk::binlog::replayer {
}
assert(head->second.size() <= std::numeric_limits<short>::max());
auto res = delete_element(head->second.c_str(), static_cast<short>(head->second.size()));
if (res == OperationStatus::throttled_out) {
if (res == OperationStatus::throttled_out || res == OperationStatus::timed_out) {
return;
}
assert(expired_elements == expiration_trace_.size() + 1);
Expand All @@ -295,7 +299,24 @@ class ConfdataBinlogReplayer : vk::binlog::replayer {
const ConfdataStats::EventCounters &get_event_counters() const noexcept {
return event_counters_;
}

// Marks the beginning of a binlog update cycle: remembers the current monotonic
// time and arms (or disarms) the per-cycle timeout.
//
// update_timeout_sec - limit for the cycle duration in seconds;
//                      a non-positive value means "no limit for this cycle".
void on_start_update_cycle(double update_timeout_sec) noexcept {
  binlog_update_start_time_point_ = std::chrono::steady_clock::now();
  if (update_timeout_sec > 0) {
    update_timeout_sec_ = std::chrono::duration<double>{update_timeout_sec};
  } else {
    // Disarm explicitly: a limit left over from a cycle that never reached
    // on_finish_update_cycle() must not be applied to a cycle without a limit.
    update_timeout_sec_.reset();
  }
}

// Closes the current update cycle and disarms its timeout.
// Returns true if the timeout had not expired at the moment of the call
// (always true when no timeout was armed).
bool on_finish_update_cycle() noexcept {
  // The expiration check needs the armed timeout, so it has to run before the reset.
  const bool expired = is_update_timeout_expired();
  update_timeout_sec_.reset();
  return !expired;
}

// Tells whether the armed timeout of the current update cycle has run out.
// Returns false when no timeout is armed.
bool is_update_timeout_expired() const noexcept {
  if (!update_timeout_sec_.has_value()) {
    return false;
  }
  // time passed since on_start_update_cycle() stored the cycle start point
  const auto elapsed = std::chrono::steady_clock::now() - binlog_update_start_time_point_;
  return elapsed > *update_timeout_sec_;
}
private:
ConfdataBinlogReplayer() noexcept:
garbage_from_previous_confdata_sample_(new(&garbage_mem_) GarbageList{}),
Expand Down Expand Up @@ -355,6 +376,9 @@ class ConfdataBinlogReplayer : vk::binlog::replayer {

template<typename F>
OperationStatus generic_operation(const char *key, short key_len, int delay, const F &operation) noexcept {
if (is_update_timeout_expired()) {
return OperationStatus::timed_out;
}
if (get_memory_status() == MemoryStatus::HARD_OOM) {
return OperationStatus::throttled_out;
}
Expand Down Expand Up @@ -426,7 +450,6 @@ class ConfdataBinlogReplayer : vk::binlog::replayer {
}

replay_binlog_result finish_operation(OperationStatus status, ConfdataStats::EventCounters::Event &event) noexcept {
++event.total;
switch (status) {
case OperationStatus::no_update:
++event.ignored;
Expand All @@ -435,6 +458,9 @@ class ConfdataBinlogReplayer : vk::binlog::replayer {
++event.throttled_out;
++event_counters_.throttled_out_total_events;
break;
case OperationStatus::timed_out:
++event_counters_.update_timeouts_total;
break;
case OperationStatus::blacklisted:
++event.blacklisted;
break;
Expand All @@ -444,7 +470,11 @@ class ConfdataBinlogReplayer : vk::binlog::replayer {
case OperationStatus::full_update:
break;
}
return get_memory_status() == MemoryStatus::HARD_OOM ? REPLAY_BINLOG_STOP_READING : REPLAY_BINLOG_OK;
if (get_memory_status() == MemoryStatus::HARD_OOM || status == OperationStatus::timed_out) {
return REPLAY_BINLOG_STOP_READING;
}
++event.total;
return REPLAY_BINLOG_OK;
}

OperationStatus delete_processing_element(MemoryStatus memory_status) noexcept {
Expand Down Expand Up @@ -778,6 +808,9 @@ class ConfdataBinlogReplayer : vk::binlog::replayer {
size_t soft_oom_memory_limit_, hard_oom_memory_limit_;
bool soft_oom_reached_{false}, hard_oom_reached_{false};

std::chrono::steady_clock::time_point binlog_update_start_time_point_{std::chrono::nanoseconds::zero()};
std::optional<std::chrono::duration<double>> update_timeout_sec_;

std::aligned_storage_t<sizeof(confdata_sample_storage), alignof(confdata_sample_storage)> confdata_mem_;
confdata_sample_storage *updating_confdata_storage_{nullptr};
std::aligned_storage_t<sizeof(GarbageList), alignof(GarbageList)> garbage_mem_;
Expand Down Expand Up @@ -813,6 +846,10 @@ void set_confdata_blacklist_pattern(std::unique_ptr<re2::RE2> &&key_blacklist_pa
confdata_settings.key_blacklist_pattern = std::move(key_blacklist_pattern);
}

// Stores the confdata binlog update timeout (in seconds) in the confdata settings.
void set_confdata_update_timeout(double timeout_sec) noexcept {
  auto &settings = confdata_settings;
  settings.confdata_update_timeout_sec = timeout_sec;
}

void add_confdata_force_ignore_prefix(const char *key_ignore_prefix) noexcept {
assert(key_ignore_prefix && *key_ignore_prefix);
vk::string_view ignore_prefix{key_ignore_prefix};
Expand Down Expand Up @@ -876,8 +913,7 @@ void init_confdata_binlog_reader() noexcept {
auto &confdata_binlog_replayer = ConfdataBinlogReplayer::get();
confdata_binlog_replayer.init(confdata_manager.get_resource());
engine_default_load_index(confdata_settings.binlog_mask);
engine_default_read_binlog();
confdata_binlog_replayer.delete_expired_elements();
update_confdata_state_from_binlog(true, 10 * confdata_settings.confdata_update_timeout_sec);
if (confdata_binlog_replayer.get_memory_status() != ConfdataBinlogReplayer::MemoryStatus::NORMAL) {
confdata_binlog_replayer.raise_confdata_oom_error("Can't read confdata binlog on start");
exit(1);
Expand Down Expand Up @@ -932,8 +968,7 @@ void confdata_binlog_update_cron() noexcept {
if (!ok) {
return;
}
binlog_try_read_events();
confdata_binlog_replayer.delete_expired_elements();
update_confdata_state_from_binlog(false, confdata_settings.confdata_update_timeout_sec);

if (confdata_binlog_replayer.get_memory_status() == ConfdataBinlogReplayer::MemoryStatus::HARD_OOM) {
return;
Expand All @@ -956,6 +991,27 @@ void confdata_binlog_update_cron() noexcept {
confdata_manager.clear_unused_samples();
}

// Runs one confdata update cycle guarded by a timeout:
// replays the binlog and then deletes the expired elements.
//
// is_initial_reading - selects engine_default_read_binlog() (true)
//                      or binlog_try_read_events() (false) as the binlog reader
// timeout_sec        - time limit for the whole cycle, passed to the replayer
//
// Returns true if the cycle finished before the timeout expired;
// otherwise logs a warning and returns false.
bool update_confdata_state_from_binlog(bool is_initial_reading, double timeout_sec) noexcept {
  auto &replayer = ConfdataBinlogReplayer::get();

  replayer.on_start_update_cycle(timeout_sec);
  if (!is_initial_reading) {
    binlog_try_read_events();
  } else {
    engine_default_read_binlog();
  }
  replayer.delete_expired_elements();

  if (replayer.on_finish_update_cycle()) {
    return true;
  }

  // TODO: critical?
  const char *reading_kind = is_initial_reading ? "initial " : "";
  log_server_warning("Confdata binlog %supdate timeout %f sec expired", reading_kind, timeout_sec);
  return false;
}

void write_confdata_stats_to(stats_t *stats) noexcept {
if (confdata_settings.is_enabled()) {
auto &confdata_stats = ConfdataStats::get();
Expand Down
2 changes: 2 additions & 0 deletions server/confdata-binlog-replay.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ void set_confdata_binlog_mask(const char *mask) noexcept;

void set_confdata_memory_limit(size_t memory_limit) noexcept;
// Takes ownership of the given pattern and stores it as the key blacklist pattern.
void set_confdata_blacklist_pattern(std::unique_ptr<re2::RE2> &&key_blacklist_pattern) noexcept;
// Stores the confdata binlog update timeout, in seconds.
void set_confdata_update_timeout(double timeout_sec) noexcept;
// key_ignore_prefix must be a non-null, non-empty string.
void add_confdata_force_ignore_prefix(const char *key_ignore_prefix) noexcept;
void add_confdata_predefined_wildcard(const char *wildcard) noexcept;
void clear_confdata_predefined_wildcards() noexcept;

void init_confdata_binlog_reader() noexcept;

void confdata_binlog_update_cron() noexcept;
// Runs one update cycle (binlog replay + deletion of expired elements) limited by timeout_sec.
// is_initial_reading selects the initial binlog reading instead of reading new events.
// Returns false (and logs a warning) if the timeout expired before the cycle finished.
bool update_confdata_state_from_binlog(bool is_initial_reading, double timeout_sec) noexcept;

void write_confdata_stats_to(stats_t *stats) noexcept;
1 change: 1 addition & 0 deletions server/confdata-stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct ConfdataStats : private vk::not_copyable {

size_t unsupported_total_events{0};
size_t throttled_out_total_events{0};
size_t update_timeouts_total{0};
} event_counters;

struct HeaviestSections {
Expand Down
8 changes: 8 additions & 0 deletions server/php-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2215,6 +2215,12 @@ int main_args_handler(int i, const char *long_option) {
add_confdata_force_ignore_prefix(optarg);
return 0;
}
case 2038: {
  // --confdata-update-timeout: a non-negative number of seconds.
  // Initialized so that no indeterminate value can ever be read if parsing fails.
  double timeout_sec{0.0};
  const int res = read_option_to(long_option, 0.0, std::numeric_limits<double>::max(), timeout_sec);
  // Apply the value only on success (0 is the success code of this handler);
  // a rejected option must not overwrite the configured timeout.
  if (res == 0) {
    set_confdata_update_timeout(timeout_sec);
  }
  return res;
}
default:
return -1;
}
Expand Down Expand Up @@ -2325,6 +2331,8 @@ void parse_main_args(int argc, char *argv[]) {
parse_option("thread-pool-ratio", required_argument, 2035, "the thread pool size ratio of the overall cpu numbers");
parse_option("thread-pool-size", required_argument, 2036, "the total threads num per worker");
parse_option("confdata-force-ignore-keys-prefix", required_argument, 2037, "an emergency option, e.g. 'highload.vid*', to forcibly drop keys from snapshot/binlog; may be used multiple times");
// The help text is built from adjacent string literals: the first one must end with a
// separator, otherwise the two sentences are glued together in the --help output.
parse_option("confdata-update-timeout", required_argument, 2038, "cron confdata binlog replaying will be forcibly stopped after the specified timeout (default: 0.3 sec). "
"Initial binlog is read with a 10 times larger timeout");

parse_engine_options_long(argc, argv, main_args_handler);
parse_main_args_till_option(argc, argv);
Expand Down
1 change: 1 addition & 0 deletions server/statshouse/statshouse-manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ void StatsHouseManager::add_confdata_master_stats(const ConfdataStats &confdata_
client.metric("kphp_confdata_events").tag("delete").write_value(events.delete_events.total);
client.metric("kphp_confdata_events").tag("delete_blacklisted").write_value(events.delete_events.blacklisted);
client.metric("kphp_confdata_events").tag("throttled_out").write_value(events.throttled_out_total_events);
client.metric("kphp_confdata_events").tag("timed_out").write_value(events.update_timeouts_total);

for (const auto &[section_name, size] : confdata_stats.heaviest_sections_by_count.sorted_desc) {
if (section_name != nullptr && size > 0) { // section_name looks like "highload."
Expand Down

0 comments on commit 865c916

Please sign in to comment.