Skip to content

Commit c69a1e1

Browse files
Michael Mokryszameowlia
Michael Mokrysz
authored andcommitted
Support talking to NATS over mTLS
Until now Gorouter has been unable to encrypt its connection to NATS. Mutual TLS has been added for other components that talk to NATS, but not to Gorouter. This commit adds support for configuring the NATS connection with Mutual TLS. This code is based heavily on the implementation in route-emitter [1]. The config YAML's structure gets some changes: the username and password is no longer provided separately for every NATS machine. This isn't a limitation in real-world Cloud Foundry deployments. Port is repeated for each NATS machine so that the integration tests can still run multiple NATS on multiple ports. [1] https://github.com/cloudfoundry/route-emitter/tree/master/diegonats
1 parent a4f717b commit c69a1e1

File tree

6 files changed

+142
-53
lines changed

6 files changed

+142
-53
lines changed

config/config.go

+39-17
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,25 @@ var defaultStatusConfig = StatusConfig{
7878
}
7979

8080
type NatsConfig struct {
81-
Host string `yaml:"host"`
82-
Port uint16 `yaml:"port"`
83-
User string `yaml:"user"`
84-
Pass string `yaml:"pass"`
81+
Hosts []NatsHost `yaml:"hosts"`
82+
User string `yaml:"user"`
83+
Pass string `yaml:"pass"`
84+
TLSEnabled bool `yaml:"tls_enabled"`
85+
CACerts string `yaml:"ca_certs"`
86+
CAPool *x509.CertPool `yaml:"-"`
87+
ClientAuthCertificate tls.Certificate `yaml:"-"`
88+
TLSPem `yaml:",inline"` // embed to get cert_chain and private_key for client authentication
89+
}
90+
91+
type NatsHost struct {
92+
Hostname string
93+
Port uint16
94+
}
95+
96+
var defaultNatsConfig = NatsConfig{
97+
Hosts: []NatsHost{{Hostname: "localhost", Port: 42222}},
98+
User: "",
99+
Pass: "",
85100
}
86101

87102
type RoutingApiConfig struct {
@@ -94,13 +109,6 @@ type RoutingApiConfig struct {
94109
TLSPem `yaml:",inline"` // embed to get cert_chain and private_key for client authentication
95110
}
96111

97-
var defaultNatsConfig = NatsConfig{
98-
Host: "localhost",
99-
Port: 4222,
100-
User: "",
101-
Pass: "",
102-
}
103-
104112
type OAuthConfig struct {
105113
TokenEndpoint string `yaml:"token_endpoint"`
106114
Port int `yaml:"port"`
@@ -181,7 +189,7 @@ type HTTPRewriteResponses struct {
181189

182190
type Config struct {
183191
Status StatusConfig `yaml:"status,omitempty"`
184-
Nats []NatsConfig `yaml:"nats,omitempty"`
192+
Nats NatsConfig `yaml:"nats,omitempty"`
185193
Logging LoggingConfig `yaml:"logging,omitempty"`
186194
Port uint16 `yaml:"port,omitempty"`
187195
Index uint `yaml:"index,omitempty"`
@@ -283,7 +291,7 @@ type Config struct {
283291

284292
var defaultConfig = Config{
285293
Status: defaultStatusConfig,
286-
Nats: []NatsConfig{defaultNatsConfig},
294+
Nats: defaultNatsConfig,
287295
Logging: defaultLoggingConfig,
288296
Port: 8081,
289297
Index: 0,
@@ -399,6 +407,21 @@ func (c *Config) Process() error {
399407
c.RoutingApi.CAPool = certPool
400408
}
401409

410+
if c.Nats.TLSEnabled {
411+
certificate, err := tls.X509KeyPair([]byte(c.Nats.CertChain), []byte(c.Nats.PrivateKey))
412+
if err != nil {
413+
errMsg := fmt.Sprintf("Error loading NATS key pair: %s", err.Error())
414+
return fmt.Errorf(errMsg)
415+
}
416+
c.Nats.ClientAuthCertificate = certificate
417+
418+
certPool := x509.NewCertPool()
419+
if ok := certPool.AppendCertsFromPEM([]byte(c.Nats.CACerts)); !ok {
420+
return fmt.Errorf("Error while adding CACerts to gorouter's routing-api cert pool: \n%s\n", c.Nats.CACerts)
421+
}
422+
c.Nats.CAPool = certPool
423+
}
424+
402425
if c.EnableSSL {
403426
switch c.ClientCertificateValidationString {
404427
case "none":
@@ -650,11 +673,11 @@ func convertCipherStringToInt(cipherStrs []string, cipherMap map[string]uint16)
650673

651674
func (c *Config) NatsServers() []string {
652675
var natsServers []string
653-
for _, info := range c.Nats {
676+
for _, host := range c.Nats.Hosts {
654677
uri := url.URL{
655678
Scheme: "nats",
656-
User: url.UserPassword(info.User, info.Pass),
657-
Host: fmt.Sprintf("%s:%d", info.Host, info.Port),
679+
User: url.UserPassword(c.Nats.User, c.Nats.Pass),
680+
Host: fmt.Sprintf("%s:%d", host.Hostname, host.Port),
658681
}
659682
natsServers = append(natsServers, uri.String())
660683
}
@@ -667,7 +690,6 @@ func (c *Config) RoutingApiEnabled() bool {
667690
}
668691

669692
func (c *Config) Initialize(configYAML []byte) error {
670-
c.Nats = []NatsConfig{}
671693
return yaml.Unmarshal(configYAML, &c)
672694
}
673695

config/config_test.go

+75-20
Original file line numberDiff line numberDiff line change
@@ -116,22 +116,78 @@ endpoint_keep_alive_probe_interval: 500ms
116116
Expect(config.EndpointKeepAliveProbeInterval).To(Equal(500 * time.Millisecond))
117117
})
118118

119-
It("sets nats config", func() {
120-
var b = []byte(`
119+
Context("NATS Config", func() {
120+
It("handles basic nats config", func() {
121+
var b = []byte(`
121122
nats:
122-
- host: remotehost
123+
user: user
124+
pass: pass
125+
hosts:
126+
- hostname: remotehost
123127
port: 4223
124-
user: user
125-
pass: pass
126128
`)
127-
err := config.Initialize(b)
128-
Expect(err).ToNot(HaveOccurred())
129+
err := config.Initialize(b)
130+
Expect(err).ToNot(HaveOccurred())
131+
132+
Expect(config.Nats.User).To(Equal("user"))
133+
Expect(config.Nats.Pass).To(Equal("pass"))
134+
Expect(config.Nats.Hosts).To(HaveLen(1))
135+
Expect(config.Nats.Hosts[0].Hostname).To(Equal("remotehost"))
136+
Expect(config.Nats.Hosts[0].Port).To(Equal(uint16(4223)))
137+
})
138+
139+
Context("when TLSEnabled is set to true", func() {
140+
var (
141+
err error
142+
configSnippet *Config
143+
caCert tls.Certificate
144+
clientPair tls.Certificate
145+
)
146+
147+
createYMLSnippet := func(snippet *Config) []byte {
148+
cfgBytes, err := yaml.Marshal(snippet)
149+
Expect(err).ToNot(HaveOccurred())
150+
return cfgBytes
151+
}
152+
153+
BeforeEach(func() {
154+
caCertChain := test_util.CreateSignedCertWithRootCA(test_util.CertNames{CommonName: "spinach.com"})
155+
clientKeyPEM, clientCertPEM := test_util.CreateKeyPair("potato.com")
156+
157+
caCert, err = tls.X509KeyPair(append(caCertChain.CertPEM, caCertChain.CACertPEM...), caCertChain.PrivKeyPEM)
158+
Expect(err).ToNot(HaveOccurred())
159+
clientPair, err = tls.X509KeyPair(clientCertPEM, clientKeyPEM)
160+
Expect(err).ToNot(HaveOccurred())
161+
162+
configSnippet = &Config{
163+
Nats: NatsConfig{
164+
TLSEnabled: true,
165+
CACerts: fmt.Sprintf("%s%s", caCertChain.CertPEM, caCertChain.CACertPEM),
166+
TLSPem: TLSPem{
167+
CertChain: string(clientCertPEM),
168+
PrivateKey: string(clientKeyPEM),
169+
},
170+
},
171+
}
172+
})
129173

130-
Expect(config.Nats).To(HaveLen(1))
131-
Expect(config.Nats[0].Host).To(Equal("remotehost"))
132-
Expect(config.Nats[0].Port).To(Equal(uint16(4223)))
133-
Expect(config.Nats[0].User).To(Equal("user"))
134-
Expect(config.Nats[0].Pass).To(Equal("pass"))
174+
It("configures TLS", func() {
175+
configBytes := createYMLSnippet(configSnippet)
176+
err = config.Initialize(configBytes)
177+
Expect(err).NotTo(HaveOccurred())
178+
err = config.Process()
179+
Expect(err).NotTo(HaveOccurred())
180+
181+
Expect(config.Nats.CAPool).ToNot(BeNil())
182+
poolSubjects := config.Nats.CAPool.Subjects()
183+
parsedCert, err := x509.ParseCertificate(caCert.Certificate[0])
184+
Expect(err).NotTo(HaveOccurred())
185+
expectedSubject := parsedCert.RawSubject
186+
187+
Expect(string(poolSubjects[0])).To(Equal(string(expectedSubject)))
188+
Expect(config.Nats.ClientAuthCertificate).To(Equal(clientPair))
189+
})
190+
})
135191
})
136192

137193
Context("Suspend Pruning option", func() {
@@ -752,14 +808,13 @@ secure_cookies: false
752808
Describe("NatsServers", func() {
753809
var b = []byte(`
754810
nats:
755-
- host: remotehost
756-
port: 4223
757-
user: user
758-
pass: pass
759-
- host: remotehost2
811+
user: user
812+
pass: pass
813+
hosts:
814+
- hostname: remotehost
760815
port: 4223
761-
user: user2
762-
pass: pass2
816+
- hostname: remotehost2
817+
port: 4224
763818
`)
764819

765820
It("returns a slice of the configured NATS servers", func() {
@@ -768,7 +823,7 @@ nats:
768823

769824
natsServers := config.NatsServers()
770825
Expect(natsServers[0]).To(Equal("nats://user:pass@remotehost:4223"))
771-
Expect(natsServers[1]).To(Equal("nats://user2:pass2@remotehost2:4223"))
826+
Expect(natsServers[1]).To(Equal("nats://user:pass@remotehost2:4224"))
772827
})
773828
})
774829

integration/common_integration_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ func (s *testState) registerAndWait(rm mbus.RegistryMessage) {
254254
func (s *testState) StartGorouter() *Session {
255255
Expect(s.cfg).NotTo(BeNil(), "set up test cfg before calling this function")
256256

257-
s.natsRunner = test_util.NewNATSRunner(int(s.cfg.Nats[0].Port))
257+
s.natsRunner = test_util.NewNATSRunner(int(s.cfg.Nats.Hosts[0].Port))
258258
s.natsRunner.Start()
259259

260260
var err error

integration/main_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -1111,14 +1111,14 @@ func hostnameAndPort(url string) (string, int) {
11111111
}
11121112

11131113
func newMessageBus(c *config.Config) (*nats.Conn, error) {
1114-
natsMembers := make([]string, len(c.Nats))
1114+
natsMembers := make([]string, len(c.Nats.Hosts))
11151115
options := nats.DefaultOptions
11161116
options.PingInterval = 200 * time.Millisecond
1117-
for _, info := range c.Nats {
1117+
for _, host := range c.Nats.Hosts {
11181118
uri := url.URL{
11191119
Scheme: "nats",
1120-
User: url.UserPassword(info.User, info.Pass),
1121-
Host: fmt.Sprintf("%s:%d", info.Host, info.Port),
1120+
User: url.UserPassword(c.Nats.User, c.Nats.Pass),
1121+
Host: fmt.Sprintf("%s:%d", host.Hostname, host.Port),
11221122
}
11231123
natsMembers = append(natsMembers, uri.String())
11241124
}

mbus/client.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"code.cloudfoundry.org/gorouter/config"
1010
"code.cloudfoundry.org/gorouter/logger"
11+
"code.cloudfoundry.org/tlsconfig"
1112
"github.com/nats-io/nats.go"
1213
"github.com/uber-go/zap"
1314
)
@@ -54,10 +55,20 @@ func Connect(c *config.Config, reconnected chan<- Signal, l logger.Logger) *nats
5455
}
5556

5657
func natsOptions(l logger.Logger, c *config.Config, natsHost *atomic.Value, reconnected chan<- Signal) nats.Options {
57-
natsServers := c.NatsServers()
58-
5958
options := nats.DefaultOptions
60-
options.Servers = natsServers
59+
options.Servers = c.NatsServers()
60+
if c.Nats.TLSEnabled {
61+
var err error
62+
options.TLSConfig, err = tlsconfig.Build(
63+
tlsconfig.WithInternalServiceDefaults(),
64+
tlsconfig.WithIdentity(c.Nats.ClientAuthCertificate),
65+
).Client(
66+
tlsconfig.WithAuthority(c.Nats.CAPool),
67+
)
68+
if err != nil {
69+
l.Fatal("nats-tls-config-invalid", zap.Object("error", err))
70+
}
71+
}
6172
options.PingInterval = c.NatsClientPingInterval
6273
options.MaxReconnect = -1
6374
notDisconnected := make(chan Signal)

test_util/helpers.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,15 @@ func generateConfig(statusPort, proxyPort uint16, natsPorts ...uint16) *config.C
240240
Pass: "pass",
241241
}
242242

243-
c.Nats = []config.NatsConfig{}
244-
for _, natsPort := range natsPorts {
245-
c.Nats = append(c.Nats, config.NatsConfig{
246-
Host: "localhost",
247-
Port: natsPort,
248-
User: "nats",
249-
Pass: "nats",
250-
})
243+
natsHosts := make([]config.NatsHost, len(natsPorts))
244+
for i, natsPort := range natsPorts {
245+
natsHosts[i].Hostname = "localhost"
246+
natsHosts[i].Port = natsPort
247+
}
248+
c.Nats = config.NatsConfig{
249+
User: "nats",
250+
Pass: "nats",
251+
Hosts: natsHosts,
251252
}
252253

253254
c.Logging.Level = "debug"

0 commit comments

Comments
 (0)