Skip to content

Commit

Permalink
refactor pdcleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Mar 7, 2024
1 parent b30e28d commit d290160
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 163 deletions.
203 changes: 70 additions & 133 deletions lib/pdcleaner/pdcleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pdcleaner

import (
"fmt"
"strconv"
"sync"
"time"

Expand All @@ -14,12 +13,11 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/builtin/verifreg"
chaintypes "github.com/filecoin-project/lotus/chain/types"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"
Expand All @@ -31,6 +29,7 @@ var log = logging.Logger("pdcleaner")
type PieceDirectoryCleanup interface {
Start(ctx context.Context)
CleanOnce() error
getActiveUnprovenSectors(tskey chaintypes.TipSetKey) (bitfield.BitField, error)
}

type pdcleaner struct {
Expand All @@ -44,6 +43,7 @@ type pdcleaner struct {
startOnce sync.Once
lk sync.Mutex
cleanupInterval time.Duration
testSetup map[bool][]abi.SectorNumber
}

func NewPieceDirectoryCleaner(cfg *config.Boost) func(lc fx.Lifecycle, dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, legacyDeals legacy.LegacyDealManager, pd *piecedirectory.PieceDirectory, full v1api.FullNode) PieceDirectoryCleanup {
Expand Down Expand Up @@ -131,10 +131,6 @@ func (p *pdcleaner) CleanOnce() error {
return fmt.Errorf("getting chain head: %w", err)
}
tskey := head.Key()
deals, err := p.full.StateMarketDeals(p.ctx, tskey)
if err != nil {
return fmt.Errorf("getting market deals: %w", err)
}

boostCompleteDeals, err := p.dealsDB.ListCompleted(p.ctx)
if err != nil {
Expand All @@ -159,142 +155,50 @@ func (p *pdcleaner) CleanOnce() error {
return fmt.Errorf("getting complete direct deals: %w", err)
}

// Clean up completed/slashed Boost deals
finalSectors, err := p.getActiveUnprovenSectors(tskey)
if err != nil {
return err
}

// Clean up Boost deals where sector does not exist anymore
for _, d := range boostDeals {
// Confirm deal did not reach termination before Publishing. Otherwise, no need to clean up
if d.ChainDealID > abi.DealID(0) {
// If deal exists online
md, ok := deals[strconv.FormatInt(int64(d.ChainDealID), 10)]
if ok {
// If deal is slashed or end epoch has passed. No other reason for deal to reach termination
// Same is true for verified deals. We rely on EndEpoch/SlashEpoch for verified deals created by f05
toCheck := termOrSlash(md.Proposal.EndEpoch, md.State.SlashEpoch)
if toCheck < head.Height() {
err = p.pd.RemoveDealForPiece(p.ctx, d.ClientDealProposal.Proposal.PieceCID, d.DealUuid.String())
if err != nil {
// Don't return if cleaning up a deal results in error. Try them all.
log.Errorf("cleaning up boost deal %s for piece %s: %s", d.DealUuid.String(), d.ClientDealProposal.Proposal.PieceCID.String(), err.Error())
}
}
present, err := finalSectors.IsSet(uint64(d.SectorID))
if err != nil {
return fmt.Errorf("checking if bitfield is set: %w", err)
}
if !present {
err = p.pd.RemoveDealForPiece(p.ctx, d.ClientDealProposal.Proposal.PieceCID, d.DealUuid.String())
if err != nil {
// Don't return if cleaning up a deal results in error. Try them all.
log.Errorf("cleaning up boost deal %s for piece %s: %s", d.DealUuid.String(), d.ClientDealProposal.Proposal.PieceCID.String(), err.Error())
}
}
}

// Clean up completed/slashed legacy deals
// Clean up legacy deals where sector does not exist anymore
for _, d := range legacyDeals {
// Confirm deal did not reach termination before Publishing. Otherwise, no need to clean up
if d.DealID > abi.DealID(0) {
// If deal exists online
md, ok := deals[strconv.FormatInt(int64(d.DealID), 10)]
if ok {
// If deal is slashed or end epoch has passed. No other reason for deal to reach termination
toCheck := termOrSlash(md.Proposal.EndEpoch, md.State.SlashEpoch)
if toCheck < head.Height() {
err = p.pd.RemoveDealForPiece(p.ctx, d.ClientDealProposal.Proposal.PieceCID, d.ProposalCid.String())
if err != nil {
// Don't return if cleaning up a deal results in error. Try them all.
log.Errorf("cleaning up legacy deal %s for piece %s: %s", d.ProposalCid.String(), d.ClientDealProposal.Proposal.PieceCID.String(), err.Error())
}
}
present, err := finalSectors.IsSet(uint64(d.SectorNumber))
if err != nil {
return fmt.Errorf("checking if bitfield is set: %w", err)
}
if !present {
err = p.pd.RemoveDealForPiece(p.ctx, d.ClientDealProposal.Proposal.PieceCID, d.ProposalCid.String())
if err != nil {
// Don't return if cleaning up a deal results in error. Try them all.
log.Errorf("cleaning up legacy deal %s for piece %s: %s", d.ProposalCid.String(), d.ClientDealProposal.Proposal.PieceCID.String(), err.Error())
}
}
}

// Clean up direct deals if there are any otherwise skip this step
// Clean up Direct deals where sector does not exist anymore
// TODO: Refactor for Curio sealing redundancy
if len(completeDirectDeals) > 0 {
claims, err := p.full.StateGetClaims(p.ctx, p.miner, tskey)
if err != nil {
return fmt.Errorf("getting claims for the miner %s: %w", p.miner, err)
}

// Loading miner actor locally is preferred to avoid getting unnecessary data from full.StateMinerActiveSectors()
mActor, err := p.full.StateGetActor(p.ctx, p.miner, tskey)
if err != nil {
return fmt.Errorf("getting actor for the miner %s: %w", p.miner, err)
}
store := adt.WrapStore(p.ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(p.full)))
mas, err := miner.Load(store, mActor)
if err != nil {
return fmt.Errorf("loading miner actor state %s: %w", p.miner, err)
}
activeSectors, err := miner.AllPartSectors(mas, miner.Partition.ActiveSectors)
if err != nil {
return fmt.Errorf("getting active sector sets for miner %s: %w", p.miner, err)
}
unProvenSectors, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors)
if err != nil {
return fmt.Errorf("getting unproven sector sets for miner %s: %w", p.miner, err)
}
finalSectors, err := bitfield.MergeBitFields(activeSectors, unProvenSectors)
if err != nil {
return fmt.Errorf("merging bitfields to generate all deal sectors on miner %s: %w", p.miner, err)
}

// Load verifreg actor locally
verifregActor, err := p.full.StateGetActor(p.ctx, verifreg.Address, tskey)
if err != nil {
return fmt.Errorf("getting verified registry actor state: %w", err)
}
verifregState, err := verifreg.Load(store, verifregActor)
if err != nil {
return fmt.Errorf("loading verified registry actor state: %w", err)
}

for _, d := range completeDirectDeals {
cID := verifregtypes.ClaimId(d.AllocationID)
c, ok := claims[cID]
// If claim found
if ok {
// Claim Sector number and Deal Sector number should match(regardless of how DDO works)
// If they don't match and older sector is removed, then we can't use the metadata
// This check can be removed once Curio has resealing enabled, and it can provide
// new replacement sector details to Boost before deal reached "Complete" state.
if c.Sector != d.SectorID {
err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String())
if err != nil {
// Don't return if cleaning up a deal results in error. Try them all.
log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error())
}
continue
}
present, err := finalSectors.IsSet(uint64(c.Sector))
if err != nil {
return fmt.Errorf("checking if bitfield is set: %w", err)
}
// Each claim is created with ProveCommit message. So, a sector in claim cannot be unproven.
// it must be either Active(Proving, Faulty, Recovering) or terminated. If bitfield is not set
// then sector must have been terminated. This method will also account for future change in sector numbers
// of a claim. Even if the sector is changed then it must be Active as this change will require a
// ProveCommit message.
if !present {
err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String())
if err != nil {
// Don't return if cleaning up a deal results in error. Try them all.
log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error())
}
}
continue
}

// If no claim found
alloc, ok, err := verifregState.GetAllocation(d.Client, d.AllocationID)
present, err := finalSectors.IsSet(uint64(d.SectorID))
if err != nil {
return fmt.Errorf("getting allocation %d for client %s: %w", d.AllocationID, d.Client, err)
return fmt.Errorf("checking if bitfield is set: %w", err)
}
if !ok || alloc.Expiration < head.Height() {
// If allocation is expired, clean up the deal. If the allocation does not exist anymore.
// Either it was claimed and then claim was cleaned up after TermMax
// or allocation expired before it could be claimed and was cleaned up
// Deal should be cleaned up in either case
err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String())
if err != nil {
// Don't return if cleaning up a deal results in error. Try them all.
log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error())
}
continue
}

if alloc.Expiration < head.Height() {
if !present {
err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String())
if err != nil {
// Don't return if cleaning up a deal results in error. Try them all.
Expand Down Expand Up @@ -356,10 +260,43 @@ func (p *pdcleaner) CleanOnce() error {
return nil
}

func termOrSlash(term, slash abi.ChainEpoch) abi.ChainEpoch {
if term > slash && slash > 0 {
return slash
func (p *pdcleaner) getActiveUnprovenSectors(tskey chaintypes.TipSetKey) (bitfield.BitField, error) {
// Test output for func
if len(p.testSetup) > 0 {
testData, ok := p.testSetup[true]
if !ok {
return bitfield.BitField{}, fmt.Errorf("test data not true")
}
out := bitfield.New()
for i, s := range testData {
if i > 3 {
out.Set(uint64(s))
}
}
return out, nil
}

mActor, err := p.full.StateGetActor(p.ctx, p.miner, tskey)
if err != nil {
return bitfield.BitField{}, fmt.Errorf("getting actor for the miner %s: %w", p.miner, err)
}

return term
store := adt.WrapStore(p.ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(p.full)))
mas, err := miner.Load(store, mActor)
if err != nil {
return bitfield.BitField{}, fmt.Errorf("loading miner actor state %s: %w", p.miner, err)
}
activeSectors, err := miner.AllPartSectors(mas, miner.Partition.ActiveSectors)
if err != nil {
return bitfield.BitField{}, fmt.Errorf("getting active sector sets for miner %s: %w", p.miner, err)
}
unProvenSectors, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors)
if err != nil {
return bitfield.BitField{}, fmt.Errorf("getting unproven sector sets for miner %s: %w", p.miner, err)
}
finalSectors, err := bitfield.MergeBitFields(activeSectors, unProvenSectors)
if err != nil {
return bitfield.BitField{}, fmt.Errorf("merging bitfields to generate all sealed sectors on miner %s: %w", p.miner, err)
}
return finalSectors, nil
}
53 changes: 23 additions & 30 deletions lib/pdcleaner/pdcleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
"strconv"
"testing"

bdb "github.com/filecoin-project/boost/db"
Expand All @@ -17,7 +16,6 @@ import (
"github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
lotusmocks "github.com/filecoin-project/lotus/api/mocks"
test "github.com/filecoin-project/lotus/chain/events/state/mock"
chaintypes "github.com/filecoin-project/lotus/chain/types"
Expand Down Expand Up @@ -129,6 +127,13 @@ func TestPieceDirectoryCleaner(t *testing.T) {
// Start a new PieceDirectoryCleaner
pdc := newPDC(dealsDB, directDB, legacyProv, pm, fn, 1)
pdc.ctx = ctx
var sectorNums []abi.SectorNumber
for _, d := range dealMap {
sectorNums = append(sectorNums, d.sector)
}
testData := make(map[bool][]abi.SectorNumber)
testData[true] = sectorNums
pdc.testSetup = testData

chainHead, err := test.MockTipset(provAddr, 1)
require.NoError(t, err)
Expand All @@ -137,37 +142,13 @@ func TestPieceDirectoryCleaner(t *testing.T) {
}

// Add deals to SQL DB
cDealMap := make(map[string]*api.MarketDeal)
for i, deal := range deals {
for _, deal := range deals {
data, ok := dealMap[deal.DealUuid]
require.True(t, ok)
deal.SectorID = data.sector
deal.ClientDealProposal.Proposal.PieceCID = data.piece
deal.ClientDealProposal.Proposal.EndEpoch = 3 // because chain head is always 5
deal.Checkpoint = dealcheckpoints.Complete
p, err := deal.SignedProposalCid()
require.NoError(t, err)
t.Logf("signed p %s", p.String())
// Test a slashed deal
if i == 0 {
deal.Checkpoint = dealcheckpoints.Accepted
deal.ClientDealProposal.Proposal.EndEpoch = 6
cDealMap[strconv.FormatInt(int64(deal.ChainDealID), 10)] = &api.MarketDeal{
Proposal: deal.ClientDealProposal.Proposal,
State: api.MarketDealState{
SlashEpoch: 3, // Slash this deal
},
}
err = dealsDB.Insert(ctx, &deal)
require.NoError(t, err)
continue
}
cDealMap[strconv.FormatInt(int64(deal.ChainDealID), 10)] = &api.MarketDeal{
Proposal: deal.ClientDealProposal.Proposal,
State: api.MarketDealState{
SlashEpoch: -1,
},
}
err = dealsDB.Insert(ctx, &deal)
require.NoError(t, err)
}
Expand All @@ -177,16 +158,28 @@ func TestPieceDirectoryCleaner(t *testing.T) {
require.NoError(t, err)
require.Len(t, pl, 5)

ac := chaintypes.ActorV5{
Address: nil,
Code: cid.MustParse("bafk2bzacedo75pabe4i2l3hvhtsjmijrcytd2y76xwe573uku25fi7sugqld6"),
Head: cid.MustParse("bafy2bzaceddnxeoeyarr4iogahxtgckd4y5jq4xpx66sk4ulb2a6a45wquklo"),
Nonce: 0,
Balance: abi.NewTokenAmount(int64(5347369358335063414)),
}

objstr := "j9gqWCcAAXGg5AIgnNa9WH0Bqjm/t4d/rNXuAoAgr3dO/oRkISmLDkKKRmtASgALDpa14/gUSyDYKlgnAAFxoOQCIInM1H8WA5BAqlYy0Cyecdq/ul0/Fs6OUAon1c2+HglDQEsAAQ2S407ePdPgDtgqWCcAAXGg5AIgGP5qzGGjo2sMNzxKOo6mS4Er8sqbUoBQkJx41AhVigzYKlgnAAFxoOQCIAGktx8yjDs1A+KosYKeC2psSl0Msu5THqIVFoqjp1gt2CpYJwABcaDkAiB7LaMaRW6fA4I79wGR0qYpYtKzHAihqzrLYlCbFKTuHNgqWCcAAXGg5AIgs2f3ZwRIxoC7iKmoMzoBAYk0Iii3GndePNxcx4LgOKMaADixtxLYKlgnAAFxoOQCIFLQ9lLyEu3HCE3IDAg+djoEpRN5eLuB2AI2PWnNjAIMQPU="
obj := []byte(objstr)

fn.EXPECT().ChainHead(gomock.Any()).DoAndReturn(chainHeadFn).AnyTimes()
fn.EXPECT().StateMarketDeals(gomock.Any(), gomock.Any()).Return(cDealMap, nil).AnyTimes()
fn.EXPECT().StateGetActor(gomock.Any(), gomock.Any(), gomock.Any()).Return(&ac, nil).AnyTimes()
fn.EXPECT().ChainReadObj(gomock.Any(), gomock.Any()).Return(obj, nil).AnyTimes()
legacyProv.EXPECT().ListDeals().Return(nil, nil).AnyTimes()
legacyProv.EXPECT().ByPieceCid(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

err = pdc.CleanOnce()
require.NoError(t, err)

// Confirm we have 0 pieces in LID after clean up
// Confirm we have 1 piece in LID after clean up
pl, err = pm.ListPieces(ctx)
require.NoError(t, err)
require.Len(t, pl, 0)
require.Len(t, pl, 1)
}

0 comments on commit d290160

Please sign in to comment.