Commit 5c3d7a8

*: add more metrics and some temporary adjustment (#276)
1 parent e5e7ab8 commit 5c3d7a8

9 files changed (+63, -32)

cmd/drainer/main.go (+1, -2)

@@ -24,11 +24,10 @@ func main() {
         log.Fatalf("verifying flags error, See 'drainer --help'. %s", errors.ErrorStack(err))
     }
 
+    drainer.InitLogger(cfg)
     version.PrintVersionInfo()
     log.Infof("use config: %+v", cfg)
 
-    drainer.InitLogger(cfg)
-
     bs, err := drainer.NewServer(cfg)
     if err != nil {
         log.Fatalf("create drainer server error, %s", errors.ErrorStack(err))

drainer/collector.go (+11, -2)

@@ -167,6 +167,11 @@ func (c *Collector) updateCollectStatus(synced bool) {
 // updateStatus queries pumps' status , deletes the offline pump
 // and updates pumps' latest ts
 func (c *Collector) updateStatus(ctx context.Context) error {
+    begin := time.Now()
+    defer func() {
+        publishBinlogHistogram.WithLabelValues("drainer").Observe(time.Since(begin).Seconds())
+    }()
+
     if err := c.updatePumpStatus(ctx); err != nil {
         log.Errorf("DetectPumps error: %v", errors.ErrorStack(err))
         c.updateCollectStatus(false)
@@ -303,6 +308,7 @@ func (c *Collector) LoadHistoryDDLJobs() ([]*model.Job, error) {
 
 // publishBinlogs collects binlogs whose commitTS are in (minTS, maxTS], then publish them in ascending commitTS order
 func (c *Collector) publishBinlogs(ctx context.Context, minTS, maxTS int64) {
+    begin := time.Now()
     // multiple ways sort:
     // 1. get multiple way sorted binlogs
     // 2. use heap to merge sort
@@ -316,11 +322,13 @@ func (c *Collector) publishBinlogs(ctx context.Context, minTS, maxTS int64) {
             bss[id] = bs
             binlogOffsets[id] = 1
             // first push the first item into heap every pump
-            c.bh.push(ctx, bs[0])
+            c.bh.push(ctx, bs[0], false)
         }
         total += bs.Len()
     }
+    publishBinlogHistogram.WithLabelValues("drainer_collector").Observe(time.Since(begin).Seconds())
 
+    begin = time.Now()
     item := c.bh.pop()
     for item != nil {
         c.syncer.Add(item)
@@ -329,11 +337,12 @@ func (c *Collector) publishBinlogs(ctx context.Context, minTS, maxTS int64) {
             delete(bss, item.nodeID)
         } else {
             // push next item into heap and increase the offset
-            c.bh.push(ctx, bss[item.nodeID][binlogOffsets[item.nodeID]])
+            c.bh.push(ctx, bss[item.nodeID][binlogOffsets[item.nodeID]], false)
             binlogOffsets[item.nodeID] = binlogOffsets[item.nodeID] + 1
         }
         item = c.bh.pop()
     }
+    publishBinlogHistogram.WithLabelValues("drainer_merge_sort").Observe(time.Since(begin).Seconds())
 
     publishBinlogCounter.WithLabelValues("drainer").Add(float64(total))
 }

drainer/heap.go (+2, -17)

@@ -67,14 +67,14 @@ func newBinlogHeap(size int) *binlogHeap {
     }
 }
 
-func (b *binlogHeap) push(ctx context.Context, item *binlogItem) {
+func (b *binlogHeap) push(ctx context.Context, item *binlogItem, check bool) {
     for {
         select {
         case <-ctx.Done():
             return
         default:
             b.Lock()
-            if b.bh.Len() == b.size {
+            if check && b.bh.Len() == b.size {
                 b.Unlock()
                 time.Sleep(pushRetryTime)
                 continue
@@ -97,18 +97,3 @@ func (b *binlogHeap) pop() *binlogItem {
     b.Unlock()
     return item.(*binlogItem)
 }
-
-func (b *binlogHeap) peek() *binlogItem {
-    b.Lock()
-    if b.bh.Len() == 0 {
-        b.Unlock()
-        return nil
-    }
-
-    item := heap.Pop(b.bh)
-    heap.Push(b.bh, item)
-    b.Unlock()
-
-    return item.(*binlogItem)
-
-}
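
Note on the drainer/heap.go change: push now takes a check flag, so the heap-size backpressure only applies when the caller asks for it. Producers such as putIntoHeap keep passing true, while paths that re-insert an item they already popped (the collector's merge sort, and collectBinlogs in drainer/pump.go below) pass false so they can never block against a full heap. The following is a self-contained, illustrative reduction of that pattern; boundedHeap, intHeap and the sizes are made-up stand-ins for the drainer's binlogHeap, not the real types.

package main

import (
    "container/heap"
    "context"
    "fmt"
    "sync"
    "time"
)

// intHeap is a toy min-heap standing in for the drainer's binlog heap.
type intHeap []int

func (h intHeap) Len() int            { return len(h) }
func (h intHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[:n-1]
    return x
}

// boundedHeap mimics binlogHeap: a mutex-guarded heap with a soft size limit.
type boundedHeap struct {
    sync.Mutex
    h    *intHeap
    size int
}

// push blocks while the heap is full only when check is true; with check set
// to false the item is inserted unconditionally, mirroring the new third
// argument of binlogHeap.push.
func (b *boundedHeap) push(ctx context.Context, item int, check bool) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            b.Lock()
            if check && b.h.Len() == b.size {
                b.Unlock()
                time.Sleep(time.Millisecond) // retry later, like pushRetryTime
                continue
            }
            heap.Push(b.h, item)
            b.Unlock()
            return
        }
    }
}

func main() {
    b := &boundedHeap{h: &intHeap{}, size: 1}

    // Fills the heap to its size limit.
    b.push(context.Background(), 1, true)

    // With check=true and the heap full, push spins until the context is done,
    // so this item is never inserted.
    timeout, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    b.push(timeout, 2, true)

    // With check=false the size limit is ignored and the item goes straight in.
    b.push(context.Background(), 3, false)

    fmt.Println("items in heap:", b.h.Len()) // prints 2
}

Since peek was also removed in this file, collectBinlogs (see drainer/pump.go below) now pops items and, when it stops early, pushes the last item back with check=false, which is where the non-blocking mode matters.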

drainer/heap_test.go (+3, -3)

@@ -46,7 +46,7 @@ func (s *testDrainerSuite) TestHeap(c *C) {
         wg.Add(1)
         defer wg.Done()
         for _, cs := range testCase {
-            bh.push(ctx, newBinlogItem(cs, pb.Pos{}, "testnode"))
+            bh.push(ctx, newBinlogItem(cs, pb.Pos{}, "testnode"), true)
         }
     }()
 
@@ -64,12 +64,12 @@ func (s *testDrainerSuite) TestHeap(c *C) {
 
     //test push block and cancel push operator
     bh = newBinlogHeap(1)
-    bh.push(ctx, newBinlogItem(testCase[0], pb.Pos{}, "testnode"))
+    bh.push(ctx, newBinlogItem(testCase[0], pb.Pos{}, "testnode"), true)
     go func() {
         wg.Add(1)
         defer wg.Done()
         for _, cs := range testCase {
-            bh.push(ctx, newBinlogItem(cs, pb.Pos{}, "testnode"))
+            bh.push(ctx, newBinlogItem(cs, pb.Pos{}, "testnode"), true)
         }
     }()

drainer/metrics.go (+20)

@@ -26,6 +26,24 @@ var (
             Help: "offset for each pump.",
         }, []string{"nodeID"})
 
+    findMatchedBinlogHistogram = prometheus.NewHistogramVec(
+        prometheus.HistogramOpts{
+            Namespace: "binlog",
+            Subsystem: "drainer",
+            Name:      "find_matched_binlog_duration_time",
+            Help:      "Bucketed histogram of find a matched binlog.",
+            Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 18),
+        }, []string{"nodeID"})
+
+    publishBinlogHistogram = prometheus.NewHistogramVec(
+        prometheus.HistogramOpts{
+            Namespace: "binlog",
+            Subsystem: "drainer",
+            Name:      "publish_binlog_duration_time",
+            Help:      "Bucketed histogram of publish a binlog.",
+            Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 18),
+        }, []string{"nodeID"})
+
     publishBinlogCounter = prometheus.NewCounterVec(
         prometheus.CounterOpts{
             Namespace: "binlog",
@@ -114,6 +132,8 @@ func init() {
     prometheus.MustRegister(txnHistogram)
     prometheus.MustRegister(readBinlogHistogram)
     prometheus.MustRegister(readBinlogSizeHistogram)
+    prometheus.MustRegister(publishBinlogHistogram)
+    prometheus.MustRegister(findMatchedBinlogHistogram)
 }
 
 type metricClient struct {
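
For readers less familiar with the Prometheus Go client: the two new HistogramVecs above follow the library's usual define/register/observe cycle, and ExponentialBuckets(0.00005, 2, 18) gives 18 buckets growing from 50µs up to roughly 6.5s. A minimal, self-contained sketch of the same pattern follows; the metric name example_duration_time, the label value, and timedOperation are placeholders rather than names from this repository.

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

var exampleHistogram = prometheus.NewHistogramVec(
    prometheus.HistogramOpts{
        Namespace: "binlog",
        Subsystem: "drainer",
        Name:      "example_duration_time",
        Help:      "Bucketed histogram of an example operation.",
        // 18 exponential buckets from 50µs up to about 6.5s.
        Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18),
    }, []string{"nodeID"})

func init() {
    // Registration panics on duplicate metric names, so it is done once at startup.
    prometheus.MustRegister(exampleHistogram)
}

// timedOperation shows the begin/Observe timing idiom this commit adds around
// updateStatus, publishBinlogs and publish.
func timedOperation(nodeID string) {
    begin := time.Now()
    defer func() {
        exampleHistogram.WithLabelValues(nodeID).Observe(time.Since(begin).Seconds())
    }()

    time.Sleep(5 * time.Millisecond) // the work being measured
}

func main() {
    timedOperation("pump-1")
}

Note that the collector.go and pump.go hunks reuse publishBinlogHistogram with label values such as "drainer", "drainer_collector", "drainer_merge_sort" and "<nodeID>_collect_binlogs" to time different phases under the single nodeID label.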

drainer/pump.go (+17, -5)

@@ -1,6 +1,7 @@
 package drainer
 
 import (
+    "fmt"
     "io"
     "strconv"
     "sync"
@@ -21,6 +22,9 @@ import (
     pb "github.com/pingcap/tipb/go-binlog"
 )
 
+// sleep 10 millisecond to wait matched binlog
+var waitMatchedTime = 10 * time.Millisecond
+
 type binlogEntity struct {
     tp      pb.BinlogType
     startTS int64
@@ -157,10 +161,12 @@ func (p *Pump) publish(t *tikv.LockResolver) {
         case entity = <-p.binlogChan:
         }
 
+        begin := time.Now()
         switch entity.tp {
         case pb.BinlogType_Prewrite:
             // while we meet the prebinlog we must find it's mathced commit binlog
             p.mustFindCommitBinlog(t, entity.startTS)
+            findMatchedBinlogHistogram.WithLabelValues(p.nodeID).Observe(time.Since(begin).Seconds())
         case pb.BinlogType_Commit, pb.BinlogType_Rollback:
             // if the commitTs is larger than maxCommitTs,
             // we would publish all binlogs:
@@ -175,6 +181,7 @@ func (p *Pump) publish(t *tikv.LockResolver) {
             } else {
                 binlogs = make(map[int64]*binlogItem)
             }
+            publishBinlogHistogram.WithLabelValues(p.nodeID).Observe(time.Since(begin).Seconds())
         }
     }
 
@@ -195,7 +202,7 @@ func (p *Pump) mustFindCommitBinlog(t *tikv.LockResolver, startTS int64) {
 
         b, ok := p.getPrewriteBinlogEntity(startTS)
         if ok {
-            time.Sleep(waitTime)
+            time.Sleep(waitMatchedTime)
             // check again after sleep a moment
             b, ok = p.getPrewriteBinlogEntity(startTS)
             if ok {
@@ -296,7 +303,7 @@ func (p *Pump) putIntoHeap(items map[int64]*binlogItem) {
             // if we meet a smaller binlog, we should ignore it. because we have published binlogs that before window low boundary
             continue
         }
-        p.bh.push(p.ctx, item)
+        p.bh.push(p.ctx, item, true)
     }
 
     errorBinlogCount.Add(float64(errorBinlogs))
@@ -352,8 +359,9 @@ func (p *Pump) getDDLJob(id int64) (*model.Job, error) {
 }
 
 func (p *Pump) collectBinlogs(windowLower, windowUpper int64) binlogItems {
+    begin := time.Now()
     var bs binlogItems
-    item := p.bh.peek()
+    item := p.bh.pop()
     for item != nil && item.binlog.CommitTs <= windowUpper {
         // make sure to discard old binlogs whose commitTS is earlier or equal minTS
         if item.binlog.CommitTs > windowLower {
@@ -363,10 +371,14 @@ func (p *Pump) collectBinlogs(windowLower, windowUpper int64) binlogItems {
         if ComparePos(p.currentPos, item.pos) == -1 {
             p.currentPos = item.pos
         }
-        _ = p.bh.pop()
-        item = p.bh.peek()
+        item = p.bh.pop()
+    }
+    if item != nil {
+        p.bh.push(p.ctx, item, false)
     }
 
+    publishBinlogHistogram.WithLabelValues(fmt.Sprintf("%s_collect_binlogs", p.nodeID)).Observe(time.Since(begin).Seconds())
+
     return bs
 }

drainer/util.go (+4, -1)

@@ -116,7 +116,10 @@ func genDrainerID(listenAddr string) (string, error) {
 }
 
 func execute(executor executor.Executor, sqls []string, args [][]interface{}, commitTSs []int64, isDDL bool) error {
-    // compute txn duration
+    if len(sqls) == 0 {
+        return nil
+    }
+
     beginTime := time.Now()
     defer func() {
         txnHistogram.Observe(time.Since(beginTime).Seconds())

pkg/offsets/offset.go (+2)

@@ -163,6 +163,8 @@ func (ks *KafkaSeeker) getAndCompare(topic string, partition int32, offset int64
     for msg := range pc.Messages() {
         bp, err := ks.operator.Decode(msg)
         if err != nil {
+            //log.Errorf("decode message(offset %d) error %v", message.Offset, err)
+            //return 1, -2, nil
             return 0, bp, errors.Annotatef(err, "decode %s", msg)
         }
 

pkg/sql/sql.go (+3, -2)

@@ -41,7 +41,7 @@ func ExecuteSQLs(db *sql.DB, sqls []string, args [][]interface{}, isDDL bool) er
             time.Sleep(RetryWaitTime)
         }
 
-        err = appleTxn(db, sqls, args)
+        err = ExecuteTxn(db, sqls, args)
         if err == nil {
             return nil
         }
@@ -50,7 +50,8 @@ func ExecuteSQLs(db *sql.DB, sqls []string, args [][]interface{}, isDDL bool) er
     return errors.Trace(err)
 }
 
-func appleTxn(db *sql.DB, sqls []string, args [][]interface{}) error {
+// ExecuteTxn executes transaction
+func ExecuteTxn(db *sql.DB, sqls []string, args [][]interface{}) error {
     txn, err := db.Begin()
     if err != nil {
         log.Errorf("exec sqls[%v] begin failed %v", sqls, errors.ErrorStack(err))
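
The diff above only shows appleTxn being renamed to the exported ExecuteTxn, its new doc comment, and the first lines of its body. Purely for orientation, here is a hypothetical sketch of what such a database/sql transaction helper typically looks like; it is not the actual pkg/sql/sql.go implementation, and the juju/errors import is an assumption based on the error handling visible elsewhere in this diff.

// Hypothetical sketch, not the code from pkg/sql/sql.go.
package sqlutil

import (
    "database/sql"

    "github.com/juju/errors" // assumed from the errors.Trace / errors.ErrorStack calls seen in this diff
)

// executeTxnSketch runs every statement in sqls with its matching args inside
// one transaction, rolling back on the first failure and committing otherwise.
func executeTxnSketch(db *sql.DB, sqls []string, args [][]interface{}) error {
    txn, err := db.Begin()
    if err != nil {
        return errors.Trace(err)
    }

    for i := range sqls {
        if _, err = txn.Exec(sqls[i], args[i]...); err != nil {
            // best-effort rollback; the Exec error is the one worth returning
            txn.Rollback()
            return errors.Trace(err)
        }
    }

    return errors.Trace(txn.Commit())
}

The visible change here is the rename itself: exporting the helper as ExecuteTxn makes it callable from other packages, and ExecuteSQLs now calls it under the new name.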
