From 6cf7ef6e735a59368d522ab6ebd18d212ebbed06 Mon Sep 17 00:00:00 2001
From: LexLuthr <lexluthr@protocol.ai>
Date: Tue, 12 Sep 2023 12:12:20 +0400
Subject: [PATCH] index and announce

---
 indexprovider/wrapper.go               | 36 +++++++++---
 node/modules/directdeals.go            |  2 +-
 storagemarket/direct_deals_provider.go | 79 ++++++++++++++++++++++++--
 storagemarket/types/direct_data.go     |  3 +-
 4 files changed, 103 insertions(+), 17 deletions(-)

diff --git a/indexprovider/wrapper.go b/indexprovider/wrapper.go
index 7207e5d8e..efb740104 100644
--- a/indexprovider/wrapper.go
+++ b/indexprovider/wrapper.go
@@ -190,7 +190,7 @@ func (w *Wrapper) handleUpdates(ctx context.Context, sectorUpdates map[abi.Secto
 					FastRetrieval: sectorSealState == db.SealStateUnsealed,
 					VerifiedDeal:  deal.DealProposal.Proposal.VerifiedDeal,
 				}
-				announceCid, err := w.AnnounceBoostDealMetadata(ctx, md, propCid)
+				announceCid, err := w.AnnounceBoostDealMetadata(ctx, md, propCid.Bytes())
 				if err != nil {
 					log.Errorf("announcing deal %s to index provider: %w", deal.DealID, err)
 				} else {
@@ -282,25 +282,27 @@ func (w *Wrapper) Enabled() bool {
 // The advertisement published by this function covers 2 protocols:
 //
 // Bitswap:
-//     1. bitswap is completely disabled: in which case an advertisement is
+//
+//  1. bitswap is completely disabled: in which case an advertisement is
 //     published with http(or empty if http is disabled) extended providers
 //     that should wipe previous support on indexer side.
 //
-//     2. bitswap is enabled with public addresses: in which case publish an
+//  2. bitswap is enabled with public addresses: in which case publish an
 //     advertisement with extended providers records corresponding to the
 //     public addresses. Note, according the IPNI spec, the host ID will
 //     also be added to the extended providers for signing reasons with empty
 //     metadata making a total of 2 extended provider records.
 //
-//     3. bitswap with boostd address: in which case public an advertisement
+//  3. bitswap with boostd address: in which case public an advertisement
 //     with one extended provider record that just adds bitswap metadata.
 //
 // HTTP:
-//     1. http is completely disabled: in which case an advertisement is
+//
+//  1. http is completely disabled: in which case an advertisement is
 //     published with bitswap(or empty if bitswap is disabled) extended providers
 //     that should wipe previous support on indexer side
 //
-//     2. http is enabled: in which case an advertisement is published with
+//  2. http is enabled: in which case an advertisement is published with
 //     bitswap and http(or only http if bitswap is disabled) extended providers
 //     that should wipe previous support on indexer side
 //
@@ -632,10 +634,10 @@ func (w *Wrapper) AnnounceBoostDeal(ctx context.Context, deal *types.ProviderDea
 		FastRetrieval: deal.FastRetrieval,
 		VerifiedDeal:  deal.ClientDealProposal.Proposal.VerifiedDeal,
 	}
-	return w.AnnounceBoostDealMetadata(ctx, md, propCid)
+	return w.AnnounceBoostDealMetadata(ctx, md, propCid.Bytes())
 }
 
-func (w *Wrapper) AnnounceBoostDealMetadata(ctx context.Context, md metadata.GraphsyncFilecoinV1, propCid cid.Cid) (cid.Cid, error) {
+func (w *Wrapper) AnnounceBoostDealMetadata(ctx context.Context, md metadata.GraphsyncFilecoinV1, contextID []byte) (cid.Cid, error) {
 	if !w.enabled {
 		return cid.Undef, errors.New("cannot announce deal: index provider is disabled")
 	}
@@ -648,7 +650,7 @@ func (w *Wrapper) AnnounceBoostDealMetadata(ctx context.Context, md metadata.Gra
 
 	// Announce deal to network Indexer
 	fm := metadata.Default.New(&md)
-	annCid, err := w.prov.NotifyPut(ctx, nil, propCid.Bytes(), fm)
+	annCid, err := w.prov.NotifyPut(ctx, nil, contextID, fm)
 	if err != nil {
 		// Check if the error is because the deal was already advertised
 		// (we can safely ignore this error)
@@ -684,3 +686,19 @@ type basicDealInfo struct {
 	SectorID       abi.SectorID
 	DealProposal   storagemarket.ClientDealProposal
 }
+
+func (w *Wrapper) AnnounceBoostDirectDeal(ctx context.Context, entry *types.DirectDeal) (cid.Cid, error) {
+	// Filter out deals that should not be announced
+	if !entry.AnnounceToIPNI {
+		return cid.Undef, nil
+	}
+
+	contextID := []byte(entry.AllocationID.Key())
+
+	md := metadata.GraphsyncFilecoinV1{
+		PieceCID:      entry.PieceCID,
+		FastRetrieval: entry.KeepUnsealedCopy,
+		VerifiedDeal:  true,
+	}
+	return w.AnnounceBoostDealMetadata(ctx, md, contextID)
+}
diff --git a/node/modules/directdeals.go b/node/modules/directdeals.go
index 1a129e618..9bc8af167 100644
--- a/node/modules/directdeals.go
+++ b/node/modules/directdeals.go
@@ -60,7 +60,7 @@ func NewDirectDealsProvider(provAddr address.Address, cfg *config.Boost) func(lc
 			RemoteCommp:             cfg.Dealmaking.RemoteCommp,
 		}
 
-		prov := storagemarket.NewDirectDealsProvider(ddpCfg, fullnodeApi, secb, commpc, directDealsDB, dl)
+		prov := storagemarket.NewDirectDealsProvider(ddpCfg, fullnodeApi, secb, commpc, directDealsDB, dl, piecedirectory, ip)
 		return prov, nil
 	}
 }
diff --git a/storagemarket/direct_deals_provider.go b/storagemarket/direct_deals_provider.go
index bb1c06abe..13d6ee4f3 100644
--- a/storagemarket/direct_deals_provider.go
+++ b/storagemarket/direct_deals_provider.go
@@ -12,10 +12,13 @@ import (
 	"github.com/davecgh/go-spew/spew"
 	"github.com/filecoin-project/boost/api"
 	"github.com/filecoin-project/boost/db"
+	"github.com/filecoin-project/boost/indexprovider"
+	"github.com/filecoin-project/boost/piecedirectory"
 	"github.com/filecoin-project/boost/storagemarket/logs"
 	"github.com/filecoin-project/boost/storagemarket/types"
 	smtypes "github.com/filecoin-project/boost/storagemarket/types"
 	"github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints"
+	"github.com/filecoin-project/boostd-data/model"
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/go-state-types/builtin/v12/miner"
@@ -52,9 +55,13 @@ type DirectDealsProvider struct {
 
 	runningLk sync.RWMutex
 	running   map[uuid.UUID]struct{}
+
+	piecedirectory *piecedirectory.PieceDirectory
+	ip             *indexprovider.Wrapper
 }
 
-func NewDirectDealsProvider(cfg DDPConfig, fullnodeApi v1api.FullNode, pieceAdder types.PieceAdder, commpCalc smtypes.CommpCalculator, directDealsDB *db.DirectDataDB, dealLogger *logs.DealLogger) *DirectDealsProvider {
+func NewDirectDealsProvider(cfg DDPConfig, fullnodeApi v1api.FullNode, pieceAdder types.PieceAdder, commpCalc smtypes.CommpCalculator, directDealsDB *db.DirectDataDB, dealLogger *logs.DealLogger,
+	piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper) *DirectDealsProvider {
 	return &DirectDealsProvider{
 		config:      cfg,
 		fullnodeApi: fullnodeApi,
@@ -66,8 +73,10 @@ func NewDirectDealsProvider(cfg DDPConfig, fullnodeApi v1api.FullNode, pieceAdde
 		//logsSqlDB: logsSqlDB,
 		//logsDB: logsDB,
 
-		dealLogger: dealLogger,
-		running:    make(map[uuid.UUID]struct{}),
+		dealLogger:     dealLogger,
+		running:        make(map[uuid.UUID]struct{}),
+		piecedirectory: piecedirectory,
+		ip:             ip,
 	}
 }
 
@@ -295,6 +304,8 @@ func (ddp *DirectDealsProvider) execDeal(ctx context.Context, entry *smtypes.Dir
 			}
 		}
 
+		entry.InboundFileSize = fstat.Size()
+
 		log.Infow("direct deal details", "filepath", entry.InboundFilePath, "supplied-piececid", entry.PieceCID, "calculated-piececid", generatedPieceInfo.PieceCID, "calculated-piecesize", generatedPieceInfo.Size, "os stat size", fstat.Size())
 
 		if !entry.PieceCID.Equals(generatedPieceInfo.PieceCID) {
@@ -399,9 +410,19 @@ func (ddp *DirectDealsProvider) execDeal(ctx context.Context, entry *smtypes.Dir
 		}
 	}
 
-	if entry.Checkpoint <= dealcheckpoints.AddedPiece {
-		// add index and announce
-		ddp.dealLogger.Infow(dealUuid, "index and announce")
+	// Index and announce the deal
+	if entry.Checkpoint < dealcheckpoints.IndexedAndAnnounced {
+		if err := ddp.indexAndAnnounce(ctx, entry); err != nil {
+			err.error = fmt.Errorf("failed to add index and announce deal: %w", err.error)
+			return err
+		}
+		if entry.AnnounceToIPNI {
+			ddp.dealLogger.Infow(entry.ID, "deal successfully indexed and announced")
+		} else {
+			ddp.dealLogger.Infow(entry.ID, "deal successfully indexed")
+		}
+	} else {
+		ddp.dealLogger.Infow(entry.ID, "deal has already been indexed and announced")
 	}
 
 	return nil
@@ -507,3 +528,49 @@ func (ddp *DirectDealsProvider) FailPausedDeal(ctx context.Context, id uuid.UUID
 
 	return nil
 }
+
+func (ddp *DirectDealsProvider) indexAndAnnounce(ctx context.Context, entry *smtypes.DirectDeal) *dealMakingError {
+	// add deal to piece metadata store
+	ddp.dealLogger.Infow(entry.ID, "about to add direct deal for piece in LID")
+	if err := ddp.piecedirectory.AddDealForPiece(ctx, entry.PieceCID, model.DealInfo{
+		DealUuid:    entry.ID.String(),
+		ChainDealID: abi.DealID(entry.AllocationID), // Convert the type to avoid migration as underlying types are same
+		MinerAddr:   entry.Provider,
+		SectorID:    entry.SectorID,
+		PieceOffset: entry.Offset,
+		PieceLength: entry.Length,
+		CarLength:   uint64(entry.InboundFileSize),
+	}); err != nil {
+		return &dealMakingError{
+			retry: types.DealRetryAuto,
+			error: fmt.Errorf("failed to add deal to piece metadata store: %w", err),
+		}
+	}
+	ddp.dealLogger.Infow(entry.ID, "direct deal successfully added to LID")
+
+	// if the index provider is enabled
+	if ddp.ip.Enabled() {
+		if entry.AnnounceToIPNI {
+			// announce to the network indexer but do not fail the deal if the announcement fails,
+			// just retry the next time boost restarts
+			annCid, err := ddp.ip.AnnounceBoostDirectDeal(ctx, entry)
+			if err != nil {
+				return &dealMakingError{
+					retry: types.DealRetryAuto,
+					error: fmt.Errorf("failed to announce deal to network indexer: %w", err),
+				}
+			}
+			ddp.dealLogger.Infow(entry.ID, "announced the direct deal to network indexer", "announcement-cid", annCid)
+		} else {
+			ddp.dealLogger.Infow(entry.ID, "didn't announce the direct deal as requested in the deal proposal")
+		}
+	} else {
+		ddp.dealLogger.Infow(entry.ID, "didn't announce the direct deal because network indexer is disabled")
+	}
+
+	if derr := ddp.updateCheckpoint(ctx, entry, dealcheckpoints.IndexedAndAnnounced); derr != nil {
+		return derr
+	}
+
+	return nil
+}
diff --git a/storagemarket/types/direct_data.go b/storagemarket/types/direct_data.go
index 25e010709..4631b51d8 100644
--- a/storagemarket/types/direct_data.go
+++ b/storagemarket/types/direct_data.go
@@ -9,7 +9,7 @@ import (
 	"github.com/filecoin-project/go-state-types/abi"
 	verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
 	"github.com/google/uuid"
-	cid "github.com/ipfs/go-cid"
+	"github.com/ipfs/go-cid"
 )
 
 // DirectDeal is the local state tracked for direct data onboard.
@@ -32,6 +32,7 @@ type DirectDeal struct {
 
 	// InboundCARPath is the file-path where the storage provider will persist the CAR file sent by the client.
 	InboundFilePath string
+	InboundFileSize int64
 
 	// sector packing info
 	SectorID abi.SectorNumber