Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
HTTP Gateway now uses a separate GRPC port when TLS is enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Nov 11, 2020
1 parent 4174455 commit 28c7e67
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 69 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Added TLS Support for both GRPC and HTTP interfaces #76
* Prometheus metrics are now prefixed with `gubernator_`
* Switched prometheus Histograms to Summary's
* Changed gubernator.Config.GRPCServer to GRPCServers to support registering
with GRPC instances on multiple ports.
* Gubernator now opens a second GRPC instance on a random localhost port when
TLS is enabled for use by the HTTP API Gateway.

## [1.0.0-rc.2] - 2020-11-05
### Change
Expand Down
18 changes: 13 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ don't have either, the docker-compose method is the simplest way to try gubernat
$ docker run -p 8081:81 -p 9080:80 -e GUBER_ETCD_ENDPOINTS=etcd1:2379,etcd2:2379 \
thrawn01/gubernator:latest

# Hit the API at localhost:9080
# Hit the HTTP API at localhost:9080
$ curl http://localhost:9080/v1/HealthCheck
```

Expand All @@ -271,7 +271,7 @@ $ vi docker-compose.yaml
# Run the docker container
$ docker-compose up -d

# Hit the API at localhost:9080 (GRPC is at 9081)
# Hit the HTTP API at localhost:9080 (GRPC is at 9081)
$ curl http://localhost:9080/v1/HealthCheck
```

Expand All @@ -287,16 +287,24 @@ $ vi k8s-deployment.yaml
$ kubectl create -f k8s-deployment.yaml
```

##### TLS
Gubernator supports TLS for both HTTP and GRPC connections. You can see an example with
self signed certs by running `docker-compose-tls.yaml`
```bash
# Run docker compose
$ docker-compose -f docker-compose-tls.yaml up -d

# Hit the HTTP API at localhost:9080 (GRPC is at 9081)
$ curl --cacert certs/ca.pem --cert certs/gubernator.pem --key certs/gubernator.key https://localhost:9080/v1/HealthCheck
`
### Configuration
Gubernator is configured via environment variables with an optional `--config` flag
which takes a file of key/values and places them into the local environment before startup.
See the `example.conf` for all available config options and their descriptions.

### Architecture
See [architecture.md](/architecture.md) for a full description of the architecture and the inner
workings of gubernator.


2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func StartWith(localPeers []gubernator.PeerInfo) error {

// Add the peers and daemons to the package level variables
peers = append(peers, gubernator.PeerInfo{
GRPCAddress: d.GRPCListener.Addr().String(),
GRPCAddress: d.GRPCListeners[0].Addr().String(),
HTTPAddress: d.HTTPListener.Addr().String(),
})
daemons = append(daemons, d)
Expand Down
6 changes: 3 additions & 3 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func TestStartMultipleDaemons(t *testing.T) {
daemons := cluster.GetDaemons()
assert.Equal(t, wantPeers, cluster.GetPeers())
assert.Equal(t, 2, len(daemons))
assert.Equal(t, "127.0.0.1:1111", daemons[0].GRPCListener.Addr().String())
assert.Equal(t, "127.0.0.1:2222", daemons[1].GRPCListener.Addr().String())
assert.Equal(t, "127.0.0.1:2222", cluster.DaemonAt(1).GRPCListener.Addr().String())
assert.Equal(t, "127.0.0.1:1111", daemons[0].GRPCListeners[0].Addr().String())
assert.Equal(t, "127.0.0.1:2222", daemons[1].GRPCListeners[0].Addr().String())
assert.Equal(t, "127.0.0.1:2222", cluster.DaemonAt(1).GRPCListeners[0].Addr().String())
assert.Equal(t, "127.0.0.1:2222", cluster.PeerAt(1).GRPCAddress)
}

Expand Down
54 changes: 28 additions & 26 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ type BehaviorConfig struct {

// config for a gubernator instance
type Config struct {
// Required
GRPCServer *grpc.Server
// (Required) A list of GRPC servers to register our instance with
GRPCServers []*grpc.Server

// (Optional) Adjust how gubernator behaviors are configured
Behaviors BehaviorConfig
Expand Down Expand Up @@ -267,32 +267,34 @@ func SetupDaemonConfig(logger *logrus.Logger, configFile string) (DaemonConfig,
}

// TLS Config
setter.SetDefault(&conf.TLS, &TLSConfig{})
setter.SetDefault(&conf.TLS.CaFile, os.Getenv("GUBER_TLS_CA"))
setter.SetDefault(&conf.TLS.CaKeyFile, os.Getenv("GUBER_TLS_CA_KEY"))
setter.SetDefault(&conf.TLS.KeyFile, os.Getenv("GUBER_TLS_KEY"))
setter.SetDefault(&conf.TLS.CertFile, os.Getenv("GUBER_TLS_CERT"))
setter.SetDefault(&conf.TLS.AutoTLS, getEnvBool(log, "GUBER_TLS_AUTO"))

clientAuth := os.Getenv("GUBER_TLS_CLIENT_AUTH")
if clientAuth != "" {
clientAuthTypes := map[string]tls.ClientAuthType{
"request-cert": tls.RequestClientCert,
"verify-cert": tls.VerifyClientCertIfGiven,
"require-any-cert": tls.RequireAnyClientCert,
"require-and-verify": tls.RequireAndVerifyClientCert,
}
t, ok := clientAuthTypes[clientAuth]
if !ok {
return conf, errors.Errorf("'GUBER_TLS_CLIENT_AUTH=%s' is invalid; choices are [%s]",
clientAuth, validClientAuthTypes(clientAuthTypes))
if anyHasPrefix("GUBER_TLS_", os.Environ()) {
conf.TLS = &TLSConfig{}
setter.SetDefault(&conf.TLS.CaFile, os.Getenv("GUBER_TLS_CA"))
setter.SetDefault(&conf.TLS.CaKeyFile, os.Getenv("GUBER_TLS_CA_KEY"))
setter.SetDefault(&conf.TLS.KeyFile, os.Getenv("GUBER_TLS_KEY"))
setter.SetDefault(&conf.TLS.CertFile, os.Getenv("GUBER_TLS_CERT"))
setter.SetDefault(&conf.TLS.AutoTLS, getEnvBool(log, "GUBER_TLS_AUTO"))

clientAuth := os.Getenv("GUBER_TLS_CLIENT_AUTH")
if clientAuth != "" {
clientAuthTypes := map[string]tls.ClientAuthType{
"request-cert": tls.RequestClientCert,
"verify-cert": tls.VerifyClientCertIfGiven,
"require-any-cert": tls.RequireAnyClientCert,
"require-and-verify": tls.RequireAndVerifyClientCert,
}
t, ok := clientAuthTypes[clientAuth]
if !ok {
return conf, errors.Errorf("'GUBER_TLS_CLIENT_AUTH=%s' is invalid; choices are [%s]",
clientAuth, validClientAuthTypes(clientAuthTypes))
}
conf.TLS.ClientAuth = t
}
conf.TLS.ClientAuth = t
setter.SetDefault(&conf.TLS.ClientAuthKeyFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_KEY"))
setter.SetDefault(&conf.TLS.ClientAuthCertFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_CERT"))
setter.SetDefault(&conf.TLS.ClientAuthCaFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_CA_CERT"))
setter.SetDefault(&conf.TLS.InsecureSkipVerify, getEnvBool(log, "GUBER_TLS_INSECURE_SKIP_VERIFY"))
}
setter.SetDefault(&conf.TLS.ClientAuthKeyFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_KEY"))
setter.SetDefault(&conf.TLS.ClientAuthCertFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_CERT"))
setter.SetDefault(&conf.TLS.ClientAuthCaFile, os.Getenv("GUBER_TLS_CLIENT_AUTH_CA_CERT"))
setter.SetDefault(&conf.TLS.InsecureSkipVerify, getEnvBool(log, "GUBER_TLS_INSECURE_SKIP_VERIFY"))

// ETCD Config
setter.SetDefault(&conf.EtcdPoolConf.KeyPrefix, os.Getenv("GUBER_ETCD_KEY_PREFIX"), "/gubernator-peers")
Expand Down
65 changes: 47 additions & 18 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ import (
)

type Daemon struct {
GRPCListener net.Listener
HTTPListener net.Listener
V1Server *V1Instance
GRPCListeners []net.Listener
HTTPListener net.Listener
V1Server *V1Instance

log logrus.FieldLogger
pool PoolInterface
conf DaemonConfig
httpSrv *http.Server
grpcSrv *grpc.Server
grpcSrvs []*grpc.Server
wg syncutil.WaitGroup
statsHandler *GRPCStatsHandler
promRegister *prometheus.Registry
Expand Down Expand Up @@ -91,19 +91,17 @@ func (s *Daemon) Start(ctx context.Context) error {
}

if s.conf.ServerTLS() != nil {
creds := credentials.NewTLS(s.conf.ServerTLS())
opts = append(opts, grpc.Creds(creds))
// Create two GRPC server instances, one for TLS and the other for the API Gateway
s.grpcSrvs = append(s.grpcSrvs, grpc.NewServer(append(opts, grpc.Creds(credentials.NewTLS(s.conf.ServerTLS())))...))
}

// New GRPC server
s.grpcSrv = grpc.NewServer(opts...)
s.grpcSrvs = append(s.grpcSrvs, grpc.NewServer(opts...))

// Registers a new gubernator instance with the GRPC server
s.V1Server, err = NewV1Instance(Config{
PeerTLS: s.conf.ClientTLS(),
DataCenter: s.conf.DataCenter,
LocalPicker: s.conf.Picker,
GRPCServer: s.grpcSrv,
GRPCServers: s.grpcSrvs,
Logger: s.log,
Cache: cache,
})
Expand All @@ -114,19 +112,45 @@ func (s *Daemon) Start(ctx context.Context) error {
// V1Server instance also implements prometheus.Collector interface
s.promRegister.Register(s.V1Server)

s.GRPCListener, err = net.Listen("tcp", s.conf.GRPCListenAddress)
l, err := net.Listen("tcp", s.conf.GRPCListenAddress)
if err != nil {
return errors.Wrap(err, "while starting GRPC listener")
}
s.GRPCListeners = append(s.GRPCListeners, l)

// Start serving GRPC Requests
s.wg.Go(func() {
s.log.Infof("GRPC Listening on %s ...", s.conf.GRPCListenAddress)
if err := s.grpcSrv.Serve(s.GRPCListener); err != nil {
if err := s.grpcSrvs[0].Serve(l); err != nil {
s.log.WithError(err).Error("while starting GRPC server")
}
})

var gatewayAddr string
if s.conf.ServerTLS() != nil {
// We start a new local GRPC instance because we can't guarantee the TLS cert provided by the
// user has localhost or the local interface included in the certs valid hostnames. If they are not
// included it means the local gateway connections will not be able to connect.
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return errors.Wrap(err, "while starting GRPC Gateway listener")
}
s.GRPCListeners = append(s.GRPCListeners, l)

s.wg.Go(func() {
s.log.Infof("GRPC Gateway Listening on %s ...", l.Addr())
if err := s.grpcSrvs[1].Serve(l); err != nil {
s.log.WithError(err).Error("while starting GRPC Gateway server")
}
})
gatewayAddr = l.Addr().String()
} else {
gatewayAddr, err = ResolveHostIP(s.conf.GRPCListenAddress)
if err != nil {
return errors.Wrap(err, "while resolving GRPC gateway client address")
}
}

switch s.conf.PeerDiscoveryType {
case "k8s":
// Source our list of peers from kubernetes endpoint API
Expand Down Expand Up @@ -162,8 +186,7 @@ func (s *Daemon) Start(ctx context.Context) error {
gateway := runtime.NewServeMux()
var gwCtx context.Context
gwCtx, s.gwCancel = context.WithCancel(context.Background())
err = RegisterV1HandlerFromEndpoint(gwCtx, gateway,
s.conf.GRPCListenAddress, []grpc.DialOption{grpc.WithInsecure()})
err = RegisterV1HandlerFromEndpoint(gwCtx, gateway, gatewayAddr, []grpc.DialOption{grpc.WithInsecure()})
if err != nil {
return errors.Wrap(err, "while registering GRPC gateway handler")
}
Expand Down Expand Up @@ -207,7 +230,11 @@ func (s *Daemon) Start(ctx context.Context) error {
}

// Validate we can reach the GRPC and HTTP endpoints before returning
if err := WaitForConnect(ctx, []string{s.conf.HTTPListenAddress, s.conf.GRPCListenAddress}); err != nil {
addrs := []string{s.conf.HTTPListenAddress}
for _, l := range s.GRPCListeners {
addrs = append(addrs, l.Addr().String())
}
if err := WaitForConnect(ctx, addrs); err != nil {
return err
}

Expand All @@ -226,13 +253,15 @@ func (s *Daemon) Close() {

s.log.Infof("HTTP Gateway close for %s ...", s.conf.HTTPListenAddress)
s.httpSrv.Shutdown(context.Background())
s.log.Infof("GRPC close for %s ...", s.conf.GRPCListenAddress)
s.grpcSrv.GracefulStop()
for i, srv := range s.grpcSrvs {
s.log.Infof("GRPC close for %s ...", s.GRPCListeners[i].Addr())
srv.GracefulStop()
}
s.wg.Stop()
s.statsHandler.Close()
s.gwCancel()
s.httpSrv = nil
s.grpcSrv = nil
s.grpcSrvs = nil
}

// SetPeers sets the peers for this daemon
Expand Down
2 changes: 1 addition & 1 deletion functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func TestResetRemaining(t *testing.T) {
}

func TestHealthCheck(t *testing.T) {
client, err := guber.DialV1Server(cluster.DaemonAt(0).GRPCListener.Addr().String(), nil)
client, err := guber.DialV1Server(cluster.DaemonAt(0).GRPCListeners[0].Addr().String(), nil)
require.NoError(t, err)

// Check that the cluster is healthy to start with
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ require (
github.com/mailgun/holster/v3 v3.14.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.6.0
github.com/segmentio/fasthash v1.0.2
github.com/sirupsen/logrus v1.4.2
Expand Down
12 changes: 7 additions & 5 deletions gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type V1Instance struct {
// NewV1Instance instantiate a single instance of a gubernator peer and registers this
// instance with the provided GRPCServer.
func NewV1Instance(conf Config) (*V1Instance, error) {
if conf.GRPCServer == nil {
return nil, errors.New("GRPCServer instance is required")
if conf.GRPCServers == nil {
return nil, errors.New("At least one GRPCServer instance is required")
}

if err := conf.SetDefaults(); err != nil {
Expand All @@ -67,9 +67,11 @@ func NewV1Instance(conf Config) (*V1Instance, error) {
s.global = newGlobalManager(conf.Behaviors, &s)
s.mutliRegion = newMultiRegionManager(conf.Behaviors, &s)

// Register our server with GRPC
RegisterV1Server(conf.GRPCServer, &s)
RegisterPeersV1Server(conf.GRPCServer, &s)
// Register our instance with all GRPC servers
for _, srv := range conf.GRPCServers {
RegisterV1Server(srv, &s)
RegisterPeersV1Server(srv, &s)
}

if s.conf.Loader == nil {
return &s, nil
Expand Down
6 changes: 3 additions & 3 deletions store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ type v1Server struct {
}

func (s *v1Server) Close() {
s.conf.GRPCServer.GracefulStop()
s.conf.GRPCServers[0].GracefulStop()
s.srv.Close()
}

// Start a single instance of V1Server with the provided config and listening address.
func newV1Server(t *testing.T, address string, conf gubernator.Config) *v1Server {
t.Helper()
conf.GRPCServer = grpc.NewServer()
conf.GRPCServers = append(conf.GRPCServers, grpc.NewServer())

srv, err := gubernator.NewV1Instance(conf)
require.NoError(t, err)
Expand All @@ -52,7 +52,7 @@ func newV1Server(t *testing.T, address string, conf gubernator.Config) *v1Server
require.NoError(t, err)

go func() {
if err := conf.GRPCServer.Serve(listener); err != nil {
if err := conf.GRPCServers[0].Serve(listener); err != nil {
fmt.Printf("while serving: %s\n", err)
}
}()
Expand Down
13 changes: 9 additions & 4 deletions tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,20 @@ func fromFile(name string) (*bytes.Buffer, error) {
func SetupTLS(conf *TLSConfig) error {
var err error

if conf == nil || conf.ServerTLS == nil {
if conf == nil {
return nil
}

// If both client and server tls configs provided, nothing to do!
if conf.ServerTLS != nil && conf.ClientTLS != nil {
return nil
}

setter.SetDefault(&conf.Logger, logrus.WithField("category", "gubernator"))
conf.Logger.Info("Detected TLS Configuration")

// Basic config with reasonably secure defaults
conf.ServerTLS = &tls.Config{
setter.SetDefault(&conf.ServerTLS, &tls.Config{
CipherSuites: []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
Expand All @@ -151,8 +156,8 @@ func SetupTLS(conf *TLSConfig) error {
NextProtos: []string{
"h2", "http/1.1", // enable HTTP/2
},
}
conf.ClientTLS = &tls.Config{}
})
setter.SetDefault(&conf.ClientTLS, &tls.Config{})

// Attempt to load any files provided
conf.CaPEM, err = fromFile(conf.CaFile)
Expand Down
Loading

0 comments on commit 28c7e67

Please sign in to comment.