From 331ce259425224fa523d48e415f09424480902fe Mon Sep 17 00:00:00 2001 From: walter Date: Mon, 20 Jan 2025 10:50:41 +0800 Subject: [PATCH] opt: make clear logs (#394) --- cmd/ccr_syncer/ccr_syncer.go | 2 +- pkg/ccr/base/spec.go | 39 ++++----- pkg/ccr/ingest_binlog_job.go | 40 ++++----- pkg/ccr/job.go | 156 ++++++++++++++++++----------------- pkg/ccr/job_progress.go | 10 ++- pkg/ccr/job_progress_test.go | 9 ++ pkg/ccr/meta.go | 6 +- pkg/ccr/record/upsert.go | 2 +- pkg/rpc/be.go | 2 +- pkg/rpc/fe.go | 40 +++++---- pkg/utils/log.go | 2 +- 11 files changed, 158 insertions(+), 150 deletions(-) diff --git a/cmd/ccr_syncer/ccr_syncer.go b/cmd/ccr_syncer/ccr_syncer.go index 13b253fd..9750fefe 100644 --- a/cmd/ccr_syncer/ccr_syncer.go +++ b/cmd/ccr_syncer/ccr_syncer.go @@ -108,7 +108,7 @@ func parseConfigFile() error { continue } - log.Infof("config %s=%s", key, value) + log.Infof("force set config %s=%s", key, value) if err := flag.Set(key, value); err != nil { return fmt.Errorf("set flag key value '%s': %v", line, err) } diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index c8001c5f..42904875 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -392,7 +392,7 @@ func (s *Spec) CheckTablePropertyValid() ([]string, error) { } func (s *Spec) IsEnableRestoreSnapshotCompression() (bool, error) { - log.Debugf("check frontend enable restore snapshot compression") + log.Tracef("check frontend enable restore snapshot compression") db, err := s.Connect() if err != nil { @@ -429,7 +429,7 @@ func (s *Spec) IsEnableRestoreSnapshotCompression() (bool, error) { } func (s *Spec) GetAllTables() ([]string, error) { - log.Debugf("get all tables in database %s", s.Database) + log.Tracef("get all tables in database %s", s.Database) db, err := s.Connect() if err != nil { @@ -496,7 +496,7 @@ func (s *Spec) queryResult(querySQL string, queryColumn string, errMsg string) ( } func (s *Spec) GetAllViewsFromTable(tableName string) ([]string, error) { - log.Debugf("get all view from table %s", tableName) + log.Tracef("get all view from table %s", tableName) var results []string // first, query information_schema.tables with table_schema and table_type, get all views' name @@ -635,7 +635,7 @@ func (s *Spec) CreateTableOrView(createTable *record.CreateTable, srcDatabase st // When create view, the db name of sql is source db name, we should use dest db name to create view createSql := createTable.Sql if createTable.IsCreateView() { - log.Debugf("create view, use dest db name to replace source db name") + log.Tracef("create view, use dest db name to replace source db name") // replace `internal`.`source_db_name`. or `default_cluster:source_db_name`. to `internal`.`dest_db_name`. originalNameNewStyle := "`internal`.`" + strings.TrimSpace(srcDatabase) + "`." @@ -733,7 +733,7 @@ func (s *Spec) CheckTableExistsByName(tableName string) (bool, error) { } func (s *Spec) CancelRestoreIfExists(snapshotName string) error { - log.Debugf("cancel restore %s, db name: %s", snapshotName, s.Database) + log.Tracef("cancel restore %s, db name: %s", snapshotName, s.Database) db, err := s.Connect() if err != nil { @@ -806,11 +806,6 @@ func (s *Spec) CreatePartialSnapshot(snapshotName, table string, partitions []st return xerror.Errorf(xerror.Normal, "source db is empty! 
you should have at least one table") } - // table refs = table - tableRef := utils.FormatKeywordName(table) - - log.Infof("create partial snapshot %s.%s", s.Database, snapshotName) - db, err := s.Connect() if err != nil { return err @@ -820,10 +815,12 @@ func (s *Spec) CreatePartialSnapshot(snapshotName, table string, partitions []st if len(partitions) > 0 { partitionRefs = " PARTITION (`" + strings.Join(partitions, "`,`") + "`)" } + tableRef := utils.FormatKeywordName(table) backupSnapshotSql := fmt.Sprintf( "BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON (%s%s) PROPERTIES (\"type\" = \"full\")", utils.FormatKeywordName(s.Database), snapshotName, tableRef, partitionRefs) - log.Debugf("backup partial snapshot sql: %s", backupSnapshotSql) + log.Infof("create partial snapshot %s.%s, backup snapshot sql: %s", + s.Database, snapshotName, backupSnapshotSql) _, err = db.Exec(backupSnapshotSql) if err != nil { if strings.Contains(err.Error(), "Unknown table") { @@ -840,7 +837,7 @@ func (s *Spec) CreatePartialSnapshot(snapshotName, table string, partitions []st // TODO: Add TaskErrMsg func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, string, error) { - log.Debugf("check backup state of snapshot %s", snapshotName) + log.Tracef("check backup state of snapshot %s", snapshotName) db, err := s.Connect() if err != nil { @@ -878,7 +875,7 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, string, er } func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) { - log.Debugf("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName) + log.Tracef("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName) // Retry network related error to avoid full sync when the target network is interrupted, process is restarted. if backupState, status, err := s.checkBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) { @@ -899,7 +896,7 @@ func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) { // Get the valid (running or finished) backup job with a unique prefix to indicate // if a backup job needs to be issued again. func (s *Spec) GetValidBackupJob(snapshotNamePrefix string) (string, error) { - log.Debugf("get valid backup job if exists, database: %s, label prefix: %s", s.Database, snapshotNamePrefix) + log.Tracef("get valid backup job if exists, database: %s, label prefix: %s", s.Database, snapshotNamePrefix) db, err := s.Connect() if err != nil { @@ -908,7 +905,7 @@ func (s *Spec) GetValidBackupJob(snapshotNamePrefix string) (string, error) { query := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName LIKE \"%s%%\"", utils.FormatKeywordName(s.Database), snapshotNamePrefix) - log.Infof("show backup state sql: %s", query) + log.Debugf("show backup state sql: %s", query) rows, err := db.Query(query) if err != nil { return "", xerror.Wrap(err, xerror.Normal, "query backup state failed") @@ -952,7 +949,7 @@ func (s *Spec) GetValidBackupJob(snapshotNamePrefix string) (string, error) { // Get the valid (running or finished) restore job with a unique prefix to indicate // if a restore job needs to be issued again. 
func (s *Spec) GetValidRestoreJob(snapshotNamePrefix string) (string, error) { - log.Debugf("get valid restore job if exists, label prefix: %s", snapshotNamePrefix) + log.Tracef("get valid restore job if exists, label prefix: %s", snapshotNamePrefix) db, err := s.Connect() if err != nil { @@ -961,7 +958,7 @@ func (s *Spec) GetValidRestoreJob(snapshotNamePrefix string) (string, error) { query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label LIKE \"%s%%\"", utils.FormatKeywordName(s.Database), snapshotNamePrefix) - log.Infof("show restore state sql: %s", query) + log.Debugf("show restore state sql: %s", query) rows, err := db.Query(query) if err != nil { return "", xerror.Wrap(err, xerror.Normal, "query restore state failed") } @@ -1039,7 +1036,7 @@ func (s *Spec) queryRestoreInfo(db *sql.DB, snapshotName string) (*RestoreInfo, } func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string, error) { - log.Debugf("check restore state %s", snapshotName) + log.Tracef("check restore state %s", snapshotName) db, err := s.Connect() if err != nil { @@ -1059,7 +1056,7 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string, } func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) { - log.Debugf("check restore state is finished, spec: %s, snapshot: %s", s.String(), snapshotName) + log.Tracef("check restore state is finished, spec: %s, snapshot: %s", s.String(), snapshotName) // Retry network related error to avoid full sync when the target network is interrupted, process is restarted. if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil && !isNetworkRelated(err) { @@ -1080,7 +1077,7 @@ func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) { } func (s *Spec) GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error) { - log.Debugf("get restore signature not matched table, spec: %s, snapshot: %s", s.String(), snapshotName) + log.Tracef("get restore signature not matched table, spec: %s, snapshot: %s", s.String(), snapshotName) for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ { if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil { @@ -1226,7 +1223,7 @@ func (s *Spec) Update(event SpecEvent) { } func (s *Spec) LightningSchemaChange(srcDatabase, tableAlias string, lightningSchemaChange *record.ModifyTableAddOrDropColumns) error { - log.Debugf("lightningSchemaChange %v", lightningSchemaChange) + log.Tracef("lightning schema change %v", lightningSchemaChange) rawSql := lightningSchemaChange.RawSql diff --git a/pkg/ccr/ingest_binlog_job.go b/pkg/ccr/ingest_binlog_job.go index 5edbf1c7..5f0cfdda 100644 --- a/pkg/ccr/ingest_binlog_job.go +++ b/pkg/ccr/ingest_binlog_job.go @@ -115,10 +115,11 @@ type tabletIngestBinlogHandler struct { // handle Replica func (h *tabletIngestBinlogHandler) handleReplica(srcReplica, destReplica *ReplicaMeta) bool { destReplicaId := destReplica.Id - log.Debugf("handle dest replica id: %d", destReplicaId) + log.Tracef("txn %d tablet ingest binlog: handle dest replica id: %d, dest tablet id %d", + h.ingestJob.txnId, destReplicaId, h.destTablet.Id) if h.cancel.Load() { - log.Infof("job canceled, replica id: %d", destReplicaId) + log.Infof("txn %d job canceled, replica id: %d", h.ingestJob.txnId, destReplicaId) return true } @@ -188,7 +189,7 @@ func (h *tabletIngestBinlogHandler) handleReplica(srcReplica, destReplica *Repli return } - log.Debugf("ingest resp: %v", resp) + log.Tracef("txn %d tablet ingest binlog resp: %v", j.txnId, resp) if
!resp.IsSetStatus() { err = xerror.Errorf(xerror.BE, "ingest resp status not set, req: %+v", req) j.setError(err) @@ -211,7 +212,8 @@ func (h *tabletIngestBinlogHandler) handleReplica(srcReplica, destReplica *Repli } func (h *tabletIngestBinlogHandler) handle() { - log.Debugf("handle tablet ingest binlog, src tablet id: %d, dest tablet id: %d", h.srcTablet.Id, h.destTablet.Id) + log.Tracef("txn %d, tablet ingest binlog, src tablet id: %d, dest tablet id: %d, total %d replicas", + h.ingestJob.txnId, h.srcTablet.Id, h.destTablet.Id, h.srcTablet.ReplicaMetas.Len()) // all src replicas version > binlogVersion srcReplicas := make([]*ReplicaMeta, 0, h.srcTablet.ReplicaMetas.Len()) @@ -365,10 +367,10 @@ type prepareIndexArg struct { } func (j *IngestBinlogJob) prepareIndex(arg *prepareIndexArg) { - log.Debugf("prepareIndex: %v", arg) + log.Tracef("txn %d ingest binlog: prepare index %s, src %d, dest %d", + j.txnId, arg.srcIndexMeta.Name, arg.srcIndexMeta.Id, arg.destIndexMeta.Id) // Step 1: check tablets - log.Debugf("arg %+v", arg) srcTablets, err := j.srcMeta.GetTablets(arg.srcTableId, arg.srcPartitionId, arg.srcIndexMeta.Id) if err != nil { j.setError(err) @@ -387,7 +389,7 @@ func (j *IngestBinlogJob) prepareIndex(arg *prepareIndexArg) { } if srcTablets.Len() == 0 { - log.Warn("src tablets length: 0, skip") + log.Warnf("txn %d ingest binlog: src tablets length: 0, skip", j.txnId) return } @@ -430,7 +432,7 @@ func (j *IngestBinlogJob) prepareIndex(arg *prepareIndexArg) { } func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partitionRecord record.PartitionRecord, indexIds []int64) { - log.Debugf("partitionRecord: %v", partitionRecord) + log.Tracef("txn %d ingest binlog: prepare partition: %v", j.txnId, partitionRecord) // preparePartition is deprecated; the index handling above is the actual implementation of this // still require the index length to be aligned with the downstream, this is not recoverable // think about which parts are needed for recovery, mainly the tablet part @@ -491,7 +493,6 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit } srcIndexName := getSrcIndexName(job, srcIndexMeta) - log.Debugf("src idx id %d, name %s", indexId, srcIndexName) if _, ok := destIndexNameMap[srcIndexName]; !ok { j.setError(xerror.Errorf(xerror.Meta, "index name %v not found in dest meta, is base index: %t, src index id: %d", @@ -511,12 +512,12 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit } for _, indexId := range indexIds { if j.srcMeta.IsIndexDropped(indexId) { - log.Infof("skip the dropped index %d", indexId) + log.Infof("txn %d ingest binlog: skip the dropped index %d", j.txnId, indexId) continue } if featureFilterShadowIndexesUpsert { if _, ok := j.ccrJob.progress.ShadowIndexes[indexId]; ok { - log.Infof("skip the shadow index %d", indexId) + log.Infof("txn %d ingest binlog: skip the shadow index %d", j.txnId, indexId) continue } } @@ -530,9 +531,9 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit } func (j *IngestBinlogJob) prepareTable(tableRecord *record.TableRecord) { - log.Debugf("tableRecord: %v", tableRecord) + log.Tracef("txn %d ingest binlog: prepare table: %d", j.txnId, tableRecord.Id) if j.srcMeta.IsTableDropped(tableRecord.Id) { - log.Infof("skip the dropped table %d", tableRecord.Id) + log.Infof("txn %d ingest binlog: skip the dropped table %d", j.txnId, tableRecord.Id) return } @@ -604,8 +605,8 @@ func (j *IngestBinlogJob) prepareTable(tableRecord *record.TableRecord) { continue } if j.srcMeta.IsPartitionDropped(partitionRecord.Id) { - log.Infof("skip the dropped partition %d, range: %s, version: %d",
- partitionRecord.Id, partitionRecord.Range, partitionRecord.Version) + log.Infof("txn %d skip the dropped partition %d, range: %s, version: %d", + j.txnId, partitionRecord.Id, partitionRecord.Range, partitionRecord.Version) continue } j.preparePartition(srcTableId, destTableId, partitionRecord, tableRecord.IndexIds) @@ -613,7 +614,7 @@ } func (j *IngestBinlogJob) prepareBackendMap() { - log.Debug("prepareBackendMap") + log.Tracef("txn %d ingest binlog: prepare backend map", j.txnId) var err error j.srcBackendMap, err = j.srcMeta.GetBackendMap() @@ -630,7 +631,7 @@ } func (j *IngestBinlogJob) prepareTabletIngestJobs() { - log.Debugf("prepareTabletIngestJobs, table length: %d", len(j.tableRecords)) + log.Tracef("txn %d ingest binlog: prepare tablet ingest jobs, table length: %d", j.txnId, len(j.tableRecords)) j.tabletIngestJobs = make([]*tabletIngestBinlogHandler, 0) for _, tableRecord := range j.tableRecords { @@ -642,8 +643,7 @@ } func (j *IngestBinlogJob) runTabletIngestJobs() { - log.Debugf("runTabletIngestJobs, job length: %d", len(j.tabletIngestJobs)) - + log.Infof("txn %d ingest binlog: run %d tablet ingest jobs", j.txnId, len(j.tabletIngestJobs)) for _, tabletIngestJob := range j.tabletIngestJobs { j.wg.Add(1) go func(tabletIngestJob *tabletIngestBinlogHandler) { @@ -655,7 +655,7 @@ } func (j *IngestBinlogJob) prepareMeta() { - log.Debug("prepareMeta") + log.Tracef("txn %d ingest binlog: prepare meta with %d table records", j.txnId, len(j.tableRecords)) srcTableIds := make([]int64, 0, len(j.tableRecords)) job := j.ccrJob factory := j.factory diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index c28d9ef3..e4b71e46 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -299,7 +299,7 @@ func (j *Job) genExtraInfo() (*base.ExtraInfo, error) { return nil, err } - log.Debugf("found backends: %v", backends) + log.Tracef("found backends: %v", backends) beNetworkMap := make(map[int64]base.NetworkAddr) for _, backend := range backends { @@ -356,7 +356,7 @@ func (j *Job) addExtraInfo(jobInfo []byte) ([]byte, error) { if err != nil { return nil, err } - log.Debugf("extraInfo: %v", extraInfo) + log.Tracef("snapshot extra info: %v", extraInfo) jobInfoMap["extra_info"] = extraInfo jobInfoBytes, err := json.Marshal(jobInfoMap) @@ -439,13 +439,14 @@ func (j *Job) partialSync() error { return err } if snapshotName != "" { - log.Infof("partial sync status: find a valid backup job %s", snapshotName) + log.Infof("partial sync status: there is an existing backup job %s", snapshotName) j.progress.NextSubVolatile(WaitBackupDone, snapshotName) return nil } } snapshotName := NewLabelWithTs(prefix) + log.Infof("partial sync status: create snapshot %s", snapshotName) err := j.ISrc.CreatePartialSnapshot(snapshotName, table, partitions) if err != nil && err == base.ErrBackupPartitionNotFound { log.Warnf("partial sync status: partition not found in the upstream, step to table partial sync") @@ -470,7 +471,7 @@ func (j *Job) partialSync() error { } if !backupFinished { - log.Infof("partial sync status: backup job %s is running", snapshotName) + // CheckBackupFinished already logs the info return nil } @@ -487,7 +488,7 @@ func (j *Job) partialSync() error { return err } - log.Debugf("partial sync begin get snapshot %s", snapshotName) + log.Tracef("partial sync begin get snapshot %s", snapshotName)
compress := false // partial snapshot no need to compress snapshotResp, err := srcRpc.GetSnapshot(src, snapshotName, compress) if err != nil { @@ -496,7 +497,7 @@ func (j *Job) partialSync() error { if snapshotResp.Status.GetStatusCode() == tstatus.TStatusCode_SNAPSHOT_NOT_EXIST || snapshotResp.Status.GetStatusCode() == tstatus.TStatusCode_SNAPSHOT_EXPIRED { - log.Warnf("get snapshot %s: %s (%s), retry with new partial sync", snapshotName, + log.Warnf("force new partial sync, because get snapshot %s: %s (%s)", snapshotName, utils.FirstOr(snapshotResp.Status.GetErrorMsgs(), "unknown"), snapshotResp.Status.GetStatusCode()) replace := len(j.progress.TableAliases) > 0 @@ -510,7 +511,7 @@ func (j *Job) partialSync() error { return xerror.New(xerror.Normal, "jobInfo is not set") } - log.Tracef("job: %.128s", snapshotResp.GetJobInfo()) + log.Tracef("partial sync snapshot job: %.128s", snapshotResp.GetJobInfo()) backupJobInfo, err := NewBackupJobInfoFromJson(snapshotResp.GetJobInfo()) if err != nil { @@ -552,8 +553,8 @@ func (j *Job) partialSync() error { snapshotResp := inMemoryData.SnapshotResp jobInfo := snapshotResp.GetJobInfo() - log.Infof("partial sync snapshot response meta size: %d, job info size: %d, expired at: %d", - len(snapshotResp.Meta), len(snapshotResp.JobInfo), snapshotResp.GetExpiredAt()) + log.Infof("partial snapshot %s response meta size: %d, job info size: %d, expired at: %d", + inMemoryData.SnapshotName, len(snapshotResp.Meta), len(snapshotResp.JobInfo), snapshotResp.GetExpiredAt()) jobInfoBytes, err := j.addExtraInfo(jobInfo) if err != nil { @@ -587,7 +588,7 @@ func (j *Job) partialSync() error { return nil } if name != "" { - log.Infof("partial sync status: find a valid restore job %s", name) + log.Infof("partial sync status: there is an existing restore job %s", name) inMemoryData.RestoreLabel = name j.progress.NextSubVolatile(WaitRestoreDone, inMemoryData) break @@ -603,7 +604,7 @@ func (j *Job) partialSync() error { if err != nil { return err } - log.Debugf("partial sync begin restore snapshot %s to %s", snapshotName, restoreSnapshotName) + log.Infof("partial sync begin restore snapshot %s to %s", snapshotName, restoreSnapshotName) var tableRefs []*festruct.TTableRef @@ -644,7 +645,7 @@ func (j *Job) partialSync() error { if restoreResp.Status.GetStatusCode() != tstatus.TStatusCode_OK { return xerror.Errorf(xerror.Normal, "restore snapshot failed, status: %v", restoreResp.Status) } - log.Infof("partial sync restore snapshot resp: %v", restoreResp) + log.Tracef("partial sync restore snapshot resp: %v", restoreResp) inMemoryData.RestoreLabel = restoreSnapshotName j.progress.NextSubVolatile(WaitRestoreDone, inMemoryData) @@ -657,17 +658,18 @@ func (j *Job) partialSync() error { snapshotResp := inMemoryData.SnapshotResp if snapshotResp.GetExpiredAt() > 0 && time.Now().UnixMilli() > snapshotResp.GetExpiredAt() { - log.Infof("partial sync snapshot %s is expired, cancel and retry with new partial sync", restoreSnapshotName) + log.Warnf("cancel the expired restore job %s", restoreSnapshotName) if err := j.IDest.CancelRestoreIfExists(restoreSnapshotName); err != nil { return err } + log.Infof("force partial sync, because the snapshot %s is expired", restoreSnapshotName) replace := len(j.progress.TableAliases) > 0 return j.newPartialSnapshot(tableId, table, partitions, replace) } restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName) if errors.Is(err, base.ErrRestoreSignatureNotMatched) { - log.Warnf("snapshot %s signature not match, retry partial sync with
replace", restoreSnapshotName) + log.Warnf("force partial sync with replace, because the snapshot %s signature is not matched", restoreSnapshotName) return j.newPartialSnapshot(tableId, table, nil, true) } else if err != nil { j.progress.NextSubVolatile(RestoreSnapshot, inMemoryData) @@ -675,7 +677,7 @@ func (j *Job) partialSync() error { } if !restoreFinished { - log.Infof("partial sync status: restore job %s is running", restoreSnapshotName) + // CheckRestoreFinished already logs the info return nil } @@ -782,7 +784,7 @@ func (j *Job) fullSync() error { return err } if snapshotName != "" { - log.Infof("fullsync status: find a valid backup job %s", snapshotName) + log.Infof("fullsync status: there has a exist backup job %s", snapshotName) j.progress.NextSubVolatile(WaitBackupDone, snapshotName) return nil } @@ -813,6 +815,7 @@ func (j *Job) fullSync() error { } snapshotName := NewLabelWithTs(prefix) + log.Infof("fullsync status: create snapshot %s", snapshotName) if err := j.ISrc.CreateSnapshot(snapshotName, backupTableList); err != nil { return err } @@ -828,7 +831,7 @@ func (j *Job) fullSync() error { return err } if !backupFinished { - log.Infof("fullsync status: backup job %s is running", snapshotName) + // CheckBackupFinished already logs the info return nil } @@ -845,7 +848,7 @@ func (j *Job) fullSync() error { return err } - log.Debugf("fullsync begin get snapshot %s", snapshotName) + log.Tracef("fullsync begin get snapshot %s", snapshotName) compress := false snapshotResp, err := srcRpc.GetSnapshot(src, snapshotName, compress) if err != nil { @@ -854,10 +857,10 @@ func (j *Job) fullSync() error { if snapshotResp.Status.GetStatusCode() == tstatus.TStatusCode_SNAPSHOT_NOT_EXIST || snapshotResp.Status.GetStatusCode() == tstatus.TStatusCode_SNAPSHOT_EXPIRED { - info := fmt.Sprintf("get snapshot %s: %s (%s), retry with new full sync", snapshotName, + info := fmt.Sprintf("get snapshot %s: %s (%s)", snapshotName, utils.FirstOr(snapshotResp.Status.GetErrorMsgs(), "unknown"), snapshotResp.Status.GetStatusCode()) - log.Warnf("%s", info) + log.Warnf("force full sync, because %s", info) return j.newSnapshot(j.progress.CommitSeq, info) } else if snapshotResp.Status.GetStatusCode() != tstatus.TStatusCode_OK { err = xerror.Errorf(xerror.FE, "get snapshot failed, status: %v", snapshotResp.Status) @@ -865,7 +868,7 @@ func (j *Job) fullSync() error { } if !snapshotResp.IsSetJobInfo() { - return xerror.New(xerror.Normal, "jobInfo is not set") + return xerror.New(xerror.Normal, "jobInfo of the snapshot resp is not set") } if snapshotResp.GetCompressed() { @@ -881,7 +884,7 @@ func (j *Job) fullSync() error { } } - log.Tracef("fullsync snapshot job: %.128s", snapshotResp.GetJobInfo()) + log.Tracef("fullsync snapshot job info: %.128s", snapshotResp.GetJobInfo()) backupJobInfo, err := NewBackupJobInfoFromJson(snapshotResp.GetJobInfo()) if err != nil { return err @@ -929,14 +932,15 @@ func (j *Job) fullSync() error { snapshotResp := inMemoryData.SnapshotResp jobInfo := snapshotResp.GetJobInfo() - log.Infof("snapshot response meta size: %d, job info size: %d, expired at: %d, commit seq: %d", - len(snapshotResp.Meta), len(snapshotResp.JobInfo), snapshotResp.GetExpiredAt(), snapshotResp.GetCommitSeq()) + log.Infof("snapshot %s response meta size: %d, job info size: %d, expired at: %d, commit seq: %d", + inMemoryData.SnapshotName, len(snapshotResp.Meta), len(snapshotResp.JobInfo), + snapshotResp.GetExpiredAt(), snapshotResp.GetCommitSeq()) jobInfoBytes, err := j.addExtraInfo(jobInfo) if err != nil { return 
err } - log.Debugf("job info size: %d, bytes: %.128s", len(jobInfoBytes), string(jobInfoBytes)) + log.Debugf("fullsync job info size: %d, bytes: %.128s", len(jobInfoBytes), string(jobInfoBytes)) snapshotResp.SetJobInfo(jobInfoBytes) j.progress.NextSubVolatile(RestoreSnapshot, inMemoryData) @@ -963,7 +967,7 @@ func (j *Job) fullSync() error { return nil } if restoreSnapshotName != "" { - log.Infof("fullsync status: find a valid restore job %s", restoreSnapshotName) + log.Infof("fullsync status: there is an existing restore job %s", restoreSnapshotName) inMemoryData.RestoreLabel = restoreSnapshotName j.progress.NextSubVolatile(WaitRestoreDone, inMemoryData) break @@ -980,7 +984,7 @@ func (j *Job) fullSync() error { if err != nil { return err } - log.Debugf("begin restore snapshot %s to %s", snapshotName, restoreSnapshotName) + log.Infof("fullsync status: begin restore snapshot %s to %s", snapshotName, restoreSnapshotName) var tableRefs []*festruct.TTableRef if j.isTableSyncWithAlias() { @@ -1060,7 +1064,7 @@ func (j *Job) fullSync() error { if restoreResp.Status.GetStatusCode() != tstatus.TStatusCode_OK { return xerror.Errorf(xerror.Normal, "restore snapshot failed, status: %v", restoreResp.Status) } - log.Infof("resp: %v", restoreResp) + log.Tracef("fullsync restore snapshot resp: %v", restoreResp) inMemoryData.RestoreLabel = restoreSnapshotName j.progress.NextSubVolatile(WaitRestoreDone, inMemoryData) @@ -1074,11 +1078,12 @@ func (j *Job) fullSync() error { snapshotResp := inMemoryData.SnapshotResp if snapshotResp.GetExpiredAt() > 0 && time.Now().UnixMilli() > snapshotResp.GetExpiredAt() { - info := fmt.Sprintf("snapshot %s is expired, cancel and retry with new full sync", restoreSnapshotName) - log.Infof("%s", info) + log.Warnf("cancel the expired restore job %s", restoreSnapshotName) if err := j.IDest.CancelRestoreIfExists(restoreSnapshotName); err != nil { return err } + info := fmt.Sprintf("the snapshot %s is expired", restoreSnapshotName) + log.Infof("force full sync, because %s", info) return j.newSnapshot(j.progress.CommitSeq, info) } @@ -1130,7 +1135,7 @@ func (j *Job) fullSync() error { } if !restoreFinished { - log.Infof("fullsync status: restore job %s is running", restoreSnapshotName) + // CheckRestoreFinished already logs the info return nil } @@ -1477,7 +1482,7 @@ func (j *Job) getRelatedTableRecords(upsert *record.Upsert) ([]*record.TableReco // Table ingestBinlog func (j *Job) ingestBinlog(txnId int64, tableRecords []*record.TableRecord) ([]*ttypes.TTabletCommitInfo, error) { - log.Infof("ingestBinlog, txnId: %d", txnId) + log.Tracef("txn %d ingest binlog", txnId) job, err := j.jobFactory.CreateJob(NewIngestContext(txnId, tableRecords, j.progress.TableMapping), j, "IngestBinlog") if err != nil { @@ -1586,14 +1591,14 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { } rollback := func(err error, inMemoryData *inMemoryData) { - log.Errorf("need rollback, err: %+v", err) + log.Errorf("txn %d need rollback, commitSeq: %d, label: %s, err: %+v", + inMemoryData.TxnId, inMemoryData.CommitSeq, inMemoryData.Label, err) j.progress.NextSubCheckpoint(RollbackTransaction, inMemoryData) } committed := func() { - log.Infof("txn committed, commitSeq: %d, cleanup", j.progress.CommitSeq) - inMemoryData := j.progress.InMemoryData.(*inMemoryData) + log.Debugf("txn %d committed, commitSeq: %d, cleanup", inMemoryData.TxnId, j.progress.CommitSeq) commitSeq := j.progress.CommitSeq destTableIds := inMemoryData.DestTableIds if j.SyncType == DBSync && len(j.progress.TableCommitSeqMap) > 0 {
@@ -1626,7 +1631,7 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { if err != nil { return err } - log.Debugf("upsert: %v", upsert) + log.Tracef("upsert: %v", upsert) // Step 1: get related tableRecords var isTxnInsert bool = false @@ -1690,7 +1695,6 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { commitSeq := inMemoryData.CommitSeq sourceStids := inMemoryData.SourceStids isTxnInsert := inMemoryData.IsTxnInsert - log.Debugf("begin txn, dest: %v, commitSeq: %d", dest, commitSeq) destRpc, err := j.factory.NewFeRpc(dest) if err != nil { @@ -1703,6 +1707,7 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { } else { label = j.newLabel(commitSeq) } + log.Tracef("begin txn, label: %s, dest: %v, commitSeq: %d", label, dest, commitSeq) var beginTxnResp *festruct.TBeginTxnResult_ if isTxnInsert { @@ -1711,11 +1716,11 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { } else { beginTxnResp, err = destRpc.BeginTransaction(dest, label, inMemoryData.DestTableIds) } - if err != nil { return err } - log.Debugf("resp: %v", beginTxnResp) + log.Tracef("begin txn resp: %v", beginTxnResp) + if beginTxnResp.GetStatus().GetStatusCode() != tstatus.TStatusCode_OK { if isTableNotFound(beginTxnResp.GetStatus()) && j.SyncType == DBSync { // It might caused by the staled TableMapping entries. @@ -1731,16 +1736,17 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { if isTxnInsert { destStids := beginTxnResp.GetSubTxnIds() inMemoryData.DestStids = destStids - log.Debugf("TxnId: %d, DbId: %d, destStids: %v", txnId, beginTxnResp.GetDbId(), destStids) + log.Infof("begin txn %d, label: %s, db: %d, destStids: %v", + txnId, label, beginTxnResp.GetDbId(), destStids) } else { - log.Debugf("TxnId: %d, DbId: %d", txnId, beginTxnResp.GetDbId()) + log.Infof("begin txn %d, label: %s, db: %d", txnId, label, beginTxnResp.GetDbId()) } inMemoryData.TxnId = txnId j.progress.NextSubCheckpoint(IngestBinlog, inMemoryData) case IngestBinlog: - log.Debug("ingest binlog") + log.Trace("ingest binlog") if err := updateInMemory(); err != nil { return err } @@ -1790,7 +1796,7 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { case CommitTransaction: // Step 4: commit txn - log.Debug("commit txn") + log.Trace("commit txn") if err := updateInMemory(); err != nil { return err } @@ -1816,6 +1822,7 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { rollback(err, inMemoryData) return err } + log.Tracef("commit txn %d resp: %v", txnId, resp) if statusCode := resp.Status.GetStatusCode(); statusCode == tstatus.TStatusCode_PUBLISH_TIMEOUT { dest.WaitTransactionDone(txnId) @@ -1825,13 +1832,12 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { return err } - log.Infof("TxnId: %d committed, resp: %v", txnId, resp) + log.Infof("commit txn %d success", txnId) committed() - return nil case RollbackTransaction: - log.Debugf("Rollback txn") + log.Trace("rollback txn") // Not Step 5: just rollback txn if err := updateInMemory(); err != nil { return err } @@ -1862,7 +1868,7 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { } } - log.Infof("rollback TxnId: %d resp: %v", txnId, resp) + log.Infof("rollback txn %d success", txnId) j.progress.Rollback() return nil @@ -2935,7 +2941,7 @@ func (j *Job) handleBarrier(binlog *festruct.TBinlog) error { // return: error && bool backToRunLoop func (j *Job) handleBinlogs(binlogs []*festruct.TBinlog) (error, bool) { - log.Infof("handle binlogs, binlogs size: %d", len(binlogs)) + log.Tracef("handle binlogs,
binlogs size: %d", len(binlogs)) for _, binlog := range binlogs { // Step 1: dispatch handle binlog @@ -2947,7 +2953,7 @@ func (j *Job) handleBinlogs(binlogs []*festruct.TBinlog) (error, bool) { // Step 2: check job state, if not incrementalSync, such as DBPartialSync, break if !j.isIncrementalSync() { - log.Debugf("job state is not incremental sync, back to run loop, job state: %s", j.progress.SyncState) + log.Tracef("job state is not incremental sync, back to run loop, job state: %s", j.progress.SyncState) return nil, true } @@ -2964,6 +2970,7 @@ func (j *Job) handleBinlogs(binlogs []*festruct.TBinlog) (error, bool) { } if reachSwitchToDBIncrementalSync { + log.Infof("all table commit seq reach the commit seq, switch to incremental sync, commit seq: %d", commitSeq) j.progress.TableCommitSeqMap = nil j.progress.NextWithPersist(j.progress.CommitSeq, DBIncrementalSync, Done, "") } @@ -2986,7 +2993,8 @@ func (j *Job) handleBinlog(binlog *festruct.TBinlog) error { return xerror.Errorf(xerror.Normal, "the progress isn't done, need rollback, commit seq: %d", j.progress.CommitSeq) } - log.Debugf("binlog type: %s, binlog data: %s", binlog.GetType(), binlog.GetData()) + log.Debugf("binlog type: %s, commit seq: %d, binlog data: %s", + binlog.GetType(), binlog.GetCommitSeq(), binlog.GetData()) // Step 2: update job progress j.progress.StartHandle(binlog.GetCommitSeq()) @@ -3083,13 +3091,13 @@ func (j *Job) incrementalSync() error { // Force fullsync unconditionally if j.Extra.SkipBinlog && j.Extra.SkipBy == SkipByFullSync { - info := fmt.Sprintf("skip binlog via fullsync by user, commit seq %d", j.progress.CommitSeq) - log.Warnf("%s", info) + info := fmt.Sprintf("the user required skipping the binlog, commit seq %d", j.progress.CommitSeq) + log.Warnf("force full sync, because %s", info) return j.newSnapshot(j.progress.CommitSeq, info) } // Step 1: get binlog - log.Debug("start incremental sync") + log.Trace("start incremental sync") src := &j.Src srcRpc, err := j.factory.NewFeRpc(src) if err != nil { @@ -3101,13 +3109,13 @@ func (j *Job) incrementalSync() error { for { // The CommitSeq is equals to PrevCommitSeq in here. 
commitSeq := j.progress.CommitSeq - log.Debugf("src: %s, commitSeq: %v", src, commitSeq) + log.Tracef("src: %s, commitSeq: %d", src, commitSeq) getBinlogResp, err := srcRpc.GetBinlog(src, commitSeq) if err != nil { return err } - log.Debugf("resp: %v", getBinlogResp) + log.Tracef("get binlog resp: %v", getBinlogResp) // Step 2.1: check binlog status status := getBinlogResp.GetStatus() @@ -3159,13 +3167,13 @@ func (j *Job) recoverJobProgress() error { func (j *Job) tableSync() error { switch j.progress.SyncState { case TableFullSync: - log.Debug("table full sync") + log.Trace("run table full sync") return j.fullSync() case TableIncrementalSync: - log.Debug("table incremental sync") + log.Trace("run table incremental sync") return j.incrementalSync() case TablePartialSync: - log.Debug("table partial sync") + log.Trace("run table partial sync") return j.partialSync() default: return xerror.Errorf(xerror.Normal, "unknown sync state: %v", j.progress.SyncState) @@ -3173,31 +3181,25 @@ func (j *Job) tableSync() error { } func (j *Job) dbTablesIncrementalSync() error { - log.Debug("db tables incremental sync") - return j.incrementalSync() } -func (j *Job) dbSpecificTableFullSync() error { - log.Debug("db specific table full sync") - - return nil -} - func (j *Job) dbSync() error { switch j.progress.SyncState { case DBFullSync: - log.Debug("db full sync") + log.Trace("run db full sync") return j.fullSync() case DBTablesIncrementalSync: + log.Trace("run db tables incremental sync") return j.dbTablesIncrementalSync() case DBSpecificTableFullSync: - return j.dbSpecificTableFullSync() + log.Trace("run db specific table full sync") + return nil case DBIncrementalSync: - log.Debug("db incremental sync") + log.Trace("run db incremental sync") return j.incrementalSync() case DBPartialSync: - log.Debug("db partial sync") + log.Trace("run db partial sync") return j.partialSync() default: return xerror.Errorf(xerror.Normal, "unknown db sync state: %v", j.progress.SyncState) @@ -3307,7 +3309,8 @@ func (j *Job) run() { } func (j *Job) newSnapshot(commitSeq int64, fullSyncInfo string) error { - log.Infof("new snapshot, commitSeq: %d", commitSeq) + log.Infof("new snapshot, commitSeq: %d, prevCommitSeq: %d, prevSyncState: %s, prevSubSyncState: %s", + commitSeq, j.progress.PrevCommitSeq, j.progress.SyncState, j.progress.SubSyncState) if fullSyncInfo != "" { j.progress.SetFullSyncInfo(fullSyncInfo) @@ -3325,7 +3328,7 @@ func (j *Job) newSnapshot(commitSeq int64, fullSyncInfo string) error { return nil default: err := xerror.Panicf(xerror.Normal, "unknown table sync type: %v", j.SyncType) - log.Fatalf("run %+v", err) + log.Fatalf("new snapshot: %+v", err) return err } } @@ -3392,11 +3395,10 @@ func (j *Job) Run() error { var err error for i := 0; i < 3; i++ { isProgressExist, err = j.db.IsProgressExist(j.Name) - if err != nil { - log.Errorf("check progress exist failed, error: %+v", err) - continue + if err == nil { + break } - break + log.Errorf("check progress exist failed, error: %+v", err) } if err != nil { return err @@ -3508,7 +3510,7 @@ func (j *Job) updateFrontends() error { } func (j *Job) FirstRun() error { - log.Infof("first run check job, src: %s, dest: %s", &j.Src, &j.Dest) + log.Infof("first run check job, name: %s, src: %s, dest: %s", j.Name, &j.Src, &j.Dest) // Step 0: get all frontends if err := j.updateFrontends(); err != nil { diff --git a/pkg/ccr/job_progress.go b/pkg/ccr/job_progress.go index 62d35e37..fdaed877 100644 --- a/pkg/ccr/job_progress.go +++ b/pkg/ccr/job_progress.go @@ -40,7 +40,7 @@ 
const ( // Database sync state machine states DBFullSync SyncState = 0 DBTablesIncrementalSync SyncState = 1 - DBSpecificTableFullSync SyncState = 2 + DBSpecificTableFullSync SyncState = 2 // Deprecated by DBPartialSync DBIncrementalSync SyncState = 3 DBPartialSync SyncState = 4 // sync partitions @@ -367,7 +367,8 @@ func (j *JobProgress) IsDone() bool { return j.SubSyncState == Done && j.PrevCom // TODO(Drogon): check reset some fields func (j *JobProgress) Done() { - log.Debugf("job %s step next", j.JobName) + log.Debugf("job %s step next, sync state: %s, commitSeq: %d, prevCommitSeq: %d", + j.JobName, j.SyncState, j.CommitSeq, j.PrevCommitSeq) j.SubSyncState = Done j.PrevCommitSeq = j.CommitSeq @@ -378,7 +379,7 @@ func (j *JobProgress) Done() { } func (j *JobProgress) Rollback() { - log.Debugf("job %s step rollback", j.JobName) + log.Infof("rollback progress, set commitSeq from %d to %d", j.CommitSeq, j.PrevCommitSeq) j.SubSyncState = Done // if rollback, then prev commit seq is the last commit seq @@ -391,7 +392,8 @@ func (j *JobProgress) Rollback() { // write progress to db, busy loop until success // TODO: add timeout check func (j *JobProgress) Persist() { - log.Trace("update job progress") + log.Tracef("update job progress, state: %s, subState: %s, commitSeq: %d, prevCommitSeq: %d", + j.SyncState, j.SubSyncState, j.CommitSeq, j.PrevCommitSeq) for { // Step 1: to json diff --git a/pkg/ccr/job_progress_test.go b/pkg/ccr/job_progress_test.go index 8773d6cd..cdbf082c 100644 --- a/pkg/ccr/job_progress_test.go +++ b/pkg/ccr/job_progress_test.go @@ -97,6 +97,15 @@ func TestJobProgress_MarshalJSON(t *testing.T) { "data": "test-data", "table_aliases": { "table": "alias" + }, + "full_sync_info": { + "prev_commit_seq": 0, + "commit_seq": 0, + "sub_sync_state": { + "state": 0, + "binlog_type": 0 + }, + "info": "" } }`, wantErr: false, diff --git a/pkg/ccr/meta.go b/pkg/ccr/meta.go index a238e73c..31fbc428 100644 --- a/pkg/ccr/meta.go +++ b/pkg/ccr/meta.go @@ -140,7 +140,7 @@ func (m *Meta) GetFullTableName(tableName string) string { // Update table meta, return xerror.Meta category if no such table exists. 
func (m *Meta) UpdateTable(tableName string, tableId int64) (*TableMeta, error) { - log.Infof("UpdateTable tableName: %s, tableId: %d", tableName, tableId) + log.Tracef("UpdateTable tableName: %s, tableId: %d", tableName, tableId) dbId, err := m.GetDbId() if err != nil { @@ -161,7 +161,7 @@ func (m *Meta) UpdateTable(tableName string, tableId int64) (*TableMeta, error) } query := fmt.Sprintf("show proc '/dbs/%d/'", dbId) - log.Infof("UpdateTable Sql: %s", query) + log.Tracef("UpdateTable Sql: %s", query) rows, err := db.Query(query) if err != nil { return nil, xerror.Wrap(err, xerror.Normal, query) @@ -190,7 +190,7 @@ func (m *Meta) UpdateTable(tableName string, tableId int64) (*TableMeta, error) // match parsedDbname == dbname, return dbId if parsedTableName == tableName || parsedTableId == tableId { fullTableName := m.GetFullTableName(parsedTableName) - log.Debugf("found table:%s, tableId:%d, type:%s", fullTableName, parsedTableId, parsedTableType) + log.Tracef("update table found table:%s, tableId:%d, type:%s", fullTableName, parsedTableId, parsedTableType) m.TableName2IdMap[fullTableName] = parsedTableId tableMeta := &TableMeta{ DatabaseMeta: &m.DatabaseMeta, diff --git a/pkg/ccr/record/upsert.go b/pkg/ccr/record/upsert.go index ec8d9226..b2f6a16c 100644 --- a/pkg/ccr/record/upsert.go +++ b/pkg/ccr/record/upsert.go @@ -42,7 +42,7 @@ type TableRecord struct { IndexIds []int64 `json:"indexIds"` } -func (t TableRecord) String() string { +func (t *TableRecord) String() string { return fmt.Sprintf("TableRecord{Id: %d, PartitionRecords: %v, IndexIds: %v}", t.Id, t.PartitionRecords, t.IndexIds) } diff --git a/pkg/rpc/be.go b/pkg/rpc/be.go index 4b69eae0..3f7f7a41 100644 --- a/pkg/rpc/be.go +++ b/pkg/rpc/be.go @@ -38,7 +38,7 @@ type BeRpc struct { } func (beRpc *BeRpc) IngestBinlog(req *bestruct.TIngestBinlogRequest) (*bestruct.TIngestBinlogResult_, error) { - log.Debugf("IngestBinlog req: %+v, txnId: %d, be: %v", req, req.GetTxnId(), beRpc.backend) + log.Tracef("IngestBinlog req: %+v, txnId: %d, be: %v", req, req.GetTxnId(), beRpc.backend) client := beRpc.client if result, err := client.IngestBinlog(context.Background(), req); err != nil { diff --git a/pkg/rpc/fe.go b/pkg/rpc/fe.go index 8f2fa285..b7ae4f59 100644 --- a/pkg/rpc/fe.go +++ b/pkg/rpc/fe.go @@ -240,7 +240,6 @@ type call0Result struct { func (r *retryWithMasterRedirectAndCachedClientsRpc) call0(masterClient IFeRpc) *call0Result { caller := r.caller resp, err := caller(masterClient) - log.Tracef("call resp: %.128v, error: %+v", resp, err) // Step 1: check error if err != nil { @@ -308,7 +307,6 @@ func (r *retryWithMasterRedirectAndCachedClientsRpc) call() (resultType, error) // Step 1: try master result := r.call0(masterClient) - log.Tracef("call0 result: %+v", result) if result.err == nil { return result.resp, nil } @@ -538,7 +536,7 @@ func (rpc *singleFeClient) Address() string { // 11: optional string token // } func (rpc *singleFeClient) BeginTransaction(spec *base.Spec, label string, tableIds []int64) (*festruct.TBeginTxnResult_, error) { - log.Debugf("Call BeginTransaction, addr: %s, spec: %s, label: %s, tableIds: %v", rpc.Address(), spec, label, tableIds) + log.Tracef("Call BeginTransaction, addr: %s, spec: %s, label: %s, tableIds: %v", rpc.Address(), spec, label, tableIds) client := rpc.client req := &festruct.TBeginTxnRequest{ @@ -547,7 +545,7 @@ func (rpc *singleFeClient) BeginTransaction(spec *base.Spec, label string, table setAuthInfo(req, spec) req.TableIds = tableIds - log.Debugf("BeginTransaction user %s, label: %s, 
tableIds: %v", req.GetUser(), label, tableIds) + log.Tracef("BeginTransaction user %s, label: %s, tableIds: %v", req.GetUser(), label, tableIds) if result, err := client.BeginTxn(context.Background(), req); err != nil { return nil, xerror.Wrapf(err, xerror.RPC, "BeginTransaction error: %v, req: %+v", err, req) } else { @@ -556,7 +554,7 @@ func (rpc *singleFeClient) BeginTransaction(spec *base.Spec, label string, table } func (rpc *singleFeClient) BeginTransactionForTxnInsert(spec *base.Spec, label string, tableIds []int64, stidNum int64) (*festruct.TBeginTxnResult_, error) { - log.Debugf("Call BeginTransactionForTxnInsert, addr: %s, spec: %s, label: %s, tableIds: %v", rpc.Address(), spec, label, tableIds) + log.Tracef("Call BeginTransactionForTxnInsert, addr: %s, spec: %s, label: %s, tableIds: %v", rpc.Address(), spec, label, tableIds) client := rpc.client req := &festruct.TBeginTxnRequest{ @@ -566,7 +564,7 @@ func (rpc *singleFeClient) BeginTransactionForTxnInsert(spec *base.Spec, label s req.TableIds = tableIds req.SubTxnNum = stidNum - log.Debugf("BeginTransactionForTxnInsert user %s, label: %s, tableIds: %v", req.GetUser(), label, tableIds) + log.Tracef("BeginTransactionForTxnInsert user %s, label: %s, tableIds: %v", req.GetUser(), label, tableIds) if result, err := client.BeginTxn(context.Background(), req); err != nil { return nil, xerror.Wrapf(err, xerror.RPC, "BeginTransactionForTxnInsert error: %v, req: %+v", err, req) } else { @@ -589,7 +587,7 @@ func (rpc *singleFeClient) BeginTransactionForTxnInsert(spec *base.Spec, label s // 12: optional i64 db_id // } func (rpc *singleFeClient) CommitTransaction(spec *base.Spec, txnId int64, commitInfos []*festruct_types.TTabletCommitInfo) (*festruct.TCommitTxnResult_, error) { - log.Debugf("Call CommitTransaction, addr: %s spec: %s, txnId: %d, commitInfos: %v", rpc.Address(), spec, txnId, commitInfos) + log.Tracef("Call CommitTransaction, addr: %s spec: %s, txnId: %d, commitInfos: %v", rpc.Address(), spec, txnId, commitInfos) client := rpc.client req := &festruct.TCommitTxnRequest{} @@ -605,7 +603,7 @@ func (rpc *singleFeClient) CommitTransaction(spec *base.Spec, txnId int64, commi } func (rpc *singleFeClient) CommitTransactionForTxnInsert(spec *base.Spec, txnId int64, isTxnInsert bool, subTxnInfos []*festruct.TSubTxnInfo) (*festruct.TCommitTxnResult_, error) { - log.Debugf("Call CommitTransactionForTxnInsert, addr: %s spec: %s, txnId: %d, subTxnInfos: %v", rpc.Address(), spec, txnId, subTxnInfos) + log.Tracef("Call CommitTransactionForTxnInsert, addr: %s spec: %s, txnId: %d, subTxnInfos: %v", rpc.Address(), spec, txnId, subTxnInfos) client := rpc.client req := &festruct.TCommitTxnRequest{} @@ -635,7 +633,7 @@ func (rpc *singleFeClient) CommitTransactionForTxnInsert(spec *base.Spec, txnId // 12: optional i64 db_id // } func (rpc *singleFeClient) RollbackTransaction(spec *base.Spec, txnId int64) (*festruct.TRollbackTxnResult_, error) { - log.Debugf("Call RollbackTransaction, addr: %s, spec: %s, txnId: %d", rpc.Address(), spec, txnId) + log.Tracef("Call RollbackTransaction, addr: %s, spec: %s, txnId: %d", rpc.Address(), spec, txnId) client := rpc.client req := &festruct.TRollbackTxnRequest{} @@ -660,7 +658,7 @@ func (rpc *singleFeClient) RollbackTransaction(spec *base.Spec, txnId int64) (*f // 8: required i64 prev_commit_seq // } func (rpc *singleFeClient) GetBinlog(spec *base.Spec, commitSeq int64) (*festruct.TGetBinlogResult_, error) { - log.Debugf("Call GetBinlog, addr: %s, spec: %s, commit seq: %d", rpc.Address(), spec, commitSeq) + 
log.Tracef("Call GetBinlog, addr: %s, spec: %s, commit seq: %d", rpc.Address(), spec, commitSeq) client := rpc.client req := &festruct.TGetBinlogRequest{ @@ -675,7 +673,7 @@ func (rpc *singleFeClient) GetBinlog(spec *base.Spec, commitSeq int64) (*festruc } } - log.Debugf("GetBinlog user %s, db %s, tableId %d, prev seq: %d", req.GetUser(), req.GetDb(), + log.Tracef("GetBinlog user %s, db %s, tableId %d, prev seq: %d", req.GetUser(), req.GetDb(), req.GetTableId(), req.GetPrevCommitSeq()) if resp, err := client.GetBinlog(context.Background(), req); err != nil { return nil, xerror.Wrapf(err, xerror.RPC, "GetBinlog error: %v, req: %+v", err, req) @@ -685,7 +683,7 @@ func (rpc *singleFeClient) GetBinlog(spec *base.Spec, commitSeq int64) (*festruc } func (rpc *singleFeClient) GetBinlogLag(spec *base.Spec, commitSeq int64) (*festruct.TGetBinlogLagResult_, error) { - log.Debugf("Call GetBinlogLag, addr: %s, spec: %s, commit seq: %d", rpc.Address(), spec, commitSeq) + log.Tracef("Call GetBinlogLag, addr: %s, spec: %s, commit seq: %d", rpc.Address(), spec, commitSeq) client := rpc.client req := &festruct.TGetBinlogRequest{ @@ -701,7 +699,7 @@ func (rpc *singleFeClient) GetBinlogLag(spec *base.Spec, commitSeq int64) (*fest } } - log.Debugf("GetBinlog user %s, db %s, tableId %d, prev seq: %d", req.GetUser(), req.GetDb(), + log.Tracef("GetBinlog user %s, db %s, tableId %d, prev seq: %d", req.GetUser(), req.GetDb(), req.GetTableId(), req.GetPrevCommitSeq()) if resp, err := client.GetBinlogLag(context.Background(), req); err != nil { return nil, xerror.Wrapf(err, xerror.RPC, "GetBinlogLag error: %v, req: %+v", err, req) @@ -723,7 +721,7 @@ func (rpc *singleFeClient) GetBinlogLag(spec *base.Spec, commitSeq int64) (*fest // 10: optional bool enable_compress // } func (rpc *singleFeClient) GetSnapshot(spec *base.Spec, labelName string, compress bool) (*festruct.TGetSnapshotResult_, error) { - log.Debugf("Call GetSnapshot, addr: %s, spec: %s, label: %s", rpc.Address(), spec, labelName) + log.Tracef("Call GetSnapshot, addr: %s, spec: %s, label: %s", rpc.Address(), spec, labelName) client := rpc.client snapshotType := festruct.TSnapshotType_LOCAL @@ -737,7 +735,7 @@ func (rpc *singleFeClient) GetSnapshot(spec *base.Spec, labelName string, compre } setAuthInfo(req, spec) - log.Debugf("GetSnapshotRequest user %s, db %s, table %s, label name %s, snapshot name %s, snapshot type %d, enable compress %t", + log.Tracef("GetSnapshotRequest user %s, db %s, table %s, label name %s, snapshot name %s, snapshot type %d, enable compress %t", req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), req.GetSnapshotName(), req.GetSnapshotType(), req.GetEnableCompress()) if resp, err := client.GetSnapshot(context.Background(), req); err != nil { return nil, xerror.Wrapf(err, xerror.RPC, "GetSnapshot error: %v, req: %+v", err, req) @@ -768,7 +766,7 @@ func (rpc *singleFeClient) GetSnapshot(spec *base.Spec, labelName string, compre // Restore Snapshot rpc func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, restoreReq *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error) { // NOTE: ignore meta, because it's too large - log.Debugf("Call RestoreSnapshot, addr: %s, spec: %s", rpc.Address(), spec) + log.Tracef("Call RestoreSnapshot, addr: %s, spec: %s", rpc.Address(), spec) client := rpc.client repoName := "__keep_on_local__" @@ -819,7 +817,7 @@ func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, restoreReq *RestoreS } func (rpc *singleFeClient) GetMasterToken(spec *base.Spec) 
(*festruct.TGetMasterTokenResult_, error) { - log.Debugf("Call GetMasterToken, addr: %s, spec: %s", rpc.Address(), spec) + log.Tracef("Call GetMasterToken, addr: %s, spec: %s", rpc.Address(), spec) client := rpc.client req := &festruct.TGetMasterTokenRequest{ @@ -828,7 +826,7 @@ func (rpc *singleFeClient) GetMasterToken(spec *base.Spec) (*festruct.TGetMaster Password: &spec.Password, } - log.Debugf("GetMasterToken user: %s", *req.User) + log.Tracef("GetMasterToken user: %s", *req.User) if resp, err := client.GetMasterToken(context.Background(), req); err != nil { return nil, xerror.Wrapf(err, xerror.RPC, "GetMasterToken failed, req: %+v", req) } else { @@ -857,13 +855,13 @@ func (rpc *singleFeClient) getMeta(spec *base.Spec, reqTables []*festruct.TGetMe } func (rpc *singleFeClient) GetDbMeta(spec *base.Spec) (*festruct.TGetMetaResult_, error) { - log.Debugf("GetMetaDb, addr: %s, spec: %s", rpc.Address(), spec) + log.Tracef("GetDbMeta, addr: %s, spec: %s", rpc.Address(), spec) return rpc.getMeta(spec, nil) } func (rpc *singleFeClient) GetTableMeta(spec *base.Spec, tableIds []int64) (*festruct.TGetMetaResult_, error) { - log.Debugf("GetMetaTable, addr: %s, tableIds: %v", rpc.Address(), tableIds) + log.Tracef("GetTableMeta, addr: %s, tableIds: %v", rpc.Address(), tableIds) reqTables := make([]*festruct.TGetMetaTable, 0, len(tableIds)) for _, tableId := range tableIds { @@ -877,7 +875,7 @@ func (rpc *singleFeClient) GetTableMeta(spec *base.Spec, tableIds []int64) (*fes } func (rpc *singleFeClient) GetBackends(spec *base.Spec) (*festruct.TGetBackendMetaResult_, error) { - log.Debugf("GetBackends, addr: %s, spec: %s", rpc.Address(), spec) + log.Tracef("GetBackends, addr: %s, spec: %s", rpc.Address(), spec) client := rpc.client req := &festruct.TGetBackendMetaRequest{ diff --git a/pkg/utils/log.go b/pkg/utils/log.go index 4172dd75..02e291ab 100644 --- a/pkg/utils/log.go +++ b/pkg/utils/log.go @@ -37,7 +37,7 @@ var ( ) func init() { - flag.StringVar(&logLevel, "log_level", "trace", "log level") + flag.StringVar(&logLevel, "log_level", "debug", "log level") flag.StringVar(&logFilename, "log_filename", "", "log filename") flag.BoolVar(&logAlsoToStderr, "log_also_to_stderr", false, "log also to stderr") flag.IntVar(&logRetainNum, "log_retain_num", 30, "log retain number")
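Taken together, the patch amounts to a tiered logging convention: Trace for verbose internals (per-replica steps, raw RPC responses), Debug for generated SQL and payload summaries, Info for lifecycle transitions keyed by txn/snapshot identifiers, and Warn for forced resyncs phrased as "force <action>, because <reason>". Below is a minimal, self-contained sketch of that convention, assuming the repository's log package wraps github.com/sirupsen/logrus (whose package-level Tracef/Debugf/Infof/Warnf API matches the call shapes above); the txnID and snapshotName values are hypothetical placeholders, not identifiers from this codebase.

package main

import (
	log "github.com/sirupsen/logrus"
)

func main() {
	// The patch also flips the default log level from "trace" to "debug"
	// (pkg/utils/log.go), so Trace lines stay hidden unless re-enabled.
	if level, err := log.ParseLevel("debug"); err == nil {
		log.SetLevel(level)
	}

	txnID := int64(42)                     // hypothetical transaction id
	snapshotName := "ccrs_demo_1737340241" // hypothetical snapshot label

	// Trace: verbose internals, e.g. raw RPC responses; suppressed at debug level.
	log.Tracef("txn %d tablet ingest binlog resp: %v", txnID, "<resp>")
	// Debug: generated SQL statements and payload summaries.
	log.Debugf("show backup state sql: SHOW BACKUP FROM db WHERE SnapshotName LIKE \"%s%%\"", snapshotName)
	// Info: lifecycle transitions, always carrying the txn/snapshot identifier.
	log.Infof("commit txn %d success", txnID)
	// Warn: forced resyncs, phrased as "force <action>, because <reason>".
	log.Warnf("force full sync, because the snapshot %s is expired", snapshotName)
}

With the default level now "debug", operators see SQL, lifecycle, and warning lines out of the box, and can opt back into the noisier per-replica detail with -log_level trace.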