diff --git a/harmony/harmonydb/sql/20251011-ipni-fetch-tracking.sql b/harmony/harmonydb/sql/20251011-ipni-fetch-tracking.sql new file mode 100644 index 000000000..80629a4a5 --- /dev/null +++ b/harmony/harmonydb/sql/20251011-ipni-fetch-tracking.sql @@ -0,0 +1,10 @@ +-- Track IPNI advertisement fetches to provide indexing status visibility +-- This table logs when advertisements are fetched by indexers + +CREATE TABLE IF NOT EXISTS ipni_ad_fetches ( + ad_cid TEXT NOT NULL, + fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Index for efficient lookup by ad_cid and time-based queries +CREATE INDEX IF NOT EXISTS ipni_ad_fetches_ad_cid_time ON ipni_ad_fetches(ad_cid, fetched_at DESC); diff --git a/market/ipni/ipni-provider/ipni-provider.go b/market/ipni/ipni-provider/ipni-provider.go index 26789ff2d..bd9eb6fc4 100644 --- a/market/ipni/ipni-provider/ipni-provider.go +++ b/market/ipni/ipni-provider/ipni-provider.go @@ -408,6 +408,15 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) { if err != nil { log.Errorw("failed to write HTTP response", "err", err) } + + // Log advertisement fetch for indexing status tracking + go func() { + logCtx := context.Background() + _, err := p.db.Exec(logCtx, `INSERT INTO ipni_ad_fetches (ad_cid, fetched_at) VALUES ($1, NOW())`, b.String()) + if err != nil { + log.Warnw("failed to log ad fetch", "ad_cid", b.String(), "err", err) + } + }() return case ipnisync.CidSchemaEntryChunk: entry, err := p.sc.GetEntry(r.Context(), b) @@ -463,6 +472,15 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) { if err != nil { log.Errorw("failed to write HTTP response", "err", err) } + + // Log advertisement fetch for indexing status tracking + go func() { + logCtx := context.Background() + _, err := p.db.Exec(logCtx, `INSERT INTO ipni_ad_fetches (ad_cid, fetched_at) VALUES ($1, NOW())`, b.String()) + if err != nil { + log.Warnw("failed to log ad fetch", "ad_cid", b.String(), "err", err) + } + }() return } } diff --git a/pdp/handlers.go b/pdp/handlers.go index 4fe17430d..2a6f33c68 100644 --- a/pdp/handlers.go +++ b/pdp/handlers.go @@ -29,6 +29,7 @@ import ( "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/lib/paths" "github.com/filecoin-project/curio/pdp/contract" + "github.com/filecoin-project/curio/tasks/indexing" "github.com/filecoin-project/curio/tasks/message" types2 "github.com/filecoin-project/lotus/chain/types" @@ -108,6 +109,9 @@ func Routes(r *chi.Mux, p *PDPService) { r.Get(path.Join(PDPRoutePath, "/ping"), p.handlePing) + // GET /pdp/piece/{pieceCid}/status - Get indexing/IPNI status for a piece + r.Get(path.Join(PDPRoutePath, "/piece/{pieceCid}/status"), p.handleGetPieceStatus) + // Routes for piece storage and retrieval // POST /pdp/piece r.Post(path.Join(PDPRoutePath, "/piece"), p.handlePiecePost) @@ -142,6 +146,133 @@ func (p *PDPService) handlePing(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +// handleGetPieceStatus returns the indexing and IPNI status for a piece +func (p *PDPService) handleGetPieceStatus(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + // Verify authorization + serviceLabel, err := p.AuthService(r) + if err != nil { + http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) + return + } + + // Extract pieceCid from URL and convert to v1 for DB query + pieceCidStr := chi.URLParam(r, "pieceCid") + if pieceCidStr == "" { + http.Error(w, "Missing pieceCid in URL", http.StatusBadRequest) + return + } + + // Convert to v1 format (database stores v1) + pieceCidV1, err := asPieceCIDv1(pieceCidStr) + if err != nil { + http.Error(w, "Invalid pieceCid format: "+err.Error(), http.StatusBadRequest) + return + } + pieceCidV1Str := pieceCidV1.String() + + // Query status from database + var result struct { + PieceCID string `db:"piece_cid"` + PieceRawSize uint64 `db:"piece_raw_size"` + CreatedAt time.Time `db:"created_at"` + Indexed bool `db:"indexed"` + Advertised bool `db:"advertised"` + AdCID *string `db:"ad_cid"` + Retrieved bool `db:"retrieved"` + RetrievedAt *time.Time `db:"retrieved_at"` + Status string `db:"status"` + } + + err = p.db.QueryRow(ctx, ` + SELECT + pr.piece_cid, + pp.piece_raw_size, + pr.created_at, + + -- Indexing status (true when CAR indexing completed and ready for/in IPNI) + (pr.needs_ipni OR pr.ipni_task_id IS NOT NULL OR i.ad_cid IS NOT NULL) as indexed, + + -- Advertisement status + i.ad_cid IS NOT NULL as advertised, + i.ad_cid, + + -- Fetch status + af.fetched_at IS NOT NULL as retrieved, + af.fetched_at as retrieved_at, + + -- Determine overall status + CASE + WHEN af.fetched_at IS NOT NULL THEN 'retrieved' + WHEN i.ad_cid IS NOT NULL THEN 'announced' + WHEN pr.ipni_task_id IS NOT NULL THEN 'creating_ad' + WHEN pr.indexing_task_id IS NOT NULL THEN 'indexing' + ELSE 'pending' + END as status + + FROM pdp_piecerefs pr + JOIN parked_piece_refs pprf ON pprf.ref_id = pr.piece_ref + JOIN parked_pieces pp ON pp.id = pprf.piece_id + LEFT JOIN ipni i ON i.piece_cid = pr.piece_cid + AND i.provider = (SELECT peer_id FROM ipni_peerid WHERE sp_id = $3) + LEFT JOIN ipni_ad_fetches af ON af.ad_cid = i.ad_cid + WHERE pr.piece_cid = $1 AND pr.service = $2 + LIMIT 1 + `, pieceCidV1Str, serviceLabel, indexing.PDP_SP_ID).Scan( + &result.PieceCID, + &result.PieceRawSize, + &result.CreatedAt, + &result.Indexed, + &result.Advertised, + &result.AdCID, + &result.Retrieved, + &result.RetrievedAt, + &result.Status, + ) + + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + http.Error(w, "Piece not found or does not belong to service", http.StatusNotFound) + return + } + http.Error(w, "Failed to query piece status: "+err.Error(), http.StatusInternalServerError) + return + } + + // Convert authoritative PieceCID back from v1 to v2 for external API + pieceCidV2, _, err := asPieceCIDv2(result.PieceCID, result.PieceRawSize) + if err != nil { + http.Error(w, "Failed to convert PieceCID to v2: "+err.Error(), http.StatusInternalServerError) + return + } + + // Prepare response + response := struct { + PieceCID string `json:"pieceCid"` + Status string `json:"status"` + Indexed bool `json:"indexed"` + Advertised bool `json:"advertised"` + Retrieved bool `json:"retrieved"` + RetrievedAt *time.Time `json:"retrievedAt,omitempty"` + }{ + PieceCID: pieceCidV2.String(), + Status: result.Status, + Indexed: result.Indexed, + Advertised: result.Advertised, + Retrieved: result.Retrieved, + RetrievedAt: result.RetrievedAt, + } + + // Return JSON response + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(response) + if err != nil { + http.Error(w, "Failed to encode response: "+err.Error(), http.StatusInternalServerError) + return + } +} + // handleCreateDataSet handles the creation of a new data set func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) { ctx := r.Context()