Skip to content

Commit

Permalink
refactor config
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Dec 10, 2024
1 parent a7073d8 commit 5cebd3b
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 315 deletions.
6 changes: 5 additions & 1 deletion cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,12 @@ func addSealingTasks(

var sp *seal.SealPoller
var slr *ffi.SealCalls
var err error
if hasAnySealingTask {
sp = seal.NewPoller(db, full, cfg)
sp, err = seal.NewPoller(db, full, cfg)
if err != nil {
return nil, xerrors.Errorf("creating seal poller: %w", err)
}
go sp.RunPoller(ctx)

slr = must.One(slrLazy.Val())
Expand Down
42 changes: 14 additions & 28 deletions deps/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 15 additions & 24 deletions deps/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin/v15/miner"

"github.com/filecoin-project/lotus/chain/types"
)
Expand Down Expand Up @@ -76,14 +75,14 @@ func DefaultCurioConfig() *CurioConfig {
},
Batching: CurioBatchingConfig{
PreCommit: PreCommitBatchingConfig{
MaxPreCommitBatch: miner.PreCommitSectorBatchMaxSize,
PreCommitBatchSlack: Duration(6 * time.Hour),
BaseFeeThreshold: types.MustParseFIL("0.005"),
BaseFeeThreshold: types.MustParseFIL("0.005"),
Timeout: Duration(4 * time.Hour),
Slack: Duration(6 * time.Hour),
},
Commit: CommitBatchingConfig{
MaxCommitBatch: miner.MaxAggregatedSectors,
CommitBatchSlack: Duration(1 * time.Hour),
BaseFeeThreshold: types.MustParseFIL("0.005"),
Timeout: Duration(1 * time.Hour),
Slack: Duration(1 * time.Hour),
},
},
}
Expand Down Expand Up @@ -531,31 +530,23 @@ type CurioBatchingConfig struct {
}

type PreCommitBatchingConfig struct {
// Enable / Disable Precommit aggregation
AggregatePreCommits bool
// Base fee value below which we should try to send Precommit messages immediately
BaseFeeThreshold types.FIL

// Maximum precommit batch size - batches will be sent immediately above this size if BaseFeeThreshold is higher
// than the current base fee. If not then we will wait batch if forced due to PreCommitBatchSlack
MaxPreCommitBatch int
// Maximum amount of time any given sector in the batch can wait for the batch to accumulate
Timeout Duration

// Time buffer for forceful batch submission before sectors/deal in batch would start expiring
PreCommitBatchSlack Duration

// Base fee value below which we should try to send Precommit message. This will be ignored if PreCommitBatchSlack has reached
BaseFeeThreshold types.FIL
Slack Duration
}

type CommitBatchingConfig struct {
// Enable / Disable commit aggregation
AggregateCommits bool
// Base fee value below which we should try to send Commit messages immediately
BaseFeeThreshold types.FIL

// Maximum batched commit size - batches will be sent immediately above this size if BaseFeeThreshold is higher
// // than the current base fee. If not then we will wait batch if forced due to CommitBatchSlack
MaxCommitBatch int
// Maximum amount of time any given sector in the batch can wait for the batch to accumulate
Timeout Duration

// Time buffer for forceful batch submission before sectors/deals in batch would start expiring
CommitBatchSlack Duration

// Base fee value below which we should try to send Commit message. This will be ignored if CommitBatchSlack has reached
BaseFeeThreshold types.FIL
Slack Duration
}
3 changes: 0 additions & 3 deletions deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,6 @@ func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context,
if err != nil {
return xerrors.Errorf("populate config: %w", err)
}
if deps.Cfg.Batching.Commit.AggregateCommits && deps.Cfg.Batching.Commit.MaxCommitBatch < miner.MinAggregatedSectors {
return xerrors.Errorf("commit batch size less than minimum required for aggregation")
}
}

log.Debugw("config", "config", deps.Cfg)
Expand Down
38 changes: 13 additions & 25 deletions documentation/en/configuration/default-curio-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -505,47 +505,35 @@ description: The default curio configuration

[Batching]
[Batching.PreCommit]
# Enable / Disable Precommit aggregation
# Base fee value below which we should try to send Precommit messages immediately
#
# type: bool
#AggregatePreCommits = false
# type: types.FIL
#BaseFeeThreshold = "0.005 FIL"

# Maximum precommit batch size - batches will be sent immediately above this size if BaseFeeThreshold is higher
# than the current base fee. If not then we will wait batch if forced due to PreCommitBatchSlack
# Maximum amount of time any given sector in the batch can wait for the batch to accumulate
#
# type: int
#MaxPreCommitBatch = 256
# type: Duration
#Timeout = "4h0m0s"

# Time buffer for forceful batch submission before sectors/deal in batch would start expiring
#
# type: Duration
#PreCommitBatchSlack = "6h0m0s"
#Slack = "6h0m0s"

# Base fee value below which we should try to send Precommit message. This will be ignored if PreCommitBatchSlack has reached
[Batching.Commit]
# Base fee value below which we should try to send Commit messages immediately
#
# type: types.FIL
#BaseFeeThreshold = "0.005 FIL"

[Batching.Commit]
# Enable / Disable commit aggregation
#
# type: bool
#AggregateCommits = false

# Maximum batched commit size - batches will be sent immediately above this size if BaseFeeThreshold is higher
# // than the current base fee. If not then we will wait batch if forced due to CommitBatchSlack
# Maximum amount of time any given sector in the batch can wait for the batch to accumulate
#
# type: int
#MaxCommitBatch = 819
# type: Duration
#Timeout = "1h0m0s"

# Time buffer for forceful batch submission before sectors/deals in batch would start expiring
#
# type: Duration
#CommitBatchSlack = "1h0m0s"

# Base fee value below which we should try to send Commit message. This will be ignored if CommitBatchSlack has reached
#
# type: types.FIL
#BaseFeeThreshold = "0.005 FIL"
#Slack = "1h0m0s"

```
8 changes: 6 additions & 2 deletions harmony/harmonydb/sql/20231217-sdr-pipeline.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ create table sectors_sdr_pipeline (
after_tree_r bool not null default false,

-- synthetic proofs (Added in 20240617-synthetic-proofs.sql)
-- task_id_synth bigint,
-- after_synth bool not null default false,
-- task_id_synth bigint,
-- after_synth bool not null default false,

-- precommit_ready_at (Added in 20241210-sdr-batching.sql)

-- precommit message sending
precommit_msg_cid text,
Expand Down Expand Up @@ -66,6 +68,8 @@ create table sectors_sdr_pipeline (
task_id_move_storage bigint,
after_move_storage bool not null default false,

-- commit_ready_at (Added in 20241210-sdr-batching.sql)

-- Commit message sending
commit_msg_cid text,

Expand Down
2 changes: 2 additions & 0 deletions harmony/harmonydb/sql/20241210-sdr-batching.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE sectors_sdr_pipeline ADD COLUMN precommit_ready_at TIMESTAMPTZ;
ALTER TABLE sectors_sdr_pipeline ADD COLUMN commit_ready_at TIMESTAMPTZ;
49 changes: 27 additions & 22 deletions itests/curio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ func TestCurioHappyPath(t *testing.T) {

require.Contains(t, baseCfg.Addresses[0].MinerAddresses, maddr.String())

//baseCfg.Batching.PreCommit.BaseFeeThreshold = types.MustParseFIL("100")
baseCfg.Batching.PreCommit.Timeout = config.Duration(5 * time.Minute)
//baseCfg.Batching.Commit.BaseFeeThreshold = types.MustParseFIL("100")
baseCfg.Batching.Commit.Timeout = config.Duration(5 * time.Minute)

temp := os.TempDir()
dir, err := os.MkdirTemp(temp, "curio")
require.NoError(t, err)
Expand All @@ -182,22 +187,7 @@ func TestCurioHappyPath(t *testing.T) {
spt, err := miner2.PreferredSealProofTypeFromWindowPoStType(nv, wpt, false)
require.NoError(t, err)

num, err := seal.AllocateSectorNumbers(ctx, full, db, maddr, 1, func(tx *harmonydb.Tx, numbers []abi.SectorNumber) (bool, error) {
for _, n := range numbers {
_, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) values ($1, $2, $3)", mid, n, spt)
if err != nil {
return false, xerrors.Errorf("inserting into sectors_sdr_pipeline: %w", err)
}
}
return true, nil
})
require.NoError(t, err)
require.Len(t, num, 1)

spt, err = miner2.PreferredSealProofTypeFromWindowPoStType(nv, wpt, true)
require.NoError(t, err)

num, err = seal.AllocateSectorNumbers(ctx, full, db, maddr, 1, func(tx *harmonydb.Tx, numbers []abi.SectorNumber) (bool, error) {
num, err := seal.AllocateSectorNumbers(ctx, full, db, maddr, 5, func(tx *harmonydb.Tx, numbers []abi.SectorNumber) (bool, error) {
for _, n := range numbers {
_, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) values ($1, $2, $3)", mid, n, spt)
if err != nil {
Expand All @@ -207,7 +197,22 @@ func TestCurioHappyPath(t *testing.T) {
return true, nil
})
require.NoError(t, err)
require.Len(t, num, 1)
require.Len(t, num, 5)

//spt, err = miner2.PreferredSealProofTypeFromWindowPoStType(nv, wpt, true)
//require.NoError(t, err)
//
//num, err = seal.AllocateSectorNumbers(ctx, full, db, maddr, 0, func(tx *harmonydb.Tx, numbers []abi.SectorNumber) (bool, error) {
// for _, n := range numbers {
// _, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) values ($1, $2, $3)", mid, n, spt)
// if err != nil {
// return false, xerrors.Errorf("inserting into sectors_sdr_pipeline: %w", err)
// }
// }
// return true, nil
//})
//require.NoError(t, err)
//require.Len(t, num, 0)
// TODO: add DDO deal, f05 deal 2 MiB each in the sector

var sectorParamsArr []struct {
Expand All @@ -225,13 +230,13 @@ func TestCurioHappyPath(t *testing.T) {
FROM sectors_sdr_pipeline
WHERE after_commit_msg_success = True`)
require.NoError(t, err)
return len(sectorParamsArr) == 2
return len(sectorParamsArr) == 6
}, 10*time.Minute, 1*time.Second, "sector did not finish sealing in 5 minutes")

require.Equal(t, sectorParamsArr[0].SectorNumber, int64(0))
require.Equal(t, sectorParamsArr[0].SpID, int64(mid))
require.Equal(t, sectorParamsArr[1].SectorNumber, int64(1))
require.Equal(t, sectorParamsArr[1].SpID, int64(mid))
//require.Equal(t, sectorParamsArr[0].SectorNumber, int64(0))
//require.Equal(t, sectorParamsArr[0].SpID, int64(mid))
//require.Equal(t, sectorParamsArr[1].SectorNumber, int64(1))
//require.Equal(t, sectorParamsArr[1].SpID, int64(mid))

_ = capi.Shutdown(ctx)

Expand Down
Loading

0 comments on commit 5cebd3b

Please sign in to comment.