From f10a21730c62bd1301853905a6162e128b3e1920 Mon Sep 17 00:00:00 2001 From: William Date: Sun, 16 Feb 2025 16:57:29 -0800 Subject: [PATCH] quick and dirty implementation as a new gradient controller (todo: clean up impl) Signed-off-by: William --- .../v3/adaptive_concurrency.proto | 52 ++- changelogs/current.yaml | 3 +- .../http/adaptive_concurrency/config.cc | 23 +- .../adaptive_concurrency/controller/BUILD | 5 +- .../controller/gradient_controller.cc | 86 ++-- .../controller/gradient_controller.h | 221 +++++++--- .../controller/pinned_gradient_controller.cc | 138 ++++++ .../controller/gradient_controller_test.cc | 397 +++++++++--------- 8 files changed, 585 insertions(+), 340 deletions(-) create mode 100644 source/extensions/filters/http/adaptive_concurrency/controller/pinned_gradient_controller.cc diff --git a/api/envoy/extensions/filters/http/adaptive_concurrency/v3/adaptive_concurrency.proto b/api/envoy/extensions/filters/http/adaptive_concurrency/v3/adaptive_concurrency.proto index 8d46ac5deb8ca..80b8f83b25cfd 100644 --- a/api/envoy/extensions/filters/http/adaptive_concurrency/v3/adaptive_concurrency.proto +++ b/api/envoy/extensions/filters/http/adaptive_concurrency/v3/adaptive_concurrency.proto @@ -47,15 +47,18 @@ message GradientControllerConfig { } // Parameters controlling the periodic minRTT recalculation. - // [#next-free-field: 7] + // [#next-free-field: 6] message MinimumRTTCalculationParams { option (udpa.annotations.versioning).previous_message_type = "envoy.config.filter.http.adaptive_concurrency.v2alpha.GradientControllerConfig." "MinimumRTTCalculationParams"; // The time interval between recalculating the minimum request round-trip time. Has to be - // positive. Either this or ``fixed_value`` must be set. - google.protobuf.Duration interval = 1 [(validate.rules).duration = {gte {nanos: 1000000}}]; + // positive. + google.protobuf.Duration interval = 1 [(validate.rules).duration = { + required: true + gte {nanos: 1000000} + }]; // The number of requests to aggregate/sample during the minRTT recalculation window before // updating. Defaults to 50. @@ -71,10 +74,6 @@ message GradientControllerConfig { // The concurrency limit set while measuring the minRTT. Defaults to 3. google.protobuf.UInt32Value min_concurrency = 4 [(validate.rules).uint32 = {gt: 0}]; - // A fixed value for the minRTT. This disables the dynamic calculation of minRTT. Either - // this or ``interval`` must be set. - google.protobuf.Duration fixed_value = 6 [(validate.rules).duration = {gt {}}]; - // Amount added to the measured minRTT to add stability to the concurrency limit during natural // variability in latency. This is expressed as a percentage of the measured value and can be // adjusted to allow more or less tolerance to the sampled latency values. @@ -92,6 +91,41 @@ message GradientControllerConfig { MinimumRTTCalculationParams min_rtt_calc_params = 3 [(validate.rules).message = {required: true}]; } +// Configuration parameters for the pinned gradient controller. +// [#next-free-field: 6] +message PinnedGradientControllerConfig { + + // Parameters controlling the periodic recalculation of the concurrency limit from sampled request + // latencies. + message ConcurrencyLimitCalculationParams { + option (udpa.annotations.versioning).previous_message_type = + "envoy.config.filter.http.adaptive_concurrency.v2alpha.GradientControllerConfig." + "ConcurrencyLimitCalculationParams"; + + // The allowed upper-bound on the calculated concurrency limit. Defaults to 1000. 
+ google.protobuf.UInt32Value max_concurrency_limit = 2 [(validate.rules).uint32 = {gt: 0}]; + + // The period of time samples are taken to recalculate the concurrency limit. + google.protobuf.Duration concurrency_update_interval = 3 [(validate.rules).duration = { + required: true + gt {} + }]; + } + + // The percentile to use when summarizing aggregated samples. Defaults to p50. + type.v3.Percent sample_aggregate_percentile = 1; + + ConcurrencyLimitCalculationParams concurrency_limit_params = 2 + [(validate.rules).message = {required: true}]; + + google.protobuf.Duration min_rtt = 3 [(validate.rules).duration = { + required: true + gt {} + }]; + + google.protobuf.UInt32Value min_concurrency = 4 [(validate.rules).uint32 = {gt: 0}]; +} + message AdaptiveConcurrency { option (udpa.annotations.versioning).previous_message_type = "envoy.config.filter.http.adaptive_concurrency.v2alpha.AdaptiveConcurrency"; @@ -102,6 +136,10 @@ message AdaptiveConcurrency { // Gradient concurrency control will be used. GradientControllerConfig gradient_controller_config = 1 [(validate.rules).message = {required: true}]; + + // Pinned gradient concurrency control will be used. + PinnedGradientControllerConfig pinned_gradient_controller_config = 4 + [(validate.rules).message = {required: true}]; } // If set to false, the adaptive concurrency filter will operate as a pass-through filter. If the diff --git a/changelogs/current.yaml b/changelogs/current.yaml index f939e85a76f9d..f62ee0f5beb47 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -146,5 +146,6 @@ new_features: feature :ref:`here `. - area: adaptive concurrency change: | - Allow fixing the minimum RTT by introducing ``fixed_value``. + Allow pinning the minimum RTT by introducing a new gradient concurrency controller, :ref:`pinned_gradient_controller + `. 
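For reference, a complete filter configuration exercising the new message might look like the following. This is an illustrative sketch only: the filter name, type URL, and values are assumptions based on the existing adaptive_concurrency filter and the fields added above, and are not part of this patch.

    name: envoy.filters.http.adaptive_concurrency
    typed_config:
      "@type": type.googleapis.com/envoy.extensions.filters.http.adaptive_concurrency.v3.AdaptiveConcurrency
      pinned_gradient_controller_config:
        sample_aggregate_percentile:
          value: 90
        concurrency_limit_params:
          max_concurrency_limit: 200
          concurrency_update_interval: 0.1s
        min_rtt: 0.05s
        min_concurrency: 5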
deprecated:
diff --git a/source/extensions/filters/http/adaptive_concurrency/config.cc b/source/extensions/filters/http/adaptive_concurrency/config.cc
index 8bd16a945cea4..d2a99b5306222 100644
--- a/source/extensions/filters/http/adaptive_concurrency/config.cc
+++ b/source/extensions/filters/http/adaptive_concurrency/config.cc
@@ -23,13 +23,22 @@ Http::FilterFactoryCb AdaptiveConcurrencyFilterFactory::createFilterFactoryFromP
   using Proto = envoy::extensions::filters::http::adaptive_concurrency::v3::AdaptiveConcurrency;
-  ASSERT(config.concurrency_controller_config_case() ==
-         Proto::ConcurrencyControllerConfigCase::kGradientControllerConfig);
-  auto gradient_controller_config = Controller::GradientControllerConfig(
-      config.gradient_controller_config(), server_context.runtime());
-  controller = std::make_shared<Controller::GradientController>(
-      std::move(gradient_controller_config), server_context.mainThreadDispatcher(),
-      server_context.runtime(), acc_stats_prefix + "gradient_controller.", context.scope(),
-      server_context.api().randomGenerator(), server_context.timeSource());
-
+  ASSERT(config.concurrency_controller_config_case() !=
+         Proto::ConcurrencyControllerConfigCase::CONCURRENCY_CONTROLLER_CONFIG_NOT_SET);
+  if (config.has_gradient_controller_config()) {
+    auto gradient_controller_config = Controller::DynamicGradientControllerConfig(
+        config.gradient_controller_config(), server_context.runtime());
+    controller = std::make_shared<Controller::DynamicGradientController>(
+        std::move(gradient_controller_config), server_context.mainThreadDispatcher(),
+        server_context.runtime(), acc_stats_prefix + "gradient_controller.", context.scope(),
+        server_context.api().randomGenerator(), server_context.timeSource());
+  } else {
+    ASSERT(config.has_pinned_gradient_controller_config());
+    auto gradient_controller_config = Controller::PinnedGradientControllerConfig(
+        config.pinned_gradient_controller_config(), server_context.runtime());
+    controller = std::make_shared<Controller::PinnedGradientController>(
+        std::move(gradient_controller_config), server_context.mainThreadDispatcher(),
+        server_context.runtime(), acc_stats_prefix + "gradient_controller.", context.scope(),
+        server_context.api().randomGenerator(), server_context.timeSource());
+  }
   AdaptiveConcurrencyFilterConfigSharedPtr filter_config(new AdaptiveConcurrencyFilterConfig(
       config, server_context.runtime(), std::move(acc_stats_prefix), context.scope(),
       server_context.timeSource()));
diff --git a/source/extensions/filters/http/adaptive_concurrency/controller/BUILD b/source/extensions/filters/http/adaptive_concurrency/controller/BUILD
index abceffc527c22..46eb3a47008e8 100644
--- a/source/extensions/filters/http/adaptive_concurrency/controller/BUILD
+++ b/source/extensions/filters/http/adaptive_concurrency/controller/BUILD
@@ -14,7 +14,10 @@ envoy_extension_package()
 
 envoy_cc_library(
     name = "controller_lib",
-    srcs = ["gradient_controller.cc"],
+    srcs = [
+        "gradient_controller.cc",
+        "pinned_gradient_controller.cc",
+    ],
     hdrs = [
         "controller.h",
         "gradient_controller.h",
diff --git a/source/extensions/filters/http/adaptive_concurrency/controller/gradient_controller.cc b/source/extensions/filters/http/adaptive_concurrency/controller/gradient_controller.cc
index 42fdf0edfff3b..37b59529c3eb2 100644
--- a/source/extensions/filters/http/adaptive_concurrency/controller/gradient_controller.cc
+++ b/source/extensions/filters/http/adaptive_concurrency/controller/gradient_controller.cc
@@ -22,43 +22,34 @@ namespace HttpFilters {
 namespace AdaptiveConcurrency {
 namespace Controller {
 
-GradientControllerConfig::GradientControllerConfig(
+DynamicGradientControllerConfig::DynamicGradientControllerConfig(
     const
envoy::extensions::filters::http::adaptive_concurrency::v3::GradientControllerConfig& proto_config, Runtime::Loader& runtime) - : runtime_(runtime), + : GradientControllerConfig::GradientControllerConfig( + std::chrono::milliseconds(DurationUtil::durationToMilliseconds( + proto_config.concurrency_limit_params().concurrency_update_interval())), + PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.concurrency_limit_params(), + max_concurrency_limit, 1000), + PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config, sample_aggregate_percentile, 50), + PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.min_rtt_calc_params(), min_concurrency, 3), + runtime), min_rtt_calc_interval_(std::chrono::milliseconds( DurationUtil::durationToMilliseconds(proto_config.min_rtt_calc_params().interval()))), - sample_rtt_calc_interval_(std::chrono::milliseconds(DurationUtil::durationToMilliseconds( - proto_config.concurrency_limit_params().concurrency_update_interval()))), jitter_pct_( PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config.min_rtt_calc_params(), jitter, 15)), - max_concurrency_limit_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( - proto_config.concurrency_limit_params(), max_concurrency_limit, 1000)), min_rtt_aggregate_request_count_( PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.min_rtt_calc_params(), request_count, 50)), - sample_aggregate_percentile_( - PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config, sample_aggregate_percentile, 50)), - min_concurrency_( - PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.min_rtt_calc_params(), min_concurrency, 3)), - fixed_value_(std::chrono::milliseconds( - DurationUtil::durationToMilliseconds(proto_config.min_rtt_calc_params().fixed_value()))), min_rtt_buffer_pct_( - PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config.min_rtt_calc_params(), buffer, 25)) { + PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config.min_rtt_calc_params(), buffer, 25)) {} - if (min_rtt_calc_interval_ < std::chrono::milliseconds(1) && - fixed_value_ <= std::chrono::milliseconds::zero()) { - throw EnvoyException( - "adaptive_concurrency: neither `concurrency_update_interval` nor `fixed_value` set"); - } -} -GradientController::GradientController(GradientControllerConfig config, - Event::Dispatcher& dispatcher, Runtime::Loader&, - const std::string& stats_prefix, Stats::Scope& scope, - Random::RandomGenerator& random, TimeSource& time_source) +DynamicGradientController::DynamicGradientController( + DynamicGradientControllerConfig config, Event::Dispatcher& dispatcher, Runtime::Loader&, + const std::string& stats_prefix, Stats::Scope& scope, Random::RandomGenerator& random, + TimeSource& time_source) : config_(std::move(config)), dispatcher_(dispatcher), scope_(scope), - stats_(generateStats(scope_, stats_prefix)), random_(random), time_source_(time_source), - deferred_limit_value_(0), num_rq_outstanding_(0), + stats_(GradientControllerStats::generateStats(scope_, stats_prefix)), random_(random), + time_source_(time_source), deferred_limit_value_(0), num_rq_outstanding_(0), concurrency_limit_(config_.minConcurrency()), latency_sample_hist_(hist_fast_alloc(), hist_free) { min_rtt_calc_timer_ = dispatcher_.createTimer([this]() -> void { enterMinRTTSamplingWindow(); }); @@ -79,27 +70,18 @@ GradientController::GradientController(GradientControllerConfig config, sample_reset_timer_->enableTimer(config_.sampleRTTCalcInterval()); }); - if (isMinRTTSamplingEnabled()) { - enterMinRTTSamplingWindow(); - } else { - min_rtt_ = config_.fixedValue(); - stats_.min_rtt_msecs_.set( - std::chrono::duration_cast(min_rtt_).count()); - 
updateConcurrencyLimit(config_.minConcurrency()); - } + enterMinRTTSamplingWindow(); sample_reset_timer_->enableTimer(config_.sampleRTTCalcInterval()); stats_.concurrency_limit_.set(concurrency_limit_.load()); } -GradientControllerStats GradientController::generateStats(Stats::Scope& scope, - const std::string& stats_prefix) { +GradientControllerStats GradientControllerStats::generateStats(Stats::Scope& scope, + const std::string& stats_prefix) { return {ALL_GRADIENT_CONTROLLER_STATS(POOL_COUNTER_PREFIX(scope, stats_prefix), POOL_GAUGE_PREFIX(scope, stats_prefix))}; } -void GradientController::enterMinRTTSamplingWindow() { - // precondition: isMinRTTSamplingEnabled() == true - +void DynamicGradientController::enterMinRTTSamplingWindow() { // There a potential race condition where setting the minimum concurrency multiple times in a row // resets the minRTT sampling timer and triggers the calculation immediately. This could occur // after the minRTT sampling window has already been entered, so we can simply return here knowing @@ -115,7 +97,7 @@ void GradientController::enterMinRTTSamplingWindow() { // Set the minRTT flag to indicate we're gathering samples to update the value. This will // prevent the sample window from resetting until enough requests are gathered to complete the // recalculation. - deferred_limit_value_.store(GradientController::concurrencyLimit()); + deferred_limit_value_.store(DynamicGradientController::concurrencyLimit()); updateConcurrencyLimit(config_.minConcurrency()); // Throw away any latency samples from before the recalculation window as it may not represent @@ -125,11 +107,7 @@ void GradientController::enterMinRTTSamplingWindow() { min_rtt_epoch_ = time_source_.monotonicTime(); } -void GradientController::updateMinRTT() { - if (!isMinRTTSamplingEnabled()) { - return; - } - +void DynamicGradientController::updateMinRTT() { // Only update minRTT when it is in minRTT sampling window and // number of samples is greater than or equal to the minRTTAggregateRequestCount. if (!inMinRTTSamplingWindow() || @@ -149,8 +127,8 @@ void GradientController::updateMinRTT() { sample_reset_timer_->enableTimer(config_.sampleRTTCalcInterval()); } -std::chrono::milliseconds GradientController::applyJitter(std::chrono::milliseconds interval, - double jitter_pct) const { +std::chrono::milliseconds DynamicGradientController::applyJitter(std::chrono::milliseconds interval, + double jitter_pct) const { if (jitter_pct == 0) { return interval; } @@ -159,7 +137,7 @@ std::chrono::milliseconds GradientController::applyJitter(std::chrono::milliseco return std::chrono::milliseconds(interval.count() + (random_.random() % jitter_range_ms)); } -void GradientController::resetSampleWindow() { +void DynamicGradientController::resetSampleWindow() { // The sampling window must not be reset while sampling for the new minRTT value. 
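  // (While the minRTT window is open, the limit is pinned to minConcurrency() and the histogram
  // is reserved for the minRTT measurement, so recomputing the limit from those samples would
  // skew the result; the assertion below guards that invariant.)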
ASSERT(!inMinRTTSamplingWindow()); @@ -173,7 +151,7 @@ void GradientController::resetSampleWindow() { updateConcurrencyLimit(calculateNewLimit()); } -std::chrono::microseconds GradientController::processLatencySamplesAndClear() { +std::chrono::microseconds DynamicGradientController::processLatencySamplesAndClear() { const std::array quantile{config_.sampleAggregatePercentile()}; std::array calculated_quantile; hist_approx_quantile(latency_sample_hist_.get(), quantile.data(), 1, calculated_quantile.data()); @@ -181,7 +159,7 @@ std::chrono::microseconds GradientController::processLatencySamplesAndClear() { return std::chrono::microseconds(static_cast(calculated_quantile[0])); } -uint32_t GradientController::calculateNewLimit() { +uint32_t DynamicGradientController::calculateNewLimit() { ASSERT(sample_rtt_.count() > 0); // Calculate the gradient value, ensuring it's clamped between 0.5 and 2.0. @@ -206,7 +184,7 @@ uint32_t GradientController::calculateNewLimit() { std::min(config_.maxConcurrencyLimit(), new_limit)); } -RequestForwardingAction GradientController::forwardingDecision() { +RequestForwardingAction DynamicGradientController::forwardingDecision() { // Note that a race condition exists here which would allow more outstanding requests than the // concurrency limit bounded by the number of worker threads. After loading num_rq_outstanding_ // and before loading concurrency_limit_, another thread could potentially swoop in and modify @@ -222,7 +200,7 @@ RequestForwardingAction GradientController::forwardingDecision() { return RequestForwardingAction::Block; } -void GradientController::recordLatencySample(MonotonicTime rq_send_time) { +void DynamicGradientController::recordLatencySample(MonotonicTime rq_send_time) { ASSERT(num_rq_outstanding_.load() > 0); --num_rq_outstanding_; @@ -242,12 +220,12 @@ void GradientController::recordLatencySample(MonotonicTime rq_send_time) { } } -void GradientController::cancelLatencySample() { +void DynamicGradientController::cancelLatencySample() { ASSERT(num_rq_outstanding_.load() > 0); --num_rq_outstanding_; } -void GradientController::updateConcurrencyLimit(const uint32_t new_limit) { +void DynamicGradientController::updateConcurrencyLimit(const uint32_t new_limit) { const auto old_limit = concurrency_limit_.load(); concurrency_limit_.store(new_limit); stats_.concurrency_limit_.set(concurrency_limit_.load()); @@ -268,7 +246,7 @@ void GradientController::updateConcurrencyLimit(const uint32_t new_limit) { // cancel/re-enable the timer below and triggers overlapping minRTT windows. To protect against // this, there is an explicit check when entering the minRTT measurement that ensures there is // only a single minRTT measurement active at a time. 
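  // (With the fixed-value option removed in this change, the dynamic controller always samples
  // minRTT, so the isMinRTTSamplingEnabled() guard below is dropped; the pinned controller never
  // re-measures minRTT and has no equivalent of this timer.)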
- if (consecutive_min_concurrency_set_ >= 5 && isMinRTTSamplingEnabled()) { + if (consecutive_min_concurrency_set_ >= 5) { min_rtt_calc_timer_->enableTimer(std::chrono::milliseconds(0)); } } diff --git a/source/extensions/filters/http/adaptive_concurrency/controller/gradient_controller.h b/source/extensions/filters/http/adaptive_concurrency/controller/gradient_controller.h index 501b33319a159..43c21c5a4ea5c 100644 --- a/source/extensions/filters/http/adaptive_concurrency/controller/gradient_controller.h +++ b/source/extensions/filters/http/adaptive_concurrency/controller/gradient_controller.h @@ -41,65 +41,47 @@ namespace Controller { */ struct GradientControllerStats { ALL_GRADIENT_CONTROLLER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) + + static GradientControllerStats generateStats(Stats::Scope& scope, + const std::string& stats_prefix); }; -class GradientControllerConfig : public Logger::Loggable { +class GradientControllerConfig { public: - GradientControllerConfig( - const envoy::extensions::filters::http::adaptive_concurrency::v3::GradientControllerConfig& - proto_config, - Runtime::Loader& runtime); - - std::chrono::milliseconds minRTTCalcInterval() const { - const auto ms = runtime_.snapshot().getInteger(RuntimeKeys::get().MinRTTCalcIntervalKey, - min_rtt_calc_interval_.count()); - return std::chrono::milliseconds(ms); - } - - std::chrono::milliseconds sampleRTTCalcInterval() const { + GradientControllerConfig(std::chrono::milliseconds sample_rtt_calc_interval, + uint64_t max_concurrency_limit, double sample_aggregate_percentile, + uint64_t min_concurrency, Runtime::Loader& runtime) + : runtime_(runtime), sample_rtt_calc_interval_(sample_rtt_calc_interval), + max_concurrency_limit_(max_concurrency_limit), + sample_aggregate_percentile_(sample_aggregate_percentile), + min_concurrency_(min_concurrency) {} + virtual ~GradientControllerConfig() = default; + + virtual std::chrono::milliseconds sampleRTTCalcInterval() const { const auto ms = runtime_.snapshot().getInteger(RuntimeKeys::get().SampleRTTCalcIntervalKey, sample_rtt_calc_interval_.count()); return std::chrono::milliseconds(ms); } - uint32_t maxConcurrencyLimit() const { - return runtime_.snapshot().getInteger(RuntimeKeys::get().MaxConcurrencyLimitKey, - max_concurrency_limit_); + virtual uint32_t minConcurrency() const { + return runtime_.snapshot().getInteger(RuntimeKeys::get().MinConcurrencyKey, min_concurrency_); } - uint32_t minRTTAggregateRequestCount() const { - return runtime_.snapshot().getInteger(RuntimeKeys::get().MinRTTAggregateRequestCountKey, - min_rtt_aggregate_request_count_); + virtual uint32_t maxConcurrencyLimit() const { + return runtime_.snapshot().getInteger(RuntimeKeys::get().MaxConcurrencyLimitKey, + max_concurrency_limit_); } // The percentage is normalized to the range [0.0, 1.0]. - double sampleAggregatePercentile() const { + virtual double sampleAggregatePercentile() const { const double val = runtime_.snapshot().getDouble( RuntimeKeys::get().SampleAggregatePercentileKey, sample_aggregate_percentile_); return std::max(0.0, std::min(val, 100.0)) / 100.0; } - // The percentage is normalized to the range [0.0, 1.0]. 
- double jitterPercent() const { - const double val = - runtime_.snapshot().getDouble(RuntimeKeys::get().JitterPercentKey, jitter_pct_); - return std::max(0.0, std::min(val, 100.0)) / 100.0; - } + Runtime::Loader& runtime() const { return runtime_; } - uint32_t minConcurrency() const { - return runtime_.snapshot().getInteger(RuntimeKeys::get().MinConcurrencyKey, min_concurrency_); - } - - std::chrono::milliseconds fixedValue() const { return fixed_value_; } - - // The percentage is normalized to the range [0.0, 1.0]. - double minRTTBufferPercent() const { - const double val = runtime_.snapshot().getDouble(RuntimeKeys::get().MinRTTBufferPercentKey, - min_rtt_buffer_pct_); - return std::max(0.0, std::min(val, 100.0)) / 100.0; - } - -private: +protected: class RuntimeKeyValues { public: const std::string MinRTTCalcIntervalKey = @@ -121,36 +103,78 @@ class GradientControllerConfig : public Logger::Loggable { using RuntimeKeys = ConstSingleton; +private: Runtime::Loader& runtime_; + const std::chrono::milliseconds sample_rtt_calc_interval_; + const uint64_t max_concurrency_limit_; + const double sample_aggregate_percentile_; + const uint64_t min_concurrency_; +}; +class DynamicGradientControllerConfig : public Logger::Loggable, + public GradientControllerConfig { +public: + DynamicGradientControllerConfig( + const envoy::extensions::filters::http::adaptive_concurrency::v3::GradientControllerConfig& + proto_config, + Runtime::Loader& runtime); + + std::chrono::milliseconds minRTTCalcInterval() const { + const auto ms = GradientControllerConfig::runtime().snapshot().getInteger( + GradientControllerConfig::RuntimeKeys::get().MinRTTCalcIntervalKey, + min_rtt_calc_interval_.count()); + return std::chrono::milliseconds(ms); + } + + uint32_t minRTTAggregateRequestCount() const { + return GradientControllerConfig::runtime().snapshot().getInteger( + GradientControllerConfig::RuntimeKeys::get().MinRTTAggregateRequestCountKey, + min_rtt_aggregate_request_count_); + } + + // The percentage is normalized to the range [0.0, 1.0]. + double jitterPercent() const { + const double val = GradientControllerConfig::runtime().snapshot().getDouble( + GradientControllerConfig::RuntimeKeys::get().JitterPercentKey, jitter_pct_); + return std::max(0.0, std::min(val, 100.0)) / 100.0; + } + + // The percentage is normalized to the range [0.0, 1.0]. + double minRTTBufferPercent() const { + const double val = GradientControllerConfig::runtime().snapshot().getDouble( + RuntimeKeys::get().MinRTTBufferPercentKey, min_rtt_buffer_pct_); + return std::max(0.0, std::min(val, 100.0)) / 100.0; + } + +private: // The measured request round-trip time under ideal conditions. const std::chrono::milliseconds min_rtt_calc_interval_; - // The measured sample round-trip milliseconds from the previous time window. - const std::chrono::milliseconds sample_rtt_calc_interval_; - // Randomized time delta added to the start of the minRTT calculation window. const double jitter_pct_; - // The maximum allowed concurrency value. - const uint32_t max_concurrency_limit_; - // The number of requests to aggregate/sample during the minRTT recalculation. const uint32_t min_rtt_aggregate_request_count_; - // The percentile value considered when processing samples. - const double sample_aggregate_percentile_; + // The amount added to the measured minRTT as a hedge against natural variability in latency. 
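+  // For example (illustrative numbers, not from this change): with a 25% buffer a measured
+  // minRTT of 4ms is treated as roughly 5ms, so small fluctuations above the true minimum do
+  // not immediately shrink the concurrency limit.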
+ const double min_rtt_buffer_pct_; +}; +using DynamicGradientControllerConfigSharedPtr = std::shared_ptr; - // The concurrency limit set while measuring the minRTT. - const uint32_t min_concurrency_; +class PinnedGradientControllerConfig : public Logger::Loggable, + public GradientControllerConfig { +public: + PinnedGradientControllerConfig(const envoy::extensions::filters::http::adaptive_concurrency::v3:: + PinnedGradientControllerConfig& proto_config, + Runtime::Loader& runtime); - // The fixed value of minRTT, if present. - const std::chrono::milliseconds fixed_value_; + std::chrono::nanoseconds minRTT() const { return min_rtt_; } - // The amount added to the measured minRTT as a hedge against natural variability in latency. - const double min_rtt_buffer_pct_; +private: + // The fixed minRTT value. + const std::chrono::nanoseconds min_rtt_; }; -using GradientControllerConfigSharedPtr = std::shared_ptr; +using PinnedGradientControllerConfigSharedPtr = std::shared_ptr; /** * A concurrency controller that implements a variation of the Gradient algorithm described in: @@ -214,11 +238,12 @@ using GradientControllerConfigSharedPtr = std::shared_ptr 0; } - // True if minRTT is sampled. - bool isMinRTTSamplingEnabled() const { - return config_.fixedValue() <= std::chrono::milliseconds::zero(); - } - // ConcurrencyController. RequestForwardingAction forwardingDecision() override; void recordLatencySample(MonotonicTime rq_send_time) override; @@ -238,8 +258,6 @@ class GradientController : public ConcurrencyController { uint32_t concurrencyLimit() const override { return concurrency_limit_.load(); } private: - static GradientControllerStats generateStats(Stats::Scope& scope, - const std::string& stats_prefix); void updateMinRTT() ABSL_EXCLUSIVE_LOCKS_REQUIRED(sample_mutation_mtx_); std::chrono::microseconds processLatencySamplesAndClear() ABSL_EXCLUSIVE_LOCKS_REQUIRED(sample_mutation_mtx_); @@ -251,7 +269,7 @@ class GradientController : public ConcurrencyController { std::chrono::milliseconds applyJitter(std::chrono::milliseconds interval, double jitter_pct) const; - const GradientControllerConfig config_; + const DynamicGradientControllerConfig config_; Event::Dispatcher& dispatcher_; Stats::Scope& scope_; GradientControllerStats stats_; @@ -303,7 +321,74 @@ class GradientController : public ConcurrencyController { // Used for testing only. Thread::ThreadSynchronizer synchronizer_; }; -using GradientControllerSharedPtr = std::shared_ptr; +using DynamicGradientControllerSharedPtr = std::shared_ptr; + +class PinnedGradientController : public ConcurrencyController { +public: + PinnedGradientController(PinnedGradientControllerConfig config, Event::Dispatcher& dispatcher, + Runtime::Loader& runtime, const std::string& stats_prefix, + Stats::Scope& scope, Random::RandomGenerator& random, + TimeSource& time_source); + + // Used in unit tests to validate worker thread interactions. + Thread::ThreadSynchronizer& synchronizer() { return synchronizer_; } + + // ConcurrencyController. 
+ RequestForwardingAction forwardingDecision() override; + void recordLatencySample(MonotonicTime rq_send_time) override; + void cancelLatencySample() override; + uint32_t concurrencyLimit() const override { return concurrency_limit_.load(); } + +private: + static GradientControllerStats generateStats(Stats::Scope& scope, + const std::string& stats_prefix); + std::chrono::microseconds processLatencySamplesAndClear() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(sample_mutation_mtx_); + uint32_t calculateNewLimit() ABSL_EXCLUSIVE_LOCKS_REQUIRED(sample_mutation_mtx_); + void resetSampleWindow() ABSL_EXCLUSIVE_LOCKS_REQUIRED(sample_mutation_mtx_); + void updateConcurrencyLimit(const uint32_t new_limit) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(sample_mutation_mtx_); + + const PinnedGradientControllerConfig config_; + Event::Dispatcher& dispatcher_; + Stats::Scope& scope_; + GradientControllerStats stats_; + Random::RandomGenerator& random_; + TimeSource& time_source_; + + // Protects data related to latency sampling and RTT values. In addition to protecting the latency + // sample histogram, the mutex ensures that the minRTT calculation window and the sample window + // (where the new concurrency limit is determined) do not overlap. + absl::Mutex sample_mutation_mtx_; + + // Stores the aggregated sampled latencies for use in the gradient calculation. + std::chrono::nanoseconds sample_rtt_ ABSL_GUARDED_BY(sample_mutation_mtx_); + + // Tracks the count of requests that have been forwarded whose replies have + // not been sampled yet. Atomicity is required because this variable is used to make the + // forwarding decision without locking. + std::atomic num_rq_outstanding_; + + // Stores the current concurrency limit. Atomicity is required because this variable is used to + // make the forwarding decision without locking. + std::atomic concurrency_limit_; + + // Stores all sampled latencies and provides percentile estimations when using the sampled data to + // calculate a new concurrency limit. + std::unique_ptr + latency_sample_hist_ ABSL_GUARDED_BY(sample_mutation_mtx_); + + // Tracks the number of consecutive times that the concurrency limit is set to the minimum. This + // is used to determine whether the controller should trigger an additional minRTT measurement + // after remaining at the minimum limit for too long. + uint32_t consecutive_min_concurrency_set_ ABSL_GUARDED_BY(sample_mutation_mtx_); + + Event::TimerPtr sample_reset_timer_; + + // Used for testing only. 
+ Thread::ThreadSynchronizer synchronizer_; +}; +using PinnedGradientControllerSharedPtr = std::shared_ptr; } // namespace Controller } // namespace AdaptiveConcurrency diff --git a/source/extensions/filters/http/adaptive_concurrency/controller/pinned_gradient_controller.cc b/source/extensions/filters/http/adaptive_concurrency/controller/pinned_gradient_controller.cc new file mode 100644 index 0000000000000..2bcfd994fcccc --- /dev/null +++ b/source/extensions/filters/http/adaptive_concurrency/controller/pinned_gradient_controller.cc @@ -0,0 +1,138 @@ +#include "source/common/protobuf/utility.h" +#include "source/extensions/filters/http/adaptive_concurrency/controller/gradient_controller.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace AdaptiveConcurrency { +namespace Controller { + +PinnedGradientControllerConfig::PinnedGradientControllerConfig( + const envoy::extensions::filters::http::adaptive_concurrency::v3:: + PinnedGradientControllerConfig& proto_config, + Runtime::Loader& runtime) + : GradientControllerConfig::GradientControllerConfig( + std::chrono::milliseconds(DurationUtil::durationToMilliseconds( + proto_config.concurrency_limit_params().concurrency_update_interval())), + PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config.concurrency_limit_params(), + max_concurrency_limit, 1000), + PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(proto_config, sample_aggregate_percentile, 50), + PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config, min_concurrency, 3), runtime), + min_rtt_( + std::chrono::milliseconds(DurationUtil::durationToMilliseconds(proto_config.min_rtt()))) { +} + +PinnedGradientController::PinnedGradientController(PinnedGradientControllerConfig config, + Event::Dispatcher& dispatcher, Runtime::Loader&, + const std::string& stats_prefix, + Stats::Scope& scope, + Random::RandomGenerator& random, + TimeSource& time_source) + : config_(std::move(config)), dispatcher_(dispatcher), scope_(scope), + stats_(GradientControllerStats::generateStats(scope_, stats_prefix)), random_(random), + time_source_(time_source), num_rq_outstanding_(0), + concurrency_limit_(config_.minConcurrency()), + latency_sample_hist_(hist_fast_alloc(), hist_free) { + sample_reset_timer_ = dispatcher_.createTimer([this]() -> void { + { + absl::MutexLock ml(&sample_mutation_mtx_); + resetSampleWindow(); + } + + sample_reset_timer_->enableTimer(config_.sampleRTTCalcInterval()); + }); + + sample_reset_timer_->enableTimer(config_.sampleRTTCalcInterval()); + stats_.concurrency_limit_.set(concurrency_limit_.load()); + stats_.min_rtt_msecs_.set( + std::chrono::duration_cast(config_.minRTT()).count()); +} + +void PinnedGradientController::resetSampleWindow() { + if (hist_sample_count(latency_sample_hist_.get()) == 0) { + return; + } + + sample_rtt_ = processLatencySamplesAndClear(); + stats_.sample_rtt_msecs_.set( + std::chrono::duration_cast(sample_rtt_).count()); + updateConcurrencyLimit(calculateNewLimit()); +} + +std::chrono::microseconds PinnedGradientController::processLatencySamplesAndClear() { + const std::array quantile{config_.sampleAggregatePercentile()}; + std::array calculated_quantile; + hist_approx_quantile(latency_sample_hist_.get(), quantile.data(), 1, calculated_quantile.data()); + hist_clear(latency_sample_hist_.get()); + return std::chrono::microseconds(static_cast(calculated_quantile[0])); +} + +uint32_t PinnedGradientController::calculateNewLimit() { + ASSERT(sample_rtt_.count() > 0); + + // Calculate the gradient value, ensuring it's clamped between 0.5 and 2.0. 
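+  // For example (illustrative numbers): with min_rtt pinned at 50ms and a sampled p50 of 100ms,
+  // raw_gradient = 50 / 100 = 0.5; a sampled p50 of 20ms would give 2.5, clamped to 2.0.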
+ // This prevents extreme changes in the concurrency limit between each sample + // window. + const double raw_gradient = static_cast(config_.minRTT().count()) / sample_rtt_.count(); + const double gradient = std::max(0.5, std::min(2.0, raw_gradient)); + + // Scale the value by 1000 when reporting it to maintain the granularity of its details + // See: https://github.com/envoyproxy/envoy/issues/31695 + stats_.gradient_.set(gradient * 1000); + + const double limit = concurrencyLimit() * gradient; + const double burst_headroom = sqrt(limit); + stats_.burst_queue_size_.set(burst_headroom); + + // The final concurrency value factors in the burst headroom and must be clamped to keep the value + // in the range [configured_min, configured_max]. + const uint32_t new_limit = limit + burst_headroom; + return std::max(config_.minConcurrency(), + std::min(config_.maxConcurrencyLimit(), new_limit)); +} + +RequestForwardingAction PinnedGradientController::forwardingDecision() { + // Note that a race condition exists here which would allow more outstanding requests than the + // concurrency limit bounded by the number of worker threads. After loading num_rq_outstanding_ + // and before loading concurrency_limit_, another thread could potentially swoop in and modify + // num_rq_outstanding_, causing us to move forward with stale values and increment + // num_rq_outstanding_. + // + // TODO (tonya11en): Reconsider using a CAS loop here. + if (num_rq_outstanding_.load() < concurrencyLimit()) { + ++num_rq_outstanding_; + return RequestForwardingAction::Forward; + } + stats_.rq_blocked_.inc(); + return RequestForwardingAction::Block; +} + +void PinnedGradientController::recordLatencySample(MonotonicTime rq_send_time) { + ASSERT(num_rq_outstanding_.load() > 0); + --num_rq_outstanding_; + + const std::chrono::microseconds rq_latency = + std::chrono::duration_cast(time_source_.monotonicTime() - + rq_send_time); + synchronizer_.syncPoint("pre_hist_insert"); + { + absl::MutexLock ml(&sample_mutation_mtx_); + hist_insert(latency_sample_hist_.get(), rq_latency.count(), 1); + } +} + +void PinnedGradientController::cancelLatencySample() { + ASSERT(num_rq_outstanding_.load() > 0); + --num_rq_outstanding_; +} + +void PinnedGradientController::updateConcurrencyLimit(const uint32_t new_limit) { + concurrency_limit_.store(new_limit); + stats_.concurrency_limit_.set(concurrency_limit_.load()); +} + +} // namespace Controller +} // namespace AdaptiveConcurrency +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/adaptive_concurrency/controller/gradient_controller_test.cc b/test/extensions/filters/http/adaptive_concurrency/controller/gradient_controller_test.cc index 525efd7e5f6f4..b3967bfbfc7be 100644 --- a/test/extensions/filters/http/adaptive_concurrency/controller/gradient_controller_test.cc +++ b/test/extensions/filters/http/adaptive_concurrency/controller/gradient_controller_test.cc @@ -30,11 +30,18 @@ namespace AdaptiveConcurrency { namespace Controller { namespace { -GradientControllerConfig makeConfig(const std::string& yaml_config, - NiceMock& runtime) { +DynamicGradientControllerConfig makeDynamicConfig(const std::string& yaml_config, + NiceMock& runtime) { envoy::extensions::filters::http::adaptive_concurrency::v3::GradientControllerConfig proto; TestUtility::loadFromYamlAndValidate(yaml_config, proto); - return GradientControllerConfig{proto, runtime}; + return DynamicGradientControllerConfig{proto, runtime}; +} + +PinnedGradientControllerConfig 
makePinnedConfig(const std::string& yaml_config, + NiceMock& runtime) { + envoy::extensions::filters::http::adaptive_concurrency::v3::PinnedGradientControllerConfig proto; + TestUtility::loadFromYamlAndValidate(yaml_config, proto); + return PinnedGradientControllerConfig{proto, runtime}; } class GradientControllerConfigTest : public testing::Test { @@ -51,37 +58,46 @@ class GradientControllerTest : public testing::Test { : api_(Api::createApiForTest(time_system_)), dispatcher_(api_->allocateDispatcher("test_thread")) {} - GradientControllerSharedPtr makeController(const std::string& yaml_config) { - const auto config = std::make_shared( - makeConfig(yaml_config, runtime_), *dispatcher_, runtime_, "test_prefix.", + DynamicGradientControllerSharedPtr makeDynamicController(const std::string& yaml_config) { + const auto controller = std::make_shared( + makeDynamicConfig(yaml_config, runtime_), *dispatcher_, runtime_, "test_prefix.", + *stats_.rootScope(), random_, time_system_); + + // Advance time so that the latency sample calculations don't underflow if monotonic time is 0. + time_system_.advanceTimeAndRun(std::chrono::hours(42), *dispatcher_, + Event::Dispatcher::RunType::Block); + + return controller; + } + + PinnedGradientControllerSharedPtr makePinnedController(const std::string& yaml_config) { + const auto controller = std::make_shared( + makePinnedConfig(yaml_config, runtime_), *dispatcher_, runtime_, "test_prefix.", *stats_.rootScope(), random_, time_system_); // Advance time so that the latency sample calculations don't underflow if monotonic time is 0. time_system_.advanceTimeAndRun(std::chrono::hours(42), *dispatcher_, Event::Dispatcher::RunType::Block); - return config; + return controller; } protected: - void sampleLatency(const GradientControllerSharedPtr& controller, - std::chrono::microseconds latency) { - controller->recordLatencySample(time_system_.monotonicTime() - latency); + void sampleLatency(ConcurrencyController& controller, std::chrono::microseconds latency) { + controller.recordLatencySample(time_system_.monotonicTime() - latency); } // Helper function that will attempt to pull forwarding decisions. - void tryForward(const GradientControllerSharedPtr& controller, - const bool expect_forward_response) { + void tryForward(ConcurrencyController& controller, const bool expect_forward_response) { const auto expected_resp = expect_forward_response ? RequestForwardingAction::Forward : RequestForwardingAction::Block; - EXPECT_EQ(expected_resp, controller->forwardingDecision()); + EXPECT_EQ(expected_resp, controller.forwardingDecision()); } // Gets the controller past the initial minRTT stage. 
- void advancePastMinRTTStage(const GradientControllerSharedPtr& controller, - const std::string& yaml_config, + void advancePastMinRTTStage(ConcurrencyController& controller, const std::string& yaml_config, std::chrono::milliseconds latency = std::chrono::milliseconds(5)) { - const auto config = makeConfig(yaml_config, runtime_); + const auto config = makeDynamicConfig(yaml_config, runtime_); for (uint32_t i = 0; i <= config.minRTTAggregateRequestCount(); ++i) { tryForward(controller, true); sampleLatency(controller, latency); @@ -129,10 +145,9 @@ TEST_F(GradientControllerConfigTest, BasicTest) { interval: 31s request_count: 52 min_concurrency: 8 - fixed_value: 42s )EOF"; - auto config = makeConfig(yaml, runtime_); + auto config = makeDynamicConfig(yaml, runtime_); EXPECT_EQ(config.minRTTCalcInterval(), std::chrono::seconds(31)); EXPECT_EQ(config.sampleRTTCalcInterval(), std::chrono::milliseconds(123)); @@ -141,26 +156,6 @@ TEST_F(GradientControllerConfigTest, BasicTest) { EXPECT_EQ(config.sampleAggregatePercentile(), .425); EXPECT_EQ(config.jitterPercent(), .132); EXPECT_EQ(config.minConcurrency(), 8); - EXPECT_EQ(config.fixedValue(), std::chrono::seconds(42)); -} - -TEST_F(GradientControllerConfigTest, MissingMinRTTValues) { - const std::string yaml = R"EOF( - sample_aggregate_percentile: - value: 42.5 - concurrency_limit_params: - max_concurrency_limit: 1337 - concurrency_update_interval: 0.123s - min_rtt_calc_params: - jitter: - value: 13.2 - request_count: 52 - min_concurrency: 8 - )EOF"; - - EXPECT_THROW_WITH_MESSAGE( - makeConfig(yaml, runtime_), EnvoyException, - "adaptive_concurrency: neither `concurrency_update_interval` nor `fixed_value` set"); } TEST_F(GradientControllerConfigTest, Clamping) { @@ -179,7 +174,7 @@ TEST_F(GradientControllerConfigTest, Clamping) { request_count: 52 )EOF"; - auto config = makeConfig(yaml, runtime_); + auto config = makeDynamicConfig(yaml, runtime_); // Should be clamped in the range [0,1]. @@ -213,7 +208,7 @@ TEST_F(GradientControllerConfigTest, BasicTestOverrides) { min_concurrency: 7 )EOF"; - auto config = makeConfig(yaml, runtime_); + auto config = makeDynamicConfig(yaml, runtime_); EXPECT_CALL(runtime_.snapshot_, getInteger(_, 31000)).WillOnce(Return(60000)); EXPECT_EQ(config.minRTTCalcInterval(), std::chrono::seconds(60)); @@ -248,7 +243,7 @@ TEST_F(GradientControllerConfigTest, DefaultValuesTest) { interval: 31s )EOF"; - auto config = makeConfig(yaml, runtime_); + auto config = makeDynamicConfig(yaml, runtime_); EXPECT_EQ(config.minRTTCalcInterval(), std::chrono::seconds(31)); EXPECT_EQ(config.sampleRTTCalcInterval(), std::chrono::milliseconds(123)); @@ -278,23 +273,23 @@ TEST_F(GradientControllerTest, MinRTTEpoch) { )EOF"; const int min_concurrency = 2; - auto controller = makeController(yaml); + auto controller = makeDynamicController(yaml); const auto min_rtt = std::chrono::milliseconds(1350); time_system_.advanceTimeAndRun(min_rtt, *dispatcher_, Event::Dispatcher::RunType::Block); verifyMinRTTActive(); EXPECT_EQ(controller->concurrencyLimit(), min_concurrency); - advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(1350)); + advancePastMinRTTStage(*controller, yaml, std::chrono::milliseconds(1350)); verifyMinRTTInactive(); verifyMinRTTValue(std::chrono::milliseconds(1350)); // Advance time to just before the end of the epoch and inflate the concurrency limit. 
uint32_t last_limit = controller->concurrencyLimit(); for (int i = 0; i < 29; ++i) { - tryForward(controller, true); + tryForward(*controller, true); time_system_.advanceTimeAndRun(std::chrono::seconds(1), *dispatcher_, Event::Dispatcher::RunType::Block); - sampleLatency(controller, min_rtt); + sampleLatency(*controller, min_rtt); dispatcher_->run(Event::Dispatcher::RunType::Block); EXPECT_GT(controller->concurrencyLimit(), last_limit); last_limit = controller->concurrencyLimit(); @@ -304,7 +299,7 @@ TEST_F(GradientControllerTest, MinRTTEpoch) { // Send out requests that we won't attempt to sample until the next minRTT window so the requests // will be disregarded as they were started in the previous minRTT window. for (uint32_t i = 0; i < controller->concurrencyLimit(); ++i) { - tryForward(controller, true); + tryForward(*controller, true); ++active_rq_counter; } @@ -321,7 +316,7 @@ TEST_F(GradientControllerTest, MinRTTEpoch) { for (int i = 0; i < active_rq_counter; ++i) { // Sample requests that were send "5 minutes ago," which would surely be from an older minRTT // epoch. - sampleLatency(controller, std::chrono::minutes(5)); + sampleLatency(*controller, std::chrono::minutes(5)); } verifyMinRTTActive(); } @@ -341,7 +336,7 @@ TEST_F(GradientControllerTest, MinRTTLogicTest) { min_concurrency: 7 )EOF"; - auto controller = makeController(yaml); + auto controller = makeDynamicController(yaml); const auto min_rtt = std::chrono::milliseconds(13); // The controller should be measuring minRTT upon creation, so the concurrency window is 7 (the @@ -349,20 +344,20 @@ TEST_F(GradientControllerTest, MinRTTLogicTest) { verifyMinRTTActive(); EXPECT_EQ(controller->concurrencyLimit(), 7); for (int i = 0; i < 7; ++i) { - tryForward(controller, true); + tryForward(*controller, true); } - tryForward(controller, false); - tryForward(controller, false); + tryForward(*controller, false); + tryForward(*controller, false); time_system_.advanceTimeAndRun(min_rtt, *dispatcher_, Event::Dispatcher::RunType::Block); for (int i = 0; i < 7; ++i) { - sampleLatency(controller, min_rtt); + sampleLatency(*controller, min_rtt); } // 43 more requests should cause the minRTT to be done calculating. for (int i = 0; i < 43; ++i) { EXPECT_EQ(controller->concurrencyLimit(), 7); - tryForward(controller, true); - sampleLatency(controller, min_rtt); + tryForward(*controller, true); + sampleLatency(*controller, min_rtt); } // Verify the minRTT value measured is accurate. 
@@ -370,97 +365,6 @@ TEST_F(GradientControllerTest, MinRTTLogicTest) { verifyMinRTTValue(std::chrono::milliseconds(13)); } -TEST_F(GradientControllerTest, FixedMinRTT) { - const std::string yaml = R"EOF( -sample_aggregate_percentile: - value: 50 -concurrency_limit_params: - max_concurrency_limit: - concurrency_update_interval: 0.1s -min_rtt_calc_params: - fixed_value: 0.05s - min_concurrency: 7 -)EOF"; - - auto controller = makeController(yaml); - const auto min_rtt = std::chrono::milliseconds(50); - - verifyMinRTTInactive(); - EXPECT_EQ(controller->concurrencyLimit(), 7); // there is no sampled latency yet, so the - // concurrency limit defaults to the min concurrency - for (int i = 0; i < 7; ++i) { - tryForward(controller, true); - } - tryForward(controller, false); - tryForward(controller, false); - time_system_.advanceTimeAndRun(min_rtt, *dispatcher_, Event::Dispatcher::RunType::Block); - for (int i = 0; i < 7; ++i) { - EXPECT_EQ(controller->concurrencyLimit(), 7); - sampleLatency(controller, min_rtt); - } - - // Verify the minRTT value hasn't changed - time_system_.advanceTimeAndRun(std::chrono::milliseconds(101) - min_rtt, *dispatcher_, - Event::Dispatcher::RunType::Block); - verifyMinRTTInactive(); - verifyMinRTTValue(min_rtt); -} - -TEST_F(GradientControllerTest, FixedMinRTTChangeConcurrency) { - const std::string yaml = R"EOF( -sample_aggregate_percentile: - value: 50 -concurrency_limit_params: - max_concurrency_limit: - concurrency_update_interval: 0.1s -min_rtt_calc_params: - fixed_value: 0.05s - min_concurrency: 7 -)EOF"; - - auto controller = makeController(yaml); - const auto min_rtt = std::chrono::milliseconds(50); - int current_concurrency = 7; // there is no sampled latency yet, so the - // concurrency limit defaults to the min concurrency - - // lower sampled latency to trigger increased concurrency - verifyMinRTTInactive(); - for (int i = 0; i < current_concurrency; ++i) { - tryForward(controller, true); - } - time_system_.advanceTimeAndRun(min_rtt, *dispatcher_, Event::Dispatcher::RunType::Block); - for (int i = 0; i < current_concurrency; ++i) { - sampleLatency(controller, min_rtt - std::chrono::milliseconds(10)); - } - - time_system_.advanceTimeAndRun(std::chrono::milliseconds(101) - min_rtt, *dispatcher_, - Event::Dispatcher::RunType::Block); - EXPECT_GT(controller->concurrencyLimit(), current_concurrency); - current_concurrency = controller->concurrencyLimit(); - - // Ensure minRTT didn't change - verifyMinRTTInactive(); - verifyMinRTTValue(min_rtt); - - // increase sampled latency to trigger decreased concurrency - verifyMinRTTInactive(); - for (int i = 0; i < current_concurrency; ++i) { - tryForward(controller, true); - } - time_system_.advanceTimeAndRun(min_rtt, *dispatcher_, Event::Dispatcher::RunType::Block); - for (int i = 0; i < current_concurrency; ++i) { - sampleLatency(controller, min_rtt + std::chrono::milliseconds(50)); - } - - time_system_.advanceTimeAndRun(std::chrono::milliseconds(101) - min_rtt, *dispatcher_, - Event::Dispatcher::RunType::Block); - EXPECT_LT(controller->concurrencyLimit(), current_concurrency); - - // Ensure minRTT didn't change - verifyMinRTTInactive(); - verifyMinRTTValue(min_rtt); -} - TEST_F(GradientControllerTest, CancelLatencySample) { const std::string yaml = R"EOF( sample_aggregate_percentile: @@ -475,11 +379,11 @@ TEST_F(GradientControllerTest, CancelLatencySample) { request_count: 5 )EOF"; - auto controller = makeController(yaml); + auto controller = makeDynamicController(yaml); for (int i = 1; i <= 5; ++i) { - 
tryForward(controller, true); - sampleLatency(controller, std::chrono::milliseconds(i)); + tryForward(*controller, true); + sampleLatency(*controller, std::chrono::milliseconds(i)); } verifyMinRTTValue(std::chrono::milliseconds(3)); } @@ -498,15 +402,15 @@ TEST_F(GradientControllerTest, SamplePercentileProcessTest) { request_count: 5 )EOF"; - auto controller = makeController(yaml); + auto controller = makeDynamicController(yaml); - tryForward(controller, true); - tryForward(controller, true); - tryForward(controller, true); - tryForward(controller, false); + tryForward(*controller, true); + tryForward(*controller, true); + tryForward(*controller, true); + tryForward(*controller, false); controller->cancelLatencySample(); - tryForward(controller, true); - tryForward(controller, false); + tryForward(*controller, true); + tryForward(*controller, false); } TEST_F(GradientControllerTest, MinRTTBufferTest) { @@ -525,21 +429,21 @@ TEST_F(GradientControllerTest, MinRTTBufferTest) { value: 50 )EOF"; - auto controller = makeController(yaml); + auto controller = makeDynamicController(yaml); EXPECT_EQ(controller->concurrencyLimit(), 3); // Force a minRTT of 5ms. - advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); + advancePastMinRTTStage(*controller, yaml, std::chrono::milliseconds(5)); verifyMinRTTValue(std::chrono::milliseconds(5)); // Ensure that the minRTT doesn't decrease due to the buffer added. for (int recalcs = 0; recalcs < 10; ++recalcs) { const auto last_concurrency = controller->concurrencyLimit(); for (int i = 1; i <= 5; ++i) { - tryForward(controller, true); + tryForward(*controller, true); // Recording sample that's technically higher than the minRTT, but the 50% buffer should // prevent the concurrency limit from decreasing. - sampleLatency(controller, std::chrono::milliseconds(6)); + sampleLatency(*controller, std::chrono::milliseconds(6)); } time_system_.advanceTimeAndRun(std::chrono::milliseconds(101), *dispatcher_, Event::Dispatcher::RunType::Block); @@ -564,11 +468,11 @@ TEST_F(GradientControllerTest, ConcurrencyLimitBehaviorTestBasic) { min_concurrency: 7 )EOF"; - auto controller = makeController(yaml); + auto controller = makeDynamicController(yaml); EXPECT_EQ(controller->concurrencyLimit(), 7); // Force a minRTT of 5ms. 
- advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); + advancePastMinRTTStage(*controller, yaml, std::chrono::milliseconds(5)); verifyMinRTTValue(std::chrono::milliseconds(5)); // Ensure that the concurrency window increases on its own due to the headroom calculation with @@ -583,8 +487,8 @@ TEST_F(GradientControllerTest, ConcurrencyLimitBehaviorTestBasic) { for (int recalcs = 0; recalcs < 10; ++recalcs) { const auto last_concurrency = controller->concurrencyLimit(); for (int i = 1; i <= 5; ++i) { - tryForward(controller, true); - sampleLatency(controller, std::chrono::milliseconds(4)); + tryForward(*controller, true); + sampleLatency(*controller, std::chrono::milliseconds(4)); } time_system_.advanceTimeAndRun(std::chrono::milliseconds(101), *dispatcher_, Event::Dispatcher::RunType::Block); @@ -597,8 +501,8 @@ TEST_F(GradientControllerTest, ConcurrencyLimitBehaviorTestBasic) { for (int recalcs = 0; recalcs < 10; ++recalcs) { const auto last_concurrency = controller->concurrencyLimit(); for (int i = 1; i <= 5; ++i) { - tryForward(controller, true); - sampleLatency(controller, std::chrono::milliseconds(6)); + tryForward(*controller, true); + sampleLatency(*controller, std::chrono::milliseconds(6)); } time_system_.advanceTimeAndRun(std::chrono::milliseconds(101), *dispatcher_, Event::Dispatcher::RunType::Block); @@ -621,12 +525,12 @@ TEST_F(GradientControllerTest, MinRTTReturnToPreviousLimit) { request_count: 5 )EOF"; - auto controller = makeController(yaml); + auto controller = makeDynamicController(yaml); EXPECT_EQ(controller->concurrencyLimit(), 3); // Get initial minRTT measurement out of the way and advance time so request samples are not // thought to come from the previous minRTT epoch. - advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); + advancePastMinRTTStage(*controller, yaml, std::chrono::milliseconds(5)); time_system_.advanceTimeAndRun(std::chrono::seconds(1), *dispatcher_, Event::Dispatcher::RunType::Block); @@ -634,8 +538,8 @@ TEST_F(GradientControllerTest, MinRTTReturnToPreviousLimit) { for (int sample_iters = 0; sample_iters < 5; ++sample_iters) { const auto last_concurrency = controller->concurrencyLimit(); for (int i = 1; i <= 5; ++i) { - tryForward(controller, true); - sampleLatency(controller, std::chrono::milliseconds(4)); + tryForward(*controller, true); + sampleLatency(*controller, std::chrono::milliseconds(4)); } time_system_.advanceTimeAndRun(std::chrono::milliseconds(101), *dispatcher_, Event::Dispatcher::RunType::Block); @@ -657,8 +561,8 @@ TEST_F(GradientControllerTest, MinRTTReturnToPreviousLimit) { // 49 more requests should cause the minRTT to be done calculating. for (int i = 0; i < 5; ++i) { EXPECT_EQ(controller->concurrencyLimit(), 3); - tryForward(controller, true); - sampleLatency(controller, std::chrono::milliseconds(13)); + tryForward(*controller, true); + sampleLatency(*controller, std::chrono::milliseconds(13)); } // Check that we restored the old concurrency limit value. @@ -679,12 +583,12 @@ TEST_F(GradientControllerTest, MinRTTRescheduleTest) { request_count: 5 )EOF"; - auto controller = makeController(yaml); + auto controller = makeDynamicController(yaml); EXPECT_EQ(controller->concurrencyLimit(), 3); // Get initial minRTT measurement out of the way and advance time so request samples are not // thought to come from the previous minRTT epoch. 
- advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); + advancePastMinRTTStage(*controller, yaml, std::chrono::milliseconds(5)); time_system_.advanceTimeAndRun(std::chrono::seconds(1), *dispatcher_, Event::Dispatcher::RunType::Block); @@ -692,8 +596,8 @@ TEST_F(GradientControllerTest, MinRTTRescheduleTest) { for (int sample_iters = 0; sample_iters < 5; ++sample_iters) { const auto last_concurrency = controller->concurrencyLimit(); for (int i = 1; i <= 5; ++i) { - tryForward(controller, true); - sampleLatency(controller, std::chrono::milliseconds(4)); + tryForward(*controller, true); + sampleLatency(*controller, std::chrono::milliseconds(4)); } time_system_.advanceTimeAndRun(std::chrono::milliseconds(101), *dispatcher_, Event::Dispatcher::RunType::Block); @@ -726,18 +630,18 @@ TEST_F(GradientControllerTest, NoSamplesTest) { request_count: 5 )EOF"; - auto controller = makeController(yaml); + auto controller = makeDynamicController(yaml); EXPECT_EQ(controller->concurrencyLimit(), 3); // Get minRTT measurement out of the way. - advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); + advancePastMinRTTStage(*controller, yaml, std::chrono::milliseconds(5)); // Force the limit calculation to run a few times from some measurements. for (int sample_iters = 0; sample_iters < 5; ++sample_iters) { const auto last_concurrency = controller->concurrencyLimit(); for (int i = 1; i <= 5; ++i) { - tryForward(controller, true); - sampleLatency(controller, std::chrono::milliseconds(4)); + tryForward(*controller, true); + sampleLatency(*controller, std::chrono::milliseconds(4)); } time_system_.advanceTimeAndRun(std::chrono::milliseconds(101), *dispatcher_, Event::Dispatcher::RunType::Block); @@ -779,9 +683,9 @@ TEST_F(GradientControllerTest, TimerAccuracyTest) { .WillOnce(Return(rtt_timer)) .WillOnce(Return(sample_timer)); EXPECT_CALL(*sample_timer, enableTimer(std::chrono::milliseconds(123), _)); - auto controller = std::make_shared( - makeConfig(yaml, runtime_), fake_dispatcher, runtime_, "test_prefix.", *stats_.rootScope(), - random_, time_system_); + auto controller = std::make_shared( + makeDynamicConfig(yaml, runtime_), fake_dispatcher, runtime_, "test_prefix.", + *stats_.rootScope(), random_, time_system_); // Set the minRTT- this will trigger the timer for the next minRTT calculation. @@ -792,10 +696,10 @@ TEST_F(GradientControllerTest, TimerAccuracyTest) { // Verify the sample timer is reset after the minRTT calculation occurs. EXPECT_CALL(*sample_timer, enableTimer(std::chrono::milliseconds(123), _)); for (int i = 0; i < 6; ++i) { - tryForward(controller, true); + tryForward(*controller, true); time_system_.advanceTimeAndRun(std::chrono::milliseconds(5), *dispatcher_, Event::Dispatcher::RunType::Block); - sampleLatency(controller, std::chrono::milliseconds(5)); + sampleLatency(*controller, std::chrono::milliseconds(5)); } } @@ -824,19 +728,19 @@ TEST_F(GradientControllerTest, TimerAccuracyTestNoJitter) { .WillOnce(Return(rtt_timer)) .WillOnce(Return(sample_timer)); EXPECT_CALL(*sample_timer, enableTimer(std::chrono::milliseconds(123), _)); - auto controller = std::make_shared( - makeConfig(yaml, runtime_), fake_dispatcher, runtime_, "test_prefix.", *stats_.rootScope(), - random_, time_system_); + auto controller = std::make_shared( + makeDynamicConfig(yaml, runtime_), fake_dispatcher, runtime_, "test_prefix.", + *stats_.rootScope(), random_, time_system_); // Set the minRTT- this will trigger the timer for the next minRTT calculation. 
EXPECT_CALL(*rtt_timer, enableTimer(std::chrono::milliseconds(45000), _)); // Verify the sample timer is reset after the minRTT calculation occurs. EXPECT_CALL(*sample_timer, enableTimer(std::chrono::milliseconds(123), _)); for (int i = 0; i < 6; ++i) { - tryForward(controller, true); + tryForward(*controller, true); time_system_.advanceTimeAndRun(std::chrono::milliseconds(5), *dispatcher_, Event::Dispatcher::RunType::Block); - sampleLatency(controller, std::chrono::milliseconds(5)); + sampleLatency(*controller, std::chrono::milliseconds(5)); } } @@ -859,11 +763,11 @@ TEST_F(GradientControllerTest, ConsecutiveMinConcurrencyReset) { min_concurrency: 7 )EOF"; - auto controller = makeController(yaml); + auto controller = makeDynamicController(yaml); EXPECT_EQ(controller->concurrencyLimit(), 7); // Force a minRTT of 5ms. - advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); + advancePastMinRTTStage(*controller, yaml, std::chrono::milliseconds(5)); verifyMinRTTValue(std::chrono::milliseconds(5)); // Ensure that the concurrency window increases on its own due to the headroom calculation with @@ -878,8 +782,8 @@ TEST_F(GradientControllerTest, ConsecutiveMinConcurrencyReset) { const auto elevated_latency = std::chrono::milliseconds(10); for (int recalcs = 0; recalcs < 5; ++recalcs) { for (int i = 1; i <= 5; ++i) { - tryForward(controller, true); - sampleLatency(controller, elevated_latency); + tryForward(*controller, true); + sampleLatency(*controller, elevated_latency); } time_system_.advanceTimeAndRun(std::chrono::milliseconds(101), *dispatcher_, Event::Dispatcher::RunType::Block); @@ -889,8 +793,8 @@ TEST_F(GradientControllerTest, ConsecutiveMinConcurrencyReset) { for (int recalcs = 0; recalcs < 10; ++recalcs) { const auto last_concurrency = controller->concurrencyLimit(); for (int i = 1; i <= 5; ++i) { - tryForward(controller, true); - sampleLatency(controller, elevated_latency); + tryForward(*controller, true); + sampleLatency(*controller, elevated_latency); } time_system_.advanceTimeAndRun(std::chrono::milliseconds(101), *dispatcher_, Event::Dispatcher::RunType::Block); @@ -916,13 +820,13 @@ TEST_F(GradientControllerTest, MultiThreadSampleInteractions) { min_concurrency: 100 )EOF"; - auto controller = makeController(yaml); + auto controller = makeDynamicController(yaml); auto& synchronizer = controller->synchronizer(); synchronizer.enable(); for (int i = 0; i < 4; ++i) { - tryForward(controller, true); - sampleLatency(controller, std::chrono::microseconds(1337)); + tryForward(*controller, true); + sampleLatency(*controller, std::chrono::microseconds(1337)); } // The next sample will trigger the minRTT value update. We'll spin off a thread and block before @@ -930,8 +834,8 @@ TEST_F(GradientControllerTest, MultiThreadSampleInteractions) { EXPECT_TRUE(controller->inMinRTTSamplingWindow()); synchronizer.waitOn("pre_hist_insert"); std::thread t1([this, &controller]() { - tryForward(controller, true); - sampleLatency(controller, std::chrono::microseconds(1337)); + tryForward(*controller, true); + sampleLatency(*controller, std::chrono::microseconds(1337)); }); // Wait for the thread to wait. @@ -941,8 +845,8 @@ TEST_F(GradientControllerTest, MultiThreadSampleInteractions) { // block since there is another thread in that critical section. We can just immediately join // after. 
   std::thread t2([this, &controller]() {
-    tryForward(controller, true);
-    sampleLatency(controller, std::chrono::microseconds(1337));
+    tryForward(*controller, true);
+    sampleLatency(*controller, std::chrono::microseconds(1337));
   });
   t2.join();
 
@@ -956,6 +860,95 @@ TEST_F(GradientControllerTest, MultiThreadSampleInteractions) {
   EXPECT_FALSE(controller->inMinRTTSamplingWindow());
 }
 
+TEST_F(GradientControllerTest, PinnedMinRTT) {
+  const std::string yaml = R"EOF(
+sample_aggregate_percentile:
+  value: 50
+concurrency_limit_params:
+  max_concurrency_limit:
+  concurrency_update_interval: 0.1s
+min_rtt: 0.05s
+min_concurrency: 7
+)EOF";
+
+  auto controller = makePinnedController(yaml);
+  const auto min_rtt = std::chrono::milliseconds(50);
+
+  verifyMinRTTInactive();
+  EXPECT_EQ(controller->concurrencyLimit(), 7); // No sampled latency yet, so the concurrency
+                                                // limit defaults to min_concurrency.
+  for (int i = 0; i < 7; ++i) {
+    tryForward(*controller, true);
+  }
+  tryForward(*controller, false);
+  tryForward(*controller, false);
+  time_system_.advanceTimeAndRun(min_rtt, *dispatcher_, Event::Dispatcher::RunType::Block);
+  for (int i = 0; i < 7; ++i) {
+    EXPECT_EQ(controller->concurrencyLimit(), 7);
+    sampleLatency(*controller, min_rtt);
+  }
+
+  // Advance past the update interval and verify the pinned minRTT value hasn't changed.
+  time_system_.advanceTimeAndRun(std::chrono::milliseconds(101) - min_rtt, *dispatcher_,
+                                 Event::Dispatcher::RunType::Block);
+  verifyMinRTTInactive();
+  verifyMinRTTValue(min_rtt);
+}
+
+TEST_F(GradientControllerTest, PinnedMinRTTChangeConcurrency) {
+  const std::string yaml = R"EOF(
+sample_aggregate_percentile:
+  value: 50
+concurrency_limit_params:
+  max_concurrency_limit:
+  concurrency_update_interval: 0.1s
+min_rtt: 0.05s
+min_concurrency: 7
+)EOF";
+
+  auto controller = makePinnedController(yaml);
+  const auto min_rtt = std::chrono::milliseconds(50);
+  int current_concurrency = 7; // No sampled latency yet, so the concurrency limit
+                               // defaults to min_concurrency.
+
+  // Sample latencies below the pinned minRTT to trigger an increase of the concurrency limit.
+  verifyMinRTTInactive();
+  for (int i = 0; i < current_concurrency; ++i) {
+    tryForward(*controller, true);
+  }
+  time_system_.advanceTimeAndRun(min_rtt, *dispatcher_, Event::Dispatcher::RunType::Block);
+  for (int i = 0; i < current_concurrency; ++i) {
+    sampleLatency(*controller, min_rtt - std::chrono::milliseconds(10));
+  }
+
+  time_system_.advanceTimeAndRun(std::chrono::milliseconds(101) - min_rtt, *dispatcher_,
+                                 Event::Dispatcher::RunType::Block);
+  EXPECT_GT(controller->concurrencyLimit(), current_concurrency);
+  current_concurrency = controller->concurrencyLimit();
+
+  // Ensure the pinned minRTT didn't change.
+  verifyMinRTTInactive();
+  verifyMinRTTValue(min_rtt);
+
+  // Sample latencies above the pinned minRTT to trigger a decrease of the concurrency limit.
+  verifyMinRTTInactive();
+  for (int i = 0; i < current_concurrency; ++i) {
+    tryForward(*controller, true);
+  }
+  time_system_.advanceTimeAndRun(min_rtt, *dispatcher_, Event::Dispatcher::RunType::Block);
+  for (int i = 0; i < current_concurrency; ++i) {
+    sampleLatency(*controller, min_rtt + std::chrono::milliseconds(50));
+  }
+
+  time_system_.advanceTimeAndRun(std::chrono::milliseconds(101) - min_rtt, *dispatcher_,
+                                 Event::Dispatcher::RunType::Block);
+  EXPECT_LT(controller->concurrencyLimit(), current_concurrency);
+
+  // Ensure the pinned minRTT didn't change.
+  verifyMinRTTInactive();
+  verifyMinRTTValue(min_rtt);
+}
+
 } // namespace
 } // namespace Controller
 } // namespace AdaptiveConcurrency
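
For reference, the behavior the two Pinned* tests above pin down can be summarized with the short
sketch below. It assumes the classic gradient update (the limit scales with pinned minRTT divided
by the sampled RTT, plus a square-root headroom term); the PinnedLimitSketch struct, its update()
method, and the headroom choice are illustrative assumptions, not the actual types in
pinned_gradient_controller.cc. The config field names in the comments (min_rtt, min_concurrency,
max_concurrency_limit, sample_aggregate_percentile, concurrency_update_interval) are the ones the
tests configure.

// Illustrative sketch only: samples faster than the pinned minRTT grow the limit, slower samples
// shrink it, and the limit stays within [min_concurrency, max_concurrency_limit].
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cmath>
#include <cstdint>

struct PinnedLimitSketch {
  std::chrono::nanoseconds pinned_min_rtt; // "min_rtt" from the config (0.05s in the tests).
  uint32_t min_concurrency;                // "min_concurrency" (7 in the tests).
  uint32_t max_concurrency_limit;          // "max_concurrency_limit" upper bound.
  double limit;                            // Current limit; the tests start it at min_concurrency.

  // Invoked once per concurrency_update_interval with the aggregated latency of the window
  // (the configured sample_aggregate_percentile, p50 in the tests).
  void update(std::chrono::nanoseconds sampled_rtt) {
    // gradient > 1 when requests complete faster than the pinned minRTT, < 1 when slower.
    const double gradient = static_cast<double>(pinned_min_rtt.count()) /
                            static_cast<double>(sampled_rtt.count());
    // Headroom keeps the limit probing upward when latency sits at the minRTT; sqrt of the
    // current limit is an assumption here, not necessarily what the implementation uses.
    const double headroom = std::sqrt(limit);
    limit = gradient * limit + headroom;
    limit = std::min(limit, static_cast<double>(max_concurrency_limit));
    limit = std::max(limit, static_cast<double>(min_concurrency));
  }
};

int main() {
  PinnedLimitSketch c{std::chrono::milliseconds(50), 7, 1000, 7};
  c.update(std::chrono::milliseconds(40));  // Faster than the pinned minRTT...
  assert(c.limit > 7);                      // ...so the limit grows (EXPECT_GT in the test).
  const double grown = c.limit;
  c.update(std::chrono::milliseconds(100)); // Slower than the pinned minRTT...
  assert(c.limit < grown);                  // ...so the limit shrinks (EXPECT_LT in the test).
  return 0;
}

PinnedMinRTTChangeConcurrency only asserts the direction of each change (EXPECT_GT after
faster-than-minRTT samples, EXPECT_LT after slower ones), so it remains valid for any update rule
with this monotonic shape, while both tests use verifyMinRTTInactive()/verifyMinRTTValue() to check
that the pinned minRTT itself is never recalculated.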