Skip to content

Commit

Permalink
feat: F3
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Oct 21, 2024
1 parent 9d5a08e commit ba74e00
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 3 deletions.
4 changes: 3 additions & 1 deletion cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/curio/lib/paths"
"github.com/filecoin-project/curio/lib/slotmgr"
"github.com/filecoin-project/curio/lib/storiface"
"github.com/filecoin-project/curio/tasks/f3"
"github.com/filecoin-project/curio/tasks/gc"
"github.com/filecoin-project/curio/tasks/message"
"github.com/filecoin-project/curio/tasks/metadata"
Expand Down Expand Up @@ -150,7 +151,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
store := dependencies.Stor
winPoStTask := winning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, store, verif, asyncParams(), full, maddrs)
inclCkTask := winning.NewInclusionCheckTask(db, full)
activeTasks = append(activeTasks, winPoStTask, inclCkTask)
f3Task := f3.NewF3Task(db, full, maddrs)
activeTasks = append(activeTasks, winPoStTask, inclCkTask, f3Task)

// Warn if also running a sealing task
if cfg.Subsystems.EnableSealSDR || cfg.Subsystems.EnableSealSDRTrees || cfg.Subsystems.EnableSendPrecommitMsg || cfg.Subsystems.EnablePoRepProof || cfg.Subsystems.EnableMoveStorage || cfg.Subsystems.EnableSendCommitMsg || cfg.Subsystems.EnableUpdateEncode || cfg.Subsystems.EnableUpdateProve || cfg.Subsystems.EnableUpdateSubmit {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/filecoin-project/go-commp-utils v0.1.4
github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20240802040721-2a04ffc8ffe8
github.com/filecoin-project/go-commp-utils/v2 v2.1.0
github.com/filecoin-project/go-f3 v0.7.0
github.com/filecoin-project/go-fil-commcid v0.2.0
github.com/filecoin-project/go-fil-commp-hashhash v0.2.0
github.com/filecoin-project/go-jsonrpc v0.6.1-0.20240820160949-2cfe810e5d2f
Expand Down Expand Up @@ -53,6 +54,7 @@ require (
github.com/ipfs/go-ipld-cbor v0.2.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jpillora/backoff v1.0.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/libp2p/go-buffer-pool v0.1.0
github.com/manifoldco/promptui v0.9.0
Expand Down Expand Up @@ -131,7 +133,6 @@ require (
github.com/filecoin-project/go-amt-ipld/v4 v4.4.0 // indirect
github.com/filecoin-project/go-clock v0.1.0 // indirect
github.com/filecoin-project/go-crypto v0.1.0 // indirect
github.com/filecoin-project/go-f3 v0.7.0 // indirect
github.com/filecoin-project/go-hamt-ipld v0.1.5 // indirect
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 // indirect
github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0 // indirect
Expand Down Expand Up @@ -205,7 +206,6 @@ require (
github.com/jessevdk/go-flags v1.4.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kilic/bls12-381 v0.1.1-0.20220929213557-ca162e8a70f4 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
Expand Down
7 changes: 7 additions & 0 deletions harmony/harmonydb/sql/20241021-f3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- f3_tasks links each storage provider to the harmony task that runs its F3
-- participation loop, and persists the last participation ticket it was issued.
CREATE TABLE f3_tasks (
    -- Storage provider actor ID; the primary key allows one row per provider.
    sp_id BIGINT PRIMARY KEY,
    -- Harmony task currently attached to this provider. Reset to NULL by the
    -- foreign key below when the referenced harmony_task row is deleted.
    task_id BIGINT UNIQUE,
    -- Most recent participation ticket stored by the F3 task; NULL until the
    -- first ticket has been written.
    previous_ticket BYTEA,

    FOREIGN KEY (task_id) REFERENCES harmony_task (id) ON DELETE SET NULL
);
11 changes: 11 additions & 0 deletions harmony/taskhelp/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,14 @@ func SliceIfFound[T any](slice []T, f func(T) bool) ([]T, bool) {
}
return slice[:ct], true
}

// BackgroundTask returns the task-type name used for a background task, formed
// by prepending the "bg:" marker to name.
//
// Background tasks are tasks that:
//   - always run in the background
//   - never finish "successfully"
func BackgroundTask(name string) string {
	const marker = "bg:"
	return marker + name
}

// IsBackgroundTask reports whether name carries the "bg:" background-task
// marker followed by at least one more character. The bare marker "bg:" on
// its own is not considered a background task name.
func IsBackgroundTask(name string) bool {
	const marker = "bg:"
	if len(name) <= len(marker) {
		return false
	}
	return name[:len(marker)] == marker
}
279 changes: 279 additions & 0 deletions tasks/f3/f3_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
package f3

import (
"context"
"errors"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/jpillora/backoff"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"

"github.com/filecoin-project/curio/deps"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)

const (
	// ParticipationCheckProgressMaxAttempts is the maximum number of failed
	// attempts tolerated while checking on a lease before the current lease is
	// abandoned and the participation process is restarted.
	//
	// The default backoff takes 12 attempts to reach a maximum delay of 1 minute.
	// Allowing for 13 failures results in approximately 2 minutes of backoff since
	// the lease was granted. Given a lease validity of up to 5 instances, this means
	// we would give up on checking the lease during its mid-validity period,
	// typically when we would try to renew the participation ticket. Hence the
	// value of 13.
	ParticipationCheckProgressMaxAttempts = 13

	// ParticipationLeaseTerm is the number of instances the miner will attempt to lease from nodes.
	ParticipationLeaseTerm = 5
)

// log is the package-level logger, created under the "cf3" logger name.
var log = logging.Logger("cf3")

// F3ParticipationAPI is the set of node API calls that F3Task uses to obtain
// participation tickets, turn them into leases, and watch F3 progress.
type F3ParticipationAPI interface {
	// F3GetOrRenewParticipationTicket asks the node for a participation ticket
	// for minerID covering the given number of instances. previous is the
	// ticket last stored for this miner; F3Task passes whatever is in the
	// f3_tasks table, which may be empty.
	F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign
	// F3Participate submits a ticket and, on success, returns the resulting
	// participation lease.
	F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error)
	// F3GetProgress returns the node's current F3 instant; F3Task compares its
	// ID against the lease's end instance.
	F3GetProgress(ctx context.Context) (gpbft.Instant, error)
	// F3GetManifest returns the node's F3 manifest. Callers in this file treat
	// a nil manifest as "no manifest available".
	F3GetManifest(ctx context.Context) (*manifest.Manifest, error)
}

// F3Task is the harmony task that keeps miner actors participating in F3 by
// repeatedly acquiring a participation ticket, exchanging it for a lease, and
// waiting until the lease is close to expiry.
type F3Task struct {
	// db gives access to the f3_tasks table.
	db *harmonydb.DB
	// api is the node API used for all F3 calls.
	api F3ParticipationAPI

	// leaseTerm is the lease length in instances. awaitLeaseExpiry starts
	// renewal once half of it remains. NewF3Task sets it to
	// ParticipationLeaseTerm.
	leaseTerm uint64

	// actors is the set of miner addresses for which Adder schedules tasks.
	// Only the keys are used in this file.
	actors map[dtypes.MinerAddress]bool
}

// NewF3Task creates an F3Task that stores its state in db, talks to the node
// through api, and schedules participation for every address in actors. The
// lease term is initialised to ParticipationLeaseTerm.
func NewF3Task(db *harmonydb.DB, api F3ParticipationAPI, actors map[dtypes.MinerAddress]bool) *F3Task {
	task := new(F3Task)
	task.db = db
	task.api = api
	task.actors = actors
	task.leaseTerm = ParticipationLeaseTerm
	return task
}

// Do runs the F3 participation loop for the storage provider attached to
// taskID. It looks up the provider in f3_tasks and then, for as long as
// stillOwned reports true, repeats:
//
//  1. load the previously stored ticket,
//  2. get (or renew) a participation ticket,
//  3. exchange the ticket for a lease,
//  4. persist the ticket as the new previous_ticket,
//  5. wait until the lease is close to expiry.
//
// It never reports done=true: it returns (false, nil) once the task is no
// longer owned, and (false, err) on any failure, including the case where the
// node did not grant a lease.
func (f *F3Task) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
	ctx := context.Background()

	// Find out which storage provider this task is acting for.
	var spID int64
	if err = f.db.QueryRow(ctx, "SELECT sp_id FROM f3_tasks WHERE task_id = $1", taskID).Scan(&spID); err != nil {
		return false, xerrors.Errorf("failed to get sp_id: %w", err)
	}

	maddr, err := address.NewIDAddress(uint64(spID))
	if err != nil {
		return false, xerrors.Errorf("failed to parse miner address: %w", err)
	}

	for stillOwned() {
		var previousTicket []byte
		if err := f.db.QueryRow(ctx, "SELECT previous_ticket FROM f3_tasks WHERE task_id = $1", taskID).Scan(&previousTicket); err != nil {
			return false, xerrors.Errorf("failed to get previous ticket: %w", err)
		}

		// Ensure that calls are made on the same node (the first call will
		// determine the node). A fresh binding is derived on every round.
		nodeCtx := deps.OnSingleNode(ctx)

		ticket, err := f.tryGetF3ParticipationTicket(nodeCtx, stillOwned, maddr, previousTicket)
		if err != nil {
			return false, xerrors.Errorf("failed to get participation ticket: %w", err)
		}

		lease, participating, err := f.tryF3Participate(nodeCtx, stillOwned, ticket)
		if err != nil {
			return false, xerrors.Errorf("failed to participate in F3: %w", err)
		}
		if !participating {
			return false, xerrors.Errorf("failed to participate in F3: not participating")
		}

		// Remember the ticket so the next round (or the next task run) can
		// present it as the previous one.
		if _, err := f.db.Exec(nodeCtx, "UPDATE f3_tasks SET previous_ticket = $1 WHERE task_id = $2", ticket, taskID); err != nil {
			return false, xerrors.Errorf("failed to update previous ticket: %w", err)
		}

		// Each lease gets its own backoff state for progress-check failures.
		retry := &backoff.Backoff{
			Min:    time.Second,
			Max:    time.Minute,
			Factor: 1.5,
		}

		if err := f.awaitLeaseExpiry(stillOwned, nodeCtx, lease, retry); err != nil {
			return false, xerrors.Errorf("failed to await lease expiry: %w", err)
		}
	}

	return false, nil
}

// tryGetF3ParticipationTicket asks the node for a participation ticket for
// participant, presenting previousTicket (which may be empty) and requesting
// f.leaseTerm instances. Transient failures are retried once per second for as
// long as stillOwned reports true.
//
// It returns the ticket on success. It returns an error when ctx is done,
// when F3 is disabled on the node, or when the task stops being owned before
// a ticket could be acquired; the returned ticket is empty in all error cases.
func (f *F3Task) tryGetF3ParticipationTicket(ctx context.Context, stillOwned func() bool, participant address.Address, previousTicket []byte) (api.F3ParticipationTicket, error) {
	for stillOwned() {
		// Request the same term that awaitLeaseExpiry bases its renewal
		// window on, so the two can never drift apart.
		ticket, err := f.api.F3GetOrRenewParticipationTicket(ctx, participant, previousTicket, f.leaseTerm)
		switch {
		case ctx.Err() != nil:
			return api.F3ParticipationTicket{}, ctx.Err()
		case errors.Is(err, api.ErrF3Disabled):
			log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
			return api.F3ParticipationTicket{}, xerrors.Errorf("acquiring F3 participation ticket: %w", err)
		case err != nil:
			log.Errorw("Failed to acquire F3 participation ticket; retrying", "err", err)
			time.Sleep(1 * time.Second)
			continue
		default:
			log.Debug("Successfully acquired F3 participation ticket")
			return ticket, nil
		}
	}

	// The loop only exits here when ownership was lost. Never hand back an
	// empty ticket together with a nil error: the caller would go on to use it.
	if err := ctx.Err(); err != nil {
		return api.F3ParticipationTicket{}, err
	}
	return api.F3ParticipationTicket{}, xerrors.Errorf("acquiring F3 participation ticket: task is no longer owned")
}

// tryF3Participate submits ticket to the node and interprets the outcome.
//
// Return values:
//   - (lease, true, nil) when a lease was granted;
//   - (zero, false, nil) when the ticket is expired, starts before the
//     existing lease, or is invalid, i.e. the caller needs a different ticket;
//   - (zero, false, err) when ctx is done, F3 is disabled, or an unexpected
//     error occurred.
//
// Issuer mismatches and "F3 not ready" responses are retried in place (after
// 1s and 30s respectively) for as long as stillOwned reports true. If
// ownership is lost, the result is (zero, false, ctx.Err()).
func (f *F3Task) tryF3Participate(ctx context.Context, stillOwned func() bool, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) {
	var noLease api.F3ParticipationLease

	for stillOwned() {
		lease, err := f.api.F3Participate(ctx, ticket)

		// A finished context takes precedence over whatever the call returned.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return noLease, false, ctxErr
		}

		if err == nil {
			log.Infow("Successfully acquired F3 participation lease.",
				"issuer", lease.Issuer,
				"not-before", lease.FromInstance,
				"not-after", lease.ToInstance(),
			)
			return lease, true, nil
		}

		if errors.Is(err, api.ErrF3Disabled) {
			log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
			return noLease, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
		}
		if errors.Is(err, api.ErrF3ParticipationTicketExpired) {
			log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "err", err)
			return noLease, false, nil
		}
		if errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting) {
			log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "err", err)
			return noLease, false, nil
		}
		if errors.Is(err, api.ErrF3ParticipationTicketInvalid) {
			log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "err", err)
			time.Sleep(1 * time.Second)
			return noLease, false, nil
		}
		if errors.Is(err, api.ErrF3ParticipationIssuerMismatch) {
			log.Warnw("Node is not the issuer of F3 participation ticket. Miner maybe load-balancing or node has changed. Retrying F3 participation after backoff.", "err", err)
			time.Sleep(1 * time.Second)
			continue
		}
		if errors.Is(err, api.ErrF3NotReady) {
			log.Warnw("F3 is not ready. Retrying F3 participation after backoff.", "err", err)
			time.Sleep(30 * time.Second)
			continue
		}

		// Anything else is unexpected and is handed back to the caller.
		log.Errorw("Unexpected error while attempting F3 participation. Retrying after backoff", "err", err)
		return noLease, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
	}

	return noLease, false, ctx.Err()
}

// awaitLeaseExpiry blocks until lease should be renewed, polling the node for
// as long as stillOwned reports true.
//
// It returns nil when the caller should go back and acquire a new ticket:
//   - F3 progress is within f.leaseTerm/2 instances of the lease's end,
//   - the node's manifest is missing or belongs to a different network than
//     the lease,
//   - more than ParticipationCheckProgressMaxAttempts consecutive backoff
//     attempts were used up on failed manifest/progress checks.
//
// It returns an error when F3 is disabled on the node. When ownership is lost
// it returns ctx.Err().
//
// backoff supplies the delay between failed checks; its attempt counter is
// advanced exactly once per failure.
func (f *F3Task) awaitLeaseExpiry(stillOwned func() bool, ctx context.Context, lease api.F3ParticipationLease, backoff *backoff.Backoff) error {
	renewLeaseWithin := f.leaseTerm / 2
	for stillOwned() {
		manifest, err := f.api.F3GetManifest(ctx)
		switch {
		case errors.Is(err, api.ErrF3Disabled):
			log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
			return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
		case err != nil:
			attempts := backoff.Attempt()
			if attempts > ParticipationCheckProgressMaxAttempts {
				log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", attempts, "err", err)
				return nil
			}
			// Duration advances the attempt counter, so call it once and
			// reuse the value for both the log line and the sleep.
			delay := backoff.Duration()
			log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", attempts, "backoff", delay, "err", err)
			time.Sleep(delay)
			// Without a manifest the wait-time computation below would
			// dereference nil; start the round over instead.
			continue
		case manifest == nil || manifest.NetworkName != lease.Network:
			// If we got an unexpected manifest, or no manifest, go back to the
			// beginning and try to get another ticket. Switching from having a manifest
			// to having no manifest can theoretically happen if the lotus node reboots
			// and has no static manifest.
			return nil
		}

		progress, err := f.api.F3GetProgress(ctx)
		switch {
		case errors.Is(err, api.ErrF3Disabled):
			log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
			return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
		case err != nil:
			attempts := backoff.Attempt()
			if attempts > ParticipationCheckProgressMaxAttempts {
				log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", attempts, "err", err)
				return nil
			}
			delay := backoff.Duration()
			log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", attempts, "backoff", delay, "err", err)
			time.Sleep(delay)
		case progress.ID+renewLeaseWithin >= lease.ToInstance():
			log.Infof("F3 progressed (%d) to within %d instances of lease expiry (%d). Renewing participation.", progress.ID, renewLeaseWithin, lease.ToInstance())
			return nil
		default:
			// Sleep for roughly as long as it should take F3 to reach the
			// renewal window, then check again.
			remainingInstanceLease := lease.ToInstance() - progress.ID
			waitTime := time.Duration(remainingInstanceLease-renewLeaseWithin) * manifest.CatchUpAlignment
			if waitTime == 0 {
				waitTime = 100 * time.Millisecond
			}
			log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, waitTime)
			time.Sleep(waitTime)
		}
	}
	return ctx.Err()
}

// CanAccept picks which of the offered tasks this machine will take. F3
// participation has no placement constraints, so the first offered task is
// always accepted. It returns (nil, nil) when ids is empty rather than
// indexing into an empty slice.
func (f *F3Task) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
	if len(ids) == 0 {
		return nil, nil
	}
	id := ids[0]
	return &id, nil
}

// TypeDetails describes the task type to the harmony scheduler: a background
// task named "F3Participate" that reserves 10 MiB of RAM, no CPU or GPU, and
// allows a single failure.
func (f *F3Task) TypeDetails() harmonytask.TaskTypeDetails {
	cost := resources.Resources{
		Cpu: 0,
		Gpu: 0,
		Ram: 10 << 20, // 10 MiB
	}

	var details harmonytask.TaskTypeDetails
	details.Name = taskhelp.BackgroundTask("F3Participate")
	details.Cost = cost
	details.MaxFailures = 1
	return details
}

// Adder schedules one F3 participation task per configured miner actor.
//
// For each actor it resolves the numeric actor ID (f3_tasks.sp_id is a BIGINT
// and Do reads it back as an ID address) and asks taskFunc to create a task
// attached to that provider's row. A task is only committed when the row was
// newly inserted or when an existing row had lost its task (task_id IS NULL,
// which the table's ON DELETE SET NULL foreign key produces once the previous
// harmony task is deleted). Rows that still reference a task are left alone.
//
// Actors whose address cannot be resolved to an ID are logged and skipped.
func (f *F3Task) Adder(taskFunc harmonytask.AddTaskFunc) {
	for minerAddress := range f.actors {
		maddr := address.Address(minerAddress)

		actorID, err := address.IDFromAddress(maddr)
		if err != nil {
			log.Errorw("Cannot schedule F3 participation: miner address is not an ID address", "miner", maddr, "err", err)
			continue
		}
		// Declared per iteration so the closure below never observes another
		// actor's ID, regardless of when taskFunc invokes it.
		spID := int64(actorID)

		taskFunc(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
			n, err := tx.Exec(`INSERT INTO f3_tasks (sp_id, task_id) VALUES ($1, $2)
				ON CONFLICT (sp_id) DO UPDATE SET task_id = EXCLUDED.task_id WHERE f3_tasks.task_id IS NULL`, spID, id)
			if err != nil {
				return false, err
			}

			// No affected row means the provider already has a live task.
			return n > 0, nil
		})
	}
}

// GetSpid returns the sp_id recorded in f3_tasks for taskID, as a string. It
// queries through the db argument (not the task's own handle) and returns the
// empty string when the lookup fails for any reason, including a missing row.
func (f *F3Task) GetSpid(db *harmonydb.DB, taskID int64) string {
	var spID string

	row := db.QueryRow(context.Background(), `SELECT sp_id FROM f3_tasks WHERE task_id = $1`, taskID)
	if scanErr := row.Scan(&spID); scanErr != nil {
		return ""
	}

	return spID
}

// Hand a zero-value F3Task to harmonytask.Reg during package initialisation.
// NOTE(review): presumably this registers the task type by name; confirm
// against harmonytask.Reg.
var _ = harmonytask.Reg(&F3Task{})

// Compile-time assertion that *F3Task satisfies harmonytask.TaskInterface.
var _ harmonytask.TaskInterface = &F3Task{}

0 comments on commit ba74e00

Please sign in to comment.