Skip to content

Commit 3153ec2

Browse files
committed
implement backup wal files
1 parent fa6f02d commit 3153ec2

File tree

1 file changed

+129
-21
lines changed
  • cmd/util/cmd/find-trie-root

1 file changed

+129
-21
lines changed

cmd/util/cmd/find-trie-root/cmd.go

+129-21
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ var (
2121
flagRootHash string
2222
flagFrom int
2323
flagTo int
24-
flagOutputDir string
24+
flagBackupDir string
25+
flagTrimAsLatestWAL bool
2526
)
2627

2728
// find trie root hash from the wal files.
@@ -50,7 +51,9 @@ func init() {
5051
Cmd.Flags().IntVar(&flagFrom, "from", 0, "from segment")
5152
Cmd.Flags().IntVar(&flagTo, "to", math.MaxInt32, "to segment")
5253

53-
Cmd.Flags().StringVar(&flagOutputDir, "output-dir", "", "output directory")
54+
Cmd.Flags().StringVar(&flagBackupDir, "backup-dir", "", "directory for backup wal files. must be not exist or empty folder. required when --trim-as-latest-wal flag is set to true.")
55+
56+
Cmd.Flags().BoolVar(&flagTrimAsLatestWAL, "trim-as-latest-wal", false, "trim the wal file to the last record with the target trie root hash")
5457
}
5558

5659
func run(*cobra.Command, []string) {
@@ -59,26 +62,46 @@ func run(*cobra.Command, []string) {
5962
log.Fatal().Err(err).Msg("cannot parse input")
6063
}
6164

62-
if flagExecutionStateDir == flagOutputDir {
63-
log.Fatal().Msg("output directory cannot be the same as the execution state directory")
65+
if flagExecutionStateDir == flagBackupDir {
66+
log.Fatal().Msg("backup directory cannot be the same as the execution state directory")
6467
}
6568

6669
segment, offset, err := searchRootHashInSegments(rootHash, flagExecutionStateDir, flagFrom, flagTo)
6770
if err != nil {
6871
log.Fatal().Err(err).Msg("cannot find root hash in segments")
6972
}
70-
log.Info().Msgf("found root hash in segment %d at offset %d", segment, offset)
7173

72-
if len(flagOutputDir) == 0 {
74+
segmentFile := prometheusWAL.SegmentName(flagExecutionStateDir, segment)
75+
76+
log.Info().Msgf("found root hash in segment %d at offset %d, segment file: %v", segment, offset, segmentFile)
77+
78+
if !flagTrimAsLatestWAL {
79+
log.Info().Msg("not trimming WAL. Exiting. to trim the WAL, use --trim-as-latest-wal flag")
7380
return
7481
}
7582

76-
err = copyWAL(flagExecutionStateDir, flagOutputDir, segment, rootHash)
83+
if len(flagBackupDir) == 0 {
84+
log.Error().Msgf("backup directory is not provided")
85+
return
86+
}
87+
88+
// generate a segment file in a temporary folder with the root hash as its last record
89+
newSegmentFile, err := findRootHashAndCreateTrimmed(flagExecutionStateDir, segment, rootHash)
7790
if err != nil {
7891
log.Fatal().Err(err).Msg("cannot copy WAL")
7992
}
8093

81-
log.Info().Msgf("copied WAL to %s", flagOutputDir)
94+
log.Info().Msgf("successfully copied WAL to the temporary folder %v", newSegmentFile)
95+
96+
// before replacing the last wal file with the newly generated one, back up the rolled-back wals
97+
// then move the last segment file to the execution state directory
98+
err = backupRollbackedWALsAndMoveLastSegmentFile(segment, flagExecutionStateDir, flagBackupDir, newSegmentFile)
99+
if err != nil {
100+
log.Fatal().Err(err).Msg("cannot backup rollbacked WALs")
101+
}
102+
103+
log.Info().Msgf("successfully trimmed WAL %v the trie root hash %v as its last record, original wal files are moved to %v",
104+
segment, rootHash, flagBackupDir)
82105
}
83106

84107
func parseInput(rootHashStr string) (ledger.RootHash, error) {
@@ -181,26 +204,33 @@ func searchRootHashInSegments(
181204
return 0, 0, fmt.Errorf("finish reading all segment files from %d to %d, but not found", from, to)
182205
}
183206

184-
func copyWAL(dir, outputDir string, segment int, expectedRoot ledger.RootHash) error {
185-
writer, err := prometheusWAL.NewSize(log.Logger, nil, outputDir, wal.SegmentSize, false)
207+
// findRootHashAndCreateTrimmed finds the root hash in the segment file from the given dir folder
208+
// and creates a new segment file with the expected root hash as the last record in a temporary folder.
209+
// It returns the path to the new segment file.
210+
func findRootHashAndCreateTrimmed(dir string, segment int, expectedRoot ledger.RootHash) (string, error) {
211+
tmpFolder, err := os.MkdirTemp("", "flow-last-segment-file")
186212
if err != nil {
187-
return fmt.Errorf("cannot create writer WAL: %w", err)
213+
return "", fmt.Errorf("cannot create temporary folder: %w", err)
188214
}
189215

190-
defer writer.Close()
216+
newSegmentFile := prometheusWAL.SegmentName(tmpFolder, segment)
217+
218+
log.Info().Msgf("writing new segment file to %v", newSegmentFile)
191219

192-
w, err := prometheusWAL.NewSize(log.Logger, nil, dir, wal.SegmentSize, false)
220+
writer, err := prometheusWAL.NewSize(log.Logger, nil, tmpFolder, wal.SegmentSize, false)
193221
if err != nil {
194-
return fmt.Errorf("cannot create WAL: %w", err)
222+
return "", fmt.Errorf("cannot create writer WAL: %w", err)
195223
}
196224

225+
defer writer.Close()
226+
197227
sr, err := prometheusWAL.NewSegmentsRangeReader(log.Logger, prometheusWAL.SegmentRange{
198-
Dir: w.Dir(),
228+
Dir: dir,
199229
First: segment,
200230
Last: segment,
201231
})
202232
if err != nil {
203-
return fmt.Errorf("cannot create WAL segments reader: %w", err)
233+
return "", fmt.Errorf("cannot create WAL segments reader: %w", err)
204234
}
205235

206236
defer sr.Close()
@@ -211,7 +241,7 @@ func copyWAL(dir, outputDir string, segment int, expectedRoot ledger.RootHash) e
211241
record := reader.Record()
212242
operation, _, update, err := wal.Decode(record)
213243
if err != nil {
214-
return fmt.Errorf("cannot decode LedgerWAL record: %w", err)
244+
return "", fmt.Errorf("cannot decode LedgerWAL record: %w", err)
215245
}
216246

217247
switch operation {
@@ -220,23 +250,101 @@ func copyWAL(dir, outputDir string, segment int, expectedRoot ledger.RootHash) e
220250
bytes := wal.EncodeUpdate(update)
221251
_, err = writer.Log(bytes)
222252
if err != nil {
223-
return fmt.Errorf("cannot write LedgerWAL record: %w", err)
253+
return "", fmt.Errorf("cannot write LedgerWAL record: %w", err)
224254
}
225255

226256
rootHash := update.RootHash
227257

228258
if rootHash.Equals(expectedRoot) {
229259
log.Info().Msgf("found expected trie root hash %v, finish writing", rootHash)
230-
return nil
260+
return newSegmentFile, nil
231261
}
232262
default:
233263
}
234264

235265
err = reader.Err()
236266
if err != nil {
237-
return fmt.Errorf("cannot read LedgerWAL: %w", err)
267+
return "", fmt.Errorf("cannot read LedgerWAL: %w", err)
238268
}
239269
}
240270

241-
return fmt.Errorf("finish reading all segment files from %d to %d, but not found", segment, segment)
271+
return "", fmt.Errorf("finish reading all segment files from %d to %d, but not found", segment, segment)
272+
}
273+
274+
// checkFolderNotExistOrEmpty reports whether folderPath can be used as a fresh
// destination folder.
//
// It returns true when the path does not exist or is an empty directory, and
// false when it is a directory that contains at least one entry.
//
// An error is returned when the path exists but is not a directory, or when the
// path cannot be inspected or its content cannot be listed.
func checkFolderNotExistOrEmpty(folderPath string) (bool, error) {
	info, err := os.Stat(folderPath)
	if err != nil {
		if os.IsNotExist(err) {
			return true, nil
		}
		// Any other failure (e.g. permission denied) means the state of the path
		// is unknown; surface it instead of silently reporting "not empty".
		return false, fmt.Errorf("cannot stat %s: %w", folderPath, err)
	}

	if !info.IsDir() {
		return false, fmt.Errorf("path %s is not a directory", folderPath)
	}

	entries, err := os.ReadDir(folderPath)
	if err != nil {
		return false, fmt.Errorf("cannot read directory %s: %w", folderPath, err)
	}

	return len(entries) == 0, nil
}
297+
298+
// backup new wals before replacing
299+
func backupRollbackedWALsAndMoveLastSegmentFile(
300+
segment int, walDir, backupDir string, newSegmentFile string) error {
301+
// making sure the backup dir is empty
302+
empty, err := checkFolderNotExistOrEmpty(backupDir)
303+
if err != nil {
304+
return fmt.Errorf("cannot check backup directory: %w", err)
305+
}
306+
307+
if !empty {
308+
return fmt.Errorf("backup directory %s is not empty", backupDir)
309+
}
310+
311+
// Create the backup directory
312+
err = os.MkdirAll(backupDir, os.ModePerm)
313+
if err != nil {
314+
return fmt.Errorf("cannot create backup directory: %w", err)
315+
}
316+
317+
first, last, err := prometheusWAL.Segments(walDir)
318+
if err != nil {
319+
return fmt.Errorf("cannot get segments: %w", err)
320+
}
321+
322+
if segment < first {
323+
return fmt.Errorf("segment %d is less than the first segment %d", segment, first)
324+
}
325+
326+
// backup all the segment files that have higher number than the given segment, including
327+
// the segment file itself, since it will be replaced.
328+
for i := segment; i <= last; i++ {
329+
segmentFile := prometheusWAL.SegmentName(walDir, i)
330+
backupFile := prometheusWAL.SegmentName(backupDir, i)
331+
332+
log.Info().Msgf("backup segment file %s to %s, %v/%v", segmentFile, backupFile, i, last)
333+
err := os.Rename(segmentFile, backupFile)
334+
if err != nil {
335+
return fmt.Errorf("cannot move segment file %s to %s: %w", segmentFile, backupFile, err)
336+
}
337+
}
338+
339+
// after backup the segment files, replace the last segment file
340+
segmentToBeReplaced := prometheusWAL.SegmentName(walDir, segment)
341+
342+
log.Info().Msgf("moving segment file %s to %s", newSegmentFile, segmentToBeReplaced)
343+
344+
err = os.Rename(newSegmentFile, segmentToBeReplaced)
345+
if err != nil {
346+
return fmt.Errorf("cannot move segment file %s to %s: %w", newSegmentFile, segmentToBeReplaced, err)
347+
}
348+
349+
return nil
242350
}

0 commit comments

Comments
 (0)