diff --git a/service/apinode/aggregator/aggregator.go b/service/apinode/aggregator/aggregator.go index 54ff3372..2ab44633 100644 --- a/service/apinode/aggregator/aggregator.go +++ b/service/apinode/aggregator/aggregator.go @@ -35,7 +35,7 @@ func Run(projectManager *project.Manager, db *apidb.DB, sequencerAddr string, in taskMap[k] = append(taskMap[k], ts[i]) } - for _, tasks := range taskMap { + for taskMapKey, tasks := range taskMap { pid, ok := new(big.Int).SetString(tasks[0].ProjectID, 10) if !ok { slog.Error("failed to decode project id string", "project_string", tasks[0].ProjectID) @@ -53,8 +53,19 @@ func Run(projectManager *project.Manager, db *apidb.DB, sequencerAddr string, in continue } if cfg.ProofType == "movement" { - prevTaskID := tasks[0].TaskID - tasks[len(tasks)-1].PrevTaskID = prevTaskID + now := time.Now() + hoursAgo24 := now.Add(-24 * time.Hour) // TODO move to project config + count, err := db.CountSettledTask(tasks[0].ProjectID, tasks[0].DevicePubKey, hoursAgo24, now) + if err != nil { + slog.Error("failed to count settled task", "error", err, "project_id", pid.String()) + continue + } + if count >= 5 { // TODO move to project config + delete(taskMap, taskMapKey) + } else { + prevTaskID := tasks[0].TaskID + tasks[len(tasks)-1].PrevTaskID = prevTaskID + } } } diff --git a/service/apinode/db/sqlite.go b/service/apinode/db/sqlite.go index 48f5486e..678a3d89 100644 --- a/service/apinode/db/sqlite.go +++ b/service/apinode/db/sqlite.go @@ -2,6 +2,7 @@ package db import ( "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" @@ -24,8 +25,10 @@ type AssignedTask struct { type SettledTask struct { gorm.Model - TaskID string `gorm:"uniqueIndex:settled_task_uniq,not null"` - Tx string `gorm:"not null"` + TaskID string `gorm:"uniqueIndex:settled_task_uniq,not null"` + ProjectID string `gorm:"index:settled_task_query,not null"` + DevicePubKey string `gorm:"index:settled_task_query,not null"` + Tx string `gorm:"not null"` } type ProjectDevice struct { @@ -89,11 +92,17 @@ func (p *DB) UpsertAssignedTask(taskID common.Hash, prover common.Address) error } func (p *DB) UpsertSettledTask(taskID, tx common.Hash) error { + chTask, err := p.FetchTask(taskID) + if err != nil { + return err + } t := SettledTask{ - TaskID: taskID.Hex(), - Tx: tx.Hex(), + TaskID: taskID.Hex(), + ProjectID: chTask.ProjectID, + DevicePubKey: chTask.DevicePubKey, + Tx: tx.Hex(), } - err := p.sqlite.Clauses(clause.OnConflict{ + err = p.sqlite.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "task_id"}}, DoUpdates: clause.AssignmentColumns([]string{"tx"}), }).Create(&t).Error @@ -122,6 +131,18 @@ func (p *DB) FetchSettledTask(taskID common.Hash) (*SettledTask, error) { return &t, nil } +func (p *DB) CountSettledTask(projectID, devicePubKey string, begin, end time.Time) (int64, error) { + var c int64 + if err := p.sqlite.Where("project_id = ?", projectID). + Where("device_pub_key = ?", devicePubKey). + Where("created_at > ?", begin). + Where("created_at <= ?", end). + Model(&SettledTask{}).Count(&c).Error; err != nil { + return 0, errors.Wrap(err, "failed to count settled task") + } + return c, nil +} + func (p *DB) UpsertProjectDevice(projectID *big.Int, address common.Address) error { t := ProjectDevice{ ProjectID: projectID.String(),