Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Yongqiang YANG committed Dec 31, 2024
1 parent 2771d1e commit 7a67453
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 100 deletions.
70 changes: 52 additions & 18 deletions fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -187,12 +188,6 @@ public static Pair<String, String> decodeFileNameWithChecksum(String fileNameWit
return Pair.of(fileName, md5sum);
}

// in: /path/to/orig_file
// out: /path/to/orig_file.BUWDnl831e4nldsf
/**
 * Swaps the last segment of {@code origPath} for {@code fileNameWithChecksum}.
 * Everything up to and including the last {@code PATH_DELIMITER} is kept; if
 * {@code origPath} contains no delimiter, the result is just {@code fileNameWithChecksum}.
 *
 * @param origPath path whose final segment is replaced
 * @param fileNameWithChecksum replacement for the final segment
 * @return the parent part of {@code origPath} followed by {@code fileNameWithChecksum}
 */
public static String replaceFileNameWithChecksumFileName(String origPath, String fileNameWithChecksum) {
    // lastIndexOf yields -1 when there is no delimiter, so parentEnd becomes 0 and the parent part is empty.
    int parentEnd = origPath.lastIndexOf(PATH_DELIMITER) + 1;
    String parentDir = origPath.substring(0, parentEnd);
    return parentDir + fileNameWithChecksum;
}

public static Repository read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
Repository repo = new Repository();
Expand Down Expand Up @@ -425,23 +420,52 @@ public boolean ping() {
public Status listSnapshots(List<String> snapshotNames) {
// list with prefix:
// eg. __palo_repository_repo_name/__ss_*
String listPath = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name), PREFIX_SNAPSHOT_DIR)
+ "*";

String listPathPrefix = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
PREFIX_SNAPSHOT_DIR);

try {
listPathPrefix = new URI(listPathPrefix).normalize().toString();
} catch (URISyntaxException e) {
LOG.error("Invalid URI syntax: " + listPathPrefix, e);
return new Status(ErrCode.COMMON_ERROR, "Invalid URI syntax: " + listPathPrefix);
}


String listPath = listPathPrefix + "*" + PATH_DELIMITER + "*";
List<RemoteFile> result = Lists.newArrayList();
Status st = fileSystem.globList(listPath, result);
if (!st.ok()) {
return st;
}

int snapshotNameOffset = listPathPrefix.lastIndexOf(PATH_DELIMITER);

HashSet<String> uniqueSnapshotNames = new HashSet<>();
for (RemoteFile remoteFile : result) {
if (remoteFile.isFile()) {
if (LOG.isDebugEnabled()) {
LOG.debug("get snapshot path{} which is not a dir", remoteFile);
}
int index = remoteFile.getName().indexOf(listPathPrefix);
if (LOG.isDebugEnabled()) {
LOG.debug("get remote file: {} {}", remoteFile.getName(), listPathPrefix);
}
if (index == -1) {
LOG.info("glob list wrong results, prefix {}, file name {}", listPathPrefix, remoteFile.getName());
continue;
}
String snapshotName = remoteFile.getName().substring(index + snapshotNameOffset + 1);
snapshotName = disjoinPrefix(PREFIX_SNAPSHOT_DIR, snapshotName);
index = snapshotName.indexOf(PATH_DELIMITER);
if (index != -1) {
snapshotName = snapshotName.substring(0, index);
} else {
continue;
}

if (uniqueSnapshotNames.contains(snapshotName)) {
continue;
}

snapshotNames.add(disjoinPrefix(PREFIX_SNAPSHOT_DIR, remoteFile.getName()));
uniqueSnapshotNames.add(snapshotName);
snapshotNames.add(snapshotName);
}
return Status.OK;
}
Expand Down Expand Up @@ -604,6 +628,13 @@ public Status upload(String localFilePath, String remoteFilePath) {

// remoteFilePath must be a file (not a dir) and must not contain the checksum
public Status download(String remoteFilePath, String localFilePath) {
try {
remoteFilePath = new URI(remoteFilePath).normalize().toString();
} catch (URISyntaxException e) {
LOG.error("Invalid URI syntax: " + remoteFilePath, e);
return new Status(ErrCode.COMMON_ERROR, "Invalid URI syntax: " + remoteFilePath);
}

// 0. list to get to full name(with checksum)
List<RemoteFile> remoteFiles = Lists.newArrayList();
Status status = fileSystem.globList(remoteFilePath + "*", remoteFiles);
Expand All @@ -618,8 +649,7 @@ public Status download(String remoteFilePath, String localFilePath) {
return new Status(ErrCode.COMMON_ERROR, "Expected file with path: " + remoteFilePath + ". but get dir");
}

String remoteFilePathWithChecksum = replaceFileNameWithChecksumFileName(remoteFilePath,
remoteFiles.get(0).getName());
String remoteFilePathWithChecksum = remoteFiles.get(0).getName();
if (LOG.isDebugEnabled()) {
LOG.debug("get download filename with checksum: " + remoteFilePathWithChecksum);
}
Expand Down Expand Up @@ -780,13 +810,17 @@ private List<String> getSnapshotInfo(String snapshotName, String timestamp) {
List<String> tmp = Lists.newArrayList();
for (RemoteFile file : results) {
// __info_2018-04-18-20-11-00.Jdwnd9312sfdn1294343
Pair<String, String> pureFileName = decodeFileNameWithChecksum(file.getName());
if (pureFileName == null) {
Pair<String, String> pureFilePath = decodeFileNameWithChecksum(file.getName());
if (pureFilePath == null) {
// maybe: __info_2018-04-18-20-11-00.part
tmp.add("Invalid: " + file.getName());
continue;
}
tmp.add(disjoinPrefix(PREFIX_JOB_INFO, pureFileName.first));

int index = pureFilePath.first.lastIndexOf(PATH_DELIMITER);
String pureFileName = pureFilePath.first.substring(index + 1);
LOG.info("path {} filename {}", pureFilePath.first, pureFileName);
tmp.add(disjoinPrefix(PREFIX_JOB_INFO, pureFileName));
}
if (!tmp.isEmpty()) {
info.add(snapshotName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFil
try {
RemoteFileSystem fileSystem = FileSystemFactory.get(
brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
Status st = fileSystem.globList(path, rfiles, false);
Status st = fileSystem.globList(path, rfiles);
if (!st.ok()) {
throw new UserException(st.getErrMsg());
}
Expand Down
15 changes: 1 addition & 14 deletions fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,11 @@ default Status deleteDirectory(String dir) {

/**
* List files in remotePath by wildcard <br/>
* The {@link RemoteFile}'s name will contain only the file name (not the full path)
* @param remotePath remote path
* @param result All eligible files under the path
* @return
*/
default Status globList(String remotePath, List<RemoteFile> result) {
// Convenience overload: forwards to the three-argument globList with fileNameOnly = true.
return globList(remotePath, result, true);
}

/**
* List files in remotePath by wildcard <br/>
* @param remotePath remote path
* @param result All eligible files under the path
* @param fileNameOnly controls what each {@link RemoteFile}'s name contains:<br/>
* true: only the file name; false: the full path<br/>
* @return
*/
Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly);
Status globList(String remotePath, List<RemoteFile> result);

default Status listDirectories(String remotePath, Set<String> result) {
throw new UnsupportedOperationException("Unsupported operation list directories on current file system.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ public Status makeDir(String remotePath) {
}

@Override
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result) {
try {
FileStatus[] locatedFileStatusRemoteIterator = fs.globStatus(new Path(remotePath));
if (locatedFileStatusRemoteIterator == null) {
return Status.OK;
}
for (FileStatus fileStatus : locatedFileStatusRemoteIterator) {
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
fileStatus.getPath().toString(),
!fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
fileStatus.getBlockSize(), fileStatus.getModificationTime());
result.add(remoteFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,24 +318,12 @@ public static String getLongestPrefix(String globPattern) {
return globPattern.substring(0, earliestSpecialCharIndex);
}

/**
 * Mutable state carried through one glob listing: where matches are collected,
 * how their names are rendered, the matcher to test against, and two running counters.
 */
class MatcherContext {
    /** List that receives every matching entry. */
    public List<RemoteFile> result;
    /** When true, entries are named by file name only; otherwise by full path. */
    public boolean fileNameOnly;
    /** Glob matcher; not set by the constructor, the caller assigns it afterwards. */
    public PathMatcher matcher;

    /** Number of listed elements examined so far (starts at 0). */
    public long elementCnt;
    /** Number of examined elements that matched (starts at 0). */
    public long matchCnt;

    public MatcherContext(List<RemoteFile> result, boolean fileNameOnly) {
        this.fileNameOnly = fileNameOnly;
        this.result = result;
    }
}

public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result) {
long roundCnt = 0;
long elementCnt = 0;
long matchCnt = 0;
long startTime = System.nanoTime();
Status st = Status.OK;
MatcherContext matchContext = new MatcherContext(result, fileNameOnly);

try {
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
String globPath = uri.getKey();
Expand All @@ -346,11 +334,34 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
LOG.info("path pattern {}", pathPattern.toString());
PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString());

matchContext.matcher = matcher;
String listPrefix = getLongestPrefix(globPath);
LOG.info("azure glob list prefix is {}", listPrefix);
ListBlobsOptions options = new ListBlobsOptions().setPrefix(listPrefix);
String newContinuationToken = null;
do {
roundCnt++;
PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, newContinuationToken, null);
PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();

for (BlobItem blobItem : pagedResponse.getElements()) {
elementCnt++;
java.nio.file.Path blobPath = Paths.get(blobItem.getName());

if (!matcher.matches(blobPath)) {
continue;
}
matchCnt++;
RemoteFile remoteFile = new RemoteFile(
constructS3Path(blobPath.toString(), uri.getBucket()),
!blobItem.isPrefix(),
blobItem.isPrefix() ? -1 : blobItem.getProperties().getContentLength(),
blobItem.getProperties().getContentLength(),
blobItem.getProperties().getLastModified().getSecond());
result.add(remoteFile);
}
newContinuationToken = pagedResponse.getContinuationToken();
} while (newContinuationToken != null);

listBlobsHierarchicalListing(client, listPrefix, matchContext);
} catch (BlobStorageException e) {
LOG.warn("glob file " + remotePath + " failed because azure error: " + e.getMessage());
st = new Status(Status.ErrCode.COMMON_ERROR, "glob file " + remotePath
Expand All @@ -361,41 +372,11 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
} finally {
long endTime = System.nanoTime();
long duration = endTime - startTime;
LOG.info("process {} elements under prefix {}, match {} elements, take {} micro second",
remotePath, matchContext.elementCnt, matchContext.matchCnt, duration / 1000);
LOG.info("process {} elements under prefix {} for {} round, match {} elements, result {} take {} "
+ "micro second",
remotePath, elementCnt, roundCnt, matchCnt, result.size(), duration / 1000);
}
return st;
}

/**
 * Lists the blobs directly under {@code prefix} (hierarchical listing with "/" as the
 * delimiter) and appends every entry accepted by {@code matchContext.matcher} to
 * {@code matchContext.result}. A matching virtual directory (prefix entry) is descended
 * into recursively before the directory entry itself is appended.
 *
 * <p>{@code matchContext.elementCnt} is incremented for every listed entry and
 * {@code matchContext.matchCnt} for every entry that matches.
 *
 * @param blobContainerClient container client used for the listing
 * @param prefix blob name prefix to list under
 * @param matchContext shared matcher, counters and result list for this glob
 */
private void listBlobsHierarchicalListing(BlobContainerClient blobContainerClient, String prefix,
        MatcherContext matchContext) {
    String delimiter = "/";
    ListBlobsOptions options = new ListBlobsOptions()
            .setPrefix(prefix);

    blobContainerClient.listBlobsByHierarchy(delimiter, options, null)
            .forEach(blob -> {
                matchContext.elementCnt++;
                java.nio.file.Path blobPath = Paths.get(blob.getName());

                // Entries that do not match are dropped; note that a non-matching
                // directory is therefore never descended into.
                if (!matchContext.matcher.matches(blobPath)) {
                    return;
                }
                matchContext.matchCnt++;

                // Read the flag once; Boolean.TRUE.equals keeps this safe should the SDK
                // leave the (boxed) flag unset for plain blobs.
                boolean isDir = Boolean.TRUE.equals(blob.isPrefix());

                if (isDir) {
                    listBlobsHierarchicalListing(blobContainerClient, blob.getName(), matchContext);
                }

                RemoteFile remoteFile = new RemoteFile(
                        matchContext.fileNameOnly ? blobPath.getFileName().toString()
                                : constructS3Path(blobPath.toString(),
                                        blobContainerClient.getBlobContainerName()),
                        !isDir,
                        isDir ? -1 : blob.getProperties().getContentLength(),
                        1L * 1024 * 1024,
                        // Use an absolute timestamp (epoch millis), in line with the
                        // FileStatus#getModificationTime value the other globList
                        // implementations pass. getSecond() only returned the
                        // second-of-minute (0-59).
                        isDir ? 0L : blob.getProperties().getLastModified().toInstant().toEpochMilli());
                matchContext.result.add(remoteFile);
            });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException {
}

@Override
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result) {
AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage();
return azureObjStorage.globList(remotePath, result, fileNameOnly);
return azureObjStorage.globList(remotePath, result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ public boolean isSplittable(String remotePath, String inputFormat) throws UserEx

// List files in remotePath
@Override
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result) {
// get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
Expand All @@ -663,7 +663,7 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
try {
TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath,
false /* not recursive */, properties);
req.setFileNameOnly(fileNameOnly);
req.setFileNameOnly(false);
TBrokerListResponse rep = client.listPath(req);
TBrokerOperationStatus opst = rep.getOpStatus();
if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException {

// broker file pattern glob is too complex, so we use hadoop directly
@Override
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result) {
try {
FileSystem s3AFileSystem = nativeFileSystem(remotePath);
Path pathPattern = new Path(remotePath);
Expand All @@ -113,7 +113,7 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
}
for (FileStatus fileStatus : files) {
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
fileStatus.getPath().toString(),
!fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
fileStatus.getBlockSize(), fileStatus.getModificationTime());
result.add(remoteFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,6 @@ public Status globList(String remotePath, List<RemoteFile> result) {
return fileSystem(remotePath).globList(remotePath, result);
}

@Override
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
// Forwards all arguments unchanged to the file system that fileSystem(remotePath) returns for this path.
return fileSystem(remotePath).globList(remotePath, result, fileNameOnly);
}

@Override
public Status listDirectories(String remotePath, Set<String> result) {
return fileSystem(remotePath).listDirectories(remotePath, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ public Status delete(String remotePath) {
* @return Status.OK if success.
*/
@Override
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result) {
try {
URI pathUri = URI.create(remotePath);
FileSystem fileSystem = nativeFileSystem(remotePath);
Expand All @@ -459,7 +459,7 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
}
for (FileStatus fileStatus : files) {
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
fileStatus.getPath().toString(),
!fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
fileStatus.getBlockSize(), fileStatus.getModificationTime());
result.add(remoteFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public S3Client getClient() throws UserException {
fileSystem = new S3FileSystem(mockedStorage);
new MockUp<S3FileSystem>(S3FileSystem.class) {
@Mock
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result) {
try {
S3URI uri = S3URI.create(remotePath, false);
ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder().bucket(uri.getBucket());
Expand Down

0 comments on commit 7a67453

Please sign in to comment.