forked from envoyproxy/envoy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdelta_subscription_impl_test.cc
174 lines (154 loc) · 8.38 KB
/
delta_subscription_impl_test.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/endpoint/v3/endpoint.pb.h"
#include "envoy/service/discovery/v3/discovery.pb.h"
#include "source/common/buffer/zero_copy_input_stream_impl.h"
#include "source/common/config/api_version.h"
#include "test/common/config/delta_subscription_test_harness.h"
namespace Envoy {
namespace Config {
namespace {
/**
 * Value-parameterized fixture for the delta subscription tests. The LegacyOrUnified test
 * parameter is handed straight to DeltaSubscriptionTestHarness, so every TEST_P below runs once
 * per instantiated parameter value.
 */
class DeltaSubscriptionImplTest : public DeltaSubscriptionTestHarness,
                                  public testing::TestWithParam<LegacyOrUnified> {
protected:
  // Forward the current test parameter to the harness.
  DeltaSubscriptionImplTest() : DeltaSubscriptionTestHarness(GetParam()) {}

  // The subscription has to go away before the fixture does: its destructor removes its watch
  // from the NewGrpcMuxImpl, and that removal touches state owned by the test fixture.
  void TearDown() override { doSubscriptionTearDown(); }
};
// Run every DeltaSubscriptionImplTest case once per LegacyOrUnified value.
INSTANTIATE_TEST_SUITE_P(DeltaSubscriptionImplTest, DeltaSubscriptionImplTest,
                         testing::Values(LegacyOrUnified::Legacy, LegacyOrUnified::Unified));
// Each change of resource interest must produce exactly one request carrying the difference
// between the previous and the new interest set.
TEST_P(DeltaSubscriptionImplTest, UpdateResourcesCausesRequest) {
  const auto ok_status = Grpc::Status::WellKnownGrpcStatus::Ok;

  startSubscription({"name1", "name2", "name3"});

  // {name1, name2, name3} -> {name3, name4}: name4 is new, name1 and name2 are dropped.
  expectSendMessage({"name4"}, {"name1", "name2"}, ok_status, "", {});
  subscription_->updateResourceInterest({"name3", "name4"});

  // {name3, name4} -> {name1, name2, name3, name4}: name1 and name2 come back.
  expectSendMessage({"name1", "name2"}, {}, ok_status, "", {});
  subscription_->updateResourceInterest({"name1", "name2", "name3", "name4"});

  // Back to {name3, name4}: name1 and name2 are dropped again.
  expectSendMessage({}, {"name1", "name2"}, ok_status, "", {});
  subscription_->updateResourceInterest({"name3", "name4"});

  // And once more up to all four names.
  expectSendMessage({"name1", "name2"}, {}, ok_status, "", {});
  subscription_->updateResourceInterest({"name1", "name2", "name3", "name4"});

  // Finally shrink to {name4} only: everything else is dropped in a single request.
  expectSendMessage({}, {"name1", "name2", "name3"}, ok_status, "", {});
  subscription_->updateResourceInterest({"name4"});
}
// Verifies that nothing is sent between pause() and resume().
// It also shows that interest updates which pile up before a request can go out are collapsed
// into one request. (Pausing is only one way to get there: rate limiting or a gRPC stream that
// is down would cause the same collapsing.)
TEST_P(DeltaSubscriptionImplTest, PauseHoldsRequest) {
  startSubscription({"name1", "name2", "name3"});

  auto outer_pause = subscription_->pause();
  // A second, nested pause: were nesting mishandled, the single expectSendMessage() below would
  // not be enough to cover the traffic.
  auto inner_pause = subscription_->pause();

  expectSendMessage({"name4"}, {"name1", "name2"}, Grpc::Status::WellKnownGrpcStatus::Ok, "", {});

  // Without the pause, each of these updates would send its own message and the single
  // expectation above would fail with too many messages.
  subscription_->updateResourceInterest({"name3", "name4"});
  subscription_->updateResourceInterest({"name1", "name2", "name3", "name4"});
  subscription_->updateResourceInterest({"name3", "name4"});
  subscription_->updateResourceInterest({"name1", "name2", "name3", "name4"});
  subscription_->updateResourceInterest({"name3", "name4"});
}
// Delivers a single config update for the one subscribed resource; the expectations that an ACK
// follows live in the harness's deliverConfigUpdate().
TEST_P(DeltaSubscriptionImplTest, ResponseCausesAck) {
  const std::string version{"someversion"};

  startSubscription({"name1"});
  // NOTE(review): the trailing `true` is presumably the harness's "accept" flag — confirm
  // against DeltaSubscriptionTestHarness.
  deliverConfigUpdate({"name1"}, version, true);
}
// Verifies that ACKs are withheld while the subscription is paused, and that after the resume
// *every* ACK that accumulated during the pause is sent, in arrival order.
TEST_P(DeltaSubscriptionImplTest, PauseQueuesAcks) {
  startSubscription({"name1", "name2", "name3"});
  auto pause_guard = subscription_->pause();

  // Hands the subscription a DeltaDiscoveryResponse carrying one resource. The nonce is derived
  // from the resource version and is recorded as one for which an ACK is required.
  const auto deliver_resource = [this](const std::string& resource_name,
                                       const std::string& resource_version) {
    auto response = std::make_unique<envoy::service::discovery::v3::DeltaDiscoveryResponse>();
    auto* entry = response->mutable_resources()->Add();
    entry->set_name(resource_name);
    entry->set_version(resource_version);
    const std::string nonce = std::to_string(HashUtil::xxHash64(resource_version));
    response->set_nonce(nonce);
    response->set_type_url(Config::TypeUrl::get().ClusterLoadAssignment);
    nonce_acks_required_.push(nonce);
    onDiscoveryResponse(std::move(response));
  };

  // First version of name1 arrives. subscription_ now wants to ACK it, but the pause blocks that.
  deliver_resource("name1", "version1A");

  // First version of name2 arrives. Pending ACKs: name1, then name2 (still blocked by the pause).
  deliver_resource("name2", "version2A");

  // An updated version of name1 arrives. Pending ACKs: name1A, then name2, then name1B (still
  // blocked by the pause).
  deliver_resource("name1", "version1B");

  // All of the ACK sendMessage()s happen on resume. Record the nonce of every request that
  // carries one so it can be compared against nonce_acks_required_.
  EXPECT_CALL(async_stream_, sendMessageRaw_(_, _))
      .WillRepeatedly(Invoke([this](Buffer::InstancePtr& buffer, bool) {
        API_NO_BOOST(envoy::service::discovery::v3::DeltaDiscoveryRequest) request;
        EXPECT_TRUE(Grpc::Common::parseBufferInstance(std::move(buffer), request));
        const std::string ack_nonce = request.response_nonce();
        if (ack_nonce.empty()) {
          // Not an ACK: nothing to record.
          return;
        }
        nonce_acks_sent_.push(ack_nonce);
      }));

  // DeltaSubscriptionTestHarness's dtor will check that all ACKs were sent with the correct
  // nonces, in the correct order.
}
// Bare value-parameterized fixture (no harness) for the test that builds its own mux and
// subscription; it runs once per LegacyOrUnified value.
class DeltaSubscriptionNoGrpcStreamTest : public testing::TestWithParam<LegacyOrUnified> {};

INSTANTIATE_TEST_SUITE_P(DeltaSubscriptionNoGrpcStreamTest, DeltaSubscriptionNoGrpcStreamTest,
                         testing::Values(LegacyOrUnified::Legacy, LegacyOrUnified::Unified));
// Starts a subscription and then changes its resource interest while the async client refuses
// to hand out a stream (startRaw() returns nullptr). The test passes if neither call blows up
// and startRaw() is attempted exactly once.
TEST_P(DeltaSubscriptionNoGrpcStreamTest, NoGrpcStream) {
  Stats::IsolatedStoreImpl stats_store;
  SubscriptionStats stats(Utility::generateStats(stats_store));

  // Local info that reports a fixed node.
  envoy::config::core::v3::Node test_node;
  test_node.set_id("fo0");
  NiceMock<LocalInfo::MockLocalInfo> local_info;
  EXPECT_CALL(local_info, node()).WillRepeatedly(testing::ReturnRef(test_node));

  NiceMock<Event::MockDispatcher> dispatcher;
  NiceMock<Random::MockRandomGenerator> random;
  Envoy::Config::RateLimitSettings rate_limit_settings;
  NiceMock<Config::MockSubscriptionCallbacks> callbacks;
  NiceMock<Config::MockOpaqueResourceDecoder> resource_decoder;

  // The mux takes ownership of the client; keep a non-owning alias so an expectation can still
  // be set on it afterwards.
  auto owned_client = std::make_unique<Grpc::MockAsyncClient>();
  Grpc::MockAsyncClient* const async_client = owned_client.get();

  const Protobuf::MethodDescriptor* method_descriptor =
      Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints");

  // Build whichever mux implementation this parameterization asks for.
  GrpcMuxSharedPtr xds_context;
  if (GetParam() != LegacyOrUnified::Unified) {
    xds_context = std::make_shared<NewGrpcMuxImpl>(
        std::move(owned_client), dispatcher, *method_descriptor, random, stats_store,
        rate_limit_settings, local_info, std::make_unique<NiceMock<MockCustomConfigValidators>>());
  } else {
    xds_context = std::make_shared<Config::XdsMux::GrpcMuxDelta>(
        std::move(owned_client), dispatcher, *method_descriptor, random, stats_store,
        rate_limit_settings, local_info, false,
        std::make_unique<NiceMock<MockCustomConfigValidators>>());
  }

  const std::chrono::milliseconds timeout(12345);
  GrpcSubscriptionImplPtr subscription = std::make_unique<GrpcSubscriptionImpl>(
      xds_context, callbacks, resource_decoder, stats, Config::TypeUrl::get().ClusterLoadAssignment,
      dispatcher, timeout, false, SubscriptionOptions());

  // No stream can be established: startRaw() is expected once and yields nullptr.
  EXPECT_CALL(*async_client, startRaw(_, _, _, _)).WillOnce(Return(nullptr));

  subscription->start({"name1"});
  subscription->updateResourceInterest({"name1", "name2"});
}
} // namespace
} // namespace Config
} // namespace Envoy