Skip to content

Commit fe4317d

Browse files
authored
pump/* add a api to get binlog by ts (#449) (#455)
* pump/* add a api to get binlog by ts
1 parent 4ff2707 commit fe4317d

File tree

2 files changed

+34
-0
lines changed

2 files changed

+34
-0
lines changed

pump/server.go

+27
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net"
88
"net/http"
99
"net/url"
10+
"strconv"
1011
"strings"
1112
"sync"
1213
"sync/atomic"
@@ -400,6 +401,7 @@ func (s *Server) Start() error {
400401
router.HandleFunc("/status", s.Status).Methods("GET")
401402
router.HandleFunc("/state/{nodeID}/{action}", s.ApplyAction).Methods("PUT")
402403
router.HandleFunc("/drainers", s.AllDrainers).Methods("GET")
404+
router.HandleFunc("/debug/binlog/{ts}", s.BinlogByTS).Methods("GET")
403405
http.Handle("/", router)
404406
prometheus.DefaultGatherer = registry
405407
http.Handle("/metrics", prometheus.Handler())
@@ -593,6 +595,31 @@ func (s *Server) Status(w http.ResponseWriter, r *http.Request) {
593595
s.PumpStatus().Status(w, r)
594596
}
595597

598+
// BinlogByTS exposes api get get binlog by ts
599+
func (s *Server) BinlogByTS(w http.ResponseWriter, r *http.Request) {
600+
tsStr := mux.Vars(r)["ts"]
601+
ts, err := strconv.ParseInt(tsStr, 10, 64)
602+
if err != nil {
603+
w.Write([]byte(fmt.Sprintf("invalid parameter ts: %s", tsStr)))
604+
return
605+
}
606+
607+
binlog, err := s.storage.GetBinlog(ts)
608+
if err != nil {
609+
w.Write([]byte(err.Error()))
610+
return
611+
}
612+
613+
w.Write([]byte(binlog.String()))
614+
if len(binlog.PrewriteValue) > 0 {
615+
prewriteValue := new(pb.PrewriteValue)
616+
prewriteValue.Unmarshal(binlog.PrewriteValue)
617+
618+
w.Write([]byte("\n\n PrewriteValue: \n"))
619+
w.Write([]byte(prewriteValue.String()))
620+
}
621+
}
622+
596623
// PumpStatus returns all pumps' status.
597624
func (s *Server) PumpStatus() *HTTPStatus {
598625
status, err := s.node.NodesStatus(s.ctx)

pump/storage/storage.go

+7
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ type Storage interface {
5050

5151
MaxCommitTS() int64
5252

53+
// GetBinlog return the binlog of ts
54+
GetBinlog(ts int64) (binlog *pb.Binlog, err error)
55+
5356
// PullCommitBinlog return the chan to consume the binlog
5457
PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
5558

@@ -427,6 +430,10 @@ func (a *Append) resolve(startTS int64) bool {
427430
return false
428431
}
429432

433+
func (a *Append) GetBinlog(ts int64) (*pb.Binlog, error) {
434+
return a.readBinlogByTS(ts)
435+
}
436+
430437
func (a *Append) readBinlogByTS(ts int64) (*pb.Binlog, error) {
431438
var vp valuePointer
432439

0 commit comments

Comments
 (0)