upstream: Fix moving EDS hosts between priorities. (envoyproxy#14483)
At present, if health checks are enabled and passing, then moving an EDS host from P0->P1 is a NOOP, and moving it from P1->P0 results in an abort.

In the first case (host A moves from P0 to P1):
* P0 processing treats A as being removed because it's not in P0's list of endpoints anymore.
* P0 marks A's existing Host as PENDING_DYNAMIC_REMOVAL. It marks A as having been updated in this config apply.
* P1 skips over A because it is marked as updated in this update cycle already.

In the second case (host A moves from P1 to P0):
* P0 updates the priority on the existing Host in place and appends it to the vector of added hosts.
* P1 marks A's existing Host as PENDING_DYNAMIC_REMOVAL. It does not adjust the removed hosts vector, because the host is only pending removal.
* A's Host is now in both priorities and is PENDING_DYNAMIC_REMOVAL. This is wrong and would cause problems later, but it never gets the chance to because:
* onClusterMemberUpdate fires with A's existing Host in the added vector (remember it wasn't removed from P1!)
* HealthChecker attempts to create a new health check session on A, which results in an abort from the destructor of the already existing one.

This was masked in tests because they enable ignore_health_on_host_removal.

We fix this by passing in the set of addresses that appear anywhere in the endpoint update, across all priorities. If a host being considered for removal appears in this set,
and it isn't being duplicated into the current priority as a result of a health check address change, then we assume it is being moved and remove it immediately.

To simplify the case where a host's health check address is being changed AND it is being moved between priorities, we always apply priority moves in place before attempting
any other modifications. Such a case is therefore treated as a separate priority move, followed by the health check address change.
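
As a rough, self-contained sketch of that removal decision (the helper name shouldRemoveImmediately, the plain std::unordered_set containers, and the example addresses below are illustrative assumptions, not Envoy's actual API), the check boils down to:

#include <iostream>
#include <string>
#include <unordered_set>

// Decide how to treat a host that is no longer listed in the priority being
// processed. `all_new_addresses` holds every address in the incoming update,
// across all priorities; `new_addresses_for_priority` holds the addresses
// being added to (or re-used in) the current priority.
bool shouldRemoveImmediately(const std::string& address,
                             const std::unordered_set<std::string>& all_new_addresses,
                             const std::unordered_set<std::string>& new_addresses_for_priority) {
  // The address still exists somewhere in the update but not in this priority,
  // so another priority will re-use the existing Host in place. Drop it from
  // this priority right away instead of marking it PENDING_DYNAMIC_REMOVAL.
  return all_new_addresses.count(address) != 0 &&
         new_addresses_for_priority.count(address) == 0;
}

int main() {
  const std::unordered_set<std::string> all_new{"10.0.0.1:80", "10.0.0.2:80"};
  const std::unordered_set<std::string> p0_new{"10.0.0.2:80"};
  // "10.0.0.1:80" moved from P0 to another priority: remove it from P0 now.
  std::cout << shouldRemoveImmediately("10.0.0.1:80", all_new, p0_new) << "\n"; // 1
  // "10.0.0.3:80" left the configuration entirely: the usual (possibly
  // deferred) removal path applies, so this check does not force removal.
  std::cout << shouldRemoveImmediately("10.0.0.3:80", all_new, p0_new) << "\n"; // 0
}

In the actual change this test lives inside the remove_if predicate in BaseDynamicClusterImpl::updateDynamicHostList (see the upstream_impl.cc hunk below), using the all_new_hosts and new_hosts_for_current_priority sets.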

fixes envoyproxy#11517

Signed-off-by: Jonathan Oddy <[email protected]>
JonathanO authored Jan 13, 2021
1 parent a4d8d91 commit d830384
Showing 10 changed files with 404 additions and 44 deletions.
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
@@ -21,6 +21,7 @@ Bug Fixes
*Changes expected to improve the state of the world and are unlikely to have negative effects*

* active http health checks: properly handles HTTP/2 GOAWAY frames from the upstream. Previously a GOAWAY frame due to a graceful listener drain could cause improper failed health checks due to streams being refused by the upstream on a connection that is going away. To revert to old GOAWAY handling behavior, set the runtime feature `envoy.reloadable_features.health_check.graceful_goaway_handling` to false.
+ * upstream: fix handling of moving endpoints between priorities when active health checks are enabled. Previously moving to a higher numbered priority was a NOOP, and moving to a lower numbered priority caused an abort.

Removed Config or Runtime
-------------------------
2 changes: 1 addition & 1 deletion include/envoy/upstream/upstream.h
@@ -211,7 +211,7 @@ using HostVector = std::vector<HostSharedPtr>;
using HealthyHostVector = Phantom<HostVector, Healthy>;
using DegradedHostVector = Phantom<HostVector, Degraded>;
using ExcludedHostVector = Phantom<HostVector, Excluded>;
- using HostMap = absl::node_hash_map<std::string, Upstream::HostSharedPtr>;
+ using HostMap = absl::flat_hash_map<std::string, Upstream::HostSharedPtr>;
using HostVectorSharedPtr = std::shared_ptr<HostVector>;
using HostVectorConstSharedPtr = std::shared_ptr<const HostVector>;

26 changes: 15 additions & 11 deletions source/common/upstream/eds.cc
@@ -47,18 +47,20 @@ EdsClusterImpl::EdsClusterImpl(
void EdsClusterImpl::startPreInit() { subscription_->start({cluster_name_}); }

void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& host_update_cb) {
- absl::node_hash_map<std::string, HostSharedPtr> updated_hosts;
+ absl::flat_hash_map<std::string, HostSharedPtr> updated_hosts;
+ absl::flat_hash_set<std::string> all_new_hosts;
PriorityStateManager priority_state_manager(parent_, parent_.local_info_, &host_update_cb);
for (const auto& locality_lb_endpoint : cluster_load_assignment_.endpoints()) {
parent_.validateEndpointsForZoneAwareRouting(locality_lb_endpoint);

priority_state_manager.initializePriorityFor(locality_lb_endpoint);

for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
- priority_state_manager.registerHostForPriority(
-     lb_endpoint.endpoint().hostname(),
-     parent_.resolveProtoAddress(lb_endpoint.endpoint().address()), locality_lb_endpoint,
-     lb_endpoint, parent_.time_source_);
+ auto address = parent_.resolveProtoAddress(lb_endpoint.endpoint().address());
+ priority_state_manager.registerHostForPriority(lb_endpoint.endpoint().hostname(), address,
+                                                locality_lb_endpoint, lb_endpoint,
+                                                parent_.time_source_);
+ all_new_hosts.emplace(address->asString());
}
}

@@ -79,13 +81,13 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h
if (priority_state[i].first != nullptr) {
cluster_rebuilt |= parent_.updateHostsPerLocality(
i, overprovisioning_factor, *priority_state[i].first, parent_.locality_weights_map_[i],
-     priority_state[i].second, priority_state_manager, updated_hosts);
+     priority_state[i].second, priority_state_manager, updated_hosts, all_new_hosts);
} else {
// If the new update contains a priority with no hosts, call the update function with an empty
// set of hosts.
cluster_rebuilt |= parent_.updateHostsPerLocality(
i, overprovisioning_factor, {}, parent_.locality_weights_map_[i], empty_locality_map,
-     priority_state_manager, updated_hosts);
+     priority_state_manager, updated_hosts, all_new_hosts);
}
}

@@ -98,7 +100,7 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h
}
cluster_rebuilt |= parent_.updateHostsPerLocality(
i, overprovisioning_factor, {}, parent_.locality_weights_map_[i], empty_locality_map,
-     priority_state_manager, updated_hosts);
+     priority_state_manager, updated_hosts, all_new_hosts);
}

parent_.all_hosts_ = std::move(updated_hosts);
@@ -236,7 +238,8 @@ bool EdsClusterImpl::updateHostsPerLocality(
const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts,
LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map,
PriorityStateManager& priority_state_manager,
-     absl::node_hash_map<std::string, HostSharedPtr>& updated_hosts) {
+     absl::flat_hash_map<std::string, HostSharedPtr>& updated_hosts,
+     const absl::flat_hash_set<std::string>& all_new_hosts) {
const auto& host_set = priority_set_.getOrCreateHostSet(priority, overprovisioning_factor);
HostVectorSharedPtr current_hosts_copy(new HostVector(host_set.hosts()));

@@ -252,8 +255,9 @@ bool EdsClusterImpl::updateHostsPerLocality(
// performance implications, since this has the knock on effect that we rebuild the load balancers
// and locality scheduler. See the comment in BaseDynamicClusterImpl::updateDynamicHostList
// about this. In the future we may need to do better here.
- const bool hosts_updated = updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added,
-                                                  hosts_removed, updated_hosts, all_hosts_);
+ const bool hosts_updated =
+     updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added, hosts_removed,
+                           updated_hosts, all_hosts_, all_new_hosts);
if (hosts_updated || host_set.overprovisioningFactor() != overprovisioning_factor ||
locality_weights_map != new_locality_weights_map) {
ASSERT(std::all_of(current_hosts_copy->begin(), current_hosts_copy->end(),
4 changes: 2 additions & 2 deletions source/common/upstream/eds.h
@@ -52,8 +52,8 @@ class EdsClusterImpl
bool updateHostsPerLocality(const uint32_t priority, const uint32_t overprovisioning_factor,
const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map,
LocalityWeightsMap& new_locality_weights_map,
-     PriorityStateManager& priority_state_manager,
-     absl::node_hash_map<std::string, HostSharedPtr>& updated_hosts);
+     PriorityStateManager& priority_state_manager, HostMap& updated_hosts,
+     const absl::flat_hash_set<std::string>& all_new_hosts);
bool validateUpdateSize(int num_resources);

// ClusterImplBase
7 changes: 4 additions & 3 deletions source/common/upstream/strict_dns_cluster.cc
@@ -118,9 +118,10 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() {
if (status == Network::DnsResolver::ResolutionStatus::Success) {
parent_.info_->stats().update_success_.inc();

- absl::node_hash_map<std::string, HostSharedPtr> updated_hosts;
+ HostMap updated_hosts;
HostVector new_hosts;
std::chrono::seconds ttl_refresh_rate = std::chrono::seconds::max();
+ absl::flat_hash_set<std::string> all_new_hosts;
for (const auto& resp : response) {
// TODO(mattklein123): Currently the DNS interface does not consider port. We need to
// make a new address that has port in it. We need to both support IPv6 as well as
@@ -135,14 +136,14 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() {
lb_endpoint_.load_balancing_weight().value(), locality_lb_endpoint_.locality(),
lb_endpoint_.endpoint().health_check_config(), locality_lb_endpoint_.priority(),
    lb_endpoint_.health_status(), parent_.time_source_));
-
+ all_new_hosts.emplace(new_hosts.back()->address()->asString());
ttl_refresh_rate = min(ttl_refresh_rate, resp.ttl_);
}

HostVector hosts_added;
HostVector hosts_removed;
if (parent_.updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed,
-     updated_hosts, all_hosts_)) {
+     updated_hosts, all_hosts_, all_new_hosts)) {
ENVOY_LOG(debug, "DNS hosts have changed for {}", dns_address_);
ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) {
return host->priority() == locality_lb_endpoint_.priority();
29 changes: 21 additions & 8 deletions source/common/upstream/upstream_impl.cc
@@ -1384,12 +1384,11 @@ void PriorityStateManager::updateClusterPrioritySet(
}
}

- bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
-                                                    HostVector& current_priority_hosts,
-                                                    HostVector& hosts_added_to_current_priority,
-                                                    HostVector& hosts_removed_from_current_priority,
-                                                    HostMap& updated_hosts,
-                                                    const HostMap& all_hosts) {
+ bool BaseDynamicClusterImpl::updateDynamicHostList(
+     const HostVector& new_hosts, HostVector& current_priority_hosts,
+     HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority,
+     HostMap& updated_hosts, const HostMap& all_hosts,
+     const absl::flat_hash_set<std::string>& all_new_hosts) {
uint64_t max_host_weight = 1;

// Did hosts change?
@@ -1414,8 +1413,10 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
// do the same thing.

// Keep track of hosts we see in new_hosts that we are able to match up with an existing host.
- absl::node_hash_set<std::string> existing_hosts_for_current_priority(
+ absl::flat_hash_set<std::string> existing_hosts_for_current_priority(
current_priority_hosts.size());
+ // Keep track of hosts we're adding (or replacing)
+ absl::flat_hash_set<std::string> new_hosts_for_current_priority(new_hosts.size());
HostVector final_hosts;
for (const HostSharedPtr& host : new_hosts) {
if (updated_hosts.count(host->address()->asString())) {
@@ -1499,6 +1500,7 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
final_hosts.push_back(existing_host->second);
updated_hosts[existing_host->second->address()->asString()] = existing_host->second;
} else {
+ new_hosts_for_current_priority.emplace(host->address()->asString());
if (host->weight() > max_host_weight) {
max_host_weight = host->weight();
}
@@ -1555,7 +1557,18 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) {
erase_from =
std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(),
-     [&updated_hosts, &final_hosts, &max_host_weight](const HostSharedPtr& p) {
+     [&all_new_hosts, &new_hosts_for_current_priority, &updated_hosts,
+      &final_hosts, &max_host_weight](const HostSharedPtr& p) {
+       if (all_new_hosts.contains(p->address()->asString()) &&
+           !new_hosts_for_current_priority.contains(p->address()->asString())) {
+         // If the address is being completely deleted from this priority, but is
+         // referenced from another priority, then we assume that the other
+         // priority will perform an in-place update to re-use the existing Host.
+         // We should therefore not mark it as PENDING_DYNAMIC_REMOVAL, but
+         // instead remove it immediately from this priority.
+         return false;
+       }
+
if (!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) {
if (p->weight() > max_host_weight) {
6 changes: 4 additions & 2 deletions source/common/upstream/upstream_impl.h
@@ -913,13 +913,15 @@ class BaseDynamicClusterImpl : public ClusterImplBase {
* priority.
* @param updated_hosts is used to aggregate the new state of all hosts across priority, and will
* be updated with the hosts that remain in this priority after the update.
-  * @param all_hosts all known hosts prior to this host update.
+  * @param all_hosts all known hosts prior to this host update across all priorities.
+  * @param all_new_hosts addresses of all hosts in the new configuration across all priorities.
* @return whether the hosts for the priority changed.
*/
bool updateDynamicHostList(const HostVector& new_hosts, HostVector& current_priority_hosts,
HostVector& hosts_added_to_current_priority,
HostVector& hosts_removed_from_current_priority,
-     HostMap& updated_hosts, const HostMap& all_hosts);
+     HostMap& updated_hosts, const HostMap& all_hosts,
+     const absl::flat_hash_set<std::string>& all_new_hosts);
};

/**
7 changes: 5 additions & 2 deletions source/extensions/clusters/redis/redis_cluster.cc
@@ -94,19 +94,22 @@ void RedisCluster::updateAllHosts(const Upstream::HostVector& hosts_added,

void RedisCluster::onClusterSlotUpdate(ClusterSlotsPtr&& slots) {
Upstream::HostVector new_hosts;
+ absl::flat_hash_set<std::string> all_new_hosts;

for (const ClusterSlot& slot : *slots) {
new_hosts.emplace_back(new RedisHost(info(), "", slot.primary(), *this, true, time_source_));
+ all_new_hosts.emplace(slot.primary()->asString());
for (auto const& replica : slot.replicas()) {
new_hosts.emplace_back(new RedisHost(info(), "", replica, *this, false, time_source_));
+ all_new_hosts.emplace(replica->asString());
}
}

- absl::node_hash_map<std::string, Upstream::HostSharedPtr> updated_hosts;
+ Upstream::HostMap updated_hosts;
Upstream::HostVector hosts_added;
Upstream::HostVector hosts_removed;
const bool host_updated = updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed,
-     updated_hosts, all_hosts_);
+     updated_hosts, all_hosts_, all_new_hosts);
const bool slot_updated =
lb_factory_ ? lb_factory_->onClusterSlotUpdate(std::move(slots), updated_hosts) : false;
