diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index 8282ed7cb4..4f4c272a0a 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -363,6 +363,10 @@ func (beacon *Beacon) SetThreads(threads int) { } } +func (p *Beacon) DropOnNewBlock(*types.Header) bool { + return true +} + // IsTTDReached checks if the TotalTerminalDifficulty has been surpassed on the `parentHash` block. // It depends on the parentHash already being stored in the database. // If the parentHash is not stored in the database a UnknownAncestor error is returned. diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index a258f1fe5f..11287e74ef 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -705,6 +705,11 @@ func (c *Clique) APIs(chain consensus.ChainHeaderReader) []rpc.API { }} } +func (p *Clique) DropOnNewBlock(header *types.Header) bool { + // drop the block if it is not in turn. + return header.Difficulty.Cmp(diffNoTurn) == 0 +} + // SealHash returns the hash of a block prior to it being sealed. func SealHash(header *types.Header) (hash common.Hash) { hasher := sha3.NewLegacyKeccak256() diff --git a/consensus/consensus.go b/consensus/consensus.go index 87632a9d0d..367a703678 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -130,6 +130,12 @@ type Engine interface { // Close terminates any background threads maintained by the consensus engine. Close() error + + // DropOnNewBlock determine the action of mining when it is interrupted by new imported block. + // Return + // true: the mining result will be dropped + // false: the mining result will be kept and move on to the next mine step. + DropOnNewBlock(header *types.Header) bool } // PoW is a consensus engine based on proof-of-work. diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 12a69c127a..0bf77a8ae0 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -647,6 +647,10 @@ var ( big32 = big.NewInt(32) ) +func (p *Ethash) DropOnNewBlock(*types.Header) bool { + return true +} + // AccumulateRewards credits the coinbase of the given block with the mining // reward. The total reward consists of the static block reward and rewards for // included uncles. The coinbase of each uncle block is also rewarded. diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 2e544803ef..c16b36ac12 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -976,6 +976,11 @@ func (p *Parlia) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, return CalcDifficulty(snap, p.val) } +func (p *Parlia) DropOnNewBlock(header *types.Header) bool { + // drop the block if it is not in turn. + return header.Difficulty.Cmp(diffNoTurn) == 0 +} + // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty // that a new block should have based on the previous blocks in the chain and the // current signer. diff --git a/miner/worker.go b/miner/worker.go index 2d43f6c866..d376ae9ad8 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -19,6 +19,7 @@ package miner import ( "errors" "fmt" + "math/big" "sync" "sync/atomic" "time" @@ -67,6 +68,11 @@ const ( var ( writeBlockTimer = metrics.NewRegisteredTimer("worker/writeblock", nil) finalizeBlockTimer = metrics.NewRegisteredTimer("worker/finalizeblock", nil) + + errBlockInterruptedByNewHead = errors.New("new head arrived while building block") + errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block") + errBlockInterruptedByTimeout = errors.New("timeout while building block") + errBlockInterruptedByOutOfGas = errors.New("out of gas while building block") ) // environment is the worker's current environment and holds all @@ -142,8 +148,11 @@ type task struct { } const ( - commitInterruptNewHead int32 = 1 - commitInterruptResubmit int32 = 2 + commitInterruptNone int32 = iota + commitInterruptNewHead + commitInterruptResubmit + commitInterruptTimeout + commitInterruptOutOfGas ) // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. @@ -374,6 +383,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { close(interruptCh) } interruptCh = make(chan int32, 1) + log.Debug("newWorkLoop commit", "reason", reason) select { case w.newWorkCh <- &newWorkReq{interruptCh: interruptCh, timestamp: timestamp}: case <-w.exitCh: @@ -754,7 +764,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece } func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, - interruptCh chan int32, stopTimer *time.Timer) bool { + interruptCh chan int32, stopTimer *time.Timer) error { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -766,7 +776,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP } var coalescedLogs []*types.Log - // initilise bloom processors + // initialize bloom processors processorCapacity := 100 if txs.CurrentSize() < processorCapacity { processorCapacity = txs.CurrentSize() @@ -781,6 +791,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP txCurr := &tx w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), stopPrefetchCh, txCurr) + signal := commitInterruptNone LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. @@ -791,24 +802,27 @@ LOOP: // For the third case, the semi-finished work will be submitted to the consensus engine. if interruptCh != nil { select { - case reason, ok := <-interruptCh: + case signal, ok := <-interruptCh: if !ok { // should never be here, since interruptCh should not be read before - log.Warn("commit transactions stopped unknown") + log.Error("commit transactions stopped unknown") } - return reason == commitInterruptNewHead + return signalToErr(signal) default: } } // If we don't have enough gas for any further transactions then we're done if env.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + signal = commitInterruptOutOfGas break } if stopTimer != nil { select { case <-stopTimer.C: log.Info("Not enough time for further transactions", "txs", len(env.txs)) + stopTimer.Reset(0) // re-active the timer, in case it will be used later. + signal = commitInterruptTimeout break LOOP default: } @@ -884,7 +898,7 @@ LOOP: } w.pendingLogsFeed.Send(cpy) } - return false + return signalToErr(signal) } // generateParams wraps various of settings for generating sealing task. @@ -987,7 +1001,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interruptCh chan int32, env *environment) { +func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stopTimer *time.Timer) (err error) { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(false) @@ -999,26 +1013,23 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment) { } } - var stopTimer *time.Timer - delay := w.engine.Delay(w.chain, env.header, &w.config.DelayLeftOver) - if delay != nil { - stopTimer = time.NewTimer(*delay) - log.Debug("Time left for mining work", "delay", delay.String()) - defer stopTimer.Stop() - } - + err = nil if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interruptCh, stopTimer) { + err = w.commitTransactions(env, txs, interruptCh, stopTimer) + if err == errBlockInterruptedByNewHead || err == errBlockInterruptedByOutOfGas || err == errBlockInterruptedByTimeout { return } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interruptCh, stopTimer) { + err = w.commitTransactions(env, txs, interruptCh, stopTimer) + if err == errBlockInterruptedByNewHead || err == errBlockInterruptedByOutOfGas || err == errBlockInterruptedByTimeout { return } } + + return } // generateWork generates a sealing block based on the given parameters. @@ -1029,7 +1040,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { } defer work.discard() - w.fillTransactions(nil, work) + w.fillTransactions(nil, work, nil) block, _, err := w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) return block, err } @@ -1048,24 +1059,116 @@ func (w *worker) commitWork(interruptCh chan int32, timestamp int64) { } coinbase = w.coinbase // Use the preset address as the fee recipient } - work, err := w.prepareWork(&generateParams{ - timestamp: uint64(timestamp), - coinbase: coinbase, - }) - if err != nil { - return - } - // Fill pending transactions from the txpool - w.fillTransactions(interruptCh, work) - w.commit(work, w.fullTaskHook, true, start) + stopTimer := time.NewTimer(0) + defer stopTimer.Stop() + <-stopTimer.C // discard the initial tick + + // validator can try several times to get the most profitable block, + // as long as the timestamp is not reached. + workList := make([]*environment, 0, 10) + var bestWork *environment + // workList clean up + defer func() { + for _, w := range workList { + // only keep the best work, discard others. + if w == bestWork { + continue + } + w.discard() + } + }() +LOOP: + for { + work, err := w.prepareWork(&generateParams{ + timestamp: uint64(timestamp), + coinbase: coinbase, + }) + if err != nil { + return + } + + log.Debug("commitWork for", "block", work.header.Number, + "until header time", time.Until(time.Unix(int64(work.header.Time), 0))) + workList = append(workList, work) + + delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver) + if delay == nil { + log.Warn("commitWork delay is nil, something is wrong") + stopTimer = nil + } else if *delay <= 0 { + log.Debug("Not enough time for commitWork") + break + } else { + log.Debug("commitWork stopTimer", "delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) + stopTimer.Reset(*delay) + } + + // subscribe before fillTransactions + txsCh := make(chan core.NewTxsEvent, txChanSize) + sub := w.eth.TxPool().SubscribeNewTxsEvent(txsCh) + defer sub.Unsubscribe() + // Fill pending transactions from the txpool + fillStart := time.Now() + err = w.fillTransactions(interruptCh, work, stopTimer) + fillDuration := time.Since(fillStart) + switch { + case errors.Is(err, errBlockInterruptedByNewHead): + // For Parlia, it will drop the work on receiving new block if it is not inturn. + if w.engine.DropOnNewBlock(work.header) { + log.Debug("drop the block, when new block is imported") + return + } + case errors.Is(err, errBlockInterruptedByTimeout): + // break the loop to get the best work + log.Debug("commitWork timeout") + break LOOP + case errors.Is(err, errBlockInterruptedByOutOfGas): + log.Debug("commitWork out of gas") + break LOOP + } + + if interruptCh == nil || stopTimer == nil { + // it is single commit work, no need to try several time. + log.Info("commitWork interruptCh or stopTimer is nil") + break + } + + select { + case <-txsCh: + delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver) + log.Debug("commitWork txsCh arrived", "fillDuration", fillDuration.String(), "delay", delay.String()) + if fillDuration > *delay { + // there may not have enough time for another fillTransactions + break LOOP + } + case <-stopTimer.C: + log.Debug("commitWork stopTimer expired") + break LOOP + case <-interruptCh: + log.Debug("commitWork interruptCh closed, new block imported or resubmit triggered") + return + } + } + // get the most profitable work + bestWork = workList[0] + bestReward := new(big.Int) + for i, w := range workList { + balance := w.state.GetBalance(consensus.SystemAddress) + log.Debug("Get the most profitable work", "index", i, "balance", balance, "bestReward", bestReward) + if balance.Cmp(bestReward) > 0 { + bestWork = w + bestReward = balance + } + } + w.commit(bestWork, w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover // prefetcher processes in the mean time and starting a new one. if w.current != nil { w.current.discard() } - w.current = work + w.current = bestWork } // commit runs any post-transaction state modifications, assembles the final block @@ -1166,3 +1269,22 @@ func (w *worker) postSideBlock(event core.ChainSideEvent) { case <-w.exitCh: } } + +// signalToErr converts the interruption signal to a concrete error type for return. +// The given signal must be a valid interruption signal. +func signalToErr(signal int32) error { + switch signal { + case commitInterruptNone: + return nil + case commitInterruptNewHead: + return errBlockInterruptedByNewHead + case commitInterruptResubmit: + return errBlockInterruptedByRecommit + case commitInterruptTimeout: + return errBlockInterruptedByTimeout + case commitInterruptOutOfGas: + return errBlockInterruptedByOutOfGas + default: + panic(fmt.Errorf("undefined signal %d", signal)) + } +}