Skip to content

Commit

Permalink
feat: data segment indexing (#1739)
Browse files Browse the repository at this point in the history
feat: support for PoDSI indexing

---------

Co-authored-by: Will <[email protected]>
Co-authored-by: Jacob Heun <[email protected]>
Co-authored-by: dirkmc <[email protected]>
Co-authored-by: Anton Evangelatov <[email protected]>
Co-authored-by: Jacob Heun <[email protected]>
Co-authored-by: Rod Vagg <[email protected]>
Co-authored-by: Łukasz Magiera <[email protected]>
Co-authored-by: Łukasz Magiera <[email protected]>
Co-authored-by: Hannah Howard <[email protected]>
Co-authored-by: gammazero <[email protected]>
Co-authored-by: Adin Schmahmann <[email protected]>
Co-authored-by: Masih H. Derkani <[email protected]>
Co-authored-by: Ivan Schasny <[email protected]>
Co-authored-by: Ivan Schasny <[email protected]>
  • Loading branch information
15 people authored Dec 1, 2023
1 parent e9d18ac commit 37e4d80
Show file tree
Hide file tree
Showing 31 changed files with 1,378 additions and 166 deletions.
5 changes: 5 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ workflows:
suite: all
target: "`go list ./... | grep -v boost/itests | grep -v cmd/booster-http | grep -v cmd/booster-bitswap`"

- test:
name: test-itest-data-segment-index
suite: itest-data-segment-index
target: "./itests/data_segment_index_retrieval_test.go"

- test:
name: test-itest-ipni
suite: itest-ipni
Expand Down
81 changes: 81 additions & 0 deletions car/multi_reader_at.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package car

import (
	"errors"
	"io"
	"sort"
)

// NewMultiReaderAt returns an io.ReaderAt that exposes parts as one
// contiguous byte range, laid out back to back in the order given.
//
// The size of each part is sampled once here to compute the part's starting
// offset in the combined range, so parts are expected to report a stable
// Size for the lifetime of the returned reader.
func NewMultiReaderAt(parts ...ReaderAtSize) io.ReaderAt {
	m := &multiReaderAt{
		parts: make([]readerAtOffset, 0, len(parts)),
	}
	var off int64
	for _, p := range parts {
		m.parts = append(m.parts, readerAtOffset{off, p})
		off += p.Size()
	}
	m.size = off
	return m
}

// ReaderAtSize is an io.ReaderAt that also reports its total length in bytes.
type ReaderAtSize interface {
	io.ReaderAt
	Size() int64
}

// readerAtOffset is a single part together with the offset, within the
// combined range, at which its first byte appears.
type readerAtOffset struct {
	off int64
	ReaderAtSize
}

// multiReaderAt concatenates several ReaderAtSize parts into one io.ReaderAt.
type multiReaderAt struct {
	parts []readerAtOffset // in order of increasing off
	size  int64            // sum of the part sizes recorded at construction
}

// multiReaderAt itself satisfies ReaderAtSize, so it can be nested.
var _ ReaderAtSize = (*multiReaderAt)(nil)

// Size returns the combined length of all parts, as computed at construction.
func (m *multiReaderAt) Size() int64 {
	return m.size
}

// ReadAt fills p with the bytes that start at off in the combined range,
// reading across part boundaries as needed.
//
// It returns the number of bytes copied into p. A negative off is rejected
// with an error. If the parts run out before p is full, the error is
// io.ErrUnexpectedEOF. Any other error reported by a part is returned as is,
// together with the count of bytes read up to and including the failing call.
func (m *multiReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
	if off < 0 {
		return 0, errors.New("car.multiReaderAt.ReadAt: negative offset")
	}
	wantN := len(p)

	// Binary-search for the first part that ends after off; every part
	// before it lies entirely ahead of the requested range.
	skipParts := sort.Search(len(m.parts), func(i int) bool {
		part := m.parts[i]
		return part.off+part.Size() > off
	})
	parts := m.parts[skipParts:]

	// Offset of the next byte to read, relative to the start of parts[0].
	needSkip := off
	if len(parts) > 0 {
		needSkip -= parts[0].off
	}

	for len(parts) > 0 && len(p) > 0 {
		partSize := parts[0].Size()
		avail := partSize - needSkip
		if avail <= 0 {
			// Nothing left in this part (for example a zero-length part).
			// Move on without calling its ReadAt: readers such as
			// bytes.Reader answer an empty read at their end with io.EOF,
			// which must not terminate a read that continues in later parts.
			parts = parts[1:]
			needSkip = 0
			continue
		}

		readP := p
		if int64(len(readP)) > avail {
			readP = readP[:avail]
		}

		pn, err0 := parts[0].ReadAt(readP, needSkip)
		// Account for the bytes before looking at the error so that the
		// returned count is accurate on failure as well.
		n += pn
		p = p[pn:]

		// io.ReaderAt allows a reader to return io.EOF alongside a completely
		// satisfied read that ends at its end of input; that only marks the
		// end of this part, not of the combined range.
		if err0 != nil && !(err0 == io.EOF && pn == len(readP)) {
			return n, err0
		}
		if pn == 0 {
			// readP is non-empty here, so a part that reads nothing without
			// reporting an error would otherwise spin forever.
			return n, io.ErrNoProgress
		}

		// Resume where this read stopped. After a short read the next call
		// continues inside the same part instead of restarting it at zero.
		needSkip += int64(pn)
		if needSkip >= partSize {
			parts = parts[1:]
			needSkip = 0
		}
	}

	if n != wantN {
		err = io.ErrUnexpectedEOF
	}
	return n, err
}
101 changes: 101 additions & 0 deletions car/multi_reader_at_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package car

import (
"bytes"
"crypto/rand"
"fmt"
"io"
mrand "math/rand"
"strings"
"testing"

"github.com/stretchr/testify/require"
)

// TestMultiReaderAt builds multi-readers from several part-size layouts over
// one random source buffer and checks that reads through the multi-reader
// return the same bytes as the corresponding slice of the source.
func TestMultiReaderAt(t *testing.T) {
	source := make([]byte, 4<<20)
	_, err := rand.Read(source)
	require.NoError(t, err)

	// assertRead reads length bytes at offset through r and requires a
	// complete, error-free read that matches the source buffer.
	assertRead := func(t *testing.T, r io.ReaderAt, length int, offset int) {
		buf := make([]byte, length)
		got, err := r.ReadAt(buf, int64(offset))
		require.NoError(t, err)
		require.Equal(t, length, got)
		require.True(t, bytes.Equal(source[offset:offset+length], buf))
	}

	// Each layout lists the sizes of the consecutive parts.
	layouts := [][]int{
		{1},
		{8},
		{10},
		{1000},
		{1024},
		{2000},
		{1 << 20},
		{10, 10},
		{1 << 20, 1 << 20},
		{10, 10, 10},
		{1 << 20, 1 << 20, 1 << 20},
		{1, 1, 1, 1, 1},
		{8, 1, 8, 1, 8},
		{1000, 8, 10, 1000},
		{1000, 2000, 2000, 1000},
		{1000, 2000, 2000, 8, 1000},
		{8, 2000, 1024, 1 << 20, 1000},
	}

	for _, layout := range layouts {
		// The subtest name is the part sizes joined with underscores.
		labels := make([]string, len(layout))
		for i, sz := range layout {
			labels[i] = fmt.Sprintf("%d", sz)
		}
		name := strings.Join(labels, "_")

		t.Run(name, func(t *testing.T) {
			// Carve the parts out of the source buffer back to back.
			total := 0
			readers := make([]ReaderAtSize, len(layout))
			for i, sz := range layout {
				readers[i] = bytes.NewReader(source[total : total+sz])
				total += sz
			}
			mra := NewMultiReaderAt(readers...)

			// Read everything in one call.
			assertRead(t, mra, total, 0)

			// Read random ranges that stay inside the combined data.
			for i := 0; i < 100; i++ {
				offset := mrand.Intn(total)
				length := mrand.Intn(total - offset)
				assertRead(t, mra, length, offset)
			}

			// Read each part exactly.
			start := 0
			for _, sz := range layout {
				assertRead(t, mra, sz, start)
				start += sz
			}

			// Read ranges slightly longer than a part so they spill into the
			// neighbouring part, clamped to the end of the data.
			// NOTE(review): cursor is decremented once per part after the
			// first and never restored, so the start offsets drift earlier as
			// the loop proceeds - confirm this is intended.
			cursor := 0
			for i, sz := range layout {
				offset := cursor
				length := sz
				if i > 0 {
					length++
					cursor--
				}
				if cursor < total {
					length++
				}
				if remaining := total - offset; length > remaining {
					length = remaining
				}
				assertRead(t, mra, length, offset)
				cursor += sz
			}
		})
	}
}
35 changes: 6 additions & 29 deletions cmd/boostd/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/boost/extern/boostd-data/model"
"github.com/filecoin-project/boost/node/config"
"github.com/filecoin-project/boost/piecedirectory"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/writer"
"github.com/filecoin-project/go-jsonrpc"
Expand All @@ -34,7 +35,6 @@ import (
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
carv2 "github.com/ipld/go-car/v2"
"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-multibase"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -467,7 +467,7 @@ func (dr *DisasterRecovery) CompleteSector(s abi.SectorNumber) error {
}

// safeUnsealSector tries to return a reader to an unsealed sector or times out
func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi.UnpaddedPieceSize, piecesize abi.PaddedPieceSize) (io.ReadCloser, bool, error) {
func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi.UnpaddedPieceSize, piecesize abi.PaddedPieceSize) (mount.Reader, bool, error) {
mid, _ := address.IDFromAddress(maddr)

sid := abi.SectorID{
Expand All @@ -480,7 +480,7 @@ func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi
logger.Errorw("storage find sector", "err", err)
}

var reader io.ReadCloser
var reader mount.Reader
var isUnsealed bool

done := make(chan struct{})
Expand Down Expand Up @@ -513,7 +513,7 @@ func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi
logger.Debugw("sa.IsUnsealed return true", "sector", sectorid)

go func() {
reader, err = sa.UnsealSector(ctx, sectorid, offset, piecesize.Unpadded())
reader, err = sa.UnsealSectorAt(ctx, sectorid, offset, piecesize.Unpadded())
if err != nil {
logger.Errorw("sa.UnsealSector return error", "sector", sectorid, "err", err)
return
Expand Down Expand Up @@ -580,25 +580,12 @@ func processPiece(ctx context.Context, sectorid abi.SectorNumber, chainDealID ab
if err != nil {
return err
}
defer reader.Close()
if !isUnsealed {
return fmt.Errorf("sector %d is not unsealed", sid)
}

dr.Sectors[sid].Deals[cdi].IsUnsealed = true

readerAt := reader.(Reader)

opts := []carv2.Option{carv2.ZeroLengthSectionAsEOF(true)}
rr, err := carv2.NewReader(readerAt, opts...)
if err != nil {
return err
}

drr, err := rr.DataReader()
if err != nil {
return err
}

dr.Sectors[sid].Deals[cdi].GotDataReader = true

if !ignoreLID { // populate LID
Expand Down Expand Up @@ -663,7 +650,7 @@ func processPiece(ctx context.Context, sectorid abi.SectorNumber, chainDealID ab

if !ignoreCommp { // commp over data reader
w := &writer.Writer{}
_, err = io.CopyBuffer(w, drr, make([]byte, writer.CommPBuf))
_, err = io.CopyBuffer(w, reader, make([]byte, writer.CommPBuf))
if err != nil {
return fmt.Errorf("copy into commp writer: %w", err)
}
Expand All @@ -676,9 +663,6 @@ func processPiece(ctx context.Context, sectorid abi.SectorNumber, chainDealID ab
encoder := cidenc.Encoder{Base: multibase.MustNewEncoder(multibase.Base32)}
_ = encoder

//fmt.Println("CommP CID: ", encoder.Encode(commp.PieceCID))
//fmt.Println("Piece size: ", types.NewInt(uint64(commp.PieceSize.Unpadded().Padded())))

if !commp.PieceCID.Equals(piececid) {
return fmt.Errorf("calculated commp doesnt match on-chain data, expected %s, got %s", piececid, commp.PieceCID)
}
Expand Down Expand Up @@ -782,13 +766,6 @@ func getActorAddress(ctx context.Context, cctx *cli.Context) (maddr address.Addr
return maddr, nil
}

type Reader interface {
io.Closer
io.Reader
io.ReaderAt
io.Seeker
}

func createLogger(logPath string) (*zap.SugaredLogger, error) {
logCfg := zap.NewDevelopmentConfig()
logCfg.OutputPaths = []string{"stdout", logPath}
Expand Down
4 changes: 2 additions & 2 deletions cmd/booster-http/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func TestE2E(t *testing.T) {
framework.SetLogLevel()

t.Log("Starting boost and miner")
boostAndMiner := framework.NewTestFramework(ctx, t, framework.EnableLegacyDeals(true))
req.NoError(boostAndMiner.Start(framework.WithMaxStagingDealsBytes(40000000)))
boostAndMiner := framework.NewTestFramework(ctx, t, framework.EnableLegacyDeals(true), framework.SetMaxStagingBytes(10485760))
req.NoError(boostAndMiner.Start())
defer boostAndMiner.Stop()

req.NoError(boostAndMiner.AddClientProviderBalance(abi.NewTokenAmount(1e15)))
Expand Down
4 changes: 2 additions & 2 deletions cmd/booster-http/trustless_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func TestTrustlessGateway(t *testing.T) {
kit.QuietMiningLogs()
framework.SetLogLevel()

boostAndMiner := framework.NewTestFramework(ctx, t, framework.EnableLegacyDeals(true))
req.NoError(boostAndMiner.Start(framework.WithMaxStagingDealsBytes(40000000)))
boostAndMiner := framework.NewTestFramework(ctx, t, framework.EnableLegacyDeals(true), framework.SetMaxStagingBytes(10485760))
req.NoError(boostAndMiner.Start())
defer boostAndMiner.Stop()

req.NoError(boostAndMiner.AddClientProviderBalance(abi.NewTokenAmount(1e15)))
Expand Down
15 changes: 7 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ require (
github.com/filecoin-project/go-commp-utils v0.1.4
github.com/filecoin-project/go-data-transfer v1.15.4-boost
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.2.0
github.com/filecoin-project/go-fil-markets v1.28.3
github.com/filecoin-project/go-jsonrpc v0.3.1
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.4
Expand Down Expand Up @@ -50,7 +51,7 @@ require (
github.com/ipfs/go-ipfs-files v0.3.0 // indirect
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-ipld-legacy v0.2.1
github.com/ipfs/go-libipfs v0.7.0 // indirect
github.com/ipfs/go-libipfs v0.7.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/go-merkledag v0.11.0 // indirect
github.com/ipfs/go-metrics-interface v0.0.1
Expand Down Expand Up @@ -201,7 +202,6 @@ require (
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-ds-badger2 v0.1.3 // indirect
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ds-measure v0.2.0 // indirect
github.com/ipfs/go-fs-lock v0.0.7 // indirect
github.com/ipfs/go-ipfs-cmds v0.10.0 // indirect
Expand Down Expand Up @@ -321,10 +321,11 @@ require (
github.com/filecoin-project/boost-gfm v1.26.7
github.com/filecoin-project/boost-graphsync v0.13.9
github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231124125934-3233c510357f
github.com/filecoin-project/go-data-segment v0.0.1
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7
github.com/filecoin-project/go-fil-markets v1.28.3
github.com/filecoin-project/lotus v1.25.0
github.com/ipfs/boxo v0.12.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/kubo v0.22.0
github.com/ipld/frisbii v0.4.1
github.com/ipld/go-fixtureplate v0.0.2
Expand All @@ -336,13 +337,11 @@ require (
github.com/schollz/progressbar/v3 v3.13.1
)

require (
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
)
require github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect

require (
github.com/Jorropo/jsync v1.0.1 // indirect
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect
github.com/filecoin-project/kubo-api-client v0.0.2-0.20230829103503-14448166d14d // indirect
github.com/gammazero/channelqueue v0.2.1 // indirect
github.com/gammazero/deque v0.2.1 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20220905160352-62059082
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o=
github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-segment v0.0.1 h1:1wmDxOG4ubWQm3ZC1XI5nCon5qgSq7Ra3Rb6Dbu10Gs=
github.com/filecoin-project/go-data-segment v0.0.1/go.mod h1:H0/NKbsRxmRFBcLibmABv+yFNHdmtl5AyplYLnb0Zv4=
github.com/filecoin-project/go-data-transfer v1.15.4-boost h1:rGsPDeDk0nbzLOPn/9iCIrhLNy69Vkr9tRBcetM4kd0=
github.com/filecoin-project/go-data-transfer v1.15.4-boost/go.mod h1:S5Es9uoD+3TveYyGjxZInAF6mSQtRjNzezV7Y7Sh8X0=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 h1:v+zJS5B6pA3ptWZS4t8tbt1Hz9qENnN4nVr1w99aSWc=
Expand All @@ -357,8 +359,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88OqLYEo6roi+GiIeOh8=
github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
github.com/filecoin-project/go-fil-commp-hashhash v0.2.0 h1:HYIUugzjq78YvV3vC6rL95+SfC/aSTVSnZSZiDV5pCk=
github.com/filecoin-project/go-fil-commp-hashhash v0.2.0/go.mod h1:VH3fAFOru4yyWar4626IoS5+VGE8SfZiBODJLUigEo4=
github.com/filecoin-project/go-fil-markets v1.28.3 h1:2cFu7tLZYrfNz4LnxjgERaVD7k5+Wwp0H76mnnTGPBk=
github.com/filecoin-project/go-fil-markets v1.28.3/go.mod h1:eryxo/oVgIxaR5g5CNr9PlvZOi+u/bak0IsPL/PT1hk=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
Expand Down
Loading

0 comments on commit 37e4d80

Please sign in to comment.