From 146fccb5c1cf2931189d6c2b896ae9bd8bd50fd7 Mon Sep 17 00:00:00 2001 From: Steffen Grohsschmiedt Date: Thu, 19 May 2016 13:31:36 -0400 Subject: [PATCH 1/2] Use s3mper for NativeS3FileSystem and GoogleHadoopFileSystemBase only We don't want s3mper to keep track of all intermediate files stored on HDFS. Configure s3mper to only keep track of GCS files. --- src/main/resources/META-INF/aop.xml | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index 9325e84..9db93ca 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -19,17 +19,23 @@ - + - - - - - - - + + + + + + + + @@ -39,4 +45,4 @@ - \ No newline at end of file + From e719697c33e81b45b440fe060471a0c2807a67fe Mon Sep 17 00:00:00 2001 From: Steffen Grohsschmiedt Date: Thu, 19 May 2016 13:47:44 -0400 Subject: [PATCH 2/2] Add support for rename Implement rename by first duplicating the metadata for the destination, then executing the rename and finally deleting the metadata for the source location. This prevents incomplete listings for both the source and the destination, as clients will fail or retry on missing objects. In case of invalid operations, the rename will return false and the duplicated metadata will be deleted again. In case of errors, rename will throw Exceptions and the metadata will not be cleaned up, thus preventing incomplete listings for both source and destination. Manual cleanup will be required in these scenarios, but preventing reads of incomplete data is the most important concern right now. 
--- .../listing/ConsistentListingAspect.java | 113 +++++++++++++++++- src/main/resources/META-INF/aop.xml | 2 + 2 files changed, 114 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/netflix/bdp/s3mper/listing/ConsistentListingAspect.java b/src/main/java/com/netflix/bdp/s3mper/listing/ConsistentListingAspect.java index 87c9fc6..45022a3 100644 --- a/src/main/java/com/netflix/bdp/s3mper/listing/ConsistentListingAspect.java +++ b/src/main/java/com/netflix/bdp/s3mper/listing/ConsistentListingAspect.java @@ -441,7 +441,118 @@ private List checkListing(List metastoreListing, FileStatus [] s return s3files.toArray(new FileStatus[s3files.size()]); } - + + @Pointcut + public abstract void rename(); + + /** + * Rename listing records based on a rename call from the FileSystem. + * + * @param pjp the intercepted rename call; argument 0 is the source Path, argument 1 the destination Path + * @return the value returned by the intercepted rename call + * @throws Throwable anything thrown by the intercepted call or by the metastore bookkeeping + */ + @Around("rename() && !within(ConsistentListingAspect)") + public Object metastoreRename(final ProceedingJoinPoint pjp) throws Throwable { + if(disabled) { + return pjp.proceed(); + } + + Configuration conf = ((FileSystem) pjp.getTarget()).getConf(); + updateConfig(conf); + FileSystem fs = (FileSystem) pjp.getTarget(); + + Path srcPath = (Path) pjp.getArgs()[0]; + Path dstPath = (Path) pjp.getArgs()[1]; + + metadataRename(conf, fs, srcPath, dstPath); + + Object obj = pjp.proceed(); + if (Boolean.TRUE.equals(obj)) { + // Everything went fine, so delete the old metadata. + // If not, then we'll keep the metadata to prevent incomplete listings. + // Manual cleanup will be required in the case of failure. 
+ metadataCleanup(conf, fs, srcPath); + } + return obj; + } + + private void metadataRename(Configuration conf, FileSystem fs, Path srcPath, Path dstPath) throws Exception { + try { + if (fs.exists(srcPath) && !fs.exists(dstPath) && fs.isFile(srcPath)) { + renameFile(srcPath, dstPath); + } else if (fs.exists(srcPath) && !fs.exists(dstPath) && !fs.isFile(srcPath)){ + renameFolder(srcPath, dstPath); + } else { + throw new UnsupportedOperationException("Move " + srcPath.toUri().getPath() + + " to " + dstPath.toUri().getPath()); + } + } catch (TimeoutException t) { + log.error("Timeout occurred rename metastore path: " + srcPath, t); + + alertDispatcher.timeout("metastoreRename", Collections.singletonList(srcPath)); + + if(failOnTimeout) { + throw t; + } + } catch (Exception e) { + log.error("Error rename paths from metastore: " + srcPath, e); + + if(shouldFail(conf)) { + throw e; + } + } + } + + private void renameFile(Path src, Path dst) throws Exception { + metastore.add(dst, false); + } + + private void renameFolder(Path src, Path dst) throws Exception { + metastore.add(dst, true); + List metastoreFiles = metastore.list(Collections.singletonList(src)); + for (FileInfo info : metastoreFiles) { + Path target = new Path(dst, info.getPath().getName()); + if (info.isDirectory()) { + renameFolder(info.getPath(), target); + } else { + renameFile(info.getPath(), target); + } + } + } + + private void metadataCleanup(Configuration conf, FileSystem fs, Path srcPath) throws Exception { + try { + renameCleanup(fs, srcPath); + } catch (TimeoutException t) { + log.error("Timeout occurred rename cleanup metastore path: " + srcPath, t); + + alertDispatcher.timeout("metastoreRenameCleanup", Collections.singletonList(srcPath)); + + if(failOnTimeout) { + throw t; + } + } catch (Exception e) { + log.error("Error executing rename cleanup for paths from metastore: " + srcPath, e); + + if(shouldFail(conf)) { + throw e; + } + } + } + + private void renameCleanup(FileSystem fs, Path path) 
throws Exception { + if (fs.isFile(path)) { + metastore.delete(path); + return; + } + + List metastoreFiles = metastore.list(Collections.singletonList(path)); + for (FileInfo info : metastoreFiles) { + renameCleanup(fs, info.getPath()); + } + metastore.delete(path); + } @Pointcut public abstract void delete(); diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index 9db93ca..b521a53 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -35,6 +35,8 @@ execution(* com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(..))"/> +