-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathctx_util.py
47 lines (37 loc) · 1.35 KB
/
ctx_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import functools
import os
import watchdog.events
import watchdog.observers.polling
import ci.util
import secret_mgmt
class FileChangeEventHandler(watchdog.events.FileSystemEventHandler):
    '''
    Filesystem event handler which invalidates the memoised secret factory.

    `dispatch` is overridden and does not inspect the event it is handed, so every event
    passed to this handler has the same effect.
    '''

    def dispatch(self, event):
        # Drop the cached factory instead of rebuilding it here: the next caller of
        # `secret_factory()` re-creates it and thereby picks up the changed cfg.
        secret_factory.cache_clear()
@functools.cache
def watch_for_file_changes(
    path: str,
    event_handler: FileChangeEventHandler=None,
):
    '''
    Start a polling observer which reports filesystem events below `path` to `event_handler`.

    If no (truthy) `event_handler` is given, a fresh `FileChangeEventHandler` is used.

    The function is wrapped in `functools.cache`: calling it again with the same arguments
    does not run the body again, hence no additional observer is started for an
    already-watched combination of `path` and `event_handler`. Nothing is returned; the
    observer is not handed back to the caller.
    '''
    handler = event_handler or FileChangeEventHandler()

    # NOTE(review): `timeout=60` is presumably the polling interval in seconds - confirm
    # against the watchdog documentation.
    polling_observer = watchdog.observers.polling.PollingObserver(timeout=60)
    polling_observer.schedule(handler, path)
    polling_observer.start()
@functools.cache
def secret_factory() -> secret_mgmt.SecretFactory:
    '''
    Return the (memoised) secret factory for the current environment.

    Lookup order:
    1. If the env var `SECRET_FACTORY_PATH` is set to a non-empty value, the factory is
       read from that directory (secrets stemming from k8s secrets), and the directory is
       watched for changes.
    2. Otherwise, the cfg factory from `ci.util.ctx()` is converted into a secret factory.
       If this raises `ValueError`, an empty secret factory is returned.

    The result is cached via `functools.cache`; `FileChangeEventHandler.dispatch` clears
    this cache, so the factory is rebuilt on the first call after a reported change.
    '''
    secrets_dir = os.getenv('SECRET_FACTORY_PATH')

    if not secrets_dir:
        # no directory configured: fall back to the cfg factory, which is handy for local
        # development where the cfg-factory is available
        try:
            return secret_mgmt.SecretFactory.from_cfg_factory(
                cfg_factory=ci.util.ctx().cfg_factory(),
            )
        except ValueError:
            # no secrets found
            return secret_mgmt.SecretFactory(secrets_dict={})

    # `watch_for_file_changes` is cached as well, so re-entering here after a cache clear
    # does not start a second observer for the same path
    watch_for_file_changes(secrets_dir)

    return secret_mgmt.SecretFactory.from_dir(
        secrets_dir=secrets_dir,
    )