From 3b74921fa742da1e4e96665ceca8f92401026cfc Mon Sep 17 00:00:00 2001
From: Tim Fox
Date: Tue, 16 Aug 2022 07:43:51 +0100
Subject: [PATCH] Make processor and forward write max batch sizes configurable (#522)

Make processor and forward write max batch sizes configurable
---
 cmd/pranadb/runner_test.go      |  2 ++
 cmd/pranadb/testdata/config.hcl |  2 ++
 conf/conf.go                    | 20 ++++++++++++++++++++
 conf/conf_test.go               | 16 ++++++++++++++++
 push/engine.go                  |  2 +-
 push/sched/scheduler.go         | 16 +++++++++-------
 6 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/cmd/pranadb/runner_test.go b/cmd/pranadb/runner_test.go
index 2bb27115..76f17d32 100644
--- a/cmd/pranadb/runner_test.go
+++ b/cmd/pranadb/runner_test.go
@@ -84,6 +84,8 @@ func createConfigWithAllFields() conf.Config {
 		RaftHeartbeatRTT:         30,
 		DisableFsync:             true,
 		AggregationCacheSizeRows: 1234,
+		MaxProcessBatchSize:      777,
+		MaxForwardWriteBatchSize: 888,
 
 		DDProfilerTypes:       "HEAP,CPU",
 		DDProfilerServiceName: "my-service",
diff --git a/cmd/pranadb/testdata/config.hcl b/cmd/pranadb/testdata/config.hcl
index d3c5f9b0..4ecc0e25 100644
--- a/cmd/pranadb/testdata/config.hcl
+++ b/cmd/pranadb/testdata/config.hcl
@@ -57,6 +57,8 @@ raft-heartbeat-rtt = 30
 raft-election-rtt = 300
 disable-fsync = true
 aggregation-cache-size-rows = 1234
+max-process-batch-size = 777
+max-forward-write-batch-size = 888
 
 dd-profiler-types = "HEAP,CPU"
 dd-profiler-service-name = "my-service"
diff --git a/conf/conf.go b/conf/conf.go
index 9e3c1a4b..7608ad1e 100644
--- a/conf/conf.go
+++ b/conf/conf.go
@@ -16,6 +16,8 @@ const (
 	DefaultRaftHeartbeatRTT         = 30
 	DefaultRaftElectionRTT          = 300
 	DefaultAggregationCacheSizeRows = 20000
+	DefaultMaxProcessBatchSize      = 2000
+	DefaultMaxForwardWriteBatchSize = 500
 )
 
 type Config struct {
@@ -59,6 +61,8 @@ type Config struct {
 	DDProfilerEnvironmentName string
 	DDProfilerVersionName     string
 	AggregationCacheSizeRows  int // The maximum number of rows for an aggregation to cache in memory
+	MaxProcessBatchSize       int
+	MaxForwardWriteBatchSize  int
 }
 
 func (c *Config) ApplyDefaults() {
@@ -92,6 +96,12 @@ func (c *Config) ApplyDefaults() {
 	if c.AggregationCacheSizeRows == 0 {
 		c.AggregationCacheSizeRows = DefaultAggregationCacheSizeRows
 	}
+	if c.MaxProcessBatchSize == 0 {
+		c.MaxProcessBatchSize = DefaultMaxProcessBatchSize
+	}
+	if c.MaxForwardWriteBatchSize == 0 {
+		c.MaxForwardWriteBatchSize = DefaultMaxForwardWriteBatchSize
+	}
 }
 
 func (c *Config) Validate() error { //nolint:gocyclo
@@ -191,6 +201,12 @@ func (c *Config) Validate() error { //nolint:gocyclo
 	if c.RaftElectionRTT < 2*c.RaftHeartbeatRTT {
 		return errors.NewInvalidConfigurationError("RaftElectionRTT must be > 2 * RaftHeartbeatRTT")
 	}
+	if c.MaxProcessBatchSize < 1 {
+		return errors.NewInvalidConfigurationError("MaxProcessBatchSize must be > 0")
+	}
+	if c.MaxForwardWriteBatchSize < 1 {
+		return errors.NewInvalidConfigurationError("MaxForwardWriteBatchSize must be > 0")
+	}
 	return nil
 }
 
@@ -222,6 +238,8 @@ func NewDefaultConfig() *Config {
 		RaftHeartbeatRTT:         DefaultRaftHeartbeatRTT,
 		RaftElectionRTT:          DefaultRaftElectionRTT,
 		AggregationCacheSizeRows: DefaultAggregationCacheSizeRows,
+		MaxProcessBatchSize:      DefaultMaxProcessBatchSize,
+		MaxForwardWriteBatchSize: DefaultMaxForwardWriteBatchSize,
 	}
 }
 
@@ -231,6 +249,8 @@ func NewTestConfig(fakeKafkaID int64) *Config {
 		RaftRTTMs:                DefaultRaftRTTMs,
 		RaftHeartbeatRTT:         DefaultRaftHeartbeatRTT,
 		RaftElectionRTT:          DefaultRaftElectionRTT,
 		AggregationCacheSizeRows: DefaultAggregationCacheSizeRows,
+		MaxProcessBatchSize:      DefaultMaxProcessBatchSize,
+		MaxForwardWriteBatchSize: DefaultMaxForwardWriteBatchSize,
 		NodeID:                   0,
 		NumShards:                10,
 		TestServer:               true,
diff --git a/conf/conf_test.go b/conf/conf_test.go
index 6f85ce67..541181b0 100644
--- a/conf/conf_test.go
+++ b/conf/conf_test.go
@@ -232,6 +232,18 @@ func invalidRaftElectionRTTTooSmall() Config {
 	return cnf
 }
 
+func invalidMaxProcessorBatchSize() Config {
+	cnf := confAllFields
+	cnf.MaxProcessBatchSize = 0
+	return cnf
+}
+
+func invalidMaxForwardWriteBatchSize() Config {
+	cnf := confAllFields
+	cnf.MaxForwardWriteBatchSize = 0
+	return cnf
+}
+
 var invalidConfigs = []configPair{
 	{"PDB3000 - Invalid configuration: NodeID must be >= 0", invalidNodeIDConf()},
 	{"PDB3000 - Invalid configuration: NumShards must be >= 1", invalidNumShardsConf()},
@@ -264,6 +276,8 @@ var invalidConfigs = []configPair{
 	{"PDB3000 - Invalid configuration: RaftElectionRTT must be > 0", invalidRaftElectionRTTZero()},
 	{"PDB3000 - Invalid configuration: RaftElectionRTT must be > 0", invalidRaftElectionRTTNegative()},
 	{"PDB3000 - Invalid configuration: RaftElectionRTT must be > 2 * RaftHeartbeatRTT", invalidRaftElectionRTTTooSmall()},
+	{"PDB3000 - Invalid configuration: MaxProcessBatchSize must be > 0", invalidMaxProcessorBatchSize()},
+	{"PDB3000 - Invalid configuration: MaxForwardWriteBatchSize must be > 0", invalidMaxForwardWriteBatchSize()},
 }
 
 func TestValidate(t *testing.T) {
@@ -305,4 +319,6 @@ var confAllFields = Config{
 	RaftRTTMs:        100,
 	RaftHeartbeatRTT: 10,
 	RaftElectionRTT:  100,
+	MaxProcessBatchSize:      DefaultMaxProcessBatchSize,
+	MaxForwardWriteBatchSize: DefaultMaxForwardWriteBatchSize,
 }
diff --git a/push/engine.go b/push/engine.go
index 78c21c66..00ea5836 100644
--- a/push/engine.go
+++ b/push/engine.go
@@ -314,7 +314,7 @@ func (p *Engine) CreateShardListener(shardID uint64) cluster.ShardListener {
 	if _, ok := p.schedulers[shardID]; ok {
 		panic(fmt.Sprintf("there is already a scheduler %d", shardID))
 	}
-	sh := sched.NewShardScheduler(shardID, p, p, p.cluster)
+	sh := sched.NewShardScheduler(shardID, p, p, p.cluster, p.cfg.MaxProcessBatchSize, p.cfg.MaxForwardWriteBatchSize)
 	p.schedulers[shardID] = sh
 	if p.started {
 		sh.Start()
diff --git a/push/sched/scheduler.go b/push/sched/scheduler.go
index 5e79571b..fcc16e84 100644
--- a/push/sched/scheduler.go
+++ b/push/sched/scheduler.go
@@ -14,9 +14,6 @@ import (
 	"time"
 )
 
-const maxProcessBatchRows = 2000
-const maxForwardWriteBatchSize = 500
-
 var (
 	rowsProcessedVec = promauto.NewCounterVec(prometheus.CounterOpts{
 		Name: "pranadb_rows_processed_total",
@@ -53,6 +50,8 @@ type ShardScheduler struct {
 	loopExitWaitGroup            sync.WaitGroup
 	queuedWriteRows              int
 	clust                        cluster.Cluster
+	maxProcessBatchSize          int
+	maxForwardWriteBatchSize     int
 	rowsProcessedCounter         metrics.Counter
 	shardLagHistogram            metrics.Observer
 	batchProcessingTimeHistogram metrics.Observer
@@ -79,7 +78,8 @@ type WriteBatchEntry struct {
 	completionChannels []chan error
 }
 
-func NewShardScheduler(shardID uint64, batchHandler RowsBatchHandler, shardFailListener ShardFailListener, clust cluster.Cluster) *ShardScheduler {
+func NewShardScheduler(shardID uint64, batchHandler RowsBatchHandler, shardFailListener ShardFailListener,
+	clust cluster.Cluster, maxProcessBatchSize int, maxForwardWriteBatchSize int) *ShardScheduler {
 	sShardID := fmt.Sprintf("shard-%04d", shardID)
 	rowsProcessedCounter := rowsProcessedVec.WithLabelValues(sShardID)
 	shardLagHistogram := shardLagVec.WithLabelValues(sShardID)
@@ -95,6 +95,8 @@ func NewShardScheduler(shardID uint64, batchHandler RowsBatchHandler, shardFailL
 		batchProcessingTimeHistogram: batchProcessingTimeHistogram,
 		batchSizeHistogram:           batchSizeHistogram,
 		clust:                        clust,
+		maxProcessBatchSize:          maxProcessBatchSize,
+		maxForwardWriteBatchSize:     maxForwardWriteBatchSize,
 	}
 	ss.loopExitWaitGroup.Add(1)
 	return ss
@@ -180,8 +182,8 @@ func (s *ShardScheduler) getNextBatch() ([]cluster.ForwardRow, *WriteBatchEntry,
 
 func (s *ShardScheduler) getRowsToProcess() []cluster.ForwardRow {
 	numRows := len(s.forwardRows)
-	if numRows > maxProcessBatchRows {
-		numRows = maxProcessBatchRows
+	if numRows > s.maxProcessBatchSize {
+		numRows = s.maxProcessBatchSize
 	}
 	rows := s.forwardRows[:numRows]
 	s.forwardRows = s.forwardRows[numRows:]
@@ -201,7 +203,7 @@ func (s *ShardScheduler) getForwardWriteBatch() *WriteBatchEntry {
 			combinedEntries = append(combinedEntries, batch.writeBatch[9:]...)
 			entries += int(numPuts)
 			completionChannels = append(completionChannels, batch.completionChannels[0])
-			if entries >= maxForwardWriteBatchSize {
+			if entries >= s.maxForwardWriteBatchSize {
 				break
 			}
 		}
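
Usage note (not part of the patch): the two new knobs surface in HCL as `max-process-batch-size` and `max-forward-write-batch-size` (see the testdata config above) and in code as `Config.MaxProcessBatchSize` and `Config.MaxForwardWriteBatchSize`. `ApplyDefaults` fills zero values with 2000 and 500 respectively, and `Validate` rejects anything below 1. A minimal sketch of the programmatic side, assuming the standard `github.com/squareup/pranadb/conf` import path; the override values are illustrative only:

```go
package main

import (
	"fmt"

	"github.com/squareup/pranadb/conf" // assumed module path
)

func main() {
	// Zero values are filled in by ApplyDefaults:
	// MaxProcessBatchSize -> 2000, MaxForwardWriteBatchSize -> 500.
	var cfg conf.Config
	cfg.ApplyDefaults()
	fmt.Println(cfg.MaxProcessBatchSize, cfg.MaxForwardWriteBatchSize) // 2000 500

	// Explicit non-zero settings are left untouched by ApplyDefaults.
	cfg2 := conf.Config{
		MaxProcessBatchSize:      4000, // illustrative value
		MaxForwardWriteBatchSize: 1000, // illustrative value
	}
	cfg2.ApplyDefaults()
	fmt.Println(cfg2.MaxProcessBatchSize, cfg2.MaxForwardWriteBatchSize) // 4000 1000
}
```

Per the scheduler changes above, these values cap per-shard batch sizes at runtime: `getRowsToProcess` drains at most `maxProcessBatchSize` forward rows per processing pass, and `getForwardWriteBatch` stops coalescing forward writes once `maxForwardWriteBatchSize` entries have been combined.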