From e6222c8fd126c9df67c9d2ec3a8868f00ebe6584 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Fri, 22 Dec 2023 15:30:44 -0500 Subject: [PATCH 1/2] remove legacy orphans background --- nodedb.go | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/nodedb.go b/nodedb.go index d446b7c1d..0ff768a7b 100644 --- a/nodedb.go +++ b/nodedb.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "time" "cosmossdk.io/log" dbm "github.com/cosmos/cosmos-db" @@ -470,9 +471,38 @@ func (ndb *nodeDB) deleteLegacyVersions() error { ndb.legacyLatestVersion = -1 // Delete all orphan nodes of the legacy versions - return ndb.traversePrefix(legacyOrphanKeyFormat.Key(), func(key, value []byte) error { - return ndb.batch.Delete(key) - }) + go func() { + if err := ndb.deleteOrphans(); err != nil { + ndb.logger.Error("failed to clean legacy orphans", "err", err) + } + }() + + return nil +} + +// deleteOrphans cleans all legacy orphans from the nodeDB. +func (ndb *nodeDB) deleteOrphans() error { + itr, err := dbm.IteratePrefix(ndb.db, legacyOrphanKeyFormat.Key()) + if err != nil { + return err + } + defer itr.Close() + + count := 0 + for ; itr.Valid(); itr.Next() { + if err := ndb.batch.Delete(itr.Key()); err != nil { + return err + } + + // Sleep for a while to avoid blocking the main thread i/o. + count++ + if count > 1000 { + count = 0 + time.Sleep(100 * time.Millisecond) + } + } + + return nil } // DeleteVersionsFrom permanently deletes all tree versions from the given version upwards. From 2a02f66ebf1f0ab91de7a6e99edad49b630ce569 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Fri, 22 Dec 2023 15:46:43 -0500 Subject: [PATCH 2/2] add mutex --- batch.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/batch.go b/batch.go index b25e879a2..a7e5e20b0 100644 --- a/batch.go +++ b/batch.go @@ -1,6 +1,8 @@ package iavl import ( + "sync" + dbm "github.com/cosmos/cosmos-db" ) @@ -11,6 +13,7 @@ type BatchWithFlusher struct { db dbm.DB // This is only used to create new batch batch dbm.Batch // Batched writing buffer. + mtx sync.Mutex flushThreshold int // The threshold to flush the batch to disk. } @@ -46,6 +49,9 @@ func (b *BatchWithFlusher) estimateSizeAfterSetting(key []byte, value []byte) (i // the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold. // The addition entry is then added to the batch. func (b *BatchWithFlusher) Set(key, value []byte) error { + b.mtx.Lock() + defer b.mtx.Unlock() + batchSizeAfter, err := b.estimateSizeAfterSetting(key, value) if err != nil { return err @@ -67,6 +73,9 @@ func (b *BatchWithFlusher) Set(key, value []byte) error { // the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold. // The deletion entry is then added to the batch. func (b *BatchWithFlusher) Delete(key []byte) error { + b.mtx.Lock() + defer b.mtx.Unlock() + batchSizeAfter, err := b.estimateSizeAfterSetting(key, []byte{}) if err != nil { return err