From 053902fcedc2eebc6bf4902a7626da85e568ec31 Mon Sep 17 00:00:00 2001 From: LexLuthr <88259624+LexLuthr@users.noreply.github.com> Date: Thu, 10 Oct 2024 00:35:40 +0400 Subject: [PATCH] fix: show task waiting in PoRep pipeline (#260) * show task waiting in PoRep pipeline * fix cluster task sorting order * docker lotus version * add remaining wait task in UI * fix null handling * convert to a single query --- Makefile | 2 +- tasks/unseal/task_unseal_decode.go | 8 +- web/api/webrpc/pipeline_porep.go | 263 ++++++++++++++---- web/api/webrpc/tasks.go | 3 +- .../pipeline_porep/pipeline-porep-sectors.mjs | 28 +- 5 files changed, 229 insertions(+), 75 deletions(-) diff --git a/Makefile b/Makefile index c84112517..4eee42436 100644 --- a/Makefile +++ b/Makefile @@ -257,7 +257,7 @@ build_lotus?=0 curio_docker_user?=curio curio_base_image=$(curio_docker_user)/curio-all-in-one:latest-debug ffi_from_source?=0 -lotus_version?=v1.28.1 +lotus_version?=v1.29.0 ifeq ($(build_lotus),1) # v1: building lotus image with provided lotus version diff --git a/tasks/unseal/task_unseal_decode.go b/tasks/unseal/task_unseal_decode.go index b67f1ef4e..5507d8571 100644 --- a/tasks/unseal/task_unseal_decode.go +++ b/tasks/unseal/task_unseal_decode.go @@ -163,7 +163,7 @@ func (t *TaskUnsealDecode) TypeDetails() harmonytask.TaskTypeDetails { ssize = abi.SectorSize(2 << 20) } - return harmonytask.TaskTypeDetails{ + res := harmonytask.TaskTypeDetails{ Max: taskhelp.Max(t.max), Name: "UnsealDecode", Cost: resources.Resources{ @@ -177,6 +177,12 @@ func (t *TaskUnsealDecode) TypeDetails() harmonytask.TaskTypeDetails { return t.schedule(context.Background(), taskFunc) }), } + + if isDevnet { + res.Cost.Ram = 1 << 30 + } + + return res } func (t *TaskUnsealDecode) schedule(ctx context.Context, taskFunc harmonytask.AddTaskFunc) error { diff --git a/web/api/webrpc/pipeline_porep.go b/web/api/webrpc/pipeline_porep.go index b7d0721fa..a6d0ac981 100644 --- a/web/api/webrpc/pipeline_porep.go +++ b/web/api/webrpc/pipeline_porep.go @@ -22,48 +22,56 @@ type PipelineTask struct { CreateTime time.Time `db:"create_time"` - TaskSDR *int64 `db:"task_id_sdr"` - AfterSDR bool `db:"after_sdr"` + TaskSDR *int64 `db:"task_id_sdr"` + AfterSDR bool `db:"after_sdr"` + StartedSDR bool `db:"started_sdr"` - TaskTreeD *int64 `db:"task_id_tree_d"` - AfterTreeD bool `db:"after_tree_d"` + TaskTreeD *int64 `db:"task_id_tree_d"` + AfterTreeD bool `db:"after_tree_d"` + StartedTreeD bool `db:"started_tree_d"` - TaskTreeC *int64 `db:"task_id_tree_c"` - AfterTreeC bool `db:"after_tree_c"` + TaskTreeC *int64 `db:"task_id_tree_c"` + AfterTreeC bool `db:"after_tree_c"` + StartedTreeRC bool `db:"started_tree_rc"` TaskTreeR *int64 `db:"task_id_tree_r"` AfterTreeR bool `db:"after_tree_r"` - TaskSynthetic *int64 `db:"task_id_synth"` - AfterSynthetic bool `db:"after_synth"` + TaskSynthetic *int64 `db:"task_id_synth"` + AfterSynthetic bool `db:"after_synth"` + StartedSynthetic bool `db:"started_synthetic"` - TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` - AfterPrecommitMsg bool `db:"after_precommit_msg"` + TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` + AfterPrecommitMsg bool `db:"after_precommit_msg"` + StartedPrecommitMsg bool `db:"started_precommit_msg"` AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` SeedEpoch *int64 `db:"seed_epoch"` - TaskPoRep *int64 `db:"task_id_porep"` - PoRepProof []byte `db:"porep_proof"` - AfterPoRep bool `db:"after_porep"` + TaskPoRep *int64 `db:"task_id_porep"` + PoRepProof []byte `db:"porep_proof"` + AfterPoRep bool `db:"after_porep"` + StartedPoRep bool `db:"started_porep"` - TaskFinalize *int64 `db:"task_id_finalize"` - AfterFinalize bool `db:"after_finalize"` + TaskFinalize *int64 `db:"task_id_finalize"` + AfterFinalize bool `db:"after_finalize"` + StartedFinalize bool `db:"started_finalize"` - TaskMoveStorage *int64 `db:"task_id_move_storage"` - AfterMoveStorage bool `db:"after_move_storage"` + TaskMoveStorage *int64 `db:"task_id_move_storage"` + AfterMoveStorage bool `db:"after_move_storage"` + StartedMoveStorage bool `db:"started_move_storage"` - TaskCommitMsg *int64 `db:"task_id_commit_msg"` - AfterCommitMsg bool `db:"after_commit_msg"` + TaskCommitMsg *int64 `db:"task_id_commit_msg"` + AfterCommitMsg bool `db:"after_commit_msg"` + StartedCommitMsg bool `db:"started_commit_msg"` AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` Failed bool `db:"failed"` FailedReason string `db:"failed_reason"` -} -func (pt PipelineTask) sectorID() abi.SectorID { - return abi.SectorID{Miner: abi.ActorID(pt.SpID), Number: abi.SectorNumber(pt.SectorNumber)} + MissingTasks []int64 `db:"missing_tasks"` + AllTasks []int64 `db:"all_tasks"` } type sectorListEntry struct { @@ -74,9 +82,6 @@ type sectorListEntry struct { AfterSeed bool ChainAlloc, ChainSector, ChainActive, ChainUnproven, ChainFaulty bool - - MissingTasks []int64 - AllTasks []int64 } type minerBitfields struct { @@ -87,36 +92,183 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e var tasks []PipelineTask err := a.deps.DB.Select(ctx, &tasks, `SELECT - sp_id, sector_number, - create_time, - task_id_sdr, after_sdr, - task_id_tree_d, after_tree_d, - task_id_tree_c, after_tree_c, - task_id_tree_r, after_tree_r, - task_id_synth, after_synth, - task_id_precommit_msg, after_precommit_msg, - after_precommit_msg_success, seed_epoch, - task_id_porep, porep_proof, after_porep, - task_id_finalize, after_finalize, - task_id_move_storage, after_move_storage, - task_id_commit_msg, after_commit_msg, - after_commit_msg_success, - failed, failed_reason - FROM sectors_sdr_pipeline order by sp_id, sector_number`) // todo where constrain list + sp.sp_id, + sp.sector_number, + sp.create_time, + sp.task_id_sdr, + sp.after_sdr, + sp.task_id_tree_d, + sp.after_tree_d, + sp.task_id_tree_c, + sp.after_tree_c, + sp.task_id_tree_r, + sp.after_tree_r, + sp.task_id_synth, + sp.after_synth, + sp.task_id_precommit_msg, + sp.after_precommit_msg, + sp.after_precommit_msg_success, + sp.seed_epoch, + sp.task_id_porep, + sp.porep_proof, + sp.after_porep, + sp.task_id_finalize, + sp.after_finalize, + sp.task_id_move_storage, + sp.after_move_storage, + sp.task_id_commit_msg, + sp.after_commit_msg, + sp.after_commit_msg_success, + sp.failed, + sp.failed_reason, + + -- Compute StartedSDR + CASE + WHEN NOT after_tree_d AND task_id_sdr IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_sdr AND owner_id > 0 + ) + ELSE FALSE + END AS started_sdr, + + -- Compute StartedTreeD + CASE + WHEN after_sdr AND NOT after_tree_d AND task_id_tree_d IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_tree_d AND owner_id > 0 + ) + ELSE FALSE + END AS started_tree_d, + + -- Compute StartedTreeRC + CASE + WHEN after_tree_d AND NOT after_tree_c AND task_id_tree_c IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_tree_c AND owner_id > 0 + ) + ELSE FALSE + END AS started_tree_rc, + + -- Compute StartedSynthetic + CASE + WHEN after_tree_c AND NOT after_synth AND task_id_synth IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_synth AND owner_id > 0 + ) + ELSE FALSE + END AS started_synthetic, + + -- Compute StartedPrecommitMsg + CASE + WHEN after_synth AND NOT after_precommit_msg AND task_id_precommit_msg IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_precommit_msg AND owner_id > 0 + ) + ELSE FALSE + END AS started_precommit_msg, + + -- Compute StartedPoRep + CASE + WHEN after_precommit_msg AND NOT after_porep AND task_id_porep IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_porep AND owner_id > 0 + ) + ELSE FALSE + END AS started_porep, + + -- Compute StartedFinalize + CASE + WHEN after_porep AND NOT after_finalize AND task_id_finalize IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_finalize AND owner_id > 0 + ) + ELSE FALSE + END AS started_finalize, + + -- Compute StartedCommitMsg + CASE + WHEN after_porep AND NOT after_commit_msg AND task_id_commit_msg IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_commit_msg AND owner_id > 0 + ) + ELSE FALSE + END AS started_commit_msg, + + -- Compute StartedMoveStorage + CASE + WHEN after_finalize AND NOT after_move_storage AND task_id_move_storage IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_move_storage AND owner_id > 0 + ) + ELSE FALSE + END AS started_move_storage, + + -- Collect all task IDs into an array without NULLs + ( + SELECT array_agg(task_id) + FROM ( + VALUES + (sp.task_id_sdr), + (sp.task_id_tree_d), + (sp.task_id_tree_c), + (sp.task_id_tree_r), + (sp.task_id_synth), + (sp.task_id_precommit_msg), + (sp.task_id_porep), + (sp.task_id_finalize), + (sp.task_id_move_storage), + (sp.task_id_commit_msg) + ) AS t(task_id) + WHERE task_id IS NOT NULL + ) AS all_tasks, + + -- Compute missing tasks without NULLs + ( + SELECT array_agg(task_id) + FROM ( + SELECT task_id + FROM unnest(ARRAY[ + sp.task_id_sdr, + sp.task_id_tree_d, + sp.task_id_tree_c, + sp.task_id_tree_r, + sp.task_id_synth, + sp.task_id_precommit_msg, + sp.task_id_porep, + sp.task_id_finalize, + sp.task_id_move_storage, + sp.task_id_commit_msg + ]) AS task_id + LEFT JOIN harmony_task ht ON ht.id = task_id + WHERE task_id IS NOT NULL AND ht.id IS NULL + ) AS missing + ) AS missing_tasks + + FROM sectors_sdr_pipeline sp + ORDER BY sp_id, sector_number; + `) // todo where constrain list if err != nil { return nil, xerrors.Errorf("failed to fetch pipeline tasks: %w", err) } - missingTasks, err := a.pipelinePorepMissingTasks(ctx) - if err != nil { - return nil, xerrors.Errorf("failed to fetch missing tasks: %w", err) - } - - missingTasksMap := make(map[abi.SectorID]porepMissingTask) - for _, mt := range missingTasks { - missingTasksMap[mt.sectorID()] = mt - } - head, err := a.deps.Chain.ChainHead(ctx) if err != nil { return nil, xerrors.Errorf("failed to fetch chain head: %w", err) @@ -147,12 +299,6 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e afterSeed := task.SeedEpoch != nil && *task.SeedEpoch <= int64(epoch) - var missingTasks, allTasks []int64 - if mt, ok := missingTasksMap[task.sectorID()]; ok { - missingTasks = mt.MissingTaskIDs - allTasks = mt.AllTaskIDs - } - sectorList = append(sectorList, sectorListEntry{ PipelineTask: task, Address: addr, @@ -164,9 +310,6 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e ChainActive: must.One(mbf.active.IsSet(uint64(task.SectorNumber))), ChainUnproven: must.One(mbf.unproven.IsSet(uint64(task.SectorNumber))), ChainFaulty: must.One(mbf.faulty.IsSet(uint64(task.SectorNumber))), - - MissingTasks: missingTasks, - AllTasks: allTasks, }) } diff --git a/web/api/webrpc/tasks.go b/web/api/webrpc/tasks.go index dd301beb0..52d13ff3c 100644 --- a/web/api/webrpc/tasks.go +++ b/web/api/webrpc/tasks.go @@ -26,7 +26,8 @@ func (a *WebRPC) ClusterTaskSummary(ctx context.Context) ([]TaskSummary, error) err := a.deps.DB.Select(ctx, &ts, `SELECT t.id as id, t.name as name, t.update_time as since_posted, t.owner_id as owner_id, hm.host_and_port as owner FROM harmony_task t LEFT JOIN harmony_machines hm ON hm.id = t.owner_id - ORDER BY t.update_time ASC, t.owner_id`) + ORDER BY + CASE WHEN t.owner_id IS NULL THEN 1 ELSE 0 END, t.update_time ASC`) if err != nil { return nil, err // Handle error } diff --git a/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs b/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs index 3c09b8881..4a53ff9bf 100644 --- a/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs +++ b/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs @@ -66,6 +66,10 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend .pipeline-failed { background-color: #603030; + } + + .pipeline-waiting { + background-color: #808080; }` properties = { sector: Object, @@ -105,18 +109,18 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend
Wait Seed
${sector.AfterSeed?'done':sector.SeedEpoch}
|
- ${this.renderSectorState('PoRep', 2, sector, sector.TaskPoRep, sector.AfterPoRep)}
- ${this.renderSectorState('Clear Cache', 1, sector, sector.TaskFinalize, sector.AfterFinalize)}
- ${this.renderSectorState('Move Storage', 1, sector, sector.TaskMoveStorage, sector.AfterMoveStorage)}
+ ${this.renderSectorState('PoRep', 2, sector, sector.TaskPoRep, sector.AfterPoRep, sector.StartedPoRep)}
+ ${this.renderSectorState('Clear Cache', 1, sector, sector.TaskFinalize, sector.AfterFinalize, sector.StartedFinalize)}
+ ${this.renderSectorState('Move Storage', 1, sector, sector.TaskMoveStorage, sector.AfterMoveStorage, sector.StartedMoveStorage)}
On Chain
${sector.ChainSector ? 'yes' : (sector.ChainAlloc ? 'allocated' : 'no')}
@@ -127,10 +131,10 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend
|
|
Active
@@ -149,14 +153,14 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend
|
`;
}
- renderSectorState(name, rowspan, sector, task, after) {
+ renderSectorState(name, rowspan, sector, task, after, started) {
if(task) {
// sector.MissingTasks is a list of tasks
// sector.MissingTasks.includes(task) is true if task is missing
let missing = sector.MissingTasks && sector.MissingTasks.includes(task);
return html`
- + |
${name}
T:${task}
${missing ? html`FAILED ` : ''}
|