194 changes: 123 additions & 71 deletions market/indexstore/indexstore.go
@@ -5,6 +5,7 @@ import (
"embed"
"errors"
"fmt"
"math"
"math/rand"
"sort"
"strconv"
@@ -18,6 +19,8 @@ import (
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

commcid "github.com/filecoin-project/go-fil-commcid"

"github.com/filecoin-project/curio/deps/config"
)

@@ -348,10 +351,10 @@ func (i *IndexStore) RemoveIndexes(ctx context.Context, pieceCidv2 cid.Cid) erro
return nil
}

// PieceInfo contains PieceCidV2 and BlockSize
// PieceInfo contains PieceCid and BlockSize. PieceCid can be either v1 or v2.
type PieceInfo struct {
PieceCidV2 cid.Cid
BlockSize uint64
PieceCid cid.Cid
BlockSize uint64
}

// PiecesContainingMultihash gets all pieces that contain a multihash along with their BlockSize
@@ -368,8 +371,8 @@ func (i *IndexStore) PiecesContainingMultihash(ctx context.Context, m multihash.
return nil, fmt.Errorf("parsing piece cid: %w", err)
}
pieces = append(pieces, PieceInfo{
PieceCidV2: pcid,
BlockSize: blockSize,
PieceCid: pcid,
BlockSize: blockSize,
})
}
if err := iter.Close(); err != nil {
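Since PieceInfo.PieceCid may now carry either CID version, callers that need a canonical v2 CID have to branch on the version. A minimal sketch, assuming the commcidv2 helpers this PR uses in serve-chunker below (the import path and the rawSize lookup are placeholders, not part of this change):

	// normalizeToV2 returns a v2 piece CID for a PieceInfo, converting
	// legacy v1 CIDs using an externally supplied raw payload size.
	func normalizeToV2(pi PieceInfo, rawSize uint64) (cid.Cid, error) {
		if commcidv2.IsPieceCidV2(pi.PieceCid) {
			return pi.PieceCid, nil // already v2; the raw size is encoded in the CID
		}
		return commcidv2.PieceCidV2FromV1(pi.PieceCid, rawSize)
	}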
@@ -396,21 +399,41 @@ func (i *IndexStore) GetOffset(ctx context.Context, pieceCidv2 cid.Cid, hash mul
}

func (i *IndexStore) GetPieceHashRange(ctx context.Context, piecev2 cid.Cid, start multihash.Multihash, num int64) ([]multihash.Multihash, error) {
qry := "SELECT PayloadMultihash FROM PieceBlockOffsetSize WHERE PieceCid = ? AND PayloadMultihash >= ? ORDER BY PayloadMultihash ASC LIMIT ?"
iter := i.session.Query(qry, piecev2.Bytes(), []byte(start), num).WithContext(ctx).Iter()

var hashes []multihash.Multihash
var r []byte
for iter.Scan(&r) {
m := multihash.Multihash(r)
hashes = append(hashes, m)
getHashes := func(pieceCid cid.Cid, start multihash.Multihash, num int64) ([]multihash.Multihash, error) {
qry := "SELECT PayloadMultihash FROM PieceBlockOffsetSize WHERE PieceCid = ? AND PayloadMultihash >= ? ORDER BY PayloadMultihash ASC LIMIT ?"
iter := i.session.Query(qry, pieceCid.Bytes(), []byte(start), num).WithContext(ctx).Iter()

var hashes []multihash.Multihash
var r []byte
for iter.Scan(&r) {
m := multihash.Multihash(r)
hashes = append(hashes, m)

// Allocate new r, preallocating the typical size of a multihash (36 bytes)
r = make([]byte, 0, 36)
}
if err := iter.Close(); err != nil {
return nil, xerrors.Errorf("iterating piece hash range (P:0x%02x, H:0x%02x, n:%d): %w", pieceCid.Bytes(), []byte(start), num, err)
}
return hashes, nil
}

// Allocate new r, preallocating the typical size of a multihash (36 bytes)
r = make([]byte, 0, 36)
hashes, err := getHashes(piecev2, start, num)
if err != nil {
return nil, err
}
if err := iter.Close(); err != nil {
return nil, xerrors.Errorf("iterating piece hash range (P:0x%02x, H:0x%02x, n:%d): %w", piecev2.Bytes(), []byte(start), num, err)

if len(hashes) == 0 {
pcid1, _, err := commcid.PieceCidV1FromV2(piecev2)
if err != nil {
return nil, xerrors.Errorf("getting piece cid v1 from v2: %w", err)
}
hashes, err = getHashes(pcid1, start, num)
if err != nil {
return nil, err
}
}

if len(hashes) != int(num) {
return nil, xerrors.Errorf("expected %d hashes, got %d (possibly missing indexes)", num, len(hashes))
}
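The empty-result fallback covers pieces that were indexed before the v1-to-v2 migration runs: their rows are still keyed by the v1 CID, so the same range query is retried under the v1 key derived from the v2 CID. Callers can therefore always pass a v2 CID. A minimal usage sketch (idx and the page size are hypothetical):

	// Page through a piece's payload hashes starting from a known multihash;
	// this works whether the piece was indexed under the v1 or the v2 CID.
	hashes, err := idx.GetPieceHashRange(ctx, pieceCidV2, startHash, 512)
	if err != nil {
		return xerrors.Errorf("getting piece hash range: %w", err)
	}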
@@ -602,71 +625,100 @@ func (i *IndexStore) UpdatePieceCidV1ToV2(ctx context.Context, pieceCidV1 cid.Ci
p1 := pieceCidV1.Bytes()
p2 := pieceCidV2.Bytes()

// First, select all PayloadMultihash for the given PieceCid from PieceBlockOffsetSize
selectQry := `SELECT PayloadMultihash FROM PieceBlockOffsetSize WHERE PieceCid = ?`
iter := i.session.Query(selectQry, p1).WithContext(ctx).Iter()

var payloadMultihashBytes []byte
var payloadMultihashes [][]byte
for iter.Scan(&payloadMultihashBytes) {
// Copy the bytes since the slice will be overwritten
mhCopy := make([]byte, len(payloadMultihashBytes))
copy(mhCopy, payloadMultihashBytes)
payloadMultihashes = append(payloadMultihashes, mhCopy)
}
if err := iter.Close(); err != nil {
return xerrors.Errorf("scanning PayloadMultihash for piece %s: %w", pieceCidV1.String(), err)
batchLimit := i.settings.InsertBatchSize
if batchLimit <= 0 {
batchLimit = 15000
}

// Prepare batch replace for PayloadToPieces
updatePiecesQry := `UPDATE PayloadToPieces SET PieceCid = ? WHERE PayloadMultihash = ? AND PieceCid = ?`
batch := i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
batchSize := i.settings.InsertBatchSize
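// Each scanned row becomes two batch entries (an INSERT plus a DELETE),
// so page the reads at half the batch limit to line pages up with flushes.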
pageSize := int(math.Floor(float64(batchLimit) / 2))

for idx, payloadMH := range payloadMultihashes {
batch.Entries = append(batch.Entries, gocql.BatchEntry{
Stmt: updatePiecesQry,
Args: []interface{}{p2, payloadMH, p1},
Idempotent: true,
})

if len(batch.Entries) >= batchSize || idx == len(payloadMultihashes)-1 {
if err := i.executeBatchWithRetry(ctx, batch, pieceCidV1); err != nil {
return xerrors.Errorf("executing batch replace for PayloadToPieces for piece %s: %w", pieceCidV1, err)
}
batch = i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
flush := func(batch *gocql.Batch) error {
if len(batch.Entries) == 0 {
return nil
}
}

if len(batch.Entries) >= 0 {
if err := i.executeBatchWithRetry(ctx, batch, pieceCidV1); err != nil {
return xerrors.Errorf("executing batch replace for PayloadToPieces for piece %s: %w", pieceCidV1, err)
if err := i.executeBatchWithRetry(ctx, batch, pieceCidV2); err != nil {
return xerrors.Errorf("executing batch for updating index from piece %s to %s: %w", pieceCidV1.String(), pieceCidV2.String(), err)
}
return nil
}

// Prepare batch replace for PieceBlockOffsetSize
updatePiecesQry = `UPDATE PieceBlockOffsetSize SET PieceCid = ? WHERE PayloadMultihash = ? AND PieceCid = ?`
batch = i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
batchSize = i.settings.InsertBatchSize

for idx, payloadMH := range payloadMultihashes {
batch.Entries = append(batch.Entries, gocql.BatchEntry{
Stmt: updatePiecesQry,
Args: []interface{}{p2, payloadMH, p1},
Idempotent: true,
})

if len(batch.Entries) >= batchSize || idx == len(payloadMultihashes)-1 {
if err := i.executeBatchWithRetry(ctx, batch, pieceCidV1); err != nil {
return xerrors.Errorf("executing batch replace for PieceBlockOffsetSize for piece %s: %w", pieceCidV1, err)
// -------- Pass 1: PayloadToPieces --------
{
iter := i.session.Query(`SELECT PayloadMultihash, BlockSize FROM PayloadToPieces WHERE PieceCid = ?`, p1).WithContext(ctx).PageSize(pageSize).Iter()

batch := i.session.NewBatch(gocql.LoggedBatch).WithContext(ctx) // Batches must be logged for consistency
var mh []byte
var bs int64
for iter.Scan(&mh, &bs) {
mhCopy := make([]byte, len(mh))
copy(mhCopy, mh)

// INSERT new mapping
batch.Entries = append(batch.Entries, gocql.BatchEntry{
Stmt: `INSERT INTO PayloadToPieces (PayloadMultihash, PieceCid, BlockSize) VALUES (?, ?, ?)`,
Args: []any{mhCopy, p2, bs},
Idempotent: true,
})
// DELETE old mapping
batch.Entries = append(batch.Entries, gocql.BatchEntry{
Stmt: `DELETE FROM PayloadToPieces WHERE PayloadMultihash = ? AND PieceCid = ?`,
Args: []any{mhCopy, p1},
Idempotent: true,
})

if len(batch.Entries) >= batchLimit {
if err := flush(batch); err != nil {
_ = iter.Close()
return err
}
batch = i.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
}
batch = i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
}
if err := iter.Close(); err != nil {
return xerrors.Errorf("scan PayloadToPieces for piece %s: %w", pieceCidV1, err)
}
if err := flush(batch); err != nil {
return err
}
}

if len(batch.Entries) >= 0 {
if err := i.executeBatchWithRetry(ctx, batch, pieceCidV1); err != nil {
return xerrors.Errorf("executing batch replace for PieceBlockOffsetSize for piece %s: %w", pieceCidV1, err)
// -------- Pass 2: PieceBlockOffsetSize --------
{
iter := i.session.Query(`SELECT PayloadMultihash, BlockOffset FROM PieceBlockOffsetSize WHERE PieceCid = ?`, p1).WithContext(ctx).PageSize(pageSize).Iter()

batch := i.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
var mh []byte
var off int64
for iter.Scan(&mh, &off) {
mhCopy := make([]byte, len(mh))
copy(mhCopy, mh)

// INSERT new mapping
batch.Entries = append(batch.Entries, gocql.BatchEntry{
Stmt: `INSERT INTO PieceBlockOffsetSize (PieceCid, PayloadMultihash, BlockOffset) VALUES (?, ?, ?)`,
Args: []any{p2, mhCopy, off},
Idempotent: true,
})
// DELETE old mapping
batch.Entries = append(batch.Entries, gocql.BatchEntry{
Stmt: `DELETE FROM PieceBlockOffsetSize WHERE PieceCid = ? AND PayloadMultihash = ?`,
Args: []any{p1, mhCopy},
Idempotent: true,
})

if len(batch.Entries) >= batchLimit {
if err := flush(batch); err != nil {
_ = iter.Close()
return err
}
batch = i.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
}
}
if err := iter.Close(); err != nil {
return xerrors.Errorf("scan PieceBlockOffsetSize for piece %s: %w", pieceCidV1, err)
}
if err := flush(batch); err != nil {
return err
}
}
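This rewrite replaces the old UPDATE ... SET PieceCid with an INSERT of the row under the new key plus a DELETE under the old one: PieceCid is evidently part of the primary key in both tables (every WHERE clause filters on it), and Cassandra cannot update primary-key columns in place. Logged batches keep each INSERT/DELETE pair all-or-nothing, so a failure mid-migration should not leave a row visible under neither CID. The pattern in isolation, as a sketch for one table (statements copied from the code above; the retry wrapper used by the real code is omitted):

	func migratePayloadToPieces(ctx context.Context, s *gocql.Session, oldKey, newKey []byte, batchLimit int) error {
		// Page at half the batch limit: every scanned row becomes two entries.
		iter := s.Query(`SELECT PayloadMultihash, BlockSize FROM PayloadToPieces WHERE PieceCid = ?`, oldKey).
			WithContext(ctx).PageSize(batchLimit / 2).Iter()

		batch := s.NewBatch(gocql.LoggedBatch).WithContext(ctx)
		var mh []byte
		var bs int64
		for iter.Scan(&mh, &bs) {
			mhCopy := append([]byte(nil), mh...) // Scan reuses its buffer between rows
			batch.Entries = append(batch.Entries,
				gocql.BatchEntry{Stmt: `INSERT INTO PayloadToPieces (PayloadMultihash, PieceCid, BlockSize) VALUES (?, ?, ?)`, Args: []any{mhCopy, newKey, bs}, Idempotent: true},
				gocql.BatchEntry{Stmt: `DELETE FROM PayloadToPieces WHERE PayloadMultihash = ? AND PieceCid = ?`, Args: []any{mhCopy, oldKey}, Idempotent: true},
			)
			if len(batch.Entries) >= batchLimit {
				if err := s.ExecuteBatch(batch); err != nil {
					_ = iter.Close()
					return err
				}
				batch = s.NewBatch(gocql.LoggedBatch).WithContext(ctx)
			}
		}
		if err := iter.Close(); err != nil {
			return err
		}
		if len(batch.Entries) > 0 {
			return s.ExecuteBatch(batch) // flush the tail
		}
		return nil
	}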

14 changes: 11 additions & 3 deletions market/indexstore/indexstore_test.go
@@ -102,7 +102,7 @@ func TestNewIndexStore(t *testing.T) {
// Add index to the store
var eg errgroup.Group
eg.Go(func() error {
serr := idxStore.AddIndex(ctx, pcid2, recs)
serr := idxStore.AddIndex(ctx, pcid1, recs)
return serr
})

@@ -132,10 +132,18 @@ func TestNewIndexStore(t *testing.T) {
pcids, err := idxStore.PiecesContainingMultihash(ctx, m)
require.NoError(t, err)
require.Len(t, pcids, 1)
require.Equal(t, pcids[0].PieceCidV2.String(), pcid2.String())
require.Equal(t, pcids[0].PieceCid.String(), pcid1.String())

// Migrate V1 to V2
err = idxStore.UpdatePieceCidV1ToV2(ctx, pcid1, pcid2)
require.NoError(t, err)
pcids, err = idxStore.PiecesContainingMultihash(ctx, m)
require.NoError(t, err)
require.Len(t, pcids, 1)
require.Equal(t, pcids[0].PieceCid.String(), pcid2.String())

// Remove all indexes from the store
err = idxStore.RemoveIndexes(ctx, pcids[0].PieceCidV2)
err = idxStore.RemoveIndexes(ctx, pcids[0].PieceCid)
require.NoError(t, err)

err = idxStore.session.Query("SELECT * FROM piece_by_aggregate").Exec()
38 changes: 38 additions & 0 deletions market/ipni/chunker/serve-chunker.go
@@ -6,6 +6,7 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"time"

@@ -177,6 +178,43 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated
return nil, xerrors.Errorf("parsing piece CID: %w", err)
}

// Convert to pcid2 if needed
yes := commcidv2.IsPieceCidV2(pieceCidv2)
if !yes {
var rawSize int64
var singlePiece bool
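// The CTE resolves the raw payload size via market_piece_deal, but only
// when exactly one market_piece_metadata row matches this v1 CID; with
// zero or several matches the v1->v2 conversion would be ambiguous.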
err := p.db.QueryRow(ctx, `WITH meta AS (
SELECT piece_size
FROM market_piece_metadata
WHERE piece_cid = $1
),
exact AS (
SELECT COUNT(*) AS n, MIN(piece_size) AS piece_size
FROM meta
),
raw AS (
SELECT MAX(mpd.raw_size) AS raw_size
FROM market_piece_deal mpd
WHERE mpd.piece_cid = $1
AND mpd.piece_length = (SELECT piece_size FROM exact)
AND (SELECT n FROM exact) = 1
)
SELECT
COALESCE((SELECT raw_size FROM raw), 0) AS raw_size,
((SELECT n FROM exact) = 1) AS has_single_metadata;`, pieceCidv2.String()).Scan(&rawSize, &singlePiece)
if err != nil {
return nil, fmt.Errorf("failed to get piece metadata: %w", err)
}
if !singlePiece {
return nil, fmt.Errorf("expected exactly one piece metadata row for piece cid %s (found none or several), please use piece cid v2", pieceCidv2.String())
}
pcid2, err := commcidv2.PieceCidV2FromV1(pieceCidv2, uint64(rawSize))
if err != nil {
return nil, fmt.Errorf("failed to convert piece cid v1 to v2: %w", err)
}
pieceCidv2 = pcid2
}
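// From here on pieceCidv2 is guaranteed to be a v2 CID, so cache keys and
// downstream lookups are uniform regardless of which form the caller supplied.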

if leave, ok := p.noSkipCache.Get(pieceCidv2); !ok || time.Now().After(leave) {
skip, err := p.checkIsEntrySkip(ctx, block)
if err != nil {
10 changes: 5 additions & 5 deletions market/retrieval/remoteblockstore/remoteblockstore.go
@@ -115,7 +115,7 @@ func (ro *RemoteBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block,
var merr error
for _, piece := range pieces {
data, err := func() ([]byte, error) {
reader, _, err := ro.cpr.GetSharedPieceReader(ctx, piece.PieceCidV2, true)
reader, _, err := ro.cpr.GetSharedPieceReader(ctx, piece.PieceCid, true)
if err != nil {
return nil, fmt.Errorf("getting piece reader: %w", err)
}
@@ -124,19 +124,19 @@
}(reader)

// Get the offset of the block within the piece (CAR file)
offset, err := ro.idxApi.GetOffset(ctx, piece.PieceCidV2, c.Hash())
offset, err := ro.idxApi.GetOffset(ctx, piece.PieceCid, c.Hash()) // piece.PieceCid may be v1 or v2; either works, since we pass back exactly the CID the index lookup returned
if err != nil {
return nil, fmt.Errorf("getting offset/size for cid %s in piece %s: %w", c, piece.PieceCidV2, err)
return nil, fmt.Errorf("getting offset/size for cid %s in piece %s: %w", c, piece.PieceCid, err)
}

// Seek to the section offset
readerAt := io.NewSectionReader(reader, int64(offset), int64(piece.BlockSize+MaxCarBlockPrefixSize))
readCid, data, err := util.ReadNode(bufio.NewReader(readerAt))
if err != nil {
return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, piece.PieceCidV2, err)
return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, piece.PieceCid, err)
}
if !bytes.Equal(readCid.Hash(), c.Hash()) {
return nil, fmt.Errorf("read block %s from reader for piece %s, but expected block %s", readCid, piece.PieceCidV2, c)
return nil, fmt.Errorf("read block %s from reader for piece %s, but expected block %s", readCid, piece.PieceCid, c)
}
return data, nil
}()
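The block-read path is unchanged apart from the field rename: extracting a block from a CAR-shaped piece is the same three steps whichever CID version indexed it. As a standalone sketch (util is the go-car util package already imported here; MaxCarBlockPrefixSize is defined in this file):

	// readBlock seeks to a block's indexed offset inside a piece and decodes
	// one varint-prefixed CAR section, verifying the payload by multihash
	// rather than full CID equality so codec differences don't false-negative.
	func readBlock(reader io.ReaderAt, offset uint64, blockSize uint64, want cid.Cid) ([]byte, error) {
		sr := io.NewSectionReader(reader, int64(offset), int64(blockSize+MaxCarBlockPrefixSize))
		readCid, data, err := util.ReadNode(bufio.NewReader(sr))
		if err != nil {
			return nil, fmt.Errorf("reading CAR section: %w", err)
		}
		if !bytes.Equal(readCid.Hash(), want.Hash()) {
			return nil, fmt.Errorf("read block %s, expected %s", readCid, want)
		}
		return data, nil
	}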
9 changes: 7 additions & 2 deletions tasks/gc/task_cleanup_piece.go
@@ -321,7 +321,7 @@ func (p *PieceCleanupTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
}

if dropIndex {
err = dropIndexes(ctx, p.indexStore, pcid2)
err = dropIndexes(ctx, p.indexStore, pcid2, pi.PieceCIDV1)
if err != nil {
return false, xerrors.Errorf("failed to drop indexes for piece %s: %w", pcid2, err)
}
@@ -392,11 +392,16 @@ func (p *PieceCleanupTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
return true, nil
}

func dropIndexes(ctx context.Context, indexStore *indexstore.IndexStore, pieceCid cid.Cid) error {
func dropIndexes(ctx context.Context, indexStore *indexstore.IndexStore, pieceCid, pieceCid2 cid.Cid) error {
err := indexStore.RemoveIndexes(ctx, pieceCid)
if err != nil {
return xerrors.Errorf("failed to remove indexes for piece %s: %w", pieceCid, err)
}

err = indexStore.RemoveIndexes(ctx, pieceCid2)
if err != nil {
return xerrors.Errorf("failed to remove indexes for piece %s: %w", pieceCid2, err)
}
return nil
}
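With indexes possibly living under either CID during the transition, cleanup must remove both keys. When only the v2 CID is at hand, the v1 key can be derived with the same commcid helper indexstore.go uses above; a hedged caller sketch:

	// Derive the v1 piece CID from the v2 CID, then drop indexes under both.
	pcid1, _, err := commcid.PieceCidV1FromV2(pcid2)
	if err != nil {
		return xerrors.Errorf("deriving v1 piece cid: %w", err)
	}
	if err := dropIndexes(ctx, p.indexStore, pcid2, pcid1); err != nil {
		return xerrors.Errorf("dropping indexes: %w", err)
	}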
