diff --git a/.circleci/config.yml b/.circleci/config.yml
index e10974a2a..f1a14b37e 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -353,6 +353,11 @@ workflows:
           suite: all
           target: "`go list ./... | grep -v boost/itests | grep -v cmd/booster-http | grep -v cmd/booster-bitswap`"
+      - test:
+          name: test-itest-data-segment-index
+          suite: itest-data-segment-index
+          target: "./itests/data_segment_index_retrieval_test.go"
+
       - test:
           name: test-itest-ipni
           suite: itest-ipni
diff --git a/car/multi_reader_at.go b/car/multi_reader_at.go
new file mode 100644
index 000000000..14d5f8913
--- /dev/null
+++ b/car/multi_reader_at.go
@@ -0,0 +1,81 @@
+package car
+
+import (
+	"io"
+	"sort"
+)
+
+func NewMultiReaderAt(parts ...ReaderAtSize) io.ReaderAt {
+	m := &multiReaderAt{
+		parts: make([]readerAtOffset, 0, len(parts)),
+	}
+	var off int64
+	for _, p := range parts {
+		rao := readerAtOffset{off, p}
+		m.parts = append(m.parts, rao)
+		off += rao.Size()
+	}
+	m.size = off
+	return m
+}
+
+type ReaderAtSize interface {
+	io.ReaderAt
+	Size() int64
+}
+
+type readerAtOffset struct {
+	off int64
+	ReaderAtSize
+}
+
+type multiReaderAt struct {
+	parts []readerAtOffset
+	size  int64
+}
+
+func (m *multiReaderAt) Size() int64 {
+	return m.size
+}
+
+func (m *multiReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
+	wantN := len(p)
+
+	// Skip past the requested offset.
+	skipParts := sort.Search(len(m.parts), func(i int) bool {
+		// This function returns whether parts[i] will
+		// contribute any bytes to our output.
+		part := m.parts[i]
+		return part.off+part.Size() > off
+	})
+	parts := m.parts[skipParts:]
+
+	// How far to skip in the first part.
+	needSkip := off
+	if len(parts) > 0 {
+		needSkip -= parts[0].off
+	}
+
+	for len(parts) > 0 && len(p) > 0 {
+		readP := p
+		partSize := parts[0].Size()
+		if int64(len(readP)) > partSize-needSkip {
+			readP = readP[:partSize-needSkip]
+		}
+		pn, err0 := parts[0].ReadAt(readP, needSkip)
+		if err0 != nil {
+			return n, err0
+		}
+		n += pn
+		p = p[pn:]
+		if int64(pn)+needSkip == partSize {
+			parts = parts[1:]
+		}
+		needSkip = 0
+	}
+
+	if n != wantN {
+		err = io.ErrUnexpectedEOF
+	}
+	return
+}
diff --git a/car/multi_reader_at_test.go b/car/multi_reader_at_test.go
new file mode 100644
index 000000000..2276730ae
--- /dev/null
+++ b/car/multi_reader_at_test.go
@@ -0,0 +1,101 @@
+package car
+
+import (
+	"bytes"
+	"crypto/rand"
+	"fmt"
+	"io"
+	mrand "math/rand"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestMultiReaderAt(t *testing.T) {
+	req := require.New(t)
+
+	sourceData := make([]byte, 4<<20)
+	_, err := rand.Read(sourceData)
+	req.NoError(err)
+
+	testRead := func(t *testing.T, mra io.ReaderAt, readLen int, pos int) {
+		// t.Logf("testRead() readLen=%d pos=%d", readLen, pos)
+		req := require.New(t)
+		readData := make([]byte, readLen)
+		n, err := mra.ReadAt(readData, int64(pos))
+		req.NoError(err)
+		req.Equal(readLen, n)
+		req.True(bytes.Equal(sourceData[pos:pos+readLen], readData))
+	}
+
+	for _, testCase := range [][]int{
+		{1},
+		{8},
+		{10},
+		{1000},
+		{1024},
+		{2000},
+		{1 << 20},
+		{10, 10},
+		{1 << 20, 1 << 20},
+		{10, 10, 10},
+		{1 << 20, 1 << 20, 1 << 20},
+		{1, 1, 1, 1, 1},
+		{8, 1, 8, 1, 8},
+		{1000, 8, 10, 1000},
+		{1000, 2000, 2000, 1000},
+		{1000, 2000, 2000, 8, 1000},
+		{8, 2000, 1024, 1 << 20, 1000},
+	} {
+		var sb strings.Builder
+		for ii, sz := range testCase {
+			if ii > 0 {
+				sb.WriteString("_")
+			}
+			sb.WriteString(fmt.Sprintf("%d", sz))
+		}
+
+		t.Run(sb.String(), func(t *testing.T) {
+			testLen := 0
+			ra := make([]ReaderAtSize, len(testCase))
+			for ii, sz := range testCase {
+				ra[ii] = bytes.NewReader(sourceData[testLen : testLen+sz])
+				testLen += sz
+			}
+			mra := NewMultiReaderAt(ra...)
+			// read all
+			testRead(t, mra, testLen, 0)
+			// read at random positions
+			for ii := 0; ii < 100; ii++ {
+				pos := mrand.Intn(testLen)
+				readLen := mrand.Intn(testLen - pos)
+				testRead(t, mra, readLen, pos)
+			}
+			// read blocks
+			off := 0
+			for _, sz := range testCase {
+				testRead(t, mra, sz, off)
+				off += sz
+			}
+			// read just outside of blocks
+			off = 0
+			for ii, sz := range testCase {
+				pos := off
+				rd := sz
+				if ii > 0 {
+					rd++
+					off--
+				}
+				if off < testLen {
+					rd++
+				}
+				if rd > testLen-pos {
+					rd = testLen - pos
+				}
+				testRead(t, mra, rd, pos)
+				off += sz
+			}
+		})
+	}
+}
diff --git a/cmd/boostd/recover.go b/cmd/boostd/recover.go
index 87850aaf0..e21edadfb 100644
--- a/cmd/boostd/recover.go
+++ b/cmd/boostd/recover.go
@@ -20,6 +20,7 @@ import (
 	"github.com/filecoin-project/boost/extern/boostd-data/model"
 	"github.com/filecoin-project/boost/node/config"
 	"github.com/filecoin-project/boost/piecedirectory"
+	"github.com/filecoin-project/dagstore/mount"
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-commp-utils/writer"
 	"github.com/filecoin-project/go-jsonrpc"
@@ -34,7 +35,6 @@ import (
 	"github.com/google/uuid"
 	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-cidutil/cidenc"
-	carv2 "github.com/ipld/go-car/v2"
 	"github.com/mitchellh/go-homedir"
 	"github.com/multiformats/go-multibase"
 	"github.com/urfave/cli/v2"
@@ -467,7 +467,7 @@ func (dr *DisasterRecovery) CompleteSector(s abi.SectorNumber) error {
 }
 
 // safeUnsealSector tries to return a reader to an unsealed sector or times out
-func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi.UnpaddedPieceSize, piecesize abi.PaddedPieceSize) (io.ReadCloser, bool, error) {
+func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi.UnpaddedPieceSize, piecesize abi.PaddedPieceSize) (mount.Reader, bool, error) {
 	mid, _ := address.IDFromAddress(maddr)
 
 	sid := abi.SectorID{
@@ -480,7 +480,7 @@ func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi
 		logger.Errorw("storage find sector", "err", err)
 	}
 
-	var reader io.ReadCloser
+	var reader mount.Reader
 	var isUnsealed bool
 
 	done := make(chan struct{})
@@ -513,7 +513,7 @@ func safeUnsealSector(ctx context.Context, sectorid abi.SectorNumber, offset abi
 		logger.Debugw("sa.IsUnsealed return true", "sector", sectorid)
 
 		go func() {
-			reader, err = sa.UnsealSector(ctx, sectorid, offset, piecesize.Unpadded())
+			reader, err = sa.UnsealSectorAt(ctx, sectorid, offset, piecesize.Unpadded())
 			if err != nil {
 				logger.Errorw("sa.UnsealSector return error", "sector", sectorid, "err", err)
 				return
@@ -580,25 +580,12 @@ func processPiece(ctx context.Context, sectorid abi.SectorNumber, chainDealID ab
 	if err != nil {
 		return err
 	}
+	defer reader.Close()
 
 	if !isUnsealed {
 		return fmt.Errorf("sector %d is not unsealed", sid)
 	}
 
 	dr.Sectors[sid].Deals[cdi].IsUnsealed = true
-
-	readerAt := reader.(Reader)
-
-	opts := []carv2.Option{carv2.ZeroLengthSectionAsEOF(true)}
-	rr, err := carv2.NewReader(readerAt, opts...)
- if err != nil { - return err - } - - drr, err := rr.DataReader() - if err != nil { - return err - } - dr.Sectors[sid].Deals[cdi].GotDataReader = true if !ignoreLID { // populate LID @@ -663,7 +650,7 @@ func processPiece(ctx context.Context, sectorid abi.SectorNumber, chainDealID ab if !ignoreCommp { // commp over data reader w := &writer.Writer{} - _, err = io.CopyBuffer(w, drr, make([]byte, writer.CommPBuf)) + _, err = io.CopyBuffer(w, reader, make([]byte, writer.CommPBuf)) if err != nil { return fmt.Errorf("copy into commp writer: %w", err) } @@ -676,9 +663,6 @@ func processPiece(ctx context.Context, sectorid abi.SectorNumber, chainDealID ab encoder := cidenc.Encoder{Base: multibase.MustNewEncoder(multibase.Base32)} _ = encoder - //fmt.Println("CommP CID: ", encoder.Encode(commp.PieceCID)) - //fmt.Println("Piece size: ", types.NewInt(uint64(commp.PieceSize.Unpadded().Padded()))) - if !commp.PieceCID.Equals(piececid) { return fmt.Errorf("calculated commp doesnt match on-chain data, expected %s, got %s", piececid, commp.PieceCID) } @@ -782,13 +766,6 @@ func getActorAddress(ctx context.Context, cctx *cli.Context) (maddr address.Addr return maddr, nil } -type Reader interface { - io.Closer - io.Reader - io.ReaderAt - io.Seeker -} - func createLogger(logPath string) (*zap.SugaredLogger, error) { logCfg := zap.NewDevelopmentConfig() logCfg.OutputPaths = []string{"stdout", logPath} diff --git a/cmd/booster-http/e2e_test.go b/cmd/booster-http/e2e_test.go index 4ee6ff24a..24f2076ec 100644 --- a/cmd/booster-http/e2e_test.go +++ b/cmd/booster-http/e2e_test.go @@ -40,8 +40,8 @@ func TestE2E(t *testing.T) { framework.SetLogLevel() t.Log("Starting boost and miner") - boostAndMiner := framework.NewTestFramework(ctx, t, framework.EnableLegacyDeals(true)) - req.NoError(boostAndMiner.Start(framework.WithMaxStagingDealsBytes(40000000))) + boostAndMiner := framework.NewTestFramework(ctx, t, framework.EnableLegacyDeals(true), framework.SetMaxStagingBytes(10485760)) + req.NoError(boostAndMiner.Start()) defer boostAndMiner.Stop() req.NoError(boostAndMiner.AddClientProviderBalance(abi.NewTokenAmount(1e15))) diff --git a/cmd/booster-http/trustless_gateway_test.go b/cmd/booster-http/trustless_gateway_test.go index e2866dfcf..49864f286 100644 --- a/cmd/booster-http/trustless_gateway_test.go +++ b/cmd/booster-http/trustless_gateway_test.go @@ -31,8 +31,8 @@ func TestTrustlessGateway(t *testing.T) { kit.QuietMiningLogs() framework.SetLogLevel() - boostAndMiner := framework.NewTestFramework(ctx, t, framework.EnableLegacyDeals(true)) - req.NoError(boostAndMiner.Start(framework.WithMaxStagingDealsBytes(40000000))) + boostAndMiner := framework.NewTestFramework(ctx, t, framework.EnableLegacyDeals(true), framework.SetMaxStagingBytes(10485760)) + req.NoError(boostAndMiner.Start()) defer boostAndMiner.Stop() req.NoError(boostAndMiner.AddClientProviderBalance(abi.NewTokenAmount(1e15))) diff --git a/go.mod b/go.mod index 4771a700e..c6edf2f6c 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,8 @@ require ( github.com/filecoin-project/go-commp-utils v0.1.4 github.com/filecoin-project/go-data-transfer v1.15.4-boost github.com/filecoin-project/go-fil-commcid v0.1.0 - github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 + github.com/filecoin-project/go-fil-commp-hashhash v0.2.0 + github.com/filecoin-project/go-fil-markets v1.28.3 github.com/filecoin-project/go-jsonrpc v0.3.1 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.4 @@ -50,7 +51,7 @@ require ( github.com/ipfs/go-ipfs-files 
v0.3.0 // indirect github.com/ipfs/go-ipld-format v0.6.0 github.com/ipfs/go-ipld-legacy v0.2.1 - github.com/ipfs/go-libipfs v0.7.0 // indirect + github.com/ipfs/go-libipfs v0.7.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipfs/go-merkledag v0.11.0 // indirect github.com/ipfs/go-metrics-interface v0.0.1 @@ -201,7 +202,6 @@ require ( github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect github.com/ipfs/go-ds-badger2 v0.1.3 // indirect - github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-ds-measure v0.2.0 // indirect github.com/ipfs/go-fs-lock v0.0.7 // indirect github.com/ipfs/go-ipfs-cmds v0.10.0 // indirect @@ -321,10 +321,11 @@ require ( github.com/filecoin-project/boost-gfm v1.26.7 github.com/filecoin-project/boost-graphsync v0.13.9 github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231124125934-3233c510357f + github.com/filecoin-project/go-data-segment v0.0.1 github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 - github.com/filecoin-project/go-fil-markets v1.28.3 github.com/filecoin-project/lotus v1.25.0 github.com/ipfs/boxo v0.12.0 + github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/kubo v0.22.0 github.com/ipld/frisbii v0.4.1 github.com/ipld/go-fixtureplate v0.0.2 @@ -336,13 +337,11 @@ require ( github.com/schollz/progressbar/v3 v3.13.1 ) -require ( - github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect - github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect -) +require github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect require ( github.com/Jorropo/jsync v1.0.1 // indirect + github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect github.com/filecoin-project/kubo-api-client v0.0.2-0.20230829103503-14448166d14d // indirect github.com/gammazero/channelqueue v0.2.1 // indirect github.com/gammazero/deque v0.2.1 // indirect diff --git a/go.sum b/go.sum index 27b1381a5..e4c84e270 100644 --- a/go.sum +++ b/go.sum @@ -347,6 +347,8 @@ github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20220905160352-62059082 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o= github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= +github.com/filecoin-project/go-data-segment v0.0.1 h1:1wmDxOG4ubWQm3ZC1XI5nCon5qgSq7Ra3Rb6Dbu10Gs= +github.com/filecoin-project/go-data-segment v0.0.1/go.mod h1:H0/NKbsRxmRFBcLibmABv+yFNHdmtl5AyplYLnb0Zv4= github.com/filecoin-project/go-data-transfer v1.15.4-boost h1:rGsPDeDk0nbzLOPn/9iCIrhLNy69Vkr9tRBcetM4kd0= github.com/filecoin-project/go-data-transfer v1.15.4-boost/go.mod h1:S5Es9uoD+3TveYyGjxZInAF6mSQtRjNzezV7Y7Sh8X0= github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 h1:v+zJS5B6pA3ptWZS4t8tbt1Hz9qENnN4nVr1w99aSWc= @@ -357,8 +359,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88OqLYEo6roi+GiIeOh8= github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= -github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo= 
-github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= +github.com/filecoin-project/go-fil-commp-hashhash v0.2.0 h1:HYIUugzjq78YvV3vC6rL95+SfC/aSTVSnZSZiDV5pCk= +github.com/filecoin-project/go-fil-commp-hashhash v0.2.0/go.mod h1:VH3fAFOru4yyWar4626IoS5+VGE8SfZiBODJLUigEo4= github.com/filecoin-project/go-fil-markets v1.28.3 h1:2cFu7tLZYrfNz4LnxjgERaVD7k5+Wwp0H76mnnTGPBk= github.com/filecoin-project/go-fil-markets v1.28.3/go.mod h1:eryxo/oVgIxaR5g5CNr9PlvZOi+u/bak0IsPL/PT1hk= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= diff --git a/go.work.sum b/go.work.sum index 1b0cad041..715c89159 100644 --- a/go.work.sum +++ b/go.work.sum @@ -410,6 +410,7 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 h1:Px2UA+2RvSSvv+RvJ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0/go.mod h1:tPaiy8S5bQ+S5sOiDlINkp7+Ef339+Nz5L5XO+cnOHo= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802 h1:1BDTz0u9nC3//pOCMdNH+CiXJVYJh5UQNCOBG7jbELc= github.com/ClickHouse/ch-go v0.57.0 h1:X/QmUmFhpUvLgPSQb7fWOSi1wvqGn6tJ7w2a59c4xsg= github.com/ClickHouse/ch-go v0.57.0/go.mod h1:DR3iBn7OrrDj+KeUp1LbdxLEUDbW+5Qwdl/qkc+PQ+Y= @@ -427,6 +428,7 @@ github.com/Masterminds/glide v0.13.2 h1:M5MOH04TyRiMBVeWHbifqTpnauxWINIubTCOkhXh github.com/Masterminds/semver v1.4.2 h1:WBLTQ37jOCzSLtXNdoo8bNM8876KhNqOKvrlGITgsTc= github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= github.com/Masterminds/vcs v1.13.0 h1:USF5TvZGYgIpcbNAEMLfFhHqP08tFZVlUVrmTSpqnyA= +github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Shopify/sarama v1.37.2 h1:LoBbU0yJPte0cE5TZCGdlzZRmMgMtZU/XgnUKZg9Cv4= @@ -597,6 +599,8 @@ github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF github.com/crate-crypto/go-ipa v0.0.0-20220523130400-f11357ae11c7 h1:6IrxszG5G+O7zhtkWxq6+unVvnrm1fqV2Pe+T95DUzw= github.com/crate-crypto/go-ipa v0.0.0-20220523130400-f11357ae11c7/go.mod h1:gFnFS95y8HstDP6P9pPwzrxOOC5TRDkwbM+ao15ChAI= github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/cyphar/filepath-securejoin v0.2.2 h1:jCwT2GTP+PY5nBz3c/YL5PAIbusElVrPujOBSCj8xRg= github.com/dave/jennifer v1.2.0 h1:S15ZkFMRoJ36mGAQgWL1tnr0NQJh9rZ8qatseX/VbBc= github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI= @@ -615,6 +619,7 @@ github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/docker/cli v24.0.2+incompatible h1:QdqR7znue1mtkXIJ+ruQMGQhpw2JzMJLRXp6zpzF6tM= github.com/docker/cli v24.0.2+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= 
+github.com/docker/docker v23.0.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker v24.0.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/dop251/goja v0.0.0-20230122112309-96b1610dd4f7 h1:kgvzE5wLsLa7XKfV85VZl40QXaMCaeFtHpPwJ8fhotY= github.com/dop251/goja v0.0.0-20230122112309-96b1610dd4f7/go.mod h1:yRkwfj0CBpOGre+TwBsqPV0IH0Pk73e4PXJOeNDboGs= @@ -640,12 +645,17 @@ github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byA github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5 h1:BBso6MBKW8ncyZLv37o+KNyy0HrrHgfnOaGQC2qvN+A= github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231009154452-ca8daa2870f3/go.mod h1:vHUM62fb82DpsBSXptQjpvcysjoV2Guc1MVJiIYccfQ= +github.com/filecoin-project/filecoin-ffi v0.30.4-0.20220519234331-bfd1f5f9fe38/go.mod h1:GM5pXRYvQM7wyH6V2WtPnJ2k1jt+qotRkWLxBSRCOuE= github.com/filecoin-project/go-dagaggregator-unixfs v0.3.0 h1:UXLtBUnPa61LkNa2GqhP+aJ53bOnHP/dzg6/wk2rnsA= github.com/filecoin-project/go-dagaggregator-unixfs v0.3.0/go.mod h1:UTWmEgyqq7RMx56AeHY/uEoLq1dJTPAirjyBPas4IQQ= github.com/filecoin-project/go-indexer-core v0.2.16 h1:1SmJVhfHTsi0CC+U6JdyjIIQtOqmKvCl/tqpI3gI+18= github.com/filecoin-project/go-legs v0.4.9 h1:9ccbv5zDPqMviEpSpf0TdfKKI64TMYGSiuY2A1EXHFY= github.com/filecoin-project/go-retrieval-types v1.2.0/go.mod h1:ojW6wSw2GPyoRDBGqw1K6JxUcbfa5NOSIiyQEeh7KK0= +github.com/filecoin-project/go-state-types v0.9.9/go.mod h1:+HCZifUV+e8TlQkgll22Ucuiq8OrVJkK+4Kh4u75iiw= github.com/filecoin-project/index-provider v0.8.1 h1:ggoBWvMSWR91HZQCWfv8SZjoTGNyJBwNMLuN9bJZrbU= +github.com/filecoin-project/kubo-api-client v0.0.1/go.mod h1:c36PPMIVOkKfHDwDG5U05gUlPRY9wNuh/BePwo0e+6Y= +github.com/filecoin-project/lotus v1.25.0-rc1/go.mod h1:L4HPpbCl0dvIVLW/anT1iBu1/CURDYN9n4ULKkDZ5ps= github.com/filecoin-project/specs-storage v0.4.1 h1:yvLEaLZj8f+uByhNC4mFOtCUyL2wQku+NGBp6hjTe9M= github.com/filecoin-project/storetheindex v0.4.17 h1:w0dVc954TGPukoVbidlYvn9Xt+wVhk5vBvrqeJiRo8I= github.com/filecoin-project/test-vectors/schema v0.0.7 h1:hhrcxLnQR2Oe6fjk63hZXG1fWQGyxgCVXOOlAlR/D9A= @@ -708,6 +718,7 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C github.com/go-latex/latex v0.0.0-20210823091927-c0d11ff05a81/go.mod h1:SX0U8uGpxhq9o2S/CELCSUxEWWAuoCUcVCQWv7G2OCk= github.com/go-latex/latex v0.0.0-20230307184459-12ec69307ad9 h1:NxXI5pTAtpEaU49bpLpQoDsu1zrteW/vxzTz8Cd2UAs= github.com/go-latex/latex v0.0.0-20230307184459-12ec69307ad9/go.mod h1:gWuR/CrFDDeVRFQwHPvsv9soJVB/iqymhuZQuJ3a9OM= +github.com/go-openapi/spec v0.19.2/go.mod h1:sCxk3jxKgioEJikev4fgkNmwS+3kuYdJtcsZsD5zxMY= github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-pdf/fpdf v0.6.0 h1:MlgtGIfsdMEEQJr2le6b/HNr1ZlQwxyWr77r2aj2U/8= github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= @@ -725,6 +736,14 @@ github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrt github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-yaml/yaml v2.1.0+incompatible h1:RYi2hDdss1u4YE7GwixGzWwVo47T8UQwnTLB6vQiq+o= github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM= +github.com/gobuffalo/envy v1.7.0 
h1:GlXgaiBkmrYMHco6t4j7SacKO4XUjvh5pwXh0f4uxXU= +github.com/gobuffalo/envy v1.7.0/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI= +github.com/gobuffalo/logger v1.0.0 h1:xw9Ko9EcC5iAFprrjJ6oZco9UpzS5MQ4jAwghsLHdy4= +github.com/gobuffalo/logger v1.0.0/go.mod h1:2zbswyIUa45I+c+FLXuWl9zSWEiVuthsk8ze5s8JvPs= +github.com/gobuffalo/packd v0.3.0 h1:eMwymTkA1uXsqxS0Tpoop3Lc0u3kTfiMBE6nKtQU4g4= +github.com/gobuffalo/packd v0.3.0/go.mod h1:zC7QkmNkYVGKPw4tHpBQ+ml7W/3tIebgeo1b36chA3Q= +github.com/gobuffalo/packr/v2 v2.6.0 h1:EMUzJIb5rof6r087PtGmgdzdLKpRBESJ/8jyL9MexfY= +github.com/gobuffalo/packr/v2 v2.6.0/go.mod h1:sgEE1xNZ6G0FNN5xn9pevVu4nywaxHvgup67xisti08= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= @@ -809,6 +828,7 @@ github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go-version v1.2.0 h1:3vNe/fWF5CBgRIguda1meWhsZHy3m8gCJ5wx+dIzX/E= github.com/hashicorp/go.net v0.0.1 h1:sNCoNyDEvN1xa+X0baata4RdcpKwcMS6DH+xwfqPgjw= github.com/hashicorp/golang-lru/v2 v2.0.1/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/hashicorp/golang-lru/v2 v2.0.2/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y= github.com/hashicorp/mdns v1.0.1 h1:XFSOubp8KWB+Jd2PDyaX5xUd5bhSP/+pTDZVDMzZJM8= @@ -828,6 +848,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1: github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab h1:BA4a7pe6ZTd9F8kXETBoijjFJ/ntaa//1wiH9BZu4zU= github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw= github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 h1:Mo9W14pwbO9VfRe+ygqZ8dFbPpoIK1HFrG/zjTuQ+nc= +github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= @@ -844,14 +865,20 @@ github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9 h1:MHTrDWmQpHq/ github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368 h1:+TUUmaFa4YD1Q+7bH9o5NCHQGPMqZCYJiNW6lIIS9z4= github.com/ipfs/boxo v0.8.1/go.mod h1:xJ2hVb4La5WyD7GvKYE0lq2g1rmQZoCD2K4WNrV6aZI= github.com/ipfs/boxo v0.10.0/go.mod h1:Fg+BnfxZ0RPzR0nOodzdIq3A7KgoWAOWsEIImrIQdBM= +github.com/ipfs/boxo v0.10.1/go.mod h1:1qgKq45mPRCxf4ZPoJV2lnXxyxucigILMJOrQrVivv8= github.com/ipfs/go-block-format v0.1.1/go.mod h1:+McEIT+g52p+zz5xGAABGSOKrzmrdX97bc0USBdWPUs= github.com/ipfs/go-cid v0.3.0/go.mod h1:P+HXFDF4CVhaVayiEb4wkAy7zBHxBwsJyt0Y5U6MLro= github.com/ipfs/go-cid v0.4.0/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= github.com/ipfs/go-delegated-routing v0.2.2 h1:4fM5i/XFDY1VLXvkP/bvrV/4DgOxdymTm7U9V2Nf9aw= +github.com/ipfs/go-delegated-routing v0.7.0 h1:43FyMnKA+8XnyX68Fwg6aoGkqrf8NS5aG7p644s26PU= +github.com/ipfs/go-delegated-routing v0.7.0/go.mod h1:u4zxjUWIe7APUW5ds9CfD0tJX3vM9JhIeNqA8kE4vHE= github.com/ipfs/go-fetcher v1.6.1 
h1:UFuRVYX5AIllTiRhi5uK/iZkfhSpBCGX7L70nSZEmK8= github.com/ipfs/go-filestore v1.2.0 h1:O2wg7wdibwxkEDcl7xkuQsPvJFRBVgVSsOJ/GP6z3yU= +github.com/ipfs/go-graphsync v0.14.6/go.mod h1:yT0AfjFgicOoWdAlUJ96tQ5AkuGI4r1taIQX/aHbBQo= +github.com/ipfs/go-graphsync v0.14.8/go.mod h1:qyHjUvHey6EfKUDMQPwCuVkMOurRG3hcjRm+FaVP6bE= github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk= github.com/ipfs/go-ipfs v0.12.1 h1:stT4AJCiaTS2a+yL382g1IB8Gm+jJFqe7Ssvf+L9KNw= +github.com/ipfs/go-ipfs-cmds v0.9.0/go.mod h1:SBFHK8WNwC416QWH9Vz1Ql42SSMAOqKpaHUMBu3jpLo= github.com/ipfs/go-ipfs-config v0.18.0 h1:Ta1aNGNEq6RIvzbw7dqzCVZJKb7j+Dd35JFnAOCpT8g= github.com/ipfs/go-ipfs-files v0.2.0/go.mod h1:vT7uaQfIsprKktzbTPLnIsd+NGw9ZbYwSq0g3N74u0M= github.com/ipfs/go-ipfs-http-client v0.4.0 h1:LNuVbFoKfCohCmcNImml3byM3PpTxTT7RPrv/UoDFkI= @@ -879,6 +906,7 @@ github.com/ipfs/go-todocounter v0.0.1 h1:kITWA5ZcQZfrUnDNkRn04Xzh0YFaDFXsoO2A81E github.com/ipfs/go-unixfs v0.4.4/go.mod h1:TSG7G1UuT+l4pNj91raXAPkX0BhJi3jST1FDTfQ5QyM= github.com/ipfs/go-unixfsnode v1.5.1/go.mod h1:ed79DaG9IEuZITJVQn4U6MZDftv6I3ygUBLPfhEbHvk= github.com/ipfs/go-unixfsnode v1.7.1/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk= +github.com/ipfs/go-unixfsnode v1.7.4/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk= github.com/ipfs/go-unixfsnode v1.8.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8= github.com/ipfs/interface-go-ipfs-core v0.10.0 h1:b/psL1oqJcySdQAsIBfW5ZJJkOAsYlhWtC0/Qvr4WiM= github.com/ipfs/interface-go-ipfs-core v0.10.0/go.mod h1:F3EcmDy53GFkF0H3iEJpfJC320fZ/4G60eftnItrrJ0= @@ -886,6 +914,8 @@ github.com/ipfs/iptb v1.4.0 h1:YFYTrCkLMRwk/35IMyC6+yjoQSHTEcNcefBStLJzgvo= github.com/ipfs/iptb-plugins v0.3.0 h1:C1rpq1o5lUZtaAOkLIox5akh6ba4uk/3RwWc6ttVxw0= github.com/ipfs/tar-utils v0.0.2 h1:UNgHB4x/PPzbMkmJi+7EqC9LNMPDztOVSnx1HAqSNg4= github.com/ipld/edelweiss v0.1.2 h1:dpcC0V+O4tLgIpLG5E4Lqbpp1N1ytnVnvcHYd1lHfN0= +github.com/ipld/edelweiss v0.2.0 h1:KfAZBP8eeJtrLxLhi7r3N0cBCo7JmwSRhOJp3WSpNjk= +github.com/ipld/edelweiss v0.2.0/go.mod h1:FJAzJRCep4iI8FOFlRriN9n0b7OuX3T/S9++NpBDmA4= github.com/ipld/go-car v0.5.0/go.mod h1:ppiN5GWpjOZU9PgpAZ9HbZd9ZgSpwPMr48fGRJOWmvE= github.com/ipld/go-car/v2 v2.5.1/go.mod h1:jKjGOqoCj5zn6KjnabD6JbnCsMntqU2hLiU6baZVO3E= github.com/ipld/go-car/v2 v2.8.0/go.mod h1:a+BnAxUqgr7wcWxW/lI6ctyEQ2v9gjBChPytwFMp2f4= @@ -898,6 +928,9 @@ github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236 github.com/ipld/go-storethehash v0.1.7 h1:c54J7WTBAjKfnSMC4TL7RLFNIY5ws40IzljKKW8zUAw= github.com/ipni/go-indexer-core v0.8.0 h1:HPFMngR47FL49mVnOZBrcxJoRODjIadlP+UYMRboNKA= github.com/ipni/go-indexer-core v0.8.0/go.mod h1:Y9su+no9k6y+jnQRERP/CKJewdISHzzl+n91GA+y4Ao= +github.com/ipni/go-libipni v0.0.8/go.mod h1:paYP9U4N3/vOzGCuN9kU972vtvw9JUcQjOKyiCFGwRk= +github.com/ipni/go-libipni v0.5.0/go.mod h1:UnrhEqjVI2Z2HXlaieOBONJmtW557nZkYpB4IIsMD+s= +github.com/ipni/index-provider v0.12.0/go.mod h1:GhyrADJp7n06fqoc1djzkvL4buZYHzV8SoWrlxEo5F4= github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= github.com/jackc/pgx/v5 v5.4.1 h1:oKfB/FhuVtit1bBM3zNRRsZ925ZkMN3HXL+LgLUM9lE= @@ -920,6 +953,8 @@ github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1 h1:ujPKutqRl github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= 
github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA= +github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= +github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI= github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4 h1:nwOc1YaOrYJ37sEBrtWZrdqzK22hiJs3GpDmP3sR2Yw= @@ -935,6 +970,8 @@ github.com/kabukky/httpscerts v0.0.0-20150320125433-617593d7dcb3 h1:Iy7Ifq2ysilW github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d h1:cVtBfNW5XTHiKQe7jDaDBSh/EVM4XLPutLAGboIXuM0= github.com/karalabe/usb v0.0.2 h1:M6QQBNxF+CQ8OFvxrT90BA0qBOXymndZnk5q235mFc4= github.com/karalabe/usb v0.0.2/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU= +github.com/karrick/godirwalk v1.10.12 h1:BqUm+LuJcXjGv1d2mj3gBiQyrQ57a0rYoAmhvJQ7RDU= +github.com/karrick/godirwalk v1.10.12/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA= github.com/kisielk/errcheck v1.5.0 h1:e8esj/e4R+SAOwFwN+n3zr0nYeCyeweozKfO23MvHzY= github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23 h1:FOOIBWrEkLgmlgGfMuZT83xIwfPDxEI2OHu6xUmJMFE= @@ -945,6 +982,7 @@ github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrD github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5 h1:2U0HzY8BJ8hVwDKIzp7y4voR9CX/nvcfymLmg2UiOio= github.com/klauspost/cpuid/v2 v2.1.1/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= +github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 h1:KAZ1BW2TCmT6PRihDPpocIy1QTtsAsrx6TneU/4+CMg= github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada h1:3L+neHp83cTjegPdCiOxVOJtRIy7/8RldvMTsyPYH10= github.com/koalacxr/quantile v0.0.1 h1:wAW+SQ286Erny9wOjVww96t8ws+x5Zj6AKHDULUK+o0= @@ -953,6 +991,7 @@ github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/pty v1.1.8 h1:AkaSdXYQOWeaO3neb8EM634ahkXXe3jYbVh/F9lq+GI= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= @@ -987,7 +1026,9 @@ github.com/libp2p/go-libp2p-interface-connmgr v0.0.5 h1:KG/KNYL2tYzXAfMvQN5K1aAG github.com/libp2p/go-libp2p-interface-pnet v0.0.1 h1:7GnzRrBTJHEsofi1ahFdPN9Si6skwXQE9UqR2S+Pkh8= github.com/libp2p/go-libp2p-kad-dht v0.21.0/go.mod h1:Bhm9diAFmc6qcWAr084bHNL159srVZRKADdp96Qqd1I= github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU= +github.com/libp2p/go-libp2p-kad-dht v0.24.0/go.mod h1:lfu5T01EH+r6uDZ/8G+ObhwgzVyd0b1nb54AdT8XGhc= github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U= +github.com/libp2p/go-libp2p-kbucket v0.6.1/go.mod h1:dvWO707Oq/vhMVuUhyfLkw0QsOrJFETepbNfpVHSELI= 
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-metrics v0.0.1 h1:yumdPC/P2VzINdmcKZd0pciSUCpou+s0lwYCjBbzQZU= github.com/libp2p/go-libp2p-mplex v0.6.0 h1:5ubK4/vLE2JkogKlJ2JLeXcSfA6qY6mE2HMJV9ve/Sk= @@ -1064,6 +1105,7 @@ github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcs github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/sha256-simd v1.0.1-0.20230130105256-d9c3aea9e949/go.mod h1:svsp3c9I8SlWYKpIFAZMgdvmFn8DIN5C9ktYpzZEj80= github.com/mitchellh/cli v1.1.0 h1:tEElEatulEHDeedTxwckzyYMA5c86fbmNIUL1hBIiTg= github.com/mitchellh/go-testing-interface v1.0.0 h1:fzU/JVNcaqHQEcVFAKeR41fkiLdIPrefOvVG1VZ96U0= github.com/mitchellh/gox v0.4.0 h1:lfGJxY7ToLJQjHHwi0EX6uYBdK78egf954SQl13PQJc= @@ -1075,6 +1117,7 @@ github.com/mitchellh/pointerstructure v1.2.0/go.mod h1:BRAsLI5zgXmw97Lf6s25bs8oh github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU= github.com/moby/sys/mountinfo v0.4.1 h1:1O+1cHA1aujwEwwVMa2Xm2l+gIpUHyd3+D+d7LZh1kM= +github.com/moby/term v0.0.0-20221205130635-1aeaba878587/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= @@ -1177,8 +1220,10 @@ github.com/quic-go/qtls-go1-19 v0.3.3/go.mod h1:ySOI96ew8lnoKPtSqx2BlI5wCpUVPT05 github.com/quic-go/qtls-go1-20 v0.1.0/go.mod h1:JKtK6mjbAVcUTN/9jZpvLbGxvdWIKS8uT7EiStoU1SM= github.com/quic-go/qtls-go1-20 v0.1.1/go.mod h1:JKtK6mjbAVcUTN/9jZpvLbGxvdWIKS8uT7EiStoU1SM= github.com/quic-go/qtls-go1-20 v0.2.2/go.mod h1:JKtK6mjbAVcUTN/9jZpvLbGxvdWIKS8uT7EiStoU1SM= +github.com/quic-go/qtls-go1-20 v0.3.2/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k= github.com/quic-go/quic-go v0.32.0/go.mod h1:/fCsKANhQIeD5l76c2JFU+07gVE3KaA0FP+0zMWwfwo= github.com/quic-go/quic-go v0.33.0/go.mod h1:YMuhaAV9/jIu0XclDXwZPAsP/2Kgr5yMYhe9oxhhOFA= +github.com/quic-go/quic-go v0.37.6/go.mod h1:YsbH1r4mSHPJcLF4k4zruUkLBqctEMBDR6VPvcYjIsU= github.com/quic-go/webtransport-go v0.5.2/go.mod h1:OhmmgJIzTTqXK5xvtuX0oBpLV2GkLWNDA+UeTGJXErU= github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= @@ -1187,11 +1232,14 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqn github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52 h1:RnWNS9Hlm8BIkjr6wx8li5abe0fr73jljLycdfemTp0= github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s= +github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= 
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/zerolog v1.21.0 h1:Q3vdXlfLNT+OftyBHsU0Y445MD+8m8axjKgf2si0QcM= github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= +github.com/russross/blackfriday v1.6.0 h1:KqfZb0pUVN2lYqZUYRddxF4OR8ZMURnJIG5Y3VRLtww= +github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245 h1:K1Xf3bKttbF+koVGaX5xngRIZ5bVjbmPnaxE/dR08uY= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= @@ -1237,6 +1285,7 @@ github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 h1:pXY9qYc/MP5zdvq github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= github.com/shurcooL/webdavfs v0.0.0-20170829043945-18c3829fa133 h1:JtcyT0rk/9PKOdnKQzuDR+FSjh7SGtJwpgVpfZBRKlQ= github.com/siebenmann/go-kstat v0.0.0-20160321171754-d34789b79745 h1:IuH7WumZNax0D+rEqmy2TyhKCzrtMGqbZO0b8rO00JA= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v1.13.0/go.mod h1:wDmR7qL282YbGsPy6H/yAsesrxfxaaSlJazyFLYVFx8= github.com/smola/gocompat v0.2.0 h1:6b1oIMlUXIpz//VKEDzPVBK8KG7beVwmHIUEBIs/Pns= github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= @@ -1306,6 +1355,7 @@ github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30 h1:kZiWylALnU github.com/warpfork/go-fsx v0.3.0 h1:RGueN83R4eOc/2oZkQ58RRxQS9JIevWgvoM55oaN9tE= github.com/weaveworks/common v0.0.0-20220810113439-c65105d60b18/go.mod h1:YfOOLoW1Q/jIIu0WLeSwgStmrKjuJEZSKTAUc+0KFvE= github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= +github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= github.com/whyrusleeping/go-ctrlnet v0.0.0-20180313164037-f564fbbdaa95 h1:c23eYhe7i8MG6dUSPzyIDDy5+cWOoZMovPamBKqrjYQ= github.com/whyrusleeping/go-logging v0.0.1 h1:fwpzlmT0kRC/Fmd0MdmGgJG/CXIZ6gFq46FQZjprUcc= @@ -1397,6 +1447,7 @@ go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8 go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d h1:E2M5QgjZ/Jg+ObCQAudsXxuTsLj7Nl5RV/lZcQZmKSo= +golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.4.0/go.mod h1:3quD/ATkf6oY+rnes5c3ExXTbLc8mueNue5/DoinL80= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= @@ -1457,6 +1508,8 @@ golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852 h1:xYq6+9AtI+xP3M4r0N1hCkHrInHDBohhquRgx9Kk6gI= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 
+golang.org/x/sys v0.0.0-20190515120540-06a5c4944438/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1474,13 +1527,18 @@ golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201112185108-eeaa07dd7696/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -1567,6 +1625,7 @@ modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM= modernc.org/lex v1.0.0 h1:w0dxp18i1q+aSE7GkepvwzvVWTLoCIQ2oDgTFAV2JZU= modernc.org/lexer v1.0.0 h1:D2xE6YTaH7aiEC7o/+rbx6qTAEr1uY83peKwkamIdQ0= modernc.org/libc v1.24.1/go.mod h1:FmfO1RLrU3MHJfyi9eYYmZBfi/R+tqZ6+hQ3yQQUkak= +modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= modernc.org/memory v1.6.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU= modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= diff --git a/gql/resolver_sector.go b/gql/resolver_sector.go new file mode 100644 index 000000000..c135e55f8 --- /dev/null +++ b/gql/resolver_sector.go @@ -0,0 +1,38 @@ +package gql + +import ( + "context" + "github.com/filecoin-project/boost/gql/types" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" +) + +type sectorStatusResolver struct { + si api.SectorInfo +} + +func (r *sectorStatusResolver) Number() types.Uint64 { + return types.Uint64(r.si.SectorID) +} + +func (r *sectorStatusResolver) State() string { + 
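+	// State is the sealing-pipeline state string reported by the miner's
+	// SectorsStatus API (for example "Proving").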
return string(r.si.State) +} + +func (r *sectorStatusResolver) DealIDs() []types.Uint64 { + ids := make([]types.Uint64, 0, len(r.si.Deals)) + for _, id := range r.si.Deals { + ids = append(ids, types.Uint64(id)) + } + return ids +} + +func (dr *resolver) SectorStatus(ctx context.Context, args struct{ SectorNumber types.Uint64 }) (*sectorStatusResolver, error) { + sec := abi.SectorNumber(args.SectorNumber) + si, err := dr.spApi.SectorsStatus(ctx, sec, false) + if err != nil { + return nil, err + } + + return §orStatusResolver{si: si}, nil +} diff --git a/gql/schema.graphql b/gql/schema.graphql index ebab4efad..af4470f29 100644 --- a/gql/schema.graphql +++ b/gql/schema.graphql @@ -350,6 +350,12 @@ type SealingPipeline { Workers: [Worker]! } +type SectorStatus { + Number: Uint64! + State: String! + DealIDs: [Uint64!]! +} + type ScanProgress { Progress: Float! LastScan: Time @@ -596,6 +602,9 @@ type RootQuery { """Get sealing pipeline state""" sealingpipeline: SealingPipeline! + """Get status of a particular sector""" + sectorStatus(sectorNumber: Uint64!): SectorStatus! + """Get IPNI Provider configuration and state""" ipniProviderInfo: IpniProviderInfo! diff --git a/itests/data_segment_index_retrieval_test.go b/itests/data_segment_index_retrieval_test.go new file mode 100644 index 000000000..fe8c122f2 --- /dev/null +++ b/itests/data_segment_index_retrieval_test.go @@ -0,0 +1,108 @@ +package itests + +import ( + "context" + "path/filepath" + "testing" + + "github.com/davecgh/go-spew/spew" + "github.com/filecoin-project/boost/itests/framework" + "github.com/filecoin-project/boost/testutil" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + carv2 "github.com/ipld/go-car/v2" + "github.com/stretchr/testify/require" +) + +func TestDataSegmentIndexRetrieval(t *testing.T) { + ctx := context.Background() + log := framework.Log + + kit.QuietMiningLogs() + framework.SetLogLevel() + var opts []framework.FrameworkOpts + opts = append(opts, framework.EnableLegacyDeals(true)) + opts = append(opts, framework.SetMaxStagingBytes(10000000)) // 10 MB + f := framework.NewTestFramework(ctx, t, opts...) 
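+	// SetMaxStagingBytes is the new framework-level replacement for the old
+	// WithMaxStagingDealsBytes Start() option. The deal below is backed by the
+	// pre-built data segment aggregate in fixtures/final.car, and both segment
+	// roots are retrieved back out at the end of the test.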
+ err := f.Start() + require.NoError(t, err) + defer f.Stop() + + err = f.Boost.LogSetLevel(ctx, "piecedirectory", "debug") + require.NoError(t, err) + + err = f.LotusMiner.LogSetLevel(ctx, "stores", "info") + require.NoError(t, err) + + err = f.AddClientProviderBalance(abi.NewTokenAmount(1e15)) + require.NoError(t, err) + + //// Create a CAR file + //tempdir := t.TempDir() + //log.Debugw("using tempdir", "dir", tempdir) + // + //// Select the number of car segments to use in test + //seg := 2 + // + //// Generate car file containing multiple car files + //segmentDetails, err := framework.GenerateDataSegmentFiles(t, tempdir, seg) + //require.NoError(t, err) + // + //p := segmentDetails.Piece.PieceCID.String() + // + //log.Info(p) + + // Start a web server to serve the car files + log.Debug("starting webserver") + server, err := testutil.HttpTestFileServer(t, "fixtures") + require.NoError(t, err) + defer server.Close() + + // Create a new dummy deal + log.Debug("creating dummy deal") + dealUuid := uuid.New() + + pieceCid, err := cid.Parse("baga6ea4seaqly4jqbnjbw5dz4gpcu5uuu3o3t7ohzjpjx7x6z3v53tkfutogwga") + require.NoError(t, err) + + // Make a deal + //res, err := f.MakeDummyDeal(dealUuid, segmentDetails.CarPath, segmentDetails.Piece.PieceCID, server.URL+"/"+filepath.Base(segmentDetails.CarPath), false) + res, err := f.MakeDummyDeal(dealUuid, "fixtures/final.car", pieceCid, server.URL+"/"+filepath.Base("final.car"), false) + require.NoError(t, err) + require.True(t, res.Result.Accepted) + log.Debugw("got response from MarketDummyDeal", "res", spew.Sdump(res)) + + // Wait for the deal to be added to a sector + err = f.WaitForDealAddedToSector(dealUuid) + require.NoError(t, err) + + ////Retrieve and compare the all car files within the deal + //for i := 0; i < seg; i++ { + // for _, r := range segmentDetails.Segments[i].Root { + // outFile := f.RetrieveDirect(ctx, t, r, &res.DealParams.ClientDealProposal.Proposal.PieceCID, true) + // kit.AssertFilesEqual(t, segmentDetails.Segments[i].FilePath, outFile) + // } + //} + + r1, err := cid.Parse("bafykbzaceaqliwrg6y2bxrhhbbiz3nknhz43yj2bqog4rulu5km5qhkckffuw") + require.NoError(t, err) + r2, err := cid.Parse("bafykbzaceccq64xf6yadlbmqpfindtf5x3cssel2fozkhvdyrrtnjnutr5j52") + require.NoError(t, err) + + outF1 := f.RetrieveDirect(ctx, t, r1, &pieceCid, false, nil) + r, err := carv2.OpenReader(outF1) + require.NoError(t, err) + rs, err := r.Roots() + require.NoError(t, err) + require.Equal(t, r1, rs[0]) + r.Close() + outf2 := f.RetrieveDirect(ctx, t, r2, &pieceCid, false, nil) + r, err = carv2.OpenReader(outf2) + require.NoError(t, err) + rs, err = r.Roots() + require.NoError(t, err) + require.Equal(t, r2, rs[0]) + r.Close() +} diff --git a/itests/dummydeal_podsi_test.go b/itests/dummydeal_podsi_test.go new file mode 100644 index 000000000..2a9a3ccb6 --- /dev/null +++ b/itests/dummydeal_podsi_test.go @@ -0,0 +1,239 @@ +package itests + +import ( + "bytes" + "context" + "fmt" + "io" + "math/bits" + "os" + "path" + "path/filepath" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/filecoin-project/boost/itests/framework" + "github.com/filecoin-project/boost/testutil" + "github.com/filecoin-project/go-data-segment/datasegment" + commcid "github.com/filecoin-project/go-fil-commcid" + commp "github.com/filecoin-project/go-fil-commp-hashhash" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/blocks" + 
"github.com/ipfs/go-unixfsnode/data/builder" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/blockstore" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/multiformats/go-multicodec" + multihash "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +func TestDummyPodsiDealOnline(t *testing.T) { + randomFileSize := int(1e6) + + ctx := context.Background() + log := framework.Log + + kit.QuietMiningLogs() + framework.SetLogLevel() + var opts []framework.FrameworkOpts + opts = append(opts, framework.EnableLegacyDeals(true), framework.SetMaxStagingBytes(10e9), framework.SetProvisionalWalletBalances(9e18)) + f := framework.NewTestFramework(ctx, t, opts...) + err := f.Start() + require.NoError(t, err) + defer f.Stop() + + err = f.AddClientProviderBalance(abi.NewTokenAmount(5e18)) + require.NoError(t, err) + + tempdir := t.TempDir() + log.Debugw("using tempdir", "dir", tempdir) + + // create a random file + randomFilepath, err := testutil.CreateRandomFile(tempdir, 5, randomFileSize) + require.NoError(t, err) + + carFile := filepath.Join(tempdir, "test.car") + dataSegmentFile := filepath.Join(tempdir, "datasegment.dat") + + // pack it into the car + rootCid, err := createCar(t, carFile, []string{randomFilepath}) + require.NoError(t, err) + + // pack the car into data segement piece twice so that we have two segments + makeDataSegmentPiece(t, dataSegmentFile, []string{carFile, carFile}) + + // Start a web server to serve the car files + log.Debug("starting webserver") + server, err := testutil.HttpTestFileServer(t, tempdir) + require.NoError(t, err) + defer server.Close() + + // Create a new dummy deal + log.Debug("creating dummy deal") + dealUuid := uuid.New() + + // Make a deal + res, err := f.MakeDummyDeal(dealUuid, dataSegmentFile, rootCid, server.URL+"/"+filepath.Base(dataSegmentFile), false) + require.NoError(t, err) + require.True(t, res.Result.Accepted) + log.Debugw("got response from MarketDummyDeal", "res", spew.Sdump(res)) + + time.Sleep(2 * time.Second) + + // Wait for the first deal to be added to a sector and cleaned up so space is made + err = f.WaitForDealAddedToSector(dealUuid) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + +} + +func makeDataSegmentPiece(t *testing.T, dataSegmentFile string, subPieces []string) { + readers := make([]io.Reader, 0) + deals := make([]abi.PieceInfo, 0) + for _, sp := range subPieces { + arg, err := os.Open(sp) + require.NoError(t, err) + + readers = append(readers, arg) + cp := new(commp.Calc) + _, _ = io.Copy(cp, arg) + rawCommP, size, err := cp.Digest() + require.NoError(t, err) + + _, _ = arg.Seek(0, io.SeekStart) + c, _ := commcid.DataCommitmentV1ToCID(rawCommP) + subdeal := abi.PieceInfo{ + Size: abi.PaddedPieceSize(size), + PieceCID: c, + } + deals = append(deals, subdeal) + } + require.NotEqual(t, 0, len(deals)) + + _, size, err := datasegment.ComputeDealPlacement(deals) + require.NoError(t, err) + + overallSize := abi.PaddedPieceSize(size) + // we need to make this the 'next' power of 2 in order to have space for the index + next := 1 << (64 - bits.LeadingZeros64(uint64(overallSize+256))) + + a, err := datasegment.NewAggregate(abi.PaddedPieceSize(next), deals) + require.NoError(t, err) + out, err := a.AggregateObjectReader(readers) + require.NoError(t, err) + + // open output file + fo, err := os.Create(dataSegmentFile) + require.NoError(t, err) + defer fo.Close() + + written, err := io.Copy(fo, 
out) + require.NoError(t, err) + require.NotZero(t, written) +} + +func createCar(t *testing.T, carFile string, files []string) (cid.Cid, error) { + // make a cid with the right length that we eventually will patch with the root. + hasher, err := multihash.GetHasher(multihash.SHA2_256) + if err != nil { + return cid.Undef, err + } + digest := hasher.Sum([]byte{}) + hash, err := multihash.Encode(digest, multihash.SHA2_256) + if err != nil { + return cid.Undef, err + } + proxyRoot := cid.NewCidV1(uint64(multicodec.DagPb), hash) + + options := []car.Option{} + + cdest, err := blockstore.OpenReadWrite(carFile, []cid.Cid{proxyRoot}, options...) + + if err != nil { + return cid.Undef, err + } + + // Write the unixfs blocks into the store. + root, err := writeFiles(context.Background(), false, cdest, files...) + if err != nil { + return cid.Undef, err + } + + if err := cdest.Finalize(); err != nil { + return cid.Undef, err + } + // re-open/finalize with the final root. + return root, car.ReplaceRootsInFile(carFile, []cid.Cid{root}) +} + +func writeFiles(ctx context.Context, noWrap bool, bs *blockstore.ReadWrite, paths ...string) (cid.Cid, error) { + ls := cidlink.DefaultLinkSystem() + ls.TrustedStorage = true + ls.StorageReadOpener = func(_ ipld.LinkContext, l ipld.Link) (io.Reader, error) { + cl, ok := l.(cidlink.Link) + if !ok { + return nil, fmt.Errorf("not a cidlink") + } + blk, err := bs.Get(ctx, cl.Cid) + if err != nil { + return nil, err + } + return bytes.NewBuffer(blk.RawData()), nil + } + ls.StorageWriteOpener = func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { + buf := bytes.NewBuffer(nil) + return buf, func(l ipld.Link) error { + cl, ok := l.(cidlink.Link) + if !ok { + return fmt.Errorf("not a cidlink") + } + blk, err := blocks.NewBlockWithCid(buf.Bytes(), cl.Cid) + if err != nil { + return err + } + _ = bs.Put(ctx, blk) + return nil + }, nil + } + + topLevel := make([]dagpb.PBLink, 0, len(paths)) + for _, p := range paths { + l, size, err := builder.BuildUnixFSRecursive(p, &ls) + if err != nil { + return cid.Undef, err + } + if noWrap { + rcl, ok := l.(cidlink.Link) + if !ok { + return cid.Undef, fmt.Errorf("could not interpret %s", l) + } + return rcl.Cid, nil + } + name := path.Base(p) + entry, err := builder.BuildUnixFSDirectoryEntry(name, int64(size), l) + if err != nil { + return cid.Undef, err + } + topLevel = append(topLevel, entry) + } + + // make a directory for the file(s). 
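+	// (BuildUnixFSDirectory wraps the top-level entries in a single UnixFS
+	// directory node, so the resulting CAR has exactly one root.)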
+
+	root, _, err := builder.BuildUnixFSDirectory(topLevel, &ls)
+	if err != nil {
+		return cid.Undef, err
+	}
+	rcl, ok := root.(cidlink.Link)
+	if !ok {
+		return cid.Undef, fmt.Errorf("could not interpret %s", root)
+	}
+
+	return rcl.Cid, nil
+}
diff --git a/itests/fixtures/final.car b/itests/fixtures/final.car
new file mode 100644
index 000000000..68a1122b3
Binary files /dev/null and b/itests/fixtures/final.car differ
diff --git a/itests/fixtures/fixture.dat b/itests/fixtures/fixture.dat
new file mode 100644
index 000000000..4f5334b65
Binary files /dev/null and b/itests/fixtures/fixture.dat differ
diff --git a/itests/fixtures/sample.car b/itests/fixtures/sample.car
new file mode 100644
index 000000000..9b1732e55
Binary files /dev/null and b/itests/fixtures/sample.car differ
diff --git a/itests/framework/fixtures.go b/itests/framework/fixtures.go
new file mode 100644
index 000000000..1aaa8df82
--- /dev/null
+++ b/itests/framework/fixtures.go
@@ -0,0 +1,164 @@
+package framework
+
+import (
+	"errors"
+	"io"
+	"math/bits"
+	"os"
+	"testing"
+
+	"github.com/filecoin-project/boost/storagemarket"
+	"github.com/filecoin-project/go-data-segment/datasegment"
+	commcid "github.com/filecoin-project/go-fil-commcid"
+	commp "github.com/filecoin-project/go-fil-commp-hashhash"
+	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/lotus/itests/kit"
+	"github.com/ipfs/go-cid"
+	"github.com/ipld/go-car/v2"
+)
+
+type CarDetails struct {
+	CarPath  string
+	Root     []cid.Cid
+	FilePath string
+}
+
+type SegmentDetails struct {
+	Piece    *abi.PieceInfo
+	Segments []*CarDetails
+	CarPath  string
+	CarSize  int64
+}
+
+func GenerateDataSegmentFiles(t *testing.T, tmpdir string, num int) (SegmentDetails, error) {
+	if num < 2 {
+		return SegmentDetails{}, errors.New("at least 2 deals are required to test data segment index")
+	}
+
+	fileSize := 1572864
+
+	var cars []*CarDetails
+	for i := 1; i <= num; i++ {
+
+		carPath, filePath := kit.CreateRandomCARv1(t, i, fileSize)
+		rd, err := car.OpenReader(carPath)
+		if err != nil {
+			return SegmentDetails{}, err
+		}
+
+		roots, err := rd.Roots()
+		if err != nil {
+			return SegmentDetails{}, err
+		}
+
+		err = rd.Close()
+		if err != nil {
+			return SegmentDetails{}, err
+		}
+
+		cars = append(cars, &CarDetails{
+			CarPath:  carPath,
+			FilePath: filePath,
+			Root:     roots,
+		})
+	}
+
+	finalCar, err := os.CreateTemp(tmpdir, "finalcar")
+	if err != nil {
+		return SegmentDetails{}, err
+	}
+
+	err = generateDataSegmentCar(cars, finalCar)
+	if err != nil {
+		return SegmentDetails{}, err
+	}
+
+	finalCarName := finalCar.Name()
+	carStat, err := finalCar.Stat()
+	if err != nil {
+		return SegmentDetails{}, err
+	}
+	carSize := carStat.Size()
+	err = finalCar.Close()
+	if err != nil {
+		return SegmentDetails{}, err
+	}
+
+	cidAndSize, err := storagemarket.GenerateCommPLocally(finalCarName)
+	if err != nil {
+		return SegmentDetails{}, err
+	}
+
+	return SegmentDetails{
+		Piece:    cidAndSize,
+		Segments: cars,
+		CarPath:  finalCarName,
+		CarSize:  carSize,
+	}, nil
+}
+
+func generateDataSegmentCar(cars []*CarDetails, outputFile *os.File) error {
+
+	readers := make([]io.Reader, 0)
+	deals := make([]abi.PieceInfo, 0)
+
+	for _, cf := range cars {
+
+		r, err := os.Open(cf.CarPath)
+
+		if err != nil {
+			return err
+		}
+
+		readers = append(readers, r)
+		cp := new(commp.Calc)
+
+		_, err = io.Copy(cp, r)
+		if err != nil {
+			return err
+		}
+
+		rawCommP, size, err := cp.Digest()
+		if err != nil {
+			return err
+		}
+
+		_, err = r.Seek(0, io.SeekStart)
+		if err != nil {
+			return err
+ } + + c, _ := commcid.DataCommitmentV1ToCID(rawCommP) + + subdeal := abi.PieceInfo{ + Size: abi.PaddedPieceSize(size), + PieceCID: c, + } + deals = append(deals, subdeal) + } + + _, size, err := datasegment.ComputeDealPlacement(deals) + if err != nil { + return err + } + + overallSize := abi.PaddedPieceSize(size) + // we need to make this the 'next' power of 2 in order to have space for the index + next := 1 << (64 - bits.LeadingZeros64(uint64(overallSize+256))) + + a, err := datasegment.NewAggregate(abi.PaddedPieceSize(next), deals) + if err != nil { + return err + } + out, err := a.AggregateObjectReader(readers) + if err != nil { + return err + } + + _, err = io.Copy(outputFile, out) + if err != nil { + return err + } + + return nil +} diff --git a/itests/framework/framework.go b/itests/framework/framework.go index fafbf2fbd..f7bb624ff 100644 --- a/itests/framework/framework.go +++ b/itests/framework/framework.go @@ -77,8 +77,10 @@ import ( var Log = logging.Logger("boosttest") type TestFrameworkConfig struct { - Ensemble *kit.Ensemble - EnableLegacy bool + Ensemble *kit.Ensemble + EnableLegacy bool + MaxStagingBytes int64 + ProvisionalWalletBalances int64 } type TestFramework struct { @@ -104,14 +106,29 @@ func EnableLegacyDeals(enable bool) FrameworkOpts { } } +func SetMaxStagingBytes(max int64) FrameworkOpts { + return func(tmc *TestFrameworkConfig) { + tmc.MaxStagingBytes = max + } +} + func WithEnsemble(e *kit.Ensemble) FrameworkOpts { return func(tmc *TestFrameworkConfig) { tmc.Ensemble = e } } +func SetProvisionalWalletBalances(balance int64) FrameworkOpts { + return func(tmc *TestFrameworkConfig) { + tmc.ProvisionalWalletBalances = balance + } +} + func NewTestFramework(ctx context.Context, t *testing.T, opts ...FrameworkOpts) *TestFramework { - fmc := &TestFrameworkConfig{} + fmc := &TestFrameworkConfig{ + // default provisional balance + ProvisionalWalletBalances: 1e18, + } for _, opt := range opts { opt(fmc) } @@ -188,12 +205,6 @@ func FullNodeAndMiner(t *testing.T, ensemble *kit.Ensemble) (*kit.TestFullNode, type ConfigOpt func(cfg *config.Boost) -func WithMaxStagingDealsBytes(maxBytes int64) ConfigOpt { - return func(cfg *config.Boost) { - cfg.Dealmaking.MaxStagingDealsBytes = maxBytes - } -} - func (f *TestFramework) Start(opts ...ConfigOpt) error { lapi.RunningNodeType = lapi.NodeMiner @@ -223,7 +234,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error { clientAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS) - amt := abi.NewTokenAmount(1e18) + amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances) _ = sendFunds(f.ctx, fullnodeApi, clientAddr, amt) Log.Infof("Created client wallet %s with %d attoFil", clientAddr, amt) wg.Done() @@ -238,7 +249,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error { Log.Info("Creating publish storage deals wallet") psdWalletAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS) - amt := abi.NewTokenAmount(1e18) + amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances) _ = sendFunds(f.ctx, fullnodeApi, psdWalletAddr, amt) Log.Infof("Created publish storage deals wallet %s with %d attoFil", psdWalletAddr, amt) wg.Done() @@ -247,7 +258,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error { Log.Info("Creating deal collateral wallet") dealCollatAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS) - amt := abi.NewTokenAmount(1e18) + amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances) _ = sendFunds(f.ctx, fullnodeApi, dealCollatAddr, amt) Log.Infof("Created deal collateral wallet %s with %d 
attoFil", dealCollatAddr, amt) wg.Done() @@ -333,7 +344,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error { return err } cfg.LotusFees.MaxPublishDealsFee = val - cfg.Dealmaking.MaxStagingDealsBytes = 4000000 // 4 MB + cfg.Dealmaking.RemoteCommp = true // No transfers will start until the first stall check period has elapsed cfg.Dealmaking.HttpTransferStallCheckPeriod = config.Duration(100 * time.Millisecond) @@ -346,6 +357,12 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error { o(cfg) } + if f.config.MaxStagingBytes > 0 { + cfg.Dealmaking.MaxStagingDealsBytes = f.config.MaxStagingBytes + } else { + cfg.Dealmaking.MaxStagingDealsBytes = 4000000 // 4 MB + } + cfg.Dealmaking.ExpectedSealDuration = 10 // Enable LID with leveldb diff --git a/piecedirectory/piece_directory_test.go b/piecedirectory/piece_directory_test.go index 300c07dd2..fb8584d04 100644 --- a/piecedirectory/piece_directory_test.go +++ b/piecedirectory/piece_directory_test.go @@ -2,9 +2,11 @@ package piecedirectory import ( "context" + "os" "testing" "github.com/filecoin-project/boost/extern/boostd-data/svc" + "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" ) @@ -14,6 +16,23 @@ func TestPieceDirectoryLevelDB(t *testing.T) { testPieceDirectory(context.Background(), t, bdsvc) } +func TestSegmentParsing(t *testing.T) { + carSize := int64(8323072) + pieceCid, err := cid.Parse(string("baga6ea4seaqly4jqbnjbw5dz4gpcu5uuu3o3t7ohzjpjx7x6z3v53tkfutogwga")) + require.NoError(t, err) + + rd, err := os.Open("testdata/segment.car") + require.NoError(t, err) + + recs, err := parsePieceWithDataSegmentIndex(pieceCid, carSize, rd) + require.NoError(t, err) + + t.Log(recs) + + err = rd.Close() + require.NoError(t, err) +} + func TestPieceDirectoryLevelDBFuzz(t *testing.T) { //_ = logging.SetLogLevel("piecedirectory", "debug") bdsvc, err := svc.NewLevelDB("") diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index 5cc9cd833..b617a3fb8 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -2,13 +2,17 @@ package piecedirectory import ( "bufio" + "bytes" "context" "errors" "fmt" "io" + "runtime" "sync" + "sync/atomic" "time" + carutil "github.com/filecoin-project/boost/car" bdclient "github.com/filecoin-project/boost/extern/boostd-data/client" "github.com/filecoin-project/boost/extern/boostd-data/model" "github.com/filecoin-project/boost/extern/boostd-data/shared/tracing" @@ -16,6 +20,7 @@ import ( "github.com/filecoin-project/boost/node/config" "github.com/filecoin-project/boost/piecedirectory/types" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-data-segment/datasegment" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/lib/readerutil" "github.com/filecoin-project/lotus/markets/dagstore" @@ -24,6 +29,7 @@ import ( "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-car" "github.com/ipld/go-car/util" carv2 "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/blockstore" @@ -33,12 +39,19 @@ import ( mh "github.com/multiformats/go-multihash" "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" ) var log = logging.Logger("piecedirectory") const ( MaxCachedReaders = 128 + // 20 MiB x 4 parallel deals is just 80MiB RAM overhead required + PodsiBuffesrSize = 20e6 + // Concurrency is driven by the number of available cores. 
Set reasonable max and mins + // to support multiple concurrent AddIndex operations + PodsiMaxConcurrency = 32 + PodsiMinConcurrency = 4 ) type settings struct { @@ -224,6 +237,8 @@ func (ps *PieceDirectory) AddDealForPiece(ctx context.Context, pieceCid cid.Cid, if err := ps.addIndexForPieceThrottled(ctx, pieceCid, dealInfo); err != nil { return fmt.Errorf("adding index for piece %s: %w", pieceCid, err) } + } else { + log.Infow("add deal for piece", "index", "not re-indexing, piece already indexed") } // Add deal to list of deals for this piece @@ -298,29 +313,19 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid } defer reader.Close() //nolint:errcheck - // Iterate over all the blocks in the piece to extract the index records + // Try to parse data as containing a data segment index log.Debugw("add index: read index", "pieceCid", pieceCid) - recs := make([]model.Record, 0) - opts := []carv2.Option{carv2.ZeroLengthSectionAsEOF(true)} - blockReader, err := carv2.NewBlockReader(reader, opts...) + recs, err := parsePieceWithDataSegmentIndex(pieceCid, int64(dealInfo.PieceLength.Unpadded()), reader) if err != nil { - return fmt.Errorf("getting block reader over piece %s: %w", pieceCid, err) - } - - blockMetadata, err := blockReader.SkipNext() - for err == nil { - recs = append(recs, model.Record{ - Cid: blockMetadata.Cid, - OffsetSize: model.OffsetSize{ - Offset: blockMetadata.Offset, - Size: blockMetadata.Size, - }, - }) - - blockMetadata, err = blockReader.SkipNext() - } - if !errors.Is(err, io.EOF) { - return fmt.Errorf("generating index for piece %s: %w", pieceCid, err) + log.Infow("add index: data segment check failed. falling back to car", "pieceCid", pieceCid, "err", err) + // Iterate over all the blocks in the piece to extract the index records + if _, err := reader.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("seek to start for piece %s: %w", pieceCid, err) + } + recs, err = parseRecordsFromCar(reader) + if err != nil { + return fmt.Errorf("parse car for piece %s: %w", pieceCid, err) + } } if len(recs) == 0 { @@ -361,6 +366,169 @@ func (ps *PieceDirectory) addIndexForPiece(ctx context.Context, pieceCid cid.Cid return eg.Wait() } +func parseRecordsFromCar(reader io.Reader) ([]model.Record, error) { + // Iterate over all the blocks in the piece to extract the index records + recs := make([]model.Record, 0) + opts := []carv2.Option{carv2.ZeroLengthSectionAsEOF(true)} + blockReader, err := carv2.NewBlockReader(reader, opts...) 
+ if err != nil { + return nil, fmt.Errorf("getting block reader over piece: %w", err) + } + + blockMetadata, err := blockReader.SkipNext() + for err == nil { + recs = append(recs, model.Record{ + Cid: blockMetadata.Cid, + OffsetSize: model.OffsetSize{ + Offset: blockMetadata.SourceOffset, + Size: blockMetadata.Size, + }, + }) + + blockMetadata, err = blockReader.SkipNext() + } + if !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("generating index for piece: %w", err) + } + return recs, nil +} + +type countingReader struct { + io.Reader + + cnt *int32 +} + +func (cr *countingReader) Read(p []byte) (n int, err error) { + atomic.AddInt32(cr.cnt, 1) + return cr.Reader.Read(p) +} + +func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) { + concurrency := runtime.NumCPU() + if concurrency < PodsiMinConcurrency { + concurrency = PodsiMinConcurrency + } + if concurrency > PodsiMaxConcurrency { + concurrency = PodsiMaxConcurrency + } + + log.Debugw("podsi: ", "bufferSize", PodsiBuffesrSize, "validationConcurrency", concurrency) + start := time.Now() + + ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() + dsis := datasegment.DataSegmentIndexStartOffset(ps) + if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { + return nil, fmt.Errorf("could not seek to data segment index: %w", err) + } + + var readsCnt int32 + cr := &countingReader{ + Reader: r, + cnt: &readsCnt, + } + indexData, err := datasegment.ParseDataSegmentIndex(bufio.NewReaderSize(cr, PodsiBuffesrSize)) + if err != nil { + return nil, fmt.Errorf("could not parse data segment index: %w", err) + } + + log.Debugw("podsi: parsed data segment index", "segments", len(indexData.Entries), "reads", readsCnt, "time", time.Since(start).String()) + + if len(indexData.Entries) == 0 { + return nil, fmt.Errorf("no data segments found") + } + + start = time.Now() + + if len(indexData.Entries) < concurrency { + concurrency = len(indexData.Entries) + } + + chunkSize := len(indexData.Entries) / concurrency + results := make([][]datasegment.SegmentDesc, concurrency) + + var eg errgroup.Group + for i := 0; i < concurrency; i++ { + i := i + eg.Go(func() error { + start := i * chunkSize + end := start + chunkSize + if i == concurrency-1 { + end = len(indexData.Entries) + } + + res, err := validateEntries(indexData.Entries[start:end]) + if err != nil { + return err + } + + results[i] = res + + return nil + }) + } + + if err := eg.Wait(); err != nil { + return nil, fmt.Errorf("could not calculate valid entries: %w", err) + } + + validSegments := make([]datasegment.SegmentDesc, 0, len(indexData.Entries)) + for _, res := range results { + validSegments = append(validSegments, res...) + } + + if len(validSegments) == 0 { + return nil, fmt.Errorf("no data segments found") + } + + log.Debugw("podsi: validated data segment index", "validSegments", len(validSegments), "time", time.Since(start).String()) + start = time.Now() + readsCnt = 0 + + recs := make([]model.Record, 0) + for _, s := range validSegments { + segOffset := s.UnpaddedOffest() + segSize := s.UnpaddedLength() + + lr := io.NewSectionReader(r, int64(segOffset), int64(segSize)) + + cr = &countingReader{ + Reader: lr, + cnt: &readsCnt, + } + + subRecs, err := parseRecordsFromCar(bufio.NewReaderSize(cr, PodsiBuffesrSize)) + if err != nil { + // revisit when non-car files supported: one corrupt segment shouldn't translate into an error in other segments. 
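+			// For now, a parse failure on any single segment fails the whole piece
+			// here; the caller (addIndexForPiece) then falls back to indexing the
+			// piece as a single plain CAR.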
+ return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err) + } + for i := range subRecs { + subRecs[i].Offset += segOffset + } + recs = append(recs, subRecs...) + } + + log.Debugw("podsi: parsed records from data segments", "recs", len(recs), "reads", readsCnt, "time", time.Since(start).String()) + + return recs, nil +} + +func validateEntries(entries []datasegment.SegmentDesc) ([]datasegment.SegmentDesc, error) { + res := make([]datasegment.SegmentDesc, 0, len(entries)) + for i, e := range entries { + + if err := e.Validate(); err != nil { + if errors.Is(err, datasegment.ErrValidation) { + continue + } else { + return nil, xerrors.Errorf("got unknown error for entry %d: %w", i, err) + } + } + res = append(res, e) + } + return res, nil +} + // BuildIndexForPiece builds indexes for a given piece CID. The piece must contain a valid deal // corresponding to an unsealed sector for this method to work. It will try to build index // using all available deals and will exit as soon as it succeeds for one of the deals @@ -624,14 +792,16 @@ func (ps *PieceDirectory) BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte, return nil, fmt.Errorf("getting offset/size for cid %s in piece %s: %w", c, pieceCid, err) } - // Seek to the block offset + // Seek to the section offset readerAt := readerutil.NewReadSeekerFromReaderAt(reader, int64(offsetSize.Offset)) - // Read the block data - _, data, err := util.ReadNode(bufio.NewReader(readerAt)) + readCid, data, err := util.ReadNode(bufio.NewReader(readerAt)) if err != nil { return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, pieceCid, err) } + if !bytes.Equal(readCid.Hash(), c.Hash()) { + return nil, fmt.Errorf("read block %s from reader for piece %s, but expected block %s", readCid, pieceCid, c) + } return data, nil }() if err != nil { @@ -744,8 +914,59 @@ func (ps *PieceDirectory) GetBlockstore(ctx context.Context, pieceCid cid.Cid) ( } // process index and store entries + carVersion, err := carv2.ReadVersion(reader) + if err != nil { + return nil, fmt.Errorf("getting car version for piece %s: %w", pieceCid, err) + } + + // handle absolute index offsets for carv2. + var bsR io.ReaderAt + if carVersion == 2 { + // this code handles the current 'absolute' index offsets stored by boost. + // initially, the data looks like [carv2-header carv1-header block block ...] + // we transform the reader here to look like: + // [carv1-header [gap of carv2-header-size] block block ...] + // the carv1 header at the beginning makes the offset used by the subsequent `blockstore.NewReadOnly` work properly. 
+ + // read the carv2 header to get the payload layout + carReader, err := carv2.NewReader(reader) + if err != nil { + return nil, fmt.Errorf("getting car reader for piece %s: %w", pieceCid, err) + } + dataOffset := int64(carReader.Header.DataOffset) + dataSize := int64(carReader.Header.DataSize) + + // read the payload (CARv1) header + sectionReader := io.NewSectionReader(reader, dataOffset, dataSize) + carHeader, err := car.ReadHeader(bufio.NewReader(sectionReader)) + if err != nil { + return nil, fmt.Errorf("reading car header for piece %s: %w", pieceCid, err) + } + + // write the header back out to a buffer + headerBuf := bytes.NewBuffer(nil) + if err := car.WriteHeader(carHeader, headerBuf); err != nil { + return nil, fmt.Errorf("copying car header for piece %s: %w", pieceCid, err) + } + headerLen := int64(headerBuf.Len()) + + // create a reader that will address the payload after the header + sectionReader = io.NewSectionReader(reader, dataOffset+headerLen, dataSize-headerLen) + + bsR = carutil.NewMultiReaderAt( + bytes.NewReader(headerBuf.Bytes()), // payload (CARv1) header + bytes.NewReader(make([]byte, dataOffset)), // padding to account for the CARv2 wrapper + sectionReader, // payload (CARv1) data + ) + } else { + bsR = reader + if _, err := reader.Seek(0, io.SeekStart); err != nil { + return nil, fmt.Errorf("seeking back to start of piece %s: %w", pieceCid, err) + } + } + // Create a blockstore from the index and the piece reader - bs, err := blockstore.NewReadOnly(reader, idx, carv2.ZeroLengthSectionAsEOF(true)) + bs, err := blockstore.NewReadOnly(bsR, idx, carv2.ZeroLengthSectionAsEOF(true)) if err != nil { return nil, fmt.Errorf("creating blockstore for piece %s: %w", pieceCid, err) } diff --git a/piecedirectory/piecedirectory_test_util.go b/piecedirectory/piecedirectory_test_util.go index 1b7e79590..eb599fc0c 100644 --- a/piecedirectory/piecedirectory_test_util.go +++ b/piecedirectory/piecedirectory_test_util.go @@ -21,6 +21,7 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/blockstore" + "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" ) @@ -45,6 +46,10 @@ func testPieceDirectory(ctx context.Context, t *testing.T, bdsvc *svc.Service) { testImportedIndex(ctx, t, cl) }) + t.Run("data segment index", func(t *testing.T) { + testDataSegmentIndex(ctx, t, cl) + }) + t.Run("flagging pieces", func(t *testing.T) { testFlaggingPieces(ctx, t, cl) }) @@ -234,6 +239,62 @@ func testImportedIndex(ctx context.Context, t *testing.T, cl *client.Store) { require.Equal(t, len(blk.RawData()), sz) } +func testDataSegmentIndex(ctx context.Context, t *testing.T, cl *client.Store) { + // Any calls to get a reader over data should return a reader over the fixture + pr := CreateMockPieceReaderFromPath(t, "./testdata/deal.data") + fstat, err := os.Stat("./testdata/deal.data") + require.NoError(t, err) + + paddedPieceSize := abi.PaddedPieceSize(fstat.Size()) + maddr := address.TestAddress + rdr, err := pr.GetReader(ctx, maddr, 0, 0, paddedPieceSize) + require.NoError(t, err) + pieceCid := CalculateCommp(t, rdr).PieceCID + + pm := NewPieceDirectory(cl, pr, 1) + pm.Start(ctx) + + // Add deal info for the piece - it doesn't matter what it is, the piece + // just needs to have at least one deal associated with it + di := model.DealInfo{ + DealUuid: uuid.New().String(), + ChainDealID: 1, + SectorID: 2, + PieceOffset: 0, + PieceLength: paddedPieceSize, + } + // Adding the deal for the piece causes LID to fetch the piece data + // from the 
reader and index it + err = pm.AddDealForPiece(ctx, pieceCid, di) + require.NoError(t, err) + + // Load the index of blocks + idx, err := pm.GetIterableIndex(ctx, pieceCid) + require.NoError(t, err) + + // Count the blocks in the piece + var count int + var testCid cid.Cid + _ = idx.ForEach(func(h multihash.Multihash, u uint64) error { + testCid = cid.NewCidV1(cid.Raw, h) + count++ + return fmt.Errorf("done") + }) + + // There should be exactly one cid in the piece + require.Equal(t, 1, count) + + // Verify that getting the size of a block works correctly + bss, err := pm.BlockstoreGetSize(ctx, testCid) + require.NoError(t, err) + require.Equal(t, 392273, bss) + + // validate getting a cid from the 2nd segment: + bss, err = pm.BlockstoreGetSize(ctx, cid.MustParse("bafk2bzacecul64ojb2rl7szydmytaaqqfvbceueaooclsshi5ennuyhsgzt2m")) + require.NoError(t, err) + require.Equal(t, 188193, bss) +} + func testFlaggingPieces(ctx context.Context, t *testing.T, cl *client.Store) { // Create a random CAR file _, carFilePath := CreateCarFile(t) diff --git a/piecedirectory/test_util.go b/piecedirectory/test_util.go index 79230c9d6..79d92035c 100644 --- a/piecedirectory/test_util.go +++ b/piecedirectory/test_util.go @@ -3,6 +3,7 @@ package piecedirectory import ( "context" "io" + "os" "testing" "time" @@ -103,4 +104,16 @@ type MockSectionReader struct { car.SectionReader } -func (*MockSectionReader) Close() error { return nil } +func (MockSectionReader) Close() error { return nil } + +// like `CreateMockPieceReader`, but returns a reader over the contents of a file. +func CreateMockPieceReaderFromPath(t *testing.T, path string) *mock_piecedirectory.MockPieceReader { + ctrl := gomock.NewController(t) + pr := mock_piecedirectory.NewMockPieceReader(ctrl) + pr.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn( + func(_ context.Context, _ address.Address, _ abi.SectorNumber, _ abi.PaddedPieceSize, _ abi.PaddedPieceSize) (types.SectionReader, error) { + f, err := os.Open(path) + return f, err + }) + return pr +} diff --git a/piecedirectory/testdata/deal.data b/piecedirectory/testdata/deal.data new file mode 100644 index 000000000..11c7069bb Binary files /dev/null and b/piecedirectory/testdata/deal.data differ diff --git a/piecedirectory/testdata/segment.car b/piecedirectory/testdata/segment.car new file mode 100644 index 000000000..68a1122b3 Binary files /dev/null and b/piecedirectory/testdata/segment.car differ diff --git a/react/src/Deals.css b/react/src/Deals.css index c83a00cd4..b1bcf7132 100644 --- a/react/src/Deals.css +++ b/react/src/Deals.css @@ -54,10 +54,33 @@ align-items: center; } -.deals tr.show-actions td.message .message-content .message-text { +.deals .message-content { + position: relative; +} + +.deals .message-content .message-text { flex-grow: 1; } +.deals .message-content .warning-msg { + position: absolute; + top: -1em; + left: -1em; + padding: 1em; + background-color: #fff; + box-shadow: 0em 0em 1em #ddd; + border-radius: 0.5em; + z-index: 10; + display: none; +} +.deals .message-content .warning-msg.showing { + display: flex; +} + +.deals .message-content .warning { + margin-right: 0.5em; +} + .deals tr.show-actions td.message .message-content .message-text .transfer-rate { padding-left: 1em; color: #999999; diff --git a/react/src/Deals.js b/react/src/Deals.js index edfb0dc7c..fcfbdfc4a 100644 --- a/react/src/Deals.js +++ b/react/src/Deals.js @@ -1,7 +1,7 @@ import {useQuery} from "@apollo/react-hooks"; import { 
DealsCountQuery, - DealsListQuery, LegacyDealsCountQuery, + DealsListQuery, LegacyDealsCountQuery, SectorStatusQuery, } from "./gql"; import moment from "moment"; import {DebounceInput} from 'react-debounce-input'; @@ -16,6 +16,7 @@ import {DealsPerPage} from "./deals-per-page"; import columnsGapImg from './bootstrap-icons/icons/columns-gap.svg' import xImg from './bootstrap-icons/icons/x-lg.svg' import './Deals.css' +import warningImg from './bootstrap-icons/icons/exclamation-circle.svg' import {Pagination} from "./Pagination"; import {DealActions, IsPaused, IsTransferring, IsOfflineWaitingForData} from "./DealDetail"; import {humanTransferRate} from "./DealTransfers"; @@ -290,9 +291,11 @@ function DealRow(props) { } } - const showActions = (IsPaused(deal) || IsTransferring(deal) || IsOfflineWaitingForData(deal)) + // Show deal action buttons if the deal can be retried / cancelled + var showActions = (IsPaused(deal) || IsTransferring(deal) || IsOfflineWaitingForData(deal)) + const hasAnnounceError = deal.Err && deal.Checkpoint === 'AddedPiece' && (deal.Sector || {}).ID var rowClassName = '' - if (showActions) { + if (showActions || hasAnnounceError) { rowClassName = 'show-actions' } @@ -310,17 +313,81 @@ function DealRow(props) {
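+        // Deals that errored after the AddedPiece checkpoint and already have a
+        // sector are rendered by DealRowAnnounceError below: the message cell then
+        // shows the sector's sealing status with a warning icon, and the deal
+        // message plus the retry / pause actions are revealed on hover.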
- - {deal.Message} - - - {showActions ? : null} + { hasAnnounceError ? ( + + ) : ( + <> + + {deal.Message} + + + {showActions ? : null} + + ) }
) } +// DealRowAnnounceError shows a row with the sealing status, and a warning icon. +// When the user hovers on the warning icon it shows a box with the warning and +// action buttons to retry / pause +function DealRowAnnounceError({deal}) { + const warningMsgElId = "message-"+deal.ID + const warningImgElId = "img-warn-"+deal.ID + const messageBoxId = "message-box-"+deal.ID + useEffect(() => { + const warningImg = document.getElementById(warningImgElId) + const warningMsg = document.getElementById(warningMsgElId) + const messageBox = document.getElementById(messageBoxId) + if(!warningImg || !warningMsg || !messageBox) { + return + } + + warningImg.addEventListener("mouseover", () => { + warningMsg.classList.add('showing') + }) + messageBox.addEventListener("mouseleave", () => { + warningMsg.classList.remove('showing') + }) + + return function () { + warningImg.removeEventListener("mouseover") + messageBox.removeEventListener("mouseleave") + } + }) + + const {data, loading, error} = useQuery(SectorStatusQuery, { + pollInterval: 10000, + fetchPolicy: 'network-only', + variables: { + sectorNumber: deal.Sector.ID + } + }) + + if (error) { + return Sealer: {error.message} + } + if (loading) { + return null + } + + return
+ + + Sealer: {data.sectorStatus.State} + + + + + {deal.Message} + + + +
+} + function TransferRate({deal}) { if (!IsTransferring(deal) || IsPaused(deal) || deal.Transferred === 0 || deal.IsTransferStalled) { return null diff --git a/react/src/gql.js b/react/src/gql.js index 993b9c7e3..d35962baf 100644 --- a/react/src/gql.js +++ b/react/src/gql.js @@ -707,6 +707,16 @@ const SealingPipelineQuery = gql` } `; +const SectorStatusQuery = gql` + query AppSectorStatusQuery($sectorNumber: Uint64!) { + sectorStatus(sectorNumber: $sectorNumber) { + Number + State + DealIDs + } + } +`; + const FundsQuery = gql` query AppFundsQuery { funds { @@ -921,6 +931,7 @@ export { TransferStatsQuery, MpoolQuery, SealingPipelineQuery, + SectorStatusQuery, Libp2pAddrInfoQuery, StorageAskQuery, PublishPendingDealsMutation, diff --git a/retrievalmarket/server/gsunpaidretrieval_test.go b/retrievalmarket/server/gsunpaidretrieval_test.go index 6af1c581b..cb8d5b6bb 100644 --- a/retrievalmarket/server/gsunpaidretrieval_test.go +++ b/retrievalmarket/server/gsunpaidretrieval_test.go @@ -47,6 +47,7 @@ type testCase struct { watch func(client retrievalmarket.RetrievalClient, gsupr *GraphsyncUnpaidRetrieval) ask *retrievalmarket.Ask noUnsealedCopy bool + useCarV2 bool expectErr bool expectClientCancelEvent bool expectProviderCancelEvent bool @@ -66,6 +67,9 @@ func TestGS(t *testing.T) { testCases := []testCase{{ name: "happy path", + }, { + name: "happy path w/ carv2", + useCarV2: true, }, { name: "request missing payload cid", reqPayloadCid: missingCid, @@ -162,22 +166,29 @@ func runRequestTest(t *testing.T, tc testCase) { // Create a CAR file and set up mocks testData := tut.NewLibp2pTestData(ctx, t) + // Create a random CAR file carRootCid, carFilePath := piecedirectory.CreateCarFile(t) - carFile, err := os.Open(carFilePath) - require.NoError(t, err) - defer carFile.Close() - // Create a random CAR file - carReader, err := car.OpenReader(carFilePath) - require.NoError(t, err) - defer carReader.Close() - carv1Reader, err := carReader.DataReader() - require.NoError(t, err) + var sectionReader car.SectionReader + + if tc.useCarV2 { + var err error + sectionReader, err = os.Open(carFilePath) + require.NoError(t, err) + defer sectionReader.(*os.File).Close() + } else { + carReader, err := car.OpenReader(carFilePath) + require.NoError(t, err) + defer carReader.Close() + sectionReader, err = carReader.DataReader() + require.NoError(t, err) + + } // Any calls to get a reader over data should return a reader over the random CAR file - pr := piecedirectory.CreateMockPieceReader(t, carv1Reader) + pr := piecedirectory.CreateMockPieceReader(t, sectionReader) - carv1Bytes, err := io.ReadAll(carv1Reader) + carv1Bytes, err := io.ReadAll(sectionReader) require.NoError(t, err) carSize := len(carv1Bytes) diff --git a/storagemarket/deal_commp.go b/storagemarket/deal_commp.go index 6d1f54bf6..ea7b22bc0 100644 --- a/storagemarket/deal_commp.go +++ b/storagemarket/deal_commp.go @@ -131,39 +131,42 @@ func generatePieceCommitment(ctx context.Context, commpCalc smtypes.CommpCalcula // remoteCommP makes an API call to the sealing service to calculate commp func remoteCommP(ctx context.Context, commpCalc smtypes.CommpCalculator, filepath string) (*abi.PieceInfo, *dealMakingError) { // Open the CAR file - rd, err := carv2.OpenReader(filepath) + rd, err := os.Open(filepath) if err != nil { return nil, &dealMakingError{ retry: types.DealRetryFatal, - error: fmt.Errorf("failed to get CARv2 reader: %w", err), + error: fmt.Errorf("failed to get reader: %w", err), } } defer func() { if err := rd.Close(); err != nil { - 
log.Warnf("failed to close CARv2 reader for %s: %w", filepath, err) + log.Warnf("failed to close reader for %s: %w", filepath, err) } }() - // Get the size of the CAR file - size, err := getCarSize(filepath, rd) - if err != nil { - return nil, &dealMakingError{retry: types.DealRetryFatal, error: err} + // (willscott - oct 2023 - remove once raw byte supported): confirm file is a car file + if _, err := carv2.ReadVersion(rd); err != nil { + return nil, &dealMakingError{ + retry: types.DealRetryFatal, + error: fmt.Errorf("failed to read car header: %w", err), + } } + _, _ = rd.Seek(0, io.SeekStart) - // Get the data portion of the CAR file - dataReader, err := rd.DataReader() + // Get the size of the file + st, err := os.Stat(filepath) if err != nil { - return nil, &dealMakingError{ - retry: types.DealRetryManual, - error: fmt.Errorf("getting CAR data reader to calculate commp: %w", err), - } + return nil, &dealMakingError{retry: types.DealRetryFatal, error: err} + } + if st.Size() == 0 { + return nil, &dealMakingError{retry: types.DealRetryFatal, error: fmt.Errorf("empty file")} } // The commp calculation requires the data to be of length // pieceSize.Unpadded(), so add zeros until it reaches that size - pr, numBytes := padreader.New(dataReader, uint64(size)) - log.Debugw("computing remote commp", "size", size, "padded-size", numBytes) + pr, numBytes := padreader.New(rd, uint64(st.Size())) + log.Debugw("computing remote commp", "size", st.Size(), "padded-size", numBytes) pi, err := commpCalc.ComputeDataCid(ctx, numBytes, pr) if err != nil { if ctx.Err() != nil { @@ -182,37 +185,44 @@ func remoteCommP(ctx context.Context, commpCalc smtypes.CommpCalculator, filepat // GenerateCommPLocally calculates commp locally func GenerateCommPLocally(filepath string) (*abi.PieceInfo, error) { - rd, err := carv2.OpenReader(filepath) + rd, err := os.Open(filepath) if err != nil { - return nil, fmt.Errorf("failed to get CARv2 reader: %w", err) + return nil, fmt.Errorf("failed to get reader: %w", err) } defer func() { if err := rd.Close(); err != nil { - log.Warnf("failed to close CARv2 reader for %s: %w", filepath, err) + log.Warnf("failed to close reader for %s: %w", filepath, err) } }() - // dump the CARv1 payload of the CARv2 file to the Commp Writer and get back the CommP. 
- w := &writer.Writer{} - r, err := rd.DataReader() - if err != nil { - return nil, fmt.Errorf("getting data reader for CAR v1 from CAR v2: %w", err) + // (willscott - oct 2023 - remove once raw byte supported): confirm file is a car file + if _, err := carv2.ReadVersion(rd); err != nil { + return nil, &dealMakingError{ + retry: types.DealRetryFatal, + error: fmt.Errorf("failed to open carv2 reader: %w", err), + } } + _, _ = rd.Seek(0, io.SeekStart) - written, err := io.Copy(w, r) + w := &writer.Writer{} + + written, err := io.Copy(w, rd) if err != nil { return nil, fmt.Errorf("writing to commp writer: %w", err) } - // get the size of the CAR file - size, err := getCarSize(filepath, rd) + // confirm the size of the file + st, err := os.Stat(filepath) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get file size: %w", err) } - if written != size { - return nil, fmt.Errorf("number of bytes written to CommP writer %d not equal to the CARv1 payload size %d", written, rd.Header.DataSize) + if written != st.Size() { + return nil, fmt.Errorf("number of bytes written to CommP writer %d not equal to the file size %d", written, st.Size()) + } + if st.Size() == 0 { + return nil, fmt.Errorf("empty file") } pi, err := w.Sum() @@ -225,18 +235,3 @@ func GenerateCommPLocally(filepath string) (*abi.PieceInfo, error) { PieceCID: pi.PieceCID, }, nil } - -func getCarSize(filepath string, rd *carv2.Reader) (int64, error) { - var size int64 - switch rd.Version { - case 2: - size = int64(rd.Header.DataSize) - case 1: - st, err := os.Stat(filepath) - if err != nil { - return 0, fmt.Errorf("failed to get CARv1 file size: %w", err) - } - size = st.Size() - } - return size, nil -} diff --git a/storagemarket/deal_execution.go b/storagemarket/deal_execution.go index 044a2012c..6d7e6db74 100644 --- a/storagemarket/deal_execution.go +++ b/storagemarket/deal_execution.go @@ -568,28 +568,22 @@ func (p *Provider) addPiece(ctx context.Context, pub event.Emitter, deal *types. 
} func openReader(filePath string, pieceSize abi.UnpaddedPieceSize) (io.ReadCloser, error) { + st, err := os.Stat(filePath) + if err != nil { + return nil, fmt.Errorf("failed to stat %s: %w", filePath, err) + } + size := uint64(st.Size()) + // Open a reader against the CAR file with the deal data v2r, err := carv2.OpenReader(filePath) if err != nil { return nil, fmt.Errorf("failed to open CAR reader over %s: %w", filePath, err) } + v2r.Close() - var size uint64 - switch v2r.Version { - case 1: - st, err := os.Stat(filePath) - if err != nil { - return nil, fmt.Errorf("failed to stat %s: %w", filePath, err) - } - size = uint64(st.Size()) - case 2: - size = v2r.Header.DataSize - } - - // Inflate the deal size so that it exactly fills a piece - r, err := v2r.DataReader() + r, err := os.Open(filePath) if err != nil { - return nil, fmt.Errorf("failed to open CAR data reader over %s: %w", filePath, err) + return nil, fmt.Errorf("failed to open %s: %w", filePath, err) } reader, err := padreader.NewInflator(r, size, pieceSize) @@ -602,7 +596,7 @@ func openReader(filePath string, pieceSize abi.UnpaddedPieceSize) (io.ReadCloser io.Closer }{ Reader: reader, - Closer: v2r, + Closer: r, }, nil } diff --git a/storagemarket/provider_test.go b/storagemarket/provider_test.go index ba15889a8..40443b612 100644 --- a/storagemarket/provider_test.go +++ b/storagemarket/provider_test.go @@ -1290,12 +1290,10 @@ func (h *ProviderHarness) AssertFundManagerState(t *testing.T, ctx context.Conte } func (h *ProviderHarness) AssertSealedContents(t *testing.T, carV2FilePath string, read []byte) { - cr, err := carv2.OpenReader(carV2FilePath) + r, err := os.Open(carV2FilePath) require.NoError(t, err) - defer cr.Close() + defer r.Close() - r, err := cr.DataReader() - require.NoError(t, err) actual, err := io.ReadAll(r) require.NoError(t, err)
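The storagemarket changes above stop assuming the deal file is a well-formed CARv2 and instead treat it as opaque bytes, keeping only a lightweight carv2.ReadVersion sanity check until raw (non-CAR) payloads are supported. As a rough illustration of that direction, the following is a minimal, self-contained sketch that computes a piece commitment over an arbitrary file with go-fil-commp-hashhash — the same calculator the new itest fixtures use. The helper name and error handling are illustrative only and are not part of this patch.

    package example

    import (
    	"fmt"
    	"io"
    	"os"

    	commcid "github.com/filecoin-project/go-fil-commcid"
    	commp "github.com/filecoin-project/go-fil-commp-hashhash"
    	"github.com/filecoin-project/go-state-types/abi"
    	"github.com/ipfs/go-cid"
    )

    // commpOverFile is a hypothetical helper: it streams the file through the
    // commp calculator and returns the piece CID and padded piece size.
    func commpOverFile(path string) (cid.Cid, abi.PaddedPieceSize, error) {
    	f, err := os.Open(path)
    	if err != nil {
    		return cid.Undef, 0, err
    	}
    	defer f.Close()

    	// commp.Calc pads the stream up to a valid piece size when computing the digest.
    	cp := new(commp.Calc)
    	if _, err := io.Copy(cp, f); err != nil {
    		return cid.Undef, 0, err
    	}
    	rawCommP, paddedSize, err := cp.Digest()
    	if err != nil {
    		return cid.Undef, 0, err
    	}
    	pieceCid, err := commcid.DataCommitmentV1ToCID(rawCommP)
    	if err != nil {
    		return cid.Undef, 0, fmt.Errorf("converting commP to CID: %w", err)
    	}
    	return pieceCid, abi.PaddedPieceSize(paddedSize), nil
    }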