From 74b628763701fe9d0e9a886dda8136baf1c60e8f Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Wed, 24 Jul 2024 00:29:38 -0500 Subject: [PATCH 1/6] dramatically simpler task system --- cmd/curio/tasks/tasks.go | 2 + deps/config/types.go | 17 +++ harmony/harmonydb/sql/20240724-user_sched.sql | 5 + harmony/harmonytask/task_type_handler.go | 27 ++-- harmony/taskhelp/usertaskmgt/usertaskmgt.go | 144 ++++++++++++++++++ 5 files changed, 184 insertions(+), 11 deletions(-) create mode 100644 harmony/harmonydb/sql/20240724-user_sched.sql create mode 100644 harmony/taskhelp/usertaskmgt/usertaskmgt.go diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index bf9ce9523..2668477ff 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -22,6 +22,7 @@ import ( "github.com/filecoin-project/curio/deps/config" "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" + "github.com/filecoin-project/curio/harmony/taskhelp/usertaskmgt" "github.com/filecoin-project/curio/lib/chainsched" "github.com/filecoin-project/curio/lib/curiochain" "github.com/filecoin-project/curio/lib/fastparamfetch" @@ -196,6 +197,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task // (we could have just appended to this list in the reverse order, but defining // tasks in pipeline order is more intuitive) + usertaskmgt.WrapTasks(activeTasks, dependencies.Cfg.Subsystems.UserScheduleURL, dependencies.DB, dependencies.ListenAddr) ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr) if err != nil { return nil, err diff --git a/deps/config/types.go b/deps/config/types.go index 19bf13709..46a52335a 100644 --- a/deps/config/types.go +++ b/deps/config/types.go @@ -241,6 +241,23 @@ type CurioSubsystemsConfig struct { // The address that should listen for Web GUI requests. GuiAddress string + + // UserScheduleURL are the URLs for the user schedule optimization service. 
Please preceed each with + // the name of the task that it should be used for, followed by a comma and the URL. For example: + // "SealSDR,http://localhost:8080/schedule" + // This http endpoint gets a POST request with the following JSON body: + // { + // "task_id": "task_id", + // "task_type": "task_type", + // "workers": ["worker1", "worker2"] + // } + // And looks for a 200 response with the following JSON body: + // { + // "worker": "worker1" + // "timeout": 60 + // } + // Timeout in seconds until it will be rescheduled. + UserScheduleURL []string } type CurioFees struct { DefaultMaxFee types.FIL diff --git a/harmony/harmonydb/sql/20240724-user_sched.sql b/harmony/harmonydb/sql/20240724-user_sched.sql new file mode 100644 index 000000000..ae8f1fc79 --- /dev/null +++ b/harmony/harmonydb/sql/20240724-user_sched.sql @@ -0,0 +1,5 @@ +CREATE TABLE harmony_task_user ( + task_id INTEGER PRIMARY KEY, + owner TEXT NOT NULL, + expiration INTEGER NOT NULL +); \ No newline at end of file diff --git a/harmony/harmonytask/task_type_handler.go b/harmony/harmonytask/task_type_handler.go index 642f29bb7..8f4d48148 100644 --- a/harmony/harmonytask/task_type_handler.go +++ b/harmony/harmonytask/task_type_handler.go @@ -194,27 +194,23 @@ canAcceptAgain: } return owner == h.TaskEngine.ownerID }) - if doErr != nil { + if doErr != nil && doErr != ErrReturnToPoolPlease { log.Errorw("Do() returned error", "type", h.Name, "id", strconv.Itoa(int(*tID)), "error", doErr) } }() return true } +var ErrReturnToPoolPlease = errors.New("return to pool") + func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done bool, doErr error) { workEnd := time.Now() retryWait := time.Millisecond * 100 retryRecordCompletion: cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { - var postedTime time.Time - err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime) - - if err != nil { - return false, 
fmt.Errorf("could not log completion: %w ", err) - } result := "unspecified error" if done { - _, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) + _, err := tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) if err != nil { return false, fmt.Errorf("could not log completion: %w", err) @@ -228,9 +224,9 @@ retryRecordCompletion: result = "error: " + doErr.Error() } var deleteTask bool - if h.MaxFailures > 0 { + if doErr != ErrReturnToPoolPlease && h.MaxFailures > 0 { ct := uint(0) - err = tx.QueryRow(`SELECT count(*) FROM harmony_task_history + err := tx.QueryRow(`SELECT count(*) FROM harmony_task_history WHERE task_id=$1 AND result=FALSE`, tID).Scan(&ct) if err != nil { return false, fmt.Errorf("could not read task history: %w", err) @@ -240,7 +236,7 @@ retryRecordCompletion: } } if deleteTask { - _, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) + _, err := tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) if err != nil { return false, fmt.Errorf("could not delete failed job: %w", err) } @@ -252,6 +248,15 @@ retryRecordCompletion: } } } + if doErr == ErrReturnToPoolPlease { + return true, nil + } + var postedTime time.Time + err := tx.QueryRow(`SELECT posted_time FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime) + + if err != nil { + return false, fmt.Errorf("could not log completion: %w ", err) + } _, err = tx.Exec(`INSERT INTO harmony_task_history (task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime.UTC(), workStart.UTC(), workEnd.UTC(), done, h.TaskEngine.hostAndPort, result) diff --git a/harmony/taskhelp/usertaskmgt/usertaskmgt.go b/harmony/taskhelp/usertaskmgt/usertaskmgt.go new file mode 100644 index 000000000..d459bfca9 --- /dev/null +++ b/harmony/taskhelp/usertaskmgt/usertaskmgt.go @@ -0,0 +1,144 @@ +/* + Package usertaskmgt provides a way to wrap tasks with a URL that can be called to assign the task to a worker. 
+ Timeline + +- UrlTask accepts everything +- once accepted, UrlTask.Do() finds who should own the task and updates the DB: + - harmony_task_user.owner & expiration + - harmony_task releases the task (without err) + +- The poller will see the task & call CanAccept() + - CanAccept() will see the owner and call the deeper CanAccept() if it's us. + - If it's not us, check the expiration time and release the task by deleting the row. + +- The task will be done by the worker who was told to do it, or eventually reassigned. + +Pitfalls: +- If the user's URL is down, the task will be stuck in the DB. +- Turnaround time is slowed by the additional trip through the poller. +- Full task resources are claimed by the URL runner, so the task needs full capacity. +*/ +package usertaskmgt + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/curio/harmony/harmonytask" + logging "github.com/ipfs/go-log/v2" + "github.com/samber/lo" + "golang.org/x/xerrors" +) + +var log = logging.Logger("userTaskMgt") + +func WrapTasks(tasks []harmonytask.TaskInterface, UserScheduleUrl []string, db *harmonydb.DB, hostAndPort string) { + urlMap := lo.SliceToMap(UserScheduleUrl, func(s string) (string, *url.URL) { + spl := strings.SplitN(s, ",", 2) + if len(spl) != 2 { + log.Errorf("Invalid UserScheduleUrl: %s. Expected: taskName,url", s) + return "", &url.URL{} + } + u, err := url.Parse(spl[1]) + if err != nil { + log.Errorf("Invalid UserScheduleUrl: %s. 
Expected: taskName,url", s) + return "", &url.URL{} + } + return spl[0], u + }) + for i, task := range tasks { + if url, ok := urlMap[task.TypeDetails().Name]; ok { + tasks[i] = &UrlTask{ + TaskInterface: task, + UserScheduleUrl: url, + name: task.TypeDetails().Name, + db: db, + hostAndPort: hostAndPort, + } + } + } +} + +type UrlTask struct { + harmonytask.TaskInterface + db *harmonydb.DB + UserScheduleUrl *url.URL + name string + hostAndPort string +} + +// CanAccept should accept all IF no harmony_task_user row exists, ELSE +// if us, try CanAccept() until expiration hits. +func (t *UrlTask) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + id := tids[0] + var owner string + var expiration int64 + err := t.db.QueryRow(context.Background(), `SELECT COALESCE(owner,''), COALESCE(expiration, 0) from harmony_task_user WHERE task_id=$1`, id).Scan(&owner, &expiration) + if err != nil { + return nil, xerrors.Errorf("could not get owner: %w", err) + } + if owner != "" { + if owner == t.hostAndPort { + return t.TaskInterface.CanAccept(tids, te) + } + if expiration < time.Now().Unix() { + _, err = t.db.Exec(context.Background(), `DELETE FROM harmony_task_user WHERE task_id=$1`, id) + if err != nil { + return nil, xerrors.Errorf("could not delete from harmony_task_user: %w", err) + } + } + } + return &id, nil +} + +var client = &http.Client{Timeout: time.Second * 10} + +func (t *UrlTask) Do(id harmonytask.TaskID, stillMe func() bool) (bool, error) { + var workerList []string + err := t.db.Select(context.Background(), &workerList, `SELECT host_and_port + FROM harmony_machines m JOIN harmony_machine_details d ON d.machine_id=m.id + WHERE tasks LIKE $1`, "%,"+t.name+",%") + if err != nil { + return false, xerrors.Errorf("could not get worker list: %w", err) + } + + resp, err := client.Post(t.UserScheduleUrl.String(), "application/json", bytes.NewReader([]byte(` + { + "task_type": "`+t.name+`", + "task_id": 
`+strconv.Itoa(int(id))+`, + "workers": [`+strings.Join(workerList, ",")+`], + } + `))) + if err != nil { + return false, xerrors.Errorf("could not call user defined URL: %w", err) + } + if resp.StatusCode != http.StatusOK { + return false, xerrors.Errorf("User defined URL returned non-200 status code: %d", resp.StatusCode) + } + var respData struct { + Worker string + Timeout int + } + defer resp.Body.Close() + err = json.NewDecoder(resp.Body).Decode(&respData) + if err != nil { + return false, xerrors.Errorf("could not decode user defined URL response: %w", err) + } + + // If it's us, we cannot shortcut because we don't have CanAccept's 2nd arg. + + expires := time.Now().Add(time.Second * time.Duration(respData.Timeout)) + _, err = t.db.Exec(context.Background(), `INSERT INTO harmony_task_user (task_id, owner, expiration) VALUES ($1,$2)`, id, respData.Worker, expires) + if err != nil { + return false, xerrors.Errorf("could not insert into harmony_task_user: %w", err) + } + + return false, harmonytask.ErrReturnToPoolPlease +} From c4fa8937e103870adb1dc5f34c7590bb781151fb Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Wed, 24 Jul 2024 00:33:48 -0500 Subject: [PATCH 2/6] do it --- harmony/taskhelp/usertaskmgt/usertaskmgt.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/harmony/taskhelp/usertaskmgt/usertaskmgt.go b/harmony/taskhelp/usertaskmgt/usertaskmgt.go index d459bfca9..3a2062fb3 100644 --- a/harmony/taskhelp/usertaskmgt/usertaskmgt.go +++ b/harmony/taskhelp/usertaskmgt/usertaskmgt.go @@ -101,8 +101,16 @@ func (t *UrlTask) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngin var client = &http.Client{Timeout: time.Second * 10} func (t *UrlTask) Do(id harmonytask.TaskID, stillMe func() bool) (bool, error) { + var owner string + err := t.db.QueryRow(context.Background(), `SELECT COALESCE(owner,'') FROM harmony_task_user WHERE task_id=$1`, id).Scan(&owner) + if err != nil { + return false, xerrors.Errorf("could 
not get owner: %w", err) + } + if owner == t.hostAndPort { + return t.TaskInterface.Do(id, stillMe) + } var workerList []string - err := t.db.Select(context.Background(), &workerList, `SELECT host_and_port + err = t.db.Select(context.Background(), &workerList, `SELECT host_and_port FROM harmony_machines m JOIN harmony_machine_details d ON d.machine_id=m.id WHERE tasks LIKE $1`, "%,"+t.name+",%") if err != nil { From 4c535f4b81d1e7cab136ccae9b3972dda3457e67 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Thu, 25 Jul 2024 23:02:54 -0500 Subject: [PATCH 3/6] utask- downHdlr, ex, test --- .gitignore | 1 + Makefile | 5 + cmd/curio/tasks/tasks.go | 2 +- cmd/userschedule/userschedule.go | 100 ++++++++++++++++++ deps/config/types.go | 35 +++++- harmony/harmonydb/sql/20240724-user_sched.sql | 4 +- harmony/taskhelp/usertaskmgt/usertaskmgt.go | 63 ++++++----- itests/userschedule_test.go | 82 ++++++++++++++ 8 files changed, 261 insertions(+), 31 deletions(-) create mode 100644 cmd/userschedule/userschedule.go create mode 100644 itests/userschedule_test.go diff --git a/.gitignore b/.gitignore index 931d5f5ce..e52a7547d 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ /docgen-md /docgen-openrpc /deps.json +/userschedule extern/filecoin-ffi/rust/target **/*.a diff --git a/Makefile b/Makefile index dcb200b89..1d25350bf 100644 --- a/Makefile +++ b/Makefile @@ -54,6 +54,11 @@ sptool: $(BUILD_DEPS) .PHONY: sptool BINS+=sptool +userschedule: + rm -f userschedule + $(GOCC) build $(GOFLAGS) -o userschedule ./cmd/userschedule +.PHONY: userschedule + calibnet: GOFLAGS+=-tags=calibnet calibnet: build diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index 2668477ff..9f9e8cee5 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -197,7 +197,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task // (we could have just appended to this list in the reverse order, but defining // tasks in pipeline order is 
more intuitive) - usertaskmgt.WrapTasks(activeTasks, dependencies.Cfg.Subsystems.UserScheduleURL, dependencies.DB, dependencies.ListenAddr) + usertaskmgt.WrapTasks(activeTasks, dependencies.Cfg.Subsystems.UserScheduler, dependencies.DB, dependencies.ListenAddr) ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr) if err != nil { return nil, err diff --git a/cmd/userschedule/userschedule.go b/cmd/userschedule/userschedule.go new file mode 100644 index 000000000..86caeaee8 --- /dev/null +++ b/cmd/userschedule/userschedule.go @@ -0,0 +1,100 @@ +// This is an example round-robin scheduler. +// It can be used by modifying Curio's base configuration Subsystems.UserScheduler +// to point to a machine named myscheduler with URL: +// http://myscheduler:7654/userschedule +// Be sure to open the selected port on the machine running this scheduler. +// +// Usage: +// +// Fork the repo from GitHub and clone it to your local machine, +// Edit this file as needed to implement your own scheduling logic, +// build with 'make userschedule' then run with ./userschedule +package main + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "runtime/debug" + "runtime/pprof" + "sync" + "syscall" + + "golang.org/x/xerrors" +) + +const WorkerBusyTimeout = 60 // Seconds until Curio asks again for this task. 
+func sched(w http.ResponseWriter, r *http.Request) { + var input struct { + TaskID string `json:"task_id"` + TaskType string `json:"task_type"` + Workers []string `json:"workers"` + } + // Parse the request + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + OrHTTPFail(w, xerrors.Errorf("failed to parse request: %s", err)) + } + + // Scheduler Logic goes here + selectedWorker := roundRobin(input.TaskType, input.Workers) + + // Respond to Curio + w.WriteHeader(http.StatusOK) + err := json.NewEncoder(w).Encode(struct { + Worker string `json:"worker"` + Timeout int `json:"timeout"` + }{selectedWorker, WorkerBusyTimeout}) + if err != nil { + OrHTTPFail(w, err) + } +} + +// ///////// Round Robin Scheduler ///////// // +var mx sync.Mutex +var m = make(map[string]int) + +func roundRobin(taskType string, workers []string) string { + mx.Lock() + defer mx.Unlock() + selectedWorker := workers[m[taskType]%len(workers)] + m[taskType]++ + return selectedWorker +} + +// /////////////////////////////////// +// Everything below this line is boilerplate code. +// /////////////////////////////////// + +func main() { + setupCloseHandler() + mux := http.NewServeMux() + mux.HandleFunc("/userschedule", func(w http.ResponseWriter, r *http.Request) { + defer recover() + sched(w, r) + }) + http.ListenAndServe(":7654", mux) +} + +// Intentionally inlined dependencies to make it easy to copy-paste into your own codebase. +func OrHTTPFail(w http.ResponseWriter, err error) { + if err != nil { + w.WriteHeader(500) + _, _ = w.Write([]byte(err.Error())) + log.Printf("http fail. 
err %s, stack %s", err, string(debug.Stack())) + panic(1) + } +} + +func setupCloseHandler() { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\r- Ctrl+C pressed in Terminal") + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + panic(1) + }() +} diff --git a/deps/config/types.go b/deps/config/types.go index 46a52335a..b2bad6829 100644 --- a/deps/config/types.go +++ b/deps/config/types.go @@ -242,9 +242,7 @@ type CurioSubsystemsConfig struct { // The address that should listen for Web GUI requests. GuiAddress string - // UserScheduleURL are the URLs for the user schedule optimization service. Please preceed each with - // the name of the task that it should be used for, followed by a comma and the URL. For example: - // "SealSDR,http://localhost:8080/schedule" + // UserScheduler allows for the user to schedule tasks on specific machines of their choice. // This http endpoint gets a POST request with the following JSON body: // { // "task_id": "task_id", @@ -257,7 +255,36 @@ type CurioSubsystemsConfig struct { // "timeout": 60 // } // Timeout in seconds until it will be rescheduled. - UserScheduleURL []string + UserScheduler []UserSchedule +} + +// UserSchedule allows for the user to schedule a task on specific machines of their choice. +// This http endpoint gets a POST request with the following JSON body: +// +// { +// "task_id": "task_id", +// "task_type": "task_type", +// "workers": ["worker1", "worker2"] +// } +// +// And looks for a 200 response with the following JSON body: +// +// { +// "worker": "worker1" +// "timeout": 60 +// } +// +// Timeout in seconds until it will be rescheduled. +type UserSchedule struct { + // TaskName as listed in the GUI. Ex: SDR + TaskName string + + // URL to http(s) user scheduler + URL string + + // HaltOnSchedulerDown - If true, the tasks will not run when the URL response is not usable. 
+ // The False value is recommended to keep scheduling working even if the UserScheduler service is down. + HaltOnSchedulerDown bool } type CurioFees struct { DefaultMaxFee types.FIL diff --git a/harmony/harmonydb/sql/20240724-user_sched.sql b/harmony/harmonydb/sql/20240724-user_sched.sql index ae8f1fc79..f5f5f94dd 100644 --- a/harmony/harmonydb/sql/20240724-user_sched.sql +++ b/harmony/harmonydb/sql/20240724-user_sched.sql @@ -1,5 +1,7 @@ CREATE TABLE harmony_task_user ( task_id INTEGER PRIMARY KEY, owner TEXT NOT NULL, - expiration INTEGER NOT NULL + expiration INTEGER NOT NULL, + ignore_userscheduler BOOLEAN NOT NULL DEFAULT FALSE, + FOREIGN KEY (task_id) REFERENCES harmony_task (task_id) ON DELETE CASCADE ); \ No newline at end of file diff --git a/harmony/taskhelp/usertaskmgt/usertaskmgt.go b/harmony/taskhelp/usertaskmgt/usertaskmgt.go index 3a2062fb3..c9f42591e 100644 --- a/harmony/taskhelp/usertaskmgt/usertaskmgt.go +++ b/harmony/taskhelp/usertaskmgt/usertaskmgt.go @@ -30,6 +30,7 @@ import ( "strings" "time" + "github.com/filecoin-project/curio/deps/config" "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" logging "github.com/ipfs/go-log/v2" @@ -39,28 +40,24 @@ import ( var log = logging.Logger("userTaskMgt") -func WrapTasks(tasks []harmonytask.TaskInterface, UserScheduleUrl []string, db *harmonydb.DB, hostAndPort string) { - urlMap := lo.SliceToMap(UserScheduleUrl, func(s string) (string, *url.URL) { - spl := strings.SplitN(s, ",", 2) - if len(spl) != 2 { - log.Errorf("Invalid UserScheduleUrl: %s. Expected: taskName,url", s) - return "", &url.URL{} - } - u, err := url.Parse(spl[1]) +func WrapTasks(tasks []harmonytask.TaskInterface, UserScheduler []config.UserSchedule, db *harmonydb.DB, hostAndPort string) { + m := lo.SliceToMap(UserScheduler, func(s config.UserSchedule) (string, *config.UserSchedule) { + _, err := url.Parse(s.URL) if err != nil { log.Errorf("Invalid UserScheduleUrl: %s. 
Expected: taskName,url", s) - return "", &url.URL{} + return "", nil } - return spl[0], u + return s.TaskName, &s }) for i, task := range tasks { - if url, ok := urlMap[task.TypeDetails().Name]; ok { + if s, ok := m[task.TypeDetails().Name]; ok { tasks[i] = &UrlTask{ - TaskInterface: task, - UserScheduleUrl: url, - name: task.TypeDetails().Name, - db: db, - hostAndPort: hostAndPort, + TaskInterface: task, + UserScheduleUrl: s.URL, + name: task.TypeDetails().Name, + db: db, + hostAndPort: hostAndPort, + haltOnSchedulerDown: s.HaltOnSchedulerDown, } } } @@ -68,10 +65,11 @@ func WrapTasks(tasks []harmonytask.TaskInterface, UserScheduleUrl []string, db * type UrlTask struct { harmonytask.TaskInterface - db *harmonydb.DB - UserScheduleUrl *url.URL - name string - hostAndPort string + db *harmonydb.DB + UserScheduleUrl string + name string + hostAndPort string + haltOnSchedulerDown bool } // CanAccept should accept all IF no harmony_task_user row exists, ELSE @@ -80,12 +78,17 @@ func (t *UrlTask) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngin id := tids[0] var owner string var expiration int64 - err := t.db.QueryRow(context.Background(), `SELECT COALESCE(owner,''), COALESCE(expiration, 0) from harmony_task_user WHERE task_id=$1`, id).Scan(&owner, &expiration) + var ignoreUserScheduler bool + err := t.db.QueryRow(context.Background(), `SELECT + COALESCE(owner,''), + COALESCE(expiration, 0), + COALESCE(ignore_userscheduler,false) + from harmony_task_user WHERE task_id=$1`, id).Scan(&owner, &expiration, &ignoreUserScheduler) if err != nil { return nil, xerrors.Errorf("could not get owner: %w", err) } if owner != "" { - if owner == t.hostAndPort { + if owner == t.hostAndPort || ignoreUserScheduler { return t.TaskInterface.CanAccept(tids, te) } if expiration < time.Now().Unix() { @@ -100,9 +103,19 @@ func (t *UrlTask) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngin var client = &http.Client{Timeout: time.Second * 10} -func (t *UrlTask) Do(id 
harmonytask.TaskID, stillMe func() bool) (bool, error) { +func (t *UrlTask) Do(id harmonytask.TaskID, stillMe func() bool) (b bool, err error) { + defer func() { + if err != harmonytask.ErrReturnToPoolPlease && !t.haltOnSchedulerDown { + log.Error("Proceeding without user scheduler service running (as configured)") + log.Error(err) + t.db.Exec(context.Background(), + `INSERT INTO harmony_task_user (task_id, owner, expiration, ignore_userscheduler) + VALUES ($1, '-', 0, true)`, id) + err = harmonytask.ErrReturnToPoolPlease + } + }() var owner string - err := t.db.QueryRow(context.Background(), `SELECT COALESCE(owner,'') FROM harmony_task_user WHERE task_id=$1`, id).Scan(&owner) + err = t.db.QueryRow(context.Background(), `SELECT COALESCE(owner,'') FROM harmony_task_user WHERE task_id=$1`, id).Scan(&owner) if err != nil { return false, xerrors.Errorf("could not get owner: %w", err) } @@ -117,7 +130,7 @@ func (t *UrlTask) Do(id harmonytask.TaskID, stillMe func() bool) (bool, error) { return false, xerrors.Errorf("could not get worker list: %w", err) } - resp, err := client.Post(t.UserScheduleUrl.String(), "application/json", bytes.NewReader([]byte(` + resp, err := client.Post(t.UserScheduleUrl, "application/json", bytes.NewReader([]byte(` { "task_type": "`+t.name+`", "task_id": `+strconv.Itoa(int(id))+`, diff --git a/itests/userschedule_test.go b/itests/userschedule_test.go new file mode 100644 index 000000000..469abd51e --- /dev/null +++ b/itests/userschedule_test.go @@ -0,0 +1,82 @@ +package itests + +import ( + "os/exec" + "strconv" + "testing" + "time" + + "github.com/filecoin-project/curio/deps/config" + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/curio/harmony/harmonytask" + "github.com/filecoin-project/curio/harmony/resources" + "github.com/filecoin-project/curio/harmony/taskhelp/usertaskmgt" + "github.com/stretchr/testify/require" +) + +func TestUserTaskMgt(t *testing.T) { + gopath, err := exec.LookPath("go") + 
require.NoError(t, err) + exec.Command(gopath, "run", "cmd/userschedule").Run() // round-robin scheduler + + db := dbSetup(t) + harmonytask.POLL_DURATION = 200 * time.Millisecond + output := "" + var instances []*ut + for a := 0; a < 3; a++ { // make 3 "machines" + inst := &ut{db: db, f: func() { output += strconv.Itoa(a) }} + instances = append(instances, inst) + + host := "foo:" + strconv.Itoa(a) + tasks := []harmonytask.TaskInterface{inst} + usertaskmgt.WrapTasks(tasks, []config.UserSchedule{ + {TaskName: "foo", URL: "http://localhost:7654"}, + }, db, host) + harmonytask.New(db, tasks, host) + } + + for a := 0; a < 5; a++ { // schedule 5 tasks + instances[0].myAddTask(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { + return true, nil + }) + } + require.Equal(t, "01201", output) +} + +type ut struct { + db *harmonydb.DB + f func() + myAddTask harmonytask.AddTaskFunc +} + +func (u *ut) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Name: "foo", + Cost: resources.Resources{}, + } +} +func (u *ut) CanAccept(tids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + return &tids[0], nil +} +func (u *ut) Adder(f harmonytask.AddTaskFunc) { + u.myAddTask = f +} +func (u *ut) Do(tID harmonytask.TaskID, _ func() bool) (bool, error) { + u.f() + time.Sleep(time.Second) // so there's no chance that a later task will finish first. 
+ return true, nil +} + +func dbSetup(t *testing.T) *harmonydb.DB { + sharedITestID := harmonydb.ITestNewID() + dbConfig := config.HarmonyDB{ + Hosts: []string{envElse("CURIO_HARMONYDB_HOSTS", "127.0.0.1")}, + Database: "yugabyte", + Username: "yugabyte", + Password: "yugabyte", + Port: "5433", + } + db, err := harmonydb.NewFromConfigWithITestID(t, dbConfig, sharedITestID) + require.NoError(t, err) + return db +} From a98c558b8a652fd16642a51153e70e02bab8911b Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Fri, 26 Jul 2024 17:28:23 -0500 Subject: [PATCH 4/6] test fixes --- deps/config/doc_gen.go | 39 +++++++++++++++++++ harmony/harmonydb/sql/20240724-user_sched.sql | 2 +- harmony/taskhelp/usertaskmgt/usertaskmgt.go | 7 ++-- itests/userschedule_test.go | 6 ++- 4 files changed, 48 insertions(+), 6 deletions(-) diff --git a/deps/config/doc_gen.go b/deps/config/doc_gen.go index 4646f66eb..3814200ca 100644 --- a/deps/config/doc_gen.go +++ b/deps/config/doc_gen.go @@ -632,6 +632,24 @@ cache data held on disk after the completion of TreeRC task to 11GiB.`, Comment: `The maximum amount of SyntheticPoRep tasks that can run simultaneously. Note that the maximum number of tasks will also be bounded by resources available on the machine.`, }, + { + Name: "UserScheduler", + Type: "[]UserSchedule", + + Comment: `UserScheduler allows for the user to schedule tasks on specific machines of their choice. 
+This http endpoint gets a POST request with the following JSON body: +{ +"task_id": "task_id", +"task_type": "task_type", +"workers": ["worker1", "worker2"] +} +And looks for a 200 response with the following JSON body: +{ +"worker": "worker1" +"timeout": 60 +} +Timeout in seconds until it will be rescheduled.`, + }, }, "Duration time.Duration": { { @@ -705,4 +723,25 @@ identifier in the integration page for the service.`, Example: https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX`, }, }, + "UserSchedule": { + { + Name: "TaskName", + Type: "string", + + Comment: `TaskName as listed in the GUI. Ex: SDR`, + }, + { + Name: "URL", + Type: "string", + + Comment: `URL to http(s) user scheduler`, + }, + { + Name: "HaltOnSchedulerDown", + Type: "bool", + + Comment: `HaltOnSchedulerDown - If true, the tasks will not run when the URL response is not usable. +The False value is recommended to keep scheduling working even if the UserScheduler service is down.`, + }, + }, } diff --git a/harmony/harmonydb/sql/20240724-user_sched.sql b/harmony/harmonydb/sql/20240724-user_sched.sql index f5f5f94dd..6c7b6df8d 100644 --- a/harmony/harmonydb/sql/20240724-user_sched.sql +++ b/harmony/harmonydb/sql/20240724-user_sched.sql @@ -3,5 +3,5 @@ CREATE TABLE harmony_task_user ( owner TEXT NOT NULL, expiration INTEGER NOT NULL, ignore_userscheduler BOOLEAN NOT NULL DEFAULT FALSE, - FOREIGN KEY (task_id) REFERENCES harmony_task (task_id) ON DELETE CASCADE + FOREIGN KEY (task_id) REFERENCES harmony_task (id) ON DELETE CASCADE ); \ No newline at end of file diff --git a/harmony/taskhelp/usertaskmgt/usertaskmgt.go b/harmony/taskhelp/usertaskmgt/usertaskmgt.go index c9f42591e..a2634bcc7 100644 --- a/harmony/taskhelp/usertaskmgt/usertaskmgt.go +++ b/harmony/taskhelp/usertaskmgt/usertaskmgt.go @@ -30,12 +30,13 @@ import ( "strings" "time" - "github.com/filecoin-project/curio/deps/config" - "github.com/filecoin-project/curio/harmony/harmonydb" - 
"github.com/filecoin-project/curio/harmony/harmonytask" logging "github.com/ipfs/go-log/v2" "github.com/samber/lo" "golang.org/x/xerrors" + + "github.com/filecoin-project/curio/deps/config" + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/curio/harmony/harmonytask" ) var log = logging.Logger("userTaskMgt") diff --git a/itests/userschedule_test.go b/itests/userschedule_test.go index 469abd51e..ba05999e9 100644 --- a/itests/userschedule_test.go +++ b/itests/userschedule_test.go @@ -6,12 +6,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/filecoin-project/curio/deps/config" "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" "github.com/filecoin-project/curio/harmony/resources" "github.com/filecoin-project/curio/harmony/taskhelp/usertaskmgt" - "github.com/stretchr/testify/require" ) func TestUserTaskMgt(t *testing.T) { @@ -32,7 +33,8 @@ func TestUserTaskMgt(t *testing.T) { usertaskmgt.WrapTasks(tasks, []config.UserSchedule{ {TaskName: "foo", URL: "http://localhost:7654"}, }, db, host) - harmonytask.New(db, tasks, host) + _, err := harmonytask.New(db, tasks, host) + require.NoError(t, err) } for a := 0; a < 5; a++ { // schedule 5 tasks From 9404d4ee585323d84430e3c29b7230ea5aa7aae5 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 29 Jul 2024 17:38:36 -0500 Subject: [PATCH 5/6] lint --- cmd/userschedule/userschedule.go | 4 ++-- harmony/taskhelp/usertaskmgt/usertaskmgt.go | 6 +++++- itests/userschedule_test.go | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cmd/userschedule/userschedule.go b/cmd/userschedule/userschedule.go index 86caeaee8..574534335 100644 --- a/cmd/userschedule/userschedule.go +++ b/cmd/userschedule/userschedule.go @@ -72,10 +72,10 @@ func main() { setupCloseHandler() mux := http.NewServeMux() mux.HandleFunc("/userschedule", func(w http.ResponseWriter, r *http.Request) { - defer 
recover() + defer func() { _ = recover() }() sched(w, r) }) - http.ListenAndServe(":7654", mux) + fmt.Println(http.ListenAndServe(":7654", mux)) } // Intentionally inlined dependencies to make it easy to copy-paste into your own codebase. diff --git a/harmony/taskhelp/usertaskmgt/usertaskmgt.go b/harmony/taskhelp/usertaskmgt/usertaskmgt.go index a2634bcc7..d5b8eda0d 100644 --- a/harmony/taskhelp/usertaskmgt/usertaskmgt.go +++ b/harmony/taskhelp/usertaskmgt/usertaskmgt.go @@ -109,9 +109,13 @@ func (t *UrlTask) Do(id harmonytask.TaskID, stillMe func() bool) (b bool, err er if err != harmonytask.ErrReturnToPoolPlease && !t.haltOnSchedulerDown { log.Error("Proceeding without user scheduler service running (as configured)") log.Error(err) - t.db.Exec(context.Background(), + _, err = t.db.Exec(context.Background(), `INSERT INTO harmony_task_user (task_id, owner, expiration, ignore_userscheduler) VALUES ($1, '-', 0, true)`, id) + if err != nil { + log.Error("Could not insert into harmony_task_user: ", err) + return + } err = harmonytask.ErrReturnToPoolPlease } }() diff --git a/itests/userschedule_test.go b/itests/userschedule_test.go index ba05999e9..75b48ea91 100644 --- a/itests/userschedule_test.go +++ b/itests/userschedule_test.go @@ -18,7 +18,7 @@ import ( func TestUserTaskMgt(t *testing.T) { gopath, err := exec.LookPath("go") require.NoError(t, err) - exec.Command(gopath, "run", "cmd/userschedule").Run() // round-robin scheduler + require.NoError(t, exec.Command(gopath, "run", "cmd/userschedule").Run()) // round-robin scheduler db := dbSetup(t) harmonytask.POLL_DURATION = 200 * time.Millisecond From 022200417253b9ef25032b5a70ababa226b3e314 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Thu, 15 Aug 2024 17:47:22 -0500 Subject: [PATCH 6/6] make gen --- deps/config/doc_gen.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/config/doc_gen.go b/deps/config/doc_gen.go index 2716fb593..bfbd97b42 100644 --- a/deps/config/doc_gen.go +++ 
b/deps/config/doc_gen.go @@ -707,7 +707,8 @@ And looks for a 200 response with the following JSON body: "timeout": 60 } Timeout in seconds until it will be rescheduled.`, - },{ + }, + { Name: "EnableBatchSeal", Type: "bool",