Make processor and forward write max batch sizes configurable (#522)
purplefox authored Aug 16, 2022
1 parent eb8c698 commit 3b74921
Showing 6 changed files with 50 additions and 8 deletions.
2 changes: 2 additions & 0 deletions cmd/pranadb/runner_test.go
@@ -84,6 +84,8 @@ func createConfigWithAllFields() conf.Config {
RaftHeartbeatRTT: 30,
DisableFsync: true,
AggregationCacheSizeRows: 1234,
+ MaxProcessBatchSize: 777,
+ MaxForwardWriteBatchSize: 888,

DDProfilerTypes: "HEAP,CPU",
DDProfilerServiceName: "my-service",
2 changes: 2 additions & 0 deletions cmd/pranadb/testdata/config.hcl
@@ -57,6 +57,8 @@ raft-heartbeat-rtt = 30
raft-election-rtt = 300
disable-fsync = true
aggregation-cache-size-rows = 1234
+ max-process-batch-size = 777
+ max-forward-write-batch-size = 888

dd-profiler-types = "HEAP,CPU"
dd-profiler-service-name = "my-service"
20 changes: 20 additions & 0 deletions conf/conf.go
@@ -16,6 +16,8 @@ const (
DefaultRaftHeartbeatRTT = 30
DefaultRaftElectionRTT = 300
DefaultAggregationCacheSizeRows = 20000
+ DefaultMaxProcessBatchSize = 2000
+ DefaultMaxForwardWriteBatchSize = 500
)

type Config struct {
@@ -59,6 +61,8 @@ type Config struct {
DDProfilerEnvironmentName string
DDProfilerVersionName string
AggregationCacheSizeRows int // The maximum number of rows for an aggregation to cache in memory
+ MaxProcessBatchSize int
+ MaxForwardWriteBatchSize int
}

func (c *Config) ApplyDefaults() {
@@ -92,6 +96,12 @@ func (c *Config) ApplyDefaults() {
if c.AggregationCacheSizeRows == 0 {
c.AggregationCacheSizeRows = DefaultAggregationCacheSizeRows
}
+ if c.MaxProcessBatchSize == 0 {
+ c.MaxProcessBatchSize = DefaultMaxProcessBatchSize
+ }
+ if c.MaxForwardWriteBatchSize == 0 {
+ c.MaxForwardWriteBatchSize = DefaultMaxForwardWriteBatchSize
+ }
}

func (c *Config) Validate() error { //nolint:gocyclo
@@ -191,6 +201,12 @@ func (c *Config) Validate() error { //nolint:gocyclo
if c.RaftElectionRTT < 2*c.RaftHeartbeatRTT {
return errors.NewInvalidConfigurationError("RaftElectionRTT must be > 2 * RaftHeartbeatRTT")
}
+ if c.MaxProcessBatchSize < 1 {
+ return errors.NewInvalidConfigurationError("MaxProcessBatchSize must be > 0")
+ }
+ if c.MaxForwardWriteBatchSize < 1 {
+ return errors.NewInvalidConfigurationError("MaxForwardWriteBatchSize must be > 0")
+ }
return nil
}

@@ -222,6 +238,8 @@ func NewDefaultConfig() *Config {
RaftHeartbeatRTT: DefaultRaftHeartbeatRTT,
RaftElectionRTT: DefaultRaftElectionRTT,
AggregationCacheSizeRows: DefaultAggregationCacheSizeRows,
+ MaxProcessBatchSize: DefaultMaxProcessBatchSize,
+ MaxForwardWriteBatchSize: DefaultMaxForwardWriteBatchSize,
}
}

@@ -231,6 +249,8 @@ func NewTestConfig(fakeKafkaID int64) *Config {
RaftHeartbeatRTT: DefaultRaftHeartbeatRTT,
RaftElectionRTT: DefaultRaftElectionRTT,
AggregationCacheSizeRows: DefaultAggregationCacheSizeRows,
+ MaxProcessBatchSize: DefaultMaxProcessBatchSize,
+ MaxForwardWriteBatchSize: DefaultMaxForwardWriteBatchSize,
NodeID: 0,
NumShards: 10,
TestServer: true,
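Taken together, the conf.go hunks above give the new fields zero-value semantics: leaving a size unset (0) makes ApplyDefaults substitute the default, while an explicitly negative value survives ApplyDefaults and is then rejected by Validate. A minimal standalone sketch of that behaviour with simplified names (not the actual conf package):

package main

import "fmt"

const (
    defaultMaxProcessBatchSize      = 2000 // mirrors DefaultMaxProcessBatchSize
    defaultMaxForwardWriteBatchSize = 500  // mirrors DefaultMaxForwardWriteBatchSize
)

type config struct {
    maxProcessBatchSize      int
    maxForwardWriteBatchSize int
}

// applyDefaults treats the zero value as "unset", as conf.Config.ApplyDefaults does.
func (c *config) applyDefaults() {
    if c.maxProcessBatchSize == 0 {
        c.maxProcessBatchSize = defaultMaxProcessBatchSize
    }
    if c.maxForwardWriteBatchSize == 0 {
        c.maxForwardWriteBatchSize = defaultMaxForwardWriteBatchSize
    }
}

// validate mirrors the new checks: both sizes must end up strictly positive.
func (c *config) validate() error {
    if c.maxProcessBatchSize < 1 {
        return fmt.Errorf("MaxProcessBatchSize must be > 0")
    }
    if c.maxForwardWriteBatchSize < 1 {
        return fmt.Errorf("MaxForwardWriteBatchSize must be > 0")
    }
    return nil
}

func main() {
    var c config // both sizes left unset
    c.applyDefaults()
    fmt.Println(c.maxProcessBatchSize, c.maxForwardWriteBatchSize) // 2000 500

    bad := config{maxProcessBatchSize: -1, maxForwardWriteBatchSize: 500}
    bad.applyDefaults()         // -1 is not zero, so it is kept
    fmt.Println(bad.validate()) // MaxProcessBatchSize must be > 0
}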
16 changes: 16 additions & 0 deletions conf/conf_test.go
@@ -232,6 +232,18 @@ func invalidRaftElectionRTTTooSmall() Config {
return cnf
}

+ func invalidMaxProcessorBatchSize() Config {
+ cnf := confAllFields
+ cnf.MaxProcessBatchSize = 0
+ return cnf
+ }
+
+ func invalidMaxForwardWriteBatchSize() Config {
+ cnf := confAllFields
+ cnf.MaxForwardWriteBatchSize = 0
+ return cnf
+ }
+
var invalidConfigs = []configPair{
{"PDB3000 - Invalid configuration: NodeID must be >= 0", invalidNodeIDConf()},
{"PDB3000 - Invalid configuration: NumShards must be >= 1", invalidNumShardsConf()},
@@ -264,6 +276,8 @@ var invalidConfigs = []configPair{
{"PDB3000 - Invalid configuration: RaftElectionRTT must be > 0", invalidRaftElectionRTTZero()},
{"PDB3000 - Invalid configuration: RaftElectionRTT must be > 0", invalidRaftElectionRTTNegative()},
{"PDB3000 - Invalid configuration: RaftElectionRTT must be > 2 * RaftHeartbeatRTT", invalidRaftElectionRTTTooSmall()},
{"PDB3000 - Invalid configuration: MaxProcessBatchSize must be > 0", invalidMaxProcessorBatchSize()},
{"PDB3000 - Invalid configuration: MaxForwardWriteBatchSize must be > 0", invalidMaxForwardWriteBatchSize()},
}

func TestValidate(t *testing.T) {
@@ -305,4 +319,6 @@ var confAllFields = Config{
RaftRTTMs: 100,
RaftHeartbeatRTT: 10,
RaftElectionRTT: 100,
+ MaxProcessBatchSize: DefaultMaxProcessBatchSize,
+ MaxForwardWriteBatchSize: DefaultMaxForwardWriteBatchSize,
}
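Each invalid case in the test above copies the known-good confAllFields value, breaks exactly one field, and pairs the result with the expected PDB3000 message. A reduced sketch of that table-driven pattern, with a one-field config standing in for the real one:

package main

import "fmt"

type config struct {
    maxProcessBatchSize int
}

func (c config) validate() error {
    if c.maxProcessBatchSize < 1 {
        return fmt.Errorf("PDB3000 - Invalid configuration: MaxProcessBatchSize must be > 0")
    }
    return nil
}

// configPair pairs an expected error message with a config that should produce it.
type configPair struct {
    errMsg string
    conf   config
}

func main() {
    valid := config{maxProcessBatchSize: 2000}

    invalid := valid                // copy the known-good config...
    invalid.maxProcessBatchSize = 0 // ...and break exactly one field

    cases := []configPair{
        {"PDB3000 - Invalid configuration: MaxProcessBatchSize must be > 0", invalid},
    }
    for _, cp := range cases {
        if err := cp.conf.validate(); err == nil || err.Error() != cp.errMsg {
            fmt.Println("FAIL: unexpected error:", err)
        } else {
            fmt.Println("ok:", err)
        }
    }
}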
2 changes: 1 addition & 1 deletion push/engine.go
@@ -314,7 +314,7 @@ func (p *Engine) CreateShardListener(shardID uint64) cluster.ShardListener {
if _, ok := p.schedulers[shardID]; ok {
panic(fmt.Sprintf("there is already a scheduler %d", shardID))
}
- sh := sched.NewShardScheduler(shardID, p, p, p.cluster)
+ sh := sched.NewShardScheduler(shardID, p, p, p.cluster, p.cfg.MaxProcessBatchSize, p.cfg.MaxForwardWriteBatchSize)
p.schedulers[shardID] = sh
if p.started {
sh.Start()
16 changes: 9 additions & 7 deletions push/sched/scheduler.go
@@ -14,9 +14,6 @@ import (
"time"
)

- const maxProcessBatchRows = 2000
- const maxForwardWriteBatchSize = 500
-
var (
rowsProcessedVec = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "pranadb_rows_processed_total",
@@ -53,6 +50,8 @@ type ShardScheduler struct {
loopExitWaitGroup sync.WaitGroup
queuedWriteRows int
clust cluster.Cluster
+ maxProcessBatchSize int
+ maxForwardWriteBatchSize int
rowsProcessedCounter metrics.Counter
shardLagHistogram metrics.Observer
batchProcessingTimeHistogram metrics.Observer
@@ -79,7 +78,8 @@ type WriteBatchEntry struct {
completionChannels []chan error
}

- func NewShardScheduler(shardID uint64, batchHandler RowsBatchHandler, shardFailListener ShardFailListener, clust cluster.Cluster) *ShardScheduler {
+ func NewShardScheduler(shardID uint64, batchHandler RowsBatchHandler, shardFailListener ShardFailListener,
+ clust cluster.Cluster, maxProcessBatchSize int, maxForwardWriteBatchSize int) *ShardScheduler {
sShardID := fmt.Sprintf("shard-%04d", shardID)
rowsProcessedCounter := rowsProcessedVec.WithLabelValues(sShardID)
shardLagHistogram := shardLagVec.WithLabelValues(sShardID)
@@ -95,6 +95,8 @@ func NewShardScheduler(shardID uint64, batchHandler RowsBatchHandler, shardFailL
batchProcessingTimeHistogram: batchProcessingTimeHistogram,
batchSizeHistogram: batchSizeHistogram,
clust: clust,
+ maxProcessBatchSize: maxProcessBatchSize,
+ maxForwardWriteBatchSize: maxForwardWriteBatchSize,
}
ss.loopExitWaitGroup.Add(1)
return ss
@@ -180,8 +182,8 @@ func (s *ShardScheduler) getNextBatch() ([]cluster.ForwardRow, *WriteBatchEntry,

func (s *ShardScheduler) getRowsToProcess() []cluster.ForwardRow {
numRows := len(s.forwardRows)
- if numRows > maxProcessBatchRows {
- numRows = maxProcessBatchRows
+ if numRows > s.maxProcessBatchSize {
+ numRows = s.maxProcessBatchSize
}
rows := s.forwardRows[:numRows]
s.forwardRows = s.forwardRows[numRows:]
@@ -201,7 +203,7 @@ func (s *ShardScheduler) getForwardWriteBatch() *WriteBatchEntry {
combinedEntries = append(combinedEntries, batch.writeBatch[9:]...)
entries += int(numPuts)
completionChannels = append(completionChannels, batch.completionChannels[0])
- if entries >= maxForwardWriteBatchSize {
+ if entries >= s.maxForwardWriteBatchSize {
break
}
}
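With the package-level constants gone, each scheduler instance caps its own batches: getRowsToProcess takes at most maxProcessBatchSize rows off the head of the forward queue, and getForwardWriteBatch stops combining queued write batches once maxForwardWriteBatchSize entries have been gathered. A reduced sketch of the slicing pattern, with ints standing in for cluster.ForwardRow:

package main

import "fmt"

// takeBatch returns up to max rows from the head of queue plus the remaining
// tail, the same pattern getRowsToProcess applies to s.forwardRows using
// s.maxProcessBatchSize.
func takeBatch(queue []int, max int) (batch, rest []int) {
    n := len(queue)
    if n > max {
        n = max
    }
    return queue[:n], queue[n:]
}

func main() {
    queue := make([]int, 5000)
    batch, rest := takeBatch(queue, 2000) // 2000 is the default MaxProcessBatchSize
    fmt.Println(len(batch), len(rest))    // prints: 2000 3000
}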
