Skip to content
This repository was archived by the owner on Oct 21, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 111 additions & 66 deletions rcmt/git.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,100 @@
import os.path
from typing import Any, Mapping, Tuple, Union
from typing import Any, Mapping, Optional, Union

import git
import structlog
from git.exc import GitCommandError

from rcmt import source

log = structlog.get_logger(package="git")
log: structlog.stdlib.BoundLogger = structlog.get_logger(package="git")


class Git:
def __init__(
self,
branch_name: str,
base_branch: str,
checkout_dir: str,
clone_opts: Mapping[str, Any],
data_dir: str,
repository_name: str,
user_name: str,
user_email: str,
):
self.branch_name = branch_name
self.base_branch = base_branch
self.checkout_dir = checkout_dir
self.clone_opts = clone_opts
self.data_dir = data_dir
self.repository_name = repository_name
self.user_email = user_email
self.user_name = user_name

def checkout_dir(self, repo: source.Repository) -> str:
return os.path.join(self.data_dir, repo.source, repo.project, repo.name)
self.git_repo: Optional[git.Repo] = None

def commit_changes(self, msg: str):
if self.git_repo is None:
raise RuntimeError(
"git repository not initialized - call initialize() first"
)

def commit_changes(self, repo_dir: str, msg: str):
git_repo = git.Repo(path=repo_dir)
git_repo.git.add(all=True)
git_repo.index.commit(msg)
self.git_repo.git.add(all=True)
self.git_repo.index.commit(msg)

def has_changes_origin(self, branch: str) -> bool:
if self.git_repo is None:
raise RuntimeError(
"git repository not initialized - call initialize() first"
)

@staticmethod
def has_changes_origin(branch: str, repo_dir: str) -> bool:
git_repo = git.Repo(path=repo_dir)
try:
return len(git_repo.index.diff(f"origin/{branch}")) > 0
return len(self.git_repo.index.diff(f"origin/{branch}")) > 0
except git.BadName:
# git.BadName thrown if "origin/<branch>" does not exist.
# That means that this is the first time the Task is executed for this
# repository. Always push in this case, thus return True here.
return True

@staticmethod
def has_changes_local(repo_dir: str) -> bool:
git_repo = git.Repo(path=repo_dir)
return len(git_repo.index.diff(None)) > 0 or len(git_repo.untracked_files) > 0
def has_changes_local(self) -> bool:
if self.git_repo is None:
raise RuntimeError(
"git repository not initialized - call initialize() first"
)

return (
len(self.git_repo.index.diff(None)) > 0
or len(self.git_repo.untracked_files) > 0
)

def prepare(self, repo: source.Repository) -> Tuple[str, bool]:
def initialize(self, clone_url: str) -> None:
"""
1. Clone repository
2. Checkout base branch
3. Update base branch
4. Create task branch
5. Reset task branch to base branch
3. Prune deleted/merged remote branches
4. Update base branch
"""
checkout_dir = self.checkout_dir(repo)
if os.path.exists(checkout_dir) is False:
log.debug("Cloning repository", url=repo.clone_url, repo=str(repo))
os.makedirs(checkout_dir)
# Do clone/prune/checkout/fetch of base branch only once per run.
if self.git_repo is not None:
return None

if os.path.exists(self.checkout_dir) is False:
log.debug("Cloning repository", url=clone_url, repo=self.repository_name)
os.makedirs(self.checkout_dir)
git_repo = git.Repo.clone_from(
repo.clone_url, checkout_dir, **self.clone_opts
clone_url, self.checkout_dir, **self.clone_opts
)
else:
git_repo = git.Repo(path=checkout_dir)
self.reset(git_repo)

if self.validate_branch_name(git_repo) is False:
raise RuntimeError(f"Branch name '{self.branch_name}' is not valid")
git_repo = git.Repo(path=self.checkout_dir)
git_repo.git.reset("HEAD", hard=True)

git_repo.config_writer().set_value("user", "email", self.user_email).release()
git_repo.config_writer().set_value("user", "name", self.user_name).release()
log.debug("Checking out base branch", branch=repo.base_branch, repo=str(repo))
git_repo.heads[repo.base_branch].checkout()
log.debug(
"Checking out base branch",
branch=self.base_branch,
repo=self.repository_name,
)
git_repo.heads[self.base_branch].checkout()
hash_before_pull = str(git_repo.head.commit)
log.debug(
"Pulling changes into base branch", branch=repo.base_branch, repo=str(repo)
"Pulling changes into base branch",
branch=self.base_branch,
repo=self.repository_name,
)
git_repo.remotes["origin"].pull()
git_repo.git.remote("prune", "origin")
Expand All @@ -86,65 +103,93 @@ def prepare(self, repo: source.Repository) -> Tuple[str, bool]:
if has_base_branch_update is True:
log.debug(
"Base branch contains new commits",
base_branch=repo.base_branch,
repo=str(repo),
base_branch=self.base_branch,
repo=self.repository_name,
)

exists_local = branch_exists_local(self.branch_name, git_repo)
remote_branch = get_remote_branch(self.branch_name, git_repo)
self.git_repo = git_repo

def prepare_branch(self, branch_name: str) -> bool:
"""
1. Reset any previous changes
2. Create branch, if it does not exist
3. Detect if a merge conflict exists
4. Rebase changes of default branch onto working branch
"""
if self.git_repo is None:
raise RuntimeError(
"git repository not initialized - call initialize() first"
)

if (
self.validate_branch_name(branch_name=branch_name, git_repo=self.git_repo)
is False
):
raise RuntimeError(f"Branch name '{branch_name}' is not valid")

self.git_repo.git.reset("HEAD", hard=True)
self.git_repo.heads[self.base_branch].checkout()
exists_local = branch_exists_local(branch_name, self.git_repo)
remote_branch = get_remote_branch(branch_name, self.git_repo)
if exists_local is False:
log.debug("Creating branch", branch=self.branch_name, repo=str(repo))
log.debug("Creating branch", branch=branch_name, repo=self.repository_name)
if remote_branch is None:
git_repo.create_head(self.branch_name)
self.git_repo.create_head(branch_name)
else:
git_repo.create_head(self.branch_name, remote_branch)
self.git_repo.create_head(branch_name, remote_branch)

has_conflict = False
if remote_branch is not None:
try:
# Try to merge. Errors if there is a merge conflict.
git_repo.git.merge(self.branch_name, no_ff=True, no_commit=True)
self.git_repo.git.merge(branch_name, no_ff=True, no_commit=True)
except GitCommandError as e:
# Exit codes "1" or "2" indicate that a merge is not successful
if e.status != 1 and e.status != 2:
raise e

log.debug(
"Merge conflict with base branch",
base_branch=repo.base_branch,
repo=str(repo),
base_branch=self.base_branch,
repo=self.repository_name,
)
has_conflict = True

try:
# Abort the merge to not leave the branch in a conflicted state
git_repo.git.merge(abort=True)
self.git_repo.git.merge(abort=True)
except GitCommandError as e:
# "128" is the exit code of the git command if no abort was needed
if e.status != 128:
raise e

log.debug("Checking out work branch", branch=self.branch_name, repo=str(repo))
git_repo.heads[self.branch_name].checkout()
merge_base = git_repo.git.merge_base(repo.base_branch, self.branch_name)
log.debug("Resetting to merge base", branch=self.branch_name, repo=str(repo))
git_repo.git.reset(merge_base, hard=True)
log.debug("Rebasing onto work branch", branch=self.branch_name, repo=str(repo))
git_repo.git.rebase(repo.base_branch)
log.debug(
"Checking out work branch", branch=branch_name, repo=self.repository_name
)
self.git_repo.heads[branch_name].checkout()
merge_base = self.git_repo.git.merge_base(self.base_branch, branch_name)
log.debug(
"Resetting to merge base", branch=branch_name, repo=self.repository_name
)
self.git_repo.git.reset(merge_base, hard=True)
log.debug(
"Rebasing onto work branch", branch=branch_name, repo=self.repository_name
)
self.git_repo.git.rebase(self.base_branch)
return has_conflict

return checkout_dir, has_conflict
def push(self, branch_name: str):
if self.git_repo is None:
raise RuntimeError(
"git repository not initialized - call initialize() first"
)

def push(self, repo_dir):
git_repo = git.Repo(path=repo_dir)
git_repo.git.push("origin", self.branch_name, force=True, set_upstream=True)
self.git_repo.git.push("origin", branch_name, force=True, set_upstream=True)

@staticmethod
def reset(repo: git.Repo) -> None:
repo.git.reset("HEAD", hard=True)

def validate_branch_name(self, repo: git.Repo) -> bool:
def validate_branch_name(branch_name: str, git_repo: git.Repo) -> bool:
try:
repo.git.check_ref_format(f"refs/heads/{self.branch_name}")
git_repo.git.check_ref_format(f"refs/heads/{branch_name}")
return True
except GitCommandError:
return False
Expand Down
Loading