forked from OffchainLabs/nitro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreader_aggregator_strategies.go
116 lines (92 loc) · 2.97 KB
/
reader_aggregator_strategies.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Copyright 2021-2022, Offchain Labs, Inc.
// For license information, see https://github.com/nitro/blob/master/LICENSE
package das
import (
"errors"
"math/rand"
"sort"
"sync"
"sync/atomic"
"github.com/offchainlabs/nitro/arbstate/daprovider"
)
// ErrNoReadersResponded is the sentinel error for the condition "no DAS
// readers responded successfully".
// NOTE(review): nothing in this part of the file returns it; presumably the
// read aggregator does — confirm against the callers.
var ErrNoReadersResponded = errors.New("no DAS readers responded successfully")
// aggregatorStrategy decides in which order, and in which groups, the known
// DAS readers are handed out.
type aggregatorStrategy interface {
	// newInstance returns a fresh aggregatorStrategyInstance that hands out
	// the readers according to the strategy's current state.
	newInstance() aggregatorStrategyInstance

	// update supplies the strategy with the current reader list and the
	// statistics recorded for each reader.
	update(readers []daprovider.DASReader, stats map[daprovider.DASReader]readerStats)
}
// abstractAggregatorStrategy holds the state shared by the strategies in this
// file: a reader list and per-reader statistics. It is embedded by the
// concrete strategies, which inherit its update method.
type abstractAggregatorStrategy struct {
	// The embedded lock guards readers and stats: update takes the write lock,
	// the newInstance implementations in this file take the read lock.
	sync.RWMutex
	// readers is the strategy's own copy of the reader list given to update.
	readers []daprovider.DASReader
	// stats is the strategy's own copy of the per-reader statistics given to
	// update.
	stats map[daprovider.DASReader]readerStats
}
// update replaces the strategy's reader list and per-reader statistics with
// shallow copies of the supplied values.
//
// The copies are made while the write lock is held, so later changes the
// caller makes to its own slice or map are not seen by the strategy. A nil or
// empty argument yields an empty (non-nil) slice or map.
func (s *abstractAggregatorStrategy) update(readers []daprovider.DASReader, stats map[daprovider.DASReader]readerStats) {
	s.Lock()
	defer s.Unlock()

	s.readers = make([]daprovider.DASReader, len(readers))
	copy(s.readers, readers)

	// The final entry count is known up front, so size the map once instead of
	// letting it grow while it is being filled.
	s.stats = make(map[daprovider.DASReader]readerStats, len(stats))
	for reader, stat := range stats {
		s.stats[reader] = stat
	}
}
// simpleExploreExploitStrategy is an explore/exploit strategy whose instances
// hand out readers in exponentially growing sets (1, 2, 4, ... readers).
//
// Calls to newInstance cycle through exploreIterations + exploitIterations
// steps: in the explore part of the cycle the readers are shuffled randomly,
// in the exploit part they are sorted by their recorded statistics.
type simpleExploreExploitStrategy struct {
	// iterations counts newInstance calls; it is incremented atomically once
	// per call and selects the phase of that call.
	iterations atomic.Uint32
	// exploreIterations is the number of steps per cycle spent exploring.
	exploreIterations uint32
	// exploitIterations is the number of steps per cycle spent exploiting.
	exploitIterations uint32

	// Embedded reader list, statistics, lock, and the update method.
	abstractAggregatorStrategy
}
// newInstance builds the reader schedule for one use of the strategy.
//
// Each call atomically increments the iteration counter; the counter's
// position within a cycle of exploreIterations+exploitIterations steps selects
// the phase:
//
//   - explore: the readers are put in random order (math/rand);
//   - exploit: the readers are sorted ascending by
//     successRatioWeightedMeanLatency, using the stats last given to update.
//
// The ordered readers are then cut into consecutive sets of 1, 2, 4, ...
// readers (the last set may be shorter), which the returned instance hands out
// one set per nextReaders call. With no readers the instance has no sets.
//
// If both iteration counts are zero there is no cycle to take a position in;
// the strategy then always exploits, which is what exploreIterations == 0
// means for any non-zero cycle as well.
func (s *simpleExploreExploitStrategy) newInstance() aggregatorStrategyInstance {
	iterations := s.iterations.Add(1)

	s.RLock()
	defer s.RUnlock()

	// Work on a private copy so shuffling/sorting never reorders s.readers.
	readers := make([]daprovider.DASReader, len(s.readers))
	copy(readers, s.readers)

	// Widen to uint64 so the sum cannot wrap around, and skip the modulo when
	// the cycle length is zero: an integer modulo by zero panics at run time.
	cycle := uint64(s.exploreIterations) + uint64(s.exploitIterations)
	exploring := cycle != 0 && uint64(iterations)%cycle < uint64(s.exploreIterations)

	if exploring {
		// Explore phase
		rand.Shuffle(len(readers), func(i, j int) { readers[i], readers[j] = readers[j], readers[i] })
	} else {
		// Exploit phase
		sort.Slice(readers, func(i, j int) bool {
			a, b := s.stats[readers[i]], s.stats[readers[j]]
			return a.successRatioWeightedMeanLatency() < b.successRatioWeightedMeanLatency()
		})
	}

	readerSets := make([][]daprovider.DASReader, 0)
	for start, size := 0, 1; start < len(readers); size *= 2 {
		end := start + size
		if end > len(readers) {
			end = len(readers)
		}
		// The sets are disjoint windows of the private copy. Capping the
		// capacity at end keeps an append on one set from writing into the
		// next one.
		readerSets = append(readerSets, readers[start:end:end])
		start = end
	}
	return &basicStrategyInstance{readerSets: readerSets}
}
// testingSequentialStrategy is a sequential strategy for testing: its
// instances hand out the readers one at a time, in the order in which they
// were given to update.
type testingSequentialStrategy struct {
	// Embedded reader list, statistics, lock, and the update method.
	abstractAggregatorStrategy
}
// newInstance returns an instance that yields every reader in its own
// single-element set, following the order of the strategy's reader list.
// The list is read under the read lock.
func (s *testingSequentialStrategy) newInstance() aggregatorStrategyInstance {
	s.RLock()
	defer s.RUnlock()

	// Left nil on purpose when there are no readers.
	var singletons [][]daprovider.DASReader
	for idx := range s.readers {
		singletons = append(singletons, []daprovider.DASReader{s.readers[idx]})
	}
	return &basicStrategyInstance{readerSets: singletons}
}
// aggregatorStrategyInstance is an instance of a strategy that returns readers
// in an order according to the strategy.
type aggregatorStrategyInstance interface {
	// nextReaders returns the next set of readers. The implementation in this
	// file, basicStrategyInstance, returns nil once every set has been handed
	// out.
	nextReaders() []daprovider.DASReader
}
// basicStrategyInstance is an aggregatorStrategyInstance backed by a
// precomputed queue of reader sets.
type basicStrategyInstance struct {
	// readerSets holds the sets not yet handed out; nextReaders removes them
	// from the front.
	readerSets [][]daprovider.DASReader
}
// nextReaders removes the first remaining reader set from the queue and
// returns it. It returns nil when no sets are left.
func (si *basicStrategyInstance) nextReaders() []daprovider.DASReader {
	pending := si.readerSets
	if len(pending) < 1 {
		return nil
	}
	head, rest := pending[0], pending[1:]
	si.readerSets = rest
	return head
}