From 7a67453acbf300897d1c95b72d3ab2961af6497c Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Tue, 31 Dec 2024 20:55:29 +0800 Subject: [PATCH] fix --- .../org/apache/doris/backup/Repository.java | 70 ++++++++++++---- .../apache/doris/common/util/BrokerUtil.java | 2 +- .../java/org/apache/doris/fs/FileSystem.java | 15 +--- .../apache/doris/fs/LocalDfsFileSystem.java | 4 +- .../apache/doris/fs/obj/AzureObjStorage.java | 83 +++++++------------ .../doris/fs/remote/AzureFileSystem.java | 4 +- .../doris/fs/remote/BrokerFileSystem.java | 4 +- .../apache/doris/fs/remote/S3FileSystem.java | 4 +- .../doris/fs/remote/SwitchingFileSystem.java | 5 -- .../doris/fs/remote/dfs/DFSFileSystem.java | 4 +- .../apache/doris/fs/obj/S3FileSystemTest.java | 2 +- 11 files changed, 97 insertions(+), 100 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java index 1450ef9a03459f3..786a19b9cf2c6bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java @@ -65,6 +65,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -187,12 +188,6 @@ public static Pair decodeFileNameWithChecksum(String fileNameWit return Pair.of(fileName, md5sum); } - // in: /path/to/orig_file - // out: /path/to/orig_file.BUWDnl831e4nldsf - public static String replaceFileNameWithChecksumFileName(String origPath, String fileNameWithChecksum) { - return origPath.substring(0, origPath.lastIndexOf(PATH_DELIMITER) + 1) + fileNameWithChecksum; - } - public static Repository read(DataInput in) throws IOException { if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) { Repository repo = new Repository(); @@ -425,23 +420,52 @@ public boolean ping() { public Status listSnapshots(List snapshotNames) { // list with prefix: // eg. __palo_repository_repo_name/__ss_* - String listPath = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name), PREFIX_SNAPSHOT_DIR) - + "*"; + + String listPathPrefix = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name), + PREFIX_SNAPSHOT_DIR); + + try { + listPathPrefix = new URI(listPathPrefix).normalize().toString(); + } catch (URISyntaxException e) { + LOG.error("Invalid URI syntax: " + listPathPrefix, e); + return new Status(ErrCode.COMMON_ERROR, "Invalid URI syntax: " + listPathPrefix); + } + + + String listPath = listPathPrefix + "*" + PATH_DELIMITER + "*"; List result = Lists.newArrayList(); Status st = fileSystem.globList(listPath, result); if (!st.ok()) { return st; } + int snapshotNameOffset = listPathPrefix.lastIndexOf(PATH_DELIMITER); + + HashSet uniqueSnapshotNames = new HashSet<>(); for (RemoteFile remoteFile : result) { - if (remoteFile.isFile()) { - if (LOG.isDebugEnabled()) { - LOG.debug("get snapshot path{} which is not a dir", remoteFile); - } + int index = remoteFile.getName().indexOf(listPathPrefix); + if (LOG.isDebugEnabled()) { + LOG.debug("get remote file: {} {}", remoteFile.getName(), listPathPrefix); + } + if (index == -1) { + LOG.info("glob list wrong results, prefix {}, file name {}", listPathPrefix, remoteFile.getName()); + continue; + } + String snapshotName = remoteFile.getName().substring(index + snapshotNameOffset + 1); + snapshotName = disjoinPrefix(PREFIX_SNAPSHOT_DIR, snapshotName); + index = snapshotName.indexOf(PATH_DELIMITER); + if (index != -1) { + snapshotName = snapshotName.substring(0, index); + } else { + continue; + } + + if (uniqueSnapshotNames.contains(snapshotName)) { continue; } - snapshotNames.add(disjoinPrefix(PREFIX_SNAPSHOT_DIR, remoteFile.getName())); + uniqueSnapshotNames.add(snapshotName); + snapshotNames.add(snapshotName); } return Status.OK; } @@ -604,6 +628,13 @@ public Status upload(String localFilePath, String remoteFilePath) { // remoteFilePath must be a file(not dir) and does not contain checksum public Status download(String remoteFilePath, String localFilePath) { + try { + remoteFilePath = new URI(remoteFilePath).normalize().toString(); + } catch (URISyntaxException e) { + LOG.error("Invalid URI syntax: " + remoteFilePath, e); + return new Status(ErrCode.COMMON_ERROR, "Invalid URI syntax: " + remoteFilePath); + } + // 0. list to get to full name(with checksum) List remoteFiles = Lists.newArrayList(); Status status = fileSystem.globList(remoteFilePath + "*", remoteFiles); @@ -618,8 +649,7 @@ public Status download(String remoteFilePath, String localFilePath) { return new Status(ErrCode.COMMON_ERROR, "Expected file with path: " + remoteFilePath + ". but get dir"); } - String remoteFilePathWithChecksum = replaceFileNameWithChecksumFileName(remoteFilePath, - remoteFiles.get(0).getName()); + String remoteFilePathWithChecksum = remoteFiles.get(0).getName(); if (LOG.isDebugEnabled()) { LOG.debug("get download filename with checksum: " + remoteFilePathWithChecksum); } @@ -780,13 +810,17 @@ private List getSnapshotInfo(String snapshotName, String timestamp) { List tmp = Lists.newArrayList(); for (RemoteFile file : results) { // __info_2018-04-18-20-11-00.Jdwnd9312sfdn1294343 - Pair pureFileName = decodeFileNameWithChecksum(file.getName()); - if (pureFileName == null) { + Pair pureFilePath = decodeFileNameWithChecksum(file.getName()); + if (pureFilePath == null) { // maybe: __info_2018-04-18-20-11-00.part tmp.add("Invalid: " + file.getName()); continue; } - tmp.add(disjoinPrefix(PREFIX_JOB_INFO, pureFileName.first)); + + int index = pureFilePath.first.lastIndexOf(PATH_DELIMITER); + String pureFileName = pureFilePath.first.substring(index + 1); + LOG.info("path {} filename {}", pureFilePath.first, pureFileName); + tmp.add(disjoinPrefix(PREFIX_JOB_INFO, pureFileName)); } if (!tmp.isEmpty()) { info.add(snapshotName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index c5a2803b848c75f..168e1240bb039d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -88,7 +88,7 @@ public static void parseFile(String path, BrokerDesc brokerDesc, List - * The {@link RemoteFile}'name will only contain file name (Not full path) * @param remotePath remote path * @param result All eligible files under the path * @return */ - default Status globList(String remotePath, List result) { - return globList(remotePath, result, true); - } - - /** - * List files in remotePath by wildcard
- * @param remotePath remote path - * @param result All eligible files under the path - * @param fileNameOnly for {@link RemoteFile}'name: whether the full path is included.
- * true: only contains file name, false: contains full path
- * @return - */ - Status globList(String remotePath, List result, boolean fileNameOnly); + Status globList(String remotePath, List result); default Status listDirectories(String remotePath, Set result) { throw new UnsupportedOperationException("Unsupported operation list directories on current file system."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java index 93e79bf94b150f5..2740637575f555e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java @@ -134,7 +134,7 @@ public Status makeDir(String remotePath) { } @Override - public Status globList(String remotePath, List result, boolean fileNameOnly) { + public Status globList(String remotePath, List result) { try { FileStatus[] locatedFileStatusRemoteIterator = fs.globStatus(new Path(remotePath)); if (locatedFileStatusRemoteIterator == null) { @@ -142,7 +142,7 @@ public Status globList(String remotePath, List result, boolean fileN } for (FileStatus fileStatus : locatedFileStatusRemoteIterator) { RemoteFile remoteFile = new RemoteFile( - fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(), + fileStatus.getPath().toString(), !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(), fileStatus.getBlockSize(), fileStatus.getModificationTime()); result.add(remoteFile); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java index fc0af501fc88400..630e15bcb0ae174 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java @@ -318,24 +318,12 @@ public static String getLongestPrefix(String globPattern) { return globPattern.substring(0, earliestSpecialCharIndex); } - class MatcherContext { - public long elementCnt = 0; - public long matchCnt = 0; - public List result; - public boolean fileNameOnly; - public PathMatcher matcher; - - public MatcherContext(List result, boolean fileNameOnly) { - this.result = result; - this.fileNameOnly = fileNameOnly; - } - } - - public Status globList(String remotePath, List result, boolean fileNameOnly) { + public Status globList(String remotePath, List result) { + long roundCnt = 0; + long elementCnt = 0; + long matchCnt = 0; long startTime = System.nanoTime(); Status st = Status.OK; - MatcherContext matchContext = new MatcherContext(result, fileNameOnly); - try { S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri); String globPath = uri.getKey(); @@ -346,11 +334,34 @@ public Status globList(String remotePath, List result, boolean fileN LOG.info("path pattern {}", pathPattern.toString()); PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString()); - matchContext.matcher = matcher; String listPrefix = getLongestPrefix(globPath); LOG.info("azure glob list prefix is {}", listPrefix); + ListBlobsOptions options = new ListBlobsOptions().setPrefix(listPrefix); + String newContinuationToken = null; + do { + roundCnt++; + PagedIterable pagedBlobs = client.listBlobs(options, newContinuationToken, null); + PagedResponse pagedResponse = pagedBlobs.iterableByPage().iterator().next(); + + for (BlobItem blobItem : pagedResponse.getElements()) { + elementCnt++; + java.nio.file.Path blobPath = Paths.get(blobItem.getName()); + + if (!matcher.matches(blobPath)) { + continue; + } + matchCnt++; + RemoteFile remoteFile = new RemoteFile( + constructS3Path(blobPath.toString(), uri.getBucket()), + !blobItem.isPrefix(), + blobItem.isPrefix() ? -1 : blobItem.getProperties().getContentLength(), + blobItem.getProperties().getContentLength(), + blobItem.getProperties().getLastModified().getSecond()); + result.add(remoteFile); + } + newContinuationToken = pagedResponse.getContinuationToken(); + } while (newContinuationToken != null); - listBlobsHierarchicalListing(client, listPrefix, matchContext); } catch (BlobStorageException e) { LOG.warn("glob file " + remotePath + " failed because azure error: " + e.getMessage()); st = new Status(Status.ErrCode.COMMON_ERROR, "glob file " + remotePath @@ -361,41 +372,11 @@ public Status globList(String remotePath, List result, boolean fileN } finally { long endTime = System.nanoTime(); long duration = endTime - startTime; - LOG.info("process {} elements under prefix {}, match {} elements, take {} micro second", - remotePath, matchContext.elementCnt, matchContext.matchCnt, duration / 1000); + LOG.info("process {} elements under prefix {} for {} round, match {} elements, result {} take {} " + + "micro second", + remotePath, elementCnt, roundCnt, matchCnt, result.size(), duration / 1000); } return st; } - private void listBlobsHierarchicalListing(BlobContainerClient blobContainerClient, String prefix, - MatcherContext matchContext) { - String delimiter = "/"; - ListBlobsOptions options = new ListBlobsOptions() - .setPrefix(prefix); - - blobContainerClient.listBlobsByHierarchy(delimiter, options, null) - .forEach(blob -> { - matchContext.elementCnt++; - java.nio.file.Path blobPath = Paths.get(blob.getName()); - - if (!matchContext.matcher.matches(blobPath)) { - return; - } - matchContext.matchCnt++; - - if (blob.isPrefix()) { - listBlobsHierarchicalListing(blobContainerClient, blob.getName(), matchContext); - } - - RemoteFile remoteFile = new RemoteFile( - matchContext.fileNameOnly ? blobPath.getFileName().toString() - : constructS3Path(blobPath.toString(), - blobContainerClient.getBlobContainerName()), - !blob.isPrefix(), - blob.isPrefix() ? -1 : blob.getProperties().getContentLength(), - 1L * 1024 * 1024, - blob.isPrefix() ? 0 : blob.getProperties().getLastModified().getSecond()); - matchContext.result.add(remoteFile); - }); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java index c116182d3a42416..88f900e52bfcca3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java @@ -54,8 +54,8 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException { } @Override - public Status globList(String remotePath, List result, boolean fileNameOnly) { + public Status globList(String remotePath, List result) { AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage(); - return azureObjStorage.globList(remotePath, result, fileNameOnly); + return azureObjStorage.globList(remotePath, result); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java index 5b9ee8aaeca1cdc..c91179d50a95421 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java @@ -649,7 +649,7 @@ public boolean isSplittable(String remotePath, String inputFormat) throws UserEx // List files in remotePath @Override - public Status globList(String remotePath, List result, boolean fileNameOnly) { + public Status globList(String remotePath, List result) { // get a proper broker Pair pair = getBroker(); if (pair == null) { @@ -663,7 +663,7 @@ public Status globList(String remotePath, List result, boolean fileN try { TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath, false /* not recursive */, properties); - req.setFileNameOnly(fileNameOnly); + req.setFileNameOnly(false); TBrokerListResponse rep = client.listPath(req); TBrokerOperationStatus opst = rep.getOpStatus(); if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java index be53ffde2e095b0..e50b8a7cad1b417 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java @@ -103,7 +103,7 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException { // broker file pattern glob is too complex, so we use hadoop directly @Override - public Status globList(String remotePath, List result, boolean fileNameOnly) { + public Status globList(String remotePath, List result) { try { FileSystem s3AFileSystem = nativeFileSystem(remotePath); Path pathPattern = new Path(remotePath); @@ -113,7 +113,7 @@ public Status globList(String remotePath, List result, boolean fileN } for (FileStatus fileStatus : files) { RemoteFile remoteFile = new RemoteFile( - fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(), + fileStatus.getPath().toString(), !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(), fileStatus.getBlockSize(), fileStatus.getModificationTime()); result.add(remoteFile); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java index 00802922ef36898..c16a31ace40eee6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java @@ -112,11 +112,6 @@ public Status globList(String remotePath, List result) { return fileSystem(remotePath).globList(remotePath, result); } - @Override - public Status globList(String remotePath, List result, boolean fileNameOnly) { - return fileSystem(remotePath).globList(remotePath, result, fileNameOnly); - } - @Override public Status listDirectories(String remotePath, Set result) { return fileSystem(remotePath).listDirectories(remotePath, result); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 89f4af2817ec050..e934b98e918e699 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -447,7 +447,7 @@ public Status delete(String remotePath) { * @return Status.OK if success. */ @Override - public Status globList(String remotePath, List result, boolean fileNameOnly) { + public Status globList(String remotePath, List result) { try { URI pathUri = URI.create(remotePath); FileSystem fileSystem = nativeFileSystem(remotePath); @@ -459,7 +459,7 @@ public Status globList(String remotePath, List result, boolean fileN } for (FileStatus fileStatus : files) { RemoteFile remoteFile = new RemoteFile( - fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(), + fileStatus.getPath().toString(), !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(), fileStatus.getBlockSize(), fileStatus.getModificationTime()); result.add(remoteFile); diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java index 442883573ce49a2..bb5f2cdad503a0d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java @@ -108,7 +108,7 @@ public S3Client getClient() throws UserException { fileSystem = new S3FileSystem(mockedStorage); new MockUp(S3FileSystem.class) { @Mock - public Status globList(String remotePath, List result, boolean fileNameOnly) { + public Status globList(String remotePath, List result) { try { S3URI uri = S3URI.create(remotePath, false); ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder().bucket(uri.getBucket());