forked from envoyproxy/envoy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgrpc_subscription_impl.h
90 lines (73 loc) · 3.54 KB
/
grpc_subscription_impl.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#pragma once
#include <chrono>
#include <memory>
#include "envoy/config/grpc_mux.h"
#include "envoy/config/subscription.h"
#include "envoy/event/dispatcher.h"
#include "source/common/common/logger.h"
#include "xds/core/v3/resource_locator.pb.h"
namespace Envoy {
namespace Config {
/**
* Adapter from typed Subscription to untyped GrpcMux. Also handles per-xDS API stats/logging.
*/
class GrpcSubscriptionImpl : public Subscription,
protected SubscriptionCallbacks,
Logger::Loggable<Logger::Id::config> {
public:
GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, SubscriptionStats stats,
absl::string_view type_url, Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated,
const SubscriptionOptions& options);
// Config::Subscription
void start(const absl::flat_hash_set<std::string>& resource_names) override;
void
updateResourceInterest(const absl::flat_hash_set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const absl::flat_hash_set<std::string>& add_these_names) override;
// Config::SubscriptionCallbacks (all pass through to callbacks_!)
void onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) override;
void onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) override;
void onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) override;
GrpcMuxSharedPtr grpcMux() { return grpc_mux_; }
ScopedResume pause();
private:
void disableInitFetchTimeoutTimer();
GrpcMuxSharedPtr grpc_mux_;
SubscriptionCallbacks& callbacks_;
OpaqueResourceDecoder& resource_decoder_;
SubscriptionStats stats_;
const std::string type_url_;
GrpcMuxWatchPtr watch_;
Event::Dispatcher& dispatcher_;
// NOTE: if another subscription of the same type_url has already been started, this value will be
// ignored in favor of the other subscription's.
std::chrono::milliseconds init_fetch_timeout_;
Event::TimerPtr init_fetch_timeout_timer_;
const bool is_aggregated_;
const SubscriptionOptions options_;
struct ResourceNameFormatter {
void operator()(std::string* out, const Config::DecodedResourceRef& resource) {
out->append(resource.get().name());
}
};
};
using GrpcSubscriptionImplPtr = std::unique_ptr<GrpcSubscriptionImpl>;
using GrpcSubscriptionImplSharedPtr = std::shared_ptr<GrpcSubscriptionImpl>;
class GrpcCollectionSubscriptionImpl : public GrpcSubscriptionImpl {
public:
GrpcCollectionSubscriptionImpl(const xds::core::v3::ResourceLocator& collection_locator,
GrpcMuxSharedPtr grpc_mux, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, SubscriptionStats stats,
Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated,
const SubscriptionOptions& options);
void start(const absl::flat_hash_set<std::string>& resource_names) override;
private:
xds::core::v3::ResourceLocator collection_locator_;
};
} // namespace Config
} // namespace Envoy