Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: show task waiting in PoRep pipeline #260

Merged
merged 6 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ build_lotus?=0
curio_docker_user?=curio
curio_base_image=$(curio_docker_user)/curio-all-in-one:latest-debug
ffi_from_source?=0
lotus_version?=v1.28.1
lotus_version?=v1.29.0

ifeq ($(build_lotus),1)
# v1: building lotus image with provided lotus version
Expand Down
8 changes: 7 additions & 1 deletion tasks/unseal/task_unseal_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (t *TaskUnsealDecode) TypeDetails() harmonytask.TaskTypeDetails {
ssize = abi.SectorSize(2 << 20)
}

return harmonytask.TaskTypeDetails{
res := harmonytask.TaskTypeDetails{
Max: taskhelp.Max(t.max),
Name: "UnsealDecode",
Cost: resources.Resources{
Expand All @@ -177,6 +177,12 @@ func (t *TaskUnsealDecode) TypeDetails() harmonytask.TaskTypeDetails {
return t.schedule(context.Background(), taskFunc)
}),
}

if isDevnet {
res.Cost.Ram = 1 << 30
}

return res
}

func (t *TaskUnsealDecode) schedule(ctx context.Context, taskFunc harmonytask.AddTaskFunc) error {
Expand Down
265 changes: 204 additions & 61 deletions web/api/webrpc/pipeline_porep.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,56 @@ type PipelineTask struct {

CreateTime time.Time `db:"create_time"`

TaskSDR *int64 `db:"task_id_sdr"`
AfterSDR bool `db:"after_sdr"`
TaskSDR *int64 `db:"task_id_sdr"`
AfterSDR bool `db:"after_sdr"`
StartedSDR bool `db:"started_sdr"`

TaskTreeD *int64 `db:"task_id_tree_d"`
AfterTreeD bool `db:"after_tree_d"`
TaskTreeD *int64 `db:"task_id_tree_d"`
AfterTreeD bool `db:"after_tree_d"`
StartedTreeD bool `db:"started_tree_d"`

TaskTreeC *int64 `db:"task_id_tree_c"`
AfterTreeC bool `db:"after_tree_c"`
TaskTreeC *int64 `db:"task_id_tree_c"`
AfterTreeC bool `db:"after_tree_c"`
StartedTreeRC bool `db:"started_tree_rc"`

TaskTreeR *int64 `db:"task_id_tree_r"`
AfterTreeR bool `db:"after_tree_r"`

TaskSynthetic *int64 `db:"task_id_synth"`
AfterSynthetic bool `db:"after_synth"`
TaskSynthetic *int64 `db:"task_id_synth"`
AfterSynthetic bool `db:"after_synth"`
StartedSynthetic bool `db:"started_synthetic"`

TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"`
AfterPrecommitMsg bool `db:"after_precommit_msg"`
TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"`
AfterPrecommitMsg bool `db:"after_precommit_msg"`
StartedPrecommitMsg bool `db:"started_precommit_msg"`

AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"`
SeedEpoch *int64 `db:"seed_epoch"`

TaskPoRep *int64 `db:"task_id_porep"`
PoRepProof []byte `db:"porep_proof"`
AfterPoRep bool `db:"after_porep"`
TaskPoRep *int64 `db:"task_id_porep"`
PoRepProof []byte `db:"porep_proof"`
AfterPoRep bool `db:"after_porep"`
StartedPoRep bool `db:"started_porep"`

TaskFinalize *int64 `db:"task_id_finalize"`
AfterFinalize bool `db:"after_finalize"`
TaskFinalize *int64 `db:"task_id_finalize"`
AfterFinalize bool `db:"after_finalize"`
StartedFinalize bool `db:"started_finalize"`

TaskMoveStorage *int64 `db:"task_id_move_storage"`
AfterMoveStorage bool `db:"after_move_storage"`
TaskMoveStorage *int64 `db:"task_id_move_storage"`
AfterMoveStorage bool `db:"after_move_storage"`
StartedMoveStorage bool `db:"started_move_storage"`

TaskCommitMsg *int64 `db:"task_id_commit_msg"`
AfterCommitMsg bool `db:"after_commit_msg"`
TaskCommitMsg *int64 `db:"task_id_commit_msg"`
AfterCommitMsg bool `db:"after_commit_msg"`
StartedCommitMsg bool `db:"started_commit_msg"`

AfterCommitMsgSuccess bool `db:"after_commit_msg_success"`

Failed bool `db:"failed"`
FailedReason string `db:"failed_reason"`
}

func (pt PipelineTask) sectorID() abi.SectorID {
return abi.SectorID{Miner: abi.ActorID(pt.SpID), Number: abi.SectorNumber(pt.SectorNumber)}
MissingTasks []int64 `db:"missing_tasks"`
AllTasks []int64 `db:"all_tasks"`
}

type sectorListEntry struct {
Expand All @@ -74,9 +82,6 @@ type sectorListEntry struct {
AfterSeed bool

ChainAlloc, ChainSector, ChainActive, ChainUnproven, ChainFaulty bool

MissingTasks []int64
AllTasks []int64
}

type minerBitfields struct {
Expand All @@ -87,36 +92,183 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e
var tasks []PipelineTask

err := a.deps.DB.Select(ctx, &tasks, `SELECT
sp_id, sector_number,
create_time,
task_id_sdr, after_sdr,
task_id_tree_d, after_tree_d,
task_id_tree_c, after_tree_c,
task_id_tree_r, after_tree_r,
task_id_synth, after_synth,
task_id_precommit_msg, after_precommit_msg,
after_precommit_msg_success, seed_epoch,
task_id_porep, porep_proof, after_porep,
task_id_finalize, after_finalize,
task_id_move_storage, after_move_storage,
task_id_commit_msg, after_commit_msg,
after_commit_msg_success,
failed, failed_reason
FROM sectors_sdr_pipeline order by sp_id, sector_number`) // todo where constrain list
sp.sp_id,
sp.sector_number,
sp.create_time,
sp.task_id_sdr,
sp.after_sdr,
sp.task_id_tree_d,
sp.after_tree_d,
sp.task_id_tree_c,
sp.after_tree_c,
sp.task_id_tree_r,
sp.after_tree_r,
sp.task_id_synth,
sp.after_synth,
sp.task_id_precommit_msg,
sp.after_precommit_msg,
sp.after_precommit_msg_success,
sp.seed_epoch,
sp.task_id_porep,
sp.porep_proof,
sp.after_porep,
sp.task_id_finalize,
sp.after_finalize,
sp.task_id_move_storage,
sp.after_move_storage,
sp.task_id_commit_msg,
sp.after_commit_msg,
sp.after_commit_msg_success,
sp.failed,
sp.failed_reason,

-- Compute StartedSDR
CASE
WHEN NOT after_tree_d AND task_id_sdr IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_sdr AND owner_id > 0
)
ELSE FALSE
END AS started_sdr,

-- Compute StartedTreeD
CASE
WHEN after_sdr AND NOT after_tree_d AND task_id_tree_d IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_tree_d AND owner_id > 0
)
ELSE FALSE
END AS started_tree_d,

-- Compute StartedTreeRC
CASE
WHEN after_tree_d AND NOT after_tree_c AND task_id_tree_c IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_tree_c AND owner_id > 0
)
ELSE FALSE
END AS started_tree_rc,

-- Compute StartedSynthetic
CASE
WHEN after_tree_c AND NOT after_synth AND task_id_synth IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_synth AND owner_id > 0
)
ELSE FALSE
END AS started_synthetic,

-- Compute StartedPrecommitMsg
CASE
WHEN after_synth AND NOT after_precommit_msg AND task_id_precommit_msg IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_precommit_msg AND owner_id > 0
)
ELSE FALSE
END AS started_precommit_msg,

-- Compute StartedPoRep
CASE
WHEN after_precommit_msg AND NOT after_porep AND task_id_porep IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_porep AND owner_id > 0
)
ELSE FALSE
END AS started_porep,

-- Compute StartedFinalize
CASE
WHEN after_porep AND NOT after_finalize AND task_id_finalize IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_finalize AND owner_id > 0
)
ELSE FALSE
END AS started_finalize,

-- Compute StartedCommitMsg
CASE
WHEN after_porep AND NOT after_commit_msg AND task_id_commit_msg IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_commit_msg AND owner_id > 0
)
ELSE FALSE
END AS started_commit_msg,

-- Compute StartedMoveStorage
CASE
WHEN after_finalize AND NOT after_move_storage AND task_id_move_storage IS NOT NULL THEN
EXISTS (
SELECT 1
FROM harmony_task
WHERE id = task_id_move_storage AND owner_id > 0
)
ELSE FALSE
END AS started_move_storage,

-- Collect all task IDs into an array without NULLs
(
SELECT array_agg(task_id)
FROM (
VALUES
(sp.task_id_sdr),
(sp.task_id_tree_d),
(sp.task_id_tree_c),
(sp.task_id_tree_r),
(sp.task_id_synth),
(sp.task_id_precommit_msg),
(sp.task_id_porep),
(sp.task_id_finalize),
(sp.task_id_move_storage),
(sp.task_id_commit_msg)
) AS t(task_id)
WHERE task_id IS NOT NULL
) AS all_tasks,

-- Compute missing tasks without NULLs
(
SELECT array_agg(task_id)
FROM (
SELECT task_id
FROM unnest(ARRAY[
sp.task_id_sdr,
sp.task_id_tree_d,
sp.task_id_tree_c,
sp.task_id_tree_r,
sp.task_id_synth,
sp.task_id_precommit_msg,
sp.task_id_porep,
sp.task_id_finalize,
sp.task_id_move_storage,
sp.task_id_commit_msg
]) AS task_id
LEFT JOIN harmony_task ht ON ht.id = task_id
WHERE task_id IS NOT NULL AND ht.id IS NULL
) AS missing
) AS missing_tasks

FROM sectors_sdr_pipeline sp
ORDER BY sp_id, sector_number;
`) // todo where constrain list
if err != nil {
return nil, xerrors.Errorf("failed to fetch pipeline tasks: %w", err)
}

missingTasks, err := a.pipelinePorepMissingTasks(ctx)
if err != nil {
return nil, xerrors.Errorf("failed to fetch missing tasks: %w", err)
}

missingTasksMap := make(map[abi.SectorID]porepMissingTask)
for _, mt := range missingTasks {
missingTasksMap[mt.sectorID()] = mt
}

head, err := a.deps.Chain.ChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("failed to fetch chain head: %w", err)
Expand Down Expand Up @@ -147,12 +299,6 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e

afterSeed := task.SeedEpoch != nil && *task.SeedEpoch <= int64(epoch)

var missingTasks, allTasks []int64
if mt, ok := missingTasksMap[task.sectorID()]; ok {
missingTasks = mt.MissingTaskIDs
allTasks = mt.AllTaskIDs
}

sectorList = append(sectorList, sectorListEntry{
PipelineTask: task,
Address: addr,
Expand All @@ -164,9 +310,6 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e
ChainActive: must.One(mbf.active.IsSet(uint64(task.SectorNumber))),
ChainUnproven: must.One(mbf.unproven.IsSet(uint64(task.SectorNumber))),
ChainFaulty: must.One(mbf.faulty.IsSet(uint64(task.SectorNumber))),

MissingTasks: missingTasks,
AllTasks: allTasks,
})
}

Expand Down Expand Up @@ -245,7 +388,7 @@ func (a *WebRPC) PorepPipelineSummary(ctx context.Context) ([]PorepPipelineSumma
COUNT(*) FILTER (WHERE (after_tree_d = false OR after_tree_c = false OR after_tree_r = false) AND after_sdr = true) as CountTrees,
COUNT(*) FILTER (WHERE after_tree_r = true and after_precommit_msg = false) as CountPrecommitMsg,
COUNT(*) FILTER (WHERE after_precommit_msg_success = true AND seed_epoch > $1) as CountWaitSeed,
COUNT(*) FILTER (WHERE after_porep = false AND after_precommit_msg_success = true) as CountPoRep,
COUNT(*) FILTER (WHERE after_porep = false AND after_precommit_msg_success = true AND seed_epoch < $1) as CountPoRep,
COUNT(*) FILTER (WHERE after_commit_msg_success = false AND after_porep = true) as CountCommitMsg,
COUNT(*) FILTER (WHERE after_commit_msg_success = true) as CountDone,
COUNT(*) FILTER (WHERE failed = true) as CountFailed
Expand Down
3 changes: 2 additions & 1 deletion web/api/webrpc/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func (a *WebRPC) ClusterTaskSummary(ctx context.Context) ([]TaskSummary, error)
err := a.deps.DB.Select(ctx, &ts, `SELECT
t.id as id, t.name as name, t.update_time as since_posted, t.owner_id as owner_id, hm.host_and_port as owner
FROM harmony_task t LEFT JOIN harmony_machines hm ON hm.id = t.owner_id
ORDER BY t.update_time ASC, t.owner_id`)
ORDER BY
CASE WHEN t.owner_id IS NULL THEN 1 ELSE 0 END, t.update_time ASC`)
if err != nil {
return nil, err // Handle error
}
Expand Down
Loading