From 7eea8f1f671231238ec0fee212b4be946b0ca781 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 27 Feb 2025 14:25:20 -0500 Subject: [PATCH 1/4] try parallelization on mempool check tx --- internal/mempool/mempool.go | 62 +++++++++++++++++++++++++++++++------ 1 file changed, 52 insertions(+), 10 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index bff52a0ed..82e45d57d 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -33,6 +33,9 @@ type TxMempool struct { config *config.MempoolConfig proxyAppConn abciclient.Client + checkTxCh chan *checkTxPayload + checkTxWorkers int + // txsAvailable fires once for each height when the mempool is not empty txsAvailable chan struct{} notifiedTxsAvailable bool @@ -112,15 +115,17 @@ func NewTxMempool( ) *TxMempool { txmp := &TxMempool{ - logger: logger, - config: cfg, - proxyAppConn: proxyAppConn, - height: -1, - cache: NopTxCache{}, - metrics: NopMetrics(), - txStore: NewTxStore(), - gossipIndex: clist.New(), - priorityIndex: NewTxPriorityQueue(), + logger: logger, + checkTxCh: make(chan *checkTxPayload, 10000), + checkTxWorkers: 10, + config: cfg, + proxyAppConn: proxyAppConn, + height: -1, + cache: NopTxCache{}, + metrics: NopMetrics(), + txStore: NewTxStore(), + gossipIndex: clist.New(), + priorityIndex: NewTxPriorityQueue(), heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { return wtx1.height >= wtx2.height }), @@ -140,6 +145,8 @@ func NewTxMempool( opt(txmp) } + go txmp.listenForCheckTx() + return txmp } @@ -251,6 +258,41 @@ func (txmp *TxMempool) TxsAvailable() <-chan struct{} { return txmp.txsAvailable } +type checkTxPayload struct { + ctx context.Context + tx types.Tx + cb func(*abci.ResponseCheckTx) + txInfo TxInfo +} + +func (txmp *TxMempool) listenForCheckTx() { + for i := 0; i < txmp.checkTxWorkers; i++ { + go func() { + for payload := range txmp.checkTxCh { + if err := txmp.checkTx(payload.ctx, payload.tx, payload.cb, payload.txInfo); err != nil { + txmp.logger.Error("failed to check tx", "err", err) + } + } + }() + } +} + +func (txmp *TxMempool) CheckTx( + ctx context.Context, + tx types.Tx, + cb func(*abci.ResponseCheckTx), + txInfo TxInfo, +) error { + payload := &checkTxPayload{ + ctx: ctx, + tx: tx, + cb: cb, + txInfo: txInfo, + } + txmp.checkTxCh <- payload + return nil +} + // CheckTx executes the ABCI CheckTx method for a given transaction. // It acquires a read-lock and attempts to execute the application's // CheckTx ABCI method synchronously. We return an error if any of @@ -272,7 +314,7 @@ func (txmp *TxMempool) TxsAvailable() <-chan struct{} { // NOTE: // - The applications' CheckTx implementation may panic. // - The caller is not to explicitly require any locks for executing CheckTx. -func (txmp *TxMempool) CheckTx( +func (txmp *TxMempool) checkTx( ctx context.Context, tx types.Tx, cb func(*abci.ResponseCheckTx), From 74ea2caa4d55c5fd3073e4aaa000f64b0f86f99c Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 27 Feb 2025 15:44:06 -0500 Subject: [PATCH 2/4] move the duplicate logic out --- internal/mempool/mempool.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 82e45d57d..9bd2ce195 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -283,6 +283,22 @@ func (txmp *TxMempool) CheckTx( cb func(*abci.ResponseCheckTx), txInfo TxInfo, ) error { + + if txSize := len(tx); txSize > txmp.config.MaxTxBytes { + return types.ErrTxTooLarge{ + Max: txmp.config.MaxTxBytes, + Actual: txSize, + } + } + + // We add the transaction to the mempool's cache and if the + // transaction is already present in the cache, i.e. false is returned, then we + // check if we've seen this transaction and error if we have. + if !txmp.cache.Push(tx) { + txmp.txStore.GetOrSetPeerByTxHash(tx.Key(), txInfo.SenderID) + return types.ErrTxInCache + } + payload := &checkTxPayload{ ctx: ctx, tx: tx, @@ -323,13 +339,6 @@ func (txmp *TxMempool) checkTx( txmp.mtx.RLock() defer txmp.mtx.RUnlock() - if txSize := len(tx); txSize > txmp.config.MaxTxBytes { - return types.ErrTxTooLarge{ - Max: txmp.config.MaxTxBytes, - Actual: txSize, - } - } - if txmp.preCheck != nil { if err := txmp.preCheck(tx); err != nil { return types.ErrPreCheck{Reason: err} @@ -342,14 +351,6 @@ func (txmp *TxMempool) checkTx( txHash := tx.Key() - // We add the transaction to the mempool's cache and if the - // transaction is already present in the cache, i.e. false is returned, then we - // check if we've seen this transaction and error if we have. - if !txmp.cache.Push(tx) { - txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID) - return types.ErrTxInCache - } - res, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{Tx: tx}) // when a transaction is removed/expired/rejected, this should be called From 93df298448211be77d8f3ee75d0b5e2421d04d26 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 27 Feb 2025 15:54:18 -0500 Subject: [PATCH 3/4] move cache check to checktx --- internal/mempool/mempool.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 9bd2ce195..1123e96fd 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -270,7 +270,10 @@ func (txmp *TxMempool) listenForCheckTx() { go func() { for payload := range txmp.checkTxCh { if err := txmp.checkTx(payload.ctx, payload.tx, payload.cb, payload.txInfo); err != nil { - txmp.logger.Error("failed to check tx", "err", err) + // if not a duplicate tx, print the error. + if !errors.Is(err, types.ErrTxInCache) { + txmp.logger.Error("failed to check tx", "err", err) + } } } }() @@ -291,14 +294,6 @@ func (txmp *TxMempool) CheckTx( } } - // We add the transaction to the mempool's cache and if the - // transaction is already present in the cache, i.e. false is returned, then we - // check if we've seen this transaction and error if we have. - if !txmp.cache.Push(tx) { - txmp.txStore.GetOrSetPeerByTxHash(tx.Key(), txInfo.SenderID) - return types.ErrTxInCache - } - payload := &checkTxPayload{ ctx: ctx, tx: tx, @@ -351,6 +346,14 @@ func (txmp *TxMempool) checkTx( txHash := tx.Key() + // We add the transaction to the mempool's cache and if the + // transaction is already present in the cache, i.e. false is returned, then we + // check if we've seen this transaction and error if we have. + if !txmp.cache.Push(tx) { + txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID) + return types.ErrTxInCache + } + res, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{Tx: tx}) // when a transaction is removed/expired/rejected, this should be called From 5941e3fcddcfd57006f0cf66b7afce1ce3d38de3 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 27 Feb 2025 16:05:12 -0500 Subject: [PATCH 4/4] use 16 workers and show worker dry log --- internal/mempool/mempool.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 1123e96fd..4d14b23a2 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -117,7 +117,7 @@ func NewTxMempool( txmp := &TxMempool{ logger: logger, checkTxCh: make(chan *checkTxPayload, 10000), - checkTxWorkers: 10, + checkTxWorkers: 16, config: cfg, proxyAppConn: proxyAppConn, height: -1, @@ -267,16 +267,24 @@ type checkTxPayload struct { func (txmp *TxMempool) listenForCheckTx() { for i := 0; i < txmp.checkTxWorkers; i++ { - go func() { - for payload := range txmp.checkTxCh { - if err := txmp.checkTx(payload.ctx, payload.tx, payload.cb, payload.txInfo); err != nil { - // if not a duplicate tx, print the error. - if !errors.Is(err, types.ErrTxInCache) { - txmp.logger.Error("failed to check tx", "err", err) + go func(w int) { + ticker := time.NewTicker(1 * time.Second) + for { + select { + case payload := <-txmp.checkTxCh: + ticker.Reset(1 * time.Second) + if err := txmp.checkTx(payload.ctx, payload.tx, payload.cb, payload.txInfo); err != nil { + // if not a duplicate tx, print the error. + if !errors.Is(err, types.ErrTxInCache) { + txmp.logger.Error("failed to check tx", "err", err) + } } + case <-ticker.C: + // log when worker hasn't seen anything for 1 second. + txmp.logger.Info("mempool checkTx worker has no work", "worker", w) } } - }() + }(i) } }