Skip to content

Commit

Permalink
quick and dirty implementation as a new gradient controller (todo: cl…
Browse files Browse the repository at this point in the history
…ean up impl)

Signed-off-by: William <wtzhang23@gmail.com>
  • Loading branch information
wtzhang23 committed Feb 17, 2025
1 parent feaa042 commit f10a217
Show file tree
Hide file tree
Showing 8 changed files with 585 additions and 340 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@ message GradientControllerConfig {
}

// Parameters controlling the periodic minRTT recalculation.
// [#next-free-field: 7]
// [#next-free-field: 6]
message MinimumRTTCalculationParams {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.http.adaptive_concurrency.v2alpha.GradientControllerConfig."
"MinimumRTTCalculationParams";

// The time interval between recalculating the minimum request round-trip time. Has to be
// positive. Either this or ``fixed_value`` must be set.
google.protobuf.Duration interval = 1 [(validate.rules).duration = {gte {nanos: 1000000}}];
// positive.
google.protobuf.Duration interval = 1 [(validate.rules).duration = {
required: true
gte {nanos: 1000000}
}];

// The number of requests to aggregate/sample during the minRTT recalculation window before
// updating. Defaults to 50.
Expand All @@ -71,10 +74,6 @@ message GradientControllerConfig {
// The concurrency limit set while measuring the minRTT. Defaults to 3.
google.protobuf.UInt32Value min_concurrency = 4 [(validate.rules).uint32 = {gt: 0}];

// A fixed value for the minRTT. This disables the dynamic calculation of minRTT. Either
// this or ``interval`` must be set.
google.protobuf.Duration fixed_value = 6 [(validate.rules).duration = {gt {}}];

// Amount added to the measured minRTT to add stability to the concurrency limit during natural
// variability in latency. This is expressed as a percentage of the measured value and can be
// adjusted to allow more or less tolerance to the sampled latency values.
Expand All @@ -92,6 +91,41 @@ message GradientControllerConfig {
MinimumRTTCalculationParams min_rtt_calc_params = 3 [(validate.rules).message = {required: true}];
}

// Configuration parameters for the pinned gradient controller. Unlike the
// dynamic gradient controller, the minRTT is not periodically re-measured; it
// is pinned to the operator-supplied ``min_rtt`` value.
// [#next-free-field: 5]
message PinnedGradientControllerConfig {

  // Parameters controlling the periodic recalculation of the concurrency limit from sampled request
  // latencies.
  message ConcurrencyLimitCalculationParams {
    // NOTE(review): this previous_message_type is also claimed by
    // GradientControllerConfig.ConcurrencyLimitCalculationParams above — verify the
    // versioning annotation may be duplicated across two v3 messages, or drop it here.
    option (udpa.annotations.versioning).previous_message_type =
        "envoy.config.filter.http.adaptive_concurrency.v2alpha.GradientControllerConfig."
        "ConcurrencyLimitCalculationParams";

    // The allowed upper-bound on the calculated concurrency limit. Defaults to 1000.
    // Field 1 is skipped — presumably reserved for the deprecated ``max_gradient``
    // of the message this was copied from; confirm and add a ``reserved`` entry.
    google.protobuf.UInt32Value max_concurrency_limit = 2 [(validate.rules).uint32 = {gt: 0}];

    // The period of time samples are taken to recalculate the concurrency limit.
    google.protobuf.Duration concurrency_update_interval = 3 [(validate.rules).duration = {
      required: true
      gt {}
    }];
  }

  // The percentile to use when summarizing aggregated samples. Defaults to p50.
  type.v3.Percent sample_aggregate_percentile = 1;

  // Parameters controlling the periodic recalculation of the concurrency limit. Required.
  ConcurrencyLimitCalculationParams concurrency_limit_params = 2
      [(validate.rules).message = {required: true}];

  // The pinned minimum request round-trip time. Must be positive. Required.
  google.protobuf.Duration min_rtt = 3 [(validate.rules).duration = {
    required: true
    gt {}
  }];

  // The minimum value the concurrency limit may take. Must be positive.
  google.protobuf.UInt32Value min_concurrency = 4 [(validate.rules).uint32 = {gt: 0}];
}

message AdaptiveConcurrency {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.filter.http.adaptive_concurrency.v2alpha.AdaptiveConcurrency";
Expand All @@ -102,6 +136,10 @@ message AdaptiveConcurrency {
// Gradient concurrency control will be used.
GradientControllerConfig gradient_controller_config = 1
[(validate.rules).message = {required: true}];

// Pinned gradient concurrency control will be used.
PinnedGradientControllerConfig pinned_gradient_controller_config = 4
[(validate.rules).message = {required: true}];
}

// If set to false, the adaptive concurrency filter will operate as a pass-through filter. If the
Expand Down
3 changes: 2 additions & 1 deletion changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,6 @@ new_features:
feature :ref:`here <arch_overview_dynamic_modules>`.
- area: adaptive concurrency
change: |
Allow fixing the minimum RTT by introducing ``fixed_value``.
Allow pinning the minimum RTT by introducing a new gradient concurrency controller, :ref:`pinned_gradient_controller_config
<envoy_v3_api_field_extensions.filters.http.adaptive_concurrency.v3.AdaptiveConcurrency.pinned_gradient_controller_config>`.
deprecated:
23 changes: 16 additions & 7 deletions source/extensions/filters/http/adaptive_concurrency/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,22 @@ Http::FilterFactoryCb AdaptiveConcurrencyFilterFactory::createFilterFactoryFromP
using Proto = envoy::extensions::filters::http::adaptive_concurrency::v3::AdaptiveConcurrency;
ASSERT(config.concurrency_controller_config_case() ==
Proto::ConcurrencyControllerConfigCase::kGradientControllerConfig);
auto gradient_controller_config = Controller::GradientControllerConfig(
config.gradient_controller_config(), server_context.runtime());
controller = std::make_shared<Controller::GradientController>(
std::move(gradient_controller_config), server_context.mainThreadDispatcher(),
server_context.runtime(), acc_stats_prefix + "gradient_controller.", context.scope(),
server_context.api().randomGenerator(), server_context.timeSource());

if (config.has_gradient_controller_config()) {
auto gradient_controller_config = Controller::DynamicGradientControllerConfig(
config.gradient_controller_config(), server_context.runtime());
controller = std::make_shared<Controller::DynamicGradientController>(
std::move(gradient_controller_config), server_context.mainThreadDispatcher(),
server_context.runtime(), acc_stats_prefix + "gradient_controller.", context.scope(),
server_context.api().randomGenerator(), server_context.timeSource());
} else {
ASSERT(config.has_pinned_gradient_controller_config());
auto gradient_controller_config = Controller::PinnedGradientControllerConfig(
config.pinned_gradient_controller_config(), server_context.runtime());
controller = std::make_shared<Controller::PinnedGradientController>(
std::move(gradient_controller_config), server_context.mainThreadDispatcher(),
server_context.runtime(), acc_stats_prefix + "gradient_controller.", context.scope(),
server_context.api().randomGenerator(), server_context.timeSource());
}
AdaptiveConcurrencyFilterConfigSharedPtr filter_config(new AdaptiveConcurrencyFilterConfig(
config, server_context.runtime(), std::move(acc_stats_prefix), context.scope(),
server_context.timeSource()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ envoy_extension_package()

envoy_cc_library(
name = "controller_lib",
srcs = ["gradient_controller.cc"],
srcs = [
"gradient_controller.cc",
"pinned_gradient_controller.cc",
],
hdrs = [
"controller.h",
"gradient_controller.h",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,43 +22,34 @@ namespace HttpFilters {
namespace AdaptiveConcurrency {
namespace Controller {

GradientControllerConfig::GradientControllerConfig(
DynamicGradientControllerConfig::DynamicGradientControllerConfig(
const envoy::extensions::filters::http::adaptive_concurrency::v3::GradientControllerConfig&
proto_config,
Runtime::Loader& runtime)
: runtime_(runtime),
: GradientControllerConfig::GradientControllerConfig(
std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
proto_config.concurrency_limit_params().concurrency_update_interval())),
PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.concurrency_limit_params(),
max_concurrency_limit, 1000),
PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config, sample_aggregate_percentile, 50),
PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.min_rtt_calc_params(), min_concurrency, 3),
runtime),
min_rtt_calc_interval_(std::chrono::milliseconds(
DurationUtil::durationToMilliseconds(proto_config.min_rtt_calc_params().interval()))),
sample_rtt_calc_interval_(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
proto_config.concurrency_limit_params().concurrency_update_interval()))),
jitter_pct_(
PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config.min_rtt_calc_params(), jitter, 15)),
max_concurrency_limit_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
proto_config.concurrency_limit_params(), max_concurrency_limit, 1000)),
min_rtt_aggregate_request_count_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.min_rtt_calc_params(), request_count, 50)),
sample_aggregate_percentile_(
PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config, sample_aggregate_percentile, 50)),
min_concurrency_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.min_rtt_calc_params(), min_concurrency, 3)),
fixed_value_(std::chrono::milliseconds(
DurationUtil::durationToMilliseconds(proto_config.min_rtt_calc_params().fixed_value()))),
min_rtt_buffer_pct_(
PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config.min_rtt_calc_params(), buffer, 25)) {
PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config.min_rtt_calc_params(), buffer, 25)) {}

if (min_rtt_calc_interval_ < std::chrono::milliseconds(1) &&
fixed_value_ <= std::chrono::milliseconds::zero()) {
throw EnvoyException(
"adaptive_concurrency: neither `concurrency_update_interval` nor `fixed_value` set");
}
}
GradientController::GradientController(GradientControllerConfig config,
Event::Dispatcher& dispatcher, Runtime::Loader&,
const std::string& stats_prefix, Stats::Scope& scope,
Random::RandomGenerator& random, TimeSource& time_source)
DynamicGradientController::DynamicGradientController(
DynamicGradientControllerConfig config, Event::Dispatcher& dispatcher, Runtime::Loader&,
const std::string& stats_prefix, Stats::Scope& scope, Random::RandomGenerator& random,
TimeSource& time_source)
: config_(std::move(config)), dispatcher_(dispatcher), scope_(scope),
stats_(generateStats(scope_, stats_prefix)), random_(random), time_source_(time_source),
deferred_limit_value_(0), num_rq_outstanding_(0),
stats_(GradientControllerStats::generateStats(scope_, stats_prefix)), random_(random),
time_source_(time_source), deferred_limit_value_(0), num_rq_outstanding_(0),
concurrency_limit_(config_.minConcurrency()),
latency_sample_hist_(hist_fast_alloc(), hist_free) {
min_rtt_calc_timer_ = dispatcher_.createTimer([this]() -> void { enterMinRTTSamplingWindow(); });
Expand All @@ -79,27 +70,18 @@ GradientController::GradientController(GradientControllerConfig config,
sample_reset_timer_->enableTimer(config_.sampleRTTCalcInterval());
});

if (isMinRTTSamplingEnabled()) {
enterMinRTTSamplingWindow();
} else {
min_rtt_ = config_.fixedValue();
stats_.min_rtt_msecs_.set(
std::chrono::duration_cast<std::chrono::milliseconds>(min_rtt_).count());
updateConcurrencyLimit(config_.minConcurrency());
}
enterMinRTTSamplingWindow();
sample_reset_timer_->enableTimer(config_.sampleRTTCalcInterval());
stats_.concurrency_limit_.set(concurrency_limit_.load());
}

GradientControllerStats GradientController::generateStats(Stats::Scope& scope,
const std::string& stats_prefix) {
// Instantiates the full set of gradient-controller counters and gauges in
// `scope`, with every stat name prefixed by `stats_prefix`.
GradientControllerStats GradientControllerStats::generateStats(Stats::Scope& scope,
                                                               const std::string& stats_prefix) {
  // ALL_GRADIENT_CONTROLLER_STATS expands to the brace-initializer list for the
  // stats struct, pulling each counter/gauge from the prefixed scope pools.
  return {ALL_GRADIENT_CONTROLLER_STATS(POOL_COUNTER_PREFIX(scope, stats_prefix),
                                        POOL_GAUGE_PREFIX(scope, stats_prefix))};
}

void GradientController::enterMinRTTSamplingWindow() {
// precondition: isMinRTTSamplingEnabled() == true

void DynamicGradientController::enterMinRTTSamplingWindow() {
// There is a potential race condition where setting the minimum concurrency multiple times in a row
// resets the minRTT sampling timer and triggers the calculation immediately. This could occur
// after the minRTT sampling window has already been entered, so we can simply return here knowing
Expand All @@ -115,7 +97,7 @@ void GradientController::enterMinRTTSamplingWindow() {
// Set the minRTT flag to indicate we're gathering samples to update the value. This will
// prevent the sample window from resetting until enough requests are gathered to complete the
// recalculation.
deferred_limit_value_.store(GradientController::concurrencyLimit());
deferred_limit_value_.store(DynamicGradientController::concurrencyLimit());
updateConcurrencyLimit(config_.minConcurrency());

// Throw away any latency samples from before the recalculation window as it may not represent
Expand All @@ -125,11 +107,7 @@ void GradientController::enterMinRTTSamplingWindow() {
min_rtt_epoch_ = time_source_.monotonicTime();
}

void GradientController::updateMinRTT() {
if (!isMinRTTSamplingEnabled()) {
return;
}

void DynamicGradientController::updateMinRTT() {
// Only update minRTT when it is in minRTT sampling window and
// number of samples is greater than or equal to the minRTTAggregateRequestCount.
if (!inMinRTTSamplingWindow() ||
Expand All @@ -149,8 +127,8 @@ void GradientController::updateMinRTT() {
sample_reset_timer_->enableTimer(config_.sampleRTTCalcInterval());
}

std::chrono::milliseconds GradientController::applyJitter(std::chrono::milliseconds interval,
double jitter_pct) const {
std::chrono::milliseconds DynamicGradientController::applyJitter(std::chrono::milliseconds interval,
double jitter_pct) const {
if (jitter_pct == 0) {
return interval;
}
Expand All @@ -159,7 +137,7 @@ std::chrono::milliseconds GradientController::applyJitter(std::chrono::milliseco
return std::chrono::milliseconds(interval.count() + (random_.random() % jitter_range_ms));
}

void GradientController::resetSampleWindow() {
void DynamicGradientController::resetSampleWindow() {
// The sampling window must not be reset while sampling for the new minRTT value.
ASSERT(!inMinRTTSamplingWindow());

Expand All @@ -173,15 +151,15 @@ void GradientController::resetSampleWindow() {
updateConcurrencyLimit(calculateNewLimit());
}

std::chrono::microseconds GradientController::processLatencySamplesAndClear() {
std::chrono::microseconds DynamicGradientController::processLatencySamplesAndClear() {
const std::array<double, 1> quantile{config_.sampleAggregatePercentile()};
std::array<double, 1> calculated_quantile;
hist_approx_quantile(latency_sample_hist_.get(), quantile.data(), 1, calculated_quantile.data());
hist_clear(latency_sample_hist_.get());
return std::chrono::microseconds(static_cast<int>(calculated_quantile[0]));
}

uint32_t GradientController::calculateNewLimit() {
uint32_t DynamicGradientController::calculateNewLimit() {
ASSERT(sample_rtt_.count() > 0);

// Calculate the gradient value, ensuring it's clamped between 0.5 and 2.0.
Expand All @@ -206,7 +184,7 @@ uint32_t GradientController::calculateNewLimit() {
std::min<uint32_t>(config_.maxConcurrencyLimit(), new_limit));
}

RequestForwardingAction GradientController::forwardingDecision() {
RequestForwardingAction DynamicGradientController::forwardingDecision() {
// Note that a race condition exists here which would allow more outstanding requests than the
// concurrency limit bounded by the number of worker threads. After loading num_rq_outstanding_
// and before loading concurrency_limit_, another thread could potentially swoop in and modify
Expand All @@ -222,7 +200,7 @@ RequestForwardingAction GradientController::forwardingDecision() {
return RequestForwardingAction::Block;
}

void GradientController::recordLatencySample(MonotonicTime rq_send_time) {
void DynamicGradientController::recordLatencySample(MonotonicTime rq_send_time) {
ASSERT(num_rq_outstanding_.load() > 0);
--num_rq_outstanding_;

Expand All @@ -242,12 +220,12 @@ void GradientController::recordLatencySample(MonotonicTime rq_send_time) {
}
}

void GradientController::cancelLatencySample() {
// Releases the outstanding-request slot for a request that will not contribute
// a latency sample — presumably invoked when a request completes without a
// usable response time (confirm against the filter's call sites). The latency
// histogram is deliberately left untouched.
void DynamicGradientController::cancelLatencySample() {
  ASSERT(num_rq_outstanding_.load() > 0);
  --num_rq_outstanding_;
}

void GradientController::updateConcurrencyLimit(const uint32_t new_limit) {
void DynamicGradientController::updateConcurrencyLimit(const uint32_t new_limit) {
const auto old_limit = concurrency_limit_.load();
concurrency_limit_.store(new_limit);
stats_.concurrency_limit_.set(concurrency_limit_.load());
Expand All @@ -268,7 +246,7 @@ void GradientController::updateConcurrencyLimit(const uint32_t new_limit) {
// cancel/re-enable the timer below and triggers overlapping minRTT windows. To protect against
// this, there is an explicit check when entering the minRTT measurement that ensures there is
// only a single minRTT measurement active at a time.
if (consecutive_min_concurrency_set_ >= 5 && isMinRTTSamplingEnabled()) {
if (consecutive_min_concurrency_set_ >= 5) {
min_rtt_calc_timer_->enableTimer(std::chrono::milliseconds(0));
}
}
Expand Down
Loading

0 comments on commit f10a217

Please sign in to comment.