Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid conflicts when deployed by ArgoCD #147

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@

CLUSTER_SECRET_LABEL = "clustersecret.io"

BLACK_LISTED_ANNOTATIONS = ["kopf.zalando.org", "kubectl.kubernetes.io"]
BLACK_LISTED_ANNOTATIONS = ["kopf.zalando.org", "kubectl.kubernetes.io"]
BLACK_LISTED_LABELS = ["app.kubernetes.io"]
35 changes: 22 additions & 13 deletions src/kubernetes_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List, Mapping
from typing import Optional, Dict, Any, List, Mapping, Tuple, Iterator
import re

import kopf
from kubernetes.client import CoreV1Api, CustomObjectsApi, exceptions, V1ObjectMeta, rest, V1Secret

from os_utils import get_replace_existing, get_version
from consts import CREATE_BY_ANNOTATION, LAST_SYNC_ANNOTATION, VERSION_ANNOTATION, BLACK_LISTED_ANNOTATIONS, \
CREATE_BY_AUTHOR, CLUSTER_SECRET_LABEL
BLACK_LISTED_LABELS, CREATE_BY_AUTHOR, CLUSTER_SECRET_LABEL


def patch_clustersecret_status(
Expand Down Expand Up @@ -286,27 +286,36 @@ def create_secret_metadata(
Kubernetes metadata object with ClusterSecret annotations.
"""

_labels = {
def filter_dict(
prefixes: List[str],
base: Dict[str, str],
source: Optional[Mapping[str, str]] = None
) -> Iterator[Tuple[str, str]]:
""" Remove potential useless / dangerous annotations and labels"""
for item in base.items():
yield item
if source is not None:
for item in source.items():
key, _ = item
if not any(key.startswith(prefix) for prefix in prefixes):
yield item

base_labels = {
CLUSTER_SECRET_LABEL: 'true'
}
_labels.update(labels or {})

_annotations = {
base_annotations = {
CREATE_BY_ANNOTATION: CREATE_BY_AUTHOR,
VERSION_ANNOTATION: get_version(),
LAST_SYNC_ANNOTATION: datetime.now().isoformat(),
}
_annotations.update(annotations or {})

# Remove potential useless / dangerous annotations
_annotations = {key: value for key, value in _annotations.items() if
not any(key.startswith(prefix) for prefix in BLACK_LISTED_ANNOTATIONS)}

_annotations = filter_dict(BLACK_LISTED_ANNOTATIONS, base_annotations, annotations)
_labels = filter_dict(BLACK_LISTED_LABELS, base_labels, labels)
return V1ObjectMeta(
name=name,
namespace=namespace,
annotations=_annotations,
labels=_labels,
annotations=dict(_annotations),
Copy link

@mihaigalos mihaigalos Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filter_dict already returns a dict, no need for a typecast. Or?

Suggested change
annotations=dict(_annotations),
annotations=_annotations,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, actually it returns a generator object, which has to be unfolded into a dict before passing to V1ObjectMeta constructor. The type annotation of filter_dict is wrong. Nice catch! I'll fix that!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, of course. Makes sense.
I see you corrected the logic.

Any chance you can add some u-tests as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mihaigalos working on it

labels=dict(_labels),
)


Expand Down
101 changes: 100 additions & 1 deletion src/tests/test_kubernetes_utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
import datetime
import logging
import unittest
from typing import Tuple, Callable, Union
from unittest.mock import Mock

from kubernetes.client import V1ObjectMeta
from kubernetes_utils import get_ns_list

from consts import CREATE_BY_ANNOTATION, LAST_SYNC_ANNOTATION, VERSION_ANNOTATION, BLACK_LISTED_ANNOTATIONS, \
BLACK_LISTED_LABELS, CREATE_BY_AUTHOR, CLUSTER_SECRET_LABEL
from kubernetes_utils import get_ns_list, create_secret_metadata
from os_utils import get_version

USER_NAMESPACE_COUNT = 10
initial_namespaces = ['default', 'kube-node-lease', 'kube-public', 'kube-system']
user_namespaces = [f'example-{index}' for index in range(USER_NAMESPACE_COUNT)]


def is_iso_format(date: str) -> bool:
"""check whether a date string parses correctly as ISO 8601 format"""
try:
datetime.datetime.fromisoformat(date)
return True
except (TypeError, ValueError):
return False


class TestClusterSecret(unittest.TestCase):

def test_get_ns_list(self):
Expand Down Expand Up @@ -69,3 +84,87 @@ def test_get_ns_list(self):
)),
msg=case['name'],
)

def test_create_secret_metadata(self) -> None:

expected_base_label_key = CLUSTER_SECRET_LABEL
expected_base_label_value = 'true'

# key, value pairs, where the value can be a string or validation function
expected_base_annotations: list[Tuple[str, Union[str, Callable[[str], bool]]]] = [
(CREATE_BY_ANNOTATION, CREATE_BY_AUTHOR),
(VERSION_ANNOTATION, get_version()),
# Since LAST_SYNC_ANNOTATION is a date string which isn't easily validated by string equality
# have the function 'is_iso_format' validate the value of this annotation.
(LAST_SYNC_ANNOTATION, is_iso_format)
]

attributes_black_lists = dict(
labels=BLACK_LISTED_LABELS,
annotations=BLACK_LISTED_ANNOTATIONS,
)

test_cases: list[Tuple[dict[str, str], dict[str, str]]] = [
# Annotations, Labels
(
{},
{}
),
(
{},
{"modifiedAt": "1692462880",
"name": "prometheus-operator",
"owner": "helm",
"status": "superseded",
"version": "1"}
),
(
{"managed-by": "argocd.argoproj.io"},
{"argocd.argoproj.io/secret-type": "repository"}
),
(
{"argocd.argoproj.io/compare-options": "ServerSideDiff=true",
"argocd.argoproj.io/sync-wave": "4"},
{"app.kubernetes.io/instance": "cluster-secret"}
)
]

for annotations, labels in test_cases:

subject: V1ObjectMeta = create_secret_metadata(
name='test_secret',
namespace='test_namespace',
annotations=annotations,
labels=labels
)

self.assertIsInstance(obj=subject, cls=V1ObjectMeta, msg='returned value has correct type')

for attribute, black_list in attributes_black_lists.items():
attribute_object = subject.__getattribute__(attribute)
self.assertIsNotNone(obj=attribute_object, msg=f'attribute "{attribute}" is not None')

for key in attribute_object.keys():
self.assertIsInstance(obj=key, cls=str, msg=f'the {attribute} key is a string')
for black_listed_label_prefix in black_list:
self.assertFalse(
expr=key.startswith(black_listed_label_prefix),
msg=f'{attribute} key does not match black listed prefix'
)

# This tells mypy that those attributes are not None
assert subject.labels is not None
assert subject.annotations is not None

self.assertEqual(
first=subject.labels[expected_base_label_key],
second=expected_base_label_value,
msg='expected base label is present'
)
for key, value_expectation in expected_base_annotations:
validator = value_expectation if callable(value_expectation) else value_expectation.__eq__
value = subject.annotations[key]
self.assertTrue(
expr=validator(value),
msg=f'expected base annotation with key {key} is present and its value {value} is as expected'
)
Loading