diff --git a/api/api.go b/api/api.go index 5bdda5c09..327fb1866 100644 --- a/api/api.go +++ b/api/api.go @@ -46,7 +46,8 @@ type Boost interface { BlockstoreGetSize(ctx context.Context, c cid.Cid) (int, error) //perm:read // MethodGroup: PieceDirectory - PdBuildIndexForPieceCid(ctx context.Context, piececid cid.Cid) error //perm:admin + PdBuildIndexForPieceCid(ctx context.Context, piececid cid.Cid) error //perm:admin + PdRemoveDealForPiece(ctx context.Context, piececid cid.Cid, dealID string) error //perm:admin // MethodGroup: Misc OnlineBackup(context.Context, string) error //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 019692161..372c6ee3b 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -67,6 +67,8 @@ type BoostStruct struct { OnlineBackup func(p0 context.Context, p1 string) error `perm:"admin"` PdBuildIndexForPieceCid func(p0 context.Context, p1 cid.Cid) error `perm:"admin"` + + PdRemoveDealForPiece func(p0 context.Context, p1 cid.Cid, p2 string) error `perm:"admin"` } } @@ -393,6 +395,17 @@ func (s *BoostStub) PdBuildIndexForPieceCid(p0 context.Context, p1 cid.Cid) erro return ErrNotSupported } +func (s *BoostStruct) PdRemoveDealForPiece(p0 context.Context, p1 cid.Cid, p2 string) error { + if s.Internal.PdRemoveDealForPiece == nil { + return ErrNotSupported + } + return s.Internal.PdRemoveDealForPiece(p0, p1, p2) +} + +func (s *BoostStub) PdRemoveDealForPiece(p0 context.Context, p1 cid.Cid, p2 string) error { + return ErrNotSupported +} + func (s *ChainIOStruct) ChainHasObj(p0 context.Context, p1 cid.Cid) (bool, error) { if s.Internal.ChainHasObj == nil { return false, ErrNotSupported diff --git a/build/openrpc/boost.json.gz b/build/openrpc/boost.json.gz index c44c6cf82..f354288ca 100644 Binary files a/build/openrpc/boost.json.gz and b/build/openrpc/boost.json.gz differ diff --git a/cmd/boostd/piecedir.go b/cmd/boostd/piecedir.go index 08854b6aa..f508af37d 100644 --- a/cmd/boostd/piecedir.go +++ b/cmd/boostd/piecedir.go @@ -7,6 +7,7 
@@ import ( bcli "github.com/filecoin-project/boost/cli" lcli "github.com/filecoin-project/lotus/cli" + "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/urfave/cli/v2" ) @@ -17,6 +18,7 @@ var pieceDirCmd = &cli.Command{ Subcommands: []*cli.Command{ pdIndexGenerate, recoverCmd, + removeDealCmd, }, } @@ -56,3 +58,54 @@ var pdIndexGenerate = &cli.Command{ return nil }, } + +var removeDealCmd = &cli.Command{ + Name: "remove-deal", + Usage: "Removes a deal from piece metadata in LID. If the specified deal is the only one in piece metadata, index and metadata are also removed", + ArgsUsage: " ", + Action: func(cctx *cli.Context) error { + + ctx := lcli.ReqContext(cctx) + + if cctx.Args().Len() > 2 { + return fmt.Errorf("must specify piece CID and deal UUID/Proposal CID") + } + + napi, closer, err := bcli.GetBoostAPI(cctx) + if err != nil { + return err + } + defer closer() + + // parse piececid + piececid, err := cid.Decode(cctx.Args().Get(0)) + if err != nil { + return fmt.Errorf("parsing piece CID: %w", err) + } + + id := cctx.Args().Get(1) + + // Parse to avoid sending garbage data to API + dealUuid, err := uuid.Parse(id) + if err != nil { + propCid, err := cid.Decode(id) + if err != nil { + return fmt.Errorf("could not parse '%s' as deal uuid or proposal cid", id) + } + err = napi.PdRemoveDealForPiece(ctx, piececid, propCid.String()) + if err != nil { + return err + } + fmt.Printf("Deal %s removed for piece %s\n", propCid, piececid) + return nil + } + + err = napi.PdRemoveDealForPiece(ctx, piececid, dealUuid.String()) + if err != nil { + return err + } + fmt.Printf("Deal %s removed for piece %s\n", dealUuid, piececid) + return nil + + }, +} diff --git a/documentation/en/api-v1-methods.md b/documentation/en/api-v1-methods.md index 033e7d633..065025d50 100644 --- a/documentation/en/api-v1-methods.md +++ b/documentation/en/api-v1-methods.md @@ -55,6 +55,7 @@ * [OnlineBackup](#onlinebackup) * [Pd](#pd) * [PdBuildIndexForPieceCid](#pdbuildindexforpiececid) + 
* [PdRemoveDealForPiece](#pdremovedealforpiece) ## @@ -1244,3 +1245,20 @@ Inputs: Response: `{}` +### PdRemoveDealForPiece + + +Perms: admin + +Inputs: +```json +[ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "string value" +] +``` + +Response: `{}` + diff --git a/lib/pdcleaner/pdcleaner.go b/lib/pdcleaner/pdcleaner.go new file mode 100644 index 000000000..4efca4d56 --- /dev/null +++ b/lib/pdcleaner/pdcleaner.go @@ -0,0 +1,365 @@ +package pdcleaner + +import ( + "fmt" + "strconv" + "sync" + "time" + + "github.com/filecoin-project/boost/db" + "github.com/filecoin-project/boost/lib/legacy" + "github.com/filecoin-project/boost/node/config" + "github.com/filecoin-project/boost/piecedirectory" + "github.com/filecoin-project/boost/storagemarket/types" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" + "github.com/filecoin-project/go-state-types/abi" + verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" + "github.com/filecoin-project/lotus/api/v1api" + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/actors/adt" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/actors/builtin/verifreg" + cbor "github.com/ipfs/go-ipld-cbor" + logging "github.com/ipfs/go-log/v2" + "go.uber.org/fx" + "golang.org/x/net/context" +) + +var log = logging.Logger("pdcleaner") + +type PieceDirectoryCleanup interface { + Start(ctx context.Context) + CleanOnce() error +} + +type pdcleaner struct { + ctx context.Context + miner address.Address + dealsDB *db.DealsDB + directDealsDB *db.DirectDealsDB + legacyDeals legacy.LegacyDealManager + pd *piecedirectory.PieceDirectory + full v1api.FullNode + startOnce sync.Once + lk sync.Mutex + cleanupInterval time.Duration +} + +func NewPieceDirectoryCleaner(cfg *config.Boost) func(lc fx.Lifecycle, dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, legacyDeals 
legacy.LegacyDealManager, pd *piecedirectory.PieceDirectory, full v1api.FullNode) PieceDirectoryCleanup { + return func(lc fx.Lifecycle, dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, legacyDeals legacy.LegacyDealManager, pd *piecedirectory.PieceDirectory, full v1api.FullNode) PieceDirectoryCleanup { + + // Don't start cleanup loop if duration is '0s' + if time.Duration(cfg.LocalIndexDirectory.LidCleanupInterval).Seconds() == 0 { + return nil + } + + pdc := newPDC(dealsDB, directDealsDB, legacyDeals, pd, full, time.Duration(cfg.LocalIndexDirectory.LidCleanupInterval)) + + cctx, cancel := context.WithCancel(context.Background()) + + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + mid, err := address.NewFromString(cfg.Wallets.Miner) + if err != nil { + return fmt.Errorf("failed to parse the miner ID %s: %w", cfg.Wallets.Miner, err) + } + pdc.miner = mid + go pdc.Start(cctx) + return nil + }, + OnStop: func(ctx context.Context) error { + cancel() + return nil + }, + }) + + return pdc + + } +} + +func newPDC(dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, legacyDeals legacy.LegacyDealManager, pd *piecedirectory.PieceDirectory, full v1api.FullNode, cleanupInterval time.Duration) *pdcleaner { + return &pdcleaner{ + dealsDB: dealsDB, + directDealsDB: directDealsDB, + legacyDeals: legacyDeals, + pd: pd, + full: full, + cleanupInterval: cleanupInterval, + } +} + +func (p *pdcleaner) Start(ctx context.Context) { + p.startOnce.Do(func() { + p.ctx = ctx + go p.clean() + }) + +} + +func (p *pdcleaner) clean() { + // Create a ticker that fires at the configured cleanup interval + ticker := time.NewTicker(p.cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + log.Infof("Starting LID clean up") + err := p.CleanOnce() + if err != nil { + log.Errorf("Failed to cleanup LID: %s", err) + continue + } + log.Debugf("Finished cleaning up LID") + case <-p.ctx.Done(): + return + } + } +} + +// CleanOnce generates a list of all Expired-Boost, Legacy and Direct 
deals. It then attempts to clean up these deals. +// It also generates a list of all pieces in LID and tries to find any pieceMetadata with no deals in Boost, Direct or Legacy DB. +// If such a deal is found, it is cleaned up as well +func (p *pdcleaner) CleanOnce() error { + p.lk.Lock() + defer p.lk.Unlock() + + head, err := p.full.ChainHead(p.ctx) + if err != nil { + return fmt.Errorf("getting chain head: %w", err) + } + tskey := head.Key() + deals, err := p.full.StateMarketDeals(p.ctx, tskey) + if err != nil { + return fmt.Errorf("getting market deals: %w", err) + } + + boostCompleteDeals, err := p.dealsDB.ListCompleted(p.ctx) + if err != nil { + return fmt.Errorf("getting complete boost deals: %w", err) + } + boostActiveDeals, err := p.dealsDB.ListActive(p.ctx) + if err != nil { + return fmt.Errorf("getting active boost deals: %w", err) + } + + boostDeals := make([]*types.ProviderDealState, 0, len(boostActiveDeals)+len(boostCompleteDeals)) + + boostDeals = append(boostDeals, boostCompleteDeals...) + boostDeals = append(boostDeals, boostActiveDeals...) + + legacyDeals, err := p.legacyDeals.ListDeals() + if err != nil { + return fmt.Errorf("getting legacy deals: %w", err) + } + completeDirectDeals, err := p.directDealsDB.ListCompleted(p.ctx) + if err != nil { + return fmt.Errorf("getting complete direct deals: %w", err) + } + + // Clean up completed/slashed Boost deals + for _, d := range boostDeals { + // Confirm deal did not reach termination before Publishing. Otherwise, no need to clean up + if d.ChainDealID > abi.DealID(0) { + // If deal exists online + md, ok := deals[strconv.FormatInt(int64(d.ChainDealID), 10)] + if ok { + // If deal is slashed or end epoch has passed. No other reason for deal to reach termination + // Same is true for verified deals. 
We rely on EndEpoch/SlashEpoch for verified deals created by f05 + toCheck := termOrSlash(md.Proposal.EndEpoch, md.State.SlashEpoch) + if toCheck < head.Height() { + err = p.pd.RemoveDealForPiece(p.ctx, d.ClientDealProposal.Proposal.PieceCID, d.DealUuid.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up boost deal %s for piece %s: %s", d.DealUuid.String(), d.ClientDealProposal.Proposal.PieceCID.String(), err.Error()) + } + } + } + } + } + + // Clean up completed/slashed legacy deals + for _, d := range legacyDeals { + // Confirm deal did not reach termination before Publishing. Otherwise, no need to clean up + if d.DealID > abi.DealID(0) { + // If deal exists online + md, ok := deals[strconv.FormatInt(int64(d.DealID), 10)] + if ok { + // If deal is slashed or end epoch has passed. No other reason for deal to reach termination + toCheck := termOrSlash(md.Proposal.EndEpoch, md.State.SlashEpoch) + if toCheck < head.Height() { + err = p.pd.RemoveDealForPiece(p.ctx, d.ClientDealProposal.Proposal.PieceCID, d.ProposalCid.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. 
+ log.Errorf("cleaning up legacy deal %s for piece %s: %s", d.ProposalCid.String(), d.ClientDealProposal.Proposal.PieceCID.String(), err.Error()) + } + } + } + } + } + + // Clean up direct deals if there are any otherwise skip this step + if len(completeDirectDeals) > 0 { + claims, err := p.full.StateGetClaims(p.ctx, p.miner, tskey) + if err != nil { + return fmt.Errorf("getting claims for the miner %s: %w", p.miner, err) + } + + // Loading miner actor locally is preferred to avoid getting unnecessary data from full.StateMinerActiveSectors() + mActor, err := p.full.StateGetActor(p.ctx, p.miner, tskey) + if err != nil { + return fmt.Errorf("getting actor for the miner %s: %w", p.miner, err) + } + store := adt.WrapStore(p.ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(p.full))) + mas, err := miner.Load(store, mActor) + if err != nil { + return fmt.Errorf("loading miner actor state %s: %w", p.miner, err) + } + activeSectors, err := miner.AllPartSectors(mas, miner.Partition.ActiveSectors) + if err != nil { + return fmt.Errorf("getting active sector sets for miner %s: %w", p.miner, err) + } + unProvenSectors, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors) + if err != nil { + return fmt.Errorf("getting unproven sector sets for miner %s: %w", p.miner, err) + } + finalSectors, err := bitfield.MergeBitFields(activeSectors, unProvenSectors) + if err != nil { + return fmt.Errorf("merging bitfields to generate all deal sectors on miner %s: %w", p.miner, err) + } + + // Load verifreg actor locally + verifregActor, err := p.full.StateGetActor(p.ctx, verifreg.Address, tskey) + if err != nil { + return fmt.Errorf("getting verified registry actor state: %w", err) + } + verifregState, err := verifreg.Load(store, verifregActor) + if err != nil { + return fmt.Errorf("loading verified registry actor state: %w", err) + } + + for _, d := range completeDirectDeals { + cID := verifregtypes.ClaimId(d.AllocationID) + c, ok := claims[cID] + // If claim found + if ok { + 
// Claim Sector number and Deal Sector number should match(regardless of how DDO works) + // If they don't match and older sector is removed, then we can't use the metadata + // This check can be removed once Curio has resealing enabled, and it can provide + // new replacement sector details to Boost before deal reached "Complete" state. + if c.Sector != d.SectorID { + err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) + } + continue + } + present, err := finalSectors.IsSet(uint64(c.Sector)) + if err != nil { + return fmt.Errorf("checking if bitfield is set: %w", err) + } + // Each claim is created with ProveCommit message. So, a sector in claim cannot be unproven. + // it must be either Active(Proving, Faulty, Recovering) or terminated. If bitfield is not set + // then sector must have been terminated. This method will also account for future change in sector numbers + // of a claim. Even if the sector is changed then it must be Active as this change will require a + // ProveCommit message. + if !present { + err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) + } + } + continue + } + + // If no claim found + alloc, ok, err := verifregState.GetAllocation(d.Client, d.AllocationID) + if err != nil { + return fmt.Errorf("getting allocation %d for client %s: %w", d.AllocationID, d.Client, err) + } + if !ok || alloc.Expiration < head.Height() { + // If allocation is expired, clean up the deal. If the allocation does not exist anymore. 
+ // Either it was claimed and then claim was cleaned up after TermMax + // or allocation expired before it could be claimed and was cleaned up + // Deal should be cleaned up in either case + err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) + } + continue + } + + if alloc.Expiration < head.Height() { + err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) + } + continue + } + } + } + + // Clean up dangling LID deals with no Boost, Direct or Legacy deals attached to them + plist, err := p.pd.ListPieces(p.ctx) + if err != nil { + return fmt.Errorf("getting piece list from LID: %w", err) + } + + for _, piece := range plist { + pdeals, err := p.pd.GetPieceDeals(p.ctx, piece) + if err != nil { + return fmt.Errorf("getting piece deals from LID: %w", err) + } + for _, deal := range pdeals { + // Remove only if the miner ID matches to avoid removing for other miners in case of shared LID + if deal.MinerAddr == p.miner { + + bd, err := p.dealsDB.ByPieceCID(p.ctx, piece) + if err != nil { + return err + } + if len(bd) > 0 { + continue + } + + ld, err := p.legacyDeals.ByPieceCid(p.ctx, piece) + if err != nil { + return err + } + if len(ld) > 0 { + continue + } + + dd, err := p.directDealsDB.ByPieceCID(p.ctx, piece) + if err != nil { + return err + } + if len(dd) > 0 { + continue + } + + err = p.pd.RemoveDealForPiece(p.ctx, piece, deal.DealUuid) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. 
+ log.Errorf("cleaning up dangling deal %s for piece %s: %s", deal.DealUuid, piece, err.Error()) + } + } + } + } + + return nil +} + +func termOrSlash(term, slash abi.ChainEpoch) abi.ChainEpoch { + if term > slash && slash > 0 { + return slash + } + + return term +} diff --git a/lib/pdcleaner/pdcleaner_test.go b/lib/pdcleaner/pdcleaner_test.go new file mode 100644 index 000000000..bad5d52df --- /dev/null +++ b/lib/pdcleaner/pdcleaner_test.go @@ -0,0 +1,192 @@ +package pdcleaner + +import ( + "context" + "fmt" + "os" + "strconv" + "testing" + + bdb "github.com/filecoin-project/boost/db" + "github.com/filecoin-project/boost/db/migrations" + "github.com/filecoin-project/boost/extern/boostd-data/client" + "github.com/filecoin-project/boost/extern/boostd-data/model" + "github.com/filecoin-project/boost/extern/boostd-data/svc" + mocks_legacy "github.com/filecoin-project/boost/lib/legacy/mocks" + "github.com/filecoin-project/boost/piecedirectory" + "github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" + lotusmocks "github.com/filecoin-project/lotus/api/mocks" + test "github.com/filecoin-project/lotus/chain/events/state/mock" + chaintypes "github.com/filecoin-project/lotus/chain/types" + "github.com/golang/mock/gomock" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/ipld/go-car/v2" + "github.com/stretchr/testify/require" +) + +func TestPieceDirectoryCleaner(t *testing.T) { + ctx := context.Background() + + sqldb := bdb.CreateTestTmpDB(t) + require.NoError(t, bdb.CreateAllBoostTables(ctx, sqldb, sqldb)) + require.NoError(t, migrations.Migrate(sqldb)) + + dealsDB := bdb.NewDealsDB(sqldb) + directDB := bdb.NewDirectDealsDB(sqldb) + + bdsvc, err := svc.NewLevelDB("") + require.NoError(t, err) + ln, err := bdsvc.Start(ctx, "localhost:0") + require.NoError(t, err) + + cl := client.NewStore() + err = 
cl.Dial(ctx, fmt.Sprintf("ws://%s", ln)) + require.NoError(t, err) + defer cl.Close(ctx) + + pieceCount := 5 + readers := make(map[abi.SectorNumber]car.SectionReader) + for i := 0; i < pieceCount; i++ { + // Create a random CAR file + _, carFilePath := piecedirectory.CreateCarFile(t, i+1) + carFile, err := os.Open(carFilePath) + require.NoError(t, err) + defer carFile.Close() + + carReader, err := car.OpenReader(carFilePath) + require.NoError(t, err) + defer carReader.Close() + carv1Reader, err := carReader.DataReader() + require.NoError(t, err) + + readers[abi.SectorNumber(i+1)] = carv1Reader + } + + // Any calls to get a reader over data should return a reader over the random CAR file + pr := piecedirectory.CreateMockPieceReaders(t, readers) + + pm := piecedirectory.NewPieceDirectory(cl, pr, 1) + pm.Start(ctx) + + type dealData struct { + sector abi.SectorNumber + chainDealID abi.DealID + piece cid.Cid + used bool + } + + deals, err := bdb.GenerateDeals() + require.NoError(t, err) + + // Create and update a map to keep track of chainDealID and UUID bindings + dealMap := make(map[uuid.UUID]*dealData) + for _, deal := range deals { + dealMap[deal.DealUuid] = &dealData{chainDealID: deal.ChainDealID, used: false} + } + + // Add deals to LID and note down details to update SQL DB + for sectorNumber, reader := range readers { + pieceCid := piecedirectory.CalculateCommp(t, reader).PieceCID + + var uid uuid.UUID + var cdid abi.DealID + + for id, data := range dealMap { + // If this value from deals list has not be used + if !data.used { + uid = id // Use the UUID from deals list + cdid = data.chainDealID + data.used = true + data.sector = sectorNumber // Use the sector number from deals list + data.piece = pieceCid + break + } + } + + // Add deal info for each piece + di := model.DealInfo{ + DealUuid: uid.String(), + ChainDealID: cdid, + SectorID: sectorNumber, + PieceOffset: 0, + PieceLength: 0, + } + err := pm.AddDealForPiece(ctx, pieceCid, di) + require.NoError(t, 
err) + } + + // Setup Full node, legacy manager + ctrl := gomock.NewController(t) + fn := lotusmocks.NewMockFullNode(ctrl) + legacyProv := mocks_legacy.NewMockLegacyDealManager(ctrl) + provAddr, err := address.NewIDAddress(1523) + require.NoError(t, err) + + // Start a new PieceDirectoryCleaner + pdc := newPDC(dealsDB, directDB, legacyProv, pm, fn, 1) + pdc.ctx = ctx + + chainHead, err := test.MockTipset(provAddr, 1) + require.NoError(t, err) + chainHeadFn := func(ctx context.Context) (*chaintypes.TipSet, error) { + return chainHead, nil + } + + // Add deals to SQL DB + cDealMap := make(map[string]*api.MarketDeal) + for i, deal := range deals { + data, ok := dealMap[deal.DealUuid] + require.True(t, ok) + deal.SectorID = data.sector + deal.ClientDealProposal.Proposal.PieceCID = data.piece + deal.ClientDealProposal.Proposal.EndEpoch = 3 // because chain head is always 5 + deal.Checkpoint = dealcheckpoints.Complete + p, err := deal.SignedProposalCid() + require.NoError(t, err) + t.Logf("signed p %s", p.String()) + // Test a slashed deal + if i == 0 { + deal.Checkpoint = dealcheckpoints.Accepted + deal.ClientDealProposal.Proposal.EndEpoch = 6 + cDealMap[strconv.FormatInt(int64(deal.ChainDealID), 10)] = &api.MarketDeal{ + Proposal: deal.ClientDealProposal.Proposal, + State: api.MarketDealState{ + SlashEpoch: 3, // Slash this deal + }, + } + err = dealsDB.Insert(ctx, &deal) + require.NoError(t, err) + continue + } + cDealMap[strconv.FormatInt(int64(deal.ChainDealID), 10)] = &api.MarketDeal{ + Proposal: deal.ClientDealProposal.Proposal, + State: api.MarketDealState{ + SlashEpoch: -1, + }, + } + err = dealsDB.Insert(ctx, &deal) + require.NoError(t, err) + } + + // Confirm we have 5 pieces in LID + pl, err := pm.ListPieces(ctx) + require.NoError(t, err) + require.Len(t, pl, 5) + + fn.EXPECT().ChainHead(gomock.Any()).DoAndReturn(chainHeadFn).AnyTimes() + fn.EXPECT().StateMarketDeals(gomock.Any(), gomock.Any()).Return(cDealMap, nil).AnyTimes() + 
legacyProv.EXPECT().ListDeals().Return(nil, nil).AnyTimes() + legacyProv.EXPECT().ByPieceCid(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + + err = pdc.CleanOnce() + require.NoError(t, err) + + // Confirm we have 0 pieces in LID after clean up + pl, err = pm.ListPieces(ctx) + require.NoError(t, err) + require.Len(t, pl, 0) +} diff --git a/node/builder.go b/node/builder.go index 4ae61cde6..152511d1d 100644 --- a/node/builder.go +++ b/node/builder.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/boost/indexprovider" "github.com/filecoin-project/boost/lib/legacy" "github.com/filecoin-project/boost/lib/mpoolmonitor" + "github.com/filecoin-project/boost/lib/pdcleaner" "github.com/filecoin-project/boost/markets/idxprov" "github.com/filecoin-project/boost/markets/storageadapter" "github.com/filecoin-project/boost/node/config" @@ -555,6 +556,7 @@ func ConfigBoost(cfg *config.Boost) Option { Override(new(sealer.StorageAuth), lotus_modules.StorageAuthWithURL(cfg.SectorIndexApiInfo)), Override(new(*backupmgr.BackupMgr), modules.NewOnlineBackupMgr(cfg)), + Override(new(pdcleaner.PieceDirectoryCleanup), pdcleaner.NewPieceDirectoryCleaner(cfg)), // Dynamic Boost configs Override(new(dtypes.ConsiderOnlineStorageDealsConfigFunc), modules.NewConsiderOnlineStorageDealsConfigFunc), diff --git a/node/config/def.go b/node/config/def.go index 817e7411f..7348db15c 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -87,6 +87,7 @@ func DefaultBoost() *Boost { ServiceApiInfo: "", ServiceRPCTimeout: Duration(15 * time.Minute), EnablePieceDoctor: true, + LidCleanupInterval: Duration(6 * time.Hour), }, ContractDeals: ContractDealsConfig{ diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index b846e018a..73861abee 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -662,6 +662,14 @@ Set this value to "" if the local index directory data service is embedded.`, Comment: `PieceDoctor runs a continuous background process to check each piece 
in LID for retrievability`, }, + { + Name: "LidCleanupInterval", + Type: "Duration", + + Comment: `Interval at which LID clean up job should rerun. The cleanup entails removing indices and metadata +for the expired/slashed deals. Disabled if set to '0s'. Please DO NOT set a value lower than 6 hours +as this task consumes considerable resources and time`, + }, }, "LocalIndexDirectoryLeveldbConfig": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index d6717ce39..872f4f90f 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -289,6 +289,10 @@ type LocalIndexDirectoryConfig struct { ServiceRPCTimeout Duration // PieceDoctor runs a continuous background process to check each piece in LID for retrievability EnablePieceDoctor bool + // Interval at which LID clean up job should rerun. The cleanup entails removing indices and metadata + // for the expired/slashed deals. Disabled if set to '0s'. Please DO NOT set a value lower than 6 hours + // as this task consumes considerable resources and time + LidCleanupInterval Duration } type LocalIndexDirectoryLeveldbConfig struct { diff --git a/node/impl/boost.go b/node/impl/boost.go index 0e23c3215..305b833de 100644 --- a/node/impl/boost.go +++ b/node/impl/boost.go @@ -213,3 +213,11 @@ func (sm *BoostAPI) PdBuildIndexForPieceCid(ctx context.Context, piececid cid.Ci func (sm *BoostAPI) OnlineBackup(ctx context.Context, dstDir string) error { return sm.Bkp.Backup(ctx, dstDir) } + +func (sm *BoostAPI) PdRemoveDealForPiece(ctx context.Context, piececid cid.Cid, dealID string) error { + ctx, span := tracing.Tracer.Start(ctx, "Boost.PdRemoveDealForPiece") + span.SetAttributes(attribute.String("piececid", piececid.String())) + defer span.End() + + return sm.Pd.RemoveDealForPiece(ctx, piececid, dealID) +} diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index ed7e758f5..947f252a4 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -156,6 
+156,12 @@ func (ps *PieceDirectory) PiecesCount(ctx context.Context, maddr address.Address return ps.store.PiecesCount(ctx, maddr) } +func (ps *PieceDirectory) ListPieces(ctx context.Context) ([]cid.Cid, error) { + defer func(start time.Time) { log.Debugw("piece directory ; PiecesList span", "took", time.Since(start)) }(time.Now()) + + return ps.store.ListPieces(ctx) +} + func (ps *PieceDirectory) ScanProgress(ctx context.Context, maddr address.Address) (*bdtypes.ScanProgress, error) { defer func(start time.Time) { log.Debugw("piece directory ; ScanProgress span", "took", time.Since(start)) }(time.Now())