Skip to content

Commit 3425fbc

Browse files
committed
feat: add multistream http download
* Add a feature to split HTTP downloads into multiple streams * Downloads over libp2p are always done in a single stream * Add the NChunks configuration parameter * Fix tests to handle more than just open-ended range requests
1 parent e1090ba commit 3425fbc

File tree

7 files changed

+222
-35
lines changed

7 files changed

+222
-35
lines changed

node/config/def.go

+3
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ func DefaultBoost() *Boost {
213213
Port: 3104,
214214
},
215215
},
216+
HttpDownload: HttpDownloadConfig{
217+
NChunks: 5,
218+
},
216219
}
217220
return cfg
218221
}

node/config/doc_gen.go

+16
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/config/types.go

+8
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type Boost struct {
4747
Tracing TracingConfig
4848
LocalIndexDirectory LocalIndexDirectoryConfig
4949
ContractDeals ContractDealsConfig
50+
HttpDownload HttpDownloadConfig
5051

5152
// Lotus configs
5253
LotusDealmaking lotus_config.DealmakingConfig
@@ -421,3 +422,10 @@ type LocalIndexDirectoryConfig struct {
421422
type LocalIndexDirectoryLeveldbConfig struct {
422423
Enabled bool
423424
}
425+
426+
// HttpDownloadConfig configures how HTTP downloads are performed.
type HttpDownloadConfig struct {
427+
// NChunks is the number of chunks to split an HTTP download into. Each chunk is downloaded in its own goroutine,
428+
// which improves the overall download speed. NChunks is always equal to 1 for the libp2p transport because the libp2p
429+
// server doesn't support range requests yet. NChunks must be between 1 and 16 inclusive; the default is 5.
430+
NChunks int
431+
}

node/modules/storageminer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(
622622
SealingPipelineCacheTimeout: time.Duration(cfg.Dealmaking.SealingPipelineCacheTimeout),
623623
}
624624
dl := logs.NewDealLogger(logsDB)
625-
tspt := httptransport.New(h, dl)
625+
tspt := httptransport.New(h, dl, httptransport.NChunksOpt(cfg.HttpDownload.NChunks))
626626
prov, err := storagemarket.NewProvider(prvCfg, sqldb, dealsDB, fundMgr, storageMgr, a, dp, provAddr, secb, commpc,
627627
sps, cdm, df, logsSqlDB.db, logsDB, piecedirectory, ip, lp, &signatureVerifier{a}, dl, tspt)
628628
if err != nil {

testutil/httptestfileservers.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ func HttpTestDisconnectingServer(t *testing.T, dir string, afterEvery int64) *ht
148148
svr := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
149149
// process the start offset
150150
offset := r.Header.Get("Range")
151-
finalOffset := strings.TrimSuffix(strings.TrimPrefix(offset, "bytes="), "-")
152-
start, _ := strconv.ParseInt(finalOffset, 10, 64)
151+
startend := strings.Split(strings.TrimPrefix(offset, "bytes="), "-")
152+
start, _ := strconv.ParseInt(startend[0], 10, 64)
153153
// only send `afterEvery` bytes and then disconnect
154154
end := start + afterEvery
155155

transport/httptransport/http_transport.go

+128-22
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/jpillora/backoff"
2020
p2phttp "github.com/libp2p/go-libp2p-http"
2121
"github.com/libp2p/go-libp2p/core/host"
22+
"golang.org/x/sync/errgroup"
2223
)
2324

2425
const (
@@ -30,6 +31,13 @@ const (
3031
maxBackOff = 10 * time.Minute
3132
factor = 1.5
3233
maxReconnectAttempts = 15
34+
35+
nChunks = 5
36+
// minChunkBytes is the minimum size of a chunk. If splitting the download into nChunks would result in a chunk smaller than minChunkBytes
37+
// then the total number of chunks will be reduced. For example if nChunks is 5, number of remaining bytes is 100 and minChunkBytes is 50
38+
// then download will be performed in 2 parallel streams.
39+
// 1 MiB
40+
minChunkBytes = 1048576
3341
)
3442

3543
type httpError struct {
@@ -50,6 +58,15 @@ func BackOffRetryOpt(minBackoff, maxBackoff time.Duration, factor, maxReconnectA
5058
}
5159
}
5260

61+
// NChunksOpt returns an Option that sets the number of chunks an HTTP download is split into.
// Values below 1 or above 16 are ignored, leaving the transport's current nChunks setting untouched.
func NChunksOpt(nChunks int) Option {
62+
return func(h *httpTransport) {
63+
if nChunks < 1 || nChunks > 16 {
64+
return
65+
}
66+
h.nChunks = nChunks
67+
}
68+
}
69+
5370
type httpTransport struct {
5471
libp2pHost host.Host
5572
libp2pClient *http.Client
@@ -59,6 +76,8 @@ type httpTransport struct {
5976
backOffFactor float64
6077
maxReconnectAttempts float64
6178

79+
nChunks int
80+
6281
dl *logs.DealLogger
6382
}
6483

@@ -69,6 +88,7 @@ func New(host host.Host, dealLogger *logs.DealLogger, opts ...Option) *httpTrans
6988
maxBackoffWait: maxBackOff,
7089
backOffFactor: factor,
7190
maxReconnectAttempts: maxReconnectAttempts,
91+
nChunks: nChunks,
7292
dl: dealLogger.Subsystem("http-transport"),
7393
}
7494
for _, o := range opts {
@@ -120,6 +140,12 @@ func (h *httpTransport) Execute(ctx context.Context, transportInfo []byte, dealI
120140
}
121141
h.dl.Infow(duuid, "existing file size", "file size", fileSize, "deal size", dealInfo.DealSize)
122142

143+
// default to a single stream for libp2p urls as libp2p server doesn't support range requests
144+
nChunks := h.nChunks
145+
if u.Scheme == "libp2p" {
146+
nChunks = 1
147+
}
148+
123149
// construct the transfer instance that will act as the transfer handler
124150
tctx, cancel := context.WithCancel(ctx)
125151
t := &transfer{
@@ -136,6 +162,7 @@ func (h *httpTransport) Execute(ctx context.Context, transportInfo []byte, dealI
136162
},
137163
maxReconnectAttempts: h.maxReconnectAttempts,
138164
dl: h.dl,
165+
nChunks: nChunks,
139166
}
140167

141168
cleanupFns := []func(){
@@ -216,16 +243,13 @@ type transfer struct {
216243

217244
client *http.Client
218245
dl *logs.DealLogger
246+
247+
nChunks int
219248
}
220249

221250
func (t *transfer) execute(ctx context.Context) error {
222251
duuid := t.dealInfo.DealUuid
223252
for {
224-
// construct request
225-
req, err := http.NewRequest("GET", t.tInfo.URL, nil)
226-
if err != nil {
227-
return fmt.Errorf("failed to create http req: %w", err)
228-
}
229253

230254
// get the number of bytes already received (the size of the output file)
231255
st, err := os.Stat(t.dealInfo.OutputFile)
@@ -234,15 +258,6 @@ func (t *transfer) execute(ctx context.Context) error {
234258
}
235259
t.nBytesReceived = st.Size()
236260

237-
// add request headers
238-
for name, val := range t.tInfo.Headers {
239-
req.Header.Set(name, val)
240-
}
241-
242-
// add range req to start reading from the last byte we have in the output file
243-
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", t.nBytesReceived))
244-
// init the request with the transfer context
245-
req = req.WithContext(ctx)
246261
// open output file in append-only mode for writing
247262
of, err := os.OpenFile(t.dealInfo.OutputFile, os.O_APPEND|os.O_WRONLY, 0644)
248263
if err != nil {
@@ -252,7 +267,75 @@ func (t *transfer) execute(ctx context.Context) error {
252267

253268
// start the http transfer
254269
remaining := t.dealInfo.DealSize - t.nBytesReceived
255-
reqErr := t.doHttp(ctx, req, of, remaining)
270+
271+
// split download into chunks
272+
// each chunk should be bigger than minChunkBytes
273+
nChunks := int(remaining/minChunkBytes) + 1
274+
if nChunks > t.nChunks {
275+
nChunks = t.nChunks
276+
}
277+
278+
chunkLen := remaining / int64(nChunks)
279+
var pch, nch chan bool
280+
group := errgroup.Group{}
281+
282+
nBytesReceived := t.nBytesReceived
283+
284+
for i := 0; i < nChunks; i++ {
285+
isLast := i == nChunks-1
286+
287+
// construct request
288+
req, err := http.NewRequest("GET", t.tInfo.URL, nil)
289+
if err != nil {
290+
return fmt.Errorf("failed to create http req: %w", err)
291+
}
292+
293+
// add request headers
294+
for name, val := range t.tInfo.Headers {
295+
req.Header.Set(name, val)
296+
}
297+
298+
chunkStart := nBytesReceived + int64(i)*chunkLen
299+
var chunkEnd int64
300+
var srange string
301+
if isLast {
302+
chunkEnd = t.dealInfo.DealSize
303+
srange = fmt.Sprintf("bytes=%d-", chunkStart)
304+
} else {
305+
chunkEnd = chunkStart + chunkLen
306+
srange = fmt.Sprintf("bytes=%d-%d", chunkStart, chunkEnd)
307+
}
308+
309+
// add range req to start reading from the last byte we have in the output file
310+
req.Header.Set("Range", srange)
311+
// init the request with the transfer context
312+
req = req.WithContext(ctx)
313+
314+
if isLast {
315+
nch = nil
316+
} else {
317+
nch = make(chan bool, 1)
318+
}
319+
cpch := pch
320+
cnch := nch
321+
group.Go(func() error {
322+
err, success := t.doHttp(ctx, req, of, chunkEnd-chunkStart, cpch)
323+
// signal to the next download goroutine to either proceed writing into the file if the current download has finished successfully
324+
// or to abandon if it has not
325+
if cnch != nil {
326+
cnch <- (err == nil && success)
327+
close(cnch)
328+
}
329+
return err
330+
})
331+
332+
pch = nch
333+
}
334+
var reqErr *httpError
335+
if err := group.Wait(); err != nil {
336+
reqErr = err.(*httpError)
337+
}
338+
256339
if reqErr == nil {
257340
t.dl.Infow(duuid, "http transfer completed successfully")
258341
// if there's no error, transfer was successful
@@ -326,22 +409,45 @@ func (t *transfer) execute(ctx context.Context) error {
326409
return nil
327410
}
328411

329-
func (t *transfer) doHttp(ctx context.Context, req *http.Request, dst io.Writer, toRead int64) *httpError {
412+
func (t *transfer) doHttp(ctx context.Context, req *http.Request, dst io.Writer, toRead int64, pch chan bool) (*httpError, bool) {
330413
duid := t.dealInfo.DealUuid
331414
t.dl.Infow(duid, "sending http request", "received", t.nBytesReceived, "remaining",
332415
toRead, "range-rq", req.Header.Get("Range"))
333416

334417
// send http request and validate response
335418
resp, err := t.client.Do(req)
336419
if err != nil {
337-
return &httpError{error: fmt.Errorf("failed to send http req: %w", err)}
420+
return &httpError{error: fmt.Errorf("failed to send http req: %w", err)}, false
338421
}
339422
// we should either get back a 200 or a 206 -> anything else means something has gone wrong and we return an error.
340423
defer resp.Body.Close() // nolint
424+
425+
// do not fail the whole download if one of the goroutines' range couldn't be satisfied
426+
// if not enough bytes have been downloaded, then a length mismatch error will be raised up the stack
427+
if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
428+
return nil, false
429+
}
430+
341431
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
342432
return &httpError{
343433
error: fmt.Errorf("http req failed: code: %d, status: %s", resp.StatusCode, resp.Status),
344434
code: resp.StatusCode,
435+
}, false
436+
}
437+
438+
// if previous download goroutine has failed - don't write into the file. Downloads can be performed in parallel while writing
439+
// must be done strictly in sequential order
440+
if pch != nil {
441+
select {
442+
case success := <-pch:
443+
if !success {
444+
return nil, false
445+
}
446+
case <-ctx.Done():
447+
if ctx.Err() != nil {
448+
return &httpError{error: ctx.Err()}, false
449+
}
450+
return nil, false
345451
}
346452
}
347453

@@ -351,7 +457,7 @@ func (t *transfer) doHttp(ctx context.Context, req *http.Request, dst io.Writer,
351457
for {
352458
if ctx.Err() != nil {
353459
t.dl.LogError(duid, "stopped reading http response: context canceled", ctx.Err())
354-
return &httpError{error: ctx.Err()}
460+
return &httpError{error: ctx.Err()}, false
355461
}
356462
nr, readErr := limitR.Read(buf)
357463

@@ -362,9 +468,9 @@ func (t *transfer) doHttp(ctx context.Context, req *http.Request, dst io.Writer,
362468
// if the number of read and written bytes don't match -> something has gone wrong, abort the http req.
363469
if nw < 0 || nr != nw {
364470
if writeErr != nil {
365-
return &httpError{error: fmt.Errorf("failed to write to output file: %w", writeErr)}
471+
return &httpError{error: fmt.Errorf("failed to write to output file: %w", writeErr)}, false
366472
}
367-
return &httpError{error: fmt.Errorf("read-write mismatch writing to the output file, read=%d, written=%d", nr, nw)}
473+
return &httpError{error: fmt.Errorf("read-write mismatch writing to the output file, read=%d, written=%d", nr, nw)}, false
368474
}
369475

370476
t.nBytesReceived = t.nBytesReceived + int64(nw)
@@ -375,10 +481,10 @@ func (t *transfer) doHttp(ctx context.Context, req *http.Request, dst io.Writer,
375481
// the http stream we're reading from has sent us an EOF, nothing to do here.
376482
if readErr == io.EOF {
377483
t.dl.Infow(duid, "http server sent EOF", "received", t.nBytesReceived, "deal-size", t.dealInfo.DealSize)
378-
return nil
484+
return nil, true
379485
}
380486
if readErr != nil {
381-
return &httpError{error: fmt.Errorf("error reading from http response stream: %w", readErr)}
487+
return &httpError{error: fmt.Errorf("error reading from http response stream: %w", readErr)}, false
382488
}
383489
}
384490
}

0 commit comments

Comments
 (0)