-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathpostgresql_provider.py
264 lines (223 loc) · 10 KB
/
postgresql_provider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.
"""Postgres client relation hooks & helpers."""
import logging
from charms.data_platform_libs.v0.data_interfaces import (
DatabaseProvides,
DatabaseRequestedEvent,
)
from charms.postgresql_k8s.v0.postgresql import (
INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE,
PostgreSQLCreateDatabaseError,
PostgreSQLCreateUserError,
PostgreSQLDeleteUserError,
PostgreSQLGetPostgreSQLVersionError,
PostgreSQLListUsersError,
)
from ops.charm import CharmBase, RelationBrokenEvent, RelationChangedEvent
from ops.framework import Object
from ops.model import ActiveStatus, BlockedStatus, Relation
from constants import (
ALL_CLIENT_RELATIONS,
APP_SCOPE,
DATABASE_PORT,
ENDPOINT_SIMULTANEOUSLY_BLOCKING_MESSAGE,
)
from utils import new_password
logger = logging.getLogger(__name__)
class PostgreSQLProvider(Object):
"""Defines functionality for the 'provides' side of the 'postgresql-client' relation.
Hook events observed:
- database-requested
- relation-broken
"""
def __init__(self, charm: CharmBase, relation_name: str = "database") -> None:
"""Constructor for PostgreSQLClientProvides object.
Args:
charm: the charm for which this relation is provided
relation_name: the name of the relation
"""
self.relation_name = relation_name
super().__init__(charm, self.relation_name)
self.framework.observe(
charm.on[self.relation_name].relation_broken, self._on_relation_broken
)
self.framework.observe(
charm.on[self.relation_name].relation_changed,
self._on_relation_changed_event,
)
self.charm = charm
# Charm events defined in the database provides charm library.
self.database_provides = DatabaseProvides(self.charm, relation_name=self.relation_name)
self.framework.observe(
self.database_provides.on.database_requested, self._on_database_requested
)
def _on_database_requested(self, event: DatabaseRequestedEvent) -> None:
"""Generate password and handle user and database creation for the related application."""
# Check for some conditions before trying to access the PostgreSQL instance.
if not self.charm.unit.is_leader():
return
if (
"cluster_initialised" not in self.charm._peers.data[self.charm.app]
or not self.charm._patroni.member_started
or not self.charm.primary_endpoint
):
event.defer()
logger.debug(
"Deferring on_database_requested: cluster not initialized, Patroni not started or primary endpoint not available"
)
return
# Retrieve the database name and extra user roles using the charm library.
database = event.database
extra_user_roles = event.extra_user_roles
try:
# Creates the user and the database for this specific relation.
user = f"relation-{event.relation.id}"
password = new_password()
self.charm.postgresql.create_user(user, password, extra_user_roles=extra_user_roles)
plugins = [
"_".join(plugin.split("_")[1:-1])
for plugin in self.charm.config.plugin_keys()
if self.charm.config[plugin]
]
self.charm.postgresql.create_database(
database, user, plugins=plugins, client_relations=self.charm.client_relations
)
# Share the credentials with the application.
self.database_provides.set_credentials(event.relation.id, user, password)
# Update the read/write and read-only endpoints.
self.update_endpoints(event)
# Set the database version.
self.database_provides.set_version(
event.relation.id, self.charm.postgresql.get_postgresql_version()
)
# Set the database name
self.database_provides.set_database(event.relation.id, database)
self._update_unit_status(event.relation)
except (
PostgreSQLCreateDatabaseError,
PostgreSQLCreateUserError,
PostgreSQLGetPostgreSQLVersionError,
) as e:
logger.exception(e)
self.charm.unit.status = BlockedStatus(
e.message
if issubclass(type(e), PostgreSQLCreateUserError) and e.message is not None
else f"Failed to initialize {self.relation_name} relation"
)
def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
"""Correctly update the status."""
self._update_unit_status(event.relation)
def oversee_users(self) -> None:
"""Remove users from database if their relations were broken."""
if not self.charm.unit.is_leader():
return
delete_user = "suppress-oversee-users" not in self.charm.app_peer_data
# Retrieve database users.
try:
database_users = {
user for user in self.charm.postgresql.list_users() if user.startswith("relation-")
}
except PostgreSQLListUsersError:
return
# Retrieve the users from the active relations.
relations = [
relation
for relation_name, relations_list in self.model.relations.items()
for relation in relations_list
if relation_name in ALL_CLIENT_RELATIONS
]
relation_users = set()
for relation in relations:
username = f"relation-{relation.id}"
relation_users.add(username)
# Delete that users that exist in the database but not in the active relations.
for user in database_users - relation_users:
if delete_user:
try:
logger.info("Remove relation user: %s", user)
self.charm.set_secret(APP_SCOPE, user, None)
self.charm.set_secret(APP_SCOPE, f"{user}-database", None)
self.charm.postgresql.delete_user(user)
except PostgreSQLDeleteUserError:
logger.error("Failed to delete user %s", user)
else:
logger.info("Stale relation user detected: %s", user)
def update_endpoints(self, event: DatabaseRequestedEvent = None) -> None:
"""Set the read/write and read-only endpoints."""
if not self.charm.unit.is_leader():
return
# Get the current relation or all the relations
# if this is triggered by another type of event.
relations = [event.relation] if event else self.model.relations[self.relation_name]
# If there are no replicas, remove the read-only endpoint.
replicas_endpoint = list(self.charm.members_ips - {self.charm.primary_endpoint})
replicas_endpoint.sort()
read_only_endpoints = (
",".join(f"{x}:{DATABASE_PORT}" for x in replicas_endpoint)
if len(replicas_endpoint) > 0
else ""
)
for relation in relations:
# Set the read/write endpoint.
self.database_provides.set_endpoints(
relation.id,
f"{self.charm.primary_endpoint}:{DATABASE_PORT}",
)
# Set the read-only endpoint.
self.database_provides.set_read_only_endpoints(
relation.id,
read_only_endpoints,
)
def _check_multiple_endpoints(self) -> bool:
"""Checks if there are relations with other endpoints."""
relation_names = {relation.name for relation in self.charm.client_relations}
if "database" in relation_names and len(relation_names) > 1:
return True
return False
def _update_unit_status(self, relation: Relation) -> None:
"""# Clean up Blocked status if it's due to extensions request."""
if (
self.charm.is_blocked
and self.charm.unit.status.message == INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE
):
if not self.check_for_invalid_extra_user_roles(relation.id):
self.charm.unit.status = ActiveStatus()
self._update_unit_status_on_blocking_endpoint_simultaneously()
def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
"""Event emitted when the relation has changed."""
# Leader only
if not self.charm.unit.is_leader():
return
if self._check_multiple_endpoints():
self.charm.unit.status = BlockedStatus(ENDPOINT_SIMULTANEOUSLY_BLOCKING_MESSAGE)
return
def _update_unit_status_on_blocking_endpoint_simultaneously(self):
"""Clean up Blocked status if this is due related of multiple endpoints."""
if (
self.charm.is_blocked
and self.charm.unit.status.message == ENDPOINT_SIMULTANEOUSLY_BLOCKING_MESSAGE
):
if not self._check_multiple_endpoints():
self.charm.unit.status = ActiveStatus()
def check_for_invalid_extra_user_roles(self, relation_id: int) -> bool:
"""Checks if there are relations with invalid extra user roles.
Args:
relation_id: current relation to be skipped.
"""
valid_privileges, valid_roles = self.charm.postgresql.list_valid_privileges_and_roles()
for relation in self.charm.model.relations.get(self.relation_name, []):
if relation.id == relation_id:
continue
for data in relation.data.values():
extra_user_roles = data.get("extra-user-roles")
if extra_user_roles is None:
break
extra_user_roles = extra_user_roles.lower().split(",")
for extra_user_role in extra_user_roles:
if (
extra_user_role not in valid_privileges
and extra_user_role not in valid_roles
):
return True
return False