diff --git a/web/api/sector/sector.go b/web/api/sector/sector.go index 16771715d..bac4bc82e 100644 --- a/web/api/sector/sector.go +++ b/web/api/sector/sector.go @@ -11,14 +11,19 @@ import ( "github.com/docker/go-units" "github.com/gorilla/mux" "github.com/hashicorp/go-multierror" + "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" "github.com/samber/lo" + log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/builtin" + miner2 "github.com/filecoin-project/go-state-types/builtin/v13/miner" "github.com/filecoin-project/go-state-types/builtin/v9/market" "github.com/filecoin-project/curio/deps" @@ -26,10 +31,10 @@ import ( "github.com/filecoin-project/curio/web/api/apihelper" "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/cli/spcli" ) const verifiedPowerGainMul = 9 @@ -73,23 +78,16 @@ func (c *cfg) terminateSectors(w http.ResponseWriter, r *http.Request) { toDel[m] = append(toDel[m], sec{Sector: abi.SectorNumber(s.Sector), Terminate: false}) } - del, err := c.shouldTerminate(r.Context(), toDel) + // We should context.Background to avoid cancellation due to page reload or other possible scenarios + ctx := context.Background() + err := c.terminate(ctx, toDel) apihelper.OrHTTPFail(w, err) - for m, sectorList := range del { - mi, err := c.Chain.StateMinerInfo(r.Context(), m.Addr, types.EmptyTSK) - apihelper.OrHTTPFail(w, err) - var term []int - for _, s := range sectorList { - if s.Terminate { - term = append(term, int(s.Sector)) - } - } - _, err = spcli.TerminateSectors(r.Context(), c.Chain, m.Addr, term, mi.Worker) - apihelper.OrHTTPFail(w, err) + // Remove sectors + for m, sectorList := range toDel { for _, s := range sectorList { id := abi.SectorID{Miner: m.ID, Number: s.Sector} - apihelper.OrHTTPFail(w, c.removeSector(r.Context(), id)) + apihelper.OrHTTPFail(w, c.removeSector(ctx, id)) } } } @@ -440,27 +438,13 @@ func (c *cfg) shouldTerminate(ctx context.Context, smap map[minerDetail][]sec) ( return nil, err } - list := bitfield.New() - - if err := mas.ForEachDeadline(func(dlIdx uint64, dl miner.Deadline) error { - return dl.ForEachPartition(func(partIdx uint64, part miner.Partition) error { - live, err := part.LiveSectors() - if err != nil { - return err - } - - list, err = bitfield.SubtractBitField(list, live) - if err != nil { - return err - } - return err - }) - }); err != nil { - return nil, err + liveSectors, err := miner.AllPartSectors(mas, miner.Partition.LiveSectors) + if err != nil { + return nil, fmt.Errorf("getting live sector sets for miner %s: %w", m, err) } for i := range sectors { - ok, err := list.IsSet(uint64(sectors[i].Sector)) + ok, err := liveSectors.IsSet(uint64(sectors[i].Sector)) if err != nil { return nil, err } @@ -503,3 +487,100 @@ func (c *cfg) removeSector(ctx context.Context, sector abi.SectorID) error { return err } + +const batchSize = 100 + +func (c *cfg) terminate(ctx context.Context, toDel map[minerDetail][]sec) error { + del, err := c.shouldTerminate(ctx, toDel) + if err != nil { + return err + } + + var msgs []cid.Cid + + for m, sectorNumbers := range del { + maddr := m.Addr + mi, err := c.Chain.StateMinerInfo(ctx, maddr, types.EmptyTSK) + if err != nil { + return err + } + + var terminationDeclarationParams []miner2.TerminationDeclaration + + // Get Deadline/Partition for all sectors. + for _, sector := range sectorNumbers { + sectorNum := sector.Sector + sectorbit := bitfield.New() + sectorbit.Set(uint64(sectorNum)) + + loca, err := c.Chain.StateSectorPartition(ctx, maddr, sectorNum, types.EmptyTSK) + if err != nil { + return fmt.Errorf("get state sector partition %s", err) + } + + para := miner2.TerminationDeclaration{ + Deadline: loca.Deadline, + Partition: loca.Partition, + Sectors: sectorbit, + } + + terminationDeclarationParams = append(terminationDeclarationParams, para) + } + + // Batch message for batchSize + var batches [][]miner2.TerminationDeclaration + for i := 0; i < len(terminationDeclarationParams); i += batchSize { + batch := terminationDeclarationParams[i:min(i+batchSize, len(terminationDeclarationParams))] + batches = append(batches, batch) + } + + // Send messages for all batches + for _, batch := range batches { + terminateSectorParams := &miner2.TerminateSectorsParams{ + Terminations: batch, + } + + sp, errA := actors.SerializeParams(terminateSectorParams) + if errA != nil { + return xerrors.Errorf("serializing params: %w", errA) + } + + smsg, err := c.Chain.MpoolPushMessage(ctx, &types.Message{ + From: mi.Worker, + To: maddr, + Method: builtin.MethodsMiner.TerminateSectors, + + Value: big.Zero(), + Params: sp, + }, nil) + if err != nil { + return xerrors.Errorf("mpool push message: %w", err) + } + + msgs = append(msgs, smsg.Cid()) + + log.Infof("sent termination message: %s", smsg.Cid()) + } + } + + // wait for msgs to get mined into a block for all minerID + eg := errgroup.Group{} + eg.SetLimit(10) + for _, msg := range msgs { + m := msg + eg.Go(func() error { + wait, err := c.Chain.StateWaitMsg(ctx, m, 2, 2000, true) + if err != nil { + log.Errorf("timeout waiting for message to land on chain %s", wait.Message) + return fmt.Errorf("timeout waiting for message to land on chain %s", wait.Message) + } + + if wait.Receipt.ExitCode.IsError() { + log.Errorf("failed to execute message %s: %d", wait.Message, wait.Receipt.ExitCode) + return fmt.Errorf("failed to execute message %s: %w", wait.Message, wait.Receipt.ExitCode) + } + return nil + }) + } + return eg.Wait() +}