diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FetchFileTransfer.java b/nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FetchFileTransfer.java index be84884f94c5..01bab9e5e9da 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FetchFileTransfer.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FetchFileTransfer.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -59,6 +60,14 @@ public abstract class FetchFileTransfer extends AbstractProcessor { public static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the property"); public static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system"); public static final String FAILURE_REASON_ATTRIBUTE = "fetch.failure.reason"; + + // Failure reason constants + public static final String FAILURE_REASON_PERMISSION_DENIED_READ = "permission.denied.read"; + public static final String FAILURE_REASON_COMPLETION_DELETE_PERMISSION_DENIED = "completion.delete.permission.denied"; + public static final String FAILURE_REASON_COMPLETION_DELETE_IO_ERROR = "completion.delete.io.error"; + public static final String FAILURE_REASON_COMPLETION_MOVE_PERMISSION_DENIED = "completion.move.permission.denied"; + public static final String FAILURE_REASON_COMPLETION_MOVE_IO_ERROR = "completion.move.io.error"; + public static final String OLD_FILE_NOT_FOUND_LOG_LEVEL_PROPERTY_NAME = "fetchfiletransfer-notfound-loglevel"; public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() @@ -106,14 +115,30 @@ public abstract class FetchFileTransfer extends AbstractProcessor { .required(false) .build(); public static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder() - .name("Move Destination Directory") - .description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. " - + "This property is ignored unless the %s is set to '%s'. The specified directory must already exist on " - + "the remote system if '%s' is disabled, or the rename will fail.", - COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName())) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(false) + .name("Move Destination Directory") + .description(""" + The directory on the remote server to move the original file to once it has been ingested into NiFi. + This property is ignored unless the %s is set to '%s'. + The specified directory must already exist on the remote system if '%s' is disabled, or the rename will fail. + """.formatted( COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName()) + ) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + public static final PropertyDescriptor MOVE_CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() + .name("Move Conflict Resolution") + .description(String.format("Determines how to handle filename collisions when '%s' is '%s'. " + + "This setting controls behavior when the target file exists in the %s.", + COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_DESTINATION_DIR.getDisplayName())) + .required(true) + .dependsOn(COMPLETION_STRATEGY, COMPLETION_MOVE.getValue()) + .allowableValues(FileTransfer.CONFLICT_RESOLUTION_REPLACE_ALLOWABLE, + FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE, + FileTransfer.CONFLICT_RESOLUTION_RENAME_ALLOWABLE + ) + .defaultValue(FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE) .build(); public static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder() @@ -147,7 +172,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor { REMOTE_FILENAME, COMPLETION_STRATEGY, MOVE_DESTINATION_DIR, - MOVE_CREATE_DIRECTORY + MOVE_CREATE_DIRECTORY, + MOVE_CONFLICT_RESOLUTION ); private static final Set RELATIONSHIPS = Set.of( @@ -256,6 +282,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } Relationship failureRelationship = null; + String failureReason = null; boolean closeConnOnFailure = false; try { @@ -268,6 +295,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session flowFile, filename, host, failureRelationship.getName()); } catch (final PermissionDeniedException e) { failureRelationship = REL_PERMISSION_DENIED; + failureReason = FAILURE_REASON_PERMISSION_DENIED_READ; getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}", flowFile, filename, host, failureRelationship.getName()); } catch (final ProcessException | IOException e) { @@ -296,7 +324,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } if (failureRelationship != null) { - attributes.put(FAILURE_REASON_ATTRIBUTE, failureRelationship.getName()); + attributes.put(FAILURE_REASON_ATTRIBUTE, failureReason != null ? failureReason : failureRelationship.getName()); flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(session.penalize(flowFile), failureRelationship); session.getProvenanceReporter().route(flowFile, failureRelationship); @@ -306,21 +334,33 @@ public void onTrigger(final ProcessContext context, final ProcessSession session flowFile = session.putAllAttributes(flowFile, attributes); - // emit provenance event and transfer FlowFile + // Perform completion strategy before commit + final String completionFailureReason = performCompletionStrategy(transfer, context, flowFile, filename, host, port); + + if (completionFailureReason != null) { + // Completion strategy failed - determine the appropriate failure relationship + final Relationship completionFailureRelationship; + if (completionFailureReason.contains("permission")) { + completionFailureRelationship = REL_PERMISSION_DENIED; + } else { + completionFailureRelationship = REL_COMMS_FAILURE; + closeConnOnFailure = true; + } + + attributes.put(FAILURE_REASON_ATTRIBUTE, completionFailureReason); + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(session.penalize(flowFile), completionFailureRelationship); + session.getProvenanceReporter().route(flowFile, completionFailureRelationship); + cleanupTransfer(transfer, closeConnOnFailure, transferQueue, host, port); + return; + } + + // Both fetch and completion succeeded - emit provenance event and transfer to success session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + host + ":" + port + "/" + filename, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); - // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where - // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would - // result in data loss! If we commit the session first, we are safe. - final BlockingQueue queue = transferQueue; - final Runnable cleanupTask = () -> cleanupTransfer(transfer, false, queue, host, port); - - final FlowFile flowFileReceived = flowFile; - session.commitAsync(() -> { - performCompletionStrategy(transfer, context, flowFileReceived, filename, host, port); - cleanupTask.run(); - }, t -> cleanupTask.run()); + // Cleanup connection + cleanupTransfer(transfer, false, transferQueue, host, port); } catch (final Throwable t) { getLogger().error("Failed to fetch file", t); cleanupTransfer(transfer, true, transferQueue, host, port); @@ -341,36 +381,92 @@ private void cleanupTransfer(final FileTransfer transfer, final boolean closeCon } } - private void performCompletionStrategy(final FileTransfer transfer, final ProcessContext context, final FlowFile flowFile, final String filename, final String host, final int port) { + /** + * Performs the configured completion strategy (MOVE or DELETE). + * + * @return null if successful or NONE strategy, or a failure reason string if the completion strategy failed + */ + private String performCompletionStrategy(final FileTransfer transfer, final ProcessContext context, final FlowFile flowFile, final String filename, final String host, final int port) { final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue(); + if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) { try { transfer.deleteFile(flowFile, null, filename); - } catch (final FileNotFoundException ignored) { - // file doesn't exist -- effectively the same as removing it. Move on. - } catch (final IOException ioe) { - getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", - flowFile, host, port, filename, ioe, ioe); + getLogger().debug("Successfully deleted source file {} for {}", filename, flowFile); + } catch (final FileNotFoundException e) { + getLogger().debug("Source file not found during delete for {}. Nothing to delete.", flowFile); + // Not a failure - file is already gone + } catch (final PermissionDeniedException e) { + getLogger().error("Failed to delete {} on {}:{} due to insufficient permissions", flowFile, host, port); + return FAILURE_REASON_COMPLETION_DELETE_PERMISSION_DENIED; + } catch (final IOException e) { + getLogger().error("Failed to delete {} on {}:{} due to {}", flowFile, host, port, e.toString(), e); + return FAILURE_REASON_COMPLETION_DELETE_IO_ERROR; } } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) { - final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue(); - final String simpleFilename = StringUtils.substringAfterLast(filename, "/"); + final String moveDestination = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue(); + final String simpleFilename = getSimpleFilename(filename); try { - final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir); + final String absoluteTargetDir = transfer.getAbsolutePath(flowFile, moveDestination); + + // Create directory if configured if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) { - // Create the target directory if necessary. - transfer.ensureDirectoryExists(flowFile, new File(absoluteTargetDirPath)); + transfer.ensureDirectoryExists(flowFile, new File(absoluteTargetDir)); } - final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, simpleFilename); + // Handle conflict resolution + String destinationFilename = simpleFilename; + final FileInfo existingFile = transfer.getRemoteFileInfo(flowFile, absoluteTargetDir, simpleFilename); + final String conflictStrategy = context.getProperty(MOVE_CONFLICT_RESOLUTION).getValue(); + + if (existingFile != null) { + if (FileTransfer.CONFLICT_RESOLUTION_REPLACE.equalsIgnoreCase(conflictStrategy)) { + transfer.deleteFile(flowFile, absoluteTargetDir, simpleFilename); + getLogger().debug("Deleted existing file {} to replace for {}", simpleFilename, flowFile); + } else if (FileTransfer.CONFLICT_RESOLUTION_RENAME.equalsIgnoreCase(conflictStrategy)) { + destinationFilename = UUID.randomUUID() + "." + simpleFilename; + getLogger().info("Renaming to {} to avoid conflict for {}", destinationFilename, flowFile); + } else { + getLogger().debug("File already exists at destination; skipping move per {} strategy for {}", conflictStrategy, flowFile); + // IGNORE strategy - not a failure, just skip the move + return null; + } + } + + // Perform the move + final String destinationPath = absoluteTargetDir + "/" + destinationFilename; transfer.rename(flowFile, filename, destinationPath); + getLogger().debug("Successfully moved source file from {} to {} for {}", filename, destinationPath, flowFile); - } catch (final IOException ioe) { - getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}", - flowFile, host, port, filename, ioe, ioe); + } catch (final FileNotFoundException e) { + getLogger().debug("Source file not found during move for {}. Nothing to move.", flowFile); + // Not a failure - file is already gone + } catch (final PermissionDeniedException e) { + getLogger().error("Failed to move {} on {}:{} due to insufficient permissions", flowFile, host, port); + return FAILURE_REASON_COMPLETION_MOVE_PERMISSION_DENIED; + } catch (final IOException e) { + getLogger().error("Failed to move {} on {}:{} due to {}", flowFile, host, port, e.toString(), e); + return FAILURE_REASON_COMPLETION_MOVE_IO_ERROR; } } + + return null; // Success or NONE strategy + } + + private static String getSimpleFilename(final String path) { + if (path == null || path.isEmpty()) { + return null; + } + + // 1. Normalize Windows backslashes to forward slashes + String normalized = path.replace("\\", "/"); + + // 2. Remove trailing slashes + normalized = normalized.replaceAll("/+$", ""); + + // 3. Use Paths.get to extract the last element + return java.nio.file.Paths.get(normalized).getFileName().toString(); } diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java index 7acb7da05fa4..713a1cb1d0bc 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java @@ -97,6 +97,7 @@ public class FetchFTP extends FetchFileTransfer { COMPLETION_STRATEGY, MOVE_DESTINATION_DIR, MOVE_CREATE_DIRECTORY, + MOVE_CONFLICT_RESOLUTION, FTPTransfer.CONNECTION_TIMEOUT, FTPTransfer.DATA_TIMEOUT, FTPTransfer.USE_COMPRESSION, diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java index 1e21adef703e..ae37cdb123c5 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java @@ -108,6 +108,7 @@ public class FetchSFTP extends FetchFileTransfer { COMPLETION_STRATEGY, MOVE_DESTINATION_DIR, MOVE_CREATE_DIRECTORY, + MOVE_CONFLICT_RESOLUTION, DISABLE_DIRECTORY_LISTING, SFTPTransfer.CONNECTION_TIMEOUT, SFTPTransfer.DATA_TIMEOUT, diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java index 89791ebaca14..083ba3e061a8 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java @@ -228,7 +228,7 @@ public void testGetFtp() { @Test public void testFetchFtp() { FileSystem results = fakeFtpServer.getFileSystem(); - + results.add(new DirectoryEntry("c:\\data\\data")); FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2"); sampleFile.setContents("Just some random test test test chocolate"); results.add(sampleFile); @@ -328,6 +328,7 @@ public void testListFtpHostPortVariablesFileFound() throws InterruptedException disabledReason = "org.mockftpserver does not support specification of charset") public void testFetchFtpUnicodeFileName() { FileSystem fs = fakeFtpServer.getFileSystem(); + fs.add(new DirectoryEntry("c:\\data\\data")); FileEntry sampleFile = new FileEntry("c:\\data\\őűőű.txt"); sampleFile.setContents("Just some random test test test chocolate"); diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java index 32c75d5a5d31..f6be5c16d11a 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java @@ -28,6 +28,7 @@ import org.apache.nifi.processor.util.file.transfer.FetchFileTransfer; import org.apache.nifi.processors.standard.util.FTPTransfer; import org.apache.nifi.processor.util.file.transfer.FileTransfer; +import org.apache.nifi.processor.util.file.transfer.FileInfo; import org.apache.nifi.processor.util.file.transfer.PermissionDeniedException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; @@ -71,7 +72,7 @@ public void setUp() throws Exception { MockProcessContext ctx = (MockProcessContext) runner.getProcessContext(); setDefaultValues(ctx, FTPTransfer.BUFFER_SIZE, FTPTransfer.DATA_TIMEOUT, FTPTransfer.CONNECTION_TIMEOUT, - FTPTransfer.CONNECTION_MODE, FTPTransfer.TRANSFER_MODE); + FTPTransfer.CONNECTION_MODE, FTPTransfer.TRANSFER_MODE); ctx.setProperty(FTPTransfer.USERNAME, "foo"); ctx.setProperty(FTPTransfer.PASSWORD, "bar"); } @@ -128,7 +129,7 @@ public void testInsufficientPermissions() { runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0); - assertEquals(FetchFileTransfer.REL_PERMISSION_DENIED.getName(), transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE)); + assertEquals(FetchFileTransfer.FAILURE_REASON_PERMISSION_DENIED_READ, transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE)); } @Test @@ -213,6 +214,23 @@ public void testDeleteFile() { assertTrue(proc.fileContents.isEmpty()); } + @Test + public void testDeleteCommsFailure() { + runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue()); + proc.isDeleteCommFailure = true; + + addFileAndEnqueue("hello.txt"); + + runner.run(1, false, false); + // Completion strategy is now performed pre-commit; I/O errors during delete cause routing to comms.failure + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_COMMS_FAILURE, 1); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); + MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0); + assertEquals(FetchFileTransfer.FAILURE_REASON_COMPLETION_DELETE_IO_ERROR, transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE)); + // On delete comms failure, a remote file should still exist since transaction failed + assertTrue(proc.fileContents.containsKey("hello.txt")); + } + @Test public void testDeleteFails() { runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue()); @@ -221,8 +239,13 @@ public void testDeleteFails() { addFileAndEnqueue("hello.txt"); runner.run(1, false, false); - runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); - assertFalse(proc.fileContents.isEmpty()); + // Completion strategy is now performed pre-commit; permission denied during delete causes routing to permission.denied + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); + MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0); + assertEquals(FetchFileTransfer.FAILURE_REASON_COMPLETION_DELETE_PERMISSION_DENIED, transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE)); + // The original remote file should remain since the transaction failed + assertTrue(proc.fileContents.containsKey("hello.txt")); } @Test @@ -235,12 +258,30 @@ public void testRenameFails() { addFileAndEnqueue("hello.txt"); runner.run(1, false, false); - runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); assertEquals(1, proc.fileContents.size()); assertTrue(proc.fileContents.containsKey("hello.txt")); } + @Test + public void testMoveConflictReplace() { + runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); + runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/"); + runner.setProperty(FetchFileTransfer.MOVE_CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE); + + // Destination exists + proc.addContent("/moved/hello.txt", "old".getBytes()); + addFileAndEnqueue("hello.txt"); + + runner.run(1, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); + + assertFalse(proc.fileContents.containsKey("hello.txt")); + assertTrue(proc.fileContents.containsKey("/moved/hello.txt")); + } + + @Test public void testCreateDirFails() { runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); @@ -252,7 +293,7 @@ public void testCreateDirFails() { proc.allowCreateDir = false; runner.run(1, false, false); - runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); assertEquals(1, proc.fileContents.size()); assertTrue(proc.fileContents.containsKey("hello.txt")); @@ -267,6 +308,7 @@ private static class TestableFetchFTP extends FetchFTP { private boolean isClosed = false; private boolean isFileNotFound = false; private boolean isCommFailure = false; + private boolean isDeleteCommFailure = false; private int numberOfFileTransfers = 0; private final Map fileContents = new HashMap<>(); private final FTPClient mockFtpClient = Mockito.mock(FTPClient.class); @@ -318,12 +360,23 @@ public void deleteFile(FlowFile flowFile, String path, String remoteFileName) th if (!allowDelete) { throw new PermissionDeniedException("test permission denied"); } + if (isDeleteCommFailure) { + throw new IOException("test delete communication failure"); + } + + String key; + if (path == null) { + key = remoteFileName; + } else { + key = (path.endsWith("/") ? path.substring(0, path.length() - 1) : path) + "/" + remoteFileName; + } + key = key.replaceAll("/+", "/"); - if (!fileContents.containsKey(remoteFileName)) { + if (!fileContents.containsKey(key)) { throw new FileNotFoundException(); } - fileContents.remove(remoteFileName); + fileContents.remove(key); } @Override @@ -332,12 +385,15 @@ public void rename(FlowFile flowFile, String source, String target) throws IOExc throw new PermissionDeniedException("test permission denied"); } - if (!fileContents.containsKey(source)) { + final String normalizedSource = source.replaceAll("/+", "/"); + final String normalizedTarget = target.replaceAll("/+", "/"); + + if (!fileContents.containsKey(normalizedSource)) { throw new FileNotFoundException(); } - final byte[] content = fileContents.remove(source); - fileContents.put(target, content); + final byte[] content = fileContents.remove(normalizedSource); + fileContents.put(normalizedTarget, content); } @Override @@ -347,11 +403,49 @@ public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throw } } + @Override + public String getAbsolutePath(FlowFile flowFile, String remotePath) throws IOException { + final String abs; + if (!remotePath.startsWith("/") && !remotePath.startsWith("\\")) { + abs = new File(getHomeDirectory(flowFile), remotePath).getPath(); + } else { + abs = remotePath; + } + String norm = abs.replace("\\", "/"); + norm = norm.replaceAll("/+", "/"); + if (norm.endsWith("/") && norm.length() > 1) { + norm = norm.substring(0, norm.length() - 1); + } + return norm; + } + @Override public void close() throws IOException { super.close(); isClosed = true; } + + @Override + public String getHomeDirectory(FlowFile flowFile) throws IOException { + return "/"; + } + + @Override + public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException { + final String dir = path == null ? "/" : path; + String key = (dir.endsWith("/") ? dir.substring(0, dir.length() - 1) : dir) + "/" + remoteFileName; + key = key.replaceAll("/+", "/"); + final byte[] content = fileContents.get(key); + if (content == null) { + return null; + } + return new FileInfo.Builder() + .filename(remoteFileName) + .fullPathFileName(key) + .directory(false) + .size(content.length) + .build(); + } }; } }