Skip to content

[DPE-6874] Check other units when getting the primary endpoint #808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
26 changes: 1 addition & 25 deletions .github/renovate.json5
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,15 @@
reviewers: [
'team:data-platform-postgresql',
],
"baseBranches": ["main", "/^*\\/edge$/"],
packageRules: [
{
matchPackageNames: [
'pydantic',
],
allowedVersions: '<2.0.0',
},
{
matchManagers: [
'custom.regex',
],
matchDepNames: [
'juju',
],
matchDatasources: [
'pypi',
],
allowedVersions: '<3',
groupName: 'Juju agents',
},
],
customManagers: [
{
customType: 'regex',
fileMatch: [
'^\\.github/workflows/[^/]+\\.ya?ml$',
],
matchStrings: [
'(libjuju: )==(?<currentValue>.*?) +# renovate: latest libjuju 2',
],
depNameTemplate: 'juju',
datasourceTemplate: 'pypi',
versioningTemplate: 'loose',
},
],
}
1 change: 1 addition & 0 deletions .github/workflows/check_pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
- edited
branches:
- main
- '*/edge'

jobs:
check-pr:
Expand Down
42 changes: 18 additions & 24 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,30 +385,22 @@ def primary_endpoint(self) -> str | None:
logger.debug("primary endpoint early exit: Peer relation not joined yet.")
return None
try:
for attempt in Retrying(stop=stop_after_delay(5), wait=wait_fixed(3)):
with attempt:
primary = self._patroni.get_primary()
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
primary = standby_leader
primary_endpoint = self._patroni.get_member_ip(primary)
# Force a retry if there is no primary or the member that was
# returned is not in the list of the current cluster members
# (like when the cluster was not updated yet after a failed switchover).
if not primary_endpoint or primary_endpoint not in self._units_ips:
# TODO figure out why peer data is not available
if (
primary_endpoint
and len(self._units_ips) == 1
and len(self._peers.units) > 1
):
logger.warning(
"Possibly incoplete peer data: Will not map primary IP to unit IP"
)
return primary_endpoint
logger.debug(
"primary endpoint early exit: Primary IP not in cached peer list."
)
primary_endpoint = None
primary = self._patroni.get_primary()
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
primary = standby_leader
primary_endpoint = self._patroni.get_member_ip(primary)
# Discard the endpoint if there is no primary or the member that was
# returned is not in the list of the current cluster members
# (like when the cluster was not updated yet after a failed switchover).
if not primary_endpoint or primary_endpoint not in self._units_ips:
# TODO figure out why peer data is not available
if primary_endpoint and len(self._units_ips) == 1 and len(self._peers.units) > 1:
logger.warning(
"Possibly incoplete peer data: Will not map primary IP to unit IP"
)
return primary_endpoint
logger.debug("primary endpoint early exit: Primary IP not in cached peer list.")
primary_endpoint = None
except RetryError:
return None
else:
Expand Down Expand Up @@ -952,6 +944,8 @@ def _units_ips(self) -> set[str]:
# Get the IPs of all peer units and add the current unit IP to the set.
addresses = {self._get_unit_ip(unit) for unit in self._peers.units}
addresses.add(self._unit_ip)
if None in addresses:
addresses.remove(None)
return addresses

@property
Expand Down
74 changes: 37 additions & 37 deletions src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from ops import BlockedStatus
from pysyncobj.utility import TcpUtility, UtilityException
from tenacity import (
AttemptManager,
RetryError,
Retrying,
retry,
Expand Down Expand Up @@ -233,11 +232,12 @@ def get_member_ip(self, member_name: str) -> str:
IP address of the cluster member.
"""
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
attempts = 2 * len(self.peers_ips) + 2
urls = self._get_alternative_patroni_url()
for attempt in Retrying(stop=stop_after_attempt(attempts)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
f"{next(urls)}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
timeout=API_REQUEST_TIMEOUT,
auth=self._patroni_auth,
Expand All @@ -257,11 +257,12 @@ def get_member_status(self, member_name: str) -> str:
couldn't be retrieved yet.
"""
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
attempts = 2 * len(self.peers_ips) + 2
urls = self._get_alternative_patroni_url()
for attempt in Retrying(stop=stop_after_attempt(attempts)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
f"{next(urls)}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
timeout=API_REQUEST_TIMEOUT,
auth=self._patroni_auth,
Expand All @@ -284,11 +285,15 @@ def get_primary(
primary pod or unit name.
"""
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
if alternative_endpoints:
attempts = 2 * len(alternative_endpoints) + 2
else:
attempts = 2 * len(self.peers_ips) + 2
urls = self._get_alternative_patroni_url(alternative_endpoints)
for attempt in Retrying(stop=stop_after_attempt(attempts)):
with attempt:
url = self._get_alternative_patroni_url(attempt, alternative_endpoints)
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
f"{next(urls)}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
timeout=API_REQUEST_TIMEOUT,
auth=self._patroni_auth,
Expand All @@ -314,11 +319,12 @@ def get_standby_leader(
standby leader pod or unit name.
"""
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
attempts = 2 * len(self.peers_ips) + 2
urls = self._get_alternative_patroni_url()
for attempt in Retrying(stop=stop_after_attempt(attempts)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
f"{next(urls)}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
timeout=API_REQUEST_TIMEOUT,
auth=self._patroni_auth,
Expand All @@ -338,11 +344,12 @@ def get_sync_standby_names(self) -> list[str]:
"""Get the list of sync standby unit names."""
sync_standbys = []
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
attempts = 2 * len(self.peers_ips) + 2
urls = self._get_alternative_patroni_url()
for attempt in Retrying(stop=stop_after_attempt(attempts)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
r = requests.get(
f"{url}/cluster",
f"{next(urls)}/cluster",
verify=self.verify,
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
Expand All @@ -352,33 +359,26 @@ def get_sync_standby_names(self) -> list[str]:
sync_standbys.append("/".join(member["name"].rsplit("-", 1)))
return sync_standbys

def _get_alternative_patroni_url(
self, attempt: AttemptManager, alternative_endpoints: list[str] | None = None
) -> str:
def _get_alternative_patroni_url(self, alternative_endpoints: list[str] | None = None) -> str:
"""Get an alternative REST API URL from another member each time.

When the Patroni process is not running in the current unit it's needed
to use a URL from another cluster member REST API to do some operations.
"""
if alternative_endpoints is not None:
return self._patroni_url.replace(
self.unit_ip, alternative_endpoints[attempt.retry_state.attempt_number - 1]
)
attempt_number = attempt.retry_state.attempt_number
if attempt_number > 1:
url = self._patroni_url
# Build the URL using http and later using https for each peer.
if (attempt_number - 1) <= len(self.peers_ips):
url = url.replace("https://", "http://")
unit_number = attempt_number - 2
else:
url = url.replace("http://", "https://")
unit_number = attempt_number - 2 - len(self.peers_ips)
other_unit_ip = list(self.peers_ips)[unit_number]
url = url.replace(self.unit_ip, other_unit_ip)
if not alternative_endpoints:
alternative_endpoints = list(self.peers_ips)
urls = [self._patroni_url]
urls += [
self._patroni_url.replace(self.unit_ip, endpoint) for endpoint in alternative_endpoints
]
if self.tls_enabled:
default_schema = "https"
new_schema = "http"
else:
url = self._patroni_url
return url
default_schema = "http"
new_schema = "https"
urls += [url.replace(default_schema, new_schema) for url in urls]
yield from urls

def are_all_members_ready(self) -> bool:
"""Check if all members are correctly running Patroni and PostgreSQL.
Expand Down
6 changes: 0 additions & 6 deletions tests/unit/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@ def test_patroni_scrape_config_tls(harness):

def test_primary_endpoint(harness):
with (
patch("charm.stop_after_delay", new_callable=PropertyMock) as _stop_after_delay,
patch("charm.wait_fixed", new_callable=PropertyMock) as _wait_fixed,
patch(
"charm.PostgresqlOperatorCharm._units_ips",
new_callable=PropertyMock,
Expand All @@ -190,10 +188,6 @@ def test_primary_endpoint(harness):
_patroni.return_value.get_primary.return_value = sentinel.primary
assert harness.charm.primary_endpoint == "1.1.1.1"

# Check needed to ensure a fast charm deployment.
_stop_after_delay.assert_called_once_with(5)
_wait_fixed.assert_called_once_with(3)

_patroni.return_value.get_member_ip.assert_called_once_with(sentinel.primary)
_patroni.return_value.get_primary.assert_called_once_with()

Expand Down
Loading
Loading