Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 71 additions & 17 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type TxMempool struct {
config *config.MempoolConfig
proxyAppConn abciclient.Client

checkTxCh chan *checkTxPayload
checkTxWorkers int

// txsAvailable fires once for each height when the mempool is not empty
txsAvailable chan struct{}
notifiedTxsAvailable bool
Expand Down Expand Up @@ -112,15 +115,17 @@ func NewTxMempool(
) *TxMempool {

txmp := &TxMempool{
logger: logger,
config: cfg,
proxyAppConn: proxyAppConn,
height: -1,
cache: NopTxCache{},
metrics: NopMetrics(),
txStore: NewTxStore(),
gossipIndex: clist.New(),
priorityIndex: NewTxPriorityQueue(),
logger: logger,
checkTxCh: make(chan *checkTxPayload, 10000),
checkTxWorkers: 16,
config: cfg,
proxyAppConn: proxyAppConn,
height: -1,
cache: NopTxCache{},
metrics: NopMetrics(),
txStore: NewTxStore(),
gossipIndex: clist.New(),
priorityIndex: NewTxPriorityQueue(),
heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.height >= wtx2.height
}),
Expand All @@ -140,6 +145,8 @@ func NewTxMempool(
opt(txmp)
}

go txmp.listenForCheckTx()

return txmp
}

Expand Down Expand Up @@ -251,6 +258,60 @@ func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
return txmp.txsAvailable
}

type checkTxPayload struct {
ctx context.Context
tx types.Tx
cb func(*abci.ResponseCheckTx)
txInfo TxInfo
}

func (txmp *TxMempool) listenForCheckTx() {
for i := 0; i < txmp.checkTxWorkers; i++ {
go func(w int) {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case payload := <-txmp.checkTxCh:
ticker.Reset(1 * time.Second)
if err := txmp.checkTx(payload.ctx, payload.tx, payload.cb, payload.txInfo); err != nil {
// if not a duplicate tx, print the error.
if !errors.Is(err, types.ErrTxInCache) {
txmp.logger.Error("failed to check tx", "err", err)
}
}
case <-ticker.C:
// log when worker hasn't seen anything for 1 second.
txmp.logger.Info("mempool checkTx worker has no work", "worker", w)
}
}
}(i)
}
}

func (txmp *TxMempool) CheckTx(
ctx context.Context,
tx types.Tx,
cb func(*abci.ResponseCheckTx),
txInfo TxInfo,
) error {

if txSize := len(tx); txSize > txmp.config.MaxTxBytes {
return types.ErrTxTooLarge{
Max: txmp.config.MaxTxBytes,
Actual: txSize,
}
}

payload := &checkTxPayload{
ctx: ctx,
tx: tx,
cb: cb,
txInfo: txInfo,
}
txmp.checkTxCh <- payload
return nil
}

// CheckTx executes the ABCI CheckTx method for a given transaction.
// It acquires a read-lock and attempts to execute the application's
// CheckTx ABCI method synchronously. We return an error if any of
Expand All @@ -272,7 +333,7 @@ func (txmp *TxMempool) TxsAvailable() <-chan struct{} {
// NOTE:
// - The applications' CheckTx implementation may panic.
// - The caller is not to explicitly require any locks for executing CheckTx.
func (txmp *TxMempool) CheckTx(
func (txmp *TxMempool) checkTx(
ctx context.Context,
tx types.Tx,
cb func(*abci.ResponseCheckTx),
Expand All @@ -281,13 +342,6 @@ func (txmp *TxMempool) CheckTx(
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()

if txSize := len(tx); txSize > txmp.config.MaxTxBytes {
return types.ErrTxTooLarge{
Max: txmp.config.MaxTxBytes,
Actual: txSize,
}
}

if txmp.preCheck != nil {
if err := txmp.preCheck(tx); err != nil {
return types.ErrPreCheck{Reason: err}
Expand Down
Loading