4
4
package sync
5
5
6
6
import (
7
- "bytes"
8
7
"context"
9
8
"encoding/binary"
10
9
"errors"
@@ -13,12 +12,13 @@ import (
13
12
"github.com/ava-labs/avalanchego/database/versiondb"
14
13
"github.com/ava-labs/avalanchego/utils/wrappers"
15
14
"github.com/ava-labs/libevm/common"
15
+ "github.com/ava-labs/libevm/libevm/options"
16
16
"github.com/ava-labs/libevm/trie"
17
17
18
18
"github.com/ava-labs/coreth/plugin/evm/message"
19
+ "github.com/ava-labs/coreth/sync"
19
20
20
21
atomicstate "github.com/ava-labs/coreth/plugin/evm/atomic/state"
21
- synccommon "github.com/ava-labs/coreth/sync"
22
22
syncclient "github.com/ava-labs/coreth/sync/client"
23
23
)
24
24
@@ -31,53 +31,48 @@ const (
31
31
)
32
32
33
33
var (
34
- errInvalidTargetHeight = errors .New ("TargetHeight must be greater than 0" )
35
-
36
- // Pre-allocate zero bytes to avoid repeated allocations.
37
- zeroBytes = bytes .Repeat ([]byte {0x00 }, common .HashLength )
38
-
39
- _ synccommon.Syncer = (* syncer )(nil )
34
+ _ sync.Syncer = (* Syncer )(nil )
40
35
_ syncclient.LeafSyncTask = (* syncerLeafTask )(nil )
41
- )
42
-
43
- // Name returns the human-readable name for this sync task.
44
- func (* syncer ) Name () string { return "Atomic State Syncer" }
45
-
46
- // ID returns the stable identifier for this sync task.
47
- func (* syncer ) ID () string { return "state_atomic_sync" }
48
36
49
- // Config holds the configuration for creating a new atomic syncer.
50
- type Config struct {
51
- // TargetHeight is the target block height to sync to.
52
- TargetHeight uint64
37
+ errTargetHeightRequired = errors .New ("target height must be > 0" )
38
+ )
53
39
54
- // RequestSize is the maximum number of leaves to request in a single network call.
40
+ // config holds the configuration for creating a new atomic syncer.
41
+ type config struct {
42
+ // requestSize is the maximum number of leaves to request in a single network call.
55
43
// NOTE: user facing option validated as the parameter [plugin/evm/config.Config.StateSyncRequestSize].
56
- RequestSize uint16
44
+ requestSize uint16
57
45
58
- // NumWorkers is the number of worker goroutines to use for syncing.
46
+ // numWorkers is the number of worker goroutines to use for syncing.
59
47
// If not set, [defaultNumWorkers] will be used.
60
- NumWorkers int
48
+ numWorkers int
61
49
}
62
50
63
- // WithUnsetDefaults returns a copy of the config with defaults applied for any
64
- // unset (zero) fields.
65
- func (c Config ) WithUnsetDefaults () Config {
66
- out := c
67
- if out .NumWorkers == 0 {
68
- out .NumWorkers = defaultNumWorkers
69
- }
70
- if out .RequestSize == 0 {
71
- out .RequestSize = defaultRequestSize
72
- }
51
+ // SyncerOption configures the atomic syncer via functional options.
52
+ type SyncerOption = options.Option [config ]
73
53
74
- return out
54
+ // WithRequestSize sets the request size per network call.
55
+ func WithRequestSize (n uint16 ) SyncerOption {
56
+ return options.Func [config ](func (c * config ) {
57
+ if n > 0 {
58
+ c .requestSize = n
59
+ }
60
+ })
75
61
}
76
62
77
- // syncer is used to sync the atomic trie from the network. The CallbackLeafSyncer
78
- // is responsible for orchestrating the sync while syncer is responsible for maintaining
63
+ // WithNumWorkers sets the number of worker goroutines for syncing.
64
+ func WithNumWorkers (n int ) SyncerOption {
65
+ return options.Func [config ](func (c * config ) {
66
+ if n > 0 {
67
+ c .numWorkers = n
68
+ }
69
+ })
70
+ }
71
+
72
+ // Syncer is used to sync the atomic trie from the network. The CallbackLeafSyncer
73
+ // is responsible for orchestrating the sync while Syncer is responsible for maintaining
79
74
// the state of progress and writing the actual atomic trie to the trieDB.
80
- type syncer struct {
75
+ type Syncer struct {
81
76
db * versiondb.Database
82
77
atomicTrie * atomicstate.AtomicTrie
83
78
trie * trie.Trie // used to update the atomic trie
@@ -92,62 +87,76 @@ type syncer struct {
92
87
lastHeight uint64
93
88
}
94
89
95
- // addZeros adds [common.HashLenth] zeros to [height] and returns the result as []byte
96
- func addZeroes (height uint64 ) []byte {
97
- packer := wrappers.Packer {Bytes : make ([]byte , atomicstate .TrieKeyLength )}
98
- packer .PackLong (height )
99
- packer .PackFixedBytes (zeroBytes )
100
- return packer .Bytes
101
- }
102
-
103
- // newSyncer returns a new syncer instance that will sync the atomic trie from the network.
104
- func newSyncer (client syncclient.LeafClient , db * versiondb.Database , atomicTrie * atomicstate.AtomicTrie , targetRoot common.Hash , config * Config ) (* syncer , error ) {
105
- if config .TargetHeight == 0 {
106
- return nil , errInvalidTargetHeight
90
+ // NewSyncer returns a new syncer instance that will sync the atomic trie from the network.
91
+ func NewSyncer (client syncclient.LeafClient , db * versiondb.Database , atomicTrie * atomicstate.AtomicTrie , targetRoot common.Hash , targetHeight uint64 , opts ... SyncerOption ) (* Syncer , error ) {
92
+ if targetHeight == 0 {
93
+ return nil , errTargetHeightRequired
107
94
}
108
95
109
- // Apply defaults for unset fields.
110
- cfg := config .WithUnsetDefaults ()
96
+ cfg := config {
97
+ numWorkers : defaultNumWorkers ,
98
+ requestSize : defaultRequestSize ,
99
+ }
100
+ options .ApplyTo (& cfg , opts ... )
111
101
112
102
lastCommittedRoot , lastCommit := atomicTrie .LastCommitted ()
113
103
trie , err := atomicTrie .OpenTrie (lastCommittedRoot )
114
104
if err != nil {
115
105
return nil , err
116
106
}
117
107
118
- syncer := & syncer {
108
+ syncer := & Syncer {
119
109
db : db ,
120
110
atomicTrie : atomicTrie ,
121
111
trie : trie ,
122
112
targetRoot : targetRoot ,
123
- targetHeight : cfg . TargetHeight ,
113
+ targetHeight : targetHeight ,
124
114
lastHeight : lastCommit ,
125
115
}
126
116
127
117
// Create tasks channel with capacity for the number of workers.
128
- tasks := make (chan syncclient.LeafSyncTask , cfg .NumWorkers )
118
+ tasks := make (chan syncclient.LeafSyncTask , cfg .numWorkers )
129
119
130
120
// For atomic trie syncing, we typically want a single task since the trie is sequential.
131
121
// But we can create multiple tasks if needed for parallel processing of different ranges.
132
122
tasks <- & syncerLeafTask {syncer : syncer }
133
123
close (tasks )
134
124
135
125
syncer .syncer = syncclient .NewCallbackLeafSyncer (client , tasks , & syncclient.LeafSyncerConfig {
136
- RequestSize : cfg .RequestSize ,
137
- NumWorkers : cfg .NumWorkers ,
126
+ RequestSize : cfg .requestSize ,
127
+ NumWorkers : cfg .numWorkers ,
138
128
OnFailure : func () {}, // No-op since we flush progress to disk at the regular commit interval.
139
129
})
140
130
141
131
return syncer , nil
142
132
}
143
133
134
+ // Name returns the human-readable name for this sync task.
135
+ func (* Syncer ) Name () string {
136
+ return "Atomic State Syncer"
137
+ }
138
+
139
+ // ID returns the stable identifier for this sync task.
140
+ func (* Syncer ) ID () string {
141
+ return "state_atomic_sync"
142
+ }
143
+
144
144
// Sync begins syncing the target atomic root with the configured number of worker goroutines.
145
- func (s * syncer ) Sync (ctx context.Context ) error {
145
+ func (s * Syncer ) Sync (ctx context.Context ) error {
146
146
return s .syncer .Sync (ctx )
147
147
}
148
148
149
+ // addZeroes returns the big-endian representation of `height`, prefixed with [common.HashLength] zeroes.
150
+ func addZeroes (height uint64 ) []byte {
151
+ // Key format is [height(8 bytes)][blockchainID(32 bytes)]. Start should be the
152
+ // smallest key for the given height, i.e., height followed by zeroed blockchainID.
153
+ b := make ([]byte , wrappers .LongLen + common .HashLength )
154
+ binary .BigEndian .PutUint64 (b [:wrappers .LongLen ], height )
155
+ return b
156
+ }
157
+
149
158
// onLeafs is the callback for the leaf syncer, which will insert the key-value pairs into the trie.
150
- func (s * syncer ) onLeafs (keys [][]byte , values [][]byte ) error {
159
+ func (s * Syncer ) onLeafs (keys [][]byte , values [][]byte ) error {
151
160
for i , key := range keys {
152
161
if len (key ) != atomicstate .TrieKeyLength {
153
162
return fmt .Errorf ("unexpected key len (%d) in atomic trie sync" , len (key ))
@@ -195,7 +204,7 @@ func (s *syncer) onLeafs(keys [][]byte, values [][]byte) error {
195
204
196
205
// onFinish is called when sync for this trie is complete.
197
206
// commit the trie to disk and perform the final checks that we synced the target root correctly.
198
- func (s * syncer ) onFinish () error {
207
+ func (s * Syncer ) onFinish () error {
199
208
// commit the trie on finish
200
209
root , nodes , err := s .trie .Commit (false )
201
210
if err != nil {
@@ -220,7 +229,7 @@ func (s *syncer) onFinish() error {
220
229
}
221
230
222
231
type syncerLeafTask struct {
223
- syncer * syncer
232
+ syncer * Syncer
224
233
}
225
234
226
235
func (a * syncerLeafTask ) Start () []byte { return addZeroes (a .syncer .lastHeight + 1 ) }
0 commit comments