Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 38 additions & 44 deletions internal/xds/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,28 +97,23 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) []byte {
}

type virtualHost struct {
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
// retry policy present in virtual host
retryConfig *xdsresource.RetryConfig
}

// routeCluster holds information about a cluster as referenced by a route.
type routeCluster struct {
name string
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
name string // Name of the cluster.
interceptor iresolver.ClientInterceptor // HTTP filters to run for RPCs matching this route.
}

type route struct {
m *xdsresource.CompositeMatcher // converted from route matchers
actionType xdsresource.RouteActionType // holds route action type
clusters wrr.WRR // holds *routeCluster entries
maxStreamDuration time.Duration
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
retryConfig *xdsresource.RetryConfig
hashPolicies []*xdsresource.HashPolicy
retryConfig *xdsresource.RetryConfig
hashPolicies []*xdsresource.HashPolicy
}

func (r route) String() string {
Expand Down Expand Up @@ -200,11 +195,6 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
ref := &cs.clusters[cluster.name].refCount
atomic.AddInt32(ref, 1)

interceptor, err := cs.newInterceptor(rt, cluster)
if err != nil {
return nil, annotateErrorWithNodeID(err, cs.xdsNodeID)
}

lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name)
lbCtx = iringhash.SetXDSRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies))

Expand All @@ -220,7 +210,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
cs.sendNewServiceConfig()
}
},
Interceptor: interceptor,
Interceptor: cluster.interceptor,
}

if rt.maxStreamDuration != 0 {
Expand Down Expand Up @@ -310,35 +300,6 @@ func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies [
return rand.Uint64()
}

func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) {
if len(cs.httpFilterConfig) == 0 {
return nil, nil
}
interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig))
for _, filter := range cs.httpFilterConfig {
override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority
if override == nil {
override = rt.httpFilterConfigOverride[filter.Name] // route is second priority
}
if override == nil {
override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority
}
ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
if !ok {
// Should not happen if it passed xdsClient validation.
return nil, fmt.Errorf("filter does not support use in client")
}
i, err := ib.BuildClientInterceptor(filter.Config, override)
if err != nil {
return nil, fmt.Errorf("error constructing filter: %v", err)
}
if i != nil {
interceptors = append(interceptors, i)
}
}
return &interceptorList{interceptors: interceptors}, nil
}

// stop decrements refs of all clusters referenced by this config selector.
func (cs *configSelector) stop() {
// The resolver's old configSelector may be nil. Handle that here.
Expand All @@ -363,6 +324,39 @@ func (cs *configSelector) stop() {
}
}

// newInterceptor builds a chain of client interceptors for the given filters.
// The filter config override maps contain overrides from the route, cluster,
// and virtual host respectively. The cluster override has the highest priority,
// followed by the route override, and finally the virtual host override.
func newInterceptor(filters []xdsresource.HTTPFilter, cluster, route, virtualHost map[string]httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: What do you think about adding an override suffix to cluster, route, and virtualHost? I find it can help with readability, but no worries if you prefer the current naming.

if len(filters) == 0 {
return nil, nil
}
interceptors := make([]iresolver.ClientInterceptor, 0, len(filters))
for _, filter := range filters {
override := cluster[filter.Name]
if override == nil {
override = route[filter.Name]
}
if override == nil {
override = virtualHost[filter.Name]
}
ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
if !ok {
// Should not happen if it passed xdsClient validation.
panic(fmt.Sprintf("filter %q does not support use in client", filter.Name))
Comment on lines +346 to +347
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about continuing to return an error here? The method signature already allows for it, so callers presumably have error handling in place. Panicking to crash the process feels like a worse alternative, especially if this is a recoverable error.

}
i, err := ib.BuildClientInterceptor(filter.Config, override)
if err != nil {
return nil, fmt.Errorf("error constructing filter: %v", err)
}
if i != nil {
interceptors = append(interceptors, i)
}
}
return &interceptorList{interceptors: interceptors}, nil
}

type interceptorList struct {
interceptors []iresolver.ClientInterceptor
}
Expand Down
Loading
Loading