Skip to content

Commit

Permalink
Merge Master
Browse files Browse the repository at this point in the history
  • Loading branch information
tanvi-jagtap committed Jan 16, 2025
2 parents e414b33 + e904f72 commit 27dfa5b
Show file tree
Hide file tree
Showing 38 changed files with 314 additions and 324 deletions.
2 changes: 0 additions & 2 deletions src/core/lib/debug/trace_flags.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/core/lib/debug/trace_flags.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions src/core/lib/debug/trace_flags.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,6 @@ xds_resolver:
xds_server_config_fetcher:
default: false
description: XDS Server config fetcher.
xds_unittest:
default: true
description: xDS unit tests.
internal: true
xds_wrr_locality_lb:
default: false
description: XDS WRR locality LB policy.
5 changes: 1 addition & 4 deletions src/core/lib/iomgr/closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,7 @@ inline grpc_closure* grpc_closure_create(grpc_iomgr_cb_func cb, void* cb_arg) {
grpc_closure_create(cb, cb_arg)
#endif

#define GRPC_CLOSURE_LIST_INIT \
{ \
nullptr, nullptr \
}
#define GRPC_CLOSURE_LIST_INIT {nullptr, nullptr}

inline void grpc_closure_list_init(grpc_closure_list* closure_list) {
closure_list->head = closure_list->tail = nullptr;
Expand Down
18 changes: 10 additions & 8 deletions src/core/load_balancing/lb_policy_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,16 @@ LoadBalancingPolicyRegistry::ParseLoadBalancingConfigHelper(
return absl::InvalidArgumentError("oneOf violation");
}
auto it = lb_config.object().begin();
if (it->second.type() != Json::Type::kObject) {
auto& [policy_name, config] = *it;
if (config.type() != Json::Type::kObject) {
return absl::InvalidArgumentError("child entry should be of type object");
}
// If we support this policy, then select it.
if (LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(
it->first.c_str(), nullptr)) {
if (LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(policy_name,
nullptr)) {
return it;
}
policies_tried.push_back(it->first);
policies_tried.push_back(policy_name);
}
return absl::FailedPreconditionError(absl::StrCat(
"No known policies in list: ", absl::StrJoin(policies_tried, " ")));
Expand All @@ -126,15 +127,16 @@ absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(const Json& json) const {
auto policy = ParseLoadBalancingConfigHelper(json);
if (!policy.ok()) return policy.status();
auto& [policy_name, config] = **policy;
// Find factory.
LoadBalancingPolicyFactory* factory =
GetLoadBalancingPolicyFactory((*policy)->first.c_str());
GetLoadBalancingPolicyFactory(policy_name);
if (factory == nullptr) {
return absl::FailedPreconditionError(absl::StrFormat(
"Factory not found for policy \"%s\"", (*policy)->first));
return absl::FailedPreconditionError(
absl::StrFormat("Factory not found for policy \"%s\"", policy_name));
}
// Parse load balancing config via factory.
return factory->ParseLoadBalancingConfig((*policy)->second);
return factory->ParseLoadBalancingConfig(config);
}

} // namespace grpc_core
40 changes: 19 additions & 21 deletions src/core/load_balancing/outlier_detection/outlier_detection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -696,17 +696,18 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
// Remove any entries we no longer need in the subchannel map.
for (auto it = subchannel_state_map_.begin();
it != subchannel_state_map_.end();) {
if (current_addresses.find(it->first) == current_addresses.end()) {
auto& [address, subchannel_state] = *it;
if (current_addresses.find(address) == current_addresses.end()) {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
std::string address_str =
grpc_sockaddr_to_string(&it->first, false).value_or("<unknown>");
grpc_sockaddr_to_string(&address, false).value_or("<unknown>");
LOG(INFO) << "[outlier_detection_lb " << this
<< "] removing subchannel map entry " << address_str;
}
// Don't hold a ref to the corresponding EndpointState object,
// because there could be subchannel wrappers keeping this alive
// for a while, and we don't need them to do any call tracking.
it->second->set_endpoint_state(nullptr);
subchannel_state->set_endpoint_state(nullptr);
it = subchannel_state_map_.erase(it);
} else {
++it;
Expand All @@ -715,10 +716,13 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
// Remove any entries we no longer need in the endpoint map.
for (auto it = endpoint_state_map_.begin();
it != endpoint_state_map_.end();) {
if (current_endpoints.find(it->first) == current_endpoints.end()) {
auto& endpoint_addresses = it->first;
if (current_endpoints.find(endpoint_addresses) ==
current_endpoints.end()) {
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << this
<< "] removing endpoint map entry " << it->first.ToString();
<< "] removing endpoint map entry "
<< endpoint_addresses.ToString();
it = endpoint_state_map_.erase(it);
} else {
++it;
Expand Down Expand Up @@ -871,16 +875,11 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
endpoint_state->RotateBucket();
// Gather data to run success rate algorithm or failure percentage
// algorithm.
if (endpoint_state->ejection_time().has_value()) {
++ejected_host_count;
}
if (endpoint_state->ejection_time().has_value()) ++ejected_host_count;
std::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
endpoint_state->GetSuccessRateAndVolume();
if (!host_success_rate_and_volume.has_value()) {
continue;
}
double success_rate = host_success_rate_and_volume->first;
uint64_t request_volume = host_success_rate_and_volume->second;
if (!host_success_rate_and_volume.has_value()) continue;
auto [success_rate, request_volume] = *host_success_rate_and_volume;
if (config.success_rate_ejection.has_value()) {
if (request_volume >= config.success_rate_ejection->request_volume) {
success_rate_ejection_candidates[endpoint_state.get()] = success_rate;
Expand Down Expand Up @@ -1047,15 +1046,14 @@ class OutlierDetectionLbFactory final : public LoadBalancingPolicyFactory {
auto it = json.object().find("childPolicy");
if (it == json.object().end()) {
errors.AddError("field not present");
} else if (auto child_policy_config =
CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
!child_policy_config.ok()) {
errors.AddError(child_policy_config.status().message());
} else {
auto child_policy_config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
if (!child_policy_config.ok()) {
errors.AddError(child_policy_config.status().message());
} else {
child_policy = std::move(*child_policy_config);
}
child_policy = std::move(*child_policy_config);
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/core/load_balancing/ring_hash/ring_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -904,26 +904,28 @@ void RingHash::UpdateAggregatedConnectivityStateLocked(
auto it =
endpoint_map_.find(EndpointAddressSet(endpoints_[i].addresses()));
CHECK(it != endpoint_map_.end());
if (it->second->connectivity_state() == GRPC_CHANNEL_CONNECTING) {
auto& endpoint = it->second;
if (endpoint->connectivity_state() == GRPC_CHANNEL_CONNECTING) {
first_idle_index = endpoints_.size();
break;
}
if (first_idle_index == endpoints_.size() &&
it->second->connectivity_state() == GRPC_CHANNEL_IDLE) {
endpoint->connectivity_state() == GRPC_CHANNEL_IDLE) {
first_idle_index = i;
}
}
if (first_idle_index != endpoints_.size()) {
auto it = endpoint_map_.find(
EndpointAddressSet(endpoints_[first_idle_index].addresses()));
CHECK(it != endpoint_map_.end());
auto& endpoint = it->second;
GRPC_TRACE_LOG(ring_hash_lb, INFO)
<< "[RH " << this
<< "] triggering internal connection attempt for endpoint "
<< it->second.get() << " (" << endpoints_[first_idle_index].ToString()
<< endpoint.get() << " (" << endpoints_[first_idle_index].ToString()
<< ") (index " << first_idle_index << " of " << endpoints_.size()
<< ")";
it->second->RequestConnectionLocked();
endpoint->RequestConnectionLocked();
}
}
}
Expand Down
69 changes: 33 additions & 36 deletions src/core/load_balancing/rls/rls.cc
Original file line number Diff line number Diff line change
Expand Up @@ -851,23 +851,20 @@ std::optional<Json> InsertOrUpdateChildPolicyField(const std::string& field,
ValidationErrors::ScopedField json_field(errors, absl::StrCat("[", i, "]"));
if (child_json.type() != Json::Type::kObject) {
errors->AddError("is not an object");
} else if (const Json::Object& child = child_json.object();
child.size() != 1) {
errors->AddError("child policy object contains more than one field");
} else {
const Json::Object& child = child_json.object();
if (child.size() != 1) {
errors->AddError("child policy object contains more than one field");
const auto& [child_name, child_config_json] = *child.begin();
ValidationErrors::ScopedField json_field(
errors, absl::StrCat("[\"", child_name, "\"]"));
if (child_config_json.type() != Json::Type::kObject) {
errors->AddError("child policy config is not an object");
} else {
const std::string& child_name = child.begin()->first;
ValidationErrors::ScopedField json_field(
errors, absl::StrCat("[\"", child_name, "\"]"));
const Json& child_config_json = child.begin()->second;
if (child_config_json.type() != Json::Type::kObject) {
errors->AddError("child policy config is not an object");
} else {
Json::Object child_config = child_config_json.object();
child_config[field] = Json::FromString(value);
array.emplace_back(Json::FromObject(
{{child_name, Json::FromObject(std::move(child_config))}}));
}
Json::Object child_config = child_config_json.object();
child_config[field] = Json::FromString(value);
array.emplace_back(Json::FromObject(
{{child_name, Json::FromObject(std::move(child_config))}}));
}
}
}
Expand Down Expand Up @@ -1491,9 +1488,10 @@ void RlsLb::Cache::OnCleanupTimer() {
if (!cleanup_timer_handle_.has_value()) return;
if (lb_policy_->is_shutdown_) return;
for (auto it = map_.begin(); it != map_.end();) {
if (GPR_UNLIKELY(it->second->ShouldRemove() && it->second->CanEvict())) {
size_ -= it->second->Size();
it->second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
auto& entry = it->second;
if (GPR_UNLIKELY(entry->ShouldRemove() && entry->CanEvict())) {
size_ -= entry->Size();
entry->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
it = map_.erase(it);
} else {
++it;
Expand All @@ -1515,12 +1513,13 @@ void RlsLb::Cache::MaybeShrinkSize(
if (GPR_UNLIKELY(lru_it == lru_list_.end())) break;
auto map_it = map_.find(*lru_it);
CHECK(map_it != map_.end());
if (!map_it->second->CanEvict()) break;
auto& entry = map_it->second;
if (!entry->CanEvict()) break;
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_ << "] LRU eviction: removing entry "
<< map_it->second.get() << " " << lru_it->ToString();
size_ -= map_it->second->Size();
map_it->second->TakeChildPolicyWrappers(child_policy_wrappers_to_delete);
<< entry.get() << " " << lru_it->ToString();
size_ -= entry->Size();
entry->TakeChildPolicyWrappers(child_policy_wrappers_to_delete);
map_.erase(map_it);
}
GRPC_TRACE_LOG(rls_lb, INFO)
Expand Down Expand Up @@ -2013,21 +2012,19 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] unsetting default target";
default_child_policy_.reset();
} else if (auto it = child_policy_map_.find(config_->default_target());
it == child_policy_map_.end()) {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] creating new default target";
default_child_policy_ = MakeRefCounted<ChildPolicyWrapper>(
RefAsSubclass<RlsLb>(DEBUG_LOCATION, "ChildPolicyWrapper"),
config_->default_target());
created_default_child = true;
} else {
auto it = child_policy_map_.find(config_->default_target());
if (it == child_policy_map_.end()) {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] creating new default target";
default_child_policy_ = MakeRefCounted<ChildPolicyWrapper>(
RefAsSubclass<RlsLb>(DEBUG_LOCATION, "ChildPolicyWrapper"),
config_->default_target());
created_default_child = true;
} else {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] using existing child for default target";
default_child_policy_ =
it->second->Ref(DEBUG_LOCATION, "DefaultChildPolicy");
}
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] using existing child for default target";
default_child_policy_ =
it->second->Ref(DEBUG_LOCATION, "DefaultChildPolicy");
}
}
// Now grab the lock to swap out the state it guards.
Expand Down
14 changes: 6 additions & 8 deletions src/core/load_balancing/xds/xds_cluster_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -884,15 +884,13 @@ void XdsClusterImplLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
auto it = json.object().find("childPolicy");
if (it == json.object().end()) {
errors->AddError("field not present");
} else if (auto lb_config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
!lb_config.ok()) {
errors->AddError(lb_config.status().message());
} else {
auto lb_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
it->second);
if (!lb_config.ok()) {
errors->AddError(lb_config.status().message());
} else {
child_policy_ = std::move(*lb_config);
}
child_policy_ = std::move(*lb_config);
}
}

Expand Down
31 changes: 16 additions & 15 deletions src/core/load_balancing/xds/xds_override_host.cc
Original file line number Diff line number Diff line change
Expand Up @@ -482,30 +482,32 @@ XdsOverrideHostLb::Picker::PickOverriddenHost(
for (absl::string_view address : absl::StrSplit(cookie_address_list, ',')) {
auto it = policy_->subchannel_map_.find(address);
if (it == policy_->subchannel_map_.end()) continue;
auto& subchannel_entry = it->second;
if (!override_host_health_status_set_.Contains(
it->second->eds_health_status())) {
subchannel_entry->eds_health_status())) {
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "Subchannel " << address << " health status is not overridden ("
<< it->second->eds_health_status().ToString() << ")";
<< subchannel_entry->eds_health_status().ToString() << ")";
continue;
}
auto subchannel = it->second->GetSubchannelRef();
auto subchannel = subchannel_entry->GetSubchannelRef();
if (subchannel == nullptr) {
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "No subchannel for " << address;
if (address_with_no_subchannel.empty()) {
address_with_no_subchannel = it->first;
address_with_no_subchannel = address;
}
continue;
}
auto connectivity_state = it->second->connectivity_state();
auto connectivity_state = subchannel_entry->connectivity_state();
if (connectivity_state == GRPC_CHANNEL_READY) {
// Found a READY subchannel. Pass back the actual address list
// and return the subchannel.
GRPC_TRACE_LOG(xds_override_host_lb, INFO)
<< "Picker override found READY subchannel " << address;
it->second->set_last_used_time();
override_host_attr->set_actual_address_list(it->second->address_list());
subchannel_entry->set_last_used_time();
override_host_attr->set_actual_address_list(
subchannel_entry->address_list());
return PickResult::Complete(subchannel->wrapped_subchannel());
} else if (connectivity_state == GRPC_CHANNEL_IDLE) {
if (idle_subchannel == nullptr) idle_subchannel = std::move(subchannel);
Expand Down Expand Up @@ -1234,15 +1236,14 @@ void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
auto it = json.object().find("childPolicy");
if (it == json.object().end()) {
errors->AddError("field not present");
} else if (auto child_policy_config =
CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
!child_policy_config.ok()) {
errors->AddError(child_policy_config.status().message());
} else {
auto child_policy_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
it->second);
if (!child_policy_config.ok()) {
errors->AddError(child_policy_config.status().message());
} else {
child_config_ = std::move(*child_policy_config);
}
child_config_ = std::move(*child_policy_config);
}
}

Expand Down
Loading

0 comments on commit 27dfa5b

Please sign in to comment.