diff --git a/harmony/harmonydb/sql/20240611-snap-pipeline.sql b/harmony/harmonydb/sql/20240611-snap-pipeline.sql index bfcd3e994..8da4bb80c 100644 --- a/harmony/harmonydb/sql/20240611-snap-pipeline.sql +++ b/harmony/harmonydb/sql/20240611-snap-pipeline.sql @@ -39,6 +39,14 @@ CREATE TABLE sectors_snap_pipeline ( task_id_move_storage BIGINT, after_move_storage BOOLEAN NOT NULL DEFAULT FALSE, + -- fail + -- added in 20240809-snap-failures.sql + -- Failure handling + -- failed bool not null default false, + -- failed_at timestamp with timezone, + -- failed_reason varchar(20) not null default '', + -- failed_reason_msg text not null default '', + FOREIGN KEY (sp_id, sector_number) REFERENCES sectors_meta (sp_id, sector_num), PRIMARY KEY (sp_id, sector_number) ); diff --git a/harmony/harmonydb/sql/20240809-snap-failures.sql b/harmony/harmonydb/sql/20240809-snap-failures.sql new file mode 100644 index 000000000..352786be3 --- /dev/null +++ b/harmony/harmonydb/sql/20240809-snap-failures.sql @@ -0,0 +1,11 @@ +ALTER TABLE sectors_snap_pipeline + ADD COLUMN failed BOOLEAN NOT NULL DEFAULT FALSE; + +ALTER TABLE sectors_snap_pipeline + ADD COLUMN failed_at TIMESTAMP WITH TIME ZONE; + +ALTER TABLE sectors_snap_pipeline + ADD COLUMN failed_reason VARCHAR(20) NOT NULL DEFAULT ''; + +ALTER TABLE sectors_snap_pipeline + ADD COLUMN failed_reason_msg TEXT NOT NULL DEFAULT ''; diff --git a/market/deal_ingest_seal.go b/market/deal_ingest_seal.go index 2c6fd393a..039965cee 100644 --- a/market/deal_ingest_seal.go +++ b/market/deal_ingest_seal.go @@ -45,6 +45,7 @@ type PieceIngesterApi interface { StateGetAllocationForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*verifregtypes.Allocation, error) StateGetAllocationIdForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (verifregtypes.AllocationId, error) StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) + StateSectorGetInfo(ctx context.Context, maddr 
address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) } type openSector struct { diff --git a/market/deal_ingest_snap.go b/market/deal_ingest_snap.go index 6a4a6ef0d..6d3b4b0c6 100644 --- a/market/deal_ingest_snap.go +++ b/market/deal_ingest_snap.go @@ -236,6 +236,11 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add piece.PublishCid = nil } + head, err := p.api.ChainHead(ctx) + if err != nil { + return api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err) + } + var maxExpiration int64 vd.isVerified = piece.PieceActivationManifest.VerifiedAllocationKey != nil if vd.isVerified { @@ -253,7 +258,7 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add vd.tmin = alloc.TermMin vd.tmax = alloc.TermMax - maxExpiration = int64(piece.DealSchedule.EndEpoch + alloc.TermMax) + maxExpiration = int64(head.Height() + alloc.TermMax) } propJson, err = json.Marshal(piece.PieceActivationManifest) if err != nil { @@ -309,16 +314,47 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add } if len(candidates) == 0 { - return false, xerrors.Errorf("no suitable sectors found") + minEpoch := piece.DealSchedule.EndEpoch + maxEpoch := abi.ChainEpoch(maxExpiration) + + minEpochDays := (minEpoch - head.Height()) / builtin.EpochsInDay + maxEpochDays := (maxEpoch - head.Height()) / builtin.EpochsInDay + + return false, xerrors.Errorf("no suitable sectors found, minEpoch: %d, maxEpoch: %d, minExpirationDays: %d, maxExpirationDays: %d", minEpoch, maxEpoch, minEpochDays, maxEpochDays) } // todo - nice to have: - // * double check the sector expiration // * check sector liveness // * check deadline mutable candidate := candidates[0] // this one works best + si, err := p.api.StateSectorGetInfo(ctx, p.miner, abi.SectorNumber(candidate.Sector), types.EmptyTSK) + if err != nil { + return false, xerrors.Errorf("getting sector info: %w", err) + } + + 
sectorLifeTime := si.Expiration - head.Height() + if sectorLifeTime < 0 { + return false, xerrors.Errorf("sector lifetime is negative!?") + } + if piece.DealSchedule.EndEpoch > si.Expiration { + return false, xerrors.Errorf("sector expiration is too soon: %d < %d", si.Expiration, piece.DealSchedule.EndEpoch) + } + if maxExpiration != 0 && si.Expiration > abi.ChainEpoch(maxExpiration) { + return false, xerrors.Errorf("sector expiration is too late: %d > %d", si.Expiration, maxExpiration) + } + + // info log detailing EVERYTHING including all the epoch bounds + log.Infow("allocating piece to sector", + "sector", candidate.Sector, + "expiration", si.Expiration, + "sectorLifeTime", sectorLifeTime, + "dealStartEpoch", piece.DealSchedule.StartEpoch, + "dealEndEpoch", piece.DealSchedule.EndEpoch, + "maxExpiration", maxExpiration, + ) + _, err = tx.Exec(`SELECT insert_snap_ddo_piece($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`, p.mid, candidate.Sector, 0, piece.PieceActivationManifest.CID, piece.PieceActivationManifest.Size, @@ -358,6 +394,11 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece var allocated bool var rerr error + head, err := p.api.ChainHead(ctx) + if err != nil { + return false, api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err) + } + comm, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { openSectors, err := p.getOpenSectors(tx) if err != nil { @@ -368,14 +409,23 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece sec := sec if sec.currentSize+psize <= abi.PaddedPieceSize(p.sectorSize) { if vd.isVerified { - sectorLifeTime := sec.latestEndEpoch - sec.earliestStartEpoch + si, err := p.api.StateSectorGetInfo(ctx, p.miner, sec.number, types.EmptyTSK) + if err != nil { + log.Errorw("getting sector info", "error", err, "sector", sec.number, "miner", p.miner) + continue + } + + sectorLifeTime := si.Expiration - head.Height() + if 
sectorLifeTime < 0 { + log.Errorw("sector lifetime is negative", "sector", sec.number, "miner", p.miner, "lifetime", sectorLifeTime) + continue + } + // Allocation's TMin must fit in sector and TMax should be at least sector lifetime or more // Based on https://github.com/filecoin-project/builtin-actors/blob/a0e34d22665ac8c84f02fea8a099216f29ffaeeb/actors/verifreg/src/lib.rs#L1071-L1086 if sectorLifeTime <= vd.tmin && sectorLifeTime >= vd.tmax { continue } - - // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// TODO ADD SNAP SECTOR EXP CHECKS } ret.Sector = sec.number diff --git a/tasks/seal/poller_commit_msg.go b/tasks/seal/poller_commit_msg.go index d53b4e8de..d1a6c8d7e 100644 --- a/tasks/seal/poller_commit_msg.go +++ b/tasks/seal/poller_commit_msg.go @@ -50,7 +50,9 @@ func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) err } if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok { - return s.pollCommitMsgFail(ctx, task, execResult[0]) + if err := s.pollCommitMsgFail(ctx, maddr, task, execResult[0]); err != nil { + return err + } } si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) @@ -78,13 +80,25 @@ func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) err return nil } -func (s *SealPoller) pollCommitMsgFail(ctx context.Context, task pollTask, execResult dbExecResult) error { +func (s *SealPoller) pollCommitMsgFail(ctx context.Context, maddr address.Address, task pollTask, execResult dbExecResult) error { switch exitcode.ExitCode(execResult.ExecutedRcptExitCode) { case exitcode.SysErrInsufficientFunds: fallthrough case exitcode.SysErrOutOfGas: // just retry return s.pollRetryCommitMsgSend(ctx, task, execResult) + case exitcode.ErrNotFound: + // message not found, but maybe it's fine? 
+ + si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) + if err != nil { + return xerrors.Errorf("get sector info: %w", err) + } + if si != nil { + return nil + } + + return xerrors.Errorf("sector not found after, commit message can't be found either") default: return xerrors.Errorf("commit message failed with exit code %s", exitcode.ExitCode(execResult.ExecutedRcptExitCode)) } diff --git a/tasks/seal/task_submit_commit.go b/tasks/seal/task_submit_commit.go index 5211a33fe..aa4b82519 100644 --- a/tasks/seal/task_submit_commit.go +++ b/tasks/seal/task_submit_commit.go @@ -217,7 +217,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) if err != nil { return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err) } - err = AllocationCheck(ctx, s.api, pam, pci, abi.ActorID(sectorParams.SpID), ts) + _, err = AllocationCheck(ctx, s.api, pam, pci.Info.Expiration, abi.ActorID(sectorParams.SpID), ts) if err != nil { return false, err } @@ -428,41 +428,44 @@ type AllocNodeApi interface { StateGetAllocation(ctx context.Context, clientAddr address.Address, allocationId verifregtypes9.AllocationId, tsk types.TipSetKey) (*verifregtypes9.Allocation, error) } -func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, precomitInfo *miner.SectorPreCommitOnChainInfo, miner abi.ActorID, ts *types.TipSet) error { +func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, expiration abi.ChainEpoch, miner abi.ActorID, ts *types.TipSet) (permanent bool, err error) { // skip pieces not claiming an allocation if piece.VerifiedAllocationKey == nil { - return nil + return false, nil } addr, err := address.NewIDAddress(uint64(piece.VerifiedAllocationKey.Client)) if err != nil { - return err + return false, err } alloc, err := api.StateGetAllocation(ctx, addr, verifregtypes9.AllocationId(piece.VerifiedAllocationKey.ID), 
ts.Key()) if err != nil { - return err + return false, err } if alloc == nil { - return xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID) + return true, xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID) } if alloc.Provider != miner { - return xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider) + return true, xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider) } if alloc.Size != piece.Size { - return xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size) + return true, xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size) } - if precomitInfo == nil { - return nil - } - if precomitInfo.Info.Expiration < ts.Height()+alloc.TermMin { - return xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMin, piece.CID.String()) + if expiration < ts.Height()+alloc.TermMin { + tooLittleBy := ts.Height() + alloc.TermMin - expiration + + return true, xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s (should be at least %d epochs more)", expiration, ts.Height()+alloc.TermMin, piece.CID.String(), tooLittleBy) } - if precomitInfo.Info.Expiration > ts.Height()+alloc.TermMax { - return xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMax, piece.CID.String()) + if expiration > ts.Height()+alloc.TermMax { + tooMuchBy := expiration - (ts.Height() + alloc.TermMax) + + return true, xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s (should be at 
least %d epochs less)", expiration, ts.Height()+alloc.TermMax, piece.CID.String(), tooMuchBy) } - return nil + log.Infow("allocation check details", "piece", piece.CID.String(), "client", alloc.Client, "provider", alloc.Provider, "size", alloc.Size, "term_min", alloc.TermMin, "term_max", alloc.TermMax, "sector_expiration", expiration) + + return false, nil } var _ harmonytask.TaskInterface = &SubmitCommitTask{} diff --git a/tasks/snap/task_submit.go b/tasks/snap/task_submit.go index 7aa5ca46a..76efee46f 100644 --- a/tasks/snap/task_submit.go +++ b/tasks/snap/task_submit.go @@ -10,6 +10,7 @@ import ( "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log/v2" + "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -129,9 +130,10 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done var pieces []struct { Manifest json.RawMessage `db:"direct_piece_activation_manifest"` Size int64 `db:"piece_size"` + Start int64 `db:"direct_start_epoch"` } err = s.db.Select(ctx, &pieces, ` - SELECT direct_piece_activation_manifest, piece_size + SELECT direct_piece_activation_manifest, piece_size, direct_start_epoch FROM sectors_snap_initial_pieces WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, update.SpID, update.SectorNumber) if err != nil { @@ -143,23 +145,55 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("getting chain head: %w", err) } + maddr, err := address.NewIDAddress(uint64(update.SpID)) + if err != nil { + return false, xerrors.Errorf("parsing miner address: %w", err) + } + + snum := abi.SectorNumber(update.SectorNumber) + + onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key()) + if err != nil { + return false, xerrors.Errorf("getting sector info: %w", err) + } + if onChainInfo == nil { + return false, xerrors.Errorf("sector not found on chain") + } + var pams 
[]miner.PieceActivationManifest var weight, weightVerif = big.Zero(), big.Zero() + var minStart abi.ChainEpoch for _, piece := range pieces { var pam *miner.PieceActivationManifest err = json.Unmarshal(piece.Manifest, &pam) if err != nil { return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err) } - err = seal.AllocationCheck(ctx, s.api, pam, nil, abi.ActorID(update.SpID), ts) + unrecoverable, err := seal.AllocationCheck(ctx, s.api, pam, onChainInfo.Expiration, abi.ActorID(update.SpID), ts) if err != nil { + if unrecoverable { + _, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET + failed = TRUE, failed_at = NOW(), failed_reason = 'alloc-check', failed_reason_msg = $1, + task_id_submit = NULL, after_submit = FALSE + WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber) + + log.Errorw("allocation check failed with an unrecoverable issue", "sp", update.SpID, "sector", update.SectorNumber, "err", err) + return true, xerrors.Errorf("allocation check failed with an unrecoverable issue: %w", multierr.Combine(err, err2)) + } + return false, err } + pieceWeight := big.Mul(abi.NewStoragePower(piece.Size), big.NewInt(int64(onChainInfo.Expiration-ts.Height()))) + if pam.VerifiedAllocationKey != nil { - weightVerif = big.Add(weightVerif, abi.NewStoragePower(piece.Size)) + weightVerif = big.Add(weightVerif, pieceWeight) } else { - weight = big.Add(weight, abi.NewStoragePower(piece.Size)) + weight = big.Add(weight, pieceWeight) + } + + if minStart == 0 || abi.ChainEpoch(piece.Start) < minStart { + minStart = abi.ChainEpoch(piece.Start) } pams = append(pams, *pam) @@ -170,13 +204,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("parsing new sealed cid: %w", err) } - maddr, err := address.NewIDAddress(uint64(update.SpID)) - if err != nil { - return false, xerrors.Errorf("parsing miner address: %w", err) - } - - snum := abi.SectorNumber(update.SectorNumber) 
- sl, err := s.api.StateSectorPartition(ctx, maddr, snum, types.EmptyTSK) if err != nil { return false, xerrors.Errorf("getting sector location: %w", err) @@ -210,14 +237,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("getting miner info: %w", err) } - onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key()) - if err != nil { - return false, xerrors.Errorf("getting sector info: %w", err) - } - if onChainInfo == nil { - return false, xerrors.Errorf("sector not found on chain") - } - ssize, err := onChainInfo.SealProof.SectorSize() if err != nil { return false, xerrors.Errorf("getting sector size: %w", err) @@ -255,7 +274,18 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done mcid, err := s.sender.Send(ctx, msg, mss, "update") if err != nil { - return false, xerrors.Errorf("pushing message to mpool: %w", err) + if minStart != 0 && ts.Height() > minStart { + _, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET + failed = TRUE, failed_at = NOW(), failed_reason = 'start-expired', failed_reason_msg = $1, + task_id_submit = NULL, after_submit = FALSE + WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber) + + log.Errorw("failed to push message to mpool (beyond deal start epoch)", "sp", update.SpID, "sector", update.SectorNumber, "err", err) + + return true, xerrors.Errorf("pushing message to mpool (beyond deal start epoch): %w", multierr.Combine(err, err2)) + } + + return false, xerrors.Errorf("pushing message to mpool (minStart %d, timeTo %d): %w", minStart, minStart-ts.Height(), err) } _, err = s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET prove_msg_cid = $1, task_id_submit = NULL, after_submit = TRUE WHERE task_id_submit = $2`, mcid.String(), taskID) @@ -364,7 +394,7 @@ func (s *SubmitTask) schedule(ctx context.Context, taskFunc harmonytask.AddTaskF SectorNumber int64 `db:"sector_number"` } - err := 
s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE after_encode = TRUE AND after_prove = TRUE AND after_submit = FALSE AND task_id_submit IS NULL`) + err := s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE failed = FALSE AND after_encode = TRUE AND after_prove = TRUE AND after_submit = FALSE AND task_id_submit IS NULL`) if err != nil { return false, xerrors.Errorf("getting tasks: %w", err) } diff --git a/web/api/webrpc/upgrade.go b/web/api/webrpc/upgrade.go index e0261826b..109d2ae76 100644 --- a/web/api/webrpc/upgrade.go +++ b/web/api/webrpc/upgrade.go @@ -19,11 +19,15 @@ type UpgradeSector struct { TaskIDMoveStorage *uint64 `db:"task_id_move_storage"` AfterMoveStorage bool `db:"after_move_storage"` + + Failed bool `db:"failed"` + FailedReason string `db:"failed_reason"` + FailedMsg string `db:"failed_reason_msg"` } func (a *WebRPC) UpgradeSectors(ctx context.Context) ([]UpgradeSector, error) { sectors := []UpgradeSector{} - err := a.deps.DB.Select(ctx, &sectors, `SELECT sp_id, sector_number, task_id_encode, after_encode, task_id_prove, after_prove, task_id_submit, after_submit, after_prove_msg_success, task_id_move_storage, after_move_storage FROM sectors_snap_pipeline`) + err := a.deps.DB.Select(ctx, &sectors, `SELECT sp_id, sector_number, task_id_encode, after_encode, task_id_prove, after_prove, task_id_submit, after_submit, after_prove_msg_success, task_id_move_storage, after_move_storage, failed, failed_reason, failed_reason_msg FROM sectors_snap_pipeline`) if err != nil { return nil, err } diff --git a/web/static/snap/upgrade-sectors.mjs b/web/static/snap/upgrade-sectors.mjs index e17e03204..4f88ba4ae 100644 --- a/web/static/snap/upgrade-sectors.mjs +++ b/web/static/snap/upgrade-sectors.mjs @@ -34,6 +34,7 @@ class UpgradeSectors extends LitElement {
FAILED
${entry.FailedReason}
` : 'Healthy'}