From 11ca825a1377ac0f5e6933c9ac5dc013106f197b Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Mon, 7 Oct 2024 19:11:21 +0400 Subject: [PATCH] show task waiting in PoRep pipeline --- web/api/webrpc/pipeline_porep.go | 94 +++++++++++++++---- .../pipeline_porep/pipeline-porep-sectors.mjs | 10 +- 2 files changed, 81 insertions(+), 23 deletions(-) diff --git a/web/api/webrpc/pipeline_porep.go b/web/api/webrpc/pipeline_porep.go index 00eb9d664..9c01e8aa2 100644 --- a/web/api/webrpc/pipeline_porep.go +++ b/web/api/webrpc/pipeline_porep.go @@ -22,39 +22,48 @@ type PipelineTask struct { CreateTime time.Time `db:"create_time"` - TaskSDR *int64 `db:"task_id_sdr"` - AfterSDR bool `db:"after_sdr"` + TaskSDR *int64 `db:"task_id_sdr"` + AfterSDR bool `db:"after_sdr"` + StartedSDR bool - TaskTreeD *int64 `db:"task_id_tree_d"` - AfterTreeD bool `db:"after_tree_d"` + TaskTreeD *int64 `db:"task_id_tree_d"` + AfterTreeD bool `db:"after_tree_d"` + StartedTreeD bool - TaskTreeC *int64 `db:"task_id_tree_c"` - AfterTreeC bool `db:"after_tree_c"` + TaskTreeC *int64 `db:"task_id_tree_c"` + AfterTreeC bool `db:"after_tree_c"` + StartedTreeRC bool TaskTreeR *int64 `db:"task_id_tree_r"` AfterTreeR bool `db:"after_tree_r"` - TaskSynthetic *int64 `db:"task_id_synth"` - AfterSynthetic bool `db:"after_synth"` + TaskSynthetic *int64 `db:"task_id_synth"` + AfterSynthetic bool `db:"after_synth"` + StartedSynthetic bool - TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` - AfterPrecommitMsg bool `db:"after_precommit_msg"` + TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` + AfterPrecommitMsg bool `db:"after_precommit_msg"` + StartedPrecommitMsg bool AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` SeedEpoch *int64 `db:"seed_epoch"` - TaskPoRep *int64 `db:"task_id_porep"` - PoRepProof []byte `db:"porep_proof"` - AfterPoRep bool `db:"after_porep"` + TaskPoRep *int64 `db:"task_id_porep"` + PoRepProof []byte `db:"porep_proof"` + AfterPoRep bool `db:"after_porep"` + StartedPoRep bool - TaskFinalize *int64 `db:"task_id_finalize"` - AfterFinalize bool `db:"after_finalize"` + TaskFinalize *int64 `db:"task_id_finalize"` + AfterFinalize bool `db:"after_finalize"` + StartedFinalize bool - TaskMoveStorage *int64 `db:"task_id_move_storage"` - AfterMoveStorage bool `db:"after_move_storage"` + TaskMoveStorage *int64 `db:"task_id_move_storage"` + AfterMoveStorage bool `db:"after_move_storage"` + StartedMoveStorage bool - TaskCommitMsg *int64 `db:"task_id_commit_msg"` - AfterCommitMsg bool `db:"after_commit_msg"` + TaskCommitMsg *int64 `db:"task_id_commit_msg"` + AfterCommitMsg bool `db:"after_commit_msg"` + StartedCommitMsg bool AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` @@ -117,6 +126,13 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e missingTasksMap[mt.sectorID()] = mt } + for _, ta := range tasks { + err = a.GetFirstNotNullPipelineTask(ctx, &ta) + if err != nil { + return nil, err + } + } + head, err := a.deps.Chain.ChainHead(ctx) if err != nil { return nil, xerrors.Errorf("failed to fetch chain head: %w", err) @@ -245,7 +261,7 @@ func (a *WebRPC) PorepPipelineSummary(ctx context.Context) ([]PorepPipelineSumma COUNT(*) FILTER (WHERE (after_tree_d = false OR after_tree_c = false OR after_tree_r = false) AND after_sdr = true) as CountTrees, COUNT(*) FILTER (WHERE after_tree_r = true and after_precommit_msg = false) as CountPrecommitMsg, COUNT(*) FILTER (WHERE after_precommit_msg_success = true AND seed_epoch > $1) as CountWaitSeed, - COUNT(*) FILTER (WHERE after_porep = false AND after_precommit_msg_success = true) as CountPoRep, + COUNT(*) FILTER (WHERE after_porep = false AND after_precommit_msg_success = true AND seed_epoch < $1) as CountPoRep, COUNT(*) FILTER (WHERE after_commit_msg_success = false AND after_porep = true) as CountCommitMsg, COUNT(*) FILTER (WHERE after_commit_msg_success = true) as CountDone, COUNT(*) FILTER (WHERE failed = true) as CountFailed @@ -360,3 +376,41 @@ func (a *WebRPC) pipelinePorepMissingTasks(ctx context.Context) ([]porepMissingT return tasks, nil } + +func (a *WebRPC) GetFirstNotNullPipelineTask(ctx context.Context, p *PipelineTask) error { + tasks := []struct { + AfterFirst, AfterSecond, Started bool + Task *int64 + }{ + {!p.AfterTreeD, false, p.StartedSDR, p.TaskSDR}, + {p.AfterSDR, !p.AfterTreeD, p.StartedTreeD, p.TaskTreeD}, + {p.AfterTreeD, !p.AfterTreeC, p.StartedTreeRC, p.TaskTreeC}, + {p.AfterTreeC, !p.AfterSynthetic, p.StartedSynthetic, p.TaskSynthetic}, + {p.AfterSynthetic, !p.AfterPrecommitMsg, p.StartedPrecommitMsg, p.TaskPrecommitMsg}, + {p.AfterPrecommitMsg, !p.AfterPoRep, p.StartedPoRep, p.TaskPoRep}, + {p.AfterPoRep, !p.AfterFinalize, p.StartedFinalize, p.TaskFinalize}, + {p.AfterPoRep, !p.AfterCommitMsg, p.StartedCommitMsg, p.TaskCommitMsg}, + {p.AfterFinalize, !p.AfterMoveStorage, p.StartedMoveStorage, p.TaskMoveStorage}, + } + + for _, task := range tasks { + if task.AfterFirst && task.AfterSecond && task.Task != nil { + found, err := a.getOwner(ctx, *task.Task) + if err != nil { + return err + } + task.Started = found + } + } + + return nil +} + +func (a *WebRPC) getOwner(ctx context.Context, id int64) (bool, error) { + var owner int64 + err := a.deps.DB.QueryRow(ctx, `SELECT owner_id FROM harmony_task WHERE id = $1`, id).Scan(&owner) + if err != nil { + return false, xerrors.Errorf("failed to fetch owner ID: %w", err) + } + return &owner != nil, nil +} diff --git a/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs b/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs index 3c09b8881..15eda891b 100644 --- a/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs +++ b/web/static/pages/pipeline_porep/pipeline-porep-sectors.mjs @@ -66,6 +66,10 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend .pipeline-failed { background-color: #603030; + } + + .pipeline-waiting { + background-color: #d0d0d0; }` properties = { sector: Object, @@ -105,7 +109,7 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend - ${this.renderSectorState('SDR', 1, sector, sector.TaskSDR, sector.AfterSDR)} + ${this.renderSectorState('SDR', 1, sector, sector.TaskSDR, sector.AfterSDR, sector.StartedSDR)} ${this.renderSectorState('TreeC', 1, sector, sector.TaskTreeC, sector.AfterTreeC)} ${this.renderSectorState('Synthetic', 2, sector, sector.TaskSynthetic, sector.AfterSynthetic)} ${this.renderSectorState('PComm Msg', 2, sector, sector.TaskPrecommitMsg, sector.AfterPrecommitMsg)} @@ -149,14 +153,14 @@ customElements.define('pipeline-porep-sectors',class PipelinePorepSectors extend `; } - renderSectorState(name, rowspan, sector, task, after) { + renderSectorState(name, rowspan, sector, task, after, started) { if(task) { // sector.MissingTasks is a list of tasks // sector.MissingTasks.includes(task) is true if task is missing let missing = sector.MissingTasks && sector.MissingTasks.includes(task); return html` -
+
${name}
T:${task}
${missing ? html`
FAILED
` : ''}