Skip to content

Commit

Permalink
index and announce
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Sep 12, 2023
1 parent 6ae5bfc commit 6cf7ef6
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 17 deletions.
36 changes: 27 additions & 9 deletions indexprovider/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (w *Wrapper) handleUpdates(ctx context.Context, sectorUpdates map[abi.Secto
FastRetrieval: sectorSealState == db.SealStateUnsealed,
VerifiedDeal: deal.DealProposal.Proposal.VerifiedDeal,
}
announceCid, err := w.AnnounceBoostDealMetadata(ctx, md, propCid)
announceCid, err := w.AnnounceBoostDealMetadata(ctx, md, propCid.Bytes())
if err != nil {
log.Errorf("announcing deal %s to index provider: %w", deal.DealID, err)
} else {
Expand Down Expand Up @@ -282,25 +282,27 @@ func (w *Wrapper) Enabled() bool {
// The advertisement published by this function covers 2 protocols:
//
// Bitswap:
// 1. bitswap is completely disabled: in which case an advertisement is
//
// 1. bitswap is completely disabled: in which case an advertisement is
// published with http(or empty if http is disabled) extended providers
// that should wipe previous support on indexer side.
//
// 2. bitswap is enabled with public addresses: in which case publish an
// 2. bitswap is enabled with public addresses: in which case publish an
// advertisement with extended providers records corresponding to the
// public addresses. Note, according the IPNI spec, the host ID will
// also be added to the extended providers for signing reasons with empty
// metadata making a total of 2 extended provider records.
//
// 3. bitswap with boostd address: in which case public an advertisement
// 3. bitswap with boostd address: in which case public an advertisement
// with one extended provider record that just adds bitswap metadata.
//
// HTTP:
// 1. http is completely disabled: in which case an advertisement is
//
// 1. http is completely disabled: in which case an advertisement is
// published with bitswap(or empty if bitswap is disabled) extended providers
// that should wipe previous support on indexer side
//
// 2. http is enabled: in which case an advertisement is published with
// 2. http is enabled: in which case an advertisement is published with
// bitswap and http(or only http if bitswap is disabled) extended providers
// that should wipe previous support on indexer side
//
Expand Down Expand Up @@ -632,10 +634,10 @@ func (w *Wrapper) AnnounceBoostDeal(ctx context.Context, deal *types.ProviderDea
FastRetrieval: deal.FastRetrieval,
VerifiedDeal: deal.ClientDealProposal.Proposal.VerifiedDeal,
}
return w.AnnounceBoostDealMetadata(ctx, md, propCid)
return w.AnnounceBoostDealMetadata(ctx, md, propCid.Bytes())
}

func (w *Wrapper) AnnounceBoostDealMetadata(ctx context.Context, md metadata.GraphsyncFilecoinV1, propCid cid.Cid) (cid.Cid, error) {
func (w *Wrapper) AnnounceBoostDealMetadata(ctx context.Context, md metadata.GraphsyncFilecoinV1, contextID []byte) (cid.Cid, error) {
if !w.enabled {
return cid.Undef, errors.New("cannot announce deal: index provider is disabled")
}
Expand All @@ -648,7 +650,7 @@ func (w *Wrapper) AnnounceBoostDealMetadata(ctx context.Context, md metadata.Gra

// Announce deal to network Indexer
fm := metadata.Default.New(&md)
annCid, err := w.prov.NotifyPut(ctx, nil, propCid.Bytes(), fm)
annCid, err := w.prov.NotifyPut(ctx, nil, contextID, fm)
if err != nil {
// Check if the error is because the deal was already advertised
// (we can safely ignore this error)
Expand Down Expand Up @@ -684,3 +686,19 @@ type basicDealInfo struct {
SectorID abi.SectorID
DealProposal storagemarket.ClientDealProposal
}

func (w *Wrapper) AnnounceBoostDirectDeal(ctx context.Context, entry *types.DirectDeal) (cid.Cid, error) {
// Filter out deals that should not be announced
if !entry.AnnounceToIPNI {
return cid.Undef, nil
}

contextID := []byte(entry.AllocationID.Key())

md := metadata.GraphsyncFilecoinV1{
PieceCID: entry.PieceCID,
FastRetrieval: entry.KeepUnsealedCopy,
VerifiedDeal: true,
}
return w.AnnounceBoostDealMetadata(ctx, md, contextID)
}
2 changes: 1 addition & 1 deletion node/modules/directdeals.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewDirectDealsProvider(provAddr address.Address, cfg *config.Boost) func(lc
RemoteCommp: cfg.Dealmaking.RemoteCommp,
}

prov := storagemarket.NewDirectDealsProvider(ddpCfg, fullnodeApi, secb, commpc, directDealsDB, dl)
prov := storagemarket.NewDirectDealsProvider(ddpCfg, fullnodeApi, secb, commpc, directDealsDB, dl, piecedirectory, ip)
return prov, nil
}
}
79 changes: 73 additions & 6 deletions storagemarket/direct_deals_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/filecoin-project/boost/api"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/indexprovider"
"github.com/filecoin-project/boost/piecedirectory"
"github.com/filecoin-project/boost/storagemarket/logs"
"github.com/filecoin-project/boost/storagemarket/types"
smtypes "github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v12/miner"
Expand Down Expand Up @@ -52,9 +55,13 @@ type DirectDealsProvider struct {

runningLk sync.RWMutex
running map[uuid.UUID]struct{}

piecedirectory *piecedirectory.PieceDirectory
ip *indexprovider.Wrapper
}

func NewDirectDealsProvider(cfg DDPConfig, fullnodeApi v1api.FullNode, pieceAdder types.PieceAdder, commpCalc smtypes.CommpCalculator, directDealsDB *db.DirectDataDB, dealLogger *logs.DealLogger) *DirectDealsProvider {
func NewDirectDealsProvider(cfg DDPConfig, fullnodeApi v1api.FullNode, pieceAdder types.PieceAdder, commpCalc smtypes.CommpCalculator, directDealsDB *db.DirectDataDB, dealLogger *logs.DealLogger,
piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper) *DirectDealsProvider {
return &DirectDealsProvider{
config: cfg,
fullnodeApi: fullnodeApi,
Expand All @@ -66,8 +73,10 @@ func NewDirectDealsProvider(cfg DDPConfig, fullnodeApi v1api.FullNode, pieceAdde
//logsSqlDB: logsSqlDB,
//logsDB: logsDB,

dealLogger: dealLogger,
running: make(map[uuid.UUID]struct{}),
dealLogger: dealLogger,
running: make(map[uuid.UUID]struct{}),
piecedirectory: piecedirectory,
ip: ip,
}
}

Expand Down Expand Up @@ -295,6 +304,8 @@ func (ddp *DirectDealsProvider) execDeal(ctx context.Context, entry *smtypes.Dir
}
}

entry.InboundFileSize = fstat.Size()

log.Infow("direct deal details", "filepath", entry.InboundFilePath, "supplied-piececid", entry.PieceCID, "calculated-piececid", generatedPieceInfo.PieceCID, "calculated-piecesize", generatedPieceInfo.Size, "os stat size", fstat.Size())

if !entry.PieceCID.Equals(generatedPieceInfo.PieceCID) {
Expand Down Expand Up @@ -399,9 +410,19 @@ func (ddp *DirectDealsProvider) execDeal(ctx context.Context, entry *smtypes.Dir
}
}

if entry.Checkpoint <= dealcheckpoints.AddedPiece {
// add index and announce
ddp.dealLogger.Infow(dealUuid, "index and announce")
// Index and announce the deal
if entry.Checkpoint < dealcheckpoints.IndexedAndAnnounced {
if err := ddp.indexAndAnnounce(ctx, entry); err != nil {
err.error = fmt.Errorf("failed to add index and announce deal: %w", err.error)
return err
}
if entry.AnnounceToIPNI {
ddp.dealLogger.Infow(entry.ID, "deal successfully indexed and announced")
} else {
ddp.dealLogger.Infow(entry.ID, "deal successfully indexed")
}
} else {
ddp.dealLogger.Infow(entry.ID, "deal has already been indexed and announced")
}

return nil
Expand Down Expand Up @@ -507,3 +528,49 @@ func (ddp *DirectDealsProvider) FailPausedDeal(ctx context.Context, id uuid.UUID

return nil
}

func (ddp *DirectDealsProvider) indexAndAnnounce(ctx context.Context, entry *smtypes.DirectDeal) *dealMakingError {
// add deal to piece metadata store
ddp.dealLogger.Infow(entry.ID, "about to add direct deal for piece in LID")
if err := ddp.piecedirectory.AddDealForPiece(ctx, entry.PieceCID, model.DealInfo{
DealUuid: entry.ID.String(),
ChainDealID: abi.DealID(entry.AllocationID), // Convert the type to avoid migration as underlying types are same
MinerAddr: entry.Provider,
SectorID: entry.SectorID,
PieceOffset: entry.Offset,
PieceLength: entry.Length,
CarLength: uint64(entry.InboundFileSize),
}); err != nil {
return &dealMakingError{
retry: types.DealRetryAuto,
error: fmt.Errorf("failed to add deal to piece metadata store: %w", err),
}
}
ddp.dealLogger.Infow(entry.ID, "direct deal successfully added to LID")

// if the index provider is enabled
if ddp.ip.Enabled() {
if entry.AnnounceToIPNI {
// announce to the network indexer but do not fail the deal if the announcement fails,
// just retry the next time boost restarts
annCid, err := ddp.ip.AnnounceBoostDirectDeal(ctx, entry)
if err != nil {
return &dealMakingError{
retry: types.DealRetryAuto,
error: fmt.Errorf("failed to announce deal to network indexer: %w", err),
}
}
ddp.dealLogger.Infow(entry.ID, "announced the direct deal to network indexer", "announcement-cid", annCid)
} else {
ddp.dealLogger.Infow(entry.ID, "didn't announce the direct deal as requested in the deal proposal")
}
} else {
ddp.dealLogger.Infow(entry.ID, "didn't announce the direct deal because network indexer is disabled")
}

if derr := ddp.updateCheckpoint(ctx, entry, dealcheckpoints.IndexedAndAnnounced); derr != nil {
return derr
}

return nil
}
3 changes: 2 additions & 1 deletion storagemarket/types/direct_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
"github.com/google/uuid"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-cid"
)

// DirectDeal is the local state tracked for direct data onboard.
Expand All @@ -32,6 +32,7 @@ type DirectDeal struct {

// InboundCARPath is the file-path where the storage provider will persist the CAR file sent by the client.
InboundFilePath string
InboundFileSize int64

// sector packing info
SectorID abi.SectorNumber
Expand Down

0 comments on commit 6cf7ef6

Please sign in to comment.