Skip to content

Commit 24a9bb3

Browse files
LexLuthr and dirkmc authored
fix: improve stalled retrieval cancellation (#1449)
* refactor stalled retrieval cancel
* add ctx with timeout
* implement suggestions
* update err wrapping
* fix: set short cancel timeout for unpaid retrievals only

---------

Co-authored-by: Dirk McCormick <[email protected]>
1 parent fd35f62 commit 24a9bb3

File tree

3 files changed

+31
-15
lines changed

3 files changed

+31
-15
lines changed

node/config/def.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -98,7 +98,7 @@ func DefaultBoost() *Boost {
9898

9999
DealProposalLogDuration: Duration(time.Hour * 24),
100100
RetrievalLogDuration: Duration(time.Hour * 24),
101-
StalledRetrievalTimeout: Duration(time.Minute * 30),
101+
StalledRetrievalTimeout: Duration(time.Second * 30),
102102

103103
RetrievalPricing: &lotus_config.RetrievalPricing{
104104
Strategy: RetrievalPricingDefaultMode,

retrievalmarket/rtvllog/retrieval_log.go

Lines changed: 28 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package rtvllog
22

33
import (
44
"context"
5+
"errors"
56
"sync"
67
"time"
78

@@ -295,22 +296,36 @@ func (r *RetrievalLog) gcRetrievals(ctx context.Context) {
295296
continue
296297
}
297298

299+
var wg sync.WaitGroup
298300
for _, row := range rows {
299-
chid := datatransfer.ChannelID{Initiator: row.PeerID, Responder: row.LocalPeerID, ID: row.TransferID}
300-
// Try to cancel via unpaid graphsync first
301-
err := r.gsur.CancelTransfer(ctx, row.TransferID, &row.PeerID)
302-
303-
if err != nil {
304-
// Attempt to terminate legacy, paid retrievals if we didnt cancel a free retrieval
305-
err = r.dataTransfer.CloseDataTransferChannel(ctx, chid)
306-
}
307-
308-
if err != nil {
309-
log.Debugw("error canceling retrieval", "dealID", row.DealID, "err", err)
310-
} else {
311-
log.Infof("Canceled retrieval %s, older than %s", row.DealID, r.stalledTimeout)
301+
if row.TransferID <= 0 {
302+
continue
312303
}
304+
wg.Add(1)
305+
go func(s RetrievalDealState) {
306+
// Don't wait for more than 5 seconds for the cancel
307+
// message to be sent when cancelling an unpaid retrieval
308+
unpaidRtrvCtx, cancel := context.WithTimeout(ctx, time.Second*5)
309+
defer cancel()
310+
defer wg.Done()
311+
312+
// Try to cancel an unpaid retrieval with the given transfer id first
313+
err := r.gsur.CancelTransfer(unpaidRtrvCtx, s.TransferID, &s.PeerID)
314+
if err != nil && errors.Is(err, server.ErrRetrievalNotFound) {
315+
// Couldn't find an unpaid retrieval with that id, try
316+
// to cancel a legacy, paid retrieval
317+
chid := datatransfer.ChannelID{Initiator: s.PeerID, Responder: s.LocalPeerID, ID: s.TransferID}
318+
err = r.dataTransfer.CloseDataTransferChannel(ctx, chid)
319+
}
320+
321+
if err != nil {
322+
log.Debugw("error canceling retrieval", "dealID", s.DealID, "err", err)
323+
} else {
324+
log.Infof("Canceled retrieval %s, older than %s", s.DealID, r.stalledTimeout)
325+
}
326+
}(row)
313327
}
328+
wg.Wait()
314329
}
315330
}
316331
}

retrievalmarket/server/gsunpaidretrieval.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ import (
3030
)
3131

3232
var log = logging.Logger("boostgs")
33+
var ErrRetrievalNotFound = fmt.Errorf("no transfer found")
3334

3435
var incomingReqExtensions = []graphsync.ExtensionName{
3536
extension.ExtensionIncomingRequest1_1,
@@ -175,7 +176,7 @@ func (g *GraphsyncUnpaidRetrieval) CancelTransfer(ctx context.Context, id datatr
175176

176177
if state == nil {
177178
g.activeRetrievalsLk.Unlock()
178-
return fmt.Errorf("no transfer with id %d", id)
179+
return fmt.Errorf("failed to cancel with id %d: %w", id, ErrRetrievalNotFound)
179180
}
180181

181182
rcpt := state.cs.recipient

0 commit comments

Comments
 (0)