diff --git a/src/main/java/com/netflix/bdp/s3mper/listing/ConsistentListingAspect.java b/src/main/java/com/netflix/bdp/s3mper/listing/ConsistentListingAspect.java index 87c9fc6..45022a3 100644 --- a/src/main/java/com/netflix/bdp/s3mper/listing/ConsistentListingAspect.java +++ b/src/main/java/com/netflix/bdp/s3mper/listing/ConsistentListingAspect.java @@ -441,7 +441,118 @@ private List checkListing(List metastoreListing, FileStatus [] s return s3files.toArray(new FileStatus[s3files.size()]); } - + + @Pointcut + public abstract void rename(); + + /** + * Rename listing records based on a rename call from the FileSystem. + * + * @param pjp + * @return + * @throws Throwable + */ + @Around("rename() && !within(ConsistentListingAspect)") + public Object metastoreRename(final ProceedingJoinPoint pjp) throws Throwable { + if(disabled) { + return pjp.proceed(); + } + + Configuration conf = ((FileSystem) pjp.getTarget()).getConf(); + updateConfig(conf); + FileSystem fs = (FileSystem) pjp.getTarget(); + + Path srcPath = (Path) pjp.getArgs()[0]; + Path dstPath = (Path) pjp.getArgs()[1]; + + metadataRename(conf, fs, srcPath, dstPath); + + Object obj = pjp.proceed(); + if ((Boolean) obj) { + // Everything went fine delete the old metadata. + // If not then we'll keep the metadata to prevent incomplete listings. + // Manual cleanup will be required in the case of failure. + metadataCleanup(conf, fs, srcPath); + } + return obj; + } + + private void metadataRename(Configuration conf, FileSystem fs, Path srcPath, Path dstPath) throws Exception { + try { + if (fs.exists(srcPath) && !fs.exists(dstPath) && fs.isFile(srcPath)) { + renameFile(srcPath, dstPath); + } else if (fs.exists(srcPath) && !fs.exists(dstPath) && !fs.isFile(srcPath)){ + renameFolder(srcPath, dstPath); + } else { + throw new UnsupportedOperationException("Move" + srcPath.toUri().getPath() + + " to " + dstPath.toUri().getPath()); + } + } catch (TimeoutException t) { + log.error("Timeout occurred rename metastore path: " + srcPath, t); + + alertDispatcher.timeout("metastoreRename", Collections.singletonList(srcPath)); + + if(failOnTimeout) { + throw t; + } + } catch (Exception e) { + log.error("Error rename paths from metastore: " + srcPath, e); + + if(shouldFail(conf)) { + throw e; + } + } + } + + private void renameFile(Path src, Path dst) throws Exception { + metastore.add(dst, false); + } + + private void renameFolder(Path src, Path dst) throws Exception { + metastore.add(dst, true); + List metastoreFiles = metastore.list(Collections.singletonList(src)); + for (FileInfo info : metastoreFiles) { + Path target = new Path(dst, info.getPath().getName()); + if (info.isDirectory()) { + renameFolder(info.getPath(), target); + } else { + renameFile(info.getPath(), target); + } + } + } + + private void metadataCleanup(Configuration conf, FileSystem fs, Path srcPath) throws Exception { + try { + renameCleanup(fs, srcPath); + } catch (TimeoutException t) { + log.error("Timeout occurred rename cleanup metastore path: " + srcPath, t); + + alertDispatcher.timeout("metastoreRenameCleanup", Collections.singletonList(srcPath)); + + if(failOnTimeout) { + throw t; + } + } catch (Exception e) { + log.error("Error executing rename cleanup for paths from metastore: " + srcPath, e); + + if(shouldFail(conf)) { + throw e; + } + } + } + + private void renameCleanup(FileSystem fs, Path path) throws Exception { + if (fs.isFile(path)) { + metastore.delete(path); + return; + } + + List metastoreFiles = metastore.list(Collections.singletonList(path)); + for (FileInfo info : metastoreFiles) { + renameCleanup(fs, info.getPath()); + } + metastore.delete(path); + } @Pointcut public abstract void delete(); diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index 9325e84..b521a53 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -19,17 +19,25 @@ - + - - - - - - - + + + + + + + + + @@ -39,4 +47,4 @@ - \ No newline at end of file +