Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
b92bdda
NIFI-12345 Added conflict resolution strategies for file move operati…
ravinarayansingh Oct 9, 2025
abe0f06
Update nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer…
ravinarayansingh Oct 23, 2025
7b7c898
Refactored unique filename generation to use UUID in FileTransferConf…
ravinarayansingh Oct 23, 2025
5bcaf61
Updated MOVE_DESTINATION_DIR property formatting in FetchFileTransfer
ravinarayansingh Oct 23, 2025
1350efb
Updated MOVE_DESTINATION_DIR property formatting in FetchFileTransfer
ravinarayansingh Oct 23, 2025
f73dfa9
Update nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer…
ravinarayansingh Oct 25, 2025
e99ee17
Update nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer…
ravinarayansingh Oct 25, 2025
66fb517
Update nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer…
ravinarayansingh Oct 25, 2025
66d7dd0
Update nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer…
ravinarayansingh Oct 25, 2025
88fb177
Refactored FetchFileTransfer unique filename generation
ravinarayansingh Oct 27, 2025
ba2a476
Refactored FetchFileTransfer post-commit completion strategy for DELE…
ravinarayansingh Oct 30, 2025
27b584f
Simplified FetchFileTransfer completion strategy logic
ravinarayansingh Oct 31, 2025
4473b9d
Merge branch 'apache:main' into NIFI-15077
ravinarayansingh Nov 5, 2025
1727e72
Refactored FetchFileTransfer MOVE and DELETE strategies
ravinarayansingh Nov 5, 2025
362be85
Refactored FetchFileTransfer completion strategy handling
ravinarayansingh Nov 6, 2025
194ec41
Merge branch 'apache:main' into NIFI-15077
ravinarayansingh Nov 10, 2025
8874497
updated getSimpleFilename to handle a window style path
ravinarayansingh Nov 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,21 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.required(false)
.build();

public static final PropertyDescriptor MOVE_CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Move Conflict Resolution")
.description(String.format("Determines how to handle filename collisions when '%s' is '%s'. " +
"This setting controls behavior when the target file exists in the %s.",
COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_DESTINATION_DIR.getDisplayName()))
.required(true)
.dependsOn(COMPLETION_STRATEGY, COMPLETION_MOVE.getValue())
.allowableValues(FileTransfer.CONFLICT_RESOLUTION_REPLACE_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_RENAME_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_REJECT_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_FAIL_ALLOWABLE)
.defaultValue(FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE)
.build();

public static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder()
.name("Log Level When File Not Found")
.description("Log level to use in case the file does not exist when the processor is triggered")
Expand Down Expand Up @@ -147,7 +162,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
REMOTE_FILENAME,
COMPLETION_STRATEGY,
MOVE_DESTINATION_DIR,
MOVE_CREATE_DIRECTORY
MOVE_CREATE_DIRECTORY,
MOVE_CONFLICT_RESOLUTION
);

private static final Set<Relationship> RELATIONSHIPS = Set.of(
Expand Down Expand Up @@ -363,7 +379,45 @@ private void performCompletionStrategy(final FileTransfer transfer, final Proces
transfer.ensureDirectoryExists(flowFile, new File(absoluteTargetDirPath));
}

final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, simpleFilename);
String destinationFileName = simpleFilename;
final FileInfo remoteFileInfo = transfer.getRemoteFileInfo(flowFile, absoluteTargetDirPath, destinationFileName);
if (remoteFileInfo != null) {
final String strategy = context.getProperty(MOVE_CONFLICT_RESOLUTION).getValue();
getLogger().info("Detected filename conflict moving remote file for {} so handling using configured Conflict Resolution of {}", flowFile, strategy);
switch (strategy.toUpperCase()) {
case FileTransfer.CONFLICT_RESOLUTION_REPLACE:
try {
transfer.deleteFile(flowFile, absoluteTargetDirPath, destinationFileName);
} catch (final IOException deleteEx) {
getLogger().warn("Failed to delete existing destination file {} on {}:{} due to {}. Move will be attempted regardless.",
destinationFileName, host, port, deleteEx.toString(), deleteEx);
}
break;
case FileTransfer.CONFLICT_RESOLUTION_RENAME:
final String unique = FileTransferConflictUtil.generateUniqueFilename(transfer, absoluteTargetDirPath, destinationFileName, flowFile, getLogger());
if (unique != null) {
destinationFileName = unique;
} else {
getLogger().warn("Could not determine a unique name after 99 attempts for {}. Move will not be performed.", flowFile);
return;
}
break;
case FileTransfer.CONFLICT_RESOLUTION_IGNORE:
getLogger().info("Configured to IGNORE move conflict for {}. Original remote file will be left in place.", flowFile);
return;
case FileTransfer.CONFLICT_RESOLUTION_REJECT:
case FileTransfer.CONFLICT_RESOLUTION_FAIL:
getLogger().warn("Configured to {} on move conflict for {}. Original remote file will be left in place.", strategy, flowFile);
return;
case FileTransfer.CONFLICT_RESOLUTION_NONE:
default:
// Treat as IGNORE for move
getLogger().info("Configured to NONE for move conflict on {}. Original remote file will be left in place.", flowFile);
return;
}
}

final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, destinationFileName);
transfer.rename(flowFile, filename, destinationPath);

} catch (final IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.file.transfer;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;

import java.io.IOException;

public final class FileTransferConflictUtil {
private FileTransferConflictUtil() {
}

/**
* Attempts to generate a unique filename by prefixing with an incrementing integer followed by a dot.
* Returns null if a unique name could not be found within 99 attempts.
*/
public static String generateUniqueFilename(final FileTransfer transfer,
final String path,
final String baseFileName,
final FlowFile flowFile,
final ComponentLog logger) throws IOException {
boolean uniqueNameGenerated;
String candidate = null;
for (int i = 1; i < 100; i++) {
final String possibleFileName = i + "." + baseFileName;
final FileInfo renamedFileInfo = transfer.getRemoteFileInfo(flowFile, path, possibleFileName);
uniqueNameGenerated = (renamedFileInfo == null);
if (uniqueNameGenerated) {
candidate = possibleFileName;
logger.info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", flowFile, candidate);
break;
}
}
return candidate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class FetchFTP extends FetchFileTransfer {
COMPLETION_STRATEGY,
MOVE_DESTINATION_DIR,
MOVE_CREATE_DIRECTORY,
MOVE_CONFLICT_RESOLUTION,
FTPTransfer.CONNECTION_TIMEOUT,
FTPTransfer.DATA_TIMEOUT,
FTPTransfer.USE_COMPRESSION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class FetchSFTP extends FetchFileTransfer {
COMPLETION_STRATEGY,
MOVE_DESTINATION_DIR,
MOVE_CREATE_DIRECTORY,
MOVE_CONFLICT_RESOLUTION,
DISABLE_DIRECTORY_LISTING,
SFTPTransfer.CONNECTION_TIMEOUT,
SFTPTransfer.DATA_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.nifi.processor.util.file.transfer.FetchFileTransfer;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processor.util.file.transfer.FileTransfer;
import org.apache.nifi.processor.util.file.transfer.FileInfo;
import org.apache.nifi.processor.util.file.transfer.PermissionDeniedException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
Expand Down Expand Up @@ -71,7 +72,7 @@ public void setUp() throws Exception {

MockProcessContext ctx = (MockProcessContext) runner.getProcessContext();
setDefaultValues(ctx, FTPTransfer.BUFFER_SIZE, FTPTransfer.DATA_TIMEOUT, FTPTransfer.CONNECTION_TIMEOUT,
FTPTransfer.CONNECTION_MODE, FTPTransfer.TRANSFER_MODE);
FTPTransfer.CONNECTION_MODE, FTPTransfer.TRANSFER_MODE);
ctx.setProperty(FTPTransfer.USERNAME, "foo");
ctx.setProperty(FTPTransfer.PASSWORD, "bar");
}
Expand Down Expand Up @@ -241,6 +242,24 @@ public void testRenameFails() {
assertTrue(proc.fileContents.containsKey("hello.txt"));
}

@Test
public void testMoveConflictReplace() {
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
runner.setProperty(FetchFileTransfer.MOVE_CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);

// Destination exists
proc.addContent("/moved/hello.txt", "old".getBytes());
addFileAndEnqueue("hello.txt");

runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);

assertFalse(proc.fileContents.containsKey("hello.txt"));
assertTrue(proc.fileContents.containsKey("/moved/hello.txt"));
}


@Test
public void testCreateDirFails() {
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
Expand Down Expand Up @@ -319,11 +338,19 @@ public void deleteFile(FlowFile flowFile, String path, String remoteFileName) th
throw new PermissionDeniedException("test permission denied");
}

if (!fileContents.containsKey(remoteFileName)) {
String key;
if (path == null) {
key = remoteFileName;
} else {
key = (path.endsWith("/") ? path.substring(0, path.length() - 1) : path) + "/" + remoteFileName;
}
key = key.replaceAll("/+", "/");

if (!fileContents.containsKey(key)) {
throw new FileNotFoundException();
}

fileContents.remove(remoteFileName);
fileContents.remove(key);
}

@Override
Expand All @@ -332,12 +359,15 @@ public void rename(FlowFile flowFile, String source, String target) throws IOExc
throw new PermissionDeniedException("test permission denied");
}

if (!fileContents.containsKey(source)) {
final String normalizedSource = source.replaceAll("/+", "/");
final String normalizedTarget = target.replaceAll("/+", "/");

if (!fileContents.containsKey(normalizedSource)) {
throw new FileNotFoundException();
}

final byte[] content = fileContents.remove(source);
fileContents.put(target, content);
final byte[] content = fileContents.remove(normalizedSource);
fileContents.put(normalizedTarget, content);
}

@Override
Expand All @@ -347,11 +377,49 @@ public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throw
}
}

@Override
public String getAbsolutePath(FlowFile flowFile, String remotePath) throws IOException {
final String abs;
if (!remotePath.startsWith("/") && !remotePath.startsWith("\\")) {
abs = new File(getHomeDirectory(flowFile), remotePath).getPath();
} else {
abs = remotePath;
}
String norm = abs.replace("\\", "/");
norm = norm.replaceAll("/+", "/");
if (norm.endsWith("/") && norm.length() > 1) {
norm = norm.substring(0, norm.length() - 1);
}
return norm;
}

@Override
public void close() throws IOException {
super.close();
isClosed = true;
}

@Override
public String getHomeDirectory(FlowFile flowFile) throws IOException {
return "/";
}

@Override
public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException {
final String dir = path == null ? "/" : path;
String key = (dir.endsWith("/") ? dir.substring(0, dir.length() - 1) : dir) + "/" + remoteFileName;
key = key.replaceAll("/+", "/");
final byte[] content = fileContents.get(key);
if (content == null) {
return null;
}
return new FileInfo.Builder()
.filename(remoteFileName)
.fullPathFileName(key)
.directory(false)
.size(content.length)
.build();
}
};
}
}
Expand Down