Skip to content

Commit 2f5e010

Browse files
authored
feat: add TLS config to enable xDS Auth (#13)
* feat: xDS auth
1 parent 5cd7573 commit 2f5e010

10 files changed

+246
-26
lines changed

core/manager/auth/jwt.go

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2023 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package auth
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"os"
23+
24+
"github.com/cloudwego/kitex/pkg/klog"
25+
"github.com/cloudwego/kitex/pkg/remote"
26+
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
27+
"github.com/cloudwego/kitex/pkg/rpcinfo"
28+
"github.com/cloudwego/kitex/transport"
29+
)
30+
31+
const (
32+
jwtTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" // token path in the pod.
33+
34+
jwtTokenKey = "Authorization" // Istiod gets the jwt from meta using this key.
35+
36+
// If Istio is deployed in a different cluster, set this env as the cluster id of this service.
37+
// Usually, we can get the value using "kubectl config get-clusters".
38+
clusterIDEnvKey = "ISTIO_META_CLUSTER_ID"
39+
40+
clusterIDMetadataKey = "clusterid" // Istiod retrieves clusterid and use it for auth of JWT.
41+
)
42+
43+
var (
44+
clusterID string
45+
jwtToken string
46+
)
47+
48+
func init() {
49+
// init clusterID
50+
clusterID = os.Getenv(clusterIDEnvKey)
51+
// watch jwtToken file
52+
if token, err := getJWTToken(); err != nil {
53+
klog.Warnf("[XDS] Auth, getJWTToken error=%s. Ignore this log if not deploying on multiple clusters.\n", err.Error())
54+
} else {
55+
jwtToken = token
56+
}
57+
}
58+
59+
// ClientHTTP2JwtHandler is used to set jwt token to http2 header
60+
var ClientHTTP2JwtHandler = &clientHTTP2JwtHandler{}
61+
62+
type clientHTTP2JwtHandler struct{}
63+
64+
var (
65+
_ remote.MetaHandler = ClientHTTP2JwtHandler
66+
_ remote.StreamingMetaHandler = ClientHTTP2JwtHandler
67+
)
68+
69+
func (*clientHTTP2JwtHandler) OnConnectStream(ctx context.Context) (context.Context, error) {
70+
ri := rpcinfo.GetRPCInfo(ctx)
71+
if !isGRPC(ri) {
72+
return ctx, nil
73+
}
74+
var md metadata.MD
75+
md, ok := metadata.FromOutgoingContext(ctx)
76+
if !ok {
77+
md = metadata.MD{}
78+
}
79+
// Set JWT and clusterID for auth
80+
md.Set(jwtTokenKey, jwtTokenValueFmt(jwtToken))
81+
md.Set(clusterIDMetadataKey, clusterID)
82+
return metadata.NewOutgoingContext(ctx, md), nil
83+
}
84+
85+
func (*clientHTTP2JwtHandler) OnReadStream(ctx context.Context) (context.Context, error) {
86+
return ctx, nil
87+
}
88+
89+
func (ch *clientHTTP2JwtHandler) WriteMeta(ctx context.Context, msg remote.Message) (context.Context, error) {
90+
return ctx, nil
91+
}
92+
93+
func (ch *clientHTTP2JwtHandler) ReadMeta(ctx context.Context, msg remote.Message) (context.Context, error) {
94+
return ctx, nil
95+
}
96+
97+
func isGRPC(ri rpcinfo.RPCInfo) bool {
98+
return ri.Config().TransportProtocol()&transport.GRPC == transport.GRPC
99+
}
100+
101+
var jwtTokenValueFmt = func(jwtToken string) string {
102+
return fmt.Sprintf("Bearer %s", jwtToken)
103+
}
104+
105+
func getJWTToken() (string, error) {
106+
saToken := jwtTokenPath
107+
108+
token, err := os.ReadFile(saToken)
109+
if err != nil {
110+
return "", err
111+
}
112+
113+
return string(token), nil
114+
}

core/manager/auth/tls.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2023 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package auth
18+
19+
import (
20+
"crypto/tls"
21+
)
22+
23+
// GetTLSConfig returns a tls.Config for xDS Client.
24+
func GetTLSConfig(xdsServerName string) (*tls.Config, error) {
25+
cfg := &tls.Config{
26+
// Skip this and use JWT for auth in Istiod.
27+
InsecureSkipVerify: true,
28+
}
29+
return cfg, nil
30+
}

core/manager/auth/utils.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2023 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package auth
18+
19+
import (
20+
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
21+
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
22+
)
23+
24+
func IsAuthError(err error) bool {
25+
if s, ok := status.FromError(err); ok {
26+
if s.Code() == codes.Unauthenticated {
27+
return true
28+
}
29+
}
30+
return false
31+
}

core/manager/bootstrap.go

+13-17
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ import (
2525
)
2626

2727
const (
28-
PodNamespace = "POD_NAMESPACE"
29-
PodName = "POD_NAME"
30-
InstanceIP = "INSTANCE_IP"
31-
IstiodAddr = "istiod.istio-system.svc:15010"
32-
IstioVersion = "ISTIO_VERSION"
33-
nodeIDSuffix = "svc.cluster.local"
28+
PodNamespace = "POD_NAMESPACE"
29+
PodName = "POD_NAME"
30+
InstanceIP = "INSTANCE_IP"
31+
IstiodAddr = "istiod.istio-system.svc:15010"
32+
IstiodSvrName = "istiod.istio-system.svc"
33+
IstioVersion = "ISTIO_VERSION"
34+
nodeIDSuffix = "svc.cluster.local"
3435
)
3536

3637
type BootstrapConfig struct {
@@ -39,12 +40,14 @@ type BootstrapConfig struct {
3940
}
4041

4142
type XDSServerConfig struct {
42-
SvrAddr string
43-
NDSNotRequired bool // required by default for Istio
43+
SvrName string // The name of the xDS server
44+
SvrAddr string // The address of the xDS server
45+
XDSAuth bool // If this xDS enable the authentication of xDS stream
46+
NDSNotRequired bool // required by default for Istio
4447
}
4548

4649
// newBootstrapConfig constructs the bootstrapConfig
47-
func newBootstrapConfig(xdsSvrAddress string) (*BootstrapConfig, error) {
50+
func newBootstrapConfig(config *XDSServerConfig) (*BootstrapConfig, error) {
4851
// Get info from env
4952
// Info needed for construct the nodeID
5053
namespace := os.Getenv(PodNamespace)
@@ -62,11 +65,6 @@ func newBootstrapConfig(xdsSvrAddress string) (*BootstrapConfig, error) {
6265
// specify the version of istio in case of the canary deployment of istiod
6366
istioVersion := os.Getenv(IstioVersion)
6467

65-
// use default istiod address if not specified
66-
if xdsSvrAddress == "" {
67-
xdsSvrAddress = IstiodAddr
68-
}
69-
7068
return &BootstrapConfig{
7169
node: &v3core.Node{
7270
//"sidecar~" + podIP + "~" + podName + "." + namespace + "~" + namespace + ".svc.cluster.local",
@@ -79,8 +77,6 @@ func newBootstrapConfig(xdsSvrAddress string) (*BootstrapConfig, error) {
7977
},
8078
},
8179
},
82-
xdsSvrCfg: &XDSServerConfig{
83-
SvrAddr: xdsSvrAddress,
84-
},
80+
xdsSvrCfg: config,
8581
}, nil
8682
}

core/manager/client.go

+21-4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/cenkalti/backoff/v4"
2727

2828
"github.com/kitex-contrib/xds/core/api/kitex_gen/envoy/service/discovery/v3/aggregateddiscoveryservice"
29+
"github.com/kitex-contrib/xds/core/manager/auth"
2930
"github.com/kitex-contrib/xds/core/xdsresource"
3031

3132
"github.com/cloudwego/kitex/client"
@@ -41,10 +42,20 @@ type (
4142
)
4243

4344
// newADSClient constructs a new stream client that communicates with the xds server
44-
func newADSClient(addr string) (ADSClient, error) {
45-
cli, err := aggregateddiscoveryservice.NewClient("xds_servers",
46-
client.WithHostPorts(addr),
47-
)
45+
func newADSClient(xdsSvrCfg *XDSServerConfig) (ADSClient, error) {
46+
var opts []client.Option
47+
opts = append(opts, client.WithHostPorts(xdsSvrCfg.SvrAddr))
48+
49+
if xdsSvrCfg.XDSAuth {
50+
if tlsConfig, err := auth.GetTLSConfig(xdsSvrCfg.SvrAddr); err != nil {
51+
return nil, err
52+
} else {
53+
opts = append(opts, client.WithGRPCTLSConfig(tlsConfig))
54+
}
55+
opts = append(opts, client.WithMetaHandler(auth.ClientHTTP2JwtHandler))
56+
}
57+
58+
cli, err := aggregateddiscoveryservice.NewClient(xdsSvrCfg.SvrName, opts...)
4859
if err != nil {
4960
return nil, err
5061
}
@@ -252,6 +263,12 @@ func (c *xdsClient) receiver(as ADSStream) {
252263
if err != nil {
253264
klog.Errorf("KITEX: [XDS] client, receive failed, error=%s", err)
254265
currStream.Close()
266+
if auth.IsAuthError(err) {
267+
// if it is auth error, return directly.
268+
klog.Errorf("KITEX: [XDS] client, authentication of the control plane failed, close the xDS client. Please check the error log in control plane for more details.")
269+
c.close()
270+
return
271+
}
255272
if s, e := c.reconnect(); e == nil {
256273
currStream = s
257274
}

core/manager/client_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func Test_newXdsClient(t *testing.T) {
102102

103103
c, err := initXDSClient(&BootstrapConfig{
104104
node: &v3core.Node{},
105-
xdsSvrCfg: &XDSServerConfig{SvrAddr: address},
105+
xdsSvrCfg: &XDSServerConfig{SvrAddr: address, SvrName: IstiodSvrName},
106106
}, nil)
107107
defer c.close()
108108
assert.Nil(t, err)

core/manager/manager.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func NewXDSResourceManager(bootstrapConfig *BootstrapConfig, opts ...Option) (*x
7474
}
7575
// Initial xds client
7676
if bootstrapConfig == nil {
77-
bootstrapConfig, err = newBootstrapConfig(m.opts.XDSSvrConfig.SvrAddr)
77+
bootstrapConfig, err = newBootstrapConfig(m.opts.XDSSvrConfig)
7878
if err != nil {
7979
return nil, err
8080
}
@@ -92,7 +92,7 @@ func NewXDSResourceManager(bootstrapConfig *BootstrapConfig, opts ...Option) (*x
9292

9393
func initXDSClient(bootstrapConfig *BootstrapConfig, m *xdsResourceManager) (*xdsClient, error) {
9494
// build ads client that communicates with the xds server
95-
ac, err := newADSClient(bootstrapConfig.xdsSvrCfg.SvrAddr)
95+
ac, err := newADSClient(bootstrapConfig.xdsSvrCfg)
9696
if err != nil {
9797
return nil, fmt.Errorf("[XDS] client: construct ads client failed, %s", err.Error())
9898
}

core/manager/manager_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ var (
3838
}
3939
XdsServerConfig = &XDSServerConfig{
4040
SvrAddr: XdsServerAddress,
41+
SvrName: IstiodSvrName,
4142
}
4243
XdsBootstrapConfig = &BootstrapConfig{
4344
node: NodeProto,

core/manager/option.go

+21-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package manager
1818

19+
import "fmt"
20+
1921
type Options struct {
2022
XDSSvrConfig *XDSServerConfig
2123
DumpPath string
@@ -31,13 +33,30 @@ type Option struct {
3133
F func(o *Options)
3234
}
3335

34-
func NewOptions(opts []Option) *Options {
35-
o := &Options{
36+
func DefaultOptions() *Options {
37+
return &Options{
3638
XDSSvrConfig: &XDSServerConfig{
3739
SvrAddr: IstiodAddr,
40+
SvrName: IstiodSvrName,
41+
XDSAuth: false,
3842
},
3943
DumpPath: defaultDumpPath,
4044
}
45+
}
46+
47+
func NewOptions(opts []Option) *Options {
48+
o := DefaultOptions()
4149
o.Apply(opts)
4250
return o
4351
}
52+
53+
func CheckXDSSvrConfig(cfg *XDSServerConfig) error {
54+
if cfg.SvrAddr == "" {
55+
return fmt.Errorf("[XDS] Option: xDS server address should be specified")
56+
}
57+
if cfg.SvrName == "" {
58+
// set default server name
59+
cfg.SvrName = IstiodSvrName
60+
}
61+
return nil
62+
}

option.go

+12
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,15 @@ func WithXDSServerAddress(address string) manager.Option {
2828
},
2929
}
3030
}
31+
32+
// WithXDSServerConfig set the xDS server config.
33+
func WithXDSServerConfig(cfg *manager.XDSServerConfig) manager.Option {
34+
return manager.Option{
35+
F: func(o *manager.Options) {
36+
if err := manager.CheckXDSSvrConfig(cfg); err != nil {
37+
panic(err)
38+
}
39+
o.XDSSvrConfig = cfg
40+
},
41+
}
42+
}

0 commit comments

Comments
 (0)