forked from envoyproxy/envoy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgrpc_stream_test.cc
241 lines (216 loc) · 10.4 KB
/
grpc_stream_test.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#include "envoy/service/discovery/v3/discovery.pb.h"
#include "source/common/config/grpc_stream.h"
#include "source/common/protobuf/protobuf.h"
#include "test/common/stats/stat_test_utility.h"
#include "test/mocks/common.h"
#include "test/mocks/config/mocks.h"
#include "test/mocks/event/mocks.h"
#include "test/mocks/grpc/mocks.h"
#include "test/test_common/logging.h"
#include "test/test_common/simulated_time_system.h"
#include "test/test_common/utility.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
using testing::NiceMock;
using testing::Return;
namespace Envoy {
namespace Config {
namespace {
class GrpcStreamTest : public testing::Test {
protected:
GrpcStreamTest()
: async_client_owner_(std::make_unique<Grpc::MockAsyncClient>()),
async_client_(async_client_owner_.get()),
grpc_stream_(&callbacks_, std::move(async_client_owner_),
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"),
random_, dispatcher_, stats_, rate_limit_settings_) {}
NiceMock<Event::MockDispatcher> dispatcher_;
Grpc::MockAsyncStream async_stream_;
Stats::TestUtil::TestStore stats_;
NiceMock<Random::MockRandomGenerator> random_;
Envoy::Config::RateLimitSettings rate_limit_settings_;
NiceMock<MockGrpcStreamCallbacks> callbacks_;
std::unique_ptr<Grpc::MockAsyncClient> async_client_owner_;
Grpc::MockAsyncClient* async_client_;
Event::SimulatedTimeSystem time_system_;
GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>
grpc_stream_;
};
// Tests that establishNewStream() establishes it, a second call does nothing, and a third call
// after the stream was disconnected re-establishes it.
TEST_F(GrpcStreamTest, EstablishStream) {
EXPECT_FALSE(grpc_stream_.grpcStreamAvailable());
// Successful establishment
{
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
EXPECT_CALL(callbacks_, onStreamEstablished());
grpc_stream_.establishNewStream();
EXPECT_TRUE(grpc_stream_.grpcStreamAvailable());
}
// Idempotent
{
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).Times(0);
EXPECT_CALL(callbacks_, onStreamEstablished()).Times(0);
grpc_stream_.establishNewStream();
EXPECT_TRUE(grpc_stream_.grpcStreamAvailable());
}
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Ok, "");
EXPECT_FALSE(grpc_stream_.grpcStreamAvailable());
// Successful re-establishment
{
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
EXPECT_CALL(callbacks_, onStreamEstablished());
grpc_stream_.establishNewStream();
EXPECT_TRUE(grpc_stream_.grpcStreamAvailable());
}
}
// Tests reducing log level depending on remote close status.
TEST_F(GrpcStreamTest, LogClose) {
// Failures with statuses that do not need special handling. They are always logged in the same
// way and so never saved.
{
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());
// Benign status: debug.
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("debug", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Ok, "Ok");
});
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());
// Non-retriable failure: warn.
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("warn", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::NotFound, "Not Found");
});
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());
}
// Repeated failures that warn after enough time.
{
// Retriable failure: debug.
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("debug", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Unavailable, "Unavailable");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::Unavailable);
// Different retriable failure: warn.
time_system_.advanceTimeWait(std::chrono::seconds(1));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS(
"warn", "stream closed: 4, Deadline Exceeded (previously 14, Unavailable since 1s ago)", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"Deadline Exceeded");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);
// Same retriable failure after a short amount of time: debug.
time_system_.advanceTimeWait(std::chrono::seconds(1));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("debug", "gRPC config stream closed", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"Deadline Exceeded");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);
// Same retriable failure after a long time: warn.
time_system_.advanceTimeWait(std::chrono::seconds(100));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("warn", "gRPC config stream closed since 101s ago: 4, Deadline Exceeded", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"Deadline Exceeded");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);
// Warn again, using the newest message.
time_system_.advanceTimeWait(std::chrono::seconds(1));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS("warn", "gRPC config stream closed since 102s ago: 4, new message", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
"new message");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded);
// Different retriable failure, using the most recent error message from the previous one.
time_system_.advanceTimeWait(std::chrono::seconds(1));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
EXPECT_LOG_CONTAINS(
"warn",
"gRPC config stream closed: 14, Unavailable (previously 4, new message since 103s ago)", {
grpc_stream_.onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Unavailable, "Unavailable");
});
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::Unavailable);
}
// Successfully receiving a message clears close status.
{
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
EXPECT_CALL(callbacks_, onStreamEstablished());
grpc_stream_.establishNewStream();
EXPECT_TRUE(grpc_stream_.grpcStreamAvailable());
// Status isn't cleared yet.
EXPECT_EQ(grpc_stream_.getCloseStatus().value(),
Grpc::Status::WellKnownGrpcStatus::Unavailable);
auto response = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
grpc_stream_.onReceiveMessage(std::move(response));
EXPECT_FALSE(grpc_stream_.getCloseStatus().has_value());
}
}
// A failure in the underlying gRPC machinery should result in grpcStreamAvailable() false. Calling
// sendMessage would segfault.
TEST_F(GrpcStreamTest, FailToEstablishNewStream) {
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(nullptr));
EXPECT_CALL(callbacks_, onEstablishmentFailure());
grpc_stream_.establishNewStream();
EXPECT_FALSE(grpc_stream_.grpcStreamAvailable());
}
// Checks that sendMessage correctly passes a DiscoveryRequest down to the underlying gRPC
// machinery.
TEST_F(GrpcStreamTest, SendMessage) {
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
grpc_stream_.establishNewStream();
envoy::service::discovery::v3::DiscoveryRequest request;
request.set_response_nonce("grpc_stream_test_noncense");
EXPECT_CALL(async_stream_, sendMessageRaw_(Grpc::ProtoBufferEq(request), false));
grpc_stream_.sendMessage(request);
}
// Tests that, upon a call of the GrpcStream::onReceiveMessage() callback, which is called by the
// underlying gRPC machinery, the received proto will make it up to the GrpcStreamCallbacks that the
// GrpcStream was given.
TEST_F(GrpcStreamTest, ReceiveMessage) {
envoy::service::discovery::v3::DiscoveryResponse response_copy;
response_copy.set_type_url("faketypeURL");
auto response = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>(response_copy);
envoy::service::discovery::v3::DiscoveryResponse received_message;
EXPECT_CALL(callbacks_, onDiscoveryResponse(_, _))
.WillOnce([&received_message](
std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message,
ControlPlaneStats&) { received_message = *message; });
grpc_stream_.onReceiveMessage(std::move(response));
EXPECT_TRUE(TestUtility::protoEqual(response_copy, received_message));
}
// If the value has only ever been 0, the stat should remain unused, including after an attempt to
// write a 0 to it.
TEST_F(GrpcStreamTest, QueueSizeStat) {
grpc_stream_.maybeUpdateQueueSizeStat(0);
Stats::Gauge& pending_requests =
stats_.gauge("control_plane.pending_requests", Stats::Gauge::ImportMode::Accumulate);
EXPECT_FALSE(pending_requests.used());
grpc_stream_.maybeUpdateQueueSizeStat(123);
EXPECT_EQ(123, pending_requests.value());
grpc_stream_.maybeUpdateQueueSizeStat(0);
EXPECT_EQ(0, pending_requests.value());
}
// Just to add coverage to the no-op implementations of these callbacks (without exposing us to
// crashes from a badly behaved peer like PANIC("not implemented") would).
TEST_F(GrpcStreamTest, HeaderTrailerJustForCodeCoverage) {
Http::ResponseHeaderMapPtr response_headers{new Http::TestResponseHeaderMapImpl{}};
grpc_stream_.onReceiveInitialMetadata(std::move(response_headers));
Http::TestRequestHeaderMapImpl request_headers;
grpc_stream_.onCreateInitialMetadata(request_headers);
Http::ResponseTrailerMapPtr trailers{new Http::TestResponseTrailerMapImpl{}};
grpc_stream_.onReceiveTrailingMetadata(std::move(trailers));
}
} // namespace
} // namespace Config
} // namespace Envoy