Skip to content

Commit f352b82

Browse files
committed
pan: add bundled selector
Add first, primitive version for a SelectorBundle. SelectorBundle combines the selectors for multiple connections, attempting to use (and keep using, e.g under changing policies and paths expring or going down) disjoint paths. The idea is that the bandwidths of different paths can be accumulated and the load of the indivdual connections can be distributed over different network links.
1 parent 00775c2 commit f352b82

File tree

3 files changed

+413
-0
lines changed

3 files changed

+413
-0
lines changed

pkg/pan/bundle.go

+218
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
// Copyright 2021 ETH Zurich
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package pan
16+
17+
import (
18+
"fmt"
19+
"sync"
20+
)
21+
22+
// SelectorBundle combines the selectors for multiple connections, attempting
23+
// to use (and keep using, e.g under changing policies and paths expring or
24+
// going down) disjoint paths.
25+
// The idea is that the bandwidths of different paths can be accumulated and
26+
// the load of the indivdual connections can be distributed over different
27+
// network links.
28+
//
29+
// First create a SelectorBundle object and then use New() to create selectors
30+
// for each individual Conn (i.e. for each Dial call).
31+
// The bundle can be used for connections to different destinations.
32+
//
33+
// This first implementation of this bundle concept is somewhat primitive and
34+
// only determines path usage by the number of connections using a path.
35+
// Later on, we may want to consider bandwidth information from the path
36+
// metadata, allow customized selectors like e.g. the pinging selector, allow
37+
// defining priorities, etc.
38+
type SelectorBundle struct {
39+
mutex sync.Mutex
40+
selectors []*bundledSelector
41+
}
42+
43+
// New creates a Selector for a dialed connection. The path chosen by this selector
44+
// will attempt to minimize the usage overlap with other selectors from this bundle.
45+
func (b *SelectorBundle) New() Selector {
46+
b.mutex.Lock()
47+
defer b.mutex.Unlock()
48+
49+
s := &bundledSelector{
50+
bundle: b,
51+
}
52+
b.selectors = append(b.selectors, s)
53+
return s
54+
}
55+
56+
func (b *SelectorBundle) remove(s *bundledSelector) {
57+
b.mutex.Lock()
58+
defer b.mutex.Unlock()
59+
60+
if idx, ok := b.index(s); ok {
61+
b.selectors = append(b.selectors[:idx], b.selectors[idx+1:]...)
62+
}
63+
}
64+
65+
func (b *SelectorBundle) index(s *bundledSelector) (int, bool) {
66+
for i := range b.selectors {
67+
if b.selectors[i] == s {
68+
return i, true
69+
}
70+
}
71+
return -1, false
72+
}
73+
74+
func (b *SelectorBundle) firstMaxDisjoint(self *bundledSelector, paths []*Path) int {
75+
if len(paths) <= 1 {
76+
return 0
77+
}
78+
79+
// build up path usage information
80+
u := newBundlePathUsage()
81+
for _, s := range b.selectors {
82+
if s == self {
83+
continue
84+
}
85+
if p := s.Path(); p != nil {
86+
u.add(p)
87+
}
88+
}
89+
90+
return u.firstMaxDisjoint(paths)
91+
}
92+
93+
func (b *SelectorBundle) firstMaxDisjointMoreAlive(self *bundledSelector, current *Path, paths []*Path) int {
94+
moreAlive := make([]*Path, 0, len(paths))
95+
for _, p := range paths {
96+
if stats.IsMoreAlive(p, current) {
97+
moreAlive = append(moreAlive, p)
98+
}
99+
}
100+
if len(moreAlive) == 0 {
101+
return b.firstMaxDisjoint(self, paths)
102+
}
103+
best := moreAlive[b.firstMaxDisjoint(self, moreAlive)]
104+
for i, p := range paths {
105+
if p == best {
106+
return i
107+
}
108+
}
109+
panic("logic error, expected to find selected max disjoint in paths slice")
110+
}
111+
112+
// bundlePathUsage tracks the path usage by the selectors of a bundle.
113+
// Currently, only per-interface usage counts are tracked.
114+
type bundlePathUsage struct {
115+
intfs map[PathInterface]int
116+
}
117+
118+
func newBundlePathUsage() bundlePathUsage {
119+
return bundlePathUsage{
120+
intfs: make(map[PathInterface]int),
121+
}
122+
}
123+
124+
func (u *bundlePathUsage) add(p *Path) {
125+
intfs := p.Metadata.Interfaces
126+
for _, intf := range intfs {
127+
u.intfs[intf] = u.intfs[intf] + 1
128+
}
129+
}
130+
131+
// overlap returns how many (other) connections/selectors use paths that
132+
// overlap with p (i.e. use the same path interfaces / hops).
133+
func (u *bundlePathUsage) overlap(p *Path) (overlap int) {
134+
intfs := p.Metadata.Interfaces
135+
for _, intf := range intfs {
136+
overlap = maxInt(overlap, u.intfs[intf])
137+
}
138+
return
139+
}
140+
141+
func (u *bundlePathUsage) firstMaxDisjoint(paths []*Path) int {
142+
best := 0
143+
bestOverlap := u.overlap(paths[0])
144+
for i := 1; i < len(paths); i++ {
145+
overlap := u.overlap(paths[i])
146+
if overlap < bestOverlap {
147+
best, bestOverlap = i, overlap
148+
}
149+
}
150+
return best
151+
}
152+
153+
// bundledSelector is a Selector in a SelectorBundle.
154+
type bundledSelector struct {
155+
bundle *SelectorBundle
156+
mutex sync.Mutex
157+
paths []*Path
158+
current int
159+
}
160+
161+
func (s *bundledSelector) Path() *Path {
162+
s.mutex.Lock()
163+
defer s.mutex.Unlock()
164+
165+
if len(s.paths) == 0 {
166+
return nil
167+
}
168+
return s.paths[s.current]
169+
}
170+
171+
func (s *bundledSelector) Initialize(local, remote UDPAddr, paths []*Path) {
172+
s.mutex.Lock()
173+
defer s.mutex.Unlock()
174+
175+
s.paths = paths
176+
s.current = s.bundle.firstMaxDisjoint(s, paths)
177+
}
178+
179+
func (s *bundledSelector) Refresh(paths []*Path) {
180+
s.mutex.Lock()
181+
defer s.mutex.Unlock()
182+
183+
newcurrent := -1
184+
if len(s.paths) > 0 {
185+
currentFingerprint := s.paths[s.current].Fingerprint
186+
for i, p := range paths {
187+
if p.Fingerprint == currentFingerprint {
188+
newcurrent = i
189+
break
190+
}
191+
}
192+
}
193+
if newcurrent < 0 {
194+
newcurrent = s.bundle.firstMaxDisjoint(s, paths)
195+
}
196+
s.paths = paths
197+
s.current = newcurrent
198+
}
199+
200+
func (s *bundledSelector) PathDown(pf PathFingerprint, pi PathInterface) {
201+
s.mutex.Lock()
202+
defer s.mutex.Unlock()
203+
204+
current := s.paths[s.current]
205+
if isInterfaceOnPath(current, pi) || pf == current.Fingerprint {
206+
fmt.Println("down:", s.current, len(s.paths))
207+
better := s.bundle.firstMaxDisjointMoreAlive(s, current, s.paths)
208+
if better >= 0 {
209+
s.current = better
210+
fmt.Println("failover:", s.current, len(s.paths))
211+
}
212+
}
213+
}
214+
215+
func (s *bundledSelector) Close() error {
216+
s.bundle.remove(s)
217+
return nil
218+
}

0 commit comments

Comments
 (0)