Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,42 @@ public FlowRegistryBranch getDefaultBranch(final FlowRegistryClientConfiguration
return defaultBranch;
}

@Override
public void createBranch(final FlowRegistryClientConfigurationContext context, final FlowVersionLocation sourceLocation, final String newBranchName)
throws FlowRegistryException, IOException {
if (StringUtils.isBlank(newBranchName)) {
throw new IllegalArgumentException("Branch name must be specified when creating a new branch");
}

final GitRepositoryClient repositoryClient = getRepositoryClient(context);
verifyWritePermissions(repositoryClient);

final String sourceBranch = resolveSourceBranch(context, sourceLocation);
if (StringUtils.isBlank(sourceBranch)) {
throw new FlowRegistryException("Unable to determine source branch for new branch creation");
}

final Optional<String> sourceCommitSha = sourceLocation == null ? Optional.empty() : Optional.ofNullable(sourceLocation.getVersion());
final String trimmedBranchName = newBranchName.trim();
final String trimmedSourceBranch = sourceBranch.trim();

getLogger().info("Creating branch [{}] from branch [{}]", trimmedBranchName, trimmedSourceBranch);

try {
repositoryClient.createBranch(trimmedBranchName, trimmedSourceBranch, sourceCommitSha);
} catch (final UnsupportedOperationException e) {
throw new FlowRegistryException("Configured repository client does not support branch creation", e);
}
}

private String resolveSourceBranch(final FlowRegistryClientConfigurationContext context, final FlowVersionLocation sourceLocation) {
if (sourceLocation != null && StringUtils.isNotBlank(sourceLocation.getBranch())) {
return sourceLocation.getBranch();
}
final String defaultBranch = context.getProperty(REPOSITORY_BRANCH).getValue();
return StringUtils.isBlank(defaultBranch) ? null : defaultBranch;
}

@Override
public Set<FlowRegistryBucket> getBuckets(final FlowRegistryClientConfigurationContext context, final String branch) throws IOException, FlowRegistryException {
final GitRepositoryClient repositoryClient = getRepositoryClient(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ public interface GitRepositoryClient {
*/
InputStream deleteContent(String filePath, String commitMessage, String branch) throws FlowRegistryException, IOException;

/**
* Creates a new branch in the repository.
*
* @param newBranchName the name of the branch to create
* @param sourceBranch the name of the source branch
* @param sourceCommitSha optional commit SHA to use as the starting point for the new branch. If empty, the head commit of the source branch should be used.
* @throws IOException if an I/O error occurs
* @throws FlowRegistryException if a non-I/O error occurs
* @throws UnsupportedOperationException if the repository implementation does not support branch creation
*/
default void createBranch(final String newBranchName, final String sourceBranch, final Optional<String> sourceCommitSha)
throws IOException, FlowRegistryException {
throw new UnsupportedOperationException("Branch creation is not supported");
}

/**
* Closes any resources held by the client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext;
import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext;
import org.apache.nifi.registry.flow.FlowRegistryException;
import org.apache.nifi.registry.flow.FlowVersionLocation;
import org.apache.nifi.registry.flow.git.client.GitCommit;
import org.apache.nifi.registry.flow.git.client.GitCreateContentRequest;
import org.apache.nifi.registry.flow.git.client.GitRepositoryClient;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.jupiter.api.Test;

import javax.net.ssl.SSLContext;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
Expand All @@ -40,6 +44,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class AbstractGitFlowRegistryClientTest {
Expand All @@ -49,6 +54,7 @@ void verifySuccessful() throws Exception {
final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a", ".git"));
final AtomicReference<TestGitRepositoryClient> suppliedClient = new AtomicReference<>(repositoryClient);
final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> suppliedClient.getAndSet(null), "[email protected]");
flowRegistryClient.initialize(createInitializationContext());
final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*");
final ComponentLog verificationLogger = new MockComponentLog("test-component", this);

Expand All @@ -71,6 +77,7 @@ void verifyAuthenticationFailure() {
final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> {
throw new FlowRegistryException("Authentication failed");
}, "[email protected]");
flowRegistryClient.initialize(createInitializationContext());
final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*");
final ComponentLog verificationLogger = new MockComponentLog("test-component", this);

Expand All @@ -85,6 +92,7 @@ void verifyAuthenticationFailure() {
void verifyReadFailureSkipsBucketListing() throws Exception {
final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(false, false, Set.of());
final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "[email protected]");
flowRegistryClient.initialize(createInitializationContext());
final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*");
final ComponentLog verificationLogger = new MockComponentLog("test-component", this);

Expand All @@ -100,10 +108,11 @@ void verifyReadFailureSkipsBucketListing() throws Exception {

@Test
void verifyBucketListingFailureReported() throws Exception {
final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of());
final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a"));
repositoryClient.setGetTopLevelDirectoryNamesException(new FlowRegistryException("listing error"));

final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "[email protected]");
flowRegistryClient.initialize(createInitializationContext());
final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*");
final ComponentLog verificationLogger = new MockComponentLog("test-component", this);

Expand All @@ -118,6 +127,41 @@ void verifyBucketListingFailureReported() throws Exception {
assertTrue(repositoryClient.isClosed());
}

@Test
void createBranchDelegatesToRepositoryClient() throws Exception {
final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a"));
final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "[email protected]");
flowRegistryClient.initialize(createInitializationContext());
final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*");

final FlowVersionLocation sourceLocation = new FlowVersionLocation("source-branch", "bucket-a", "flow-x", "commit-1");

flowRegistryClient.createBranch(context, sourceLocation, " new-branch ");

assertEquals("new-branch", repositoryClient.getCreatedBranch());
assertEquals("source-branch", repositoryClient.getCreatedBranchSource());
assertEquals(Optional.of("commit-1"), repositoryClient.getCreatedBranchCommit());
}

@Test
void createBranchUnsupportedThrowsFlowRegistryException() throws Exception {
final TestGitRepositoryClient repositoryClient = new TestGitRepositoryClient(true, true, Set.of("bucket-a"));
repositoryClient.setBranchCreationUnsupported(true);

final TestGitFlowRegistryClient flowRegistryClient = new TestGitFlowRegistryClient(() -> repositoryClient, "[email protected]");
flowRegistryClient.initialize(createInitializationContext());
final FlowRegistryClientConfigurationContext context = createContext("main", "[.].*");

final FlowVersionLocation sourceLocation = new FlowVersionLocation();
sourceLocation.setBranch("source");

final FlowRegistryException exception = assertThrows(FlowRegistryException.class,
() -> flowRegistryClient.createBranch(context, sourceLocation, "new-branch"));

assertEquals("Configured repository client does not support branch creation", exception.getMessage());
assertTrue(repositoryClient.getCreatedBranchCommit().isEmpty());
}

private FlowRegistryClientConfigurationContext createContext(final String branch, final String exclusionPattern) {
final Map<PropertyDescriptor, PropertyValue> properties = Map.of(
AbstractGitFlowRegistryClient.REPOSITORY_BRANCH, new MockPropertyValue(branch),
Expand Down Expand Up @@ -145,6 +189,25 @@ public Optional<String> getNiFiUserIdentity() {
};
}

private FlowRegistryClientInitializationContext createInitializationContext() {
return new FlowRegistryClientInitializationContext() {
@Override
public String getIdentifier() {
return "test-git-client";
}

@Override
public ComponentLog getLogger() {
return new MockComponentLog("test-git-client", AbstractGitFlowRegistryClientTest.this);
}

@Override
public Optional<SSLContext> getSystemSslContext() {
return Optional.empty();
}
};
}

private static class TestGitFlowRegistryClient extends AbstractGitFlowRegistryClient {
private final RepositoryClientSupplier repositoryClientSupplier;
private final String storageLocation;
Expand Down Expand Up @@ -186,6 +249,11 @@ private static class TestGitRepositoryClient implements GitRepositoryClient {
private FlowRegistryException topLevelDirectoryNamesException;
private IOException topLevelDirectoryNamesIOException;
private boolean closed;
private boolean branchCreationUnsupported;
private FlowRegistryException createBranchException;
private String createdBranch;
private String createdBranchSource;
private Optional<String> createdBranchCommit = Optional.empty();

TestGitRepositoryClient(final boolean canRead, final boolean canWrite, final Set<String> bucketNames) {
this.canRead = canRead;
Expand All @@ -198,11 +266,31 @@ void setGetTopLevelDirectoryNamesException(final FlowRegistryException exception
this.topLevelDirectoryNamesIOException = null;
}

void setGetTopLevelDirectoryNamesException(final IOException exception) {
void setGetTopLevelDirectoryNamesIOException(final IOException exception) {
this.topLevelDirectoryNamesIOException = exception;
this.topLevelDirectoryNamesException = null;
}

void setBranchCreationUnsupported(final boolean unsupported) {
this.branchCreationUnsupported = unsupported;
}

void setCreateBranchException(final FlowRegistryException exception) {
this.createBranchException = exception;
}

String getCreatedBranch() {
return createdBranch;
}

String getCreatedBranchSource() {
return createdBranchSource;
}

Optional<String> getCreatedBranchCommit() {
return createdBranchCommit;
}

boolean isClosed() {
return closed;
}
Expand All @@ -228,6 +316,21 @@ public Set<String> getTopLevelDirectoryNames(final String branch) throws IOExcep
return bucketNames;
}

@Override
public void createBranch(final String newBranchName, final String sourceBranch, final Optional<String> sourceCommitSha)
throws IOException, FlowRegistryException {
if (branchCreationUnsupported) {
throw new UnsupportedOperationException("Branch creation not supported");
}
if (createBranchException != null) {
throw createBranchException;
}

createdBranch = newBranchName;
createdBranchSource = sourceBranch;
createdBranchCommit = sourceCommitSha;
}

@Override
public void close() {
closed = true;
Expand Down Expand Up @@ -265,7 +368,7 @@ public Optional<String> getContentSha(final String path, final String branch) {

@Override
public String createContent(final GitCreateContentRequest request) {
throw new UnsupportedOperationException("Not required for test");
return "test-commit";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.registry.flow.FlowRegistryException;
import org.apache.nifi.registry.flow.git.client.GitCommit;
Expand Down Expand Up @@ -425,6 +426,56 @@ public InputStream deleteContent(final String filePath, final String commitMessa
});
}

@Override
public void createBranch(final String newBranchName, final String sourceBranch, final Optional<String> sourceCommitSha)
throws IOException, FlowRegistryException {
if (StringUtils.isBlank(newBranchName)) {
throw new IllegalArgumentException("Branch name must be specified");
}
if (StringUtils.isBlank(sourceBranch)) {
throw new IllegalArgumentException("Source branch must be specified");
}

final String trimmedNewBranch = newBranchName.trim();
final String trimmedSourceBranch = sourceBranch.trim();
final String newBranchRefPath = "heads/" + trimmedNewBranch;
final String sourceBranchRefPath = "heads/" + trimmedSourceBranch;

try {
execute(() -> repository.getRef(newBranchRefPath));
throw new FlowRegistryException("Branch [" + trimmedNewBranch + "] already exists");
} catch (final FileNotFoundException notFound) {
logger.debug("Branch [{}] does not exist and will be created", trimmedNewBranch, notFound);
}

final GHRef sourceBranchRef;
try {
sourceBranchRef = execute(() -> repository.getRef(sourceBranchRefPath));
} catch (final FileNotFoundException notFound) {
throw new FlowRegistryException("Source branch [" + trimmedSourceBranch + "] does not exist", notFound);
}

final String baseCommitSha;
if (sourceCommitSha.isPresent()) {
final String requestedCommitSha = sourceCommitSha.get();
try {
baseCommitSha = execute(() -> repository.getCommit(requestedCommitSha).getSHA1());
} catch (final FileNotFoundException notFound) {
throw new FlowRegistryException("Commit [" + requestedCommitSha + "] was not found in the repository", notFound);
}
} else {
baseCommitSha = sourceBranchRef.getObject().getSha();
}

logger.info("Creating branch [{}] from [{}] at commit [{}] for repository [{}]",
trimmedNewBranch, trimmedSourceBranch, baseCommitSha, repository.getFullName());

execute(() -> {
repository.createRef(BRANCH_REF_PATTERN.formatted(trimmedNewBranch), baseCommitSha);
return null;
});
}

private String getResolvedPath(final String path) {
return repoPath == null ? path : repoPath + "/" + path;
}
Expand Down
Loading
Loading