Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: index and announce direct deals #1696

Merged
merged 11 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/boostd/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func processPiece(ctx context.Context, sectorid abi.SectorNumber, chainDealID ab
return err
}

logger.Infow("added index", "took", time.Since(timeAddIndex), "sector", di.SectorID, "piececid", piececid, "chain-deal-id", di.ChainDealID, "uuid", di.DealUuid)
logger.Infow("added index", "took", time.Since(timeAddIndex), "sector", di.SectorID, "piececid", piececid, "chain-deal-id", di.ChainDealID, "uuid", di.DealUuid) // TODO: Update this for direct deals recovery
}

if !ignoreCommp { // commp over data reader
Expand Down
9 changes: 5 additions & 4 deletions cmd/booster-http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ func TestHttpGzipResponse(t *testing.T) {

// Crate pieceInfo
deal := model.DealInfo{
ChainDealID: 1234567,
SectorID: 0,
PieceOffset: 1233,
PieceLength: 123,
ChainDealID: 1234567,
SectorID: 0,
PieceOffset: 1233,
PieceLength: 123,
IsDirectDeal: false,
}
deals := []model.DealInfo{deal}

Expand Down
11 changes: 10 additions & 1 deletion cmd/booster-http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,12 @@ func (s *HttpServer) unsealedDeal(ctx context.Context, pieceCid cid.Cid, pieceDe
// Try to return an error message with as much useful information as possible
dealSectors := make([]string, 0, len(pieceDeals))
for _, di := range pieceDeals {
dealSectors = append(dealSectors, fmt.Sprintf("Deal %d: Sector %d", di.ChainDealID, di.SectorID))
if di.IsDirectDeal {
dealSectors = append(dealSectors, fmt.Sprintf("Allocation %d: Sector %d", di.ChainDealID, di.SectorID))
} else {
dealSectors = append(dealSectors, fmt.Sprintf("Deal %d: Sector %d", di.ChainDealID, di.SectorID))
}

}

if allErr == nil {
Expand All @@ -343,6 +348,10 @@ func (s *HttpServer) unsealedDeal(ctx context.Context, pieceCid cid.Cid, pieceDe
}

if len(pieceDeals) == 1 {
if pieceDeals[0].IsDirectDeal {
return nil, fmt.Errorf("checking unsealed status of allocation %d (sector %d) containing piece %s: %w",
pieceDeals[0].ChainDealID, pieceDeals[0].SectorID, pieceCid, allErr)
}
return nil, fmt.Errorf("checking unsealed status of deal %d (sector %d) containing piece %s: %w",
pieceDeals[0].ChainDealID, pieceDeals[0].SectorID, pieceCid, allErr)
}
Expand Down
15 changes: 8 additions & 7 deletions cmd/migrate-lid/migrate_lid.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,13 +501,14 @@ func migratePieceStore(ctx context.Context, logger *zap.SugaredLogger, bar *prog
}

dealInfo := model.DealInfo{
DealUuid: uuid,
IsLegacy: isLegacy,
ChainDealID: d.DealID,
MinerAddr: address.Address(maddr),
SectorID: d.SectorID,
PieceOffset: d.Offset,
PieceLength: d.Length,
DealUuid: uuid,
IsLegacy: isLegacy,
ChainDealID: d.DealID,
MinerAddr: address.Address(maddr),
SectorID: d.SectorID,
PieceOffset: d.Offset,
PieceLength: d.Length,
IsDirectDeal: false, // Explicitly set it to false as there should be no direct deals before this migration
}

err = store.AddDealForPiece(ctx, pcid, dealInfo)
Expand Down
1 change: 1 addition & 0 deletions db/directdeals.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func newDirectDealsAccessor(db *sql.DB, deal *types.DirectDeal) *directDealsAcce
"StartEpoch": &fielddef.FieldDef{F: &deal.StartEpoch},
"EndEpoch": &fielddef.FieldDef{F: &deal.EndEpoch},
"InboundFilePath": &fielddef.FieldDef{F: &deal.InboundFilePath},
"InboundFileSize": &fielddef.FieldDef{F: &deal.InboundFileSize},
"SectorID": &fielddef.FieldDef{F: &deal.SectorID},
"Offset": &fielddef.FieldDef{F: &deal.Offset},
"Length": &fielddef.FieldDef{F: &deal.Length},
Expand Down
1 change: 1 addition & 0 deletions db/migrations/20230816133342_direct_deals_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ CREATE TABLE IF NOT EXISTS DirectDeals (
StartEpoch INT,
EndEpoch INT,
InboundFilePath TEXT,
InboundFileSize INT,
SectorID INT,
Offset INT,
Length INT,
Expand Down
6 changes: 4 additions & 2 deletions extern/boostd-data/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ type DealInfo struct {
PieceLength abi.PaddedPieceSize `json:"l"`
// The size of the CAR file without zero-padding.
// This value may be zero if the size is unknown.
// If we don't have CarLength, we have to iterate
// over all offsets, get the largest offset and
// sum it with length.
CarLength uint64 `json:"c"`

// If we don't have CarLength, we have to iterate over all offsets, get
// the largest offset and sum it with length.
IsDirectDeal bool `json:"d"`
}

// Metadata for PieceCid
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package cassmigrate

import (
"context"
"fmt"

"github.com/yugabyte/gocql"
"golang.org/x/sync/errgroup"
)

// ts20230913144459_dealsAddIsDirectDealColumn adds a new column isDirectDeal to pieceDeal table
func ts20230913144459_dealsAddIsDirectDealColumn(ctx context.Context, session *gocql.Session) error {
qry := `ALTER TABLE PieceDeal ADD IsDirectDeal BOOLEAN`
err := session.Query(qry).WithContext(ctx).Exec()

if err != nil {
return fmt.Errorf("creating new column IsDirectDeal: %w", err)
}

qry = `SELECT DealUuid from PieceDeal`
iter := session.Query(qry).WithContext(ctx).Iter()

dealIsDirect := make(map[string]bool)
var id string
for iter.Scan(&id) {
dealIsDirect[id] = false
}
if err := iter.Close(); err != nil {
return fmt.Errorf("getting deal miner address: %w", err)
}

eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(64) // only run 64 go routines at a time
for did, isDD := range dealIsDirect {
i := did
dd := isDD
eg.Go(func() error {
qry1 := `UPDATE PieceDeal SET IsDirectDeal = ? WHERE DealUuid = ?`
err1 := session.Query(qry1, dd, i).WithContext(ctx).Exec()
if err1 != nil {
return fmt.Errorf("setting deal %s miner address to %t: %w", i, dd, err1)
}
return nil
})
}

return eg.Wait()
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
}
6 changes: 4 additions & 2 deletions extern/boostd-data/yugabyte/cassmigrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package cassmigrate
import (
"context"
"fmt"
logging "github.com/ipfs/go-log/v2"
"github.com/yugabyte/gocql"
"reflect"
"runtime"
"sort"
"strings"

logging "github.com/ipfs/go-log/v2"
"github.com/yugabyte/gocql"
)

var log = logging.Logger("migrations")
Expand All @@ -17,6 +18,7 @@ type migrationFn func(ctx context.Context, session *gocql.Session) error

var migrations = []migrationFn{
ts20230824154306_dealsFixMinerAddr,
ts20230913144459_dealsAddIsDirectDealColumn,
}

// Migrate migrates the cassandra database
Expand Down
10 changes: 5 additions & 5 deletions extern/boostd-data/yugabyte/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ func (s *Store) AddDealForPiece(ctx context.Context, pieceCid cid.Cid, dealInfo
}

qry := `INSERT INTO PieceDeal ` +
`(DealUuid, PieceCid, IsLegacy, ChainDealID, MinerAddr, SectorID, PieceOffset, PieceLength, CarLength) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ` +
`(DealUuid, PieceCid, IsLegacy, ChainDealID, MinerAddr, SectorID, PieceOffset, PieceLength, CarLength, IsDirectDeal) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` +
`IF NOT EXISTS`
err = s.session.Query(qry,
dealInfo.DealUuid, pieceCid.Bytes(), dealInfo.IsLegacy, dealInfo.ChainDealID, dealInfo.MinerAddr.String(),
dealInfo.SectorID, dealInfo.PieceOffset, dealInfo.PieceLength, dealInfo.CarLength).
dealInfo.SectorID, dealInfo.PieceOffset, dealInfo.PieceLength, dealInfo.CarLength, dealInfo.IsDirectDeal).
WithContext(ctx).Exec()
if err != nil {
return fmt.Errorf("inserting deal %s for piece %s", dealInfo.DealUuid, pieceCid)
Expand Down Expand Up @@ -198,15 +198,15 @@ func (s *Store) GetPieceDeals(ctx context.Context, pieceCid cid.Cid) ([]model.De

// Get deals for piece
qry := `SELECT DealUuid, IsLegacy, ChainDealID, MinerAddr, ` +
`SectorID, PieceOffset, PieceLength, CarLength ` +
`SectorID, PieceOffset, PieceLength, CarLength, IsDirectDeal ` +
`FROM PieceDeal WHERE PieceCid = ?`
iter := s.session.Query(qry, pieceCid.Bytes()).WithContext(ctx).Iter()

var deals []model.DealInfo
var deal model.DealInfo
var minerAddr string
for iter.Scan(&deal.DealUuid, &deal.IsLegacy, &deal.ChainDealID, &minerAddr,
&deal.SectorID, &deal.PieceOffset, &deal.PieceLength, &deal.CarLength) {
&deal.SectorID, &deal.PieceOffset, &deal.PieceLength, &deal.CarLength, &deal.IsDirectDeal) {

ma, err := address.NewFromString(minerAddr)
if err != nil {
Expand Down
Loading