diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
index e9e32a8301d1..902192eae79c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
@@ -36,9 +36,8 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.anyCollection;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.doAnswer;
@@ -84,8 +83,6 @@
 import javax.servlet.WriteListener;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -93,7 +90,6 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.utils.Archiver;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -239,7 +235,8 @@ public void write(int b) throws IOException {
     doCallRealMethod().when(omDbCheckpointServletMock)
         .writeDbDataToStream(any(), any(), any(), any(), any());
     doCallRealMethod().when(omDbCheckpointServletMock)
-        .writeDBToArchive(any(), any(), any(), any(), any(), any(), anyBoolean());
+        .collectFilesFromDir(any(), any(), any(), anyBoolean(), any());
+    doCallRealMethod().when(omDbCheckpointServletMock).collectDbDataToTransfer(any(), any(), any());
     when(omDbCheckpointServletMock.getBootstrapStateLock())
         .thenReturn(lock);
@@ -247,11 +244,11 @@ public void write(int b) throws IOException {
     assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getBootstrapTempData());
     doCallRealMethod().when(omDbCheckpointServletMock).
         processMetadataSnapshotRequest(any(), any(), anyBoolean(), anyBoolean());
-    doCallRealMethod().when(omDbCheckpointServletMock).writeDbDataToStream(any(), any(), any(), any());
+    doCallRealMethod().when(omDbCheckpointServletMock).writeDbDataToStream(any(), any(), any(), any(), any());
     doCallRealMethod().when(omDbCheckpointServletMock).getCompactionLogDir();
     doCallRealMethod().when(omDbCheckpointServletMock).getSstBackupDir();
     doCallRealMethod().when(omDbCheckpointServletMock)
-        .transferSnapshotData(anySet(), any(), anyCollection(), anyCollection(), any(), any(), anyMap());
+        .collectSnapshotData(anySet(), anyCollection(), anyCollection(), any(), any());
     doCallRealMethod().when(omDbCheckpointServletMock).createAndPrepareCheckpoint(anyBoolean());
     doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirsFromDB(any(), any(), any());
   }
@@ -298,14 +295,12 @@ public void testWriteDBToArchiveClosesFilesListStream() throws Exception {
     // Do not use CALLS_REAL_METHODS for java.nio.file.Files: internal/private static
     // methods (eg Files.provider()) get intercepted too and Mockito will try to invoke
     // them reflectively, which fails on JDK9+ without --add-opens.
-    try (MockedStatic<Files> files = mockStatic(Files.class);
-         TarArchiveOutputStream tar = new TarArchiveOutputStream(new java.io.ByteArrayOutputStream())) {
+    try (MockedStatic<Files> files = mockStatic(Files.class)) {
       files.when(() -> Files.exists(dbDir)).thenReturn(true);
       files.when(() -> Files.list(dbDir)).thenReturn(stream);
 
-      boolean result = servlet.writeDBToArchive(
-          new HashSet<>(), dbDir, new AtomicLong(Long.MAX_VALUE),
-          tar, folder, null, true);
+      boolean result = servlet.collectFilesFromDir(new HashSet<>(), dbDir,
+          new AtomicLong(Long.MAX_VALUE), true, new OMDBArchiver());
 
       assertTrue(result);
     }
@@ -446,49 +441,40 @@ public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception {
     // Create dummy files: one SST, one non-SST
     Path sstFile = dbDir.resolve("test.sst");
     Files.write(sstFile, "sst content".getBytes(StandardCharsets.UTF_8)); // Write some content to make it non-empty
-
     Path nonSstFile = dbDir.resolve("test.log");
     Files.write(nonSstFile, "log content".getBytes(StandardCharsets.UTF_8));
     Set<String> sstFilesToExclude = new HashSet<>();
     AtomicLong maxTotalSstSize = new AtomicLong(1000000); // Sufficient size
-    Map<String, String> hardLinkFileMap = new java.util.HashMap<>();
+    OMDBArchiver omdbArchiver = new OMDBArchiver();
     Path tmpDir = folder.resolve("tmp");
     Files.createDirectories(tmpDir);
-    TarArchiveOutputStream mockArchiveOutputStream = mock(TarArchiveOutputStream.class);
+    omdbArchiver.setTmpDir(tmpDir);
+    OMDBArchiver omDbArchiverSpy = spy(omdbArchiver);
     List<String> fileNames = new ArrayList<>();
-    try (MockedStatic<Archiver> archiverMock = mockStatic(Archiver.class)) {
-      archiverMock.when(() -> Archiver.linkAndIncludeFile(any(), any(), any(), any())).thenAnswer(invocation -> {
-        // Get the actual mockArchiveOutputStream passed from writeDBToArchive
-        TarArchiveOutputStream aos = invocation.getArgument(2);
-        File sourceFile = invocation.getArgument(0);
-        String fileId = invocation.getArgument(1);
-        fileNames.add(sourceFile.getName());
-        aos.putArchiveEntry(new TarArchiveEntry(sourceFile, fileId));
-        aos.write(new byte[100], 0, 100); // Simulate writing
-        aos.closeArchiveEntry();
-        return 100L;
-      });
-      boolean success = omDbCheckpointServletMock.writeDBToArchive(
-          sstFilesToExclude, dbDir, maxTotalSstSize, mockArchiveOutputStream,
-          tmpDir, hardLinkFileMap, expectOnlySstFiles);
-      assertTrue(success);
-      verify(mockArchiveOutputStream, times(fileNames.size())).putArchiveEntry(any());
-      verify(mockArchiveOutputStream, times(fileNames.size())).closeArchiveEntry();
-      verify(mockArchiveOutputStream, times(fileNames.size())).write(any(byte[].class), anyInt(), anyInt()); // verify write was called once
-
-      boolean containsNonSstFile = false;
-      for (String fileName : fileNames) {
-        if (expectOnlySstFiles) {
-          assertTrue(fileName.endsWith(".sst"), "File is not an SST File");
-        } else {
-          containsNonSstFile = true;
-        }
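+    // Stub the spy to delegate to the real archiver (hardlinks are actually
+    // created in tmpDir) while capturing the recorded entry file names.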
+    doAnswer((invocation) -> {
+      File sourceFile = invocation.getArgument(0);
+      fileNames.add(sourceFile.getName());
+      // Return the real byte count: answering null would NPE when unboxed into the caller's long.
+      return omdbArchiver.recordFileEntry(sourceFile, invocation.getArgument(1));
+    }).when(omDbArchiverSpy).recordFileEntry(any(), anyString());
+    boolean success =
+        omDbCheckpointServletMock.collectFilesFromDir(sstFilesToExclude, dbDir, maxTotalSstSize, expectOnlySstFiles,
+            omDbArchiverSpy);
+    assertTrue(success);
+    verify(omDbArchiverSpy, times(fileNames.size())).recordFileEntry(any(), anyString());
+    boolean containsNonSstFile = false;
+    for (String fileName : fileNames) {
+      if (expectOnlySstFiles) {
+        assertTrue(fileName.endsWith(".sst"), "File is not an SST File");
+      } else {
+        containsNonSstFile = true;
       }
+    }
 
-      if (!expectOnlySstFiles) {
-        assertTrue(containsNonSstFile, "SST File is not expected");
-      }
+    if (!expectOnlySstFiles) {
+      assertTrue(containsNonSstFile, "SST File is not expected");
     }
   }
@@ -905,6 +889,7 @@ private void setupClusterAndMocks(String volumeName, String bucketName,
     doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(), eq(false), any(), any(), eq(false));
     doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirsFromDB(any(), any(), any());
+    doCallRealMethod().when(omDbCheckpointServletMock).collectDbDataToTransfer(any(), any(), any());
 
     omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(),
         false, om.getOmAdminUsernames(), om.getOmAdminGroups(), false);
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBArchiver.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBArchiver.java
new file mode 100644
index 000000000000..5e08a69a2602
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBArchiver.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static org.apache.hadoop.hdds.utils.Archiver.includeFile;
+import static org.apache.hadoop.hdds.utils.Archiver.tar;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag;
+import static org.apache.hadoop.ozone.om.OMDBCheckpointServletInodeBasedXfer.writeHardlinkFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for handling operations relevant to archiving the OM DB tarball.
+ * Mainly maintains a map recording the files collected from reading the
+ * checkpoint and snapshot DBs. It temporarily creates hardlinks and stores
+ * the link data in the map so that the bootstrap lock can be released
+ * quickly, with the actual write happening at the end outside the lock.
+ */
+public class OMDBArchiver {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OMDBArchiver.class);
+
+  private Path tmpDir;
+  private Map<String, File> filesToWriteIntoTarball;
+  private Map<String, String> hardLinkFileMap;
+  private boolean completed;
+
+  public OMDBArchiver() {
+    this.tmpDir = null;
+    this.filesToWriteIntoTarball = new HashMap<>();
+    this.hardLinkFileMap = null;
+    this.completed = false;
+  }
+
+  public void setTmpDir(Path tmpDir) {
+    this.tmpDir = tmpDir;
+  }
+
+  public Path getTmpDir() {
+    return tmpDir;
+  }
+
+  public Map<String, String> getHardLinkFileMap() {
+    return hardLinkFileMap;
+  }
+
+  public Map<String, File> getFilesToWriteIntoTarball() {
+    return filesToWriteIntoTarball;
+  }
+
+  public void setHardLinkFileMap(Map<String, String> hardLinkFileMap) {
+    this.hardLinkFileMap = hardLinkFileMap;
+  }
+
+  public boolean isCompleted() {
+    return completed;
+  }
+
+  public void setCompleted(boolean completed) {
+    this.completed = completed;
+  }
+
+  /**
+   * Records the given file entry into the map after taking a hardlink.
+   *
+   * @param file the file to hardlink and record into the map
+   * @param entryName name of the entry corresponding to the file
+   * @return the file size
+   * @throws IOException in case of hardlink failure
+   */
+  public long recordFileEntry(File file, String entryName) throws IOException {
+    File link = tmpDir.resolve(entryName).toFile();
+    long bytes = 0;
+    try {
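+      // Hardlinks can only be created within a single filesystem, so tmpDir
+      // must live on the same volume as the DB files being collected.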
+      Files.createLink(link.toPath(), file.toPath());
+      filesToWriteIntoTarball.put(entryName, link);
+      bytes = file.length();
+    } catch (IOException ioe) {
+      LOG.error("Couldn't create hardlink for file {} while including it in tarball.",
+          file.getAbsolutePath(), ioe);
+      throw ioe;
+    }
+    return bytes;
+  }
+
+  /**
+   * Writes all the files captured by the map into the archive and
+   * also includes the hardlink file and the completion marker file.
+   *
+   * @param conf the configuration object to obtain metadata paths
+   * @param outputStream the tarball archive output stream
+   * @throws IOException in case of write failure to the archive
+   */
+  public void writeToArchive(OzoneConfiguration conf, OutputStream outputStream)
+      throws IOException {
+    long bytesWritten = 0;
+    long lastLoggedTime = Time.monotonicNow();
+    long filesWritten = 0;
+    try (ArchiveOutputStream<TarArchiveEntry> archiveOutput = tar(outputStream)) {
+      for (Map.Entry<String, File> kv : filesToWriteIntoTarball.entrySet()) {
+        String entryName = kv.getKey();
+        File link = kv.getValue();
+        try {
+          bytesWritten += includeFile(link, entryName, archiveOutput);
+          filesWritten++;
+          if (Time.monotonicNow() - lastLoggedTime >= 30000) {
+            LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...",
+                bytesWritten / 1024, filesWritten);
+            lastLoggedTime = Time.monotonicNow();
+          }
+        } catch (IOException ioe) {
+          LOG.error("Couldn't write file {} into the tarball.",
+              link.getAbsolutePath(), ioe);
+          throw ioe;
+        } finally {
+          // Always remove the temporary hardlink, even when the write fails.
+          Files.deleteIfExists(link.toPath());
+        }
+      }
+      if (isCompleted()) {
+        writeHardlinkFile(conf, hardLinkFileMap, archiveOutput);
+        includeRatisSnapshotCompleteFlag(archiveOutput);
+      }
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
index c41f28d0f251..98f9d19caa8b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -18,9 +18,6 @@
 package org.apache.hadoop.ozone.om;
 
 import static org.apache.hadoop.hdds.utils.Archiver.includeFile;
-import static org.apache.hadoop.hdds.utils.Archiver.linkAndIncludeFile;
-import static org.apache.hadoop.hdds.utils.Archiver.tar;
-import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
 import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
@@ -77,7 +74,6 @@
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Time;
 import org.apache.ozone.compaction.log.CompactionLogEntry;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 import org.apache.ratis.util.UncheckedAutoCloseable;
@@ -151,27 +147,42 @@ public void processMetadataSnapshotRequest(HttpServletRequest request, HttpServl
         OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST);
     Set<String> receivedSstFiles = extractFilesToExclude(sstParam);
     Path tmpdir = null;
+    OMDBArchiver omdbArchiver = new OMDBArchiver();
     try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireWriteLock()) {
       tmpdir = Files.createTempDirectory(getBootstrapTempData().toPath(),
           "bootstrap-data-");
       if (tmpdir == null) {
         throw new IOException("tmp dir is null");
       }
+      omdbArchiver.setTmpDir(tmpdir);
       String tarName = "om.data-" + System.currentTimeMillis() + ".tar";
       response.setContentType("application/x-tar");
       response.setHeader("Content-Disposition", "attachment; filename=\"" + tarName + "\"");
       Instant start = Instant.now();
-      writeDbDataToStream(request, response.getOutputStream(), receivedSstFiles, tmpdir);
+      collectDbDataToTransfer(request, receivedSstFiles, omdbArchiver);
       Instant end = Instant.now();
       long duration = Duration.between(start, end).toMillis();
-      LOG.info("Time taken to write the checkpoint to response output " +
-          "stream: {} milliseconds", duration);
+      LOG.info("Time taken to collect the DB data: {} milliseconds", duration);
       logSstFileList(receivedSstFiles, "Excluded {} SST files from the latest checkpoint{}: {}", 5);
     } catch (Exception e) {
       LOG.error(
          "Unable to process metadata snapshot request. ", e);
      response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+    }
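+    // The tarball is streamed only after the bootstrap write lock above has been
+    // released, so the slow network transfer does not block other OM operations.
+    // A truncated transfer is detectable by the missing completion flag.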
+    try {
+      Instant start = Instant.now();
+      OutputStream outputStream = response.getOutputStream();
+      omdbArchiver.writeToArchive(getConf(), outputStream);
+      Instant end = Instant.now();
+      long duration = Duration.between(start, end).toMillis();
+      LOG.info("Time taken to write the checkpoint to response output " +
+          "stream: {} milliseconds", duration);
+    } catch (IOException e) {
+      LOG.error("Unable to write to the archive stream.", e);
     } finally {
       try {
         if (tmpdir != null) {
@@ -205,13 +213,12 @@ Path getCompactionLogDir() {
    * and includes a completion flag for Ratis snapshot streaming.
    *
    * @param request The HTTP servlet request containing parameters for the snapshot.
-   * @param destination The output stream to which the tar archive is written.
    * @param sstFilesToExclude Set of SST file identifiers to exclude from the archive.
-   * @param tmpdir Temporary directory for staging files during archiving.
+   * @param omdbArchiver Archiver that records the collected files for later streaming.
    * @throws IOException if an I/O error occurs during processing or streaming.
    */
-  public void writeDbDataToStream(HttpServletRequest request, OutputStream destination,
-      Set<String> sstFilesToExclude, Path tmpdir) throws IOException {
+  public void collectDbDataToTransfer(HttpServletRequest request,
+      Set<String> sstFilesToExclude, OMDBArchiver omdbArchiver) throws IOException {
     DBCheckpoint checkpoint = null;
     OzoneManager om = (OzoneManager) getServletContext().getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE);
     OMMetadataManager omMetadataManager = om.getMetadataManager();
@@ -233,26 +239,26 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
     }
 
     boolean shouldContinue = true;
-    try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) {
+    try {
       if (includeSnapshotData) {
         // Process each snapshot db path and write it to archive
         for (Path snapshotDbPath : snapshotPaths) {
           if (!shouldContinue) {
             break;
           }
-          shouldContinue = writeDBToArchive(sstFilesToExclude, snapshotDbPath,
-              maxTotalSstSize, archiveOutputStream, tmpdir, null, true);
+          shouldContinue = collectFilesFromDir(sstFilesToExclude, snapshotDbPath,
+              maxTotalSstSize, true, omdbArchiver);
         }
         if (shouldContinue) {
-          shouldContinue = writeDBToArchive(sstFilesToExclude, getSstBackupDir(),
-              maxTotalSstSize, archiveOutputStream, tmpdir, null, true);
+          shouldContinue = collectFilesFromDir(sstFilesToExclude, getSstBackupDir(),
+              maxTotalSstSize, true, omdbArchiver);
         }
         if (shouldContinue) {
-          shouldContinue = writeDBToArchive(sstFilesToExclude, getCompactionLogDir(),
-              maxTotalSstSize, archiveOutputStream, tmpdir, null, true);
+          shouldContinue = collectFilesFromDir(sstFilesToExclude, getCompactionLogDir(),
+              maxTotalSstSize, true, omdbArchiver);
         }
       }
@@ -260,6 +266,7 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
       // we finished transferring files from snapshot DB's by now and
       // this is the last step where we transfer the active om.db contents
       Map<String, String> hardLinkFileMap = new HashMap<>();
+      omdbArchiver.setHardLinkFileMap(hardLinkFileMap);
       SnapshotCache snapshotCache = om.getOmSnapshotManager().getSnapshotCache();
       OmSnapshotLocalDataManager localDataManager = om.getOmSnapshotManager().getSnapshotLocalDataManager();
       /*
@@ -282,8 +289,7 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
       // unlimited files as we want the Active DB contents to be transferred in a single batch
       maxTotalSstSize.set(Long.MAX_VALUE);
       Path checkpointDir = checkpoint.getCheckpointLocation();
-      writeDBToArchive(sstFilesToExclude, checkpointDir, maxTotalSstSize, archiveOutputStream, tmpdir,
-          hardLinkFileMap, false);
+      collectFilesFromDir(sstFilesToExclude, checkpointDir, maxTotalSstSize, false, omdbArchiver);
       if (includeSnapshotData) {
         List<Path> sstBackupFiles = extractSSTFilesFromCompactionLog(checkpoint);
         // get the list of snapshots from the checkpoint
@@ -293,25 +299,22 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
           snapshotInCheckpoint = getSnapshotDirsFromDB(omMetadataManager,
               checkpointMetadataManager, snapshotLocalDataManager);
         }
-        writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, archiveOutputStream, tmpdir,
-            hardLinkFileMap, false);
+        collectFilesFromDir(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, false, omdbArchiver);
         try (Stream<Path> backupFiles = sstBackupFiles.stream()) {
-          writeDBToArchive(sstFilesToExclude, backupFiles, maxTotalSstSize, archiveOutputStream, tmpdir,
-              hardLinkFileMap, false);
+          collectFilesFromDir(sstFilesToExclude, backupFiles, maxTotalSstSize, false, omdbArchiver);
         }
         Collection<Path> snapshotLocalPropertyFiles =
             getSnapshotLocalDataPaths(localDataManager, snapshotInCheckpoint.keySet());
         // This is done to ensure all data to be copied correctly is flushed in the snapshot DB
-        transferSnapshotData(sstFilesToExclude, tmpdir, snapshotInCheckpoint.values(), snapshotLocalPropertyFiles,
-            maxTotalSstSize, archiveOutputStream, hardLinkFileMap);
+        collectSnapshotData(sstFilesToExclude, snapshotInCheckpoint.values(), snapshotLocalPropertyFiles,
+            maxTotalSstSize, omdbArchiver);
        }
      }
-      writeHardlinkFile(getConf(), hardLinkFileMap, archiveOutputStream);
-      includeRatisSnapshotCompleteFlag(archiveOutputStream);
+      omdbArchiver.setCompleted(true);
     }
   } catch (IOException ioe) {
-      LOG.error("got exception writing to archive " + ioe);
+      LOG.error("Got exception while collecting files to archive.", ioe);
       throw ioe;
     } finally {
       cleanupCheckpoint(checkpoint);
@@ -353,25 +356,26 @@ private Collection getSnapshotLocalDataPaths(OmSnapshotLocalDataManager lo
    * handling deduplication and managing resource locking.
    *
    * @param sstFilesToExclude Set of SST file identifiers to exclude from the archive.
-   * @param tmpdir Temporary directory for intermediate processing.
    * @param snapshotPaths Set of paths to snapshot directories to be processed.
+   * @param snapshotLocalPropertyFiles Collection of snapshot-local property YAML files to include.
    * @param maxTotalSstSize AtomicLong to track the cumulative size of SST files included.
-   * @param archiveOutputStream Archive output stream to write the snapshot data.
-   * @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication.
+   * @param omdbArchiver Archiver that records the collected files and hardlink metadata.
    * @throws IOException if an I/O error occurs during processing.
   */
-  void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Collection<Path> snapshotPaths,
+  void collectSnapshotData(Set<String> sstFilesToExclude, Collection<Path> snapshotPaths,
       Collection<Path> snapshotLocalPropertyFiles, AtomicLong maxTotalSstSize,
-      ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Map<String, String> hardLinkFileMap)
+      OMDBArchiver omdbArchiver)
       throws IOException {
     for (Path snapshotDir : snapshotPaths) {
-      writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap,
-          false);
+      collectFilesFromDir(sstFilesToExclude, snapshotDir, maxTotalSstSize,
+          false, omdbArchiver);
     }
     for (Path snapshotLocalPropertyYaml : snapshotLocalPropertyFiles) {
       File yamlFile = snapshotLocalPropertyYaml.toFile();
-      hardLinkFileMap.put(yamlFile.getAbsolutePath(), yamlFile.getName());
-      linkAndIncludeFile(yamlFile, yamlFile.getName(), archiveOutputStream, tmpdir);
+      if (omdbArchiver.getHardLinkFileMap() != null) {
+        omdbArchiver.getHardLinkFileMap().put(yamlFile.getAbsolutePath(), yamlFile.getName());
+      }
+      omdbArchiver.recordFileEntry(yamlFile, yamlFile.getName());
     }
   }
@@ -399,7 +401,7 @@ private static void cleanupCheckpoint(DBCheckpoint checkpoint) {
    * @throws IOException If an I/O error occurs while creating or writing the
    *                     hardlink file.
    */
-  private static void writeHardlinkFile(OzoneConfiguration conf, Map<String, String> hardlinkFileMap,
+  static void writeHardlinkFile(OzoneConfiguration conf, Map<String, String> hardlinkFileMap,
       ArchiveOutputStream<TarArchiveEntry> archiveOutputStream) throws IOException {
     Path data = Files.createTempFile(DATA_PREFIX, DATA_SUFFIX);
     Path metaDirPath = OMStorage.getOmDbDir(conf).toPath();
@@ -453,30 +455,25 @@ Map getSnapshotDirsFromDB(OMMetadataManager activeOMMetadataManager,
   }
 
   @VisibleForTesting
-  boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
-      ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
-      Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
+  boolean collectFilesFromDir(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
+      boolean onlySstFile, OMDBArchiver omdbArchiver) throws IOException {
     if (!Files.exists(dbDir)) {
       LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
       return true;
     }
     try (Stream<Path> files = Files.list(dbDir)) {
-      return writeDBToArchive(sstFilesToExclude, files,
-          maxTotalSstSize, archiveOutputStream, tmpDir, hardLinkFileMap, onlySstFile);
+      return collectFilesFromDir(sstFilesToExclude, files, maxTotalSstSize, onlySstFile, omdbArchiver);
     }
   }
 
   /**
-   * Writes database files to the archive, handling deduplication based on inode IDs.
+   * Collects database files for the archive, handling deduplication based on inode IDs.
    * Here the dbDir could either be a snapshot db directory, the active om.db,
    * compaction log dir, sst backup dir.
    *
    * @param sstFilesToExclude Set of SST file IDs to exclude from the archive
    * @param files Stream of files to archive
    * @param maxTotalSstSize Maximum total size of SST files to include
-   * @param archiveOutputStream Archive output stream
-   * @param tmpDir Temporary directory for processing
-   * @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication
-   *                        the archived files are not moved to this directory.
+   * @param omdbArchiver Archiver that records the collected files and hardlink metadata
    * @param onlySstFile If true, only SST files are processed. If false, all files are processed.
   *
@@ -486,13 +483,10 @@ boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong m
    * @return true if processing should continue, false if size limit reached
    * @throws IOException if an I/O error occurs
    */
-  @SuppressWarnings("checkstyle:ParameterNumber")
-  private boolean writeDBToArchive(Set<String> sstFilesToExclude, Stream<Path> files, AtomicLong maxTotalSstSize,
-      ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
-      Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
-    long bytesWritten = 0L;
+  private boolean collectFilesFromDir(Set<String> sstFilesToExclude, Stream<Path> files, AtomicLong maxTotalSstSize,
+      boolean onlySstFile, OMDBArchiver omdbArchiver) throws IOException {
+    long bytesRecorded = 0L;
     int filesWritten = 0;
-    long lastLoggedTime = Time.monotonicNow();
     Iterable<Path> iterable = files::iterator;
     for (Path dbFile : iterable) {
       if (!Files.isDirectory(dbFile)) {
@@ -500,31 +494,30 @@ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Stream<Path> fil
           continue;
         }
         String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
-        if (hardLinkFileMap != null) {
+        if (omdbArchiver.getHardLinkFileMap() != null) {
           String path = dbFile.toFile().getAbsolutePath();
           // if the file is in the om checkpoint dir, then we need to change the path to point to the OM DB.
          if (path.contains(OM_CHECKPOINT_DIR)) {
            path = getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString();
          }
-          hardLinkFileMap.put(path, fileId);
+          omdbArchiver.getHardLinkFileMap().put(path, fileId);
        }
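+        // fileId encodes inode and mtime, so a file reachable through hardlinks
+        // from multiple snapshot dirs is collected and transferred only once.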
        if (!sstFilesToExclude.contains(fileId)) {
          long fileSize = Files.size(dbFile);
          if (maxTotalSstSize.get() - fileSize <= 0) {
            return false;
          }
-          bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, archiveOutputStream, tmpDir);
+          bytesRecorded += omdbArchiver.recordFileEntry(dbFile.toFile(), fileId);
          filesWritten++;
          maxTotalSstSize.addAndGet(-fileSize);
          sstFilesToExclude.add(fileId);
-          if (Time.monotonicNow() - lastLoggedTime >= 30000) {
-            LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...",
-                bytesWritten / (1024), filesWritten);
-            lastLoggedTime = Time.monotonicNow();
-          }
        }
      }
    }
+    LOG.info("Collected {} KB, #files {} to write to checkpoint tarball stream...",
+        bytesRecorded / 1024, filesWritten);
    return true;
  }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMDBArchiver.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMDBArchiver.java
new file mode 100644
index 000000000000..5d2093e1f687
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMDBArchiver.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.OM_HARDLINK_FILE;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Test class for OMDBArchiver.
+ */
+public class TestOMDBArchiver {
+
+  @TempDir
+  private Path folder;
+
+  @Test
+  public void testRecordFileEntry() throws IOException {
+    OMDBArchiver omdbArchiver = new OMDBArchiver();
+    Path tmpDir = Files.createTempDirectory(folder, "TestOMDBArchiver");
+    omdbArchiver.setTmpDir(tmpDir);
+    assertThat(omdbArchiver.getFilesToWriteIntoTarball()).isNotNull();
+    assertThat(omdbArchiver.getTmpDir()).isEqualTo(tmpDir);
+    File dummyFile = new File(tmpDir.toFile(), "dummy.txt");
+    Files.write(dummyFile.toPath(), "dummy".getBytes(StandardCharsets.UTF_8));
+    String entryName = "dummy-hardlink.txt";
+    long result = omdbArchiver.recordFileEntry(dummyFile, entryName);
+    assertThat(omdbArchiver.getFilesToWriteIntoTarball().size()).isEqualTo(1);
+    Optional<File> file = omdbArchiver.getFilesToWriteIntoTarball()
+        .values().stream().findFirst();
+    assertThat(file.isPresent()).isTrue();
+    file.ifPresent(value -> {
+      // because it's a hardlink
+      assertThat(value).isNotEqualTo(dummyFile);
+      try {
+        Object inodeLink = IOUtils.getINode(value.toPath());
+        Object inodeFile = IOUtils.getINode(dummyFile.toPath());
+        assertThat(inodeLink).isEqualTo(inodeFile);
+      } catch (IOException ex) {
+        Assertions.fail(ex.getMessage());
+      }
+    });
+    assertThat(result).isEqualTo(dummyFile.length());
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWriteToArchive(boolean completed) throws IOException {
+    OMDBArchiver omdbArchiver = new OMDBArchiver();
+    Path tmpDir = Files.createTempDirectory(folder, "TestOMDBArchiver");
+    omdbArchiver.setTmpDir(tmpDir);
+    assertThat(omdbArchiver.getFilesToWriteIntoTarball()).isNotNull();
+    assertThat(omdbArchiver.getTmpDir()).isEqualTo(tmpDir);
+    Map<String, String> hardLinkFileMap = new HashMap<>();
+    for (int i = 0; i < 10; i++) {
+      String fileName = "hardlink-" + i;
+      File dummyFile = new File(tmpDir.toFile(), fileName);
+      Files.write(dummyFile.toPath(), "dummy".getBytes(StandardCharsets.UTF_8));
+      omdbArchiver.getFilesToWriteIntoTarball().put(fileName, dummyFile);
+      hardLinkFileMap.put(dummyFile.getAbsolutePath(), dummyFile.getName());
+    }
+    omdbArchiver.setHardLinkFileMap(hardLinkFileMap);
+
+    File tarFile = new File(folder.toFile(), "test-archive.tar");
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set("ozone.metadata.dirs", String.valueOf(folder.toAbsolutePath()));
+    try (OutputStream outputStream = Files.newOutputStream(tarFile.toPath())) {
+      omdbArchiver.setCompleted(completed);
+      omdbArchiver.writeToArchive(conf, outputStream);
+    }
+
+    assertThat(tarFile.exists()).isTrue();
+    assertThat(tarFile.length()).isGreaterThan(0);
+    // Untar the file
+    File extractDir = new File(folder.toFile(), "extracted");
+    assertThat(extractDir.mkdirs()).isTrue();
+    FileUtil.unTar(tarFile, extractDir);
+    // Verify contents
+    int fileCount = 10;
+    for (int i = 0; i < 10; i++) {
+      String fileName = "hardlink-" + i;
+      File extractedFile = new File(extractDir, fileName);
+      assertThat(extractedFile.exists())
+          .as("File %s should exist in extracted archive", fileName).isTrue();
+      assertThat(extractedFile.length())
+          .as("File %s should have content", fileName)
+          .isEqualTo("dummy".getBytes(StandardCharsets.UTF_8).length);
+
+      byte[] content = Files.readAllBytes(extractedFile.toPath());
+      assertThat(new String(content,
+          StandardCharsets.UTF_8)).isEqualTo("dummy");
+    }
+
+    if (completed) {
+      File hardlinkFile = new File(extractDir, OM_HARDLINK_FILE);
+      File completeMarker = new File(extractDir, OZONE_RATIS_SNAPSHOT_COMPLETE_FLAG_NAME);
+      assertThat(completeMarker.exists()).isTrue();
+      assertThat(hardlinkFile.exists()).isTrue();
+      fileCount += 2;
+    }
+
+    try (Stream<Path> stream = Files.list(extractDir.toPath())) {
+      List<Path> extractedFiles = stream.filter(Files::isRegularFile)
+          .collect(java.util.stream.Collectors.toList());
+      assertThat(extractedFiles.size()).isEqualTo(fileCount);
+    }
+  }
+}