diff --git a/CHANGELOG b/CHANGELOG index 0eb5f1c7..20d7535d 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Added TLS Support for both GRPC and HTTP interfaces #76 * Prometheus metrics are now prefixed with `gubernator_` * Switched prometheus Histograms to Summary's +* Changed gubernator.Config.GRPCServer to GRPCServers to support registering +with GRPC instances on multiple ports. +* Gubernator now opens a second GRPC instance on a random localhost port when +TLS is enabled for use by the HTTP API Gateway. ## [1.0.0-rc.2] - 2020-11-05 ### Change diff --git a/README.md b/README.md index fb34c09c..3980617f 100644 --- a/README.md +++ b/README.md @@ -255,7 +255,7 @@ don't have either, the docker-compose method is the simplest way to try gubernat $ docker run -p 8081:81 -p 9080:80 -e GUBER_ETCD_ENDPOINTS=etcd1:2379,etcd2:2379 \ thrawn01/gubernator:latest -# Hit the API at localhost:9080 +# Hit the HTTP API at localhost:9080 $ curl http://localhost:9080/v1/HealthCheck ``` @@ -271,7 +271,7 @@ $ vi docker-compose.yaml # Run the docker container $ docker-compose up -d -# Hit the API at localhost:9080 (GRPC is at 9081) +# Hit the HTTP API at localhost:9080 (GRPC is at 9081) $ curl http://localhost:9080/v1/HealthCheck ``` @@ -287,16 +287,24 @@ $ vi k8s-deployment.yaml $ kubectl create -f k8s-deployment.yaml ``` +##### TLS +Gubernator supports TLS for both HTTP and GRPC connections. You can see an example with +self signed certs by running `docker-compose-tls.yaml` +```bash +# Run docker compose +$ docker-compose -f docker-compose-tls.yaml up -d + +# Hit the HTTP API at localhost:9080 (GRPC is at 9081) +$ curl --cacert certs/ca.pem --cert certs/gubernator.pem --key certs/gubernator.key https://localhost:9080/v1/HealthCheck +` + ### Configuration Gubernator is configured via environment variables with an optional `--config` flag which takes a file of key/values and places them into the local environment before startup. See the `example.conf` for all available config options and their descriptions. - ### Architecture See [architecture.md](/architecture.md) for a full description of the architecture and the inner workings of gubernator. - - diff --git a/cluster/cluster.go b/cluster/cluster.go index 1fedc168..dfa39985 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -96,7 +96,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error { // Add the peers and daemons to the package level variables peers = append(peers, gubernator.PeerInfo{ - GRPCAddress: d.GRPCListener.Addr().String(), + GRPCAddress: d.GRPCListeners[0].Addr().String(), HTTPAddress: d.HTTPListener.Addr().String(), }) daemons = append(daemons, d) diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index e44a6e9f..c904781c 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -59,9 +59,9 @@ func TestStartMultipleDaemons(t *testing.T) { daemons := cluster.GetDaemons() assert.Equal(t, wantPeers, cluster.GetPeers()) assert.Equal(t, 2, len(daemons)) - assert.Equal(t, "127.0.0.1:1111", daemons[0].GRPCListener.Addr().String()) - assert.Equal(t, "127.0.0.1:2222", daemons[1].GRPCListener.Addr().String()) - assert.Equal(t, "127.0.0.1:2222", cluster.DaemonAt(1).GRPCListener.Addr().String()) + assert.Equal(t, "127.0.0.1:1111", daemons[0].GRPCListeners[0].Addr().String()) + assert.Equal(t, "127.0.0.1:2222", daemons[1].GRPCListeners[0].Addr().String()) + assert.Equal(t, "127.0.0.1:2222", cluster.DaemonAt(1).GRPCListeners[0].Addr().String()) assert.Equal(t, "127.0.0.1:2222", cluster.PeerAt(1).GRPCAddress) } diff --git a/config.go b/config.go index b74fe01c..29b15f4d 100644 --- a/config.go +++ b/config.go @@ -64,8 +64,8 @@ type BehaviorConfig struct { // config for a gubernator instance type Config struct { - // Required - GRPCServer *grpc.Server + // (Required) A list of GRPC servers to register our instance with + GRPCServers []*grpc.Server // (Optional) Adjust how gubernator behaviors are configured Behaviors BehaviorConfig @@ -267,32 +267,34 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig, } // TLS Config - setter.SetDefault(&conf.TLS, &TLSConfig{}) - setter.SetDefault(&conf.TLS.CaFile, os.Getenv("GUBER_TLS_CA")) - setter.SetDefault(&conf.TLS.CaKeyFile, os.Getenv("GUBER_TLS_CA_KEY")) - setter.SetDefault(&conf.TLS.KeyFile, os.Getenv("GUBER_TLS_KEY")) - setter.SetDefault(&conf.TLS.CertFile, os.Getenv("GUBER_TLS_CERT")) - setter.SetDefault(&conf.TLS.AutoTLS, getEnvBool(log, "GUBER_TLS_AUTO")) - - clientAuth := os.Getenv("GUBER_TLS_CLIENT_AUTH") - if clientAuth != "" { - clientAuthTypes := map[string]tls.ClientAuthType{ - "request-cert": tls.RequestClientCert, - "verify-cert": tls.VerifyClientCertIfGiven, - "require-any-cert": tls.RequireAnyClientCert, - "require-and-verify": tls.RequireAndVerifyClientCert, - } - t, ok := clientAuthTypes[clientAuth] - if !ok { - return conf, errors.Errorf("'GUBER_TLS_CLIENT_AUTH=%s' is invalid; choices are [%s]", - clientAuth, validClientAuthTypes(clientAuthTypes)) + if anyHasPrefix("GUBER_TLS_", os.Environ()) { + conf.TLS = &TLSConfig{} + setter.SetDefault(&conf.TLS.CaFile, os.Getenv("GUBER_TLS_CA")) + setter.SetDefault(&conf.TLS.CaKeyFile, os.Getenv("GUBER_TLS_CA_KEY")) + setter.SetDefault(&conf.TLS.KeyFile, os.Getenv("GUBER_TLS_KEY")) + setter.SetDefault(&conf.TLS.CertFile, os.Getenv("GUBER_TLS_CERT")) + setter.SetDefault(&conf.TLS.AutoTLS, getEnvBool(log, "GUBER_TLS_AUTO")) + + clientAuth := os.Getenv("GUBER_TLS_CLIENT_AUTH") + if clientAuth != "" { + clientAuthTypes := map[string]tls.ClientAuthType{ + "request-cert": tls.RequestClientCert, + "verify-cert": tls.VerifyClientCertIfGiven, + "require-any-cert": tls.RequireAnyClientCert, + "require-and-verify": tls.RequireAndVerifyClientCert, + } + t, ok := clientAuthTypes[clientAuth] + if !ok { + return conf, errors.Errorf("'GUBER_TLS_CLIENT_AUTH=%s' is invalid; choices are [%s]", + clientAuth, validClientAuthTypes(clientAuthTypes)) + } + conf.TLS.ClientAuth = t } - conf.TLS.ClientAuth = t + setter.SetDefault(&conf.TLS.ClientAuthKeyFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_KEY")) + setter.SetDefault(&conf.TLS.ClientAuthCertFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_CERT")) + setter.SetDefault(&conf.TLS.ClientAuthCaFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_CA_CERT")) + setter.SetDefault(&conf.TLS.InsecureSkipVerify, getEnvBool(log, "GUBER_TLS_INSECURE_SKIP_VERIFY")) } - setter.SetDefault(&conf.TLS.ClientAuthKeyFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_KEY")) - setter.SetDefault(&conf.TLS.ClientAuthCertFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_CERT")) - setter.SetDefault(&conf.TLS.ClientAuthCaFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_CA_CERT")) - setter.SetDefault(&conf.TLS.InsecureSkipVerify, getEnvBool(log, "GUBER_TLS_INSECURE_SKIP_VERIFY")) // ETCD Config setter.SetDefault(&conf.EtcdPoolConf.KeyPrefix, os.Getenv("GUBER_ETCD_KEY_PREFIX"), "/gubernator-peers") diff --git a/daemon.go b/daemon.go index 9b56714a..8b6e96a9 100644 --- a/daemon.go +++ b/daemon.go @@ -36,15 +36,15 @@ import ( ) type Daemon struct { - GRPCListener net.Listener - HTTPListener net.Listener - V1Server *V1Instance + GRPCListeners []net.Listener + HTTPListener net.Listener + V1Server *V1Instance log logrus.FieldLogger pool PoolInterface conf DaemonConfig httpSrv *http.Server - grpcSrv *grpc.Server + grpcSrvs []*grpc.Server wg syncutil.WaitGroup statsHandler *GRPCStatsHandler promRegister *prometheus.Registry @@ -91,19 +91,17 @@ func (s *Daemon) Start(ctx context.Context) error { } if s.conf.ServerTLS() != nil { - creds := credentials.NewTLS(s.conf.ServerTLS()) - opts = append(opts, grpc.Creds(creds)) + // Create two GRPC server instances, one for TLS and the other for the API Gateway + s.grpcSrvs = append(s.grpcSrvs, grpc.NewServer(append(opts, grpc.Creds(credentials.NewTLS(s.conf.ServerTLS())))...)) } - - // New GRPC server - s.grpcSrv = grpc.NewServer(opts...) + s.grpcSrvs = append(s.grpcSrvs, grpc.NewServer(opts...)) // Registers a new gubernator instance with the GRPC server s.V1Server, err = NewV1Instance(Config{ PeerTLS: s.conf.ClientTLS(), DataCenter: s.conf.DataCenter, LocalPicker: s.conf.Picker, - GRPCServer: s.grpcSrv, + GRPCServers: s.grpcSrvs, Logger: s.log, Cache: cache, }) @@ -114,19 +112,45 @@ func (s *Daemon) Start(ctx context.Context) error { // V1Server instance also implements prometheus.Collector interface s.promRegister.Register(s.V1Server) - s.GRPCListener, err = net.Listen("tcp", s.conf.GRPCListenAddress) + l, err := net.Listen("tcp", s.conf.GRPCListenAddress) if err != nil { return errors.Wrap(err, "while starting GRPC listener") } + s.GRPCListeners = append(s.GRPCListeners, l) // Start serving GRPC Requests s.wg.Go(func() { s.log.Infof("GRPC Listening on %s ...", s.conf.GRPCListenAddress) - if err := s.grpcSrv.Serve(s.GRPCListener); err != nil { + if err := s.grpcSrvs[0].Serve(l); err != nil { s.log.WithError(err).Error("while starting GRPC server") } }) + var gatewayAddr string + if s.conf.ServerTLS() != nil { + // We start a new local GRPC instance because we can't guarantee the TLS cert provided by the + // user has localhost or the local interface included in the certs valid hostnames. If they are not + // included it means the local gateway connections will not be able to connect. + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return errors.Wrap(err, "while starting GRPC Gateway listener") + } + s.GRPCListeners = append(s.GRPCListeners, l) + + s.wg.Go(func() { + s.log.Infof("GRPC Gateway Listening on %s ...", l.Addr()) + if err := s.grpcSrvs[1].Serve(l); err != nil { + s.log.WithError(err).Error("while starting GRPC Gateway server") + } + }) + gatewayAddr = l.Addr().String() + } else { + gatewayAddr, err = ResolveHostIP(s.conf.GRPCListenAddress) + if err != nil { + return errors.Wrap(err, "while resolving GRPC gateway client address") + } + } + switch s.conf.PeerDiscoveryType { case "k8s": // Source our list of peers from kubernetes endpoint API @@ -162,8 +186,7 @@ func (s *Daemon) Start(ctx context.Context) error { gateway := runtime.NewServeMux() var gwCtx context.Context gwCtx, s.gwCancel = context.WithCancel(context.Background()) - err = RegisterV1HandlerFromEndpoint(gwCtx, gateway, - s.conf.GRPCListenAddress, []grpc.DialOption{grpc.WithInsecure()}) + err = RegisterV1HandlerFromEndpoint(gwCtx, gateway, gatewayAddr, []grpc.DialOption{grpc.WithInsecure()}) if err != nil { return errors.Wrap(err, "while registering GRPC gateway handler") } @@ -207,7 +230,11 @@ func (s *Daemon) Start(ctx context.Context) error { } // Validate we can reach the GRPC and HTTP endpoints before returning - if err := WaitForConnect(ctx, []string{s.conf.HTTPListenAddress, s.conf.GRPCListenAddress}); err != nil { + addrs := []string{s.conf.HTTPListenAddress} + for _, l := range s.GRPCListeners { + addrs = append(addrs, l.Addr().String()) + } + if err := WaitForConnect(ctx, addrs); err != nil { return err } @@ -226,13 +253,15 @@ func (s *Daemon) Close() { s.log.Infof("HTTP Gateway close for %s ...", s.conf.HTTPListenAddress) s.httpSrv.Shutdown(context.Background()) - s.log.Infof("GRPC close for %s ...", s.conf.GRPCListenAddress) - s.grpcSrv.GracefulStop() + for i, srv := range s.grpcSrvs { + s.log.Infof("GRPC close for %s ...", s.GRPCListeners[i].Addr()) + srv.GracefulStop() + } s.wg.Stop() s.statsHandler.Close() s.gwCancel() s.httpSrv = nil - s.grpcSrv = nil + s.grpcSrvs = nil } // SetPeers sets the peers for this daemon diff --git a/functional_test.go b/functional_test.go index fb42cae6..cb2d6e21 100644 --- a/functional_test.go +++ b/functional_test.go @@ -518,7 +518,7 @@ func TestResetRemaining(t *testing.T) { } func TestHealthCheck(t *testing.T) { - client, err := guber.DialV1Server(cluster.DaemonAt(0).GRPCListener.Addr().String(), nil) + client, err := guber.DialV1Server(cluster.DaemonAt(0).GRPCListeners[0].Addr().String(), nil) require.NoError(t, err) // Check that the cluster is healthy to start with diff --git a/go.mod b/go.mod index e22cee6b..1411a37e 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/mailgun/holster/v3 v3.14.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.1.0 - github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 github.com/prometheus/common v0.6.0 github.com/segmentio/fasthash v1.0.2 github.com/sirupsen/logrus v1.4.2 diff --git a/gubernator.go b/gubernator.go index bf76b785..ee398cb3 100644 --- a/gubernator.go +++ b/gubernator.go @@ -50,8 +50,8 @@ type V1Instance struct { // NewV1Instance instantiate a single instance of a gubernator peer and registers this // instance with the provided GRPCServer. func NewV1Instance(conf Config) (*V1Instance, error) { - if conf.GRPCServer == nil { - return nil, errors.New("GRPCServer instance is required") + if conf.GRPCServers == nil { + return nil, errors.New("At least one GRPCServer instance is required") } if err := conf.SetDefaults(); err != nil { @@ -67,9 +67,11 @@ func NewV1Instance(conf Config) (*V1Instance, error) { s.global = newGlobalManager(conf.Behaviors, &s) s.mutliRegion = newMultiRegionManager(conf.Behaviors, &s) - // Register our server with GRPC - RegisterV1Server(conf.GRPCServer, &s) - RegisterPeersV1Server(conf.GRPCServer, &s) + // Register our instance with all GRPC servers + for _, srv := range conf.GRPCServers { + RegisterV1Server(srv, &s) + RegisterPeersV1Server(srv, &s) + } if s.conf.Loader == nil { return &s, nil diff --git a/store_test.go b/store_test.go index 37872c06..d6339e6a 100644 --- a/store_test.go +++ b/store_test.go @@ -36,14 +36,14 @@ type v1Server struct { } func (s *v1Server) Close() { - s.conf.GRPCServer.GracefulStop() + s.conf.GRPCServers[0].GracefulStop() s.srv.Close() } // Start a single instance of V1Server with the provided config and listening address. func newV1Server(t *testing.T, address string, conf gubernator.Config) *v1Server { t.Helper() - conf.GRPCServer = grpc.NewServer() + conf.GRPCServers = append(conf.GRPCServers, grpc.NewServer()) srv, err := gubernator.NewV1Instance(conf) require.NoError(t, err) @@ -52,7 +52,7 @@ func newV1Server(t *testing.T, address string, conf gubernator.Config) *v1Server require.NoError(t, err) go func() { - if err := conf.GRPCServer.Serve(listener); err != nil { + if err := conf.GRPCServers[0].Serve(listener); err != nil { fmt.Printf("while serving: %s\n", err) } }() diff --git a/tls.go b/tls.go index 3b967860..d149dd8b 100644 --- a/tls.go +++ b/tls.go @@ -118,7 +118,12 @@ func fromFile(name string) (*bytes.Buffer, error) { func SetupTLS(conf *TLSConfig) error { var err error - if conf == nil || conf.ServerTLS == nil { + if conf == nil { + return nil + } + + // If both client and server tls configs provided, nothing to do! + if conf.ServerTLS != nil && conf.ClientTLS != nil { return nil } @@ -126,7 +131,7 @@ func SetupTLS(conf *TLSConfig) error { conf.Logger.Info("Detected TLS Configuration") // Basic config with reasonably secure defaults - conf.ServerTLS = &tls.Config{ + setter.SetDefault(&conf.ServerTLS, &tls.Config{ CipherSuites: []uint16{ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, @@ -151,8 +156,8 @@ func SetupTLS(conf *TLSConfig) error { NextProtos: []string{ "h2", "http/1.1", // enable HTTP/2 }, - } - conf.ClientTLS = &tls.Config{} + }) + setter.SetDefault(&conf.ClientTLS, &tls.Config{}) // Attempt to load any files provided conf.CaPEM, err = fromFile(conf.CaFile) diff --git a/tls_test.go b/tls_test.go index 0cb54c65..c005bb94 100644 --- a/tls_test.go +++ b/tls_test.go @@ -227,11 +227,11 @@ func TestTLSClusterWithClientAuthentication(t *testing.T) { peers := []gubernator.PeerInfo{ { - GRPCAddress: d1.GRPCListener.Addr().String(), + GRPCAddress: d1.GRPCListeners[0].Addr().String(), HTTPAddress: d1.HTTPListener.Addr().String(), }, { - GRPCAddress: d2.GRPCListener.Addr().String(), + GRPCAddress: d2.GRPCListeners[0].Addr().String(), HTTPAddress: d2.HTTPListener.Addr().String(), }, } @@ -258,3 +258,33 @@ func TestTLSClusterWithClientAuthentication(t *testing.T) { // Should have called GetPeerRateLimits on d2 assert.Contains(t, string(b), `{method="/pb.gubernator.PeersV1/GetPeerRateLimits"} 1`) } + +func TestHTTPSClientAuth(t *testing.T) { + conf := gubernator.DaemonConfig{ + GRPCListenAddress: "127.0.0.1:9695", + HTTPListenAddress: "127.0.0.1:9685", + TLS: &gubernator.TLSConfig{ + CaFile: "certs/ca.pem", + CertFile: "certs/gubernator.pem", + KeyFile: "certs/gubernator.key", + ClientAuth: tls.RequireAndVerifyClientCert, + }, + } + + d := spawnDaemon(t, conf) + defer d.Close() + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: conf.TLS.ClientTLS, + }, + } + + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("https://%s/v1/HealthCheck", conf.HTTPListenAddress), nil) + require.NoError(t, err) + resp, err := client.Do(req) + require.NoError(t, err) + b, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, `{"status":"healthy"}`, string(b)) +}