diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index bff52a0ed..4d14b23a2 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -33,6 +33,9 @@ type TxMempool struct { config *config.MempoolConfig proxyAppConn abciclient.Client + checkTxCh chan *checkTxPayload + checkTxWorkers int + // txsAvailable fires once for each height when the mempool is not empty txsAvailable chan struct{} notifiedTxsAvailable bool @@ -112,15 +115,17 @@ func NewTxMempool( ) *TxMempool { txmp := &TxMempool{ - logger: logger, - config: cfg, - proxyAppConn: proxyAppConn, - height: -1, - cache: NopTxCache{}, - metrics: NopMetrics(), - txStore: NewTxStore(), - gossipIndex: clist.New(), - priorityIndex: NewTxPriorityQueue(), + logger: logger, + checkTxCh: make(chan *checkTxPayload, 10000), + checkTxWorkers: 16, + config: cfg, + proxyAppConn: proxyAppConn, + height: -1, + cache: NopTxCache{}, + metrics: NopMetrics(), + txStore: NewTxStore(), + gossipIndex: clist.New(), + priorityIndex: NewTxPriorityQueue(), heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { return wtx1.height >= wtx2.height }), @@ -140,6 +145,8 @@ func NewTxMempool( opt(txmp) } + go txmp.listenForCheckTx() + return txmp } @@ -251,6 +258,60 @@ func (txmp *TxMempool) TxsAvailable() <-chan struct{} { return txmp.txsAvailable } +type checkTxPayload struct { + ctx context.Context + tx types.Tx + cb func(*abci.ResponseCheckTx) + txInfo TxInfo +} + +func (txmp *TxMempool) listenForCheckTx() { + for i := 0; i < txmp.checkTxWorkers; i++ { + go func(w int) { + ticker := time.NewTicker(1 * time.Second) + for { + select { + case payload := <-txmp.checkTxCh: + ticker.Reset(1 * time.Second) + if err := txmp.checkTx(payload.ctx, payload.tx, payload.cb, payload.txInfo); err != nil { + // if not a duplicate tx, print the error. + if !errors.Is(err, types.ErrTxInCache) { + txmp.logger.Error("failed to check tx", "err", err) + } + } + case <-ticker.C: + // log when worker hasn't seen anything for 1 second. + txmp.logger.Info("mempool checkTx worker has no work", "worker", w) + } + } + }(i) + } +} + +func (txmp *TxMempool) CheckTx( + ctx context.Context, + tx types.Tx, + cb func(*abci.ResponseCheckTx), + txInfo TxInfo, +) error { + + if txSize := len(tx); txSize > txmp.config.MaxTxBytes { + return types.ErrTxTooLarge{ + Max: txmp.config.MaxTxBytes, + Actual: txSize, + } + } + + payload := &checkTxPayload{ + ctx: ctx, + tx: tx, + cb: cb, + txInfo: txInfo, + } + txmp.checkTxCh <- payload + return nil +} + // CheckTx executes the ABCI CheckTx method for a given transaction. // It acquires a read-lock and attempts to execute the application's // CheckTx ABCI method synchronously. We return an error if any of @@ -272,7 +333,7 @@ func (txmp *TxMempool) TxsAvailable() <-chan struct{} { // NOTE: // - The applications' CheckTx implementation may panic. // - The caller is not to explicitly require any locks for executing CheckTx. -func (txmp *TxMempool) CheckTx( +func (txmp *TxMempool) checkTx( ctx context.Context, tx types.Tx, cb func(*abci.ResponseCheckTx), @@ -281,13 +342,6 @@ func (txmp *TxMempool) CheckTx( txmp.mtx.RLock() defer txmp.mtx.RUnlock() - if txSize := len(tx); txSize > txmp.config.MaxTxBytes { - return types.ErrTxTooLarge{ - Max: txmp.config.MaxTxBytes, - Actual: txSize, - } - } - if txmp.preCheck != nil { if err := txmp.preCheck(tx); err != nil { return types.ErrPreCheck{Reason: err}