diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index 22c2fb77d..8bc77ed27 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -406,119 +406,140 @@ func (cr *countingReader) Read(p []byte) (n int, err error) { } func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) { - - concurrency := runtime.NumCPU() - if concurrency < PodsiMinConcurrency { - concurrency = PodsiMinConcurrency - } - if concurrency > PodsiMaxConcurrency { - concurrency = PodsiMaxConcurrency - } - - log.Debugw("podsi: ", "bufferSize", PodsiBuffesrSize, "validationConcurrency", concurrency) - start := time.Now() - - ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() - dsis := datasegment.DataSegmentIndexStartOffset(ps) - if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { - return nil, fmt.Errorf("could not seek to data segment index: %w", err) - } - - var readsCnt int32 - cr := &countingReader{ - Reader: r, - cnt: &readsCnt, - } - panicked := false - indexData, err := parseDataSegmentIndex(pieceCid, bufio.NewReaderSize(cr, PodsiBuffesrSize), &panicked) - if err != nil { - return nil, fmt.Errorf("could not parse data segment index: %w", err) - } - if panicked { - return nil, fmt.Errorf("could not parse data segment index because of an internal panic") - } - - log.Debugw("podsi: parsed data segment index", "segments", len(indexData.Entries), "reads", readsCnt, "time", time.Since(start).String()) - - if len(indexData.Entries) == 0 { - return nil, fmt.Errorf("no data segments found") - } - - start = time.Now() - - if len(indexData.Entries) < concurrency { - concurrency = len(indexData.Entries) - } - - chunkSize := len(indexData.Entries) / concurrency - results := make([][]datasegment.SegmentDesc, concurrency) - - var eg errgroup.Group - for i := 0; i < concurrency; i++ { - i := i - eg.Go(func() error { - start := i * chunkSize - end := start + chunkSize - if i == concurrency-1 { - end = len(indexData.Entries) - } - - res, err := validateEntries(indexData.Entries[start:end]) - if err != nil { - return err - } - - results[i] = res - - return nil - }) - } - - if err := eg.Wait(); err != nil { - return nil, fmt.Errorf("could not calculate valid entries: %w", err) - } - - validSegments := make([]datasegment.SegmentDesc, 0, len(indexData.Entries)) - for _, res := range results { - validSegments = append(validSegments, res...) - } - - if len(validSegments) == 0 { - return nil, fmt.Errorf("no data segments found") - } - - log.Debugw("podsi: validated data segment index", "validSegments", len(validSegments), "time", time.Since(start).String()) - start = time.Now() - readsCnt = 0 - - recs := make([]model.Record, 0) - for _, s := range validSegments { - segOffset := s.UnpaddedOffest() - segSize := s.UnpaddedLength() - - lr := io.NewSectionReader(r, int64(segOffset), int64(segSize)) - - cr = &countingReader{ - Reader: lr, - cnt: &readsCnt, - } - - subRecs, err := parseRecordsFromCar(bufio.NewReaderSize(cr, PodsiBuffesrSize)) - if err != nil { - // revisit when non-car files supported: one corrupt segment shouldn't translate into an error in other segments. - return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err) - } - for i := range subRecs { - subRecs[i].Offset += segOffset - } - recs = append(recs, subRecs...) - } - - log.Debugw("podsi: parsed records from data segments", "recs", len(recs), "reads", readsCnt, "time", time.Since(start).String()) - - return recs, nil + concurrency := runtime.NumCPU() + if concurrency < PodsiMinConcurrency { + concurrency = PodsiMinConcurrency + } + if concurrency > PodsiMaxConcurrency { + concurrency = PodsiMaxConcurrency + } + + log.Debugw("podsi: ", "bufferSize", PodsiBuffesrSize, "validationConcurrency", concurrency) + start := time.Now() + + ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() + dsis := datasegment.DataSegmentIndexStartOffset(ps) + if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { + log.Debugw("Failed to seek to data segment index", "error", err) + return nil, fmt.Errorf("could not seek to data segment index: %w", err) + } + + // Debug: Print the file size + file, ok := r.(io.ReadSeeker) + if ok { + fileSize, err := file.Seek(0, io.SeekEnd) + if err != nil { + log.Debugw("Failed to get file size", "error", err) + } else { + log.Debugw("File size", "size", fileSize) + } + _, err = file.Seek(0, io.SeekStart) // Return the file pointer to the start + if err != nil { + log.Debugw("Failed to seek to the start of the file", "error", err) + } + } + + var readsCnt int32 + cr := &countingReader{ + Reader: r, + cnt: &readsCnt, + } + panicked := false + indexData, err := parseDataSegmentIndex(pieceCid, bufio.NewReaderSize(cr, PodsiBuffesrSize), &panicked) + if err != nil { + log.Debugw("Failed to parse data segment index", "error", err) + return nil, fmt.Errorf("could not parse data segment index: %w", err) + } + if panicked { + log.Debugw("Internal panic while parsing data segment index") + return nil, fmt.Errorf("could not parse data segment index because of an internal panic") + } + + log.Debugw("podsi: parsed data segment index", "segments", len(indexData.Entries), "reads", readsCnt, "time", time.Since(start).String()) + + if len(indexData.Entries) == 0 { + log.Debugw("No data segments found") + return nil, fmt.Errorf("no data segments found") + } + + start = time.Now() + + if len(indexData.Entries) < concurrency { + concurrency = len(indexData.Entries) + } + + chunkSize := len(indexData.Entries) / concurrency + results := make([][]datasegment.SegmentDesc, concurrency) + + var eg errgroup.Group + for i := 0; i < concurrency; i++ { + i := i + eg.Go(func() error { + start := i * chunkSize + end := start + chunkSize + if i == concurrency-1 { + end = len(indexData.Entries) + } + + res, err := validateEntries(indexData.Entries[start:end]) + if err != nil { + return err + } + + results[i] = res + + return nil + }) + } + + if err := eg.Wait(); err != nil { + log.Debugw("Failed to calculate valid entries", "error", err) + return nil, fmt.Errorf("could not calculate valid entries: %w", err) + } + + validSegments := make([]datasegment.SegmentDesc, 0, len(indexData.Entries)) + for _, res := range results { + validSegments = append(validSegments, res...) + } + + if len(validSegments) == 0 { + log.Debugw("No valid data segments found") + return nil, fmt.Errorf("no valid data segments found") + } + + log.Debugw("podsi: validated data segment index", "validSegments", len(validSegments), "time", time.Since(start).String()) + start = time.Now() + readsCnt = 0 + + recs := make([]model.Record, 0) + for _, s := range validSegments { + segOffset := s.UnpaddedOffest() + segSize := s.UnpaddedLength() + + lr := io.NewSectionReader(r, int64(segOffset), int64(segSize)) + + cr = &countingReader{ + Reader: lr, + cnt: &readsCnt, + } + + subRecs, err := parseRecordsFromCar(bufio.NewReaderSize(cr, PodsiBuffesrSize)) + if err != nil { + log.Debugw("Failed to parse data segment", "error", err) + return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err) + } + for i := range subRecs { + subRecs[i].Offset += segOffset + } + recs = append(recs, subRecs...) + } + + log.Debugw("podsi: parsed records from data segments", "recs", len(recs), "reads", readsCnt, "time", time.Since(start).String()) + + return recs, nil } + // parseDataSegmentIndex is a temporary wrapper around datasegment.ParseDataSegmentIndex that exists only as a workaround // for "slice bounds out of range" panic inside lotus. This funciton should be removed once the panic is fixed. func parseDataSegmentIndex(pieceCid cid.Cid, unpaddedReader io.Reader, panicked *bool) (datasegment.IndexData, error) {