diff --git a/README.md b/README.md index a196d9d44..231da8af4 100644 --- a/README.md +++ b/README.md @@ -147,7 +147,6 @@ package main import ( "github.com/go-mysql-org/go-mysql/canal" - "github.com/siddontang/go-log/log" ) type MyEventHandler struct { @@ -491,27 +490,10 @@ We pass all tests in https://github.com/bradfitz/go-sql-test using go-mysql driv ## Logging -Logging by default is send to stdout. - -To disable logging completely: -```go -import "github.com/siddontang/go-log/log" -... - nullHandler, _ := log.NewNullHandler() - cfg.Logger = log.NewDefault(nullHandler) -``` - -To write logging to any [`io.Writer`](https://pkg.go.dev/io#Writer): -```go -import "github.com/siddontang/go-log/log" -... - w := ... - streamHandler, _ := log.NewStreamHandler(w) - cfg.Logger = log.NewDefault(streamHandler) -``` - -Or you can implement your own [`log.Handler`](https://pkg.go.dev/github.com/siddontang/go-log/log#Handler). +Logging uses [log/slog](https://pkg.go.dev/log/slog) and by default is sent to standard out. +For the old logging package `github.com/siddontang/go-log/log`, a converting package +`https://github.com/serprex/slog-siddontang` is available. ## How to migrate to this repo To change the used package in your repo it's enough to add this `replace` directive to your `go.mod`: ``` diff --git a/canal/canal.go b/canal/canal.go index 20c6194b6..ca0638ce3 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "log/slog" "net" "os" "regexp" @@ -21,7 +22,6 @@ import ( "github.com/go-mysql-org/go-mysql/utils" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/parser" - "github.com/siddontang/go-log/log" ) // Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc... @@ -66,8 +66,7 @@ var ( func NewCanal(cfg *Config) (*Canal, error) { c := new(Canal) if cfg.Logger == nil { - streamHandler, _ := log.NewStreamHandler(os.Stdout) - cfg.Logger = log.NewDefault(streamHandler) + cfg.Logger = slog.Default() } if cfg.Dialer == nil { dialer := &net.Dialer{} @@ -243,14 +242,14 @@ func (c *Canal) run() error { close(c.dumpDoneCh) if err != nil { - c.cfg.Logger.Errorf("canal dump mysql err: %v", err) + c.cfg.Logger.Error("canal dump mysql err", slog.Any("error", err)) return errors.Trace(err) } } if err := c.runSyncBinlog(); err != nil { if errors.Cause(err) != context.Canceled { - c.cfg.Logger.Errorf("canal start sync binlog err: %v", err) + c.cfg.Logger.Error("canal start sync binlog err", slog.Any("error", err)) return errors.Trace(err) } } @@ -259,7 +258,7 @@ func (c *Canal) run() error { } func (c *Canal) Close() { - c.cfg.Logger.Infof("closing canal") + c.cfg.Logger.Info("closing canal") c.m.Lock() defer c.m.Unlock() @@ -379,7 +378,7 @@ func (c *Canal) GetTable(db string, table string) (*schema.Table, error) { c.errorTablesGetTime[key] = utils.Now() c.tableLock.Unlock() // log error and return ErrMissingTableMeta - c.cfg.Logger.Errorf("canal get table meta err: %v", errors.Trace(err)) + c.cfg.Logger.Error("canal get table meta err", slog.Any("error", errors.Trace(err))) return nil, schema.ErrMissingTableMeta } return nil, err diff --git a/canal/canal_test.go b/canal/canal_test.go index 73c04d8da..1f0fea291 100644 --- a/canal/canal_test.go +++ b/canal/canal_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/pingcap/tidb/pkg/parser" - "github.com/siddontang/go-log/log" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -99,7 +98,7 @@ func (s *canalTestSuite) SetupSuite() { s.execute("SET GLOBAL binlog_format = 'ROW'") - s.c.SetEventHandler(&testEventHandler{}) + s.c.SetEventHandler(&testEventHandler{T: s.T()}) go func() { set, _ := mysql.ParseGTIDSet("mysql", "") err = s.c.StartFromGTID(set) @@ -126,10 +125,11 @@ func (s *canalTestSuite) execute(query string, args ...interface{}) *mysql.Resul type testEventHandler struct { DummyEventHandler + T *testing.T } func (h *testEventHandler) OnRow(e *RowsEvent) error { - log.Infof("OnRow %s %v\n", e.Action, e.Rows) + h.T.Log("OnRow", e.Action, e.Rows) umi, ok := e.Rows[0][4].(uint32) // 4th col is umi. mysqldump gives uint64 instead of uint32 if ok && (umi != umiA && umi != umiB && umi != umiC) { return fmt.Errorf("invalid unsigned medium int %d", umi) diff --git a/canal/config.go b/canal/config.go index ebc397654..35df54632 100644 --- a/canal/config.go +++ b/canal/config.go @@ -2,6 +2,7 @@ package canal import ( "crypto/tls" + "log/slog" "math/rand" "net" "os" @@ -9,8 +10,6 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/errors" - "github.com/siddontang/go-log/log" - "github.com/siddontang/go-log/loggers" "github.com/go-mysql-org/go-mysql/client" "github.com/go-mysql-org/go-mysql/mysql" @@ -101,7 +100,7 @@ type Config struct { TLSConfig *tls.Config // Set Logger - Logger loggers.Advanced + Logger *slog.Logger // Set Dialer Dialer client.Dialer @@ -150,8 +149,7 @@ func NewDefaultConfig() *Config { c.Dump.DiscardErr = true c.Dump.SkipMasterData = false - streamHandler, _ := log.NewStreamHandler(os.Stdout) - c.Logger = log.NewDefault(streamHandler) + c.Logger = slog.Default() dialer := &net.Dialer{} c.Dialer = dialer.DialContext diff --git a/canal/dump.go b/canal/dump.go index 91b303976..1c1a86225 100644 --- a/canal/dump.go +++ b/canal/dump.go @@ -3,6 +3,7 @@ package canal import ( "encoding/hex" "fmt" + "log/slog" "strconv" "strings" "time" @@ -49,7 +50,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error e == schema.ErrMissingTableMeta { return nil } - h.c.cfg.Logger.Errorf("get %s.%s information err: %v", db, table, err) + h.c.cfg.Logger.Error("error getting table information", slog.String("database", db), slog.String("table", table), slog.Any("error", err)) return errors.Trace(err) } @@ -163,7 +164,7 @@ func (c *Canal) dump() error { if err != nil { return errors.Trace(err) } - c.cfg.Logger.Infof("skip master data, get current binlog position %v", pos) + c.cfg.Logger.Info("skip master data, get current binlog position", slog.Any("position", pos)) h.name = pos.Name h.pos = uint64(pos.Pos) } @@ -185,8 +186,7 @@ func (c *Canal) dump() error { c.master.UpdateGTIDSet(h.gset) startPos = h.gset } - c.cfg.Logger.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at %s", - time.Since(start).Seconds(), startPos) + c.cfg.Logger.Info("dump MySQL and parse OK", slog.Duration("use", time.Since(start)), slog.String("position", startPos.String())) return nil } @@ -196,7 +196,7 @@ func (c *Canal) tryDump() error { if (len(pos.Name) > 0 && pos.Pos > 0) || (gset != nil && gset.String() != "") { // we will sync with binlog name and position - c.cfg.Logger.Infof("skip dump, use last binlog replication pos %s or GTID set %v", pos, gset) + c.cfg.Logger.Info("skip dump, use last binlog replication position or GTID set", slog.String("file", pos.Name), slog.Uint64("position", uint64(pos.Pos)), slog.Any("GTID set", gset)) return nil } diff --git a/canal/master.go b/canal/master.go index c2cc462f0..a86a8f3ba 100644 --- a/canal/master.go +++ b/canal/master.go @@ -1,10 +1,10 @@ package canal import ( + "log/slog" "sync" "github.com/go-mysql-org/go-mysql/mysql" - "github.com/siddontang/go-log/loggers" ) type masterInfo struct { @@ -16,11 +16,11 @@ type masterInfo struct { timestamp uint32 - logger loggers.Advanced + logger *slog.Logger } func (m *masterInfo) Update(pos mysql.Position) { - m.logger.Debugf("update master position %s", pos) + m.logger.Debug("update master position", slog.Any("pos", pos)) m.Lock() m.pos = pos @@ -28,7 +28,7 @@ func (m *masterInfo) Update(pos mysql.Position) { } func (m *masterInfo) UpdateTimestamp(ts uint32) { - m.logger.Debugf("update master timestamp %d", ts) + m.logger.Debug("update master timestamp", slog.Int64("ts", int64(ts))) m.Lock() m.timestamp = ts @@ -36,7 +36,7 @@ func (m *masterInfo) UpdateTimestamp(ts uint32) { } func (m *masterInfo) UpdateGTIDSet(gset mysql.GTIDSet) { - m.logger.Debugf("update master gtid set %s", gset) + m.logger.Debug("update master gtid set", slog.Any("gset", gset)) m.Lock() m.gset = gset diff --git a/canal/sync.go b/canal/sync.go index 6e4e538c3..ff515a06a 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -1,6 +1,7 @@ package canal import ( + "log/slog" "sync/atomic" "time" @@ -20,7 +21,7 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { if err != nil { return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err) } - c.cfg.Logger.Infof("start sync binlog at binlog file %v", pos) + c.cfg.Logger.Info("start sync binlog at binlog file", slog.Any("pos", pos)) return s, nil } else { gsetClone := gset.Clone() @@ -28,7 +29,7 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { if err != nil { return nil, errors.Errorf("start sync replication at GTID set %v error %v", gset, err) } - c.cfg.Logger.Infof("start sync binlog at GTID set %v", gsetClone) + c.cfg.Logger.Info("start sync binlog at GTID set", slog.Any("gset", gsetClone)) return s, nil } } @@ -57,7 +58,7 @@ func (c *Canal) runSyncBinlog() error { // and https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899 if ev.Header.Timestamp == 0 { fakeRotateLogName := string(e.NextLogName) - c.cfg.Logger.Infof("received fake rotate event, next log name is %s", e.NextLogName) + c.cfg.Logger.Info("received fake rotate event", slog.String("nextLogName", string(e.NextLogName))) if fakeRotateLogName != c.master.Position().Name { c.cfg.Logger.Info("log name changed, the fake rotate event will be handled as a real rotate event") @@ -93,7 +94,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error { case *replication.RotateEvent: pos.Name = string(e.NextLogName) pos.Pos = uint32(e.Position) - c.cfg.Logger.Infof("rotate binlog to %s", pos) + c.cfg.Logger.Info("rotate binlog", slog.Any("pos", pos)) savePos = true force = true if err = c.eventHandler.OnRotate(ev.Header, e); err != nil { @@ -101,9 +102,8 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error { } case *replication.RowsEvent: // we only focus row based event - err = c.handleRowsEvent(ev) - if err != nil { - c.cfg.Logger.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err) + if err := c.handleRowsEvent(ev); err != nil { + c.cfg.Logger.Error("handle rows event", slog.String("file", pos.Name), slog.Uint64("position", uint64(curPos)), slog.Any("error", err)) return errors.Trace(err) } return nil @@ -113,7 +113,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error { for _, subEvent := range ev.Events { err = c.handleEvent(subEvent) if err != nil { - c.cfg.Logger.Errorf("handle transaction payload subevent at (%s, %d) error %v", pos.Name, curPos, err) + c.cfg.Logger.Error("handle transaction payload subevent", slog.String("file", pos.Name), slog.Uint64("position", uint64(curPos)), slog.Any("error", err)) return errors.Trace(err) } } @@ -144,7 +144,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error { if err != nil { // The parser does not understand all syntax. // For example, it won't parse [CREATE|DROP] TRIGGER statements. - c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err) + c.cfg.Logger.Error("error parsing query, will skip this event", slog.String("query", string(e.Query)), slog.Any("error", err)) return nil } if len(stmts) > 0 { @@ -246,7 +246,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) { func (c *Canal) updateTable(header *replication.EventHeader, db, table string) (err error) { c.ClearTableCache([]byte(db), []byte(table)) - c.cfg.Logger.Infof("table structure changed, clear table cache: %s.%s\n", db, table) + c.cfg.Logger.Info("table structure changed, clear table cache", slog.String("database", db), slog.String("table", table)) if err = c.eventHandler.OnTableChanged(header, db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist { return errors.Trace(err) } @@ -316,7 +316,8 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error { if curPos.Compare(pos) >= 0 { return nil } else { - c.cfg.Logger.Debugf("master pos is %v, wait catching %v", curPos, pos) + c.cfg.Logger.Debug("master pos is behind, wait to catch up", slog.String("master file", curPos.Name), slog.Uint64("master position", uint64(curPos.Pos)), + slog.String("target file", pos.Name), slog.Uint64("target position", uint64(curPos.Pos))) time.Sleep(100 * time.Millisecond) } } diff --git a/client/pool.go b/client/pool.go index f6cb5dd39..5cab93a1a 100644 --- a/client/pool.go +++ b/client/pool.go @@ -2,7 +2,7 @@ package client import ( "context" - "log" + "log/slog" "math" "math/rand" "sync" @@ -29,7 +29,7 @@ type ( LogFunc func(format string, args ...interface{}) Pool struct { - logFunc LogFunc + logger *slog.Logger minAlive int maxAlive int maxIdle int @@ -109,7 +109,7 @@ func NewPoolWithOptions( } pool := &Pool{ - logFunc: po.logFunc, + logger: po.logger, minAlive: po.minAlive, maxAlive: po.maxAlive, maxIdle: po.maxIdle, @@ -159,7 +159,7 @@ func NewPoolWithOptions( // // Deprecated: use NewPoolWithOptions func NewPool( - logFunc LogFunc, + logger *slog.Logger, minAlive int, maxAlive int, maxIdle int, @@ -174,12 +174,12 @@ func NewPool( user, password, dbName, - WithLogFunc(logFunc), + WithLogger(logger), WithPoolLimits(minAlive, maxAlive, maxIdle), WithConnOptions(options...), ) - if err != nil { - pool.logFunc(`Pool: NewPool: %s`, err.Error()) + if err != nil && logger != nil { + logger.Error("Pool: NewPool", slog.Any("error", err)) } return pool @@ -312,7 +312,9 @@ func (pool *Pool) newConnectionProducer() { pool.synchro.stats.TotalCount-- // Bad luck, should try again pool.synchro.Unlock() - pool.logFunc("Cannot establish new db connection: %s", err.Error()) + if pool.logger != nil { + pool.logger.Error("Pool: cannot establish new db connection", slog.Any("error", err)) + } timer := time.NewTimer( time.Duration(10+rand.Intn(90)) * time.Millisecond, @@ -460,21 +462,21 @@ func (pool *Pool) spawnConnectionsIfNeeded() bool { pool.synchro.Lock() totalCount := pool.synchro.stats.TotalCount idleCount := len(pool.synchro.idleConnections) - needSpanNew := pool.minAlive - totalCount + needSpawnNew := pool.minAlive - totalCount pool.synchro.Unlock() - if needSpanNew <= 0 { + if needSpawnNew <= 0 { return false } // Не хватает соединений, нужно создать еще - if needSpanNew > MaxNewConnectionAtOnce { - needSpanNew = MaxNewConnectionAtOnce + if needSpawnNew > MaxNewConnectionAtOnce { + needSpawnNew = MaxNewConnectionAtOnce } - pool.logFunc(`Pool: Setup %d new connections (total: %d idle: %d)...`, needSpanNew, totalCount, idleCount) - pool.startNewConnections(needSpanNew) + pool.logger.Info("Pool: Setup new connections", slog.Int("new", needSpawnNew), slog.Int("total", totalCount), slog.Int("idle", idleCount)) + pool.startNewConnections(needSpawnNew) return true } @@ -517,7 +519,7 @@ func (pool *Pool) closeIdleConnectionsIfCan() { pool.synchro.Unlock() - pool.logFunc(`Pool: Close %d idle connections (in fly %d)`, len(toClose), inFly) + pool.logger.Info("Pool: close idle connections", slog.Int("closed", len(toClose)), slog.Int("inFly", inFly)) for _, connection := range toClose { pool.closeConn(connection.conn) } @@ -532,7 +534,7 @@ func (pool *Pool) closeConn(conn *Conn) { } func (pool *Pool) startNewConnections(count int) { - pool.logFunc(`Pool: Setup %d new connections (minimal pool size)...`, count) + pool.logger.Info("Pool: Setup new connections (minimal pool size)", slog.Int("count", count)) connections := make([]Connection, 0, count) for i := 0; i < count; i++ { @@ -542,7 +544,7 @@ func (pool *Pool) startNewConnections(count int) { pool.synchro.Unlock() connections = append(connections, conn) } else { - pool.logFunc(`Pool: createNewConnection: %s`, err) + pool.logger.Warn("Pool: createNewConnection failed", slog.Any("error", err)) } } @@ -558,7 +560,7 @@ func (pool *Pool) ping(conn *Conn) error { _ = conn.SetDeadline(deadline) err := conn.Ping() if err != nil { - pool.logFunc(`Pool: ping query fail: %s`, err.Error()) + pool.logger.Error("Pool: ping query fail", slog.Any("error", err)) } else { _ = conn.SetDeadline(time.Time{}) } @@ -605,7 +607,7 @@ func (pool *Pool) checkConnection(ctx context.Context) error { // getDefaultPoolOptions returns pool config for low load services func getDefaultPoolOptions() poolOptions { return poolOptions{ - logFunc: log.Printf, + logger: slog.Default(), minAlive: 1, maxAlive: 10, maxIdle: 2, diff --git a/client/pool_options.go b/client/pool_options.go index 90bf5bd0d..1dec5ed00 100644 --- a/client/pool_options.go +++ b/client/pool_options.go @@ -1,12 +1,13 @@ package client import ( + "log/slog" "time" ) type ( poolOptions struct { - logFunc LogFunc + logger *slog.Logger minAlive int maxAlive int @@ -40,9 +41,9 @@ func WithPoolLimits(minAlive, maxAlive, maxIdle int) PoolOption { } } -func WithLogFunc(f LogFunc) PoolOption { +func WithLogger(logger *slog.Logger) PoolOption { return func(o *poolOptions) { - o.logFunc = f + o.logger = logger } } diff --git a/client/pool_test.go b/client/pool_test.go index d3f4ee9c0..0f393ccd8 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/go-mysql-org/go-mysql/test_util" - "github.com/siddontang/go-log/log" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" ) @@ -30,7 +29,6 @@ func (s *poolTestSuite) TestPool_Close() { addr := fmt.Sprintf("%s:%s", *test_util.MysqlHost, s.port) pool, err := NewPoolWithOptions(addr, *testUser, *testPassword, "", WithPoolLimits(5, 10, 5), - WithLogFunc(log.Debugf), ) require.NoError(s.T(), err) @@ -53,7 +51,6 @@ func (s *poolTestSuite) TestPool_WrongPassword() { _, err := NewPoolWithOptions(addr, *testUser, "wrong-password", "", WithPoolLimits(5, 10, 5), - WithLogFunc(log.Debugf), WithNewPoolPingTimeout(time.Second), ) @@ -71,7 +68,6 @@ func (s *poolTestSuite) TestPool_WrongAddr() { _, err = NewPoolWithOptions(laddr.String(), *testUser, *testPassword, "", WithPoolLimits(5, 10, 5), - WithLogFunc(log.Debugf), WithNewPoolPingTimeout(time.Second), ) diff --git a/cmd/go-mysqlbinlog/main.go b/cmd/go-mysqlbinlog/main.go index 12f3ced01..8d322c3d3 100644 --- a/cmd/go-mysqlbinlog/main.go +++ b/cmd/go-mysqlbinlog/main.go @@ -7,6 +7,7 @@ import ( "context" "flag" "fmt" + "log/slog" "os" "github.com/pingcap/errors" @@ -31,6 +32,8 @@ var ( backupPath = flag.String("backup_path", "", "backup path to store binlog files") rawMode = flag.Bool("raw", false, "Use raw mode") + format = flag.String("format", "plain", "log format") + verbose = flag.Bool("verbose", false, "verbose logging") ) func main() { @@ -50,6 +53,19 @@ func main() { MaxReconnectAttempts: 10, } + logOpts := &slog.HandlerOptions{ + AddSource: *verbose, + } + + switch *format { + case "json": + cfg.Logger = slog.New(slog.NewJSONHandler(os.Stdout, logOpts)) + case "plain": + cfg.Logger = slog.New(slog.NewTextHandler(os.Stdout, logOpts)) + default: + panic("unsupported log format") + } + err := mysql.ValidateFlavor(*flavor) if err != nil { fmt.Printf("Flavor error: %v\n", errors.ErrorStack(err)) diff --git a/driver/driver_options_test.go b/driver/driver_options_test.go index 346b85e0f..ce96b369f 100644 --- a/driver/driver_options_test.go +++ b/driver/driver_options_test.go @@ -16,7 +16,6 @@ import ( "time" "github.com/pingcap/errors" - "github.com/siddontang/go-log/log" "github.com/stretchr/testify/require" "github.com/go-mysql-org/go-mysql/client" @@ -40,7 +39,6 @@ type mockHandler struct { } func TestDriverOptions_SetRetriesOn(t *testing.T) { - log.SetLevel(log.LevelDebug) srv := CreateMockServer(t) defer srv.Stop() var wg sync.WaitGroup @@ -66,7 +64,6 @@ func TestDriverOptions_SetRetriesOn(t *testing.T) { } func TestDriverOptions_SetRetriesOff(t *testing.T) { - log.SetLevel(log.LevelDebug) srv := CreateMockServer(t) defer srv.Stop() var wg sync.WaitGroup @@ -117,7 +114,6 @@ func TestDriverOptions_SetCompression(t *testing.T) { } func TestDriverOptions_ConnectTimeout(t *testing.T) { - log.SetLevel(log.LevelDebug) srv := CreateMockServer(t) defer srv.Stop() @@ -135,7 +131,6 @@ func TestDriverOptions_ConnectTimeout(t *testing.T) { } func TestDriverOptions_BufferSize(t *testing.T) { - log.SetLevel(log.LevelDebug) srv := CreateMockServer(t) defer srv.Stop() @@ -161,7 +156,6 @@ func TestDriverOptions_BufferSize(t *testing.T) { } func TestDriverOptions_ReadTimeout(t *testing.T) { - log.SetLevel(log.LevelDebug) srv := CreateMockServer(t) defer srv.Stop() @@ -183,7 +177,6 @@ func TestDriverOptions_ReadTimeout(t *testing.T) { } func TestDriverOptions_writeTimeout(t *testing.T) { - log.SetLevel(log.LevelDebug) srv := CreateMockServer(t) defer srv.Stop() @@ -231,7 +224,6 @@ func TestDriverOptions_namedValueChecker(t *testing.T) { return nil }) - log.SetLevel(log.LevelDebug) srv := CreateMockServer(t) defer srv.Stop() conn, err := sql.Open("mysql", "root@127.0.0.1:3307/test?writeTimeout=1s") diff --git a/dump/dumper.go b/dump/dumper.go index 537bedc40..608cfe0fd 100644 --- a/dump/dumper.go +++ b/dump/dumper.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "log/slog" "net" "os" "os/exec" @@ -12,8 +13,6 @@ import ( "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/errors" - "github.com/siddontang/go-log/log" - "github.com/siddontang/go-log/loggers" ) // Unlick mysqldump, Dumper is designed for parsing and syning data easily. @@ -51,7 +50,7 @@ type Dumper struct { mysqldumpVersion string sourceDataSupported bool - Logger loggers.Advanced + Logger *slog.Logger } func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) { @@ -96,8 +95,7 @@ func NewDumper(executionPath string, addr string, user string, password string) d.ErrOut = os.Stderr - streamHandler, _ := log.NewStreamHandler(os.Stdout) - d.Logger = log.NewDefault(streamHandler) + d.Logger = slog.Default() return d, nil } @@ -312,7 +310,7 @@ func (d *Dumper) Dump(w io.Writer) error { } args[passwordArgIndex] = "--password=******" - d.Logger.Infof("exec mysqldump with %v", args) + d.Logger.Info("exec mysqldump with", slog.Any("args", args)) args[passwordArgIndex] = passwordArg cmd := exec.Command(d.ExecutionPath, args...) diff --git a/go.mod b/go.mod index 0389350b6..f0b704b47 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb github.com/pingcap/tidb/pkg/parser v0.0.0-20241118164214-4f047be191be github.com/shopspring/decimal v1.2.0 - github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 github.com/stretchr/testify v1.8.4 ) diff --git a/go.sum b/go.sum index cbf978426..c0f5a8692 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,6 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= -github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q= -github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= diff --git a/mysql/mariadb_gtid.go b/mysql/mariadb_gtid.go index c1fcc936c..fede4a2f2 100644 --- a/mysql/mariadb_gtid.go +++ b/mysql/mariadb_gtid.go @@ -3,12 +3,12 @@ package mysql import ( "bytes" "fmt" + "log/slog" "sort" "strconv" "strings" "github.com/pingcap/errors" - "github.com/siddontang/go-log/log" ) // MariadbGTID represent mariadb gtid, [domain ID]-[server-id]-[sequence] @@ -94,7 +94,7 @@ func (gtid *MariadbGTID) forward(newer *MariadbGTID) error { | mysqld-bin.000001 | 2215 | Gtid | 111 | 2257 | BEGIN GTID 0-111-6 | */ if newer.SequenceNumber <= gtid.SequenceNumber { - log.Warnf("out of order binlog appears with gtid %s vs current position gtid %s", newer, gtid) + slog.Warn("out of order binlog", slog.Any("new", newer), slog.Any("current", gtid)) } gtid.ServerID = newer.ServerID diff --git a/mysql/queryattributes.go b/mysql/queryattributes.go index 8e15c7dcb..f5f248dda 100644 --- a/mysql/queryattributes.go +++ b/mysql/queryattributes.go @@ -2,8 +2,8 @@ package mysql import ( "encoding/binary" - - "github.com/siddontang/go-log/log" + "fmt" + "log/slog" ) // Query Attributes in MySQL are key/value pairs passed along with COM_QUERY or COM_STMT_EXECUTE @@ -27,7 +27,7 @@ func (qa *QueryAttribute) TypeAndFlag() []byte { case uint64: return []byte{MYSQL_TYPE_LONGLONG, PARAM_UNSIGNED} default: - log.Warnf("query attribute with unsupported type %T", v) + slog.Warn("query attribute with unsupported type", slog.String("type", fmt.Sprintf("%T", v))) } return []byte{0x0, 0x0} // type 0x0, flag 0x0, to not break the protocol } @@ -42,7 +42,7 @@ func (qa *QueryAttribute) ValueBytes() []byte { binary.LittleEndian.PutUint64(b, v) return b default: - log.Warnf("query attribute with unsupported type %T", v) + slog.Warn("query attribute with unsupported type", slog.String("type", fmt.Sprintf("%T", v))) } return []byte{0x0} // 0 length value to not break the protocol } diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index 5214e92dd..e7ef127bb 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -5,7 +5,6 @@ import ( "time" "github.com/pingcap/errors" - "github.com/siddontang/go-log/log" ) var ( @@ -74,8 +73,6 @@ func (s *BinlogStreamer) close() { func (s *BinlogStreamer) closeWithError(err error) { if err == nil { err = ErrSyncClosed - } else { - log.Errorf("close sync with err: %v", err) } select { diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 318905661..3ef4fb63a 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "encoding/binary" "fmt" + "log/slog" "net" "os" "strconv" @@ -14,8 +15,6 @@ import ( "github.com/google/uuid" "github.com/pingcap/errors" - "github.com/siddontang/go-log/log" - "github.com/siddontang/go-log/loggers" "github.com/go-mysql-org/go-mysql/client" "github.com/go-mysql-org/go-mysql/mysql" @@ -113,7 +112,7 @@ type BinlogSyncerConfig struct { Option func(*client.Conn) error // Set Logger - Logger loggers.Advanced + Logger *slog.Logger // Set Dialer Dialer client.Dialer @@ -169,11 +168,11 @@ type BinlogSyncer struct { // NewBinlogSyncer creates the BinlogSyncer with the given configuration. func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { if cfg.Logger == nil { - streamHandler, _ := log.NewStreamHandler(os.Stdout) - cfg.Logger = log.NewDefault(streamHandler) + cfg.Logger = slog.Default() } if cfg.ServerID == 0 { - cfg.Logger.Fatal("can't use 0 as the server ID") + cfg.Logger.Error("can't use 0 as the server ID, will panic") + panic("can't use 0 as the server ID") } if cfg.Dialer == nil { dialer := &net.Dialer{} @@ -186,7 +185,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { // Clear the Password to avoid outputting it in logs. pass := cfg.Password cfg.Password = "" - cfg.Logger.Infof("create BinlogSyncer with config %+v", cfg) + cfg.Logger.Info("create BinlogSyncer", slog.Any("config", cfg)) cfg.Password = pass b := new(BinlogSyncer) @@ -228,7 +227,7 @@ func (b *BinlogSyncer) close() { if b.c != nil { err := b.c.SetReadDeadline(utils.Now().Add(100 * time.Millisecond)) if err != nil { - b.cfg.Logger.Warnf(`could not set read deadline: %s`, err) + b.cfg.Logger.Warn("could not set read deadline", slog.Any("error", err)) } } @@ -337,18 +336,18 @@ func (b *BinlogSyncer) registerSlave() error { if b.cfg.HeartbeatPeriod > 0 { _, err = b.c.Execute(fmt.Sprintf("SET @master_heartbeat_period=%d;", b.cfg.HeartbeatPeriod)) if err != nil { - b.cfg.Logger.Errorf("failed to set @master_heartbeat_period=%d, err: %v", b.cfg.HeartbeatPeriod, err) + b.cfg.Logger.Error(fmt.Sprintf("failed to set @master_heartbeat_period=%d", b.cfg.HeartbeatPeriod), slog.Any("error", err)) return errors.Trace(err) } } serverUUID, err := uuid.NewUUID() if err != nil { - b.cfg.Logger.Errorf("failed to get new uuid %v", err) + b.cfg.Logger.Error("failed to get new uuid", slog.Any("error", err)) return errors.Trace(err) } if _, err = b.c.Execute(fmt.Sprintf("SET @slave_uuid = '%s', @replica_uuid = '%s'", serverUUID, serverUUID)); err != nil { - b.cfg.Logger.Errorf("failed to set @slave_uuid = '%s', err: %v", serverUUID, err) + b.cfg.Logger.Error(fmt.Sprintf("failed to set @slave_uuid = '%s', @replica_uuid = '%s'", serverUUID, serverUUID), slog.Any("error", err)) return errors.Trace(err) } @@ -373,7 +372,7 @@ func (b *BinlogSyncer) enableSemiSync() error { } else { s, _ := r.GetString(0, 1) if s != "ON" { - b.cfg.Logger.Errorf("master does not support semi synchronous replication, use no semi-sync") + b.cfg.Logger.Error("master does not support semi synchronous replication, use no semi-sync") b.cfg.SemiSyncEnabled = false return nil } @@ -400,7 +399,7 @@ func (b *BinlogSyncer) prepare() error { return errors.Trace(err) } - b.cfg.Logger.Infof("Connected to %s %s server", b.cfg.Flavor, b.c.GetServerVersion()) + b.cfg.Logger.Info("Connected to server", slog.String("flavor", b.cfg.Flavor), slog.String("version", b.c.GetServerVersion())) return nil } @@ -423,18 +422,17 @@ func (b *BinlogSyncer) GetNextPosition() mysql.Position { func (b *BinlogSyncer) checkFlavor() { serverVersion := b.c.GetServerVersion() if b.cfg.Flavor != mysql.MariaDBFlavor && - strings.Contains(b.c.GetServerVersion(), "MariaDB") { + strings.Contains(serverVersion, "MariaDB") { // Setting the flavor to `mysql` causes MariaDB to try and behave // in a MySQL compatible way. In this mode MariaDB won't use // MariaDB specific binlog event types, but may used dummy events instead. - b.cfg.Logger.Errorf("misconfigured flavor (%s) for server %s", - b.cfg.Flavor, serverVersion) + b.cfg.Logger.Error("misconfigured flavor for server", slog.String("flavor", b.cfg.Flavor), slog.String("version", serverVersion)) } } // StartSync starts syncing from the `pos` position. func (b *BinlogSyncer) StartSync(pos mysql.Position) (*BinlogStreamer, error) { - b.cfg.Logger.Infof("begin to sync binlog from position %s", pos) + b.cfg.Logger.Info("begin to sync binlog from position", slog.Any("position", pos)) b.m.Lock() defer b.m.Unlock() @@ -454,7 +452,7 @@ func (b *BinlogSyncer) StartSync(pos mysql.Position) (*BinlogStreamer, error) { // StartSyncGTID starts syncing from the `gset` GTIDSet. func (b *BinlogSyncer) StartSyncGTID(gset mysql.GTIDSet) (*BinlogStreamer, error) { - b.cfg.Logger.Infof("begin to sync binlog from GTID set %s", gset) + b.cfg.Logger.Info("begin to sync binlog from GTID set", slog.Any("GTID set", gset)) b.prevMySQLGTIDEvent = nil b.prevGset = gset @@ -656,17 +654,17 @@ func (b *BinlogSyncer) retrySync() error { b.prevMySQLGTIDEvent = nil if b.prevGset != nil { - msg := fmt.Sprintf("begin to re-sync from %s", b.prevGset.String()) + extra := []interface{}{slog.String("GTID Set", b.prevGset.String())} if b.currGset != nil { - msg = fmt.Sprintf("%v (last read GTID=%v)", msg, b.currGset) + extra = append(extra, slog.String("last read GTID", b.currGset.String())) } - b.cfg.Logger.Infof(msg) + b.cfg.Logger.Info("begin to re-sync", extra...) if err := b.prepareSyncGTID(b.prevGset); err != nil { return errors.Trace(err) } } else { - b.cfg.Logger.Infof("begin to re-sync from %s", b.nextPos) + b.cfg.Logger.Info("begin to re-sync", slog.String("file", b.nextPos.Name), slog.Uint64("position", uint64(b.nextPos.Pos))) if err := b.prepareSyncPos(b.nextPos); err != nil { return errors.Trace(err) } @@ -735,7 +733,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { } if err != nil { - b.cfg.Logger.Error(err) + b.cfg.Logger.Error(err.Error()) // we meet connection error, should re-connect again with // last nextPos or nextGTID we got. if len(b.nextPos.Name) == 0 && b.prevGset == nil { @@ -759,17 +757,17 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { b.retryCount++ if err = b.retrySync(); err != nil { if b.cfg.MaxReconnectAttempts > 0 && b.retryCount >= b.cfg.MaxReconnectAttempts { - b.cfg.Logger.Errorf( - "retry sync err: %v, exceeded max retries (%d)", - err, b.cfg.MaxReconnectAttempts, + b.cfg.Logger.Error( + "retry sync err, exceeded max retries", + slog.Any("error", err), slog.Int("maxAttempts", b.cfg.MaxReconnectAttempts), ) s.closeWithError(err) return } - b.cfg.Logger.Errorf( - "retry sync err: %v, wait 1s and retry again (retries: %d/%d)", - err, b.retryCount, b.cfg.MaxReconnectAttempts, + b.cfg.Logger.Error( + "retry sync err, wait 1s and retry again", + slog.Any("error", err), slog.Int("retryCount", b.retryCount), slog.Int("maxAttempts", b.cfg.MaxReconnectAttempts), ) continue } @@ -816,7 +814,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { b.cfg.Logger.Info("receive EOF packet, no more binlog event now.") continue default: - b.cfg.Logger.Errorf("invalid stream header %c", data[0]) + b.cfg.Logger.Error("invalid stream header", slog.Int("header", int(data[0]))) continue } } @@ -859,7 +857,7 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need case *RotateEvent: b.nextPos.Name = string(event.NextLogName) b.nextPos.Pos = uint32(event.Position) - b.cfg.Logger.Infof("rotate to %s", b.nextPos) + b.cfg.Logger.Info("rotate to next binlog", slog.String("file", b.nextPos.Name), slog.Uint64("position", uint64(b.nextPos.Pos))) case *GTIDEvent: if b.prevGset == nil { @@ -973,11 +971,11 @@ func (b *BinlogSyncer) newConnection(ctx context.Context) (*client.Conn, error) func (b *BinlogSyncer) killConnection(conn *client.Conn, id uint32) { cmd := fmt.Sprintf("KILL %d", id) if _, err := conn.Execute(cmd); err != nil { - b.cfg.Logger.Errorf("kill connection %d error %v", id, err) + b.cfg.Logger.Error("kill connection", slog.Any("error", err), slog.Int64("id", int64(id))) // Unknown thread id if code := mysql.ErrorCode(err.Error()); code != mysql.ER_NO_SUCH_THREAD { - b.cfg.Logger.Error(errors.Trace(err)) + b.cfg.Logger.Error(errors.Trace(err).Error()) } } - b.cfg.Logger.Infof("kill last connection id %d", id) + b.cfg.Logger.Info("kill last connection", slog.Int64("id", int64(id))) } diff --git a/server/caching_sha2_cache_test.go b/server/caching_sha2_cache_test.go index c11e5e76d..9c9530811 100644 --- a/server/caching_sha2_cache_test.go +++ b/server/caching_sha2_cache_test.go @@ -12,7 +12,6 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" - "github.com/siddontang/go-log/log" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -25,8 +24,6 @@ import ( // NOTE the idea here is to plugin a throttled credential provider so that the first connection (cache miss) will take longer time // than the second connection (cache hit). Remember to set the password for MySQL user otherwise it won't cache empty password. func TestCachingSha2Cache(t *testing.T) { - log.SetLevel(log.LevelDebug) - remoteProvider := &RemoteThrottleProvider{ InMemoryProvider: NewInMemoryProvider(), } @@ -42,8 +39,6 @@ func TestCachingSha2Cache(t *testing.T) { } func TestCachingSha2CacheTLS(t *testing.T) { - log.SetLevel(log.LevelDebug) - remoteProvider := &RemoteThrottleProvider{ InMemoryProvider: NewInMemoryProvider(), } diff --git a/server/server_test.go b/server/server_test.go index 0311448f9..d29451022 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -12,7 +12,6 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" - "github.com/siddontang/go-log/log" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -60,8 +59,6 @@ func prepareServerConf() []*Server { } func Test(t *testing.T) { - log.SetLevel(log.LevelDebug) - // general tests inMemProvider := NewInMemoryProvider() inMemProvider.AddUser(*testUser, *testPassword)