-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathsetup_bitswap.go
109 lines (93 loc) · 3.57 KB
/
setup_bitswap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package main
import (
"context"
"time"
"github.com/ipfs/boxo/routing/providerquerymanager"
"github.com/ipfs/boxo/bitswap"
bsclient "github.com/ipfs/boxo/bitswap/client"
bsnet "github.com/ipfs/boxo/bitswap/network"
bsserver "github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
metri "github.com/ipfs/go-metrics-interface"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)
// setupBitswapExchange wires up the bitswap exchange on top of host h,
// using cr to discover providers and bstore as the backing blockstore.
//
// A provider query manager is created with the routing limits taken from
// cfg (RoutingMaxRequests, RoutingMaxProviders, RoutingMaxTimeout) and is
// closed once ctx is done. The function panics if that manager cannot be
// created.
//
// When cfg.PeeringSharedCache is set and cfg.Peering is non-empty, a combined
// bitswap client+server is returned (wrapped in noNotifyExchange) whose
// request filter only accepts the peer IDs listed in cfg.Peering. In every
// other case a bitswap client alone is returned.
func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface {
	metricsCtx := metri.CtxScope(ctx, "ipfs_bitswap")
	network := bsnet.NewFromIpfsHost(h)

	// Build our own provider query manager from the host and the content
	// router so that the defaults can be overridden with the configured
	// limits.
	queryMgr, err := providerquerymanager.New(h, cr,
		providerquerymanager.WithMaxInProcessRequests(cfg.RoutingMaxRequests),
		providerquerymanager.WithMaxProviders(cfg.RoutingMaxProviders),
		providerquerymanager.WithMaxTimeout(cfg.RoutingMaxTimeout),
	)
	if err != nil {
		panic(err)
	}
	// Tie the query manager's lifetime to ctx.
	context.AfterFunc(ctx, func() {
		queryMgr.Close()
	})

	// --- Bitswap client options
	clientOpts := []bsclient.Option{
		// RebroadcastDelay defaults to 1 minute between searches for a
		// random live-want (1 CID). We search more often, even though this
		// probably overlaps with general rebroadcasts.
		bsclient.RebroadcastDelay(delay.Fixed(10 * time.Second)),
		// ProviderSearchDelay: 1 second, which is also the default.
		bsclient.ProviderSearchDelay(1 * time.Second),
		bsclient.WithoutDuplicatedBlockStats(),
		// The query manager is handed in explicitly below.
		bsclient.WithDefaultProviderQueryManager(false),
	}

	// Default mode: rainbow runs with the bitswap client alone unless both
	// the shared cache is enabled and there are peers to share it with.
	if !cfg.PeeringSharedCache || len(cfg.Peering) == 0 {
		clientOnly := bsclient.New(metricsCtx, network, queryMgr, bstore, clientOpts...)
		network.Start(clientOnly)
		return clientOnly
	}

	// Peering with a shared cache: run client+server. This is more expensive
	// but required when cached blocks must be served to safelisted peer IDs.

	// Collect the safelisted (peered) peer IDs into a set.
	safelist := make(map[peer.ID]struct{}, len(cfg.Peering))
	for _, info := range cfg.Peering {
		safelist[info.ID] = struct{}{}
	}
	// Only requests coming from safelisted peers pass the filter.
	var allowPeer bsserver.PeerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
		_, listed := safelist[p]
		return listed
	}

	// Lift every client option into a bitswap option, leaving room for the
	// three server options appended afterwards.
	combined := make([]bitswap.Option, 0, len(clientOpts)+3)
	for _, co := range clientOpts {
		combined = append(combined, bitswap.WithClientOption(co))
	}

	// --- Server options
	combined = append(combined,
		bitswap.WithPeerBlockRequestFilter(allowPeer),
		// Stay silent when we do not have a block; this saves processing.
		bitswap.SetSendDontHaves(false),
		bitswap.WithWantHaveReplaceSize(cfg.BitswapWantHaveReplaceSize),
	)

	// Initialize client+server.
	clientServer := bitswap.New(metricsCtx, network, queryMgr, bstore, combined...)
	network.Start(clientServer)
	return &noNotifyExchange{clientServer}
}
// noNotifyExchange wraps an exchange.Interface by embedding it, so all of the
// wrapped exchange's methods are promoted unchanged except NotifyNewBlocks,
// which this type replaces with a no-op. setupBitswapExchange returns it
// around the combined bitswap client+server.
type noNotifyExchange struct {
exchange.Interface
}
// NotifyNewBlocks shadows the embedded exchange's method of the same name and
// does nothing: the context and blocks arguments are ignored, the wrapped
// exchange is never called, and the returned error is always nil.
func (e *noNotifyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// Rainbow does not notify when we get new blocks in our Blockservice.
return nil
}