Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dump query from audit log table #10

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 76 additions & 16 deletions cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ import (
var DumpConfig = Dump{}

type Dump struct {
AuditLogPaths []string
AuditLogPaths []string
AuditLogTable string

AuditLogUnescape bool
OutputDDLDir string
OutputQueryDir string
Expand All @@ -60,6 +62,7 @@ type Dump struct {
QueryStates []string
OnlySelect bool
Strict bool
From, To string

Clean bool
}
Expand Down Expand Up @@ -147,7 +150,10 @@ func init() {
pFlags.StringSliceVar(&DumpConfig.QueryStates, "query-states", []string{}, "Dump queries with states, like 'ok', 'eof' and 'err'")
pFlags.BoolVar(&DumpConfig.OnlySelect, "only-select", true, "Only dump SELECT queries")
pFlags.BoolVarP(&DumpConfig.Strict, "strict", "s", false, "Filter out sqls that can't be parsed")
pFlags.StringSliceVar(&DumpConfig.AuditLogPaths, "audit-logs", nil, "Audit log paths, either local path or ssh://xxx")
pFlags.StringVar(&DumpConfig.From, "from", "", "Dump queries from this time, like '2006-01-02 15:04:05'")
pFlags.StringVar(&DumpConfig.To, "to", "", "Dump queries to this time, like '2006-01-02 16:04:05'")
pFlags.StringSliceVar(&DumpConfig.AuditLogPaths, "audit-logs", nil, "Scan query from audit log files, either local path or 'ssh://xxx'")
pFlags.StringVar(&DumpConfig.AuditLogTable, "audit-log-table", "", "Scan query from audit log table, like 'audit_db.audit_tbl'")
pFlags.BoolVar(&DumpConfig.AuditLogUnescape, "audit-log-unescape", true, "Unescape '\\n', '\\t' and '\\r' in audit log")
pFlags.StringVar(&DumpConfig.AuditLogEncoding, "audit-log-encoding", "auto", "Audit log encoding, like utf8, gbk, ...")
pFlags.StringVar(&DumpConfig.SSHAddress, "ssh-address", "", "SSH address for downloading audit log, default is 'root@{db_host}:22'")
Expand All @@ -168,10 +174,25 @@ func completeDumpConfig() error {
DumpConfig.OutputQueryDir = filepath.Join(GlobalConfig.OutputDir, "sql")
DumpConfig.LocalAuditLogCacheDir = filepath.Join(GlobalConfig.DataDir, "auditlog")

if DumpConfig.AuditLogTable != "" && !strings.Contains(DumpConfig.AuditLogTable, ".") {
return errors.New("Need to specific database in '--audit-log-table', like 'audit_db.audit_tbl'")
}

if DumpConfig.QueryMinDuration_ > 0 {
DumpConfig.QueryMinDurationMs = int(DumpConfig.QueryMinDuration_.Milliseconds())
}

if DumpConfig.From != "" {
if _, err := time.Parse(time.DateTime, DumpConfig.From); err != nil {
return err
}
}
if DumpConfig.To != "" {
if _, err := time.Parse(time.DateTime, DumpConfig.To); err != nil {
return err
}
}

GlobalConfig.DBs, GlobalConfig.Tables = lo.Uniq(GlobalConfig.DBs), lo.Uniq(GlobalConfig.Tables)
dbs, tables := GlobalConfig.DBs, GlobalConfig.Tables
if DumpConfig.DumpSchema && len(dbs) == 0 {
Expand Down Expand Up @@ -336,11 +357,57 @@ func outputSchemas(schemas []*src.DBSchema) error {
}

func dumpQueries(ctx context.Context) ([][]string, error) {
opts := src.AuditLogScanOpts{
DBs: GlobalConfig.DBs,
QueryMinDurationMs: DumpConfig.QueryMinDurationMs,
QueryStates: DumpConfig.QueryStates,
Unique: DumpConfig.QueryOutputMode == "unique",
UniqueNormalize: DumpConfig.QueryUniqueNormalize,
Unescape: DumpConfig.AuditLogUnescape,
OnlySelect: DumpConfig.OnlySelect,
Strict: DumpConfig.Strict,
From: DumpConfig.From,
To: DumpConfig.To,
}

if DumpConfig.AuditLogTable != "" {
return dumpQueriesFromTable(ctx, opts)
}
return dumpQueriesFromFile(ctx, opts)
}

func dumpQueriesFromTable(ctx context.Context, opts src.AuditLogScanOpts) ([][]string, error) {
if opts.From == "" || opts.To == "" {
return nil, errors.New("Must specific both '--from' and '--to' when dumping from audit log table")
}
if opts.Unique {
return nil, errors.New("Not yet support '--query-output-mode=unique' with '--audit-log-table'")
}

dbTable := strings.SplitN(DumpConfig.AuditLogTable, ".", 2)
dbname, table := dbTable[0], dbTable[1]

db, err := connectDB(dbname)
if err != nil {
return nil, err
}

logrus.Infof("Dumping queries from audit log table '%s'...\n", DumpConfig.AuditLogTable)

sqls, err := src.GetDBAuditLogs(ctx, db, dbname, table, opts, GlobalConfig.Parallel)
if err != nil {
logrus.Errorf("Extract queries from audit logs table failed, %v\n", err)
return nil, err
}
return [][]string{sqls}, nil
}

func dumpQueriesFromFile(ctx context.Context, opts src.AuditLogScanOpts) ([][]string, error) {
auditLogs := DumpConfig.AuditLogPaths
if len(auditLogs) == 0 {
sshUrl, err := chooseRemoteAuditLog(ctx)
if err != nil {
return nil, fmt.Errorf("Please specific audit log path by --audit-logs, error: %v", err)
return nil, fmt.Errorf("Please specific audit log files by '--audit-logs' or table by '--audit-log-table', error: %v", err)
}
auditLogs = []string{sshUrl}
}
Expand Down Expand Up @@ -372,23 +439,16 @@ func dumpQueries(ctx context.Context) ([][]string, error) {
auditLogFiles = append(auditLogFiles, localPaths...)
}

logrus.Infoln("Dumping queries from audit logs...")
logrus.Infoln("Dumping queries from audit log files...")

queries, err := src.ExtractQueriesFromAuditLogs(
GlobalConfig.DBs,
auditLogFiles,
DumpConfig.AuditLogEncoding,
DumpConfig.QueryMinDurationMs,
DumpConfig.QueryStates,
opts,
GlobalConfig.Parallel,
DumpConfig.QueryOutputMode == "unique",
DumpConfig.QueryUniqueNormalize,
DumpConfig.AuditLogUnescape,
DumpConfig.OnlySelect,
DumpConfig.Strict,
)
if err != nil {
logrus.Errorf("Extract queries from audit logs failed, %v\n", err)
logrus.Errorf("Extract queries from audit logs file failed, %v\n", err)
return nil, err
}

Expand Down Expand Up @@ -466,7 +526,7 @@ func outputQueryFileNameFormat(total int) string {
}

func chooseRemoteAuditLog(ctx context.Context) (string, error) {
conn, err := connectDB(GlobalConfig.DBs[0])
conn, err := connectDB("information_schema")
if err != nil {
return "", err
}
Expand Down Expand Up @@ -494,7 +554,7 @@ func chooseRemoteAuditLog(ctx context.Context) (string, error) {
return "", errors.New("No audit log found on remote server")
}

choosed, err := src.Choose("Choose audit log on remote server to dump:", auditLogs)
choosed, err := src.Choose("Choose audit log on remote server to dump", auditLogs)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -551,7 +611,7 @@ func expandSSHPath(remotePath string) (string, error) {

func connectDB(db string) (*sqlx.DB, error) {
if db == "" {
return nil, fmt.Errorf("database name is required, please use --db flag")
return nil, fmt.Errorf("database name is required")
}
return src.NewDB(GlobalConfig.DBHost, GlobalConfig.DBPort, GlobalConfig.DBUser, GlobalConfig.DBPassword, db)
}
4 changes: 2 additions & 2 deletions cmd/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ func completeReplayConfig() (err error) {

var t time.Time
if ReplayConfig.From_ != "" {
t, err = time.Parse("2006-01-02 15:04:05", ReplayConfig.From_)
t, err = time.Parse(time.DateTime, ReplayConfig.From_)
if err != nil {
return err
}
ReplayConfig.From = t.UnixMilli()
}
if ReplayConfig.To_ != "" {
t, err = time.Parse("2006-01-02 15:04:05", ReplayConfig.To_)
t, err = time.Parse(time.DateTime, ReplayConfig.To_)
if err != nil {
return err
}
Expand Down
Loading
Loading