From eb175766d7e98a52dca7184097ddd831f5ab5d2c Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Mon, 7 Oct 2024 19:11:21 +0400 Subject: [PATCH 1/6] show task waiting in PoRep pipeline --- web/api/webrpc/pipeline_porep.go | 94 +++++++++++++++---- .../pipeline_porep/pipeline-porep-sectors.mjs | 10 +- 2 files changed, 81 insertions(+), 23 deletions(-) diff --git a/web/api/webrpc/pipeline_porep.go b/web/api/webrpc/pipeline_porep.go index 00eb9d664..9c01e8aa2 100644 --- a/web/api/webrpc/pipeline_porep.go +++ b/web/api/webrpc/pipeline_porep.go @@ -22,39 +22,48 @@ type PipelineTask struct { CreateTime time.Time `db:"create_time"` - TaskSDR *int64 `db:"task_id_sdr"` - AfterSDR bool `db:"after_sdr"` + TaskSDR *int64 `db:"task_id_sdr"` + AfterSDR bool `db:"after_sdr"` + StartedSDR bool - TaskTreeD *int64 `db:"task_id_tree_d"` - AfterTreeD bool `db:"after_tree_d"` + TaskTreeD *int64 `db:"task_id_tree_d"` + AfterTreeD bool `db:"after_tree_d"` + StartedTreeD bool - TaskTreeC *int64 `db:"task_id_tree_c"` - AfterTreeC bool `db:"after_tree_c"` + TaskTreeC *int64 `db:"task_id_tree_c"` + AfterTreeC bool `db:"after_tree_c"` + StartedTreeRC bool TaskTreeR *int64 `db:"task_id_tree_r"` AfterTreeR bool `db:"after_tree_r"` - TaskSynthetic *int64 `db:"task_id_synth"` - AfterSynthetic bool `db:"after_synth"` + TaskSynthetic *int64 `db:"task_id_synth"` + AfterSynthetic bool `db:"after_synth"` + StartedSynthetic bool - TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` - AfterPrecommitMsg bool `db:"after_precommit_msg"` + TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` + AfterPrecommitMsg bool `db:"after_precommit_msg"` + StartedPrecommitMsg bool AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` SeedEpoch *int64 `db:"seed_epoch"` - TaskPoRep *int64 `db:"task_id_porep"` - PoRepProof []byte `db:"porep_proof"` - AfterPoRep bool `db:"after_porep"` + TaskPoRep *int64 `db:"task_id_porep"` + PoRepProof []byte `db:"porep_proof"` + AfterPoRep bool `db:"after_porep"` + StartedPoRep bool - TaskFinalize *int64 `db:"task_id_finalize"` - AfterFinalize bool `db:"after_finalize"` + TaskFinalize *int64 `db:"task_id_finalize"` + AfterFinalize bool `db:"after_finalize"` + StartedFinalize bool - TaskMoveStorage *int64 `db:"task_id_move_storage"` - AfterMoveStorage bool `db:"after_move_storage"` + TaskMoveStorage *int64 `db:"task_id_move_storage"` + AfterMoveStorage bool `db:"after_move_storage"` + StartedMoveStorage bool - TaskCommitMsg *int64 `db:"task_id_commit_msg"` - AfterCommitMsg bool `db:"after_commit_msg"` + TaskCommitMsg *int64 `db:"task_id_commit_msg"` + AfterCommitMsg bool `db:"after_commit_msg"` + StartedCommitMsg bool AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` @@ -117,6 +126,13 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e missingTasksMap[mt.sectorID()] = mt } + for _, ta := range tasks { + err = a.GetFirstNotNullPipelineTask(ctx, &ta) + if err != nil { + return nil, err + } + } + head, err := a.deps.Chain.ChainHead(ctx) if err != nil { return nil, xerrors.Errorf("failed to fetch chain head: %w", err) @@ -245,7 +261,7 @@ func (a *WebRPC) PorepPipelineSummary(ctx context.Context) ([]PorepPipelineSumma COUNT(*) FILTER (WHERE (after_tree_d = false OR after_tree_c = false OR after_tree_r = false) AND after_sdr = true) as CountTrees, COUNT(*) FILTER (WHERE after_tree_r = true and after_precommit_msg = false) as CountPrecommitMsg, COUNT(*) FILTER (WHERE after_precommit_msg_success = true AND seed_epoch > $1) as CountWaitSeed, - COUNT(*) FILTER (WHERE after_porep = false AND after_precommit_msg_success = true) as CountPoRep, + COUNT(*) FILTER (WHERE after_porep = false AND after_precommit_msg_success = true AND seed_epoch < $1) as CountPoRep, COUNT(*) FILTER (WHERE after_commit_msg_success = false AND after_porep = true) as CountCommitMsg, COUNT(*) FILTER (WHERE after_commit_msg_success = true) as CountDone, COUNT(*) FILTER (WHERE failed = true) as CountFailed @@ -360,3 +376,41 @@ func (a *WebRPC) pipelinePorepMissingTasks(ctx context.Context) ([]porepMissingT return tasks, nil } + +func (a *WebRPC) GetFirstNotNullPipelineTask(ctx context.Context, p *PipelineTask) error { + tasks := []struct { + AfterFirst, AfterSecond, Started bool + Task *int64 + }{ + {!p.AfterTreeD, false, p.StartedSDR, p.TaskSDR}, + {p.AfterSDR, !p.AfterTreeD, p.StartedTreeD, p.TaskTreeD}, + {p.AfterTreeD, !p.AfterTreeC, p.StartedTreeRC, p.TaskTreeC}, + {p.AfterTreeC, !p.AfterSynthetic, p.StartedSynthetic, p.TaskSynthetic}, + {p.AfterSynthetic, !p.AfterPrecommitMsg, p.StartedPrecommitMsg, p.TaskPrecommitMsg}, + {p.AfterPrecommitMsg, !p.AfterPoRep, p.StartedPoRep, p.TaskPoRep}, + {p.AfterPoRep, !p.AfterFinalize, p.StartedFinalize, p.TaskFinalize}, + {p.AfterPoRep, !p.AfterCommitMsg, p.StartedCommitMsg, p.TaskCommitMsg}, + {p.AfterFinalize, !p.AfterMoveStorage, p.StartedMoveStorage, p.TaskMoveStorage}, + } + + for _, task := range tasks { + if task.AfterFirst && task.AfterSecond && task.Task != nil { + found, err := a.getOwner(ctx, *task.Task) + if err != nil { + return err + } + task.Started = found + } + } + + return nil +} + +func (a *WebRPC) getOwner(ctx context.Context, id int64) (bool, error) { + var owner int64 + err := a.deps.DB.QueryRow(ctx, `SELECT owner_id FROM harmony_task WHERE id = $1`, id).Scan(&owner) + if err != nil { + return false, xerrors.Errorf("failed to fetch owner ID: %w", err) + } + return &owner != nil, nil +} diff --git a/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs b/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs index 3c09b8881..15eda891b 100644 --- a/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs +++ b/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs @@ -66,6 +66,10 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend .pipeline-failed { background-color: #603030; + } + + .pipeline-waiting { + background-color: #d0d0d0; }` properties = { sector: Object, @@ -105,7 +109,7 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend - ${this.renderSectorState('SDR', 1, sector, sector.TaskSDR, sector.AfterSDR)} + ${this.renderSectorState('SDR', 1, sector, sector.TaskSDR, sector.AfterSDR, sector.StartedSDR)} ${this.renderSectorState('TreeC', 1, sector, sector.TaskTreeC, sector.AfterTreeC)} ${this.renderSectorState('Synthetic', 2, sector, sector.TaskSynthetic, sector.AfterSynthetic)} ${this.renderSectorState('PComm Msg', 2, sector, sector.TaskPrecommitMsg, sector.AfterPrecommitMsg)} @@ -149,14 +153,14 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend `; } - renderSectorState(name, rowspan, sector, task, after) { + renderSectorState(name, rowspan, sector, task, after, started) { if(task) { // sector.MissingTasks is a list of tasks // sector.MissingTasks.includes(task) is true if task is missing let missing = sector.MissingTasks && sector.MissingTasks.includes(task); return html` - ${this.renderSectorState('SDR', 1, sector, sector.TaskSDR, sector.AfterSDR, sector.StartedSDR)} - ${this.renderSectorState('TreeC', 1, sector, sector.TaskTreeC, sector.AfterTreeC)} - ${this.renderSectorState('Synthetic', 2, sector, sector.TaskSynthetic, sector.AfterSynthetic)} - ${this.renderSectorState('PComm Msg', 2, sector, sector.TaskPrecommitMsg, sector.AfterPrecommitMsg)} + ${this.renderSectorState('TreeC', 1, sector, sector.TaskTreeC, sector.AfterTreeC, sector.StartedTreeRC)} + ${this.renderSectorState('Synthetic', 2, sector, sector.TaskSynthetic, sector.AfterSynthetic, sector.StartedSynthetic)} + ${this.renderSectorState('PComm Msg', 2, sector, sector.TaskPrecommitMsg, sector.AfterPrecommitMsg, sector.StartedPrecommitMsg)} ${this.renderSectorStateNoTask('PComm Wait', 2, sector.AfterPrecommitMsg, sector.AfterPrecommitMsgSuccess)} - ${this.renderSectorState('PoRep', 2, sector, sector.TaskPoRep, sector.AfterPoRep)} - ${this.renderSectorState('Clear Cache', 1, sector, sector.TaskFinalize, sector.AfterFinalize)} - ${this.renderSectorState('Move Storage', 1, sector, sector.TaskMoveStorage, sector.AfterMoveStorage)} + ${this.renderSectorState('PoRep', 2, sector, sector.TaskPoRep, sector.AfterPoRep, sector.StartedPoRep)} + ${this.renderSectorState('Clear Cache', 1, sector, sector.TaskFinalize, sector.AfterFinalize, sector.StartedFinalize)} + ${this.renderSectorState('Move Storage', 1, sector, sector.TaskMoveStorage, sector.AfterMoveStorage, sector.StartedMoveStorage)} - ${this.renderSectorState('TreeD', 1, sector, sector.TaskTreeD, sector.AfterTreeD)} - ${this.renderSectorState('TreeR', 1, sector, sector.TaskTreeR, sector.AfterTreeR)} + ${this.renderSectorState('TreeD', 1, sector, sector.TaskTreeD, sector.AfterTreeD, sector.StartedTreeD)} + ${this.renderSectorState('TreeR', 1, sector, sector.TaskTreeR, sector.AfterTreeR, sector.StartedTreeRC)} - ${this.renderSectorState('Commit Msg', 1, sector, sector.TaskCommitMsg, sector.AfterCommitMsg)} + ${this.renderSectorState('Commit Msg', 1, sector, sector.TaskCommitMsg, sector.AfterCommitMsg, sector.StartedCommitMsg)} ${this.renderSectorStateNoTask('Commit Wait', 1, sector.AfterCommitMsg, sector.AfterCommitMsgSuccess)}
+
${name}
T:${task}
${missing ? html`
FAILED
` : ''} From 20abc1d0a63cab5a7c6497b41f643f26b65a92dc Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Wed, 9 Oct 2024 14:51:53 +0400 Subject: [PATCH 2/6] fix cluster task sorting order --- web/api/webrpc/pipeline_porep.go | 2 +- web/api/webrpc/tasks.go | 3 ++- web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/web/api/webrpc/pipeline_porep.go b/web/api/webrpc/pipeline_porep.go index 9c01e8aa2..aa57faba4 100644 --- a/web/api/webrpc/pipeline_porep.go +++ b/web/api/webrpc/pipeline_porep.go @@ -412,5 +412,5 @@ func (a *WebRPC) getOwner(ctx context.Context, id int64) (bool, error) { if err != nil { return false, xerrors.Errorf("failed to fetch owner ID: %w", err) } - return &owner != nil, nil + return owner > 0, nil } diff --git a/web/api/webrpc/tasks.go b/web/api/webrpc/tasks.go index dd301beb0..52d13ff3c 100644 --- a/web/api/webrpc/tasks.go +++ b/web/api/webrpc/tasks.go @@ -26,7 +26,8 @@ func (a *WebRPC) ClusterTaskSummary(ctx context.Context) ([]TaskSummary, error) err := a.deps.DB.Select(ctx, &ts, `SELECT t.id as id, t.name as name, t.update_time as since_posted, t.owner_id as owner_id, hm.host_and_port as owner FROM harmony_task t LEFT JOIN harmony_machines hm ON hm.id = t.owner_id - ORDER BY t.update_time ASC, t.owner_id`) + ORDER BY + CASE WHEN t.owner_id IS NULL THEN 1 ELSE 0 END, t.update_time ASC`) if err != nil { return nil, err // Handle error } diff --git a/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs b/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs index 15eda891b..db805cbc8 100644 --- a/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs +++ b/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs @@ -69,7 +69,7 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend } .pipeline-waiting { - background-color: #d0d0d0; + background-color: #808080; }` properties = { sector: Object, From 4559b9b2b376f7e05901e17032b002788208f1d3 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Wed, 9 Oct 2024 17:01:34 +0400 Subject: [PATCH 3/6] docker lotus version --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index c84112517..4eee42436 100644 --- a/Makefile +++ b/Makefile @@ -257,7 +257,7 @@ build_lotus?=0 curio_docker_user?=curio curio_base_image=$(curio_docker_user)/curio-all-in-one:latest-debug ffi_from_source?=0 -lotus_version?=v1.28.1 +lotus_version?=v1.29.0 ifeq ($(build_lotus),1) # v1: building lotus image with provided lotus version From 01ba722bba9d40856343cc4d6164b4e9914bcca6 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Wed, 9 Oct 2024 17:05:42 +0400 Subject: [PATCH 4/6] add remaining wait task in UI --- .../pipeline_porep/pipeline-porep-sectors.mjs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs b/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs index db805cbc8..4a53ff9bf 100644 --- a/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs +++ b/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs @@ -110,17 +110,17 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend
Wait Seed
${sector.AfterSeed?'done':sector.SeedEpoch}
On Chain
${sector.ChainSector ? 'yes' : (sector.ChainAlloc ? 'allocated' : 'no')}
@@ -131,10 +131,10 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend
Active
From ac69bfdc1d85117ef488364af1bbfee2a081b6dc Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Wed, 9 Oct 2024 17:23:01 +0400 Subject: [PATCH 5/6] fix null handling --- web/api/webrpc/pipeline_porep.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/web/api/webrpc/pipeline_porep.go b/web/api/webrpc/pipeline_porep.go index aa57faba4..381666bae 100644 --- a/web/api/webrpc/pipeline_porep.go +++ b/web/api/webrpc/pipeline_porep.go @@ -2,6 +2,7 @@ package webrpc import ( "context" + "database/sql" "time" "github.com/snadrus/must" @@ -407,10 +408,10 @@ func (a *WebRPC) GetFirstNotNullPipelineTask(ctx context.Context, p *PipelineTas } func (a *WebRPC) getOwner(ctx context.Context, id int64) (bool, error) { - var owner int64 + var owner sql.NullInt64 err := a.deps.DB.QueryRow(ctx, `SELECT owner_id FROM harmony_task WHERE id = $1`, id).Scan(&owner) if err != nil { return false, xerrors.Errorf("failed to fetch owner ID: %w", err) } - return owner > 0, nil + return owner.Valid && owner.Int64 > 0, nil } From b77386995c6c82988240e77fadc879f7be1e5b46 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Wed, 9 Oct 2024 19:16:30 +0400 Subject: [PATCH 6/6] convert to a single query --- tasks/unseal/task_unseal_decode.go | 8 +- web/api/webrpc/pipeline_porep.go | 280 +++++++++++++++++++---------- 2 files changed, 191 insertions(+), 97 deletions(-) diff --git a/tasks/unseal/task_unseal_decode.go b/tasks/unseal/task_unseal_decode.go index b67f1ef4e..5507d8571 100644 --- a/tasks/unseal/task_unseal_decode.go +++ b/tasks/unseal/task_unseal_decode.go @@ -163,7 +163,7 @@ func (t *TaskUnsealDecode) TypeDetails() harmonytask.TaskTypeDetails { ssize = abi.SectorSize(2 << 20) } - return harmonytask.TaskTypeDetails{ + res := harmonytask.TaskTypeDetails{ Max: taskhelp.Max(t.max), Name: "UnsealDecode", Cost: resources.Resources{ @@ -177,6 +177,12 @@ func (t *TaskUnsealDecode) TypeDetails() harmonytask.TaskTypeDetails { return t.schedule(context.Background(), taskFunc) }), } + + if isDevnet { + res.Cost.Ram = 1 << 30 + } + + return res } func (t *TaskUnsealDecode) schedule(ctx context.Context, taskFunc harmonytask.AddTaskFunc) error { diff --git a/web/api/webrpc/pipeline_porep.go b/web/api/webrpc/pipeline_porep.go index 381666bae..a6d0ac981 100644 --- a/web/api/webrpc/pipeline_porep.go +++ b/web/api/webrpc/pipeline_porep.go @@ -2,7 +2,6 @@ package webrpc import ( "context" - "database/sql" "time" "github.com/snadrus/must" @@ -25,26 +24,26 @@ type PipelineTask struct { TaskSDR *int64 `db:"task_id_sdr"` AfterSDR bool `db:"after_sdr"` - StartedSDR bool + StartedSDR bool `db:"started_sdr"` TaskTreeD *int64 `db:"task_id_tree_d"` AfterTreeD bool `db:"after_tree_d"` - StartedTreeD bool + StartedTreeD bool `db:"started_tree_d"` TaskTreeC *int64 `db:"task_id_tree_c"` AfterTreeC bool `db:"after_tree_c"` - StartedTreeRC bool + StartedTreeRC bool `db:"started_tree_rc"` TaskTreeR *int64 `db:"task_id_tree_r"` AfterTreeR bool `db:"after_tree_r"` TaskSynthetic *int64 `db:"task_id_synth"` AfterSynthetic bool `db:"after_synth"` - StartedSynthetic bool + StartedSynthetic bool `db:"started_synthetic"` TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` AfterPrecommitMsg bool `db:"after_precommit_msg"` - StartedPrecommitMsg bool + StartedPrecommitMsg bool `db:"started_precommit_msg"` AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` SeedEpoch *int64 `db:"seed_epoch"` @@ -52,28 +51,27 @@ type PipelineTask struct { TaskPoRep *int64 `db:"task_id_porep"` PoRepProof []byte `db:"porep_proof"` AfterPoRep bool `db:"after_porep"` - StartedPoRep bool + StartedPoRep bool `db:"started_porep"` TaskFinalize *int64 `db:"task_id_finalize"` AfterFinalize bool `db:"after_finalize"` - StartedFinalize bool + StartedFinalize bool `db:"started_finalize"` TaskMoveStorage *int64 `db:"task_id_move_storage"` AfterMoveStorage bool `db:"after_move_storage"` - StartedMoveStorage bool + StartedMoveStorage bool `db:"started_move_storage"` TaskCommitMsg *int64 `db:"task_id_commit_msg"` AfterCommitMsg bool `db:"after_commit_msg"` - StartedCommitMsg bool + StartedCommitMsg bool `db:"started_commit_msg"` AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` Failed bool `db:"failed"` FailedReason string `db:"failed_reason"` -} -func (pt PipelineTask) sectorID() abi.SectorID { - return abi.SectorID{Miner: abi.ActorID(pt.SpID), Number: abi.SectorNumber(pt.SectorNumber)} + MissingTasks []int64 `db:"missing_tasks"` + AllTasks []int64 `db:"all_tasks"` } type sectorListEntry struct { @@ -84,9 +82,6 @@ type sectorListEntry struct { AfterSeed bool ChainAlloc, ChainSector, ChainActive, ChainUnproven, ChainFaulty bool - - MissingTasks []int64 - AllTasks []int64 } type minerBitfields struct { @@ -97,43 +92,183 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e var tasks []PipelineTask err := a.deps.DB.Select(ctx, &tasks, `SELECT - sp_id, sector_number, - create_time, - task_id_sdr, after_sdr, - task_id_tree_d, after_tree_d, - task_id_tree_c, after_tree_c, - task_id_tree_r, after_tree_r, - task_id_synth, after_synth, - task_id_precommit_msg, after_precommit_msg, - after_precommit_msg_success, seed_epoch, - task_id_porep, porep_proof, after_porep, - task_id_finalize, after_finalize, - task_id_move_storage, after_move_storage, - task_id_commit_msg, after_commit_msg, - after_commit_msg_success, - failed, failed_reason - FROM sectors_sdr_pipeline order by sp_id, sector_number`) // todo where constrain list + sp.sp_id, + sp.sector_number, + sp.create_time, + sp.task_id_sdr, + sp.after_sdr, + sp.task_id_tree_d, + sp.after_tree_d, + sp.task_id_tree_c, + sp.after_tree_c, + sp.task_id_tree_r, + sp.after_tree_r, + sp.task_id_synth, + sp.after_synth, + sp.task_id_precommit_msg, + sp.after_precommit_msg, + sp.after_precommit_msg_success, + sp.seed_epoch, + sp.task_id_porep, + sp.porep_proof, + sp.after_porep, + sp.task_id_finalize, + sp.after_finalize, + sp.task_id_move_storage, + sp.after_move_storage, + sp.task_id_commit_msg, + sp.after_commit_msg, + sp.after_commit_msg_success, + sp.failed, + sp.failed_reason, + + -- Compute StartedSDR + CASE + WHEN NOT after_tree_d AND task_id_sdr IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_sdr AND owner_id > 0 + ) + ELSE FALSE + END AS started_sdr, + + -- Compute StartedTreeD + CASE + WHEN after_sdr AND NOT after_tree_d AND task_id_tree_d IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_tree_d AND owner_id > 0 + ) + ELSE FALSE + END AS started_tree_d, + + -- Compute StartedTreeRC + CASE + WHEN after_tree_d AND NOT after_tree_c AND task_id_tree_c IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_tree_c AND owner_id > 0 + ) + ELSE FALSE + END AS started_tree_rc, + + -- Compute StartedSynthetic + CASE + WHEN after_tree_c AND NOT after_synth AND task_id_synth IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_synth AND owner_id > 0 + ) + ELSE FALSE + END AS started_synthetic, + + -- Compute StartedPrecommitMsg + CASE + WHEN after_synth AND NOT after_precommit_msg AND task_id_precommit_msg IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_precommit_msg AND owner_id > 0 + ) + ELSE FALSE + END AS started_precommit_msg, + + -- Compute StartedPoRep + CASE + WHEN after_precommit_msg AND NOT after_porep AND task_id_porep IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_porep AND owner_id > 0 + ) + ELSE FALSE + END AS started_porep, + + -- Compute StartedFinalize + CASE + WHEN after_porep AND NOT after_finalize AND task_id_finalize IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_finalize AND owner_id > 0 + ) + ELSE FALSE + END AS started_finalize, + + -- Compute StartedCommitMsg + CASE + WHEN after_porep AND NOT after_commit_msg AND task_id_commit_msg IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_commit_msg AND owner_id > 0 + ) + ELSE FALSE + END AS started_commit_msg, + + -- Compute StartedMoveStorage + CASE + WHEN after_finalize AND NOT after_move_storage AND task_id_move_storage IS NOT NULL THEN + EXISTS ( + SELECT 1 + FROM harmony_task + WHERE id = task_id_move_storage AND owner_id > 0 + ) + ELSE FALSE + END AS started_move_storage, + + -- Collect all task IDs into an array without NULLs + ( + SELECT array_agg(task_id) + FROM ( + VALUES + (sp.task_id_sdr), + (sp.task_id_tree_d), + (sp.task_id_tree_c), + (sp.task_id_tree_r), + (sp.task_id_synth), + (sp.task_id_precommit_msg), + (sp.task_id_porep), + (sp.task_id_finalize), + (sp.task_id_move_storage), + (sp.task_id_commit_msg) + ) AS t(task_id) + WHERE task_id IS NOT NULL + ) AS all_tasks, + + -- Compute missing tasks without NULLs + ( + SELECT array_agg(task_id) + FROM ( + SELECT task_id + FROM unnest(ARRAY[ + sp.task_id_sdr, + sp.task_id_tree_d, + sp.task_id_tree_c, + sp.task_id_tree_r, + sp.task_id_synth, + sp.task_id_precommit_msg, + sp.task_id_porep, + sp.task_id_finalize, + sp.task_id_move_storage, + sp.task_id_commit_msg + ]) AS task_id + LEFT JOIN harmony_task ht ON ht.id = task_id + WHERE task_id IS NOT NULL AND ht.id IS NULL + ) AS missing + ) AS missing_tasks + + FROM sectors_sdr_pipeline sp + ORDER BY sp_id, sector_number; + `) // todo where constrain list if err != nil { return nil, xerrors.Errorf("failed to fetch pipeline tasks: %w", err) } - missingTasks, err := a.pipelinePorepMissingTasks(ctx) - if err != nil { - return nil, xerrors.Errorf("failed to fetch missing tasks: %w", err) - } - - missingTasksMap := make(map[abi.SectorID]porepMissingTask) - for _, mt := range missingTasks { - missingTasksMap[mt.sectorID()] = mt - } - - for _, ta := range tasks { - err = a.GetFirstNotNullPipelineTask(ctx, &ta) - if err != nil { - return nil, err - } - } - head, err := a.deps.Chain.ChainHead(ctx) if err != nil { return nil, xerrors.Errorf("failed to fetch chain head: %w", err) @@ -164,12 +299,6 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e afterSeed := task.SeedEpoch != nil && *task.SeedEpoch <= int64(epoch) - var missingTasks, allTasks []int64 - if mt, ok := missingTasksMap[task.sectorID()]; ok { - missingTasks = mt.MissingTaskIDs - allTasks = mt.AllTaskIDs - } - sectorList = append(sectorList, sectorListEntry{ PipelineTask: task, Address: addr, @@ -181,9 +310,6 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e ChainActive: must.One(mbf.active.IsSet(uint64(task.SectorNumber))), ChainUnproven: must.One(mbf.unproven.IsSet(uint64(task.SectorNumber))), ChainFaulty: must.One(mbf.faulty.IsSet(uint64(task.SectorNumber))), - - MissingTasks: missingTasks, - AllTasks: allTasks, }) } @@ -377,41 +503,3 @@ func (a *WebRPC) pipelinePorepMissingTasks(ctx context.Context) ([]porepMissingT return tasks, nil } - -func (a *WebRPC) GetFirstNotNullPipelineTask(ctx context.Context, p *PipelineTask) error { - tasks := []struct { - AfterFirst, AfterSecond, Started bool - Task *int64 - }{ - {!p.AfterTreeD, false, p.StartedSDR, p.TaskSDR}, - {p.AfterSDR, !p.AfterTreeD, p.StartedTreeD, p.TaskTreeD}, - {p.AfterTreeD, !p.AfterTreeC, p.StartedTreeRC, p.TaskTreeC}, - {p.AfterTreeC, !p.AfterSynthetic, p.StartedSynthetic, p.TaskSynthetic}, - {p.AfterSynthetic, !p.AfterPrecommitMsg, p.StartedPrecommitMsg, p.TaskPrecommitMsg}, - {p.AfterPrecommitMsg, !p.AfterPoRep, p.StartedPoRep, p.TaskPoRep}, - {p.AfterPoRep, !p.AfterFinalize, p.StartedFinalize, p.TaskFinalize}, - {p.AfterPoRep, !p.AfterCommitMsg, p.StartedCommitMsg, p.TaskCommitMsg}, - {p.AfterFinalize, !p.AfterMoveStorage, p.StartedMoveStorage, p.TaskMoveStorage}, - } - - for _, task := range tasks { - if task.AfterFirst && task.AfterSecond && task.Task != nil { - found, err := a.getOwner(ctx, *task.Task) - if err != nil { - return err - } - task.Started = found - } - } - - return nil -} - -func (a *WebRPC) getOwner(ctx context.Context, id int64) (bool, error) { - var owner sql.NullInt64 - err := a.deps.DB.QueryRow(ctx, `SELECT owner_id FROM harmony_task WHERE id = $1`, id).Scan(&owner) - if err != nil { - return false, xerrors.Errorf("failed to fetch owner ID: %w", err) - } - return owner.Valid && owner.Int64 > 0, nil -}