From 893e957d6d7b97f5baf0630a5617ab92799f7eec Mon Sep 17 00:00:00 2001
From: "Derrick J. Wippler"
Date: Thu, 7 Feb 2019 13:28:24 -0600
Subject: [PATCH] Implemented healthz endpoint in http and grpc

---
 benchmark_test.go             |   4 +-
 cache/lru.go                  |  24 +++---
 client.go                     |   2 +-
 cmd/gubernator-server/main.go |  15 ++++
 cmd/gubernator/main.go        |   7 +-
 config.yaml                   |   9 +-
 etc/server-1.yaml             |   2 +-
 etc/server-2.yaml             |   2 +-
 etc/server-3.yaml             |   2 +-
 functional_test.go            |   1 -
 pb/peers.pb.go                |   4 +-
 pb/ratelimit.pb.go            | 152 ++++++++++++++++++++--------------
 pb/ratelimit.proto            |  13 ++-
 server.go                     |  37 ++++++---
 14 files changed, 172 insertions(+), 102 deletions(-)

diff --git a/benchmark_test.go b/benchmark_test.go
index d238ed6f..e3858119 100644
--- a/benchmark_test.go
+++ b/benchmark_test.go
@@ -63,10 +63,10 @@ func BenchmarkServer_NoOp(b *testing.B) {
 	//total := time.Second / dur
 	//fmt.Printf("Total: %d\n", total)
 
-	b.Run("Ping", func(b *testing.B) {
+	b.Run("HealthCheck", func(b *testing.B) {
 		for n := 0; n < b.N; n++ {
 			if err := client.Ping(context.Background()); err != nil {
-				b.Errorf("client.Ping() err: %s", err)
+				b.Errorf("client.HealthCheck() err: %s", err)
 			}
 		}
 	})
diff --git a/cache/lru.go b/cache/lru.go
index 84c9fa6b..9b7cdec0 100644
--- a/cache/lru.go
+++ b/cache/lru.go
@@ -20,12 +20,13 @@ package cache
 
 import (
 	"container/list"
-	"github.com/mailgun/gubernator/pb"
-	"github.com/sirupsen/logrus"
 	"sync"
 	"time"
 
+	"github.com/mailgun/gubernator/pb"
 	"github.com/mailgun/holster"
+	"github.com/mailgun/holster/clock"
+	"github.com/sirupsen/logrus"
 )
 
 type LRUCacheConfig struct {
@@ -41,6 +42,9 @@ type LRUCacheConfig struct {
 
 	// The initial cache size. If not provided defaults to 30% of the max cache size.
 	InitialCacheSize int
+
+	// Interval at which the cache should check if it needs to shrink or grow
+	InspectInterval clock.DurationJSON
 }
 
 // Cache is an thread unsafe LRU cache that supports expiration
@@ -66,6 +70,8 @@ func NewLRUCache(conf LRUCacheConfig) *LRUCache {
 	holster.SetDefault(&conf.MaxCacheSize, 50000)
 	// If not provided init cache with 30 percent of the max cache size
 	holster.SetDefault(&conf.InitialCacheSize, int(float32(conf.MaxCacheSize)*0.30))
+	// Inspect the cache for possible resize every 30 seconds
+	holster.SetDefault(&conf.InspectInterval.Duration, time.Second*30)
 
 	return &LRUCache{
 		log: logrus.WithField("category", "lru-cache"),
@@ -82,23 +88,18 @@ func (c *LRUCache) inspectAndResize() {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
 
-	// If we have NOT reached the size of our cache
-	if c.cacheSize != c.Size() {
-		return
-	}
-
 	// Inspect the bottom 20% of the cache for expired items
 	inspectSize := int(float32(c.cacheSize) * 0.20)
 	ele := c.ll.Back()
 	if ele == nil {
-		// Not sure how this would happened
+		// return if the cache is empty
 		return
 	}
 
 	var prev *list.Element
 	var count, expired = 0, 0
 	for {
-		if count == inspectSize {
+		if count == inspectSize || ele == nil {
 			break
 		}
@@ -119,8 +120,7 @@
 
 	// If all the elements expired, we can shrink the cache size
 	if expired == inspectSize {
-		// TODO: Will never be called, since this code doesn't execute unless the cache is at capacity
-		// Increase the cache size by 30%
+		// Decrease the cache size by 30%
 		newSize := c.cacheSize - int(float32(c.cacheSize)*0.30)
 		// Don't shrink beyond the initial cache size
 		if newSize < c.conf.InitialCacheSize {
@@ -148,7 +148,7 @@
 }
 
 func (c *LRUCache) Start() error {
-	tick := time.NewTicker(time.Second * 5)
+	tick := time.NewTicker(c.conf.InspectInterval.Duration)
 	c.wg.Until(func(done chan struct{}) bool {
 		select {
 		case <-tick.C:
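
The Start() loop above reduces to Go's standard ticker-plus-done-channel pattern. A minimal,
stdlib-only sketch of the same shape (startInspector and its arguments are illustrative names,
not gubernator's actual API):

    package main

    import (
        "fmt"
        "time"
    )

    // startInspector calls inspect() every interval until done is closed,
    // mirroring the shape of LRUCache.Start() in the diff above.
    func startInspector(interval time.Duration, done chan struct{}, inspect func()) {
        tick := time.NewTicker(interval)
        go func() {
            defer tick.Stop()
            for {
                select {
                case <-tick.C:
                    inspect() // e.g. inspectAndResize()
                case <-done:
                    return
                }
            }
        }()
    }

    func main() {
        done := make(chan struct{})
        startInspector(50*time.Millisecond, done, func() { fmt.Println("inspecting cache") })
        time.Sleep(160 * time.Millisecond)
        close(done)
    }
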
diff --git a/client.go b/client.go
index d63699e0..733af0ff 100644
--- a/client.go
+++ b/client.go
@@ -73,7 +73,7 @@ func (c *Client) GetClient() pb.RateLimitServiceClient {
 }
 
 func (c *Client) Ping(ctx context.Context) error {
-	_, err := c.client.Ping(ctx, &pb.PingRequest{})
+	_, err := c.client.HealthCheck(ctx, &pb.HealthCheckRequest{})
 	return err
 }
diff --git a/cmd/gubernator-server/main.go b/cmd/gubernator-server/main.go
index ba1c7399..2a090b03 100644
--- a/cmd/gubernator-server/main.go
+++ b/cmd/gubernator-server/main.go
@@ -2,13 +2,16 @@ package main
 
 import (
 	"context"
+	"net/http"
 	"os"
 
 	"github.com/mailgun/gubernator"
 	"github.com/mailgun/gubernator/cache"
 	"github.com/mailgun/gubernator/metrics"
+	"github.com/mailgun/gubernator/pb"
 	"github.com/mailgun/gubernator/sync"
 	"github.com/mailgun/service"
+	"github.com/mailgun/service/httpapi"
 )
 
 var Version = "dev-build"
@@ -43,6 +46,18 @@ func (s *Service) Start(ctx context.Context) error {
 		return err
 	}
 
+	err = s.AddHandler(httpapi.Spec{
+		Method: "GET",
+		Path:   "/healthz",
+		Scope:  httpapi.ScopeProtected,
+		Handler: func(w http.ResponseWriter, r *http.Request, p map[string]string) (interface{}, error) {
+			return s.srv.HealthCheck(r.Context(), &pb.HealthCheckRequest{})
+		},
+	})
+	if err != nil {
+		return err
+	}
+
 	return s.srv.Start()
 }
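
For a deployment to consume the new endpoint, a plain HTTP probe suffices. A sketch, assuming
the httpapi handler marshals HealthCheckResponse with its struct JSON tags ("status", "message",
"peer_count"); the address is illustrative, and because the route is registered with
ScopeProtected the request may additionally need service credentials:

    package main

    import (
        "encoding/json"
        "fmt"
        "log"
        "net/http"
    )

    // healthz mirrors the assumed JSON shape of pb.HealthCheckResponse.
    type healthz struct {
        Status    string `json:"status"`
        Message   string `json:"message"`
        PeerCount int32  `json:"peer_count"`
    }

    func main() {
        resp, err := http.Get("http://localhost:8080/healthz") // address is illustrative
        if err != nil {
            log.Fatal(err)
        }
        defer resp.Body.Close()

        var h healthz
        if err := json.NewDecoder(resp.Body).Decode(&h); err != nil {
            log.Fatal(err)
        }
        if h.Status != "healthy" {
            log.Fatalf("unhealthy (%d peers): %s", h.PeerCount, h.Message)
        }
        fmt.Printf("healthy, %d peers\n", h.PeerCount)
    }
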
diff --git a/cmd/gubernator/main.go b/cmd/gubernator/main.go
index ba7f9b9e..e5006e92 100644
--- a/cmd/gubernator/main.go
+++ b/cmd/gubernator/main.go
@@ -7,7 +7,6 @@ import (
 	"os"
 	"time"
 
-	"github.com/davecgh/go-spew/spew"
 	"github.com/mailgun/gubernator"
 	"github.com/mailgun/holster"
 )
@@ -47,12 +46,12 @@ func main() {
 	fan.Run(func(obj interface{}) error {
 		r := obj.(*gubernator.Request)
 		// Now hit our cluster with the rate limits
-		resp, err := client.GetRateLimit(context.Background(), r)
+		_, err := client.GetRateLimit(context.Background(), r)
 		checkErr(err)
 
-		if resp.Status == gubernator.OverLimit {
+		/*if resp.Status == gubernator.OverLimit {
 			spew.Dump(resp)
-		}
+		}*/
 		return nil
 	}, rateLimit)
 }
diff --git a/config.yaml b/config.yaml
index c44ac25a..318b51c7 100644
--- a/config.yaml
+++ b/config.yaml
@@ -29,10 +29,15 @@ AdvertiseAddress: localhost:9040
 LRUCache:
   # Max size of the cache; The cache size will never grow beyond
   # this size.
-  MaxCacheSize: 100
+  MaxCacheSize: 100000
 
   # The initial size of the cache; The cache may grow or shrink
   # depending on the number of rate limits the server is handling
   # currently. However the cache will never shrink lower than the
   # initial size.
-  InitialCacheSize: 50
+  InitialCacheSize: 10000
+  # Interval at which gubernator should check if the cache size
+  # needs to shrink or grow. Lower the inspection interval to
+  # make the cache more responsive to high volume bursts; raise
+  # the interval to conserve more memory during bursts.
+  InspectInterval: 30s
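
InspectInterval is a human-readable duration string. Assuming clock.DurationJSON follows
time.ParseDuration syntax (an assumption about the holster package, consistent with the 30s
default set in cache/lru.go above), candidate values can be sanity-checked with the standard
library:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Assumption: clock.DurationJSON accepts time.ParseDuration syntax.
        for _, s := range []string{"30s", "1m30s", "500ms"} {
            d, err := time.ParseDuration(s)
            fmt.Println(s, "parses as", d, err)
        }
    }
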
diff --git a/etc/server-1.yaml b/etc/server-1.yaml
index 7252f809..9e192e0b 100644
--- a/etc/server-1.yaml
+++ b/etc/server-1.yaml
@@ -23,5 +23,5 @@ statsd:
 # Gubernator config
 ListenAddress: localhost:9040
 LRUCache:
-  MaxCacheSize: 1000
+  MaxCacheSize: 3000
   InitialCacheSize: 50
diff --git a/etc/server-2.yaml b/etc/server-2.yaml
index 69fb9ec0..964390ab 100644
--- a/etc/server-2.yaml
+++ b/etc/server-2.yaml
@@ -23,5 +23,5 @@ statsd:
 # Gubernator config
 ListenAddress: localhost:9041
 LRUCache:
-  MaxCacheSize: 1000
+  MaxCacheSize: 3000
   InitialCacheSize: 50
diff --git a/etc/server-3.yaml b/etc/server-3.yaml
index 442a5340..3a77e20c 100644
--- a/etc/server-3.yaml
+++ b/etc/server-3.yaml
@@ -23,5 +23,5 @@ statsd:
 # Gubernator config
 ListenAddress: localhost:9042
 LRUCache:
-  MaxCacheSize: 1000
+  MaxCacheSize: 3000
   InitialCacheSize: 50
diff --git a/functional_test.go b/functional_test.go
index a6609597..f15a2d11 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -144,7 +144,6 @@ func TestTokenBucket(t *testing.T) {
 	}
 }
 
-// TODO: This test is very time (clock) sensitive, We could ignore the number remaining and increase the limit duration
 func TestLeakyBucket(t *testing.T) {
 	client, errs := gubernator.NewClient(gubernator.RandomPeer(peers))
 	require.Nil(t, errs)
diff --git a/pb/peers.pb.go b/pb/peers.pb.go
index 3a2e571c..c10920b8 100644
--- a/pb/peers.pb.go
+++ b/pb/peers.pb.go
@@ -16,8 +16,8 @@
 It has these top-level messages:
 	RateLimitRequest
 	RateLimitConfig
 	RateLimitResponse
-	PingRequest
-	PingResponse
+	HealthCheckRequest
+	HealthCheckResponse
 */
 package pb
diff --git a/pb/ratelimit.pb.go b/pb/ratelimit.pb.go
index db14605a..92d52953 100644
--- a/pb/ratelimit.pb.go
+++ b/pb/ratelimit.pb.go
@@ -231,21 +231,48 @@ func (m *RateLimitResponse) GetMetadata() map[string]string {
 	return nil
 }
 
-type PingRequest struct {
+type HealthCheckRequest struct {
 }
 
-func (m *PingRequest) Reset()                    { *m = PingRequest{} }
-func (m *PingRequest) String() string            { return proto.CompactTextString(m) }
-func (*PingRequest) ProtoMessage()               {}
-func (*PingRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{5} }
+func (m *HealthCheckRequest) Reset()                    { *m = HealthCheckRequest{} }
+func (m *HealthCheckRequest) String() string            { return proto.CompactTextString(m) }
+func (*HealthCheckRequest) ProtoMessage()               {}
+func (*HealthCheckRequest) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{5} }
 
-type PingResponse struct {
+type HealthCheckResponse struct {
+	// Valid entries are 'healthy' or 'unhealthy'
+	Status string `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"`
+	// If 'unhealthy', message indicates the problem
+	Message string `protobuf:"bytes,2,opt,name=message" json:"message,omitempty"`
+	// The number of peers we know about
+	PeerCount int32 `protobuf:"varint,3,opt,name=peer_count,json=peerCount" json:"peer_count,omitempty"`
 }
 
-func (m *PingResponse) Reset()                    { *m = PingResponse{} }
-func (m *PingResponse) String() string            { return proto.CompactTextString(m) }
-func (*PingResponse) ProtoMessage()               {}
-func (*PingResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{6} }
+func (m *HealthCheckResponse) Reset()                    { *m = HealthCheckResponse{} }
+func (m *HealthCheckResponse) String() string            { return proto.CompactTextString(m) }
+func (*HealthCheckResponse) ProtoMessage()               {}
+func (*HealthCheckResponse) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{6} }
+
+func (m *HealthCheckResponse) GetStatus() string {
+	if m != nil {
+		return m.Status
+	}
+	return ""
+}
+
+func (m *HealthCheckResponse) GetMessage() string {
+	if m != nil {
+		return m.Message
+	}
+	return ""
+}
+
+func (m *HealthCheckResponse) GetPeerCount() int32 {
+	if m != nil {
+		return m.PeerCount
+	}
+	return 0
+}
 
 func init() {
 	proto.RegisterType((*RateLimitRequestList)(nil), "pb.gubernator.RateLimitRequestList")
@@ -253,8 +280,8 @@ func init() {
 	proto.RegisterType((*RateLimitRequest)(nil), "pb.gubernator.RateLimitRequest")
 	proto.RegisterType((*RateLimitConfig)(nil), "pb.gubernator.RateLimitConfig")
 	proto.RegisterType((*RateLimitResponse)(nil), "pb.gubernator.RateLimitResponse")
-	proto.RegisterType((*PingRequest)(nil), "pb.gubernator.PingRequest")
-	proto.RegisterType((*PingResponse)(nil), "pb.gubernator.PingResponse")
+	proto.RegisterType((*HealthCheckRequest)(nil), "pb.gubernator.HealthCheckRequest")
+	proto.RegisterType((*HealthCheckResponse)(nil), "pb.gubernator.HealthCheckResponse")
 	proto.RegisterEnum("pb.gubernator.RateLimitConfig_Algorithm", RateLimitConfig_Algorithm_name, RateLimitConfig_Algorithm_value)
 	proto.RegisterEnum("pb.gubernator.RateLimitResponse_Status", RateLimitResponse_Status_name, RateLimitResponse_Status_value)
 }
@@ -274,7 +301,7 @@ type RateLimitServiceClient interface {
 	GetRateLimits(ctx context.Context, in *RateLimitRequestList, opts ...grpc.CallOption) (*RateLimitResponseList, error)
 	// This method is for round trip benchmarking and can be used by
 	// the client to determine connectivity to the server
-	Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error)
+	HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
 }
 
 type rateLimitServiceClient struct {
@@ -294,9 +321,9 @@ func (c *rateLimitServiceClient) GetRateLimits(ctx context.Context, in *RateLimi
 	return out, nil
 }
 
-func (c *rateLimitServiceClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) {
-	out := new(PingResponse)
-	err := grpc.Invoke(ctx, "/pb.gubernator.RateLimitService/Ping", in, out, c.cc, opts...)
+func (c *rateLimitServiceClient) HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
+	out := new(HealthCheckResponse)
+	err := grpc.Invoke(ctx, "/pb.gubernator.RateLimitService/HealthCheck", in, out, c.cc, opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -310,7 +337,7 @@ type RateLimitServiceServer interface {
 	GetRateLimits(context.Context, *RateLimitRequestList) (*RateLimitResponseList, error)
 	// This method is for round trip benchmarking and can be used by
 	// the client to determine connectivity to the server
-	Ping(context.Context, *PingRequest) (*PingResponse, error)
+	HealthCheck(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
 }
 
 func RegisterRateLimitServiceServer(s *grpc.Server, srv RateLimitServiceServer) {
@@ -335,20 +362,20 @@ func _RateLimitService_GetRateLimits_Handler(srv interface{}, ctx context.Contex
 	return interceptor(ctx, in, info, handler)
 }
 
-func _RateLimitService_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
-	in := new(PingRequest)
+func _RateLimitService_HealthCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(HealthCheckRequest)
 	if err := dec(in); err != nil {
 		return nil, err
 	}
 	if interceptor == nil {
-		return srv.(RateLimitServiceServer).Ping(ctx, in)
+		return srv.(RateLimitServiceServer).HealthCheck(ctx, in)
 	}
 	info := &grpc.UnaryServerInfo{
 		Server:     srv,
-		FullMethod: "/pb.gubernator.RateLimitService/Ping",
+		FullMethod: "/pb.gubernator.RateLimitService/HealthCheck",
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
-		return srv.(RateLimitServiceServer).Ping(ctx, req.(*PingRequest))
+		return srv.(RateLimitServiceServer).HealthCheck(ctx, req.(*HealthCheckRequest))
 	}
 	return interceptor(ctx, in, info, handler)
 }
@@ -362,8 +389,8 @@ var _RateLimitService_serviceDesc = grpc.ServiceDesc{
 			Handler:    _RateLimitService_GetRateLimits_Handler,
 		},
 		{
-			MethodName: "Ping",
-			Handler:    _RateLimitService_Ping_Handler,
+			MethodName: "HealthCheck",
+			Handler:    _RateLimitService_HealthCheck_Handler,
 		},
 	},
 	Streams: []grpc.StreamDesc{},
@@ -373,41 +400,44 @@ func init() { proto.RegisterFile("ratelimit.proto", fileDescriptor1) }
 
 var fileDescriptor1 = []byte{
-	// 568 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x54, 0x4d, 0x6f, 0x13, 0x31,
-	0x10, 0xad, 0xb3, 0x6d, 0x69, 0x26, 0x9f, 0xb5, 0x8a, 0x14, 0x85, 0xaf, 0x68, 0x8b, 0xd4, 0x70,
-	0x59, 0xa4, 0x70, 0x41, 0x70, 0x80, 0xb4, 0x04, 0xd4, 0x26, 0x6d, 0x91, 0x9b, 0x22, 0xa8, 0x90,
-	0x56, 0x4e, 0x3a, 0xa4, 0x16, 0xd9, 0xdd, 0xd4, 0xeb, 0xad, 0x94, 0x1b, 0xff, 0x88, 0x03, 0x57,
-	0xfe, 0x16, 0x77, 0x64, 0x7b, 0xf3, 0x49, 0x4b, 0x6e, 0x9e, 0x37, 0xf3, 0x66, 0x9f, 0xe7, 0x8d,
-	0x17, 0x4a, 0x92, 0x2b, 0x1c, 0x8a, 0x40, 0x28, 0x6f, 0x24, 0x23, 0x15, 0xd1, 0xc2, 0xa8, 0xe7,
-	0x0d, 0x92, 0x1e, 0xca, 0x90, 0xab, 0x48, 0xba, 0x9f, 0x61, 0x87, 0x71, 0x85, 0x1d, 0x5d, 0xc1,
-	0xf0, 0x3a, 0xc1, 0x58, 0x75, 0x44, 0xac, 0xe8, 0x5b, 0xc8, 0x69, 0xa6, 0x6f, 0xa8, 0x71, 0x85,
-	0xd4, 0x9c, 0x7a, 0xae, 0xf1, 0xc4, 0x5b, 0x20, 0x7b, 0xcb, 0x4c, 0x06, 0x72, 0x82, 0xc4, 0xee,
-	0x05, 0xdc, 0x9f, 0xcb, 0xc7, 0xa3, 0x28, 0x8c, 0xd1, 0xb4, 0x6e, 0xde, 0xd6, 0xba, 0x76, 0x77,
-	0x6b, 0x4b, 0x5d, 0xe8, 0xfd, 0x93, 0x40, 0x79, 0xf9, 0xe3, 0xf4, 0x21, 0x64, 0x43, 0x1e, 0x60,
-	0x3c, 0xe2, 0x7d, 0xac, 0x90, 0x1a, 0xa9, 0x67, 0xd9, 0x0c, 0xa0, 0x8f, 0x00, 0x92, 0x50, 0x5c,
-	0x27, 0xe8, 0x7f, 0xc7, 0x71, 0x25, 0x63, 0xd3, 0x16, 0x69, 0xe3, 0x98, 0x52, 0x58, 0xbf, 0xd2,
-	0x6a, 0x9c, 0x1a, 0xa9, 0x3b, 0xcc, 0x9c, 0xe9, 0x11, 0x6c, 0xcf, 0x84, 0xfa, 0xfd, 0x28, 0xfc,
-	0x26, 0x06, 0x95, 0xf5, 0x1a, 0xa9, 0xe7, 0x1a, 0x8f, 0xef, 0x92, 0x7b, 0x60, 0xaa, 0x58, 0x49,
-	0x2e, 0x02, 0xee, 0x6f, 0x02, 0xa5, 0xa5, 0x22, 0xba, 0x03, 0x1b, 0xa6, 0xb5, 0x11, 0xeb, 0x30,
-	0x1b, 0xd0, 0x2a, 0x6c, 0x5d, 0x26, 0x92, 0x2b, 0x11, 0x85, 0x46, 0xa6, 0xc3, 0xa6, 0x31, 0x7d,
-	0x0f, 0x59, 0x3e, 0x1c, 0x44, 0x52, 0xa8, 0xab, 0xc0, 0x48, 0x2d, 0x36, 0xea, 0xff, 0x57, 0xe2,
-	0x35, 0x27, 0xf5, 0x6c, 0x46, 0x75, 0x9f, 0x43, 0x76, 0x8a, 0xd3, 0x32, 0xe4, 0xbb, 0xa7, 0xed,
-	0xd6, 0x89, 0xbf, 0x7f, 0x7e, 0xd0, 0x6e, 0x75, 0xcb, 0x6b, 0x1a, 0xe9, 0xb4, 0x9a, 0xed, 0x2f,
-	0x13, 0x84, 0xb8, 0x7f, 0x32, 0xb0, 0xfd, 0x8f, 0x25, 0xf4, 0x0d, 0x6c, 0xc6, 0x8a, 0xab, 0x24,
-	0x36, 0x37, 0x28, 0x36, 0xf6, 0x56, 0x99, 0xe8, 0x9d, 0x99, 0x72, 0x96, 0xd2, 0xe8, 0x2e, 0x14,
-	0xfa, 0x89, 0x94, 0x18, 0x2a, 0x3b, 0xe4, 0xf4, 0xc2, 0xf9, 0x14, 0x34, 0x5c, 0xba, 0x07, 0x25,
-	0xeb, 0x80, 0xc4, 0x80, 0x8b, 0x50, 0x84, 0x83, 0xd4, 0xa5, 0xe2, 0xd0, 0xf6, 0x4e, 0x51, 0x6d,
-	0xb1, 0xc4, 0x18, 0x95, 0xaf, 0x44, 0x80, 0xc6, 0x28, 0x87, 0x65, 0x0d, 0xd2, 0x15, 0x01, 0xd2,
-	0x23, 0xd8, 0x0a, 0x50, 0xf1, 0x4b, 0xae, 0x78, 0x65, 0xc3, 0x2c, 0x9d, 0xb7, 0x52, 0xef, 0x71,
-	0x4a, 0x68, 0x85, 0x4a, 0x8e, 0xd9, 0x94, 0x5f, 0x7d, 0x0d, 0x85, 0x85, 0x14, 0x2d, 0x83, 0xa3,
-	0xf7, 0xca, 0xae, 0x9d, 0x3e, 0x6a, 0x77, 0x6f, 0xf8, 0x30, 0xc1, 0x74, 0xd7, 0x6c, 0xf0, 0x2a,
-	0xf3, 0x92, 0xb8, 0xcf, 0x60, 0xd3, 0xce, 0x81, 0x96, 0x20, 0x77, 0x7e, 0xf2, 0xae, 0xc5, 0xfc,
-	0xce, 0xe1, 0xf1, 0xa1, 0x9e, 0x7c, 0x11, 0xe0, 0xf4, 0xd3, 0x34, 0x26, 0x6e, 0x01, 0x72, 0x1f,
-	0x45, 0x38, 0x48, 0x57, 0xdc, 0x2d, 0x42, 0xde, 0x86, 0x56, 0x5e, 0xe3, 0xd7, 0xfc, 0x3b, 0x38,
-	0x43, 0x79, 0x23, 0xfa, 0x48, 0xbf, 0x42, 0xe1, 0x03, 0xaa, 0x29, 0x1c, 0xd3, 0xdd, 0x15, 0xcf,
-	0x56, 0xbf, 0xca, 0xea, 0xd3, 0x55, 0xb3, 0xd0, 0x55, 0xee, 0x1a, 0x6d, 0xc2, 0xba, 0x96, 0x40,
-	0xab, 0x4b, 0xf5, 0x73, 0x32, 0xab, 0x0f, 0x6e, 0xcd, 0xd9, 0x36, 0xee, 0xda, 0xfe, 0xbd, 0x8b,
-	0xcc, 0xa8, 0xf7, 0x83, 0x90, 0xde, 0xa6, 0xf9, 0x25, 0xbd, 0xf8, 0x1b, 0x00, 0x00, 0xff, 0xff,
-	0xa2, 0x91, 0x46, 0xfc, 0xa5, 0x04, 0x00, 0x00,
+	// 614 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x54, 0xcd, 0x6e, 0xd3, 0x4c,
+	0x14, 0xad, 0xe3, 0xfe, 0xf9, 0xa6, 0xf9, 0xe9, 0x7c, 0xfd, 0x50, 0x14, 0xf1, 0x13, 0x5c, 0xa4,
+	0x86, 0x8d, 0x91, 0xc2, 0x06, 0xc1, 0x02, 0xda, 0x10, 0xa0, 0x4d, 0xda, 0x4a, 0xd3, 0x1f, 0x41,
+	0x85, 0x64, 0x4d, 0xd2, 0xdb, 0x64, 0xd4, 0xd8, 0x4e, 0x67, 0xc6, 0x95, 0xba, 0xe3, 0x8d, 0x78,
+	0x01, 0x1e, 0x80, 0x17, 0x62, 0x8f, 0x66, 0xec, 0xa4, 0x71, 0x4a, 0x9b, 0x9d, 0xef, 0x99, 0x7b,
+	0xae, 0xcf, 0x9c, 0x73, 0x6d, 0x28, 0x09, 0xa6, 0x70, 0xc8, 0x03, 0xae, 0xbc, 0x91, 0x88, 0x54,
+	0x14, 0x0a, 0xa3, 0xae, 0xd7, 0x8f, 0xbb, 0x28, 0x42, 0xa6, 0x22, 0xe1, 0x7e, 0x85, 0x0d, 0xca,
+	0x14, 0x76, 0x74, 0x07, 0xc5, 0xab, 0x18, 0xa5, 0xea, 0x70, 0xa9, 0xc8, 0x07, 0xc8, 0x6b, 0xa6,
+	0x6f, 0xa8, 0xb2, 0x62, 0xd5, 0xec, 0x7a, 0xbe, 0xf1, 0xcc, 0xcb, 0x90, 0xbd, 0x59, 0x26, 0x05,
+	0x31, 0x46, 0xa4, 0x7b, 0x06, 0xff, 0x4f, 0x9d, 0xcb, 0x51, 0x14, 0x4a, 0x34, 0xa3, 0xb7, 0xff,
+	0x35, 0xba, 0x76, 0xff, 0xe8, 0x84, 0x9a, 0x99, 0xfd, 0xd3, 0x82, 0xf2, 0xec, 0xcb, 0xc9, 0x63,
+	0x70, 0x42, 0x16, 0xa0, 0x1c, 0xb1, 0x1e, 0x56, 0xac, 0x9a, 0x55, 0x77, 0xe8, 0x2d, 0x40, 0x9e,
+	0x00, 0xc4, 0x21, 0xbf, 0x8a, 0xd1, 0xbf, 0xc4, 0x9b, 0x4a, 0x2e, 0x39, 0x4e, 0x90, 0x36, 0xde,
+	0x10, 0x02, 0x8b, 0x03, 0xad, 0xc6, 0xae, 0x59, 0x75, 0x9b, 0x9a, 0x67, 0xb2, 0x07, 0xeb, 0xb7,
+	0x42, 0xfd, 0x5e, 0x14, 0x5e, 0xf0, 0x7e, 0x65, 0xb1, 0x66, 0xd5, 0xf3, 0x8d, 0xa7, 0xf7, 0xc9,
+	0x6d, 0x9a, 0x2e, 0x5a, 0x12, 0x59, 0xc0, 0xfd, 0x65, 0x41, 0x69, 0xa6, 0x89, 0x6c, 0xc0, 0x92,
+	0x19, 0x6d, 0xc4, 0xda, 0x34, 0x29, 0x48, 0x15, 0x56, 0xcf, 0x63, 0xc1, 0x14, 0x8f, 0x42, 0x23,
+	0xd3, 0xa6, 0x93, 0x9a, 0x7c, 0x02, 0x87, 0x0d, 0xfb, 0x91, 0xe0, 0x6a, 0x10, 0x18, 0xa9, 0xc5,
+	0x46, 0xfd, 0x61, 0x25, 0xde, 0xf6, 0xb8, 0x9f, 0xde, 0x52, 0xdd, 0x57, 0xe0, 0x4c, 0x70, 0x52,
+	0x86, 0xb5, 0xe3, 0xc3, 0x76, 0xeb, 0xc0, 0xdf, 0x39, 0x69, 0xb6, 0x5b, 0xc7, 0xe5, 0x05, 0x8d,
+	0x74, 0x5a, 0xdb, 0xed, 0x6f, 0x63, 0xc4, 0x72, 0xff, 0xe4, 0x60, 0xfd, 0x4e, 0x24, 0xe4, 0x3d,
+	0x2c, 0x4b, 0xc5, 0x54, 0x2c, 0xcd, 0x0d, 0x8a, 0x8d, 0xad, 0x79, 0x21, 0x7a, 0x47, 0xa6, 0x9d,
+	0xa6, 0x34, 0xb2, 0x09, 0x85, 0x5e, 0x2c, 0x04, 0x86, 0x2a, 0x31, 0x39, 0xbd, 0xf0, 0x5a, 0x0a,
+	0x1a, 0x2e, 0xd9, 0x82, 0x52, 0x92, 0x80, 0xc0, 0x80, 0xf1, 0x90, 0x87, 0xfd, 0x34, 0xa5, 0xe2,
+	0x30, 0x99, 0x9d, 0xa2, 0x3a, 0x62, 0x81, 0x12, 0x95, 0xaf, 0x78, 0x80, 0x26, 0x28, 0x9b, 0x3a,
+	0x06, 0x39, 0xe6, 0x01, 0x92, 0x3d, 0x58, 0x0d, 0x50, 0xb1, 0x73, 0xa6, 0x58, 0x65, 0xc9, 0x2c,
+	0x9d, 0x37, 0x57, 0xef, 0x7e, 0x4a, 0x68, 0x85, 0x4a, 0xdc, 0xd0, 0x09, 0xbf, 0xfa, 0x0e, 0x0a,
+	0x99, 0x23, 0x52, 0x06, 0x5b, 0xef, 0x55, 0xb2, 0x76, 0xfa, 0x51, 0xa7, 0x7b, 0xcd, 0x86, 0x31,
+	0xa6, 0xbb, 0x96, 0x14, 0x6f, 0x73, 0x6f, 0x2c, 0xf7, 0x25, 0x2c, 0x27, 0x3e, 0x90, 0x12, 0xe4,
+	0x4f, 0x0e, 0x3e, 0xb6, 0xa8, 0xdf, 0xd9, 0xdd, 0xdf, 0xd5, 0xce, 0x17, 0x01, 0x0e, 0x4f, 0x27,
+	0xb5, 0xe5, 0x6e, 0x00, 0xf9, 0x82, 0x6c, 0xa8, 0x06, 0xcd, 0x01, 0xf6, 0x2e, 0xd3, 0x4d, 0x77,
+	0x2f, 0xe0, 0xbf, 0x0c, 0x9a, 0xc6, 0xf1, 0x28, 0x13, 0x87, 0x33, 0x71, 0xb9, 0x02, 0x2b, 0x01,
+	0x4a, 0xc9, 0xfa, 0x63, 0x2d, 0xe3, 0x52, 0x3b, 0x36, 0x42, 0x14, 0x7e, 0x2f, 0x8a, 0x43, 0x65,
+	0x5c, 0x5d, 0xa2, 0x8e, 0x46, 0x9a, 0x1a, 0x68, 0xfc, 0x9e, 0xfe, 0xcc, 0x8e, 0x50, 0x5c, 0xf3,
+	0x1e, 0x92, 0xef, 0x50, 0xf8, 0x8c, 0x6a, 0x02, 0x4b, 0xb2, 0x39, 0xe7, 0xaf, 0xa0, 0x3f, 0xfa,
+	0xea, 0x8b, 0x79, 0x56, 0xeb, 0x2e, 0x77, 0x81, 0x9c, 0x42, 0x7e, 0xea, 0x6a, 0xe4, 0xf9, 0x0c,
+	0xed, 0xae, 0x19, 0x55, 0xf7, 0xa1, 0x96, 0x64, 0xb6, 0xbb, 0xb0, 0xb3, 0x72, 0x96, 0x1b, 0x75,
+	0x7f, 0x58, 0x56, 0x77, 0xd9, 0xfc, 0x06, 0x5f, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x0f, 0x65,
+	0xeb, 0x06, 0x19, 0x05, 0x00, 0x00,
 }
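
With the regenerated stubs, the renamed RPC can be exercised directly against a running server.
A minimal sketch (the address and insecure transport are illustrative; grpc.Dial/WithInsecure
match the grpc-go vintage this code was generated for):

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/mailgun/gubernator/pb"
        "google.golang.org/grpc"
    )

    func main() {
        conn, err := grpc.Dial("localhost:9040", grpc.WithInsecure()) // illustrative
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
        defer cancel()

        resp, err := pb.NewRateLimitServiceClient(conn).HealthCheck(ctx, &pb.HealthCheckRequest{})
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("status=%s peers=%d msg=%q", resp.Status, resp.PeerCount, resp.Message)
    }
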
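
One subtlety in the HealthCheck handler above: gRPC marshals the response after the handler
returns, i.e. after the RLock is released, so handing back a pointer to the shared s.health
struct can race with a concurrent updatePeers. A defensive variant (a sketch, not part of this
patch) copies the struct while the lock is still held:

    func (s *Server) HealthCheck(ctx context.Context, in *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) {
        s.peerMutex.RLock()
        defer s.peerMutex.RUnlock()
        health := s.health // copy while the read lock is held
        return &health, nil
    }
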
diff --git a/pb/ratelimit.proto b/pb/ratelimit.proto
index 0f22847f..086de077 100644
--- a/pb/ratelimit.proto
+++ b/pb/ratelimit.proto
@@ -12,7 +12,7 @@ service RateLimitService {
 
     // This method is for round trip benchmarking and can be used by
     // the client to determine connectivity to the server
-    rpc Ping (PingRequest) returns (PingResponse) {}
+    rpc HealthCheck (HealthCheckRequest) returns (HealthCheckResponse) {}
 }
 
 // Must specify at least one RateLimitRequest.
@@ -79,5 +79,12 @@ message RateLimitResponse {
     map<string, string> metadata = 5;
 }
 
-message PingRequest {}
-message PingResponse {}
+message HealthCheckRequest {}
+message HealthCheckResponse {
+    // Valid entries are 'healthy' or 'unhealthy'
+    string status = 1;
+    // If 'unhealthy', message indicates the problem
+    string message = 2;
+    // The number of peers we know about
+    int32 peer_count = 3;
+}
diff --git a/server.go b/server.go
index f79ef2e0..76da21b6 100644
--- a/server.go
+++ b/server.go
@@ -2,7 +2,9 @@
 package gubernator
 
 import (
 	"context"
+	"fmt"
 	"net"
+	"strings"
 	"sync"
 	"time"
@@ -15,16 +17,19 @@
 
 const (
 	maxRequestSize = 1 * 1024 * 1024 // 1Mb
+	Healthy        = "healthy"
+	UnHealthy      = "unhealthy"
 )
 
 type Server struct {
+	health    pb.HealthCheckResponse
 	wg        holster.WaitGroup
+	log       *logrus.Entry
 	conf      ServerConfig
 	listener  net.Listener
 	server    *grpc.Server
 	peerMutex sync.RWMutex
 	client    *PeerClient
-	log       *logrus.Entry
 }
 
 // New creates a server instance.
@@ -89,7 +94,7 @@ func (s *Server) Start() error {
 			client := pb.NewRateLimitServiceClient(conn)
 			ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
 			defer cancel()
-			_, err = client.Ping(ctx, &pb.PingRequest{})
+			_, err = client.HealthCheck(ctx, &pb.HealthCheckRequest{})
 			return err
 		})
 	}()
@@ -123,7 +128,6 @@ func (s *Server) GetRateLimits(ctx context.Context, reqs *pb.RateLimitRequestLis
 	var result pb.RateLimitResponseList
 
 	// TODO: Support getting multiple keys in an async manner (FanOut)
-	// TODO: Determine what the server Quality of Service is and set context timeouts for each async request?
 	for i, req := range reqs.RateLimits {
 		if req.RateLimitConfig == nil {
 			return nil, errors.Errorf("required field 'RateLimitConfig' missing from 'RateLimit[%d]'", i)
@@ -184,9 +188,11 @@ func (s *Server) GetPeerRateLimits(ctx context.Context, req *pb.PeerRateLimitReq
 	return &resp, nil
 }
 
-// Used for GRPC Benchmarking and liveliness checks
-func (s *Server) Ping(ctx context.Context, in *pb.PingRequest) (*pb.PingResponse, error) {
-	return &pb.PingResponse{}, nil
+// Returns the health of the peer.
+func (s *Server) HealthCheck(ctx context.Context, in *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) {
+	s.peerMutex.RLock()
+	defer s.peerMutex.RUnlock()
+	return &s.health, nil
 }
 
 func (s *Server) applyAlgorithm(entry *pb.RateLimitRequest) (*pb.RateLimitResponse, error) {
@@ -212,14 +218,13 @@
 
 // Called by PeerSyncer when the cluster config changes
 func (s *Server) updatePeers(conf *PeerConfig) {
-	s.log.WithField("peers", conf.Peers).Debug("Peers updated")
 	picker := s.conf.Picker.New()
+	var errs []string
 
 	for _, peer := range conf.Peers {
 		peerInfo, err := NewPeerClient(peer)
 		if err != nil {
-			// TODO: Notify someone that we are unhealthy
-			s.log.Errorf("Unable to connect to peer '%s'; skip add to consistent hash", peer)
+			errs = append(errs, fmt.Sprintf("failed to connect to peer '%s'; consistent hash is incomplete", peer))
 			continue
 		}
 
@@ -237,10 +242,20 @@
 
 	// TODO: schedule a disconnect for old PeerClients once they are no longer in flight
 
-	// Replace our current picker
 	s.peerMutex.Lock()
+	defer s.peerMutex.Unlock()
+
+	// Replace our current picker
 	s.conf.Picker = picker
-	s.peerMutex.Unlock()
+
+	// Update our health status
+	s.health.Status = Healthy
+	if len(errs) != 0 {
+		s.health.Status = UnHealthy
+		s.health.Message = strings.Join(errs, "|")
+	}
+	s.health.PeerCount = int32(picker.Size())
+	s.log.WithField("peers", conf.Peers).Debug("Peers updated")
 }
 
 func retry(attempts int, d time.Duration, callback func() error) (err error) {
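
The health transition in updatePeers lends itself to a direct test. A sketch, where
newTestServer is a hypothetical helper that starts a Server with reachable peers; HealthCheck,
pb.HealthCheckRequest, and require.Nil are used exactly as elsewhere in the repository:

    func TestHealthCheck(t *testing.T) {
        srv := newTestServer(t) // hypothetical helper

        resp, err := srv.HealthCheck(context.Background(), &pb.HealthCheckRequest{})
        require.Nil(t, err)
        require.Equal(t, "healthy", resp.Status)
        require.NotZero(t, resp.PeerCount)
    }
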