Skip to content

Commit 3342a5f

Browse files
authored
fix: parametrise add index parallelism (#1827)
* fix: parametrise add index parallelism Add two new configuration parameters: * AddIndexConcurrency for boostd that sets the number of concurrent tasks that each add index operation can be split into; * insert-concurrency for boost-data that sets the number of concurrent inserts that each add index operation can be split into; * change default add index concurrency to 1 * check max insert batch size
1 parent 3233c51 commit 3342a5f

File tree

10 files changed

+97
-26
lines changed

10 files changed

+97
-26
lines changed

cmd/boostd/recover.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/filecoin-project/boost/db"
1919
bdclient "github.com/filecoin-project/boost/extern/boostd-data/client"
2020
"github.com/filecoin-project/boost/extern/boostd-data/model"
21+
"github.com/filecoin-project/boost/node/config"
2122
"github.com/filecoin-project/boost/piecedirectory"
2223
"github.com/filecoin-project/go-address"
2324
"github.com/filecoin-project/go-commp-utils/writer"
@@ -103,6 +104,11 @@ var lidCmd = &cli.Command{
103104
Usage: "",
104105
Value: 4,
105106
},
107+
&cli.IntFlag{
108+
Name: "add-index-concurrency",
109+
Usage: "the maximum number of parallel tasks that a single add index operation can be split into",
110+
Value: config.DefaultAddIndexConcurrency,
111+
},
106112
&cli.BoolFlag{
107113
Name: "ignore-commp",
108114
Usage: "whether we should ignore sanity check of local data vs chain data",
@@ -203,7 +209,7 @@ func action(cctx *cli.Context) error {
203209
return fmt.Errorf("connecting to local index directory service: %w", err)
204210
}
205211
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
206-
pd = piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))
212+
pd = piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"), piecedirectory.WithAddIndexConcurrency(cctx.Int("add-index-concurrency")))
207213
pd.Start(ctx)
208214
}
209215

cmd/booster-bitswap/run.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
bdclient "github.com/filecoin-project/boost/extern/boostd-data/client"
1414
"github.com/filecoin-project/boost/extern/boostd-data/shared/tracing"
1515
"github.com/filecoin-project/boost/metrics"
16+
"github.com/filecoin-project/boost/node/config"
1617
"github.com/filecoin-project/boost/piecedirectory"
1718
lcli "github.com/filecoin-project/lotus/cli"
1819
"github.com/libp2p/go-libp2p/core/peer"
@@ -67,6 +68,11 @@ var runCmd = &cli.Command{
6768
Usage: "the maximum number of add index operations that can run in parallel",
6869
Value: 4,
6970
},
71+
&cli.IntFlag{
72+
Name: "add-index-concurrency",
73+
Usage: "the maximum number of parallel tasks that a single add index operation can be split into",
74+
Value: config.DefaultAddIndexConcurrency,
75+
},
7076
&cli.StringFlag{
7177
Name: "proxy",
7278
Usage: "the multiaddr of the libp2p proxy that this node connects through",
@@ -234,7 +240,8 @@ var runCmd = &cli.Command{
234240
if err != nil {
235241
return fmt.Errorf("starting block filter: %w", err)
236242
}
237-
pd := piecedirectory.NewPieceDirectory(cl, sa, cctx.Int("add-index-throttle"))
243+
pd := piecedirectory.NewPieceDirectory(cl, sa, cctx.Int("add-index-throttle"),
244+
piecedirectory.WithAddIndexConcurrency(cctx.Int("add-index-concurrency")))
238245
remoteStore := remoteblockstore.NewRemoteBlockstore(pd, &bitswapBlockMetrics)
239246
server := NewBitswapServer(remoteStore, host, multiFilter)
240247

cmd/booster-http/run.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/filecoin-project/boost/extern/boostd-data/model"
1919
"github.com/filecoin-project/boost/extern/boostd-data/shared/tracing"
2020
"github.com/filecoin-project/boost/metrics"
21+
"github.com/filecoin-project/boost/node/config"
2122
"github.com/filecoin-project/boost/piecedirectory"
2223
"github.com/filecoin-project/dagstore/mount"
2324
"github.com/filecoin-project/go-address"
@@ -75,6 +76,11 @@ var runCmd = &cli.Command{
7576
Usage: "the maximum number of add index operations that can run in parallel",
7677
Value: 4,
7778
},
79+
&cli.IntFlag{
80+
Name: "add-index-concurrency",
81+
Usage: "the maximum number of parallel tasks that a single add index operation can be split into",
82+
Value: config.DefaultAddIndexConcurrency,
83+
},
7884
&cli.StringFlag{
7985
Name: "api-fullnode",
8086
Usage: "the endpoint for the full node API",
@@ -242,7 +248,8 @@ var runCmd = &cli.Command{
242248
defer sa.Close()
243249

244250
// Create the server API
245-
pd := piecedirectory.NewPieceDirectory(cl, sa, cctx.Int("add-index-throttle"))
251+
pd := piecedirectory.NewPieceDirectory(cl, sa, cctx.Int("add-index-throttle"),
252+
piecedirectory.WithAddIndexConcurrency(cctx.Int("add-index-concurrency")))
246253

247254
opts := &HttpServerOptions{
248255
ServePieces: servePieces,

extern/boostd-data/cmd/run.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ import (
2828
"go.opencensus.io/tag"
2929
)
3030

31+
const (
32+
DefaultInsertConcurrency = 4
33+
)
34+
3135
var runCmd = &cli.Command{
3236
Name: "run",
3337
Subcommands: []*cli.Command{
@@ -104,6 +108,11 @@ var yugabyteCmd = &cli.Command{
104108
Usage: "postgres connect string eg 'postgresql://postgres:postgres@localhost'",
105109
Required: true,
106110
},
111+
&cli.IntFlag{
112+
Name: "insert-concurrency",
113+
Usage: "the number of concurrent tasks that each add index operation is split into",
114+
Value: DefaultInsertConcurrency,
115+
},
107116
&cli.IntFlag{
108117
Name: "CQLTimeout",
109118
Usage: "client timeout value in seconds for CQL queries",
@@ -115,9 +124,10 @@ var yugabyteCmd = &cli.Command{
115124
Action: func(cctx *cli.Context) error {
116125
// Create a yugabyte data service
117126
settings := yugabyte.DBSettings{
118-
Hosts: cctx.StringSlice("hosts"),
119-
ConnectString: cctx.String("connect-string"),
120-
CQLTimeout: cctx.Int("CQLTimeout"),
127+
Hosts: cctx.StringSlice("hosts"),
128+
ConnectString: cctx.String("connect-string"),
129+
CQLTimeout: cctx.Int("CQLTimeout"),
130+
InsertConcurrency: cctx.Int("insert-concurrency"),
121131
}
122132

123133
// One of the migrations requires a miner address. But we don't want to
@@ -223,6 +233,11 @@ var yugabyteMigrateCmd = &cli.Command{
223233
Name: "miner-address",
224234
Usage: "default miner address eg f1234",
225235
},
236+
&cli.IntFlag{
237+
Name: "insert-concurrency",
238+
Usage: "the number of concurrent tasks that each add index operation is split into",
239+
Value: DefaultInsertConcurrency,
240+
},
226241
&cli.IntFlag{
227242
Name: "CQLTimeout",
228243
Usage: "client timeout value in seconds for CQL queries",
@@ -236,9 +251,10 @@ var yugabyteMigrateCmd = &cli.Command{
236251

237252
// Create a yugabyte data service
238253
settings := yugabyte.DBSettings{
239-
Hosts: cctx.StringSlice("hosts"),
240-
ConnectString: cctx.String("connect-string"),
241-
CQLTimeout: cctx.Int("CQLTimeout"),
254+
Hosts: cctx.StringSlice("hosts"),
255+
ConnectString: cctx.String("connect-string"),
256+
CQLTimeout: cctx.Int("CQLTimeout"),
257+
InsertConcurrency: cctx.Int("insert-concurrency"),
242258
}
243259

244260
maddr := migrations.DisabledMinerAddr
@@ -280,6 +296,11 @@ var yugabyteAddIndexCmd = &cli.Command{
280296
Usage: "filename must be same as pieceCID",
281297
Required: true,
282298
},
299+
&cli.IntFlag{
300+
Name: "insert-concurrency",
301+
Usage: "the number of concurrent tasks that each add index operation is split into",
302+
Value: DefaultInsertConcurrency,
303+
},
283304
&cli.IntFlag{
284305
Name: "CQLTimeout",
285306
Usage: "client timeout value in seconds for CQL queries",
@@ -292,9 +313,10 @@ var yugabyteAddIndexCmd = &cli.Command{
292313

293314
// Create a yugabyte data service
294315
settings := yugabyte.DBSettings{
295-
Hosts: cctx.StringSlice("hosts"),
296-
ConnectString: cctx.String("connect-string"),
297-
CQLTimeout: cctx.Int("CQLTimeout"),
316+
Hosts: cctx.StringSlice("hosts"),
317+
ConnectString: cctx.String("connect-string"),
318+
CQLTimeout: cctx.Int("CQLTimeout"),
319+
InsertConcurrency: cctx.Int("insert-concurrency"),
298320
}
299321

300322
migrator := yugabyte.NewMigrator(settings, migrations.DisabledMinerAddr)

extern/boostd-data/yugabyte/service.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ const CqlTimeout = 60
3434

3535
// The Cassandra driver has a 50k limit on batch statements. Keeping
3636
// batch size small makes sure we're under the limit.
37-
const InsertBatchSize = 10000
37+
const InsertBatchSize = 10_000
38+
const MaxInsertBatchSize = 50_000
3839

3940
const InsertConcurrency = 4
4041

@@ -81,6 +82,9 @@ func NewStore(settings DBSettings, migrator *Migrator, opts ...StoreOpt) *Store
8182
if settings.InsertBatchSize == 0 {
8283
settings.InsertBatchSize = InsertBatchSize
8384
}
85+
if settings.InsertBatchSize > MaxInsertBatchSize {
86+
settings.InsertBatchSize = MaxInsertBatchSize
87+
}
8488
if settings.InsertConcurrency == 0 {
8589
settings.InsertConcurrency = InsertConcurrency
8690
}

node/config/def.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ const (
2323
// CommP and traversing a DAG with graphsync; invokes a budget on DAG depth and density.
2424
var MaxTraversalLinks uint64 = 32 * (1 << 20)
2525

26+
const (
27+
DefaultAddIndexConcurrency = 1
28+
)
29+
2630
func init() {
2731
if envMaxTraversal, err := strconv.ParseUint(os.Getenv("LOTUS_MAX_TRAVERSAL_LINKS"), 10, 64); err == nil {
2832
MaxTraversalLinks = envMaxTraversal
@@ -87,6 +91,7 @@ func DefaultBoost() *Boost {
8791
Enabled: false,
8892
},
8993
ParallelAddIndexLimit: 4,
94+
AddIndexConcurrency: DefaultAddIndexConcurrency,
9095
EmbeddedServicePort: 8042,
9196
ServiceApiInfo: "",
9297
ServiceRPCTimeout: Duration(15 * time.Minute),

node/config/doc_gen.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/config/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,9 @@ type LocalIndexDirectoryConfig struct {
418418
// the piece from the sealing subsystem, creates an index of where each block
419419
// is in the piece, and adds the index to the local index directory.
420420
ParallelAddIndexLimit int
421+
// AddIndexConcurrency sets the number of concurrent tasks that each add index operation is split into.
422+
// This setting is usefull to better utilise bandwidth between boostd and boost-data. The default value is 4.
423+
AddIndexConcurrency int
421424
// The port that the embedded local index directory data service runs on.
422425
// Set this value to zero to disable the embedded local index directory data service
423426
// (in that case the local index directory data service must be running externally)

node/modules/piecedirectory.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,9 @@ func NewPieceDirectory(cfg *config.Boost) func(lc fx.Lifecycle, maddr dtypes.Min
136136

137137
// Create the piece directory implementation
138138
pdctx, cancel := context.WithCancel(context.Background())
139-
pd := piecedirectory.NewPieceDirectory(store, sa, cfg.LocalIndexDirectory.ParallelAddIndexLimit)
139+
pd := piecedirectory.NewPieceDirectory(store, sa,
140+
cfg.LocalIndexDirectory.ParallelAddIndexLimit,
141+
piecedirectory.WithAddIndexConcurrency(cfg.LocalIndexDirectory.AddIndexConcurrency))
140142
lc.Append(fx.Hook{
141143
OnStart: func(ctx context.Context) error {
142144
err := sa.Start(ctx, log)

piecedirectory/piecedirectory.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/filecoin-project/boost/extern/boostd-data/model"
1414
"github.com/filecoin-project/boost/extern/boostd-data/shared/tracing"
1515
bdtypes "github.com/filecoin-project/boost/extern/boostd-data/svc/types"
16+
"github.com/filecoin-project/boost/node/config"
1617
"github.com/filecoin-project/boost/piecedirectory/types"
1718
"github.com/filecoin-project/go-address"
1819
"github.com/filecoin-project/go-state-types/abi"
@@ -37,16 +38,23 @@ import (
3738
var log = logging.Logger("piecedirectory")
3839

3940
const (
40-
MaxCachedReaders = 128
41-
AddIndexConcurrency = 8
41+
MaxCachedReaders = 128
4242
)
4343

44-
type Settings struct {
44+
type settings struct {
4545
addIndexConcurrency int
4646
}
4747

48+
type Option func(*settings)
49+
50+
func WithAddIndexConcurrency(c int) Option {
51+
return func(s *settings) {
52+
s.addIndexConcurrency = c
53+
}
54+
}
55+
4856
type PieceDirectory struct {
49-
settings *Settings
57+
settings *settings
5058
store *bdclient.Store
5159
pieceReader types.PieceReader
5260

@@ -60,13 +68,7 @@ type PieceDirectory struct {
6068
addIdxOpByCid sync.Map
6169
}
6270

63-
func NewPieceDirectory(store *bdclient.Store, pr types.PieceReader, addIndexThrottleSize int) *PieceDirectory {
64-
return NewPieceDirectoryWithSettings(store, pr, addIndexThrottleSize, &Settings{
65-
addIndexConcurrency: AddIndexConcurrency,
66-
})
67-
}
68-
69-
func NewPieceDirectoryWithSettings(store *bdclient.Store, pr types.PieceReader, addIndexThrottleSize int, settings *Settings) *PieceDirectory {
71+
func NewPieceDirectory(store *bdclient.Store, pr types.PieceReader, addIndexThrottleSize int, opts ...Option) *PieceDirectory {
7072
prCache := ttlcache.NewCache()
7173
_ = prCache.SetTTL(30 * time.Second)
7274
prCache.SetCacheSizeLimit(MaxCachedReaders)
@@ -77,7 +79,13 @@ func NewPieceDirectoryWithSettings(store *bdclient.Store, pr types.PieceReader,
7779
pieceReaderCache: prCache,
7880
addIdxThrottleSize: addIndexThrottleSize,
7981
addIdxThrottle: make(chan struct{}, addIndexThrottleSize),
80-
settings: settings,
82+
settings: &settings{
83+
addIndexConcurrency: config.DefaultAddIndexConcurrency,
84+
},
85+
}
86+
87+
for _, opt := range opts {
88+
opt(pd.settings)
8189
}
8290

8391
expireCallback := func(key string, reason ttlcache.EvictionReason, value interface{}) {

0 commit comments

Comments
 (0)