Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: alerts for wnPost and wdPost #251

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 43 additions & 71 deletions alertmanager/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func NowCheck(al *alerts) {
}
}()
if len(nowAlerts) > 0 {
fmt.Println("ALERTMANAGER: NOW ALERTS")
al.alertMap[Name].alertString = strings.Join(lo.Map(nowAlerts, func(n NowType, _ int) string {
return fmt.Sprintf("Machine %s: %s", n.Name, n.Message)
}), " ")
Expand Down Expand Up @@ -420,13 +419,12 @@ func wdPostCheck(al *alerts) {
return
}

// Calculate from epoch for last AlertMangerInterval
from := head.Height() - abi.ChainEpoch(math.Ceil(AlertMangerInterval.Seconds()/float64(build.BlockDelaySecs))) - 1
if from < 0 {
from = 0
}

log.Infof("ALERTMANAGER: FROM: %d", from)

_, miners, err := al.getAddresses()
if err != nil {
al.alertMap[Name].err = err
Expand All @@ -435,13 +433,10 @@ func wdPostCheck(al *alerts) {

h := head

type partSent struct {
sent bool
parts int
}

msgCheck := make(map[address.Address]map[uint64]*partSent)
// Map[Miner Address]Map[DeadlineIdx][]Partitions
msgCheck := make(map[address.Address]map[uint64][]bool)

// Walk back all tipset from current height to from height and find all deadlines and their partitions
for h.Height() >= from {
for _, maddr := range miners {
deadlineInfo, err := al.api.StateMinerProvingDeadline(al.ctx, maddr, h.Key())
Expand All @@ -455,13 +450,11 @@ func wdPostCheck(al *alerts) {
return
}
if _, ok := msgCheck[maddr]; !ok {
msgCheck[maddr] = make(map[uint64]*partSent)
msgCheck[maddr] = make(map[uint64][]bool)
}
if _, ok := msgCheck[maddr][deadlineInfo.Index]; !ok {
msgCheck[maddr][deadlineInfo.Index] = &partSent{
sent: false,
parts: len(partitions),
}
ps := make([]bool, len(partitions))
msgCheck[maddr][deadlineInfo.Index] = ps
}
}
h, err = al.api.ChainGetTipSet(al.ctx, h.Parents())
Expand All @@ -471,12 +464,7 @@ func wdPostCheck(al *alerts) {
}
}

for maddr, deadlines := range msgCheck {
for deadlineIndex, ps := range deadlines {
log.Infof("ALERTMANAGER: Address: %s, DEADLINE: %d, Partitions: %d", maddr.String(), deadlineIndex, ps.parts)
}
}

// Get all wdPost tasks from DB between from and head
var wdDetails []struct {
Miner int64 `db:"sp_id"`
Deadline int64 `db:"deadline"`
Expand All @@ -498,6 +486,7 @@ func wdPostCheck(al *alerts) {
return
}

// For all tasks between from and head, match how many we posted successfully
for _, detail := range wdDetails {
addr, err := address.NewIDAddress(uint64(detail.Miner))
if err != nil {
Expand All @@ -508,8 +497,11 @@ func wdPostCheck(al *alerts) {
al.alertMap[Name].alertString += fmt.Sprintf("unknown WindowPost jobs for miner %s deadline %d partition %d found. ", addr.String(), detail.Deadline, detail.Partition)
continue
}
msgCheck[addr][uint64(detail.Deadline)].sent = true

// If entry for a partition is found we should mark it as processed
msgCheck[addr][uint64(detail.Deadline)][detail.Partition] = true

// Check if we skipped any sectors
var postOut miner.SubmitWindowedPoStParams
err = postOut.UnmarshalCBOR(bytes.NewReader(detail.Proof))
if err != nil {
Expand All @@ -529,10 +521,13 @@ func wdPostCheck(al *alerts) {
}
}

// Check if we missed any deadline/partitions
for maddr, deadlines := range msgCheck {
for deadlineIndex, ps := range deadlines {
if !ps.sent {
al.alertMap[Name].alertString += fmt.Sprintf("No WindowPost jobs found for miner %s deadline %d. ", maddr.String(), deadlineIndex)
for idx := range ps {
if !ps[idx] {
al.alertMap[Name].alertString += fmt.Sprintf("No WindowPost jobs found for miner %s deadline %d paritions %d. ", maddr.String(), deadlineIndex, idx)
}
}
}
}
Expand All @@ -547,17 +542,20 @@ func wnPostCheck(al *alerts) {
return
}

// Calculate from epoch for last AlertMangerInterval
from := head.Height() - abi.ChainEpoch(math.Ceil(AlertMangerInterval.Seconds()/float64(build.BlockDelaySecs))) - 1
if from < 0 {
from = 0
}

var wnDetails []struct {
Miner int64 `db:"sp_id"`
Block string `db:"mined_cid"`
Epoch abi.ChainEpoch `db:"epoch"`
Miner int64 `db:"sp_id"`
Block string `db:"mined_cid"`
Epoch abi.ChainEpoch `db:"epoch"`
Included bool `db:"included"`
}

// Get all DB entries where we won the election in last AlertMangerInterval
err = al.db.Select(al.ctx, &wnDetails, `
SELECT sp_id, mined_cid, epoch
FROM mining_tasks
Expand All @@ -568,75 +566,49 @@ func wnPostCheck(al *alerts) {
return
}

var count []int64
err = al.db.Select(al.ctx, &count, `
// Get count of all mining tasks in DB in last AlertMangerInterval
var count int64
err = al.db.QueryRow(al.ctx, `
SELECT COUNT(*)
FROM mining_tasks
WHERE epoch > $1;`, from)
WHERE epoch > $1;`, from).Scan(&count)
if err != nil {
al.alertMap[Name].err = xerrors.Errorf("getting winningPost count details from database: %w", err)
return
}

if count[0] == 0 {
// If we have no task created for any miner ID, this is a serious issue
if count == 0 {
al.alertMap[Name].alertString += "No winningPost tasks found in the last " + humanize.Time(time.Now().Add(-AlertMangerInterval))
return
}

// Calculate how many tasks should be in DB for AlertMangerInterval (epochs) as each epoch should have 1 task
epochs := int64(math.Ceil(AlertMangerInterval.Seconds() / float64(build.BlockDelaySecs)))
if (head.Height() - abi.ChainEpoch(epochs)) < 0 {
epochs = int64(head.Height())
}

if epochs != count[0]+1 && epochs != count[0]-1 && epochs != count[0] {
al.alertMap[Name].alertString += fmt.Sprintf("Expected %d WinningPost task and found %d in DB ", epochs, count[0])
}

if len(wnDetails) < 1 {
_, miners, err := al.getAddresses()
if err != nil {
al.alertMap[Name].err = err
return
}

to := wnDetails[len(wnDetails)-1].Epoch
epochs = epochs * int64(len(miners)) // Multiply epochs by number of miner IDs

epochMap := make(map[abi.ChainEpoch]string)

for head.Height() >= to {
epochMap[head.Height()] = head.String()
head, err = al.api.ChainGetTipSet(al.ctx, head.Parents())
if err != nil {
al.alertMap[Name].err = xerrors.Errorf("getting tipset: %w", err)
}
if head == nil {
al.alertMap[Name].err = xerrors.Errorf("tipset is nil")
return
}
if head.Height() == 0 {
break
}
if epochs != count+1 && epochs != count-1 && epochs != count {
al.alertMap[Name].alertString += fmt.Sprintf("Expected %d WinningPost task and found %d in DB ", epochs, count)
}

winMap := make(map[abi.ChainEpoch]struct {
won bool
cid string
})

for _, wn := range wnDetails {
if strings.Contains(epochMap[wn.Epoch], wn.Block) {
winMap[wn.Epoch] = struct {
won bool
cid string
}{won: true, cid: wn.Block}
continue
}
winMap[wn.Epoch] = struct {
won bool
cid string
}{won: false, cid: wn.Block}
if len(wnDetails) < 1 {
return
}

for epoch, st := range winMap {
if !st.won {
al.alertMap[Name].alertString += fmt.Sprintf("Epoch %d: does not contain our block %s", epoch, st.cid)
// Repost any block which we submitted but was not included in the chain
for _, wn := range wnDetails {
if !wn.Included {
al.alertMap[Name].alertString += fmt.Sprintf("Epoch %d: does not contain our block %s", wn.Epoch, wn.Block)
}
}
}