refactor the io.seek
LexLuthr committed Jan 16, 2024
1 parent ed95973 commit ec52755
Showing 1 changed file with 126 additions and 132 deletions.
258 changes: 126 additions & 132 deletions piecedirectory/piecedirectory.go
@@ -406,139 +406,133 @@ func (cr *countingReader) Read(p []byte) (n int, err error) {
}

func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) {
concurrency := runtime.NumCPU()
if concurrency < PodsiMinConcurrency {
concurrency = PodsiMinConcurrency
}
if concurrency > PodsiMaxConcurrency {
concurrency = PodsiMaxConcurrency
}

log.Debugw("podsi: ", "bufferSize", PodsiBuffesrSize, "validationConcurrency", concurrency)
start := time.Now()

ps := abi.UnpaddedPieceSize(unpaddedSize).Padded()
dsis := datasegment.DataSegmentIndexStartOffset(ps)
if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil {
log.Debugw("Failed to seek to data segment index", "error", err)
return nil, fmt.Errorf("could not seek to data segment index: %w", err)
}

// Debug: Print the file size
file, ok := r.(io.ReadSeeker)
if ok {
fileSize, err := file.Seek(0, io.SeekEnd)
if err != nil {
log.Debugw("Failed to get file size", "error", err)
} else {
log.Debugw("File size", "size", fileSize)
}
_, err = file.Seek(0, io.SeekStart) // Return the file pointer to the start
if err != nil {
log.Debugw("Failed to seek to the start of the file", "error", err)
}
}

var readsCnt int32
cr := &countingReader{
Reader: r,
cnt: &readsCnt,
}
panicked := false
indexData, err := parseDataSegmentIndex(pieceCid, bufio.NewReaderSize(cr, PodsiBuffesrSize), &panicked)
if err != nil {
log.Debugw("Failed to parse data segment index", "error", err)
return nil, fmt.Errorf("could not parse data segment index: %w", err)
}
if panicked {
log.Debugw("Internal panic while parsing data segment index")
return nil, fmt.Errorf("could not parse data segment index because of an internal panic")
}

log.Debugw("podsi: parsed data segment index", "segments", len(indexData.Entries), "reads", readsCnt, "time", time.Since(start).String())

if len(indexData.Entries) == 0 {
log.Debugw("No data segments found")
return nil, fmt.Errorf("no data segments found")
}

start = time.Now()

if len(indexData.Entries) < concurrency {
concurrency = len(indexData.Entries)
}

chunkSize := len(indexData.Entries) / concurrency
results := make([][]datasegment.SegmentDesc, concurrency)

var eg errgroup.Group
for i := 0; i < concurrency; i++ {
i := i
eg.Go(func() error {
start := i * chunkSize
end := start + chunkSize
if i == concurrency-1 {
end = len(indexData.Entries)
}

res, err := validateEntries(indexData.Entries[start:end])
if err != nil {
return err
}

results[i] = res

return nil
})
}

if err := eg.Wait(); err != nil {
log.Debugw("Failed to calculate valid entries", "error", err)
return nil, fmt.Errorf("could not calculate valid entries: %w", err)
}

validSegments := make([]datasegment.SegmentDesc, 0, len(indexData.Entries))
for _, res := range results {
validSegments = append(validSegments, res...)
}

if len(validSegments) == 0 {
log.Debugw("No valid data segments found")
return nil, fmt.Errorf("no valid data segments found")
}

log.Debugw("podsi: validated data segment index", "validSegments", len(validSegments), "time", time.Since(start).String())
start = time.Now()
readsCnt = 0

recs := make([]model.Record, 0)
for _, s := range validSegments {
segOffset := s.UnpaddedOffest()
segSize := s.UnpaddedLength()

lr := io.NewSectionReader(r, int64(segOffset), int64(segSize))

cr = &countingReader{
Reader: lr,
cnt: &readsCnt,
}

subRecs, err := parseRecordsFromCar(bufio.NewReaderSize(cr, PodsiBuffesrSize))
if err != nil {
log.Debugw("Failed to parse data segment", "error", err)
return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err)
}
for i := range subRecs {
subRecs[i].Offset += segOffset
}
recs = append(recs, subRecs...)
}

log.Debugw("podsi: parsed records from data segments", "recs", len(recs), "reads", readsCnt, "time", time.Since(start).String())

return recs, nil
}
concurrency := runtime.NumCPU()
if concurrency < PodsiMinConcurrency {
concurrency = PodsiMinConcurrency
}
if concurrency > PodsiMaxConcurrency {
concurrency = PodsiMaxConcurrency
}

log.Debugw("podsi: ", "bufferSize", PodsiBuffesrSize, "validationConcurrency", concurrency)
start := time.Now()

ps := abi.UnpaddedPieceSize(unpaddedSize).Padded()
dsis := datasegment.DataSegmentIndexStartOffset(ps)

// We seek to the end of the reader to avoid an EOF being encountered when parsing the segments.
// This should be fixed permanently on the miner side before this chunk of code is removed.
_, err := r.Seek(0, io.SeekEnd)
if err != nil {
log.Debugw("Failed to seek to the end of the piece reader")
return nil, fmt.Errorf("could not seek to end of piece reader: %w", err)
}

// Wind back the seeker
if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil {
log.Debugw("Failed to seek to data segment index", "error", err)
return nil, fmt.Errorf("could not seek to data segment index: %w", err)
}

var readsCnt int32
cr := &countingReader{
Reader: r,
cnt: &readsCnt,
}
panicked := false
indexData, err := parseDataSegmentIndex(pieceCid, bufio.NewReaderSize(cr, PodsiBuffesrSize), &panicked)
if err != nil {
log.Debugw("Failed to parse data segment index", "error", err)
return nil, fmt.Errorf("could not parse data segment index: %w", err)
}
if panicked {
log.Debugw("Internal panic while parsing data segment index")
return nil, fmt.Errorf("could not parse data segment index because of an internal panic")
}

log.Debugw("podsi: parsed data segment index", "segments", len(indexData.Entries), "reads", readsCnt, "time", time.Since(start).String())

if len(indexData.Entries) == 0 {
log.Debugw("No data segments found")
return nil, fmt.Errorf("no data segments found")
}

start = time.Now()

if len(indexData.Entries) < concurrency {
concurrency = len(indexData.Entries)
}

chunkSize := len(indexData.Entries) / concurrency
results := make([][]datasegment.SegmentDesc, concurrency)

var eg errgroup.Group
for i := 0; i < concurrency; i++ {
i := i
eg.Go(func() error {
start := i * chunkSize
end := start + chunkSize
if i == concurrency-1 {
end = len(indexData.Entries)
}

res, err := validateEntries(indexData.Entries[start:end])
if err != nil {
return err
}

results[i] = res

return nil
})
}

if err := eg.Wait(); err != nil {
log.Debugw("Failed to calculate valid entries", "error", err)
return nil, fmt.Errorf("could not calculate valid entries: %w", err)
}

validSegments := make([]datasegment.SegmentDesc, 0, len(indexData.Entries))
for _, res := range results {
validSegments = append(validSegments, res...)
}

if len(validSegments) == 0 {
log.Debugw("No valid data segments found")
return nil, fmt.Errorf("no valid data segments found")
}

log.Debugw("podsi: validated data segment index", "validSegments", len(validSegments), "time", time.Since(start).String())
start = time.Now()
readsCnt = 0

recs := make([]model.Record, 0)
for _, s := range validSegments {
segOffset := s.UnpaddedOffest()
segSize := s.UnpaddedLength()

lr := io.NewSectionReader(r, int64(segOffset), int64(segSize))

cr = &countingReader{
Reader: lr,
cnt: &readsCnt,
}

subRecs, err := parseRecordsFromCar(bufio.NewReaderSize(cr, PodsiBuffesrSize))
if err != nil {
log.Debugw("Failed to parse data segment", "error", err)
return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err)
}
for i := range subRecs {
subRecs[i].Offset += segOffset
}
recs = append(recs, subRecs...)
}

log.Debugw("podsi: parsed records from data segments", "recs", len(recs), "reads", readsCnt, "time", time.Since(start).String())

return recs, nil
}
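
For context: the countingReader wrapped around the readers above is what feeds the "reads" fields in the debug logs. Its definition sits outside this hunk (only its Read signature appears in the hunk header), so the following is a minimal sketch inferred from the call sites in this diff rather than the actual implementation; in particular, whether the counter is incremented atomically is an assumption.

```go
package piecedirectory

import "io"

// countingReader wraps an io.Reader and bumps a shared counter on every
// Read call so the caller can log how many reads a parse required.
type countingReader struct {
	io.Reader
	cnt *int32
}

func (cr *countingReader) Read(p []byte) (n int, err error) {
	*cr.cnt++ // plain increment; the real code may use sync/atomic instead
	return cr.Reader.Read(p)
}
```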

// parseDataSegmentIndex is a temporary wrapper around datasegment.ParseDataSegmentIndex that exists only as a workaround
// for a "slice bounds out of range" panic inside lotus. This function should be removed once the panic is fixed.
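
The wrapper's body is truncated here; the standard Go way to implement it is to recover from the panic in a deferred function and report it to the caller. A minimal sketch under that assumption, with the signature inferred from the call site above (datasegment.ParseDataSegmentIndex and datasegment.IndexData are taken from the go-data-segment package this file already uses; the real body may differ):

```go
package piecedirectory

import (
	"io"

	"github.com/filecoin-project/go-data-segment/datasegment"
	"github.com/ipfs/go-cid"
)

// parseDataSegmentIndex delegates to datasegment.ParseDataSegmentIndex but
// converts a panic inside it into a flag the caller can check, instead of
// letting the panic crash the process.
func parseDataSegmentIndex(pieceCid cid.Cid, r io.Reader, panicked *bool) (datasegment.IndexData, error) {
	defer func() {
		if rec := recover(); rec != nil {
			*panicked = true // the caller turns this into an error after its usual err check
		}
	}()
	return datasegment.ParseDataSegmentIndex(r)
}
```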
