diff --git a/deps/config/doc_gen.go b/deps/config/doc_gen.go
index 4373d42f7..250a852a5 100644
--- a/deps/config/doc_gen.go
+++ b/deps/config/doc_gen.go
@@ -139,6 +139,12 @@ alerts will be triggered for the wallet`,

 		Comment: `Commit batching configuration`,
 	},
+	{
+		Name: "Update",
+		Type: "UpdateBatchingConfig",
+
+		Comment: `Snap Deals batching configuration`,
+	},
 },
 "CurioConfig": {
 	{
@@ -227,6 +233,12 @@ alerts will be triggered for the wallet`,

 		Comment: ``,
 	},
+	{
+		Name: "MaxUpdateBatchGasFee",
+		Type: "BatchFeeConfig",
+
+		Comment: ``,
+	},
 	{
 		Name: "MaxTerminateGasFee",
 		Type: "types.FIL",
@@ -798,4 +810,24 @@ identifier in the integration page for the service.`,
 Example: https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX`,
 	},
 },
+"UpdateBatchingConfig": {
+	{
+		Name: "BaseFeeThreshold",
+		Type: "types.FIL",
+
+		Comment: `Base fee value below which we should try to send update messages immediately`,
+	},
+	{
+		Name: "Timeout",
+		Type: "Duration",
+
+		Comment: `Maximum amount of time any given sector in the batch can wait for the batch to accumulate`,
+	},
+	{
+		Name: "Slack",
+		Type: "Duration",
+
+		Comment: `Time buffer for forceful batch submission before sectors/deals in batch would start expiring`,
+	},
+},
}
diff --git a/deps/config/types.go b/deps/config/types.go
index 497023053..48ebbdfc8 100644
--- a/deps/config/types.go
+++ b/deps/config/types.go
@@ -30,6 +30,10 @@ func DefaultCurioConfig() *CurioConfig {
 			Base:      types.MustParseFIL("0"),
 			PerSector: types.MustParseFIL("0.03"), // enough for 6 agg and 1nFIL base fee
 		},
+		MaxUpdateBatchGasFee: BatchFeeConfig{
+			Base:      types.MustParseFIL("0"),
+			PerSector: types.MustParseFIL("0.03"),
+		},
 		MaxTerminateGasFee:  types.MustParseFIL("0.5"),
 		MaxWindowPoStGasFee: types.MustParseFIL("5"),
@@ -84,6 +88,11 @@ func DefaultCurioConfig() *CurioConfig {
 				Timeout:          Duration(1 * time.Hour),
 				Slack:            Duration(1 * time.Hour),
 			},
+			Update: UpdateBatchingConfig{
+				BaseFeeThreshold: types.MustParseFIL("0.005"),
+				Timeout:          Duration(1 * time.Hour),
+				Slack:            Duration(1 * time.Hour),
+			},
 		},
 	}
 }
@@ -308,6 +317,7 @@ type CurioFees struct {
 	// maxBatchFee = maxBase + maxPerSector * nSectors
 	MaxPreCommitBatchGasFee BatchFeeConfig
 	MaxCommitBatchGasFee    BatchFeeConfig
+	MaxUpdateBatchGasFee    BatchFeeConfig

 	MaxTerminateGasFee types.FIL
 	// WindowPoSt is a high-value operation, so the default fee should be high.
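The `maxBatchFee = maxBase + maxPerSector * nSectors` comment above is the whole fee model behind the new `MaxUpdateBatchGasFee` knob. A quick standalone sketch of the cap a full batch gets under these defaults — `feeForSectors` here is an illustrative stand-in for the `BatchFeeConfig.FeeForSectors` helper the submit path calls later in this diff:

```go
package main

import (
	"fmt"

	"github.com/filecoin-project/lotus/chain/types"
)

// feeForSectors is an illustrative stand-in for BatchFeeConfig.FeeForSectors:
// maxBatchFee = maxBase + maxPerSector * nSectors.
func feeForSectors(base, perSector types.FIL, nSectors int) types.FIL {
	total := types.BigMul(types.BigInt(perSector), types.NewInt(uint64(nSectors)))
	return types.FIL(types.BigAdd(types.BigInt(base), total))
}

func main() {
	base := types.MustParseFIL("0")         // MaxUpdateBatchGasFee.Base default
	perSector := types.MustParseFIL("0.03") // MaxUpdateBatchGasFee.PerSector default

	// A full 256-sector update batch caps the message fee at
	// 0 + 0.03 * 256 = 7.68 FIL.
	fmt.Println(feeForSectors(base, perSector, 256)) // 7.68 FIL
}
```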
@@ -527,6 +537,9 @@ type CurioBatchingConfig struct {

 	// Commit batching configuration
 	Commit CommitBatchingConfig
+
+	// Snap Deals batching configuration
+	Update UpdateBatchingConfig
 }

 type PreCommitBatchingConfig struct {
@@ -550,3 +563,14 @@ type CommitBatchingConfig struct {
 	// Time buffer for forceful batch submission before sectors/deals in batch would start expiring
 	Slack Duration
 }
+
+type UpdateBatchingConfig struct {
+	// Base fee value below which we should try to send update messages immediately
+	BaseFeeThreshold types.FIL
+
+	// Maximum amount of time any given sector in the batch can wait for the batch to accumulate
+	Timeout Duration
+
+	// Time buffer for forceful batch submission before sectors/deals in batch would start expiring
+	Slack Duration
+}
diff --git a/documentation/en/configuration/default-curio-configuration.md b/documentation/en/configuration/default-curio-configuration.md
index bcc25ce5c..c48fec76d 100644
--- a/documentation/en/configuration/default-curio-configuration.md
+++ b/documentation/en/configuration/default-curio-configuration.md
@@ -302,6 +302,13 @@ description: The default curio configuration
       # type: types.FIL
       #PerSector = "0.03 FIL"

+    [Fees.MaxUpdateBatchGasFee]
+      # type: types.FIL
+      #Base = "0 FIL"
+
+      # type: types.FIL
+      #PerSector = "0.03 FIL"
+
 [[Addresses]]
   #PreCommitControl = []

@@ -536,4 +543,20 @@ description: The default curio configuration
       # type: Duration
       #Slack = "1h0m0s"

+    [Batching.Update]
+      # Base fee value below which we should try to send update messages immediately
+      #
+      # type: types.FIL
+      #BaseFeeThreshold = "0.005 FIL"
+
+      # Maximum amount of time any given sector in the batch can wait for the batch to accumulate
+      #
+      # type: Duration
+      #Timeout = "1h0m0s"
+
+      # Time buffer for forceful batch submission before sectors/deals in batch would start expiring
+      #
+      # type: Duration
+      #Slack = "1h0m0s"
+
 ```
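The three `[Batching.Update]` knobs above mix two clocks: Slack is measured in chain time against the earliest deal-start epoch in a batch, while Timeout is plain wall-clock age of the batch. A minimal sketch of that arithmetic, assuming 30-second epochs (mainnet `build.BlockDelaySecs`); the variable names are illustrative:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const blockDelay = 30 * time.Second // assumed mainnet epoch duration

	slack := time.Hour   // default Batching.Update.Slack
	timeout := time.Hour // default Batching.Update.Timeout

	// Slack is judged in chain time: it is compared against the epochs left
	// until the earliest deal-start epoch in the batch. 1h / 30s = 120 epochs.
	fmt.Println("slack headroom in epochs:", int64(slack/blockDelay)) // 120

	// Timeout is judged in wall-clock time: a batch whose oldest sector became
	// ready more than an hour ago is flushed even if it never fills up.
	readyAt := time.Now().Add(-90 * time.Minute)
	fmt.Println("timed out:", time.Since(readyAt) > timeout) // true
}
```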
diff --git a/harmony/harmonydb/sql/20240611-snap-pipeline.sql b/harmony/harmonydb/sql/20240611-snap-pipeline.sql
index f8820546b..6ed7c170c 100644
--- a/harmony/harmonydb/sql/20240611-snap-pipeline.sql
+++ b/harmony/harmonydb/sql/20240611-snap-pipeline.sql
@@ -26,6 +26,8 @@ CREATE TABLE sectors_snap_pipeline (
     task_id_prove BIGINT,
     after_prove BOOLEAN NOT NULL DEFAULT FALSE,

+    -- update_ready_at TIMESTAMP, -- Added in 20241210-sdr-batching
+
     -- submit
     prove_msg_cid TEXT,

diff --git a/harmony/harmonydb/sql/20241210-sdr-batching.sql b/harmony/harmonydb/sql/20241210-sdr-batching.sql
index bfa73d114..3f39a3e08 100644
--- a/harmony/harmonydb/sql/20241210-sdr-batching.sql
+++ b/harmony/harmonydb/sql/20241210-sdr-batching.sql
@@ -43,4 +43,24 @@ CREATE TRIGGER update_commit_ready_at
     AFTER INSERT OR UPDATE OR DELETE ON sectors_sdr_pipeline
     FOR EACH ROW EXECUTE FUNCTION set_commit_ready_at();

+ALTER TABLE sectors_snap_pipeline ADD COLUMN update_ready_at TIMESTAMPTZ;
+
+-- Function to set the update_ready_at value. Used by the trigger
+CREATE OR REPLACE FUNCTION set_update_ready_at()
+RETURNS TRIGGER AS $$
+BEGIN
+    -- Check if the after_prove column is changing from FALSE to TRUE
+    IF OLD.after_prove = FALSE AND NEW.after_prove = TRUE THEN
+        -- Explicitly set update_ready_at to the current UTC timestamp
+        UPDATE sectors_snap_pipeline SET update_ready_at = CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
+        WHERE sp_id = NEW.sp_id AND sector_number = NEW.sector_number;
+    END IF;
+
+    -- Return the row (the return value of an AFTER trigger is ignored)
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER update_update_ready_at
+    AFTER INSERT OR UPDATE OR DELETE ON sectors_snap_pipeline
+    FOR EACH ROW EXECUTE FUNCTION set_update_ready_at();
diff --git a/tasks/seal/task_submit_commit.go b/tasks/seal/task_submit_commit.go
index acd0d22a2..777fb3d3d 100644
--- a/tasks/seal/task_submit_commit.go
+++ b/tasks/seal/task_submit_commit.go
@@ -274,7 +274,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
 					return false, xerrors.Errorf("allocation check failed with an unrecoverable issue: %w", multierr.Combine(err, err2))
 				}
 				log.Errorw("allocation check failed with an unrecoverable issue", "sp", sectorParams.SpID, "sector", sectorParams.SectorNumber, "err", err)
-				sectorFailed = false
+				sectorFailed = true
 				break
 			}
 		}
@@ -286,7 +286,7 @@
 		}

 		if sectorFailed {
-			break
+			continue // Skip this sector
 		}

 		ssize, err := pci.Info.SealProof.SectorSize()
diff --git a/tasks/snap/task_submit.go b/tasks/snap/task_submit.go
index d0c53e7df..95d5255a3 100644
--- a/tasks/snap/task_submit.go
+++ b/tasks/snap/task_submit.go
@@ -5,11 +5,12 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"math/rand/v2"
+	"time"

 	"github.com/ipfs/go-cid"
 	logging "github.com/ipfs/go-log/v2"
 	"go.uber.org/multierr"
+	"golang.org/x/exp/maps"
 	"golang.org/x/xerrors"

 	"github.com/filecoin-project/go-address"
@@ -21,6 +22,7 @@ import (
 	"github.com/filecoin-project/go-state-types/dline"
 	"github.com/filecoin-project/go-state-types/exitcode"

+	"github.com/filecoin-project/curio/build"
 	"github.com/filecoin-project/curio/deps/config"
 	"github.com/filecoin-project/curio/harmony/harmonydb"
 	"github.com/filecoin-project/curio/harmony/harmonytask"
@@ -60,8 +62,16 @@
 	StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
 }

+type updateBatchingConfig struct {
+	MaxUpdateBatch   int
+	Slack            time.Duration
+	Timeout          time.Duration
+	BaseFeeThreshold abi.TokenAmount
+}
+
 type submitConfig struct {
-	maxFee                     types.FIL
+	batch                      updateBatchingConfig
+	feeCfg                     *config.CurioFees
 	RequireActivationSuccess   bool
 	RequireNotificationSuccess bool
 	CollateralFromMinerBalance bool
@@ -90,7 +100,13 @@ func NewSubmitTask(db *harmonydb.DB, api SubmitTaskNodeAPI, bstore curiochain.Cu
 		as:     as,
 		cfg: submitConfig{
-			maxFee: cfg.Fees.MaxCommitGasFee, // todo snap-specific
+			batch: updateBatchingConfig{
+				MaxUpdateBatch:   256,
+				Slack:            time.Duration(cfg.Batching.Update.Slack),
+				Timeout:          time.Duration(cfg.Batching.Update.Timeout),
+				BaseFeeThreshold: abi.TokenAmount(cfg.Batching.Update.BaseFeeThreshold),
+			},
+			feeCfg: &cfg.Fees,

 			RequireActivationSuccess:   cfg.Subsystems.RequireActivationSuccess,
 			RequireNotificationSuccess: cfg.Subsystems.RequireNotificationSuccess,
@@ -100,6 +116,11 @@
 	}
 }

+type updateCids struct {
+	sealed   cid.Cid
+	unsealed cid.Cid
+}
+
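The scheduler added at the bottom of this diff reduces to one flush decision per batch, driven by the `updateBatchingConfig` fields above. A self-contained sketch of that predicate — function and field names are illustrative, not part of the Curio API, and mainnet 30-second epochs are assumed:

```go
package main

import (
	"fmt"
	"time"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/go-state-types/big"
)

const blockDelaySecs = 30 // assumed mainnet build.BlockDelaySecs

type batchState struct {
	cutoff   abi.ChainEpoch // earliest deal-start epoch in the batch
	earliest time.Time      // when the oldest member became ready to submit
}

// shouldFlush mirrors the three send conditions in SubmitTask.schedule.
func shouldFlush(b batchState, head abi.ChainEpoch, baseFee, threshold big.Int, slack, timeout time.Duration) bool {
	// 1. Slack: not enough chain time left before the earliest deal start.
	if time.Duration(b.cutoff-head)*blockDelaySecs*time.Second < slack {
		return true
	}
	// 2. Timeout: the batch has been accumulating for too long.
	if time.Since(b.earliest) > timeout {
		return true
	}
	// 3. Cheap gas: the current base fee is below the configured threshold.
	return baseFee.LessThan(threshold)
}

func main() {
	b := batchState{cutoff: 10_000, earliest: time.Now().Add(-30 * time.Minute)}
	baseFee := big.NewInt(100)                     // attoFIL
	threshold := big.NewInt(5_000_000_000_000_000) // 0.005 FIL
	// 50 epochs (25 min) to cutoff is under the 1h slack, so the batch is sent.
	fmt.Println(shouldFlush(b, 9_950, baseFee, threshold, time.Hour, time.Hour)) // true
}
```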
 func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
 	var tasks []struct {
 		SpID         int64 `db:"sp_id"`
 		SectorNumber int64 `db:"sector_number"`
@@ -127,23 +148,13 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 		return false, xerrors.Errorf("getting sector params: %w", err)
 	}

-	if len(tasks) != 1 {
-		return false, xerrors.Errorf("expected 1 sector params, got %d", len(tasks))
+	if len(tasks) == 0 {
+		return false, xerrors.Errorf("expected at least 1 sector param, got 0")
 	}

-	update := tasks[0]
-
-	var pieces []struct {
-		Manifest json.RawMessage `db:"direct_piece_activation_manifest"`
-		Size     int64           `db:"piece_size"`
-		Start    int64           `db:"direct_start_epoch"`
-	}
-	err = s.db.Select(ctx, &pieces, `
-		SELECT direct_piece_activation_manifest, piece_size, direct_start_epoch
-		FROM sectors_snap_initial_pieces
-		WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, update.SpID, update.SectorNumber)
+	maddr, err := address.NewIDAddress(uint64(tasks[0].SpID))
 	if err != nil {
-		return false, xerrors.Errorf("getting pieces: %w", err)
+		return false, xerrors.Errorf("getting miner address: %w", err)
 	}

 	ts, err := s.api.ChainHead(ctx)
@@ -151,155 +162,202 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 		return false, xerrors.Errorf("getting chain head: %w", err)
 	}

-	maddr, err := address.NewIDAddress(uint64(update.SpID))
+	mi, err := s.api.StateMinerInfo(ctx, maddr, types.EmptyTSK)
 	if err != nil {
-		return false, xerrors.Errorf("parsing miner address: %w", err)
+		return false, xerrors.Errorf("getting miner info: %w", err)
 	}

-	snum := abi.SectorNumber(update.SectorNumber)
+	regProof := tasks[0].RegSealProof
+	updateProof := tasks[0].UpdateProof

-	onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key())
-	if err != nil {
-		return false, xerrors.Errorf("getting sector info: %w", err)
-	}
-	if onChainInfo == nil {
-		return false, xerrors.Errorf("sector not found on chain")
+	params := miner.ProveReplicaUpdates3Params{
+		AggregateProof:             nil,
+		UpdateProofsType:           abi.RegisteredUpdateProof(updateProof),
+		AggregateProofType:         nil,
+		RequireActivationSuccess:   s.cfg.RequireActivationSuccess,
+		RequireNotificationSuccess: s.cfg.RequireNotificationSuccess,
 	}

-	sl, err := s.api.StateSectorPartition(ctx, maddr, snum, types.EmptyTSK)
-	if err != nil {
-		return false, xerrors.Errorf("getting sector location: %w", err)
-	}
+	collateral := big.Zero()
+	transferMap := make(map[int64]*updateCids)

-	// Check that the sector isn't in an immutable deadline (or isn't about to be)
-	curDl, err := s.api.StateMinerProvingDeadline(ctx, maddr, ts.Key())
-	if err != nil {
-		return false, xerrors.Errorf("getting current proving deadline: %w", err)
-	}
+	for _, update := range tasks {
+		update := update

-	// Matches actor logic - https://github.com/filecoin-project/builtin-actors/blob/76abc47726bdbd8b478ef10e573c25957c786d1d/actors/miner/src/deadlines.rs#L65
-	sectorDl := dline.NewInfo(curDl.PeriodStart, sl.Deadline, curDl.CurrentEpoch,
-		curDl.WPoStPeriodDeadlines,
-		curDl.WPoStProvingPeriod,
-		curDl.WPoStChallengeWindow,
-		curDl.WPoStChallengeLookback,
-		curDl.FaultDeclarationCutoff)
+		// Check that the miner ID is the same for all sectors in the batch
+		tmpMaddr, err := address.NewIDAddress(uint64(update.SpID))
+		if err != nil {
+			return false, xerrors.Errorf("getting miner address: %w", err)
+		}

-	sectorDl = sectorDl.NextNotElapsed()
-	firstImmutableEpoch := sectorDl.Open - curDl.WPoStChallengeWindow
-	firstUnsafeEpoch := firstImmutableEpoch - ImmutableSubmitGate
-	lastImmutableEpoch := sectorDl.Close
+		if maddr != tmpMaddr {
+			return false, xerrors.Errorf("expected miner ID to be the same (%s) for all sectors in a batch but found %s", maddr.String(), tmpMaddr.String())
+		}

-	if ts.Height() > firstUnsafeEpoch && ts.Height() < lastImmutableEpoch {
-		closeTime := curiochain.EpochTime(ts, sectorDl.Close)
+		// Check that the proof type is the same for all sectors in the batch
+		if update.RegSealProof != regProof {
+			return false, xerrors.Errorf("expected proof type to be the same (%d) for all sectors in a batch but found %d for sector %d of miner %d", regProof, update.RegSealProof, update.SectorNumber, update.SpID)
+		}

-		log.Warnw("sector in unsafe window, delaying submit", "sp", update.SpID, "sector", update.SectorNumber, "cur_dl", curDl, "sector_dl", sectorDl, "close_time", closeTime)
+		if update.UpdateProof != updateProof {
+			return false, xerrors.Errorf("expected update proof type to be the same (%d) for all sectors in a batch but found %d for sector %d of miner %d", updateProof, update.UpdateProof, update.SectorNumber, update.SpID)
+		}

-		_, err := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
-			task_id_submit = NULL, after_submit = FALSE, submit_after = $1
-			WHERE sp_id = $2 AND sector_number = $3`, closeTime, update.SpID, update.SectorNumber)
+		var pieces []struct {
+			Manifest json.RawMessage `db:"direct_piece_activation_manifest"`
+			Size     int64           `db:"piece_size"`
+			Start    int64           `db:"direct_start_epoch"`
+		}
+		err = s.db.Select(ctx, &pieces, `
+			SELECT direct_piece_activation_manifest, piece_size, direct_start_epoch
+			FROM sectors_snap_initial_pieces
+			WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, update.SpID, update.SectorNumber)
 		if err != nil {
-			return false, xerrors.Errorf("updating sector params: %w", err)
+			return false, xerrors.Errorf("getting pieces: %w", err)
 		}

-		return true, nil
-	}
+		snum := abi.SectorNumber(update.SectorNumber)

-	if ts.Height() >= lastImmutableEpoch {
-		// the deadline math shouldn't allow this to ever happen, but just in case the math is wrong we also check the
-		// upper bound of the proving window
-		// (should never happen because if the current epoch is at deadline Close, NextNotElapsed will give us the next deadline)
-		log.Errorw("sector in somehow past immutable window", "sp", update.SpID, "sector", update.SectorNumber, "cur_dl", curDl, "sector_dl", sectorDl)
-	}
+		onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key())
+		if err != nil {
+			return false, xerrors.Errorf("getting sector info: %w", err)
+		}
+		if onChainInfo == nil {
+			return false, xerrors.Errorf("sector not found on chain")
+		}

-	// Process pieces, prepare PAMs
-	var pams []miner.PieceActivationManifest
-	var minStart abi.ChainEpoch
-	var verifiedSize int64
-	for _, piece := range pieces {
-		var pam *miner.PieceActivationManifest
-		err = json.Unmarshal(piece.Manifest, &pam)
-		if err != nil {
-			return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err)
-		}
+		if onChainInfo.SealProof != abi.RegisteredSealProof(regProof) {
+			return false, xerrors.Errorf("proof mismatch between on-chain %d and local database %d for sector %d of miner %d", onChainInfo.SealProof, regProof, update.SectorNumber, update.SpID)
+		}
+
+		sl, err := s.api.StateSectorPartition(ctx, maddr, snum, types.EmptyTSK)
+		if err != nil {
+			return false, xerrors.Errorf("getting sector location: %w", err)
+		}
+
+		// Check that the sector isn't in an immutable deadline (or isn't about to be).
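+		// On mainnet (30s epochs, 60-epoch challenge window) a deadline becomes
+		// immutable 60 epochs before it next opens, and ImmutableSubmitGate backs
+		// the cutoff off further so a message that lands late cannot straddle the
+		// boundary; affected sectors are parked via submit_after, not submitted.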
+		curDl, err := s.api.StateMinerProvingDeadline(ctx, maddr, ts.Key())
+		if err != nil {
+			return false, xerrors.Errorf("getting current proving deadline: %w", err)
+		}
+
+		// Matches actor logic - https://github.com/filecoin-project/builtin-actors/blob/76abc47726bdbd8b478ef10e573c25957c786d1d/actors/miner/src/deadlines.rs#L65
+		sectorDl := dline.NewInfo(curDl.PeriodStart, sl.Deadline, curDl.CurrentEpoch,
+			curDl.WPoStPeriodDeadlines,
+			curDl.WPoStProvingPeriod,
+			curDl.WPoStChallengeWindow,
+			curDl.WPoStChallengeLookback,
+			curDl.FaultDeclarationCutoff)
+
+		sectorDl = sectorDl.NextNotElapsed()
+		firstImmutableEpoch := sectorDl.Open - curDl.WPoStChallengeWindow
+		firstUnsafeEpoch := firstImmutableEpoch - ImmutableSubmitGate
+		lastImmutableEpoch := sectorDl.Close
+
+		if ts.Height() > firstUnsafeEpoch && ts.Height() < lastImmutableEpoch {
+			closeTime := curiochain.EpochTime(ts, sectorDl.Close)
+
+			log.Warnw("sector in unsafe window, delaying submit", "sp", update.SpID, "sector", update.SectorNumber, "cur_dl", curDl, "sector_dl", sectorDl, "close_time", closeTime)
+
+			_, err := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
+				task_id_submit = NULL, after_submit = FALSE, submit_after = $1
+				WHERE sp_id = $2 AND sector_number = $3`, closeTime, update.SpID, update.SectorNumber)
+			if err != nil {
+				return false, xerrors.Errorf("updating sector params: %w", err)
+			}
+
+			continue // Skip this sector
+		}
+		if ts.Height() >= lastImmutableEpoch {
+			// the deadline math shouldn't allow this to ever happen, but just in case the math is wrong we also check the
+			// upper bound of the proving window
+			// (should never happen because if the current epoch is at deadline Close, NextNotElapsed will give us the next deadline)
+			log.Errorw("sector somehow past the immutable window", "sp", update.SpID, "sector", update.SectorNumber, "cur_dl", curDl, "sector_dl", sectorDl)
+		}
+
+		// Process pieces, prepare PAMs
+		var pams []miner.PieceActivationManifest
+		var verifiedSize int64
+		pieceCheckFailed := false
+		for _, piece := range pieces {
+			var pam *miner.PieceActivationManifest
+			err = json.Unmarshal(piece.Manifest, &pam)
+			if err != nil {
+				return false, xerrors.Errorf("unmarshalling json to PieceActivationManifest: %w", err)
+			}
+			unrecoverable, err := seal.AllocationCheck(ctx, s.api, pam, onChainInfo.Expiration, abi.ActorID(update.SpID), ts)
+			if err != nil {
+				if unrecoverable {
+					_, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
+						failed = TRUE, failed_at = NOW(), failed_reason = 'alloc-check', failed_reason_msg = $1,
+						task_id_submit = NULL, after_submit = FALSE
+						WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber)
+
+					log.Errorw("allocation check failed with an unrecoverable issue", "sp", update.SpID, "sector", update.SectorNumber, "err", err)
+					return true, xerrors.Errorf("allocation check failed with an unrecoverable issue: %w", multierr.Combine(err, err2))
+				}
+
+				pieceCheckFailed = true
+				break
 			}
-			return false, err
-		}

-		if pam.VerifiedAllocationKey != nil {
-			verifiedSize += piece.Size
+			if pam.VerifiedAllocationKey != nil {
+				verifiedSize += piece.Size
+			}
+
+			pams = append(pams, *pam)
 		}

-		if minStart == 0 || abi.ChainEpoch(piece.Start) < minStart {
-			minStart = abi.ChainEpoch(piece.Start)
-		}
-
-		pams = append(pams, *pam)
-	}
+		if pieceCheckFailed {
+			continue // Skip this sector
+		}

-	newSealedCID, err := cid.Parse(update.UpdateSealedCID)
-	if err != nil {
-		return false, xerrors.Errorf("parsing new sealed cid: %w", err)
-	}
-	newUnsealedCID, err := cid.Parse(update.UpdateUnsealedCID)
-	if err != nil {
-		return false, xerrors.Errorf("parsing new unsealed cid: %w", err)
-	}
+		newSealedCID, err := cid.Parse(update.UpdateSealedCID)
+		if err != nil {
+			return false, xerrors.Errorf("parsing new sealed cid: %w", err)
+		}
+		newUnsealedCID, err := cid.Parse(update.UpdateUnsealedCID)
+		if err != nil {
+			return false, xerrors.Errorf("parsing new unsealed cid: %w", err)
+		}

-	// Prepare params
-	params := miner.ProveReplicaUpdates3Params{
-		SectorUpdates: []miner13.SectorUpdateManifest{
-			{
-				Sector:       snum,
-				Deadline:     sl.Deadline,
-				Partition:    sl.Partition,
-				NewSealedCID: newSealedCID,
-				Pieces:       pams,
-			},
-		},
-		SectorProofs:               [][]byte{update.Proof},
-		AggregateProof:             nil,
-		UpdateProofsType:           abi.RegisteredUpdateProof(update.UpdateProof),
-		AggregateProofType:         nil,
-		RequireActivationSuccess:   s.cfg.RequireActivationSuccess,
-		RequireNotificationSuccess: s.cfg.RequireNotificationSuccess,
-	}
+		transferMap[update.SectorNumber] = &updateCids{
+			sealed:   newSealedCID,
+			unsealed: newUnsealedCID,
+		}

-	mi, err := s.api.StateMinerInfo(ctx, maddr, types.EmptyTSK)
-	if err != nil {
-		return false, xerrors.Errorf("getting miner info: %w", err)
-	}
+		ssize, err := onChainInfo.SealProof.SectorSize()
+		if err != nil {
+			return false, xerrors.Errorf("getting sector size: %w", err)
+		}

-	ssize, err := onChainInfo.SealProof.SectorSize()
-	if err != nil {
-		return false, xerrors.Errorf("getting sector size: %w", err)
-	}
+		duration := onChainInfo.Expiration - ts.Height()

-	duration := onChainInfo.Expiration - ts.Height()
+		secCollateral, err := s.api.StateMinerInitialPledgeForSector(ctx, duration, ssize, uint64(verifiedSize), ts.Key())
+		if err != nil {
+			return false, xerrors.Errorf("calculating pledge: %w", err)
+		}

-	collateral, err := s.api.StateMinerInitialPledgeForSector(ctx, duration, ssize, uint64(verifiedSize), ts.Key())
-	if err != nil {
-		return false, xerrors.Errorf("calculating pledge: %w", err)
-	}
+		secCollateral = big.Sub(secCollateral, onChainInfo.InitialPledge)
+		if secCollateral.LessThan(big.Zero()) {
+			secCollateral = big.Zero()
+		}

-	collateral = big.Sub(collateral, onChainInfo.InitialPledge)
-	if collateral.LessThan(big.Zero()) {
-		collateral = big.Zero()
-	}
+		collateral = big.Add(collateral, secCollateral)
+
+		// Prepare params
+		params.SectorUpdates = append(params.SectorUpdates, miner13.SectorUpdateManifest{
+			Sector:       snum,
+			Deadline:     sl.Deadline,
+			Partition:    sl.Partition,
+			NewSealedCID: newSealedCID,
+			Pieces:       pams,
+		})
+		params.SectorProofs = append(params.SectorProofs, update.Proof)
+	}

-	enc := new(bytes.Buffer)
-	if err := params.MarshalCBOR(enc); err != nil {
-		return false, xerrors.Errorf("could not serialize commit params: %w", err)
+	enc := new(bytes.Buffer)
+	if err := params.MarshalCBOR(enc); err != nil {
+		return false, xerrors.Errorf("could not serialize commit params: %w", err)
 	}

 	if s.cfg.CollateralFromMinerBalance {
@@ -318,6 +376,8 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 		}
 	}

+	maxFee := s.cfg.feeCfg.MaxUpdateBatchGasFee.FeeForSectors(len(params.SectorUpdates))
+
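+	// The fee cap scales with the number of sector updates that actually made
+	// it into params.SectorUpdates; sectors skipped above (unsafe window,
+	// failed piece checks) do not count toward it.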
xerrors.Errorf("getting address for precommit: %w", err) @@ -328,27 +388,16 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done From: a, Method: builtin.MethodsMiner.ProveReplicaUpdates3, Params: enc.Bytes(), - Value: collateral, // todo config for pulling from miner balance!! + Value: collateral, } mss := &api.MessageSendSpec{ - MaxFee: abi.TokenAmount(s.cfg.maxFee), + MaxFee: maxFee, } mcid, err := s.sender.Send(ctx, msg, mss, "update") if err != nil { - if minStart != 0 && ts.Height() > minStart { - _, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET - failed = TRUE, failed_at = NOW(), failed_reason = 'start-expired', failed_reason_msg = $1, - task_id_submit = NULL, after_submit = FALSE - WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber) - - log.Errorw("failed to push message to mpool (beyond deal start epoch)", "sp", update.SpID, "sector", update.SectorNumber, "err", err) - - return true, xerrors.Errorf("pushing message to mpool (beyond deal start epoch): %w", multierr.Combine(err, err2)) - } - - return false, xerrors.Errorf("pushing message to mpool (minStart %d, timeTo %d): %w", minStart, minStart-ts.Height(), err) + return false, xerrors.Errorf("pushing message to mpool: %w", err) } _, err = s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET prove_msg_cid = $1, task_id_submit = NULL, after_submit = TRUE WHERE task_id_submit = $2`, mcid.String(), taskID) @@ -361,18 +410,38 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("inserting into message_waits: %w", err) } - if err := s.transferUpdatedSectorData(ctx, update.SpID, update.SectorNumber, newUnsealedCID, newSealedCID, mcid); err != nil { + if err := s.transferUpdatedSectorData(ctx, tasks[0].SpID, transferMap, mcid); err != nil { return false, xerrors.Errorf("updating sector meta: %w", err) } return true, nil } -func (s *SubmitTask) transferUpdatedSectorData(ctx context.Context, spID, sectorNum int64, newUns, newSl, mcid cid.Cid) error { - if _, err := s.db.Exec(ctx, `UPDATE sectors_meta SET cur_sealed_cid = $1, +func (s *SubmitTask) transferUpdatedSectorData(ctx context.Context, spID int64, transferMap map[int64]*updateCids, mcid cid.Cid) error { + + commit, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + for sectorNum, cids := range transferMap { + sectorNum, cids := sectorNum, cids + n, err := tx.Exec(`UPDATE sectors_meta SET cur_sealed_cid = $1, cur_unsealed_cid = $2, msg_cid_update = $3 - WHERE sp_id = $4 AND sector_num = $5`, newSl.String(), newUns.String(), mcid.String(), spID, sectorNum); err != nil { - return xerrors.Errorf("updating sector meta: %w", err) + WHERE sp_id = $4 AND sector_num = $5`, cids.sealed.String(), cids.unsealed.String(), mcid.String(), spID, sectorNum) + + if err != nil { + return false, xerrors.Errorf("updating sector meta: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("updating sector meta: expected to update 1 row, but updated %d rows", n) + } + } + return true, nil + }, harmonydb.OptionRetry()) + + if err != nil { + return err + } + + if !commit { + return xerrors.Errorf("updating sector meta: transaction failed") } // Execute the query for piece metadata @@ -408,7 +477,7 @@ func (s *SubmitTask) transferUpdatedSectorData(ctx context.Context, spID, sector sectors_snap_initial_pieces WHERE sp_id = $1 AND - sector_number = $2 + sector_number = ANY($2) ON CONFLICT (sp_id, sector_num, piece_num) DO UPDATE SET 

 	// Execute the query for piece metadata
@@ -408,7 +477,7 @@ func (s *SubmitTask) transferUpdatedSectorData(ctx context.Context, spID, sector
 			sectors_snap_initial_pieces
 		WHERE
 			sp_id = $1 AND
-			sector_number = $2
+			sector_number = ANY($2)
 		ON CONFLICT (sp_id, sector_num, piece_num) DO UPDATE SET
 			piece_cid = excluded.piece_cid,
 			piece_size = excluded.piece_size,
@@ -419,7 +488,7 @@ func (s *SubmitTask) transferUpdatedSectorData(ctx context.Context, spID, sector
 			f05_deal_id = excluded.f05_deal_id,
 			ddo_pam = excluded.ddo_pam,
 			f05_deal_proposal = excluded.f05_deal_proposal;
-		`, spID, sectorNum); err != nil {
+		`, spID, maps.Keys(transferMap)); err != nil {
 		return fmt.Errorf("failed to insert/update sector_meta_pieces: %w", err)
 	}

@@ -433,7 +502,7 @@ func (s *SubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.Tas

 func (s *SubmitTask) TypeDetails() harmonytask.TaskTypeDetails {
 	return harmonytask.TaskTypeDetails{
-		Name: "UpdateSubmit",
+		Name: "BatchUpdateSubmit",
 		Cost: resources.Resources{
 			Cpu: 1,
 			Ram: 64 << 20,
@@ -447,42 +516,147 @@ func (s *SubmitTask) TypeDetails() harmonytask.TaskTypeDetails {

 func (s *SubmitTask) schedule(ctx context.Context, taskFunc harmonytask.AddTaskFunc) error {
 	// schedule submits
-	var stop bool
-	for !stop {
-		taskFunc(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
-			stop = true // assume we're done until we find a task to schedule
-
-			var tasks []struct {
-				SpID         int64 `db:"sp_id"`
-				SectorNumber int64 `db:"sector_number"`
-			}
+	taskFunc(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
+		var cmt bool
+		type task struct {
+			SpID          int64          `db:"sp_id"`
+			SectorNumber  int64          `db:"sector_number"`
+			UpgradeProof  int            `db:"upgrade_proof"`
+			StartEpoch    abi.ChainEpoch `db:"smallest_direct_start_epoch"`
+			UpdateReadyAt *time.Time     `db:"update_ready_at"`
+		}

-			err := tx.Select(&tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE failed = FALSE
-				AND after_encode = TRUE
-				AND after_prove = TRUE
-				AND after_submit = FALSE
-				AND (submit_after IS NULL OR submit_after < NOW())
-				AND task_id_submit IS NULL`)
-			if err != nil {
-				return false, xerrors.Errorf("getting tasks: %w", err)
-			}
+		type sectorBatch struct {
+			cutoff   abi.ChainEpoch
+			earliest time.Time
+			sectors  []int64
+		}

-			if len(tasks) == 0 {
-				return false, nil
-			}
+		var tasks []task
+
+		err := tx.Select(&tasks, `SELECT
+			ssp.sp_id,
+			ssp.sector_number,
+			ssp.upgrade_proof,
+			ssp.update_ready_at,
+			MIN(ssip.direct_start_epoch) AS smallest_direct_start_epoch
+		FROM
+			sectors_snap_pipeline ssp
+		JOIN
+			sectors_snap_initial_pieces ssip
+		ON
+			ssp.sp_id = ssip.sp_id AND ssp.sector_number = ssip.sector_number
+		WHERE
+			ssp.failed = FALSE
+			AND ssp.after_encode = TRUE
+			AND ssp.after_prove = TRUE
+			AND ssp.after_submit = FALSE
+			AND (ssp.submit_after IS NULL OR ssp.submit_after < NOW())
+			AND ssp.task_id_submit IS NULL
+		GROUP BY
+			ssp.sp_id, ssp.sector_number, ssp.upgrade_proof, ssp.update_ready_at
+		ORDER BY
+			ssp.sector_number ASC;`)
+		if err != nil {
+			return false, xerrors.Errorf("getting tasks: %w", err)
+		}

-			// pick at random in case there are a bunch of schedules across the cluster
-			t := tasks[rand.N(len(tasks))]
+		if len(tasks) == 0 {
+			return false, nil
+		}

-			_, err = tx.Exec(`UPDATE sectors_snap_pipeline SET task_id_submit = $1, submit_after = NULL WHERE sp_id = $2 AND sector_number = $3`, id, t.SpID, t.SectorNumber)
-			if err != nil {
-				return false, xerrors.Errorf("updating task id: %w", err)
+		// Get the chain head once; all batch decisions below compare against it
+		ts, err := s.api.ChainHead(ctx)
+		if err != nil {
+			return false, xerrors.Errorf("getting chain head: %w", err)
+		}
+
+		// Group eligible sectors by miner ID and update proof type; only sectors
+		// sharing both can go into one ProveReplicaUpdates3 message
+		batchMap := make(map[int64]map[abi.RegisteredUpdateProof][]task)
+		for i := range tasks {
+			// Check if SpID exists in batchMap
+			v, ok := batchMap[tasks[i].SpID]
+			if !ok {
+				// If not, initialize a new map for this RegisteredUpdateProof
+				v = make(map[abi.RegisteredUpdateProof][]task)
+				batchMap[tasks[i].SpID] = v
 			}
+			// Append the task to the correct RegisteredUpdateProof bucket
+			v[abi.RegisteredUpdateProof(tasks[i].UpgradeProof)] = append(v[abi.RegisteredUpdateProof(tasks[i].UpgradeProof)], tasks[i])
+		}

-			stop = false // we found a task to schedule, keep going
-			return true, nil
-		})
-	}
+		// Send batches per miner ID and per proof type based on the following logic:
+		// 1. If any sector in a batch is running out of slack before its earliest deal start, send that batch
+		// 2. If any sector in a batch has waited longer than the timeout, send that batch
+		// 3. If the current base fee is below the configured threshold, send all batches
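+		// Example with the defaults: Slack = 1h on 30s epochs means a batch whose
+		// earliest deal-start epoch is under 120 epochs away is sent now; Timeout = 1h
+		// flushes a batch whose oldest member became ready over an hour ago; and any
+		// batch goes out while the parent base fee is under 0.005 FIL.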
+
+		for spid, miners := range batchMap {
+			for _, pts := range miners {
+				// Break into batches
+				var batches []sectorBatch
+				for i := 0; i < len(pts); i += s.cfg.batch.MaxUpdateBatch {
+					// Create a batch of size MaxUpdateBatch or smaller for the last batch
+					end := i + s.cfg.batch.MaxUpdateBatch
+					if end > len(pts) {
+						end = len(pts)
+					}
+					var batch []int64
+					cutoff := abi.ChainEpoch(0)
+					earliest := time.Now()
+					for _, pt := range pts[i:end] {
+						if cutoff == 0 || pt.StartEpoch < cutoff {
+							cutoff = pt.StartEpoch
+						}
+
+						if pt.UpdateReadyAt != nil && pt.UpdateReadyAt.Before(earliest) {
+							earliest = *pt.UpdateReadyAt
+						}
+
+						batch = append(batch, pt.SectorNumber)
+					}
+
+					batches = append(batches, sectorBatch{
+						cutoff:   cutoff,
+						earliest: earliest,
+						sectors:  batch,
+					})
+				}
+
+				for i := range batches {
+					batch := batches[i]
+					// Process batch if slack has been reached
+					if (time.Duration(batch.cutoff-ts.Height()) * time.Duration(build.BlockDelaySecs) * time.Second) < s.cfg.batch.Slack {
+						_, err = tx.Exec(`UPDATE sectors_snap_pipeline SET task_id_submit = $1, submit_after = NULL WHERE sp_id = $2 AND sector_number = ANY($3)`, id, spid, batch.sectors)
+						if err != nil {
+							return false, xerrors.Errorf("updating task id: %w", err)
+						}
+						cmt = true
+						continue
+					}
+					// Process batch if the timeout has been reached
+					if batch.earliest.Add(s.cfg.batch.Timeout).Before(time.Now()) {
+						_, err = tx.Exec(`UPDATE sectors_snap_pipeline SET task_id_submit = $1, submit_after = NULL WHERE sp_id = $2 AND sector_number = ANY($3)`, id, spid, batch.sectors)
+						if err != nil {
+							return false, xerrors.Errorf("updating task id: %w", err)
+						}
+						cmt = true
+						continue
+					}
+					// Process batch if the base fee is low enough for us to send
+					if ts.MinTicketBlock().ParentBaseFee.LessThan(s.cfg.batch.BaseFeeThreshold) {
+						_, err = tx.Exec(`UPDATE sectors_snap_pipeline SET task_id_submit = $1, submit_after = NULL WHERE sp_id = $2 AND sector_number = ANY($3)`, id, spid, batch.sectors)
+						if err != nil {
+							return false, xerrors.Errorf("updating task id: %w", err)
+						}
+						cmt = true
+						continue
+					}
+				}
+			}
+		}
+		return cmt, nil
+	})

 	// update landed
 	var tasks []struct {