Skip to content

Commit

Permalink
Merge pull request Netflix#3 from spotify/rename
Browse files Browse the repository at this point in the history
Add support for rename
  • Loading branch information
rohansingh committed May 19, 2016
2 parents ff02e42 + e719697 commit f0b92b3
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,118 @@ private List<Path> checkListing(List<FileInfo> metastoreListing, FileStatus [] s

return s3files.toArray(new FileStatus[s3files.size()]);
}


@Pointcut
public abstract void rename();

/**
* Rename listing records based on a rename call from the FileSystem.
*
* @param pjp
* @return
* @throws Throwable
*/
@Around("rename() && !within(ConsistentListingAspect)")
public Object metastoreRename(final ProceedingJoinPoint pjp) throws Throwable {
if(disabled) {
return pjp.proceed();
}

Configuration conf = ((FileSystem) pjp.getTarget()).getConf();
updateConfig(conf);
FileSystem fs = (FileSystem) pjp.getTarget();

Path srcPath = (Path) pjp.getArgs()[0];
Path dstPath = (Path) pjp.getArgs()[1];

metadataRename(conf, fs, srcPath, dstPath);

Object obj = pjp.proceed();
if ((Boolean) obj) {
// Everything went fine delete the old metadata.
// If not then we'll keep the metadata to prevent incomplete listings.
// Manual cleanup will be required in the case of failure.
metadataCleanup(conf, fs, srcPath);
}
return obj;
}

private void metadataRename(Configuration conf, FileSystem fs, Path srcPath, Path dstPath) throws Exception {
try {
if (fs.exists(srcPath) && !fs.exists(dstPath) && fs.isFile(srcPath)) {
renameFile(srcPath, dstPath);
} else if (fs.exists(srcPath) && !fs.exists(dstPath) && !fs.isFile(srcPath)){
renameFolder(srcPath, dstPath);
} else {
throw new UnsupportedOperationException("Move" + srcPath.toUri().getPath()
+ " to " + dstPath.toUri().getPath());
}
} catch (TimeoutException t) {
log.error("Timeout occurred rename metastore path: " + srcPath, t);

alertDispatcher.timeout("metastoreRename", Collections.singletonList(srcPath));

if(failOnTimeout) {
throw t;
}
} catch (Exception e) {
log.error("Error rename paths from metastore: " + srcPath, e);

if(shouldFail(conf)) {
throw e;
}
}
}

private void renameFile(Path src, Path dst) throws Exception {
metastore.add(dst, false);
}

private void renameFolder(Path src, Path dst) throws Exception {
metastore.add(dst, true);
List<FileInfo> metastoreFiles = metastore.list(Collections.singletonList(src));
for (FileInfo info : metastoreFiles) {
Path target = new Path(dst, info.getPath().getName());
if (info.isDirectory()) {
renameFolder(info.getPath(), target);
} else {
renameFile(info.getPath(), target);
}
}
}

private void metadataCleanup(Configuration conf, FileSystem fs, Path srcPath) throws Exception {
try {
renameCleanup(fs, srcPath);
} catch (TimeoutException t) {
log.error("Timeout occurred rename cleanup metastore path: " + srcPath, t);

alertDispatcher.timeout("metastoreRenameCleanup", Collections.singletonList(srcPath));

if(failOnTimeout) {
throw t;
}
} catch (Exception e) {
log.error("Error executing rename cleanup for paths from metastore: " + srcPath, e);

if(shouldFail(conf)) {
throw e;
}
}
}

private void renameCleanup(FileSystem fs, Path path) throws Exception {
if (fs.isFile(path)) {
metastore.delete(path);
return;
}

List<FileInfo> metastoreFiles = metastore.list(Collections.singletonList(path));
for (FileInfo info : metastoreFiles) {
renameCleanup(fs, info.getPath());
}
metastore.delete(path);
}

@Pointcut
public abstract void delete();
Expand Down
30 changes: 19 additions & 11 deletions src/main/resources/META-INF/aop.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,25 @@
<aspectj>

<aspects>
<!-- Configuration for Consistent Listing -->
<!-- Configuration for Consistent Listing -->

<aspect name="com.netflix.bdp.s3mper.listing.ConsistentListingAspect"/>
<concrete-aspect name="com.netflix.bdp.s3mper.listing__ConsistentListing"
extends="com.netflix.bdp.s3mper.listing.ConsistentListingAspect">
<pointcut name="init" expression="execution(* org.apache.hadoop.fs.FileSystem.initialize(..))"/>
<pointcut name="create" expression="execution(* org.apache.hadoop.fs.FileSystem.create(..)) ||
execution(* org.apache.hadoop.fs.FileSystem.mkdirs(..))"/>
<pointcut name="list" expression="execution(* org.apache.hadoop.fs.FileSystem.listStatus(..))"/>
<pointcut name="delete" expression="execution(* org.apache.hadoop.fs.FileSystem.delete(..))"/>
</concrete-aspect>
<aspect name="com.netflix.bdp.s3mper.listing.ConsistentListingAspect"/>

<concrete-aspect name="com.netflix.bdp.s3mper.listing__ConsistentListing"
extends="com.netflix.bdp.s3mper.listing.ConsistentListingAspect">
<pointcut name="init" expression="execution(* org.apache.hadoop..*NativeS3FileSystem.initialize(..)) ||
execution(* com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(..))"/>
<pointcut name="create" expression="execution(* org.apache.hadoop..*NativeS3FileSystem.create(..)) ||
execution(* org.apache.hadoop..*NativeS3FileSystem.mkdirs(..)) ||
execution(* com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(..)) ||
execution(* com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.mkdirs(..))"/>
<pointcut name="list" expression="execution(* org.apache.hadoop..*NativeS3FileSystem.listStatus(..)) ||
execution(* com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(..))"/>
<pointcut name="delete" expression="execution(* org.apache.hadoop..*NativeS3FileSystem.listStatus(..)) ||
execution(* com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.delete(..))"/>
<pointcut name="rename" expression="execution(* org.apache.hadoop..*NativeS3FileSystem.rename(..)) ||
execution(* com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.rename(..))"/>
</concrete-aspect>

</aspects>

Expand All @@ -39,4 +47,4 @@
<exclude within="com.sun..*"/>
</weaver>

</aspectj>
</aspectj>

0 comments on commit f0b92b3

Please sign in to comment.