package core

import (
	"errors"
	"math/big"
	"time"

	"github.com/morph-l2/go-ethereum/common"
	"github.com/morph-l2/go-ethereum/consensus"
	"github.com/morph-l2/go-ethereum/core/rawdb"
	"github.com/morph-l2/go-ethereum/core/state"
	"github.com/morph-l2/go-ethereum/core/types"
	"github.com/morph-l2/go-ethereum/ethdb"
	"github.com/morph-l2/go-ethereum/log"
)

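// UpdateBlockProcessMetrics records the state-access timers gathered while
// executing a block: account/storage reads and updates, snapshot reads, and
// trie hashing. The trie time is subtracted from procTime so that the
// execution timer reflects pure EVM processing only.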
func (bc *BlockChain) UpdateBlockProcessMetrics(statedb *state.StateDB, procTime time.Duration) {
	// Update the metrics touched during block processing
	accountReadTimer.Update(statedb.AccountReads)                 // Account reads are complete, we can mark them
	storageReadTimer.Update(statedb.StorageReads)                 // Storage reads are complete, we can mark them
	accountUpdateTimer.Update(statedb.AccountUpdates)             // Account updates are complete, we can mark them
	storageUpdateTimer.Update(statedb.StorageUpdates)             // Storage updates are complete, we can mark them
	snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
	snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them

	triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
	trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
	trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates

	blockExecutionTimer.Update(procTime - trieproc - triehash)
}

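// ProcessBlock executes the given block on top of the parent state and returns
// the resulting state database, the receipts, the gas used and the pure
// processing time. If safe is false, the post-execution state is additionally
// checked against the block header by the default validator.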
func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, safe bool) (*state.StateDB, types.Receipts, uint64, time.Duration, error) {
	statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
	if err != nil {
		return nil, nil, 0, 0, err
	}
	// Enable prefetching to pull in trie node paths while processing transactions
	statedb.StartPrefetcher("chain")

	// Process block using the parent state as reference point
	start := time.Now()
	receipts, _, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
	if err != nil {
		bc.reportBlock(block, receipts, err)
		return nil, nil, 0, 0, err
	}
	bc.UpdateBlockProcessMetrics(statedb, time.Since(start))

	triehash := statedb.AccountHashes + statedb.StorageHashes
	if !safe {
		// Validate the state using the default validator
		substart := time.Now()
		if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
			bc.reportBlock(block, receipts, err)
			return nil, nil, 0, 0, err
		}
		blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash))
	}
	proctime := time.Since(start)

	// Update the metrics touched during block validation
	accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them
	storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them

	return statedb, receipts, usedGas, proctime, nil
}

// writeBlockStateWithoutHead writes the block, its metadata and the
// corresponding state data to the database, without updating the chain head.
func (bc *BlockChain) writeBlockStateWithoutHead(block *types.Block, receipts []*types.Receipt, state *state.StateDB) error {
	// Calculate the total difficulty of the block
	ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
	if ptd == nil {
		return consensus.ErrUnknownAncestor
	}
	// Make sure no inconsistent state is leaked during insertion
	externTd := new(big.Int).Add(block.Difficulty(), ptd)

	// Regardless of the canonical status, write the block itself to the database.
	//
	// Note that all the components of the block (td, hash->number map, header,
	// body, receipts) should be written atomically. BlockBatch is used to
	// contain all the components.
	blockBatch := bc.db.NewBatch()
	rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
	rawdb.WriteBlock(blockBatch, block)
	rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
	rawdb.WritePreimages(blockBatch, state.Preimages())

	queueIndex := rawdb.ReadFirstQueueIndexNotInL2Block(bc.db, block.ParentHash())
	// Note: we can insert blocks with header-only ancestors here,
	// so queueIndex might not yet be available in the database.
	if queueIndex != nil {
		numProcessed := uint64(block.NumL1MessagesProcessed(*queueIndex))
		newIndex := *queueIndex + numProcessed
		log.Trace(
			"Blockchain.writeBlockStateWithoutHead WriteFirstQueueIndexNotInL2Block",
			"number", block.Number(),
			"hash", block.Hash().String(),
			"queueIndex", *queueIndex,
			"numProcessed", numProcessed,
			"newIndex", newIndex,
		)
		rawdb.WriteFirstQueueIndexNotInL2Block(blockBatch, block.Hash(), newIndex)
	}
	if err := blockBatch.Write(); err != nil {
		log.Crit("Failed to write block to disk", "err", err)
	}

	current := block.NumberU64()
	origin := state.GetOriginRoot()
	// Commit all cached state changes into the underlying memory database.
	root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
	if err != nil {
		return err
	}
	triedb := bc.stateCache.TrieDB()

	// If we're running an archive node, always flush
	if bc.cacheConfig.TrieDirtyDisabled {
		if triedb.Scheme() == rawdb.PathScheme {
			// If the node is running in path mode, skip the explicit gc
			// operation, which is unnecessary in this mode.
			return triedb.CommitState(root, origin, current, false)
		}
		return triedb.Commit(root, false, nil)
	}

	// Full but not archive node, do proper garbage collection
	triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
	bc.triegc.Push(root, -int64(block.NumberU64()))

	// Flush limits are not considered for the first TriesInMemory blocks.
	if current <= TriesInMemory {
		return nil
	}
	// If we exceeded our memory allowance, flush matured singleton nodes to disk
	var (
		nodes, imgs = triedb.Size()
		limit       = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
	)
	if nodes > limit || imgs > 4*1024*1024 {
		triedb.Cap(limit - ethdb.IdealBatchSize)
	}
	// Find the next state trie we need to commit
	chosen := current - TriesInMemory
	//flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval))
	// If we exceeded the time allowance, flush an entire trie to disk
	if bc.gcproc > bc.cacheConfig.TrieTimeLimit {
		// If the header is missing (canonical chain behind), we're reorging a low
		// diff sidechain. Suspend committing until this operation is completed.
		header := bc.GetHeaderByNumber(chosen)
		if header == nil {
			log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
		} else {
			// If we're exceeding limits but haven't reached a large enough memory gap,
			// warn the user that the system is becoming unstable.
			if chosen < lastWrite+TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
				log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory)
			}
			// Flush an entire trie and restart the counters
			triedb.Commit(header.Root, true, nil)
			lastWrite = chosen
			bc.gcproc = 0
		}
	}
	// Garbage collect anything below our required write retention
	for !bc.triegc.Empty() {
		root, number := bc.triegc.Pop()
		if uint64(-number) > chosen {
			bc.triegc.Push(root, number)
			break
		}
		triedb.Dereference(root.(common.Hash))
	}
	return nil
}

// SetCanonical sets the specified block as the new canonical chain head,
// reorging the chain if necessary.
func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
	//if !bc.chainmu.TryLock() {
	//	return common.Hash{}, errChainStopped
	//}
	//defer bc.chainmu.Unlock()

	// Re-execute the reorged chain in case the head state is missing.
	if !bc.HasState(head.Root()) {
		//if latestValidHash, err := bc.recoverAncestors(head); err != nil {
		//	return latestValidHash, err
		//}
		//log.Info("Recovered head state", "number", head.Number(), "hash", head.Hash())
		log.Error("Head state is missing", "number", head.Number(), "hash", head.Hash())
		return common.Hash{}, errors.New("head state is missing")
	}
	// Run the reorg if necessary and set the given block as the new head.
	start := time.Now()
	if head.ParentHash() != bc.CurrentBlock().Hash() {
		if err := bc.reorg(bc.CurrentBlock(), head); err != nil {
			return common.Hash{}, err
		}
	}
	bc.writeHeadBlock(head)

	// Emit events
	logs := bc.collectLogs(head, false)
	bc.chainFeed.Send(ChainEvent{Block: head, Hash: head.Hash(), Logs: logs})
	if len(logs) > 0 {
		bc.logsFeed.Send(logs)
	}
	bc.chainHeadFeed.Send(ChainHeadEvent{Block: head})

	context := []interface{}{
		"number", head.Number(),
		"hash", head.Hash(),
		"root", head.Root(),
		"elapsed", time.Since(start),
	}
	if timestamp := time.Unix(int64(head.Time()), 0); time.Since(timestamp) > time.Minute {
		context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
	}
	log.Info("Chain head was updated", context...)
	return head.Hash(), nil
}

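// WriteStateAndSetHead persists the block, its receipts and its state to the
// database, then promotes the block to the canonical chain head. The supplied
// processing time is accounted toward the trie garbage-collection budget.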
func (bc *BlockChain) WriteStateAndSetHead(block *types.Block, receipts types.Receipts, state *state.StateDB, procTime time.Duration) error {
	if !bc.chainmu.TryLock() {
		return errInsertionInterrupted
	}
	defer bc.chainmu.Unlock()

	bc.gcproc += procTime
	if err := bc.writeBlockStateWithoutHead(block, receipts, state); err != nil {
		return err
	}
	_, err := bc.SetCanonical(block)
	return err
}

// collectLogs collects the logs that were generated or removed during the
// processing of a block. These logs are later announced as deleted or reborn.
func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
	receipts := rawdb.ReadRawReceipts(bc.db, b.Hash(), b.NumberU64())
	// Raw receipts lack derived fields (block hash, log indices, ...), so
	// recompute them from the block before collecting the logs.
	receipts.DeriveFields(bc.chainConfig, b.Hash(), b.NumberU64(), b.Transactions())

	var logs []*types.Log
	for _, receipt := range receipts {
		for _, receiptLog := range receipt.Logs {
			l := *receiptLog
			if removed {
				l.Removed = true
			}
			logs = append(logs, &l)
		}
	}
	return logs
}