@@ -90,6 +90,8 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
90
90
public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false ;
91
91
public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = "gobblin.copy.task.overwrite.on.commit" ;
92
92
public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = false ;
93
+ public static final String GOBBLIN_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL = "gobblin.copy.shouldFailWhenPermissionsFail" ;
94
+ public static final boolean DEFAULT_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL = true ;
93
95
94
96
protected final AtomicLong bytesWritten = new AtomicLong ();
95
97
protected final AtomicLong filesWritten = new AtomicLong ();
@@ -108,6 +110,7 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
108
110
private final Configuration conf ;
109
111
110
112
protected final Meter copySpeedMeter ;
113
+ protected final boolean shouldFailWhenPermissionsFail ;
111
114
112
115
protected final Optional <String > writerAttemptIdOptional ;
113
116
/**
@@ -181,6 +184,8 @@ public FileAwareInputStreamDataWriter(State state, FileSystem fileSystem, int nu
181
184
} else {
182
185
this .renameOptions = Options .Rename .NONE ;
183
186
}
187
+ this .shouldFailWhenPermissionsFail = state .getPropAsBoolean (GOBBLIN_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL ,
188
+ DEFAULT_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL );
184
189
}
185
190
186
191
public FileAwareInputStreamDataWriter (State state , int numBranches , int branchId )
@@ -350,10 +355,13 @@ public static Path getOutputDir(State state) {
350
355
}
351
356
352
357
/**
353
- * Sets the {@link FsPermission}, owner, group for the path passed. It will not throw exceptions, if operations
354
- * cannot be executed, will warn and continue.
358
+ * Sets the {@link FsPermission}, owner, group for the path passed. It uses `requirePermissionSetForSuccess` param
359
+ * to determine whether an exception will be thrown or only error log printed in the case of failure.
360
+ * @param requirePermissionSetForSuccess if true then throw exception, otherwise log error message and continue when
361
+ * operations cannot be executed.
355
362
*/
356
- public static void safeSetPathPermission (FileSystem fs , FileStatus file , OwnerAndPermission ownerAndPermission ) {
363
+ public static void setPathPermission (FileSystem fs , FileStatus file , OwnerAndPermission ownerAndPermission ,
364
+ boolean requirePermissionSetForSuccess ) throws IOException {
357
365
358
366
Path path = file .getPath ();
359
367
OwnerAndPermission targetOwnerAndPermission = setOwnerExecuteBitIfDirectory (file , ownerAndPermission );
@@ -367,7 +375,12 @@ public static void safeSetPathPermission(FileSystem fs, FileStatus file, OwnerAn
367
375
fs .setPermission (path , targetOwnerAndPermission .getFsPermission ());
368
376
}
369
377
} catch (IOException ioe ) {
370
- log .warn ("Failed to set permission for directory " + path , ioe );
378
+ String permissionFailureMessage = "Failed to set permission for directory " + path ;
379
+ if (requirePermissionSetForSuccess ) {
380
+ throw new IOException (permissionFailureMessage , ioe );
381
+ } else {
382
+ log .error (permissionFailureMessage , ioe );
383
+ }
371
384
}
372
385
373
386
String owner = Strings .isNullOrEmpty (targetOwnerAndPermission .getOwner ()) ? null : targetOwnerAndPermission .getOwner ();
@@ -378,7 +391,12 @@ public static void safeSetPathPermission(FileSystem fs, FileStatus file, OwnerAn
378
391
fs .setOwner (path , owner , group );
379
392
}
380
393
} catch (IOException ioe ) {
381
- log .warn ("Failed to set owner and/or group for path " + path + " to " + owner + ":" + group , ioe );
394
+ String ownerGroupFailureMessage = "Failed to set owner and/or group for path " + path + " to " + owner + ":" + group ;
395
+ if (requirePermissionSetForSuccess ) {
396
+ throw new IOException (ownerGroupFailureMessage , ioe );
397
+ } else {
398
+ log .error (ownerGroupFailureMessage , ioe );
399
+ }
382
400
}
383
401
}
384
402
@@ -394,7 +412,7 @@ private void setRecursivePermission(Path path, OwnerAndPermission ownerAndPermis
394
412
Collections .reverse (files );
395
413
396
414
for (FileStatus file : files ) {
397
- safeSetPathPermission (this .fs , file , ownerAndPermission );
415
+ setPathPermission (this .fs , file , ownerAndPermission , this . shouldFailWhenPermissionsFail );
398
416
}
399
417
}
400
418
@@ -480,7 +498,7 @@ public void commit()
480
498
try {
481
499
this .fs .delete (this .stagingDir , true );
482
500
} catch (IOException ioe ) {
483
- log .warn ("Failed to delete staging path at " + this .stagingDir );
501
+ log .error ("Failed to delete staging path at " + this .stagingDir );
484
502
}
485
503
}
486
504
}
0 commit comments