Skip to content

Commit

Permalink
feat(circuitbreak): support config circuitbreak policy with xds (#29)
Browse files Browse the repository at this point in the history
* feat(circuitbreak): support config circuitbreak policy with xds
  • Loading branch information
whalecold authored Mar 12, 2024
1 parent 64bbf75 commit e3c343b
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 7 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ This project adds xDS support for Kitex and enables Kitex to perform in Proxyles
* Timeout Configuration:
* Configuration inside HTTP route configuration: configure via VirtualService.

* Circuit Breaking:
* Configuration inside [Cluster](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/cluster/v3/outlier_detection.proto) configuration: configure via EnvoyFilter. Currently only error-rate-based circuit breaking is supported.

## Usage
There are two steps for enabling xDS for Kitex applications: 1. xDS module initialization and 2. Kitex Client/Server Option configuration.

Expand Down Expand Up @@ -255,7 +258,7 @@ spec:
```

### Limited support for Service Governance
Current version only support Service Discovery, Traffic route and Timeout Configuration via xDS on the client-side.
The current version only supports Service Discovery, Traffic Routing, Timeout Configuration and Circuit Breaking via xDS on the client side.

Other features supported via xDS, such as Load Balancing, Rate Limiting and Retry, will be added in the future.

Expand Down
5 changes: 4 additions & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ Kitex 通过外部扩展 [kitex-contrib/xds](https://github.com/kitex-contrib/xd
* 超时配置:
* HTTP route configuration 内包含的配置,同样通过 VirtualService 来配置。

* 熔断:
* [Cluster](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/cluster/v3/outlier_detection.proto) 内包含的配置,需要通过 EnvoyFilter 来配置,目前只支持错误率熔断。

## 开启方式
开启的步骤分为两个部分:1. xDS 模块的初始化和 2. Kitex Client/Server 的 Option 配置。

Expand Down Expand Up @@ -244,7 +247,7 @@ spec:
```

### 有限的服务治理功能
当前版本仅支持客户端通过 xDS 进行服务发现、流量路由和超时配置
当前版本仅支持客户端通过 xDS 进行服务发现、流量路由、超时配置和熔断

xDS 所支持的其他服务治理功能,包括负载平衡、速率限制和重试等,将在未来补齐。

Expand Down
47 changes: 47 additions & 0 deletions core/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync/atomic"
"time"

"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/klog"

"github.com/kitex-contrib/xds/core/xdsresource"
Expand All @@ -47,6 +48,8 @@ type xdsResourceManager struct {

// options
opts *Options

cbHandlers []xdsresource.UpdateCircuitbreakCallback
}

// notifier is used to notify the resource update along with error
Expand Down Expand Up @@ -117,6 +120,20 @@ func (m *xdsResourceManager) getFromCache(rType xdsresource.ResourceType, rName
return nil, false
}

// RegisterCircuitBreaker registers a circuit breaker handler with the resource
// manager. The circuit break config is carried by ClusterType xDS resources;
// whenever those resources are updated, the manager invokes every registered
// handler with the freshly derived policies.
//
// If cluster resources are already present in the cache, the new handler is
// invoked once immediately with the policies derived from the cached clusters,
// so it does not have to wait for the next update.
//
// The handler is called while the manager lock is held, so it must not call
// manager methods that acquire the same lock.
//
// A nil handler is ignored: storing it would make every later cluster update
// panic when the handler list is invoked.
func (m *xdsResourceManager) RegisterCircuitBreaker(handler xdsresource.UpdateCircuitbreakCallback) {
	if handler == nil {
		return
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	m.cbHandlers = append(m.cbHandlers, handler)

	// Replay the current state to the newly registered handler only; handlers
	// registered earlier have already seen it.
	if res, ok := m.cache[xdsresource.ClusterType]; ok {
		updateCircuitPolicy(res, []xdsresource.UpdateCircuitbreakCallback{handler})
	}
}

// Get gets the specified resource from cache or from the control plane.
// If the resource is not in the cache, it will be fetched from the control plane via client.
// This will be a synchronous call. It uses the notifier to notify the resource update and return the resource.
Expand Down Expand Up @@ -248,11 +265,41 @@ func (m *xdsResourceManager) updateMeta(rType xdsresource.ResourceType, version
}
}

// updateCircuitPolicy derives one circuit breaker config per cluster resource
// in res and passes the resulting map to every handler.
//
// Entries that are not *xdsresource.ClusterResource, or that carry no outlier
// detection settings, are left out of the map. A cluster whose outlier
// detection has a zero request volume or a zero threshold is reported with the
// zero-value (disabled) config.
func updateCircuitPolicy(res map[string]xdsresource.Resource, handlers []xdsresource.UpdateCircuitbreakCallback) {
	policies := make(map[string]circuitbreak.CBConfig)
	for name, raw := range res {
		cluster, isCluster := raw.(*xdsresource.ClusterResource)
		if !isCluster || cluster.OutlierDetection == nil {
			continue
		}

		od := cluster.OutlierDetection
		var cfg circuitbreak.CBConfig
		if od.FailurePercentageRequestVolume != 0 && od.FailurePercentageThreshold != 0 {
			cfg = circuitbreak.CBConfig{
				Enable: true,
				// The threshold is a percentage; the circuit breaker expects a ratio.
				ErrRate:   float64(od.FailurePercentageThreshold) / 100,
				MinSample: int64(od.FailurePercentageRequestVolume),
			}
		}
		policies[name] = cfg
	}

	for i := range handlers {
		handlers[i](policies)
	}
}

// UpdateResource is invoked by client to update the cache
func (m *xdsResourceManager) UpdateResource(rt xdsresource.ResourceType, up map[string]xdsresource.Resource, version string) {
m.mu.Lock()
defer m.mu.Unlock()

// should update circuit policy first, as it may affect the traffic when the
// circuit break policy is updated at the first time.
if rt == xdsresource.ClusterType {
updateCircuitPolicy(up, m.cbHandlers)
}

for name, res := range up {
if _, ok := m.cache[rt]; !ok {
m.cache[rt] = make(map[string]xdsresource.Resource)
Expand Down
53 changes: 53 additions & 0 deletions core/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/stretchr/testify/assert"

"github.com/kitex-contrib/xds/core/manager/mock"
Expand Down Expand Up @@ -308,3 +309,55 @@ func Test_xdsResourceManager_ConcurrentGet(t *testing.T) {
}
wg.Wait()
}

// TestRegisterCircuitBreaker checks that a handler registered on a manager
// with cached clusters is immediately called with the policies derived from
// the cache, and that a later cluster update calls it again with the policies
// derived from the updated resources.
func TestRegisterCircuitBreaker(t *testing.T) {
	m := &xdsResourceManager{
		cache: map[xdsresource.ResourceType]map[string]xdsresource.Resource{
			xdsresource.ClusterType: {
				xdsresource.ClusterName1: &xdsresource.ClusterResource{
					OutlierDetection: &xdsresource.OutlierDetection{
						FailurePercentageThreshold:     10,
						FailurePercentageRequestVolume: 1001,
					},
				},
				// A zero request volume must yield a disabled (zero-value) config.
				xdsresource.ClusterName2: &xdsresource.ClusterResource{
					OutlierDetection: &xdsresource.OutlierDetection{
						FailurePercentageThreshold:     10,
						FailurePercentageRequestVolume: 0,
					},
				},
			},
		},
		meta: map[xdsresource.ResourceType]map[string]*xdsresource.ResourceMeta{},
	}

	policies := make(map[string]circuitbreak.CBConfig)
	updater := func(configs map[string]circuitbreak.CBConfig) {
		policies = configs
	}

	// Registration replays the cached clusters to the new handler.
	m.RegisterCircuitBreaker(updater)
	// assert.Equal takes (t, expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	assert.Equal(t, map[string]circuitbreak.CBConfig{
		"cluster1": {
			Enable:    true,
			ErrRate:   0.1,
			MinSample: 1001,
		},
		"cluster2": {},
	}, policies)

	// A cluster update is forwarded to the registered handler.
	m.UpdateResource(xdsresource.ClusterType, map[string]xdsresource.Resource{
		xdsresource.ClusterName1: &xdsresource.ClusterResource{
			OutlierDetection: &xdsresource.OutlierDetection{
				FailurePercentageThreshold:     1,
				FailurePercentageRequestVolume: 100,
			},
		},
	}, "latest")
	assert.Equal(t, map[string]circuitbreak.CBConfig{
		"cluster1": {
			Enable:    true,
			ErrRate:   0.01,
			MinSample: 100,
		},
	}, policies)
}
20 changes: 16 additions & 4 deletions core/xdsresource/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,17 @@ func (p ClusterLbPolicy) String() string {
return ""
}

// OutlierDetection holds the failure-percentage settings copied from a
// cluster's outlier detection config. A zero value in either field leaves the
// circuit breaker config derived from it disabled.
type OutlierDetection struct {
// FailurePercentageThreshold is the failure percentage threshold; it is
// divided by 100 when converted into a circuit breaker error rate.
FailurePercentageThreshold uint32
// FailurePercentageRequestVolume is the request volume; it is used as the
// minimum sample count of the derived circuit breaker config.
FailurePercentageRequestVolume uint32
}

// ClusterResource is the parsed form of an xDS cluster.
type ClusterResource struct {
DiscoveryType ClusterDiscoveryType
LbPolicy ClusterLbPolicy
EndpointName string
InlineEndpoints *EndpointsResource
DiscoveryType ClusterDiscoveryType
LbPolicy ClusterLbPolicy
EndpointName string
InlineEndpoints *EndpointsResource
// OutlierDetection is nil when the cluster carries no outlier detection config.
OutlierDetection *OutlierDetection
}

func (c *ClusterResource) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -99,6 +105,12 @@ func unmarshalCluster(r *any.Any) (string, *ClusterResource, error) {
LbPolicy: convertLbPolicy(c.GetLbPolicy()),
EndpointName: c.Name,
}
if c.OutlierDetection != nil {
ret.OutlierDetection = &OutlierDetection{
FailurePercentageRequestVolume: c.OutlierDetection.FailurePercentageRequestVolume.GetValue(),
FailurePercentageThreshold: c.OutlierDetection.FailurePercentageThreshold.GetValue(),
}
}
if n := c.GetEdsClusterConfig().GetServiceName(); n != "" {
ret.EndpointName = n
}
Expand Down
4 changes: 4 additions & 0 deletions core/xdsresource/cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,9 @@ func TestUnmarshalCDSSuccess(t *testing.T) {
assert.Equal(t, cluster.EndpointName, EndpointName1)
assert.Equal(t, cluster.DiscoveryType, ClusterDiscoveryTypeEDS)
assert.Equal(t, cluster.LbPolicy, ClusterLbRoundRobin)
assert.Equal(t, cluster.OutlierDetection, &OutlierDetection{
FailurePercentageThreshold: 10,
FailurePercentageRequestVolume: 100,
})
assert.Nil(t, cluster.InlineEndpoints)
}
19 changes: 18 additions & 1 deletion core/xdsresource/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
v3thrift_proxy "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/thrift_proxy/v3"
v3matcher "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
"github.com/golang/protobuf/ptypes/any"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand Down Expand Up @@ -289,7 +290,15 @@ var (
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
ServiceName: EndpointName1,
},
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
OutlierDetection: &v3clusterpb.OutlierDetection{
FailurePercentageThreshold: &wrappers.UInt32Value{
Value: 10,
},
FailurePercentageRequestVolume: &wrappers.UInt32Value{
Value: 100,
},
},
LoadAssignment: nil,
}
Cluster2 = &v3clusterpb.Cluster{
Expand All @@ -298,6 +307,14 @@ var (
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
ServiceName: EndpointName1,
},
OutlierDetection: &v3clusterpb.OutlierDetection{
FailurePercentageThreshold: &wrappers.UInt32Value{
Value: 10,
},
FailurePercentageRequestVolume: &wrappers.UInt32Value{
Value: 0,
},
},
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
LoadAssignment: nil,
}
Expand Down
5 changes: 5 additions & 0 deletions core/xdsresource/xdsresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package xdsresource
import (
"sync/atomic"
"time"

"github.com/cloudwego/kitex/pkg/circuitbreak"
)

type Resource interface{}
Expand Down Expand Up @@ -89,3 +91,6 @@ var ResourceTypeToName = map[ResourceType]string{
EndpointsType: "ClusterLoadAssignment",
NameTableType: "NameTable",
}

// UpdateCircuitbreakCallback is the callback invoked with the circuit break
// policies when a resource is updated. The configs map holds one
// circuitbreak.CBConfig per entry, keyed by the name under which the
// originating resource is stored.
type UpdateCircuitbreakCallback func(configs map[string]circuitbreak.CBConfig)
85 changes: 85 additions & 0 deletions xdssuite/circuitbreak.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package xdssuite

import (
"sync/atomic"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/rpcinfo"
)

// circuitBreaker applies circuit break configs received through
// updateAllCircuitConfigs to a kitex CBSuite.
type circuitBreaker struct {
// lastPolicies stores a map[string]struct{} holding the keys of the configs
// applied by the most recent update; it is used to find keys that have
// disappeared and must be disabled.
lastPolicies atomic.Value
// cb is the circuit breaker suite that receives the per-key configs. When it
// is nil, updates are ignored.
cb *circuitbreak.CBSuite
}

// updateAllCircuitConfigs pushes every entry of configs into the circuit
// breaker suite and then disables any key that was applied by the previous
// call but is absent from configs. The set of keys applied by this call is
// remembered for the next one. It is a no-op when no suite is attached.
func (cb *circuitBreaker) updateAllCircuitConfigs(configs map[string]circuitbreak.CBConfig) {
	suite := cb.cb
	if suite == nil {
		return
	}

	current := make(map[string]struct{})
	for name, cfg := range configs {
		suite.UpdateServiceCBConfig(name, cfg)
		current[name] = struct{}{}
	}

	// Record the new key set on the way out, whatever happens below.
	defer cb.lastPolicies.Store(current)

	// On the first call nothing has been stored yet, so the assertion fails
	// and there is nothing to disable.
	previous, ok := cb.lastPolicies.Load().(map[string]struct{})
	if !ok {
		return
	}

	// Disable the policies that are no longer present in the new configs.
	for name := range previous {
		if _, kept := current[name]; kept {
			continue
		}
		suite.UpdateServiceCBConfig(name, circuitbreak.CBConfig{
			Enable: false,
		})
	}
}

// NewCircuitBreaker integrates xDS config with the kitex circuit breaker. It
// creates a circuit breaker suite keyed by genServiceCBKey, registers it with
// the xDS resource manager so that it receives circuit break policy updates,
// and returns the client option that installs the suite.
//
// It panics if the xDS resource manager has not been initialized.
func NewCircuitBreaker() client.Option {
	mgr := xdsResourceManager.getManager()
	if mgr == nil {
		panic("xds resource manager has not been initialized")
	}

	suite := circuitbreak.NewCBSuite(genServiceCBKey)
	breaker := &circuitBreaker{cb: suite}
	mgr.RegisterCircuitBreaker(breaker.updateAllCircuitConfigs)

	return client.WithCircuitBreaker(suite)
}

// genServiceCBKey returns the circuit breaker key for a request. The same key
// function is used when the circuit breaker suite is created, and the key it
// produces must stay consistent with the keys used when circuit breaker
// policies are updated. It returns "" when ri is nil or when the tag is not set.
func genServiceCBKey(ri rpcinfo.RPCInfo) string {
	if ri != nil {
		// The RouterClusterKey tag is stored during the route process.
		cluster, _ := ri.To().Tag(RouterClusterKey)
		return cluster
	}
	return ""
}
Loading

0 comments on commit e3c343b

Please sign in to comment.