diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java index bd98772ab3d6..74f487241734 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClient.java @@ -199,6 +199,42 @@ public FlowRegistryBranch getDefaultBranch(final FlowRegistryClientConfiguration return defaultBranch; } + @Override + public void createBranch(final FlowRegistryClientConfigurationContext context, final FlowVersionLocation sourceLocation, final String newBranchName) + throws FlowRegistryException, IOException { + if (StringUtils.isBlank(newBranchName)) { + throw new IllegalArgumentException("Branch name must be specified when creating a new branch"); + } + + final GitRepositoryClient repositoryClient = getRepositoryClient(context); + verifyWritePermissions(repositoryClient); + + final String sourceBranch = resolveSourceBranch(context, sourceLocation); + if (StringUtils.isBlank(sourceBranch)) { + throw new FlowRegistryException("Unable to determine source branch for new branch creation"); + } + + final Optional sourceCommitSha = sourceLocation == null ? Optional.empty() : Optional.ofNullable(sourceLocation.getVersion()); + final String trimmedBranchName = newBranchName.trim(); + final String trimmedSourceBranch = sourceBranch.trim(); + + getLogger().info("Creating branch [{}] from branch [{}]", trimmedBranchName, trimmedSourceBranch); + + try { + repositoryClient.createBranch(trimmedBranchName, trimmedSourceBranch, sourceCommitSha); + } catch (final UnsupportedOperationException e) { + throw new FlowRegistryException("Configured repository client does not support branch creation", e); + } + } + + private String resolveSourceBranch(final FlowRegistryClientConfigurationContext context, final FlowVersionLocation sourceLocation) { + if (sourceLocation != null && StringUtils.isNotBlank(sourceLocation.getBranch())) { + return sourceLocation.getBranch(); + } + final String defaultBranch = context.getProperty(REPOSITORY_BRANCH).getValue(); + return StringUtils.isBlank(defaultBranch) ? null : defaultBranch; + } + @Override public Set getBuckets(final FlowRegistryClientConfigurationContext context, final String branch) throws IOException, FlowRegistryException { final GitRepositoryClient repositoryClient = getRepositoryClient(context); diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java index a64ecab97c97..5acd651208b6 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/main/java/org/apache/nifi/registry/flow/git/client/GitRepositoryClient.java @@ -138,6 +138,21 @@ public interface GitRepositoryClient { */ InputStream deleteContent(String filePath, String commitMessage, String branch) throws FlowRegistryException, IOException; + /** + * Creates a new branch in the repository. + * + * @param newBranchName the name of the branch to create + * @param sourceBranch the name of the source branch + * @param sourceCommitSha optional commit SHA to use as the starting point for the new branch. If empty, the head commit of the source branch should be used. + * @throws IOException if an I/O error occurs + * @throws FlowRegistryException if a non-I/O error occurs + * @throws UnsupportedOperationException if the repository implementation does not support branch creation + */ + default void createBranch(final String newBranchName, final String sourceBranch, final Optional sourceCommitSha) + throws IOException, FlowRegistryException { + throw new UnsupportedOperationException("Branch creation is not supported"); + } + /** * Closes any resources held by the client. * diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java index 9cec464b2b86..8d006ad8b7a7 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/AbstractGitFlowRegistryClientTest.java @@ -22,7 +22,9 @@ import org.apache.nifi.components.PropertyValue; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext; +import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext; import org.apache.nifi.registry.flow.FlowRegistryException; +import org.apache.nifi.registry.flow.FlowVersionLocation; import org.apache.nifi.registry.flow.git.client.GitCommit; import org.apache.nifi.registry.flow.git.client.GitCreateContentRequest; import org.apache.nifi.registry.flow.git.client.GitRepositoryClient; @@ -30,6 +32,8 @@ import org.apache.nifi.util.MockPropertyValue; import org.junit.jupiter.api.Test; +import javax.net.ssl.SSLContext; + import java.io.IOException; import java.io.InputStream; import java.util.List; @@ -40,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; class AbstractGitFlowRegistryClientTest { @@ -49,6 +54,7 @@ void verifySuccessful() throws Exception { final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a", ".git")); final AtomicReference suppliedClient = new AtomicReference<>(repositoryClient); final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> suppliedClient.getAndSet(null), "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); final ComponentLog verificationLogger = new MockComponentLog("test-component", this); @@ -71,6 +77,7 @@ void verifyAuthenticationFailure() { final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> { throw new FlowRegistryException("Authentication failed"); }, "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); final ComponentLog verificationLogger = new MockComponentLog("test-component", this); @@ -85,6 +92,7 @@ void verifyAuthenticationFailure() { void verifyReadFailureSkipsBucketListing() throws Exception { final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(false, false, Set.of()); final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); final ComponentLog verificationLogger = new MockComponentLog("test-component", this); @@ -100,10 +108,11 @@ void verifyReadFailureSkipsBucketListing() throws Exception { @Test void verifyBucketListingFailureReported() throws Exception { - final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of()); + final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a")); repositoryClient.setGetTopLevelDirectoryNamesException(new FlowRegistryException("listing error")); final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); final ComponentLog verificationLogger = new MockComponentLog("test-component", this); @@ -118,6 +127,41 @@ void verifyBucketListingFailureReported() throws Exception { assertTrue(repositoryClient.isClosed()); } + @Test + void createBranchDelegatesToRepositoryClient() throws Exception { + final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a")); + final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); + final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation("source-branch", "bucket-a", "flow-x", "commit-1"); + + flowRegistryClient.createBranch(context, sourceLocation, " new-branch "); + + assertEquals("new-branch", repositoryClient.getCreatedBranch()); + assertEquals("source-branch", repositoryClient.getCreatedBranchSource()); + assertEquals(Optional.of("commit-1"), repositoryClient.getCreatedBranchCommit()); + } + + @Test + void createBranchUnsupportedThrowsFlowRegistryException() throws Exception { + final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a")); + repositoryClient.setBranchCreationUnsupported(true); + + final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "git@example.git"); + flowRegistryClient.initialize(createInitializationContext()); + final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*"); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation(); + sourceLocation.setBranch("source"); + + final FlowRegistryException exception = assertThrows(FlowRegistryException.class, + () -> flowRegistryClient.createBranch(context, sourceLocation, "new-branch")); + + assertEquals("Configured repository client does not support branch creation", exception.getMessage()); + assertTrue(repositoryClient.getCreatedBranchCommit().isEmpty()); + } + private FlowRegistryClientConfigurationContext createContext(final String branch, final String exclusionPattern) { final Map properties = Map.of( AbstractGitFlowRegistryClient.REPOSITORY_BRANCH, new MockPropertyValue(branch), @@ -145,6 +189,25 @@ public Optional getNiFiUserIdentity() { }; } + private FlowRegistryClientInitializationContext createInitializationContext() { + return new FlowRegistryClientInitializationContext() { + @Override + public String getIdentifier() { + return "test-git-client"; + } + + @Override + public ComponentLog getLogger() { + return new MockComponentLog("test-git-client", AbstractGitFlowRegistryClientTest.this); + } + + @Override + public Optional getSystemSslContext() { + return Optional.empty(); + } + }; + } + private static class TestGitFlowRegistryClient extends AbstractGitFlowRegistryClient { private final RepositoryClientSupplier repositoryClientSupplier; private final String storageLocation; @@ -186,6 +249,11 @@ private static class TestGitRepositoryClient implements GitRepositoryClient { private FlowRegistryException topLevelDirectoryNamesException; private IOException topLevelDirectoryNamesIOException; private boolean closed; + private boolean branchCreationUnsupported; + private FlowRegistryException createBranchException; + private String createdBranch; + private String createdBranchSource; + private Optional createdBranchCommit = Optional.empty(); TestGitRepositoryClient(final boolean canRead, final boolean canWrite, final Set bucketNames) { this.canRead = canRead; @@ -198,11 +266,31 @@ void setGetTopLevelDirectoryNamesException(final FlowRegistryException exception this.topLevelDirectoryNamesIOException = null; } - void setGetTopLevelDirectoryNamesException(final IOException exception) { + void setGetTopLevelDirectoryNamesIOException(final IOException exception) { this.topLevelDirectoryNamesIOException = exception; this.topLevelDirectoryNamesException = null; } + void setBranchCreationUnsupported(final boolean unsupported) { + this.branchCreationUnsupported = unsupported; + } + + void setCreateBranchException(final FlowRegistryException exception) { + this.createBranchException = exception; + } + + String getCreatedBranch() { + return createdBranch; + } + + String getCreatedBranchSource() { + return createdBranchSource; + } + + Optional getCreatedBranchCommit() { + return createdBranchCommit; + } + boolean isClosed() { return closed; } @@ -228,6 +316,21 @@ public Set getTopLevelDirectoryNames(final String branch) throws IOExcep return bucketNames; } + @Override + public void createBranch(final String newBranchName, final String sourceBranch, final Optional sourceCommitSha) + throws IOException, FlowRegistryException { + if (branchCreationUnsupported) { + throw new UnsupportedOperationException("Branch creation not supported"); + } + if (createBranchException != null) { + throw createBranchException; + } + + createdBranch = newBranchName; + createdBranchSource = sourceBranch; + createdBranchCommit = sourceCommitSha; + } + @Override public void close() { closed = true; @@ -265,7 +368,7 @@ public Optional getContentSha(final String path, final String branch) { @Override public String createContent(final GitCreateContentRequest request) { - throw new UnsupportedOperationException("Not required for test"); + return "test-commit"; } @Override diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java index 942dc708acf9..596ac9561cae 100644 --- a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java +++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java @@ -21,6 +21,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.registry.flow.FlowRegistryException; import org.apache.nifi.registry.flow.git.client.GitCommit; @@ -425,6 +426,56 @@ public InputStream deleteContent(final String filePath, final String commitMessa }); } + @Override + public void createBranch(final String newBranchName, final String sourceBranch, final Optional sourceCommitSha) + throws IOException, FlowRegistryException { + if (StringUtils.isBlank(newBranchName)) { + throw new IllegalArgumentException("Branch name must be specified"); + } + if (StringUtils.isBlank(sourceBranch)) { + throw new IllegalArgumentException("Source branch must be specified"); + } + + final String trimmedNewBranch = newBranchName.trim(); + final String trimmedSourceBranch = sourceBranch.trim(); + final String newBranchRefPath = "heads/" + trimmedNewBranch; + final String sourceBranchRefPath = "heads/" + trimmedSourceBranch; + + try { + execute(() -> repository.getRef(newBranchRefPath)); + throw new FlowRegistryException("Branch [" + trimmedNewBranch + "] already exists"); + } catch (final FileNotFoundException notFound) { + logger.debug("Branch [{}] does not exist and will be created", trimmedNewBranch, notFound); + } + + final GHRef sourceBranchRef; + try { + sourceBranchRef = execute(() -> repository.getRef(sourceBranchRefPath)); + } catch (final FileNotFoundException notFound) { + throw new FlowRegistryException("Source branch [" + trimmedSourceBranch + "] does not exist", notFound); + } + + final String baseCommitSha; + if (sourceCommitSha.isPresent()) { + final String requestedCommitSha = sourceCommitSha.get(); + try { + baseCommitSha = execute(() -> repository.getCommit(requestedCommitSha).getSHA1()); + } catch (final FileNotFoundException notFound) { + throw new FlowRegistryException("Commit [" + requestedCommitSha + "] was not found in the repository", notFound); + } + } else { + baseCommitSha = sourceBranchRef.getObject().getSha(); + } + + logger.info("Creating branch [{}] from [{}] at commit [{}] for repository [{}]", + trimmedNewBranch, trimmedSourceBranch, baseCommitSha, repository.getFullName()); + + execute(() -> { + repository.createRef(BRANCH_REF_PATTERN.formatted(trimmedNewBranch), baseCommitSha); + return null; + }); + } + private String getResolvedPath(final String path) { return repoPath == null ? path : repoPath + "/" + path; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CreateFlowBranchRequestEntity.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CreateFlowBranchRequestEntity.java new file mode 100644 index 000000000000..8e57e8387eea --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/CreateFlowBranchRequestEntity.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.xml.bind.annotation.XmlRootElement; +import org.apache.nifi.web.api.dto.RevisionDTO; + +@XmlRootElement(name = "createFlowBranchRequestEntity") +public class CreateFlowBranchRequestEntity extends Entity { + + private RevisionDTO processGroupRevision; + private String branch; + private String sourceBranch; + private String sourceVersion; + private Boolean disconnectedNodeAcknowledged; + + @Schema(description = "The Revision of the Process Group under Version Control") + public RevisionDTO getProcessGroupRevision() { + return processGroupRevision; + } + + public void setProcessGroupRevision(final RevisionDTO processGroupRevision) { + this.processGroupRevision = processGroupRevision; + } + + @Schema(description = "The name of the new branch to create") + public String getBranch() { + return branch; + } + + public void setBranch(final String branch) { + this.branch = branch; + } + + @Schema(description = "The name of the source branch to create the new branch from. Defaults to the branch currently tracking in NiFi.") + public String getSourceBranch() { + return sourceBranch; + } + + public void setSourceBranch(final String sourceBranch) { + this.sourceBranch = sourceBranch; + } + + @Schema(description = "The version on the source branch to use when creating the new branch. Defaults to the version currently tracked by NiFi.") + public String getSourceVersion() { + return sourceVersion; + } + + public void setSourceVersion(final String sourceVersion) { + this.sourceVersion = sourceVersion; + } + + @Schema(description = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.") + public Boolean isDisconnectedNodeAcknowledged() { + return disconnectedNodeAcknowledged; + } + + public void setDisconnectedNodeAcknowledged(final Boolean disconnectedNodeAcknowledged) { + this.disconnectedNodeAcknowledged = disconnectedNodeAcknowledged; + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java index 5ce884b74eb0..e23ce50c2318 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java @@ -448,6 +448,12 @@ public Set getFlows(final FlowRegistryClientUserContext context, return node.getFlows(context, bucketLocation); } + @Override + public void createBranch(final FlowRegistryClientUserContext context, final FlowVersionLocation sourceLocation, final String newBranchName) + throws FlowRegistryException, IOException { + node.createBranch(context, sourceLocation, newBranchName); + } + @Override public FlowSnapshotContainer getFlowContents(final FlowRegistryClientUserContext context, final FlowVersionLocation flowVersionLocation, final boolean fetchRemoteFlows) throws FlowRegistryException, IOException { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java index 69ae02c5695d..9bae75e97a9a 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java @@ -243,6 +243,15 @@ public Set getFlows(final FlowRegistryClientUserContext context, return execute(() -> client.get().getComponent().getFlows(getConfigurationContext(context), bucketLocation)); } + @Override + public void createBranch(final FlowRegistryClientUserContext context, final FlowVersionLocation sourceLocation, final String newBranchName) + throws FlowRegistryException, IOException { + execute(() -> { + client.get().getComponent().createBranch(getConfigurationContext(context), sourceLocation, newBranchName); + return null; + }); + } + @Override public FlowSnapshotContainer getFlowContents(final FlowRegistryClientUserContext context, final FlowVersionLocation flowVersionLocation, final boolean fetchRemoteFlows) throws FlowRegistryException, IOException { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java index e07e826888bf..dd91238ed1b5 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java @@ -52,6 +52,7 @@ public interface FlowRegistryClientNode extends ComponentNode { RegisteredFlow getFlow(FlowRegistryClientUserContext context, FlowLocation flowLocation) throws FlowRegistryException, IOException; Set getFlows(FlowRegistryClientUserContext context, BucketLocation bucketLocation) throws FlowRegistryException, IOException; + void createBranch(FlowRegistryClientUserContext context, FlowVersionLocation sourceLocation, String newBranchName) throws FlowRegistryException, IOException; FlowSnapshotContainer getFlowContents(FlowRegistryClientUserContext context, FlowVersionLocation flowVersionLocation, boolean fetchRemoteFlows) throws FlowRegistryException, IOException; RegisteredFlowSnapshot registerFlowSnapshot( diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index e5dbc47ee2cf..b71f05495fd7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -1669,6 +1669,18 @@ RegisteredFlowSnapshot registerVersionedFlowSnapshot(String registryId, Register VersionControlInformationEntity setVersionControlInformation(Revision processGroupRevision, String processGroupId, VersionControlInformationDTO versionControlInfo, Map versionedComponentMapping); + /** + * Creates a new branch in the associated Flow Registry for the specified Process Group and updates the local Version Control information to track the new branch. + * + * @param revision the revision for the Process Group + * @param processGroupId the Process Group identifier + * @param newBranchName the name of the new branch to create + * @param sourceBranch the branch to branch from + * @param sourceVersion the commit/version on the source branch to branch from + * @return the updated Version Control information + */ + VersionControlInformationEntity createFlowBranch(Revision revision, String processGroupId, String newBranchName, String sourceBranch, String sourceVersion); + /** * Disconnects the specified Process Group from version control. * diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 6cb0c7dc83de..225b8741155c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -188,9 +188,11 @@ import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; +import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VerifiableFlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedFlowState; +import org.apache.nifi.registry.flow.VersionedFlowStatus; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; import org.apache.nifi.registry.flow.diff.DifferenceType; @@ -5722,6 +5724,142 @@ public VersionControlInformationEntity setVersionControlInformation(final Revisi return entityFactory.createVersionControlInformationEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification())); } + @Override + public VersionControlInformationEntity createFlowBranch(final Revision revision, final String processGroupId, final String newBranchName, + final String sourceBranch, final String sourceVersion) { + final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); + final VersionControlInformation versionControlInformation = group.getVersionControlInformation(); + + if (versionControlInformation == null) { + throw new IllegalStateException("Process Group with ID " + processGroupId + " is not currently under Version Control"); + } + + final String trimmedBranchName = org.apache.commons.lang3.StringUtils.trimToNull(newBranchName); + if (trimmedBranchName == null) { + throw new IllegalArgumentException("Branch name must be specified"); + } + if (trimmedBranchName.equals(versionControlInformation.getBranch())) { + throw new IllegalArgumentException("Process Group is already tracking branch " + trimmedBranchName); + } + + final String resolvedSourceBranch = org.apache.commons.lang3.StringUtils.isNotBlank(sourceBranch) ? sourceBranch : versionControlInformation.getBranch(); + if (org.apache.commons.lang3.StringUtils.isBlank(resolvedSourceBranch)) { + throw new IllegalArgumentException("Source branch must be specified"); + } + + final String resolvedSourceVersion = org.apache.commons.lang3.StringUtils.isNotBlank(sourceVersion) ? sourceVersion : versionControlInformation.getVersion(); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation(resolvedSourceBranch, + versionControlInformation.getBucketIdentifier(), + versionControlInformation.getFlowIdentifier(), + resolvedSourceVersion); + + final FlowRegistryClientUserContext clientUserContext = FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()); + + try { + flowRegistryDAO.createBranchForUser(clientUserContext, versionControlInformation.getRegistryIdentifier(), sourceLocation, trimmedBranchName); + } catch (final UnsupportedOperationException e) { + throw new IllegalArgumentException("Configured Flow Registry does not support branch creation", e); + } + + final VersionControlInformationDTO updatedVersionControlInformation = new VersionControlInformationDTO(); + updatedVersionControlInformation.setGroupId(processGroupId); + updatedVersionControlInformation.setRegistryId(versionControlInformation.getRegistryIdentifier()); + updatedVersionControlInformation.setRegistryName(versionControlInformation.getRegistryName()); + updatedVersionControlInformation.setBucketId(versionControlInformation.getBucketIdentifier()); + updatedVersionControlInformation.setBucketName(versionControlInformation.getBucketName()); + updatedVersionControlInformation.setFlowId(versionControlInformation.getFlowIdentifier()); + updatedVersionControlInformation.setFlowName(versionControlInformation.getFlowName()); + updatedVersionControlInformation.setFlowDescription(versionControlInformation.getFlowDescription()); + updatedVersionControlInformation.setStorageLocation(versionControlInformation.getStorageLocation()); + updatedVersionControlInformation.setBranch(trimmedBranchName); + updatedVersionControlInformation.setVersion(resolvedSourceVersion); + + final VersionedFlowStatus status = versionControlInformation.getStatus(); + VersionedFlowState updatedState = null; + String stateExplanation = status == null ? null : status.getStateExplanation(); + if (status != null) { + final VersionedFlowState state = status.getState(); + if (state != null) { + switch (state) { + case LOCALLY_MODIFIED_AND_STALE: + updatedState = VersionedFlowState.LOCALLY_MODIFIED; + stateExplanation = "Process Group has local modifications"; + break; + case STALE: + updatedState = VersionedFlowState.UP_TO_DATE; + break; + default: + updatedState = state; + break; + } + } + } + + if (updatedState != null) { + updatedVersionControlInformation.setState(updatedState.name()); + updatedVersionControlInformation.setStateExplanation(stateExplanation); + } + + final FlowManager flowManager = controllerFacade.getFlowManager(); + + VersionedProcessGroup registrySnapshot = null; + if (flowManager == null) { + logger.warn("Failed to synchronize Process Group {} with Flow Registry after creating branch {} because Flow Manager is unavailable", group.getIdentifier(), trimmedBranchName); + } else { + try { + final FlowRegistryClientNode registryClient = flowManager.getFlowRegistryClient(versionControlInformation.getRegistryIdentifier()); + if (registryClient == null) { + logger.warn("Unable to retrieve Flow Registry client with identifier {} for Process Group {}", versionControlInformation.getRegistryIdentifier(), group.getIdentifier()); + } else { + final FlowVersionLocation branchLocation = new FlowVersionLocation(trimmedBranchName, + versionControlInformation.getBucketIdentifier(), + versionControlInformation.getFlowIdentifier(), + resolvedSourceVersion); + + final FlowSnapshotContainer snapshotContainer = registryClient.getFlowContents(clientUserContext, branchLocation, false); + final RegisteredFlowSnapshot flowSnapshot = snapshotContainer == null ? null : snapshotContainer.getFlowSnapshot(); + if (flowSnapshot != null) { + registrySnapshot = flowSnapshot.getFlowContents(); + } + } + } catch (final IOException | FlowRegistryException e) { + logger.warn("Failed to retrieve Flow Registry snapshot for Process Group {}", group.getIdentifier(), e); + } + } + + final RevisionUpdate snapshot = updateComponent(revision, + group, + () -> processGroupDAO.updateVersionControlInformation(updatedVersionControlInformation, Collections.emptyMap()), + processGroup -> dtoFactory.createVersionControlInformationDto(processGroup)); + + final VersionControlInformation updatedVci = group.getVersionControlInformation(); + if (registrySnapshot != null) { + final StandardVersionControlInformation restoredInfo = StandardVersionControlInformation.Builder.fromDto(updatedVersionControlInformation) + .flowSnapshot(registrySnapshot) + .build(); + restoredInfo.setBucketName(updatedVersionControlInformation.getBucketName()); + restoredInfo.setFlowName(updatedVersionControlInformation.getFlowName()); + restoredInfo.setFlowDescription(updatedVersionControlInformation.getFlowDescription()); + restoredInfo.setStorageLocation(updatedVersionControlInformation.getStorageLocation()); + + group.setVersionControlInformation(restoredInfo, Collections.emptyMap()); + } else if (updatedVci instanceof StandardVersionControlInformation) { + ((StandardVersionControlInformation) updatedVci).setFlowSnapshot(null); + } + + if (flowManager != null) { + try { + group.synchronizeWithFlowRegistry(flowManager); + } catch (final Exception e) { + logger.warn("Failed to synchronize Process Group {} with Flow Registry after creating branch {}", group.getIdentifier(), trimmedBranchName, e); + } + } + + final VersionControlInformationDTO refreshedDto = dtoFactory.createVersionControlInformationDto(group); + return entityFactory.createVersionControlInformationEntity(refreshedDto, dtoFactory.createRevisionDTO(snapshot.getLastModification())); + } + @Override public VersionControlInformationEntity deleteVersionControl(final Revision revision, final String processGroupId) { final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index ea22448180f1..b2bc3824a644 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -64,6 +64,7 @@ import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO; import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.CreateActiveRequestEntity; +import org.apache.nifi.web.api.entity.CreateFlowBranchRequestEntity; import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; @@ -623,6 +624,83 @@ private void unlockVersionControl(final URI requestUri, final String groupId) { } } + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("process-groups/{id}/branches") + @Operation( + summary = "Creates a new branch for a version controlled Process Group", + description = NON_GUARANTEED_ENDPOINT, + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = VersionControlInformationEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + security = { + @SecurityRequirement(name = "Read - /process-groups/{uuid}"), + @SecurityRequirement(name = "Write - /process-groups/{uuid}") + } + ) + public Response createFlowBranch( + @Parameter(description = "The process group id.") @PathParam("id") final String groupId, + @Parameter(description = "The branch creation request.", required = true) final CreateFlowBranchRequestEntity requestEntity) { + + if (requestEntity == null) { + throw new IllegalArgumentException("Branch creation request must be specified."); + } + + final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); + if (revisionDto == null) { + throw new IllegalArgumentException("Process Group Revision must be specified"); + } + if (StringUtils.isBlank(requestEntity.getBranch())) { + throw new IllegalArgumentException("Branch name must be specified"); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.POST, requestEntity); + } else if (isDisconnectedFromCluster()) { + verifyDisconnectedNodeModification(requestEntity.isDisconnectedNodeAcknowledged()); + } + + final Revision requestRevision = getRevision(revisionDto, groupId); + + return withWriteLock( + serviceFacade, + requestEntity, + requestRevision, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + processGroup.authorize(authorizer, RequestAction.READ, user); + processGroup.authorize(authorizer, RequestAction.WRITE, user); + }, + () -> { + final VersionControlInformationEntity currentVersionControlInfo = serviceFacade.getVersionControlInformation(groupId); + if (currentVersionControlInfo == null || currentVersionControlInfo.getVersionControlInformation() == null) { + throw new IllegalStateException("Process Group with ID " + groupId + " is not currently under Version Control"); + } + + final VersionControlInformationDTO currentInfo = currentVersionControlInfo.getVersionControlInformation(); + if (VersionControlInformationDTO.SYNC_FAILURE.equals(currentInfo.getState())) { + throw new IllegalStateException("Process Group with ID " + groupId + " cannot create a new branch while reporting Sync Failure"); + } + }, + (revision, entity) -> { + final VersionControlInformationEntity responseEntity = serviceFacade.createFlowBranch( + revision, + groupId, + entity.getBranch(), + entity.getSourceBranch(), + entity.getSourceVersion()); + + return generateOkResponse(responseEntity).build(); + }); + } + private String lockVersionControl(final URI originalUri, final String groupId) throws URISyntaxException { final URI createRequestUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(), "/nifi-api/versions/active-requests", null, originalUri.getFragment()); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowRegistryDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowRegistryDAO.java index 3c572cf4bb69..a773a6838b20 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowRegistryDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/FlowRegistryDAO.java @@ -21,6 +21,7 @@ import org.apache.nifi.registry.flow.FlowRegistryBucket; import org.apache.nifi.registry.flow.FlowRegistryClientNode; import org.apache.nifi.registry.flow.FlowRegistryClientUserContext; +import org.apache.nifi.registry.flow.FlowVersionLocation; import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO; @@ -54,6 +55,8 @@ public interface FlowRegistryDAO { Set getFlowVersionsForUser(FlowRegistryClientUserContext context, String branch, String registryId, String bucketId, String flowId); + void createBranchForUser(FlowRegistryClientUserContext context, String registryId, FlowVersionLocation sourceLocation, String newBranchName); + FlowRegistryClientNode removeFlowRegistry(String registryId); void verifyConfigVerification(String registryId); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java index 60fceb53d757..3b76ad06f3dd 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java @@ -20,6 +20,12 @@ import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.logging.LogRepository; +import org.apache.nifi.logging.StandardLoggingContext; +import org.apache.nifi.logging.repository.NopLogRepository; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.registry.flow.BucketLocation; import org.apache.nifi.registry.flow.FlowLocation; import org.apache.nifi.registry.flow.FlowRegistryBranch; @@ -27,20 +33,15 @@ import org.apache.nifi.registry.flow.FlowRegistryClientNode; import org.apache.nifi.registry.flow.FlowRegistryClientUserContext; import org.apache.nifi.registry.flow.FlowRegistryException; +import org.apache.nifi.registry.flow.FlowVersionLocation; import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; -import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.util.BundleUtils; import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO; import org.apache.nifi.web.api.dto.FlowRegistryClientDTO; import org.apache.nifi.web.dao.FlowRegistryDAO; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.logging.LogRepository; -import org.apache.nifi.logging.StandardLoggingContext; -import org.apache.nifi.logging.repository.NopLogRepository; -import org.apache.nifi.processor.SimpleProcessLogger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; @@ -209,6 +210,22 @@ public RegisteredFlow getFlowForUser(final FlowRegistryClientUserContext context } } + @Override + public void createBranchForUser(final FlowRegistryClientUserContext context, final String registryId, final FlowVersionLocation sourceLocation, final String newBranchName) { + final FlowRegistryClientNode flowRegistry = flowController.getFlowManager().getFlowRegistryClient(registryId); + if (flowRegistry == null) { + throw new IllegalArgumentException("Registry ID [%s] not found".formatted(registryId)); + } + + try { + flowRegistry.createBranch(context, sourceLocation, newBranchName); + } catch (final UnsupportedOperationException e) { + throw e; + } catch (final IOException | FlowRegistryException ioe) { + throw new NiFiCoreException("Unable to create branch [" + newBranchName + "] in registry with ID " + registryId, ioe); + } + } + @Override public Set getFlowVersionsForUser(final FlowRegistryClientUserContext context, final String registryId, final String branch, final String bucketId, final String flowId) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeCreateFlowBranchTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeCreateFlowBranchTest.java new file mode 100644 index 000000000000..262688cd4731 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeCreateFlowBranchTest.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web; + +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserDetails; +import org.apache.nifi.authorization.user.StandardNiFiUser; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.registry.flow.FlowRegistryClientNode; +import org.apache.nifi.registry.flow.FlowRegistryClientUserContext; +import org.apache.nifi.registry.flow.FlowRegistryException; +import org.apache.nifi.registry.flow.FlowSnapshotContainer; +import org.apache.nifi.registry.flow.FlowVersionLocation; +import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; +import org.apache.nifi.registry.flow.StandardVersionControlInformation; +import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedFlowState; +import org.apache.nifi.registry.flow.VersionedFlowStatus; +import org.apache.nifi.web.api.dto.DtoFactory; +import org.apache.nifi.web.api.dto.EntityFactory; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; +import org.apache.nifi.web.controller.ControllerFacade; +import org.apache.nifi.web.dao.FlowRegistryDAO; +import org.apache.nifi.web.dao.ProcessGroupDAO; +import org.apache.nifi.web.revision.RevisionClaim; +import org.apache.nifi.web.revision.RevisionManager; +import org.apache.nifi.web.revision.UpdateRevisionTask; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.security.authentication.TestingAuthenticationToken; +import org.springframework.security.core.context.SecurityContextHolder; + +import java.io.IOException; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class StandardNiFiServiceFacadeCreateFlowBranchTest { + + private StandardNiFiServiceFacade serviceFacade; + + @Mock + private ProcessGroupDAO processGroupDAO; + + @Mock + private FlowRegistryDAO flowRegistryDAO; + + @Mock + private DtoFactory dtoFactory; + + @Mock + private EntityFactory entityFactory; + + @Mock + private ControllerFacade controllerFacade; + + @Mock + private RevisionManager revisionManager; + + @Mock + private FlowManager flowManager; + + private static final String PROCESS_GROUP_ID = "pg-1"; + + @BeforeEach + void setUp() { + serviceFacade = new StandardNiFiServiceFacade(); + serviceFacade.setProcessGroupDAO(processGroupDAO); + serviceFacade.setFlowRegistryDAO(flowRegistryDAO); + serviceFacade.setDtoFactory(dtoFactory); + serviceFacade.setEntityFactory(entityFactory); + serviceFacade.setControllerFacade(controllerFacade); + serviceFacade.setRevisionManager(revisionManager); + + lenient().when(controllerFacade.getFlowManager()).thenReturn(flowManager); + lenient().when(flowManager.getFlowRegistryClient(anyString())).thenReturn(null); + + lenient().when(revisionManager.updateRevision(any(RevisionClaim.class), any(NiFiUser.class), any(UpdateRevisionTask.class))) + .thenAnswer(invocation -> { + final UpdateRevisionTask task = invocation.getArgument(2); + return task.update(); + }); + lenient().when(revisionManager.getRevision(anyString())).thenAnswer(invocation -> { + final String componentId = invocation.getArgument(0, String.class); + return new Revision(1L, "client-1", componentId); + }); + + final NiFiUser user = new StandardNiFiUser.Builder().identity("unit-test").build(); + final TestingAuthenticationToken authenticationToken = new TestingAuthenticationToken(new NiFiUserDetails(user), null); + SecurityContextHolder.getContext().setAuthentication(authenticationToken); + } + + @AfterEach + void tearDown() { + SecurityContextHolder.clearContext(); + } + + @Test + void testCreateFlowBranchSuccess() throws IOException, FlowRegistryException { + final Revision revision = new Revision(1L, "client-1", PROCESS_GROUP_ID); + + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(processGroupDAO.getProcessGroup(PROCESS_GROUP_ID)).thenReturn(processGroup); + + final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class); + when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation); + when(versionControlInformation.getRegistryIdentifier()).thenReturn("registry-1"); + when(versionControlInformation.getBranch()).thenReturn("main"); + when(versionControlInformation.getBucketIdentifier()).thenReturn("bucket-1"); + when(versionControlInformation.getFlowIdentifier()).thenReturn("flow-1"); + when(versionControlInformation.getFlowDescription()).thenReturn("desc"); + when(versionControlInformation.getFlowName()).thenReturn("name"); + when(versionControlInformation.getStorageLocation()).thenReturn("loc"); + when(versionControlInformation.getVersion()).thenReturn("1"); + + final VersionedFlowStatus flowStatus = mock(VersionedFlowStatus.class); + when(versionControlInformation.getStatus()).thenReturn(flowStatus); + when(flowStatus.getState()).thenReturn(VersionedFlowState.LOCALLY_MODIFIED_AND_STALE); + when(flowStatus.getStateExplanation()).thenReturn("Up to date"); + + final ProcessGroup updatedGroup = processGroup; + when(processGroupDAO.updateVersionControlInformation(any(VersionControlInformationDTO.class), eq(Collections.emptyMap()))) + .thenReturn(updatedGroup); + + final VersionControlInformationDTO updatedDto = new VersionControlInformationDTO(); + updatedDto.setBranch("feature"); + updatedDto.setRegistryId("registry-1"); + + final VersionControlInformationDTO refreshedDto = new VersionControlInformationDTO(); + refreshedDto.setBranch("feature"); + refreshedDto.setRegistryId("registry-1"); + refreshedDto.setState(VersionControlInformationDTO.LOCALLY_MODIFIED); + refreshedDto.setStateExplanation("Process Group has local modifications"); + + when(dtoFactory.createVersionControlInformationDto(updatedGroup)).thenReturn(updatedDto, refreshedDto); + + final VersionControlInformationEntity resultEntity = new VersionControlInformationEntity(); + resultEntity.setVersionControlInformation(refreshedDto); + when(entityFactory.createVersionControlInformationEntity(eq(refreshedDto), any(RevisionDTO.class))).thenReturn(resultEntity); + + when(dtoFactory.createRevisionDTO(any(FlowModification.class))).thenReturn(new RevisionDTO()); + + final FlowRegistryClientNode registryClient = mock(FlowRegistryClientNode.class); + when(flowManager.getFlowRegistryClient("registry-1")).thenReturn(registryClient); + final VersionedProcessGroup registrySnapshot = new VersionedProcessGroup(); + final RegisteredFlowSnapshot registeredFlowSnapshot = new RegisteredFlowSnapshot(); + registeredFlowSnapshot.setFlowContents(registrySnapshot); + final FlowSnapshotContainer snapshotContainer = new FlowSnapshotContainer(registeredFlowSnapshot); + when(registryClient.getFlowContents(any(), any(FlowVersionLocation.class), eq(false))).thenReturn(snapshotContainer); + + final VersionControlInformationEntity response = serviceFacade.createFlowBranch(revision, PROCESS_GROUP_ID, " feature ", null, null); + assertEquals(resultEntity, response); + + ArgumentCaptor locationCaptor = ArgumentCaptor.forClass(FlowVersionLocation.class); + verify(flowRegistryDAO).createBranchForUser(any(FlowRegistryClientUserContext.class), eq("registry-1"), + locationCaptor.capture(), eq("feature")); + + final FlowVersionLocation capturedLocation = locationCaptor.getValue(); + assertEquals("main", capturedLocation.getBranch()); + assertEquals("bucket-1", capturedLocation.getBucketId()); + assertEquals("flow-1", capturedLocation.getFlowId()); + assertEquals("1", capturedLocation.getVersion()); + + final ArgumentCaptor modificationCaptor = ArgumentCaptor.forClass(FlowModification.class); + verify(dtoFactory).createRevisionDTO(modificationCaptor.capture()); + assertEquals("unit-test", modificationCaptor.getValue().getLastModifier()); + + verify(registryClient).getFlowContents(any(), any(FlowVersionLocation.class), eq(false)); + verify(processGroup).setVersionControlInformation(argThat(vci -> vci instanceof StandardVersionControlInformation + && ((StandardVersionControlInformation) vci).getFlowSnapshot() == registrySnapshot), eq(Collections.emptyMap())); + verify(processGroup).synchronizeWithFlowRegistry(flowManager); + verify(entityFactory).createVersionControlInformationEntity(eq(refreshedDto), any(RevisionDTO.class)); + assertEquals(refreshedDto, resultEntity.getVersionControlInformation()); + } + + @Test + void testCreateFlowBranchSameBranchRejected() { + final Revision revision = new Revision(1L, "client-1", PROCESS_GROUP_ID); + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(processGroupDAO.getProcessGroup(PROCESS_GROUP_ID)).thenReturn(processGroup); + + final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class); + when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation); + when(versionControlInformation.getBranch()).thenReturn("main"); + + assertThrows(IllegalArgumentException.class, + () -> serviceFacade.createFlowBranch(revision, PROCESS_GROUP_ID, "main", null, null)); + + verify(flowRegistryDAO, never()).createBranchForUser(any(), any(), any(), any()); + verify(processGroup, never()).synchronizeWithFlowRegistry(any(FlowManager.class)); + } + + @Test + void testCreateFlowBranchUnsupportedRegistry() throws IOException, FlowRegistryException { + final Revision revision = new Revision(1L, "client-1", PROCESS_GROUP_ID); + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(processGroupDAO.getProcessGroup(PROCESS_GROUP_ID)).thenReturn(processGroup); + + final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class); + when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation); + when(versionControlInformation.getRegistryIdentifier()).thenReturn("registry-1"); + when(versionControlInformation.getBranch()).thenReturn("main"); + when(versionControlInformation.getBucketIdentifier()).thenReturn("bucket-1"); + when(versionControlInformation.getFlowIdentifier()).thenReturn("flow-1"); + when(versionControlInformation.getVersion()).thenReturn("1"); + + doThrow(new UnsupportedOperationException("not supported")) + .when(flowRegistryDAO) + .createBranchForUser(any(FlowRegistryClientUserContext.class), eq("registry-1"), any(FlowVersionLocation.class), eq("feature")); + + assertThrows(IllegalArgumentException.class, + () -> serviceFacade.createFlowBranch(revision, PROCESS_GROUP_ID, "feature", null, null)); + + verify(processGroup, never()).synchronizeWithFlowRegistry(any(FlowManager.class)); + } + + @Test + void testCreateFlowBranchNotVersionControlled() { + final Revision revision = new Revision(1L, "client-1", PROCESS_GROUP_ID); + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(processGroupDAO.getProcessGroup(PROCESS_GROUP_ID)).thenReturn(processGroup); + when(processGroup.getVersionControlInformation()).thenReturn(null); + + assertThrows(IllegalStateException.class, + () -> serviceFacade.createFlowBranch(revision, PROCESS_GROUP_ID, "feature", null, null)); + + verify(flowRegistryDAO, never()).createBranchForUser(any(), any(), any(), any()); + verify(processGroup, never()).synchronizeWithFlowRegistry(any(FlowManager.class)); + } + + @Test + void testCreateFlowBranchPropagatesRegistryErrors() throws IOException, FlowRegistryException { + final Revision revision = new Revision(1L, "client-1", PROCESS_GROUP_ID); + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(processGroupDAO.getProcessGroup(PROCESS_GROUP_ID)).thenReturn(processGroup); + + final VersionControlInformation versionControlInformation = mock(VersionControlInformation.class); + when(processGroup.getVersionControlInformation()).thenReturn(versionControlInformation); + when(versionControlInformation.getRegistryIdentifier()).thenReturn("registry-1"); + when(versionControlInformation.getBranch()).thenReturn("main"); + when(versionControlInformation.getBucketIdentifier()).thenReturn("bucket-1"); + when(versionControlInformation.getFlowIdentifier()).thenReturn("flow-1"); + when(versionControlInformation.getVersion()).thenReturn("1"); + + final FlowRegistryException cause = new FlowRegistryException("Branch [feature] already exists"); + doThrow(new NiFiCoreException("Unable to create branch [feature] in registry with ID registry-1", cause)) + .when(flowRegistryDAO) + .createBranchForUser(any(FlowRegistryClientUserContext.class), eq("registry-1"), any(FlowVersionLocation.class), eq("feature")); + + final NiFiCoreException exception = assertThrows(NiFiCoreException.class, + () -> serviceFacade.createFlowBranch(revision, PROCESS_GROUP_ID, "feature", null, null)); + + assertTrue(exception.getMessage().contains("registry-1")); + assertTrue(exception.getMessage().contains("[feature]")); + assertEquals(cause, exception.getCause()); + + verify(processGroup, never()).synchronizeWithFlowRegistry(any(FlowManager.class)); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java index 2a1161782da6..f5a1fd88f634 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestVersionsResource.java @@ -16,23 +16,41 @@ */ package org.apache.nifi.web.api; +import jakarta.ws.rs.core.Response; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.FlowSnapshotContainer; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; +import org.apache.nifi.web.api.entity.CreateFlowBranchRequestEntity; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.mock.web.MockHttpServletRequest; -import jakarta.ws.rs.core.Response; +import java.net.HttpURLConnection; import java.util.Collections; import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -45,6 +63,17 @@ public class TestVersionsResource { @Mock private NiFiServiceFacade serviceFacade; + @Mock + private NiFiProperties properties; + + @BeforeEach + public void setUp() { + versionsResource.setProperties(properties); + lenient().when(properties.isNode()).thenReturn(false); + lenient().when(properties.isClustered()).thenReturn(false); + versionsResource.httpServletRequest = new MockHttpServletRequest(); + } + @Test public void testExportFlowVersion() { final String groupId = UUID.randomUUID().toString(); @@ -70,7 +99,7 @@ public void testExportFlowVersion() { final RegisteredFlowSnapshot resultEntity = (RegisteredFlowSnapshot) response.getEntity(); - assertEquals(200, response.getStatus()); + assertEquals(HttpURLConnection.HTTP_OK, response.getStatus()); assertEquals(versionedFlowSnapshot, resultEntity); verify(versionedFlowSnapshot).setFlow(null); @@ -81,4 +110,238 @@ public void testExportFlowVersion() { verify(innerInnerVersionedProcessGroup).setVersionedFlowCoordinates(null); } -} \ No newline at end of file + @Test + public void testCreateFlowBranchRequiresBranchName() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(0L); + requestEntity.setProcessGroupRevision(revisionDTO); + + assertThrows(IllegalArgumentException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + } + + @Test + public void testCreateFlowBranchInvokesService() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("3"); + currentDto.setState(VersionControlInformationDTO.UP_TO_DATE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + final VersionControlInformationEntity expectedEntity = new VersionControlInformationEntity(); + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("feature"), any(), any())) + .thenReturn(expectedEntity); + + final Response response = versionsResource.createFlowBranch(groupId, requestEntity); + assertEquals(HttpURLConnection.HTTP_OK, response.getStatus()); + assertEquals(expectedEntity, response.getEntity()); + + ArgumentCaptor revisionCaptor = ArgumentCaptor.forClass(Revision.class); + verify(serviceFacade).createFlowBranch(revisionCaptor.capture(), eq(groupId), eq("feature"), isNull(), isNull()); + + final Revision capturedRevision = revisionCaptor.getValue(); + assertEquals(1L, capturedRevision.getVersion()); + assertEquals("client-id", capturedRevision.getClientId()); + assertEquals(groupId, capturedRevision.getComponentId()); + } + + @Test + public void testCreateFlowBranchFailsWhenBranchExists() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("main"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setState(VersionControlInformationDTO.UP_TO_DATE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("main"), any(), any())) + .thenThrow(new IllegalArgumentException("Process Group is already tracking branch main")); + + assertThrows(IllegalArgumentException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + } + + @Test + public void testCreateFlowBranchUnsupported() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("2"); + currentDto.setState(VersionControlInformationDTO.UP_TO_DATE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("feature"), any(), any())) + .thenThrow(new IllegalArgumentException("Registry does not support branching")); + + assertThrows(IllegalArgumentException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + } + + @Test + public void testCreateFlowBranchAllowedWhenLocallyModified() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("1"); + currentDto.setState(VersionControlInformationDTO.LOCALLY_MODIFIED); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + final VersionControlInformationEntity expectedEntity = new VersionControlInformationEntity(); + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("feature"), any(), any())) + .thenReturn(expectedEntity); + + final Response response = versionsResource.createFlowBranch(groupId, requestEntity); + assertEquals(HttpURLConnection.HTTP_OK, response.getStatus()); + assertEquals(expectedEntity, response.getEntity()); + } + + @Test + public void testCreateFlowBranchBlockedWhenSyncFailure() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("1"); + currentDto.setState(VersionControlInformationDTO.SYNC_FAILURE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + assertThrows(IllegalStateException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + + verify(serviceFacade, never()).createFlowBranch(any(), anyString(), anyString(), any(), any()); + } + + @Test + public void testCreateFlowBranchAllowedWhenLocallyModifiedAndStale() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("1"); + currentDto.setState(VersionControlInformationDTO.LOCALLY_MODIFIED_AND_STALE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + final VersionControlInformationEntity expectedEntity = new VersionControlInformationEntity(); + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("feature"), any(), any())) + .thenReturn(expectedEntity); + + final Response response = versionsResource.createFlowBranch(groupId, requestEntity); + assertEquals(HttpURLConnection.HTTP_OK, response.getStatus()); + assertEquals(expectedEntity, response.getEntity()); + } + + @Test + public void testCreateFlowBranchBranchAlreadyExists() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationDTO currentDto = new VersionControlInformationDTO(); + currentDto.setGroupId(groupId); + currentDto.setBranch("main"); + currentDto.setVersion("3"); + currentDto.setState(VersionControlInformationDTO.UP_TO_DATE); + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(currentDto); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + when(serviceFacade.createFlowBranch(any(Revision.class), eq(groupId), eq("feature"), any(), any())) + .thenThrow(new NiFiCoreException("Unable to create branch [feature] in registry with ID registry-1: Branch [feature] already exists")); + + assertThrows(NiFiCoreException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + } + + @Test + public void testCreateFlowBranchNotVersionControlled() { + final String groupId = UUID.randomUUID().toString(); + final CreateFlowBranchRequestEntity requestEntity = new CreateFlowBranchRequestEntity(); + final RevisionDTO revisionDTO = new RevisionDTO(); + revisionDTO.setClientId("client-id"); + revisionDTO.setVersion(1L); + requestEntity.setProcessGroupRevision(revisionDTO); + requestEntity.setBranch("feature"); + + versionsResource.httpServletRequest = new MockHttpServletRequest(); + + final VersionControlInformationEntity currentEntity = new VersionControlInformationEntity(); + currentEntity.setVersionControlInformation(null); + when(serviceFacade.getVersionControlInformation(groupId)).thenReturn(currentEntity); + + assertThrows(IllegalStateException.class, () -> versionsResource.createFlowBranch(groupId, requestEntity)); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAOTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAOTest.java new file mode 100644 index 000000000000..43f4d69377b0 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAOTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.dao.impl; + +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.registry.flow.FlowRegistryClientNode; +import org.apache.nifi.registry.flow.FlowRegistryClientUserContext; +import org.apache.nifi.registry.flow.FlowRegistryException; +import org.apache.nifi.registry.flow.FlowVersionLocation; +import org.apache.nifi.web.NiFiCoreException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class StandardFlowRegistryDAOTest { + + @Mock + private FlowController flowController; + + @Mock + private FlowManager flowManager; + + @Mock + private FlowRegistryClientUserContext userContext; + + private StandardFlowRegistryDAO dao; + + @BeforeEach + void setUp() { + dao = new StandardFlowRegistryDAO(); + dao.setFlowController(flowController); + + when(flowController.getFlowManager()).thenReturn(flowManager); + } + + @Test + void testCreateBranchDelegatesToRegistryClient() throws IOException, FlowRegistryException { + final FlowRegistryClientNode clientNode = mock(FlowRegistryClientNode.class); + when(flowManager.getFlowRegistryClient("registry-1")).thenReturn(clientNode); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation("main", "bucket", "flow", "1"); + + dao.createBranchForUser(userContext, "registry-1", sourceLocation, "feature"); + + ArgumentCaptor locationCaptor = ArgumentCaptor.forClass(FlowVersionLocation.class); + verify(clientNode).createBranch(eq(userContext), locationCaptor.capture(), eq("feature")); + + final FlowVersionLocation capturedLocation = locationCaptor.getValue(); + assertEquals("main", capturedLocation.getBranch()); + assertEquals("bucket", capturedLocation.getBucketId()); + assertEquals("flow", capturedLocation.getFlowId()); + assertEquals("1", capturedLocation.getVersion()); + } + + @Test + void testCreateBranchUnsupported() throws IOException, FlowRegistryException { + final FlowRegistryClientNode clientNode = mock(FlowRegistryClientNode.class); + when(flowManager.getFlowRegistryClient("registry-1")).thenReturn(clientNode); + final FlowVersionLocation sourceLocation = new FlowVersionLocation("main", "bucket", "flow", "1"); + + doThrow(new UnsupportedOperationException("not supported")) + .when(clientNode) + .createBranch(userContext, sourceLocation, "feature"); + + assertThrows(UnsupportedOperationException.class, + () -> dao.createBranchForUser(userContext, "registry-1", sourceLocation, "feature")); + } + + @Test + void testCreateBranchUnknownRegistry() { + when(flowManager.getFlowRegistryClient("missing")).thenReturn(null); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation("main", "bucket", "flow", "1"); + assertThrows(IllegalArgumentException.class, + () -> dao.createBranchForUser(userContext, "missing", sourceLocation, "feature")); + } + + @Test + void testCreateBranchFlowRegistryExceptionWrapped() throws IOException, FlowRegistryException { + final FlowRegistryClientNode clientNode = mock(FlowRegistryClientNode.class); + when(flowManager.getFlowRegistryClient("registry-1")).thenReturn(clientNode); + + final FlowVersionLocation sourceLocation = new FlowVersionLocation("main", "bucket", "flow", "1"); + final FlowRegistryException cause = new FlowRegistryException("Branch [feature] already exists"); + doThrow(cause) + .when(clientNode) + .createBranch(userContext, sourceLocation, "feature"); + + final NiFiCoreException exception = assertThrows(NiFiCoreException.class, + () -> dao.createBranchForUser(userContext, "registry-1", sourceLocation, "feature")); + + assertTrue(exception.getMessage().contains("registry-1")); + assertTrue(exception.getMessage().contains("[feature]")); + assertEquals(cause, exception.getCause()); + } +} diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-context-menu.service.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-context-menu.service.ts index 8fa55e42f558..99543aed8317 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-context-menu.service.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-context-menu.service.ts @@ -40,6 +40,7 @@ import { openChangeProcessorVersionDialog, openChangeVersionDialogRequest, openCommitLocalChangesDialogRequest, + openCreateBranchDialogRequest, openForceCommitLocalChangesDialogRequest, openRevertLocalChangesDialogRequest, openSaveVersionDialogRequest, @@ -218,6 +219,31 @@ export class CanvasContextMenu implements ContextMenuDefinitionProvider { { isSeparator: true }, + { + condition: (selection: d3.Selection) => { + return this.canvasUtils.supportsCreateFlowBranch(selection); + }, + clazz: 'fa fa-code-fork', + text: 'Create Branch', + action: (selection: d3.Selection) => { + let pgId; + if (selection.empty()) { + pgId = this.canvasUtils.getProcessGroupId(); + } else { + pgId = selection.datum().id; + } + this.store.dispatch( + openCreateBranchDialogRequest({ + request: { + processGroupId: pgId + } + }) + ); + } + }, + { + isSeparator: true + }, { condition: (selection: d3.Selection) => { return this.canvasUtils.supportsStopFlowVersioning(selection); diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.spec.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.spec.ts index 7ed80573fccc..5c4a88a0e38a 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.spec.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.spec.ts @@ -24,7 +24,7 @@ import * as fromFlow from '../state/flow/flow.reducer'; import { transformFeatureKey } from '../state/transform'; import * as fromTransform from '../state/transform/transform.reducer'; import { provideMockStore } from '@ngrx/store/testing'; -import { selectFlowState } from '../state/flow/flow.selectors'; +import { selectFlowState, selectRegistryClients } from '../state/flow/flow.selectors'; import { controllerServicesFeatureKey } from '../state/controller-services'; import * as fromControllerServices from '../state/controller-services/controller-services.reducer'; import { selectCurrentUser } from '../../../state/current-user/current-user.selectors'; @@ -69,6 +69,10 @@ describe('CanvasUtils', () => { { selector: selectFlowConfiguration, value: fromFlowConfiguration.initialState.flowConfiguration + }, + { + selector: selectRegistryClients, + value: [] } ] }) @@ -132,6 +136,66 @@ describe('CanvasUtils', () => { }); }); + describe('supportsCreateFlowBranch', () => { + it('should return true when process group is up to date and registry supports branching', () => { + const versionControlInformation = { + groupId: '1', + registryId: 'registry-1', + branch: 'main', + state: 'UP_TO_DATE' + }; + const pgDatum = { + id: '1', + type: ComponentType.ProcessGroup, + permissions: { canRead: true, canWrite: true }, + component: { + id: '1', + name: 'Test Process Group', + versionControlInformation + } + }; + const selection = d3.select(document.createElement('div')).classed('process-group', true).datum(pgDatum); + + (service as any).registryClients = [ + { + id: 'registry-1', + component: { supportsBranching: true } + } + ]; + + expect(service.supportsCreateFlowBranch(selection)).toBe(true); + }); + + it('should return true when process group has local changes', () => { + const versionControlInformation = { + groupId: '1', + registryId: 'registry-1', + branch: 'main', + state: 'LOCALLY_MODIFIED' + }; + const pgDatum = { + id: '1', + type: ComponentType.ProcessGroup, + permissions: { canRead: true, canWrite: true }, + component: { + id: '1', + name: 'Test Process Group', + versionControlInformation + } + }; + const selection = d3.select(document.createElement('div')).classed('process-group', true).datum(pgDatum); + + (service as any).registryClients = [ + { + id: 'registry-1', + component: { supportsBranching: true } + } + ]; + + expect(service.supportsCreateFlowBranch(selection)).toBe(true); + }); + }); + describe('isStoppable', () => { it('should return false for empty selection', () => { const emptySelection = d3.select(null); diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.ts index 9131bbd1135f..412b6673e6c1 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/canvas-utils.service.ts @@ -26,7 +26,8 @@ import { selectConnections, selectCurrentParameterContext, selectCurrentProcessGroupId, - selectParentProcessGroupId + selectParentProcessGroupId, + selectRegistryClients } from '../state/flow/flow.selectors'; import { initialState as initialFlowState } from '../state/flow/flow.reducer'; import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; @@ -39,6 +40,7 @@ import { selectCurrentUser } from '../../../state/current-user/current-user.sele import { FlowConfiguration } from '../../../state/flow-configuration'; import { initialState as initialFlowConfigurationState } from '../../../state/flow-configuration/flow-configuration.reducer'; import { selectFlowConfiguration } from '../../../state/flow-configuration/flow-configuration.selectors'; +import type { RegistryClientEntity } from '../state/flow'; import { CopiedSnippet, VersionControlInformation } from '../state/flow'; import { Overlay, OverlayRef, PositionStrategy } from '@angular/cdk/overlay'; import { ComponentPortal } from '@angular/cdk/portal'; @@ -67,6 +69,7 @@ export class CanvasUtils { private flowConfiguration: FlowConfiguration | null = initialFlowConfigurationState.flowConfiguration; private scale: number = initialTransformState.scale; private connections: any[] = []; + private registryClients: RegistryClientEntity[] = initialFlowState.registryClients; private breadcrumbs: BreadcrumbEntity | null = null; private copiedSnippet: CopiedSnippet | null = null; @@ -104,6 +107,13 @@ export class CanvasUtils { this.connections = connections; }); + this.store + .select(selectRegistryClients) + .pipe(takeUntilDestroyed(this.destroyRef)) + .subscribe((registryClients) => { + this.registryClients = registryClients; + }); + this.store .select(selectCurrentUser) .pipe(takeUntilDestroyed(this.destroyRef)) @@ -2196,6 +2206,27 @@ export class CanvasUtils { ); } + public supportsCreateFlowBranch(selection: d3.Selection): boolean { + if (!this.canVersionFlows()) { + return false; + } + + const versionControlInformation = this.getFlowVersionControlInformation(selection); + + if (!versionControlInformation) { + return false; + } + + if (!versionControlInformation.registryId || versionControlInformation.state === 'SYNC_FAILURE') { + return false; + } + + const registryClient = this.registryClients.find( + (client) => client.id === versionControlInformation.registryId + ); + return !!registryClient?.component.supportsBranching; + } + /** * Determines whether the current selection supports stopping flow versioning. * diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/flow.service.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/flow.service.ts index fae04753d089..c48f7c00120c 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/flow.service.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/service/flow.service.ts @@ -43,6 +43,7 @@ import { ReplayLastProvenanceEventRequest, RunOnceRequest, SaveToVersionControlRequest, + CreateFlowBranchRequest, StartComponentRequest, StartProcessGroupRequest, StopComponentRequest, @@ -480,7 +481,6 @@ export class FlowService implements PropertyDescriptorRetriever { /* Clear Bulletins */ - clearBulletinForComponent(request: ClearBulletinsRequest): Observable { const payload = { fromTimestamp: request.fromTimestamp @@ -500,4 +500,28 @@ export class FlowService implements PropertyDescriptorRetriever { payload ); } + + /* + Create Branch + */ + createFlowBranch(request: CreateFlowBranchRequest): Observable { + const payload: any = { + processGroupRevision: request.revision, + branch: request.branch, + disconnectedNodeAcknowledged: this.clusterConnectionService.isDisconnectionAcknowledged() + }; + + if (request.sourceBranch) { + payload.sourceBranch = request.sourceBranch; + } + + if (request.sourceVersion) { + payload.sourceVersion = request.sourceVersion; + } + + return this.httpClient.post( + `${FlowService.API}/versions/process-groups/${request.processGroupId}/branches`, + payload + ) as Observable; + } } diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts index 9ea7f5769eb8..a1b331c4a6c1 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts @@ -24,8 +24,10 @@ import { ClearBulletinsForGroupResponse, ComponentEntity, ConfirmStopVersionControlRequest, + CreateBranchDialogRequest, CreateComponentRequest, CreateComponentResponse, + CreateFlowBranchRequest, CreateConnection, CreateConnectionRequest, CreatePortRequest, @@ -73,6 +75,7 @@ import { NavigateToParameterContext, NavigateToQueueListing, OpenChangeVersionDialogRequest, + OpenCreateBranchDialogRequest, OpenComponentDialogRequest, OpenGroupComponentsDialogRequest, OpenLocalChangesDialogRequest, @@ -756,6 +759,26 @@ export const disableControllerServicesInProcessGroup = createAction( props<{ id: string }>() ); +export const openCreateBranchDialogRequest = createAction( + `${CANVAS_PREFIX} Open Create Branch Dialog Request`, + props<{ request: OpenCreateBranchDialogRequest }>() +); + +export const openCreateBranchDialog = createAction( + `${CANVAS_PREFIX} Open Create Branch Dialog`, + props<{ request: CreateBranchDialogRequest }>() +); + +export const createFlowBranch = createAction( + `${CANVAS_PREFIX} Create Flow Branch`, + props<{ request: CreateFlowBranchRequest }>() +); + +export const createFlowBranchSuccess = createAction( + `${CANVAS_PREFIX} Create Flow Branch Success`, + props<{ response: VersionControlInformationEntity }>() +); + export const openChangeVersionDialogRequest = createAction( `${CANVAS_PREFIX} Open Change Flow Version Dialog Request`, props<{ request: OpenChangeVersionDialogRequest }>() diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts index c1e169c8c43b..75dd8cbd1de7 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts @@ -147,6 +147,7 @@ import { EditRemoteProcessGroup } from '../../ui/canvas/items/remote-process-gro import { HttpErrorResponse } from '@angular/common/http'; import { SaveVersionDialog } from '../../ui/canvas/items/flow/save-version-dialog/save-version-dialog.component'; import { ChangeVersionDialog } from '../../ui/canvas/items/flow/change-version-dialog/change-version-dialog'; +import { CreateBranchDialog } from '../../ui/canvas/items/flow/create-branch-dialog/create-branch-dialog'; import { ChangeVersionProgressDialog } from '../../ui/canvas/items/flow/change-version-progress-dialog/change-version-progress-dialog'; import { LocalChangesDialog } from '../../ui/canvas/items/flow/local-changes-dialog/local-changes-dialog'; import { ClusterConnectionService } from '../../../../service/cluster-connection.service'; @@ -4091,6 +4092,99 @@ export class FlowEffects { // Change version effects ///////////////////////////// + openCreateBranchDialogRequest$ = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.openCreateBranchDialogRequest), + map((action) => action.request), + switchMap((request) => + from(this.flowService.getVersionInformation(request.processGroupId)).pipe( + map((response) => { + const versionControlInformation = response.versionControlInformation; + if (!versionControlInformation) { + return FlowActions.flowSnackbarError({ + error: 'Process Group is no longer under version control.' + }); + } + + return FlowActions.openCreateBranchDialog({ + request: { + processGroupId: request.processGroupId, + revision: response.processGroupRevision, + versionControlInformation + } + }); + }), + catchError((errorResponse: HttpErrorResponse) => of(this.snackBarOrFullScreenError(errorResponse))) + ) + ) + ) + ); + + openCreateBranchDialog$ = createEffect( + () => + this.actions$.pipe( + ofType(FlowActions.openCreateBranchDialog), + map((action) => action.request), + tap((request) => { + const dialogRef = this.dialog.open(CreateBranchDialog, { + ...SMALL_DIALOG, + data: request, + autoFocus: true + }); + + dialogRef.componentInstance.createBranch.pipe(take(1)).subscribe((branch) => { + dialogRef.close(); + this.store.dispatch( + FlowActions.createFlowBranch({ + request: { + processGroupId: request.processGroupId, + revision: request.revision, + branch, + sourceBranch: request.versionControlInformation.branch, + sourceVersion: request.versionControlInformation.version + } + }) + ); + }); + }) + ), + { dispatch: false } + ); + + createFlowBranch$ = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.createFlowBranch), + map((action) => action.request), + switchMap((request) => + this.flowService.createFlowBranch(request).pipe( + map((response) => FlowActions.createFlowBranchSuccess({ response })), + catchError((errorResponse: HttpErrorResponse) => of(this.snackBarOrFullScreenError(errorResponse))) + ) + ) + ) + ); + + createFlowBranchSuccess$ = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.createFlowBranchSuccess), + map((action) => action.response), + tap((response) => { + const branch = response.versionControlInformation?.branch; + const message = branch + ? `Process Group is now tracking branch ${branch}.` + : 'Branch creation completed successfully.'; + + this.store.dispatch( + FlowActions.showOkDialog({ + title: 'Create Branch', + message + }) + ); + }), + switchMap(() => of(FlowActions.reloadFlow())) + ) + ); + openChangeVersionDialogRequest$ = createEffect(() => this.actions$.pipe( ofType(FlowActions.openChangeVersionDialogRequest), diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts index 4b1a01c160e0..ec1822050114 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts @@ -58,6 +58,8 @@ import { revertChangesComplete, revertChangesSuccess, runOnce, + createFlowBranch, + createFlowBranchSuccess, runOnceSuccess, saveToFlowRegistry, saveToFlowRegistrySuccess, @@ -433,6 +435,8 @@ export const flowReducer = createReducer( startComponent, stopComponent, runOnce, + createFlowBranch, + createFlowBranchSuccess, (state) => ({ ...state, saving: true @@ -604,7 +608,7 @@ export const flowReducer = createReducer( draftState.saving = false; }); }), - on(saveToFlowRegistry, stopVersionControl, (state) => ({ + on(saveToFlowRegistry, stopVersionControl, createFlowBranch, (state) => ({ ...state, versionSaving: true })), @@ -625,6 +629,27 @@ export const flowReducer = createReducer( draftState.versionSaving = false; }); }), + on(createFlowBranchSuccess, (state, { response }) => { + return produce(state, (draftState) => { + const collection: any[] | null = getComponentCollection(draftState, ComponentType.ProcessGroup); + + if (collection) { + const componentIndex: number = collection.findIndex( + (f: any) => response.versionControlInformation?.groupId === f.id + ); + if (componentIndex > -1) { + collection[componentIndex].revision = response.processGroupRevision; + collection[componentIndex].versionedFlowState = response.versionControlInformation?.state; + if (collection[componentIndex].component) { + collection[componentIndex].component.versionControlInformation = + response.versionControlInformation; + } + } + } + + draftState.versionSaving = false; + }); + }), on(stopVersionControlSuccess, (state, { response }) => { return produce(state, (draftState) => { const collection: any[] | null = getComponentCollection(draftState, ComponentType.ProcessGroup); diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts index d0ae7391bc24..a4d02cbda735 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts @@ -36,6 +36,8 @@ import { } from '@nifi/shared'; import { CopyResponseEntity, PasteRequestStrategy } from '../../../../state/copy'; +export type { RegistryClientEntity } from '../../../../state/shared'; + export const flowFeatureKey = 'flowState'; export interface SelectedComponent { @@ -209,6 +211,24 @@ export interface ChangeVersionDialogRequest { versions: VersionedFlowSnapshotMetadataEntity[]; } +export interface OpenCreateBranchDialogRequest { + processGroupId: string; +} + +export interface CreateBranchDialogRequest { + processGroupId: string; + revision: Revision; + versionControlInformation: VersionControlInformation; +} + +export interface CreateFlowBranchRequest { + processGroupId: string; + revision: Revision; + branch: string; + sourceBranch?: string; + sourceVersion?: string; +} + export interface SaveVersionDialogRequest { processGroupId: string; revision: Revision; diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/flow/create-branch-dialog/create-branch-dialog.html b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/flow/create-branch-dialog/create-branch-dialog.html new file mode 100644 index 000000000000..fc5cfe40e39e --- /dev/null +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/flow/create-branch-dialog/create-branch-dialog.html @@ -0,0 +1,46 @@ + + +

Create Branch

+ + +
+ Current branch: + {{ currentBranch || 'Not specified' }} +
+ +
+ + Branch name + + + Branch name is required. + + + Branch name cannot be blank. + + + Branch name must differ from the current branch. + + +
+
+ + + + + diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/flow/create-branch-dialog/create-branch-dialog.scss b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/flow/create-branch-dialog/create-branch-dialog.scss new file mode 100644 index 000000000000..6bc88497dfab --- /dev/null +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/flow/create-branch-dialog/create-branch-dialog.scss @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +.dialog-description { + margin-bottom: 16px; +} + +.branch-name { + font-weight: 600; +} diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/flow/create-branch-dialog/create-branch-dialog.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/flow/create-branch-dialog/create-branch-dialog.ts new file mode 100644 index 000000000000..07329c88b3d8 --- /dev/null +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/flow/create-branch-dialog/create-branch-dialog.ts @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Component, EventEmitter, Output, inject } from '@angular/core'; +import { MAT_DIALOG_DATA, MatDialogModule } from '@angular/material/dialog'; +import { MatButton } from '@angular/material/button'; +import { ReactiveFormsModule, FormBuilder, Validators } from '@angular/forms'; +import { MatFormFieldModule } from '@angular/material/form-field'; +import { MatInputModule } from '@angular/material/input'; +import { CreateBranchDialogRequest } from '../../../../../state/flow'; +import { CloseOnEscapeDialog } from '@nifi/shared'; +import { CommonModule } from '@angular/common'; + +@Component({ + selector: 'create-branch-dialog', + imports: [CommonModule, MatDialogModule, MatFormFieldModule, MatInputModule, MatButton, ReactiveFormsModule], + templateUrl: './create-branch-dialog.html', + styleUrl: './create-branch-dialog.scss' +}) +export class CreateBranchDialog extends CloseOnEscapeDialog { + private dialogRequest = inject(MAT_DIALOG_DATA); + private formBuilder = inject(FormBuilder); + + createBranchForm = this.formBuilder.group({ + branch: ['', [Validators.required, Validators.pattern(/^(?!\s).*$/), this.branchNotCurrentValidator.bind(this)]] + }); + + currentBranch = this.dialogRequest.versionControlInformation.branch; + + @Output() createBranch: EventEmitter = new EventEmitter(); + + branchNotCurrentValidator(control: { value: string }) { + if (!control.value) { + return null; + } + + const trimmedValue = control.value.trim(); + + if (this.currentBranch && trimmedValue === this.currentBranch) { + return { + branchConflicts: true + }; + } + + return null; + } + + submit() { + if (this.createBranchForm.valid) { + const branch = this.createBranchForm.controls.branch.value?.trim(); + if (branch) { + this.createBranch.emit(branch); + } + } else { + this.createBranchForm.markAllAsTouched(); + } + } +} diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/ui/common/component-state/component-state.component.html b/nifi-frontend/src/main/frontend/apps/nifi/src/app/ui/common/component-state/component-state.component.html index 29a379d79f88..dc9c8fa8b31f 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/ui/common/component-state/component-state.component.html +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/ui/common/component-state/component-state.component.html @@ -80,9 +80,7 @@

Component State

--> - +
@@ -115,7 +113,8 @@

Component State

} @if (displayedColumns.includes('actions')) { - + }