Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
b92bdda
NIFI-12345 Added conflict resolution strategies for file move operati…
ravinarayansingh Oct 9, 2025
abe0f06
Update nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer…
ravinarayansingh Oct 23, 2025
7b7c898
Refactored unique filename generation to use UUID in FileTransferConf…
ravinarayansingh Oct 23, 2025
5bcaf61
Updated MOVE_DESTINATION_DIR property formatting in FetchFileTransfer
ravinarayansingh Oct 23, 2025
1350efb
Updated MOVE_DESTINATION_DIR property formatting in FetchFileTransfer
ravinarayansingh Oct 23, 2025
f73dfa9
Update nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer…
ravinarayansingh Oct 25, 2025
e99ee17
Update nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer…
ravinarayansingh Oct 25, 2025
66fb517
Update nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer…
ravinarayansingh Oct 25, 2025
66d7dd0
Update nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer…
ravinarayansingh Oct 25, 2025
88fb177
Refactored FetchFileTransfer unique filename generation
ravinarayansingh Oct 27, 2025
ba2a476
Refactored FetchFileTransfer post-commit completion strategy for DELE…
ravinarayansingh Oct 30, 2025
27b584f
Simplified FetchFileTransfer completion strategy logic
ravinarayansingh Oct 31, 2025
4473b9d
Merge branch 'apache:main' into NIFI-15077
ravinarayansingh Nov 5, 2025
1727e72
Refactored FetchFileTransfer MOVE and DELETE strategies
ravinarayansingh Nov 5, 2025
362be85
Refactored FetchFileTransfer completion strategy handling
ravinarayansingh Nov 6, 2025
194ec41
Merge branch 'apache:main' into NIFI-15077
ravinarayansingh Nov 10, 2025
8874497
updated getSimpleFilename to handle a window style path
ravinarayansingh Nov 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -59,6 +60,14 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
public static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the <Move Destination Directory> property");
public static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system");
public static final String FAILURE_REASON_ATTRIBUTE = "fetch.failure.reason";

// Failure reason constants
public static final String FAILURE_REASON_PERMISSION_DENIED_READ = "permission.denied.read";
public static final String FAILURE_REASON_COMPLETION_DELETE_PERMISSION_DENIED = "completion.delete.permission.denied";
public static final String FAILURE_REASON_COMPLETION_DELETE_IO_ERROR = "completion.delete.io.error";
public static final String FAILURE_REASON_COMPLETION_MOVE_PERMISSION_DENIED = "completion.move.permission.denied";
public static final String FAILURE_REASON_COMPLETION_MOVE_IO_ERROR = "completion.move.io.error";

public static final String OLD_FILE_NOT_FOUND_LOG_LEVEL_PROPERTY_NAME = "fetchfiletransfer-notfound-loglevel";

public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -106,14 +115,30 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.required(false)
.build();
public static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
.name("Move Destination Directory")
.description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. "
+ "This property is ignored unless the %s is set to '%s'. The specified directory must already exist on "
+ "the remote system if '%s' is disabled, or the rename will fail.",
COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName()))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.name("Move Destination Directory")
.description("""
The directory on the remote server to move the original file to once it has been ingested into NiFi.
This property is ignored unless the %s is set to '%s'.
The specified directory must already exist on the remote system if '%s' is disabled, or the rename will fail.
""".formatted( COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName())
)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();

public static final PropertyDescriptor MOVE_CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Move Conflict Resolution")
.description(String.format("Determines how to handle filename collisions when '%s' is '%s'. " +
"This setting controls behavior when the target file exists in the %s.",
COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_DESTINATION_DIR.getDisplayName()))
.required(true)
.dependsOn(COMPLETION_STRATEGY, COMPLETION_MOVE.getValue())
.allowableValues(FileTransfer.CONFLICT_RESOLUTION_REPLACE_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_RENAME_ALLOWABLE
)
.defaultValue(FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE)
.build();

public static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -147,7 +172,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
REMOTE_FILENAME,
COMPLETION_STRATEGY,
MOVE_DESTINATION_DIR,
MOVE_CREATE_DIRECTORY
MOVE_CREATE_DIRECTORY,
MOVE_CONFLICT_RESOLUTION
);

private static final Set<Relationship> RELATIONSHIPS = Set.of(
Expand Down Expand Up @@ -256,6 +282,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

Relationship failureRelationship = null;
String failureReason = null;
boolean closeConnOnFailure = false;

try {
Expand All @@ -268,6 +295,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
flowFile, filename, host, failureRelationship.getName());
} catch (final PermissionDeniedException e) {
failureRelationship = REL_PERMISSION_DENIED;
failureReason = FAILURE_REASON_PERMISSION_DENIED_READ;
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
flowFile, filename, host, failureRelationship.getName());
} catch (final ProcessException | IOException e) {
Expand Down Expand Up @@ -296,7 +324,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

if (failureRelationship != null) {
attributes.put(FAILURE_REASON_ATTRIBUTE, failureRelationship.getName());
attributes.put(FAILURE_REASON_ATTRIBUTE, failureReason != null ? failureReason : failureRelationship.getName());
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(session.penalize(flowFile), failureRelationship);
session.getProvenanceReporter().route(flowFile, failureRelationship);
Expand All @@ -306,21 +334,33 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

flowFile = session.putAllAttributes(flowFile, attributes);

// emit provenance event and transfer FlowFile
// Perform completion strategy before commit
final String completionFailureReason = performCompletionStrategy(transfer, context, flowFile, filename, host, port);

if (completionFailureReason != null) {
// Completion strategy failed - determine the appropriate failure relationship
final Relationship completionFailureRelationship;
if (completionFailureReason.contains("permission")) {
completionFailureRelationship = REL_PERMISSION_DENIED;
} else {
completionFailureRelationship = REL_COMMS_FAILURE;
closeConnOnFailure = true;
}

attributes.put(FAILURE_REASON_ATTRIBUTE, completionFailureReason);
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(session.penalize(flowFile), completionFailureRelationship);
session.getProvenanceReporter().route(flowFile, completionFailureRelationship);
cleanupTransfer(transfer, closeConnOnFailure, transferQueue, host, port);
return;
}

// Both fetch and completion succeeded - emit provenance event and transfer to success
session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + host + ":" + port + "/" + filename, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);

// it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
// we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
// result in data loss! If we commit the session first, we are safe.
final BlockingQueue<FileTransferIdleWrapper> queue = transferQueue;
final Runnable cleanupTask = () -> cleanupTransfer(transfer, false, queue, host, port);

final FlowFile flowFileReceived = flowFile;
session.commitAsync(() -> {
performCompletionStrategy(transfer, context, flowFileReceived, filename, host, port);
cleanupTask.run();
}, t -> cleanupTask.run());
// Cleanup connection
cleanupTransfer(transfer, false, transferQueue, host, port);
} catch (final Throwable t) {
getLogger().error("Failed to fetch file", t);
cleanupTransfer(transfer, true, transferQueue, host, port);
Expand All @@ -341,36 +381,92 @@ private void cleanupTransfer(final FileTransfer transfer, final boolean closeCon
}
}

private void performCompletionStrategy(final FileTransfer transfer, final ProcessContext context, final FlowFile flowFile, final String filename, final String host, final int port) {
/**
* Performs the configured completion strategy (MOVE or DELETE).
*
* @return null if successful or NONE strategy, or a failure reason string if the completion strategy failed
*/
private String performCompletionStrategy(final FileTransfer transfer, final ProcessContext context, final FlowFile flowFile, final String filename, final String host, final int port) {
final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();

if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
try {
transfer.deleteFile(flowFile, null, filename);
} catch (final FileNotFoundException ignored) {
// file doesn't exist -- effectively the same as removing it. Move on.
} catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
flowFile, host, port, filename, ioe, ioe);
getLogger().debug("Successfully deleted source file {} for {}", filename, flowFile);
} catch (final FileNotFoundException e) {
getLogger().debug("Source file not found during delete for {}. Nothing to delete.", flowFile);
// Not a failure - file is already gone
} catch (final PermissionDeniedException e) {
getLogger().error("Failed to delete {} on {}:{} due to insufficient permissions", flowFile, host, port);
return FAILURE_REASON_COMPLETION_DELETE_PERMISSION_DENIED;
} catch (final IOException e) {
getLogger().error("Failed to delete {} on {}:{} due to {}", flowFile, host, port, e.toString(), e);
return FAILURE_REASON_COMPLETION_DELETE_IO_ERROR;
}
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
final String moveDestination = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
final String simpleFilename = getSimpleFilename(filename);

try {
final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir);
final String absoluteTargetDir = transfer.getAbsolutePath(flowFile, moveDestination);

// Create directory if configured
if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) {
// Create the target directory if necessary.
transfer.ensureDirectoryExists(flowFile, new File(absoluteTargetDirPath));
transfer.ensureDirectoryExists(flowFile, new File(absoluteTargetDir));
}

final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, simpleFilename);
// Handle conflict resolution
String destinationFilename = simpleFilename;
final FileInfo existingFile = transfer.getRemoteFileInfo(flowFile, absoluteTargetDir, simpleFilename);
final String conflictStrategy = context.getProperty(MOVE_CONFLICT_RESOLUTION).getValue();

if (existingFile != null) {
if (FileTransfer.CONFLICT_RESOLUTION_REPLACE.equalsIgnoreCase(conflictStrategy)) {
transfer.deleteFile(flowFile, absoluteTargetDir, simpleFilename);
getLogger().debug("Deleted existing file {} to replace for {}", simpleFilename, flowFile);
} else if (FileTransfer.CONFLICT_RESOLUTION_RENAME.equalsIgnoreCase(conflictStrategy)) {
destinationFilename = UUID.randomUUID() + "." + simpleFilename;
getLogger().info("Renaming to {} to avoid conflict for {}", destinationFilename, flowFile);
} else {
getLogger().debug("File already exists at destination; skipping move per {} strategy for {}", conflictStrategy, flowFile);
// IGNORE strategy - not a failure, just skip the move
return null;
}
}

// Perform the move
final String destinationPath = absoluteTargetDir + "/" + destinationFilename;
transfer.rename(flowFile, filename, destinationPath);
getLogger().debug("Successfully moved source file from {} to {} for {}", filename, destinationPath, flowFile);

} catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
flowFile, host, port, filename, ioe, ioe);
} catch (final FileNotFoundException e) {
getLogger().debug("Source file not found during move for {}. Nothing to move.", flowFile);
// Not a failure - file is already gone
} catch (final PermissionDeniedException e) {
getLogger().error("Failed to move {} on {}:{} due to insufficient permissions", flowFile, host, port);
return FAILURE_REASON_COMPLETION_MOVE_PERMISSION_DENIED;
} catch (final IOException e) {
getLogger().error("Failed to move {} on {}:{} due to {}", flowFile, host, port, e.toString(), e);
return FAILURE_REASON_COMPLETION_MOVE_IO_ERROR;
}
}

return null; // Success or NONE strategy
}

private static String getSimpleFilename(final String path) {
if (path == null || path.isEmpty()) {
return null;
}

// 1. Normalize Windows backslashes to forward slashes
String normalized = path.replace("\\", "/");

// 2. Remove trailing slashes
normalized = normalized.replaceAll("/+$", "");

// 3. Use Paths.get to extract the last element
return java.nio.file.Paths.get(normalized).getFileName().toString();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class FetchFTP extends FetchFileTransfer {
COMPLETION_STRATEGY,
MOVE_DESTINATION_DIR,
MOVE_CREATE_DIRECTORY,
MOVE_CONFLICT_RESOLUTION,
FTPTransfer.CONNECTION_TIMEOUT,
FTPTransfer.DATA_TIMEOUT,
FTPTransfer.USE_COMPRESSION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class FetchSFTP extends FetchFileTransfer {
COMPLETION_STRATEGY,
MOVE_DESTINATION_DIR,
MOVE_CREATE_DIRECTORY,
MOVE_CONFLICT_RESOLUTION,
DISABLE_DIRECTORY_LISTING,
SFTPTransfer.CONNECTION_TIMEOUT,
SFTPTransfer.DATA_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void testGetFtp() {
@Test
public void testFetchFtp() {
FileSystem results = fakeFtpServer.getFileSystem();

results.add(new DirectoryEntry("c:\\data\\data"));
FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
sampleFile.setContents("Just some random test test test chocolate");
results.add(sampleFile);
Expand Down Expand Up @@ -328,6 +328,7 @@ public void testListFtpHostPortVariablesFileFound() throws InterruptedException
disabledReason = "org.mockftpserver does not support specification of charset")
public void testFetchFtpUnicodeFileName() {
FileSystem fs = fakeFtpServer.getFileSystem();
fs.add(new DirectoryEntry("c:\\data\\data"));

FileEntry sampleFile = new FileEntry("c:\\data\\őűőű.txt");
sampleFile.setContents("Just some random test test test chocolate");
Expand Down
Loading