Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions harmony/harmonydb/sql/20251011-ipni-fetch-tracking.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Track IPNI advertisement fetches to provide indexing status visibility
-- This table logs when advertisements are fetched by indexers

CREATE TABLE IF NOT EXISTS ipni_ad_fetches (
ad_cid TEXT NOT NULL,
fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index for efficient lookup by ad_cid and time-based queries
CREATE INDEX IF NOT EXISTS ipni_ad_fetches_ad_cid_time ON ipni_ad_fetches(ad_cid, fetched_at DESC);
18 changes: 18 additions & 0 deletions market/ipni/ipni-provider/ipni-provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,15 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) {
if err != nil {
log.Errorw("failed to write HTTP response", "err", err)
}

// Log advertisement fetch for indexing status tracking
go func() {
logCtx := context.Background()
_, err := p.db.Exec(logCtx, `INSERT INTO ipni_ad_fetches (ad_cid, fetched_at) VALUES ($1, NOW())`, b.String())
if err != nil {
log.Warnw("failed to log ad fetch", "ad_cid", b.String(), "err", err)
}
}()
return
case ipnisync.CidSchemaEntryChunk:
entry, err := p.sc.GetEntry(r.Context(), b)
Expand Down Expand Up @@ -463,6 +472,15 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) {
if err != nil {
log.Errorw("failed to write HTTP response", "err", err)
}

// Log advertisement fetch for indexing status tracking
go func() {
logCtx := context.Background()
_, err := p.db.Exec(logCtx, `INSERT INTO ipni_ad_fetches (ad_cid, fetched_at) VALUES ($1, NOW())`, b.String())
if err != nil {
log.Warnw("failed to log ad fetch", "ad_cid", b.String(), "err", err)
}
}()
return
}
}
Expand Down
131 changes: 131 additions & 0 deletions pdp/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/lib/paths"
"github.com/filecoin-project/curio/pdp/contract"
"github.com/filecoin-project/curio/tasks/indexing"
"github.com/filecoin-project/curio/tasks/message"

types2 "github.com/filecoin-project/lotus/chain/types"
Expand Down Expand Up @@ -108,6 +109,9 @@ func Routes(r *chi.Mux, p *PDPService) {

r.Get(path.Join(PDPRoutePath, "/ping"), p.handlePing)

// GET /pdp/piece/{pieceCid}/status - Get indexing/IPNI status for a piece
r.Get(path.Join(PDPRoutePath, "/piece/{pieceCid}/status"), p.handleGetPieceStatus)

// Routes for piece storage and retrieval
// POST /pdp/piece
r.Post(path.Join(PDPRoutePath, "/piece"), p.handlePiecePost)
Expand Down Expand Up @@ -142,6 +146,133 @@ func (p *PDPService) handlePing(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

// handleGetPieceStatus returns the indexing and IPNI status for a piece
func (p *PDPService) handleGetPieceStatus(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// Verify authorization
serviceLabel, err := p.AuthService(r)
if err != nil {
http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized)
return
}

// Extract pieceCid from URL and convert to v1 for DB query
pieceCidStr := chi.URLParam(r, "pieceCid")
if pieceCidStr == "" {
http.Error(w, "Missing pieceCid in URL", http.StatusBadRequest)
return
}

// Convert to v1 format (database stores v1)
pieceCidV1, err := asPieceCIDv1(pieceCidStr)
if err != nil {
http.Error(w, "Invalid pieceCid format: "+err.Error(), http.StatusBadRequest)
return
}
pieceCidV1Str := pieceCidV1.String()

// Query status from database
var result struct {
PieceCID string `db:"piece_cid"`
PieceRawSize uint64 `db:"piece_raw_size"`
CreatedAt time.Time `db:"created_at"`
Indexed bool `db:"indexed"`
Advertised bool `db:"advertised"`
AdCID *string `db:"ad_cid"`
Retrieved bool `db:"retrieved"`
RetrievedAt *time.Time `db:"retrieved_at"`
Status string `db:"status"`
}

err = p.db.QueryRow(ctx, `
SELECT
pr.piece_cid,
pp.piece_raw_size,
pr.created_at,

-- Indexing status (true when CAR indexing completed and ready for/in IPNI)
(pr.needs_ipni OR pr.ipni_task_id IS NOT NULL OR i.ad_cid IS NOT NULL) as indexed,

-- Advertisement status
i.ad_cid IS NOT NULL as advertised,
i.ad_cid,

-- Fetch status
af.fetched_at IS NOT NULL as retrieved,
af.fetched_at as retrieved_at,

-- Determine overall status
CASE
WHEN af.fetched_at IS NOT NULL THEN 'retrieved'
WHEN i.ad_cid IS NOT NULL THEN 'announced'
WHEN pr.ipni_task_id IS NOT NULL THEN 'creating_ad'
WHEN pr.indexing_task_id IS NOT NULL THEN 'indexing'
ELSE 'pending'
END as status

FROM pdp_piecerefs pr
JOIN parked_piece_refs pprf ON pprf.ref_id = pr.piece_ref
JOIN parked_pieces pp ON pp.id = pprf.piece_id
LEFT JOIN ipni i ON i.piece_cid = pr.piece_cid
AND i.provider = (SELECT peer_id FROM ipni_peerid WHERE sp_id = $3)
LEFT JOIN ipni_ad_fetches af ON af.ad_cid = i.ad_cid
WHERE pr.piece_cid = $1 AND pr.service = $2
LIMIT 1
`, pieceCidV1Str, serviceLabel, indexing.PDP_SP_ID).Scan(
&result.PieceCID,
&result.PieceRawSize,
&result.CreatedAt,
&result.Indexed,
&result.Advertised,
&result.AdCID,
&result.Retrieved,
&result.RetrievedAt,
&result.Status,
)

if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
http.Error(w, "Piece not found or does not belong to service", http.StatusNotFound)
return
}
http.Error(w, "Failed to query piece status: "+err.Error(), http.StatusInternalServerError)
return
}

// Convert authoritative PieceCID back from v1 to v2 for external API
pieceCidV2, _, err := asPieceCIDv2(result.PieceCID, result.PieceRawSize)
if err != nil {
http.Error(w, "Failed to convert PieceCID to v2: "+err.Error(), http.StatusInternalServerError)
return
}

// Prepare response
response := struct {
PieceCID string `json:"pieceCid"`
Status string `json:"status"`
Indexed bool `json:"indexed"`
Advertised bool `json:"advertised"`
Retrieved bool `json:"retrieved"`
RetrievedAt *time.Time `json:"retrievedAt,omitempty"`
}{
PieceCID: pieceCidV2.String(),
Status: result.Status,
Indexed: result.Indexed,
Advertised: result.Advertised,
Retrieved: result.Retrieved,
RetrievedAt: result.RetrievedAt,
}

// Return JSON response
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(response)
if err != nil {
http.Error(w, "Failed to encode response: "+err.Error(), http.StatusInternalServerError)
return
}
}

// handleCreateDataSet handles the creation of a new data set
func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
Expand Down
Loading