Skip to content

Commit

Permalink
consolidate-locks: extract lock logic into decorators (#140)
Browse files Browse the repository at this point in the history
* consolidate-locks: extract lock logic into decorators

* consolidate-locks: move default back to 60
  • Loading branch information
emmeowzing authored May 31, 2024
1 parent 5e27cdb commit e81ead7
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 115 deletions.
12 changes: 6 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

115 changes: 6 additions & 109 deletions src/passoperator/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
"""


from typing import Any, List, Tuple
from typing import Any
from pathlib import Path
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from importlib import metadata
from kubernetes import client, config
from http import HTTPStatus
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import sleep

from passoperator.git import pull, clone
from passoperator.utils import LogLevel
from passoperator.secret import PassSecret, ManagedSecret
from passoperator.locks import lock_passsecret
from passoperator import env

import asyncio
Expand All @@ -28,81 +28,6 @@

log = logging.getLogger(__name__)

__in_progress_queue: List[Tuple[Any, Any]] = []


def _passsecret_block(body: kopf.Body) -> None:
"""
Block handlers' progress on a PassSecret until it's safe to modify the managed secret.
Decryption takes time, so we want to be sure to queue up any changes.
Args:
body [kopf.Body]: raw body of the PassSecret.
Raises:
kopf.TemporaryError: if the PassSecret is already blocked.
"""
log.debug(f'Blocking PassSecret "{body["metadata"]["name"]}" in namespace "{body["metadata"]["namespace"]}')

if _is_passsecret_blocked(body):
raise kopf.TemporaryError(f'PassSecret "{body["metadata"]["name"]}" in namespace "{body["metadata"]["namespace"]}" is already blocked.')

__in_progress_queue.append(
(
body['metadata']['name'],
body['metadata']['namespace']
)
)


def _is_passsecret_blocked(body: kopf.Body) -> bool:
"""
Check if a PassSecret is blocked from modification.
Args:
body [kopf.Body]: raw body of the PassSecret.
Returns:
bool: True if the PassSecret is blocked, else False.
"""
return (
body['metadata']['name'],
body['metadata']['namespace']
) in __in_progress_queue


def _block_passsecret_block(body: kopf.Body) -> None:
"""
Block handlers' progress on a PassSecret until it's safe to modify the managed secret.
This should only ever be called by event handlers, not by the reconciliation loop.
Args:
body (kopf.Body): raw body of the PassSecret.
"""
while True:
if not _is_passsecret_blocked(body):
_passsecret_block(body)
break
else:
log.debug(f'PassSecret "{body["metadata"]["name"]}" in namespace "{body["metadata"]["namespace"]}" is already blocked.')
sleep(0.5)


def _lift_passsecret_block(body: kopf.Body) -> None:
"""
Unblock handlers' progress to modify the managed secret.
Args:
body [kopf.Body]: raw body of the PassSecret.
"""
log.debug(f'Lifting block on PassSecret "{body["metadata"]["name"]}" in namespace "{body["metadata"]["namespace"]}"')
__in_progress_queue.remove(
(
body['metadata']['name'],
body['metadata']['namespace']
)
)


@kopf.on.startup()
def start(settings: kopf.OperatorSettings, **_: Any) -> None:
Expand All @@ -123,6 +48,7 @@ def start(settings: kopf.OperatorSettings, **_: Any) -> None:
initial_delay=float(env['OPERATOR_INITIAL_DELAY']),
# Don't delay if the prior reconciliation hasn't completed.
sharp=True)
@lock_passsecret(exit_early=True)
def reconciliation(body: kopf.Body, **_: Any) -> None:
"""
Reconcile state of a managed secret against the pass store. Update secrets' data if a mismatch
Expand All @@ -133,11 +59,6 @@ def reconciliation(body: kopf.Body, **_: Any) -> None:
body [kopf.Body]: raw body of the PassSecret.
"""

# Before we parse and decrypt anything, which is more expensive, check if this PassSecret is already in progress.
if _is_passsecret_blocked(body):
log.info(f'PassSecret "{body["metadata"]["name"]}" is still in progress. Skipping.')
return None

# Ensure the GPG key ID in ~/.password-store/${PASS_DIRECTORY}/.gpg-id did not change with the git update.
check_gpg_id(
path=f'{env["PASS_DIRECTORY"]}/.gpg-id'
Expand All @@ -146,7 +67,6 @@ def reconciliation(body: kopf.Body, **_: Any) -> None:
v1 = client.CoreV1Api()

# Create a new PassSecret object with an up-to-date managedSecret decrypted value from the pass store.
_passsecret_block(body)
passSecretObj = PassSecret.from_kopf(body)

log.info(
Expand All @@ -166,7 +86,6 @@ def reconciliation(body: kopf.Body, **_: Any) -> None:
# submit a patch request to update it.
if not _managedSecret.data_equals(passSecretObj.spec.managedSecret):
if _managedSecret.immutable:
_lift_passsecret_block(body)
raise kopf.TemporaryError(
f'PassSecret "{passSecretObj.metadata.name}" managed secret "{passSecretObj.spec.managedSecret.metadata.name}" is immutable. Ignoring data patch.'
)
Expand All @@ -193,10 +112,7 @@ def reconciliation(body: kopf.Body, **_: Any) -> None:
)
)
else:
_lift_passsecret_block(body)
raise kopf.PermanentError(e)
finally:
_lift_passsecret_block(body)


# @kopf.on.cleanup()
Expand Down Expand Up @@ -238,6 +154,7 @@ def lookup_managing_passsecret(managedSecretName: str) -> PassSecret | None:


@kopf.on.update('secrets.premiscale.com', 'v1alpha1', 'passsecret')
@lock_passsecret()
def update(old: kopf.BodyEssence | Any, new: kopf.BodyEssence | Any, meta: kopf.Meta, body: kopf.Body, **_: Any) -> None:
"""
An update was received on the PassSecret object, so attempt to update the corresponding Secret.
Expand All @@ -257,10 +174,6 @@ def update(old: kopf.BodyEssence | Any, new: kopf.BodyEssence | Any, meta: kopf.
}
}

# If a reconciliation is already in progress for the triggered PassSecret, block this event handler
# until it's safe to modify the managed secret.
_block_passsecret_block(body)

# Parse the old PassSecret manifest.
try:
oldPassSecret = PassSecret.from_kopf(
Expand All @@ -270,7 +183,6 @@ def update(old: kopf.BodyEssence | Any, new: kopf.BodyEssence | Any, meta: kopf.
}
)
except (ValueError, KeyError) as e:
_lift_passsecret_block(body)
raise kopf.PermanentError(e)

# Parse the new PassSecret manifest.
Expand All @@ -282,7 +194,6 @@ def update(old: kopf.BodyEssence | Any, new: kopf.BodyEssence | Any, meta: kopf.
}
)
except (ValueError, KeyError) as e:
_lift_passsecret_block(body)
raise kopf.PermanentError(e)

v1 = client.CoreV1Api()
Expand Down Expand Up @@ -317,11 +228,10 @@ def update(old: kopf.BodyEssence | Any, new: kopf.BodyEssence | Any, meta: kopf.
)
except client.ApiException as e:
raise kopf.PermanentError(e)
finally:
_lift_passsecret_block(body)


@kopf.on.create('secrets.premiscale.com', 'v1alpha1', 'passsecret')
@lock_passsecret()
def create(body: kopf.Body, **_: Any) -> None:
"""
Create a new Secret with the spec of the newly-created PassSecret.
Expand All @@ -330,14 +240,9 @@ def create(body: kopf.Body, **_: Any) -> None:
body [kopf.Body]: raw body of the created PassSecret.
"""

# If a reconciliation is already in progress for the triggered PassSecret, block this event handler
# until it's safe to modify the managed secret.
_block_passsecret_block(body)

try:
passSecretObj = PassSecret.from_kopf(body)
except (ValueError, KeyError) as e:
_lift_passsecret_block(body)
raise kopf.PermanentError(e)

log.info(f'PassSecret "{passSecretObj.metadata.name}" created')
Expand All @@ -360,11 +265,10 @@ def create(body: kopf.Body, **_: Any) -> None:
raise kopf.TemporaryError(f'Duplicate PassSecret "{passSecretObj.metadata.name}" managed Secret "{passSecretObj.spec.managedSecret.metadata.name}" in Namespace "{passSecretObj.spec.managedSecret.metadata.namespace}". Skipping.')

raise kopf.PermanentError(e)
finally:
_lift_passsecret_block(body)


@kopf.on.delete('secrets.premiscale.com', 'v1alpha1', 'passsecret')
@lock_passsecret()
def delete(body: kopf.Body, **_: Any) -> None:
"""
Remove a managed secret, as the managing PassSecret has been deleted.
Expand All @@ -373,14 +277,9 @@ def delete(body: kopf.Body, **_: Any) -> None:
body [kopf.Body]: raw body of the deleted PassSecret.
"""

# If a reconciliation is already in progress for the triggered PassSecret, block this event handler
# until it's safe to modify the managed secret.
_block_passsecret_block(body)

try:
passSecretObj = PassSecret.from_kopf(body)
except (ValueError, KeyError) as e:
_lift_passsecret_block(body)
raise kopf.PermanentError(e)

log.info(f'PassSecret "{passSecretObj.metadata.name}" deleted')
Expand All @@ -397,8 +296,6 @@ def delete(body: kopf.Body, **_: Any) -> None:
if e.status == HTTPStatus.NOT_FOUND:
log.warning(f'PassSecret "{passSecretObj.metadata.name}" managed Secret "{passSecretObj.spec.managedSecret.metadata.name}" was not found. Skipping.')
raise kopf.PermanentError(e)
finally:
_lift_passsecret_block(body)


def check_gpg_id(path: Path | str, remove: bool =False) -> None:
Expand Down
Loading

0 comments on commit e81ead7

Please sign in to comment.