Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gitops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
from pathlib import Path

from gitops.utils.apps import App, get_app_details, get_apps
from gitops.utils.apps import App, get_app_details, get_apps # type: ignore[attr-defined]

from .utils.cli import success, warning

Expand Down
42 changes: 28 additions & 14 deletions gitops/common/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def __init__(
self,
name: str,
path: str | None = None,
deployments: dict | None = None,
secrets: dict | None = None,
deployments: dict[str, Any] | None = None,
secrets: dict[str, str] | None = None,
load_secrets: bool = True,
encode_secrets: bool = True,
# deprecated
Expand Down Expand Up @@ -75,7 +75,7 @@ def set_value(self, path: str, value: Any) -> None:
current_dict = current_dict.setdefault(key, {})
current_dict[keys[-1]] = value

def _make_values(self, deployments: dict, secrets: dict[str, str]) -> dict:
def _make_values(self, deployments: dict[str, Any], secrets: dict[str, str]) -> dict[str, Any]:
def encode(value: str) -> str:
return b64encode(str(value).encode()).decode() if self.encode_secrets else value

Expand All @@ -93,14 +93,17 @@ def encode(value: str) -> str:
values.pop("images", None)
return values

def _make_image(self, deployment_config: dict) -> str:
def _make_image(self, deployment_config: dict[str, Any]) -> str:
if "image-tag" in deployment_config:
return deployment_config["images"]["template"].format(
account_id=self.account_id,
tag=deployment_config["image-tag"],
return str(
deployment_config["images"]["template"].format(
account_id=self.account_id,
tag=deployment_config["image-tag"],
)
)
else:
return deployment_config.get("image", "")
image = deployment_config.get("image", "")
return str(image) if image else ""

@property
def image(self) -> str:
Expand All @@ -109,7 +112,7 @@ def image(self) -> str:
if isinstance(image, dict):
return f"{image['repository']}:{image.get('tag', 'latest')}"
else:
return image
return str(image) if image else ""

@property
def image_repository_name(self) -> str:
Expand Down Expand Up @@ -139,20 +142,31 @@ def image_prefix(self) -> str:

@property
def cluster(self) -> str:
return self.values.get("cluster", "")
cluster = self.values.get("cluster", "")
return str(cluster) if cluster else ""

@property
def tags(self) -> list[str]:
return self.values.get("tags", [])
tags = self.values.get("tags", [])
return list(tags) if tags else []

@property
def service_account_name(self) -> str:
return self.values.get("serviceAccount", {}).get("name") or self.values.get("serviceAccountName") or "default"
service_account = self.values.get("serviceAccount", {})
if isinstance(service_account, dict):
name = service_account.get("name")
if name:
return str(name)
service_account_name = self.values.get("serviceAccountName")
if service_account_name:
return str(service_account_name)
return "default"

@property
def secrets(self) -> dict[str, str]:
# TODO: This should be a first class property
return self.values.get("secrets", {})
secrets = self.values.get("secrets", {})
return dict(secrets) if secrets else {}


class Chart:
Expand All @@ -177,7 +191,7 @@ class Chart:
chart: https://github.com/uptick/workforce
"""

def __init__(self, definition: dict | str):
def __init__(self, definition: dict[str, Any] | str):
if isinstance(definition, str):
# for backwards compat, any chart definition which is a string, is a git repo
self.type = "git"
Expand Down
7 changes: 4 additions & 3 deletions gitops/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import os
from typing import Any

import yaml


def load_yaml(path: str) -> dict:
def load_yaml(path: str) -> dict[str, Any]:
with open(path) as file:
return resolve_values(yaml.safe_load(file), path)


def resolve_values(values: dict, path: str) -> dict:
def resolve_values(values: dict[str, Any], path: str) -> dict[str, Any]:
if "extends" not in values:
return values
parent_values = load_yaml(os.path.join(os.path.dirname(path), values["extends"]))
return deep_merge(parent_values, values)


def deep_merge(parent: dict, child: dict) -> dict:
def deep_merge(parent: dict[str, Any], child: dict[str, Any]) -> dict[str, Any]:
"""Deeply merge two dictionaries.

Dictionary entries will be followed and merged, anything else will be
Expand Down
81 changes: 45 additions & 36 deletions gitops/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import os
import uuid
from typing import Any

from colorama import Fore
from invoke import run, task
Expand All @@ -16,7 +18,7 @@


@task
def summary(ctx, filter="", exclude=""):
def summary(ctx: Any, filter: str = "", exclude: str = "") -> None:
"""Produce a summary of apps, their tags, and their expected images & replicas.
May not necessarily reflect actual app statuses if recent changes haven't yet been pushed to
the remote, or the deployment has failed.
Expand All @@ -32,18 +34,18 @@ def summary(ctx, filter="", exclude=""):

@task
def bump( # noqa: C901
ctx,
filter,
exclude="",
image_tag=None,
prefix=None,
autoexclude_inactive=True,
interactive=True,
push=False,
redeploy=False,
skip_migrations=False,
skip_deploy=False,
):
ctx: Any,
filter: str,
exclude: str = "",
image_tag: str | None = None,
prefix: str | None = None,
autoexclude_inactive: bool = True,
interactive: bool = True,
push: bool = False,
redeploy: bool = False,
skip_migrations: bool = False,
skip_deploy: bool = False,
) -> None:
"""Bump image tag on selected app(s).
Provide `image_tag` to set to a specific image tag, or provide `prefix` to use latest image
with the given prefix.
Expand Down Expand Up @@ -134,16 +136,16 @@ def bump( # noqa: C901

@task
def command(
ctx,
filter,
command,
exclude="",
cleanup=True,
sequential=True,
interactive=True,
cpu=0,
memory=0,
):
ctx: Any,
filter: str,
command: str,
exclude: str = "",
cleanup: bool = True,
sequential: bool = True,
interactive: bool = True,
cpu: int = 0,
memory: int = 0,
) -> None:
"""Run command on selected app(s).

eg. inv command customer,sandbox -e aesg "python manage.py migrate"
Expand Down Expand Up @@ -183,7 +185,7 @@ def command(


@task
def tag(ctx, filter, tag, exclude=""):
def tag(ctx: Any, filter: str, tag: str, exclude: str = "") -> None:
"""Set a tag on selected app(s)."""
try:
apps = get_apps(
Expand All @@ -209,7 +211,7 @@ def tag(ctx, filter, tag, exclude=""):


@task
def untag(ctx, filter, tag, exclude=""):
def untag(ctx: Any, filter: str, tag: str, exclude: str = "") -> None:
"""Unset a tag from selected app(s)."""
try:
apps = get_apps(
Expand All @@ -235,32 +237,34 @@ def untag(ctx, filter, tag, exclude=""):


@task # TODO: want `keys` to be optional-positional: https://github.com/pyinvoke/invoke/issues/159
def getenv(ctx, filter, keys="", exclude=""):
def getenv(ctx: Any, filter: str, keys: str = "", exclude: str = "") -> None:
"""Get one or more env vars on selected app(s)."""
_getenv("environment", filter, exclude, keys)


@task # TODO: want `keys` to be optional-positional: https://github.com/pyinvoke/invoke/issues/159
def getsecrets(ctx, filter, keys="", exclude=""):
def getsecrets(ctx: Any, filter: str, keys: str = "", exclude: str = "") -> None:
"""Get one or more secrets on selected app(s)."""
_getenv("secrets", filter, exclude, keys)


def _getenv(env_or_secrets, filter, exclude, filter_values):
filter_values = filter_values.split(",") if filter_values else ""
def _getenv(env_or_secrets: str, filter: str, exclude: str, filter_values: str) -> None:
filter_values_list = filter_values.split(",") if filter_values else []
apps = get_apps(filter=filter, exclude=exclude, mode="SILENT")
for app in apps:
print("-" * 20, progress(app.name), sep="\n")
values = app.values.get(env_or_secrets)
if isinstance(values, dict):
filtered_values = {k: v for k, v in values.items() if k in filter_values} if filter_values else values
filtered_values = (
{k: v for k, v in values.items() if k in filter_values_list} if filter_values_list else values
)
for k, v in filtered_values.items():
print(f"{k}={v}")
else:
print(warning(f"No {env_or_secrets} set."))


def _sort_envs(envs):
def _sort_envs(envs: dict[str, Any]) -> dict[str, Any]:
sorted_envs = {}
for e in config.getlist("env_order", fallback=""):
if e in envs:
Expand All @@ -271,7 +275,7 @@ def _sort_envs(envs):


@task
def setenv(ctx, filter, values, exclude=""):
def setenv(ctx: Any, filter: str, values: str, exclude: str = "") -> None:
"""Set one or more env vars on selected app(s).

eg. inv setenv customer,sandbox BG_RUNNER=DRAMATIQ,BUMP=2
Expand All @@ -292,11 +296,16 @@ def setenv(ctx, filter, values, exclude=""):
print(success_negative("Aborted."))
return
for app in apps:
# Split on first = only, in case values contain =
new_envs: dict[str, Any] = {}
for e in splitenvs:
key, value = e.split("=", 1)
new_envs[key] = value
update_app(
app.name,
environment=_sort_envs(
{
**dict(tuple(e.split("=")) for e in splitenvs),
**new_envs,
**app.values.get("environment", {}),
}
),
Expand All @@ -309,7 +318,7 @@ def setenv(ctx, filter, values, exclude=""):


@task
def unsetenv(ctx, filter, values, exclude=""):
def unsetenv(ctx: Any, filter: str, values: str, exclude: str = "") -> None:
"""Unset one or more env vars on selected app(s).

eg. inv unsetenv customer,sandbox BG_RUNNER,BUMP
Expand Down Expand Up @@ -343,7 +352,7 @@ def unsetenv(ctx, filter, values, exclude=""):


@task
def setcluster(ctx, filter, cluster, exclude=""):
def setcluster(ctx: Any, filter: str, cluster: str, exclude: str = "") -> None:
"""Move selected app(s) to given cluster.

eg. inv setcluster customer,sandbox eks-prod
Expand All @@ -370,7 +379,7 @@ def setcluster(ctx, filter, cluster, exclude=""):
print(success("Done!"))


def git_push(cluster_path: str, retry: int = 3):
def git_push(cluster_path: str | os.PathLike[str], retry: int = 3) -> None:
"""Git pushes in a directory and retries if commits already exist"""
print(progress(f"Pushing changes to {cluster_path}"))
attempts = 0
Expand Down
Loading
Loading