Skip to content

Commit 076db0a

Browse files
Fix LaterGauge metrics to collect from all servers (#18751)
Fix `LaterGauge` metrics to collect from all servers

Follow-up to #18714

Previously, our `LaterGauge` metrics did include the `server_name` label as expected but we were only seeing the last server being reported in some cases. Any `LaterGauge` that we were creating multiple times was only reporting the last instance.

This PR updates all `LaterGauge` to be created once and then we use `LaterGauge.register_hook(...)` to add in the metric callback as before. This works now because we store a list of callbacks instead of just one.

I noticed this problem thanks to some [tests in the Synapse Pro for Small Hosts](element-hq/synapse-small-hosts#173) repo that sanity check all metrics to ensure that we can see each metric includes data from multiple servers.

### Testing strategy

1. This is only noticeable when you run multiple Synapse instances in the same process.
1. TODO (see test that was added)

### Dev notes

Previous non-global `LaterGauge`:

```
synapse_federation_send_queue_xxx
synapse_federation_transaction_queue_pending_destinations
synapse_federation_transaction_queue_pending_pdus
synapse_federation_transaction_queue_pending_edus
synapse_handlers_presence_user_to_current_state_size
synapse_handlers_presence_wheel_timer_size
synapse_notifier_listeners
synapse_notifier_rooms
synapse_notifier_users
synapse_replication_tcp_resource_total_connections
synapse_replication_tcp_command_queue
synapse_background_update_status
synapse_federation_known_servers
synapse_scheduler_running_tasks
```

### Pull Request Checklist

<!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request -->

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should:
  - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
  - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry.
* [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))
1 parent c7762cd commit 076db0a

File tree

14 files changed

+241
-141
lines changed

14 files changed

+241
-141
lines changed

changelog.d/18751.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix `LaterGauge` metrics to collect from all servers.

synapse/federation/send_queue.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"""
3838

3939
import logging
40+
from enum import Enum
4041
from typing import (
4142
TYPE_CHECKING,
4243
Dict,
@@ -67,6 +68,25 @@
6768
logger = logging.getLogger(__name__)
6869

6970

71+
class QueueNames(str, Enum):
72+
PRESENCE_MAP = "presence_map"
73+
KEYED_EDU = "keyed_edu"
74+
KEYED_EDU_CHANGED = "keyed_edu_changed"
75+
EDUS = "edus"
76+
POS_TIME = "pos_time"
77+
PRESENCE_DESTINATIONS = "presence_destinations"
78+
79+
80+
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
81+
82+
for queue_name in QueueNames:
83+
queue_name_to_gauge_map[queue_name] = LaterGauge(
84+
name=f"synapse_federation_send_queue_{queue_name.value}_size",
85+
desc="",
86+
labelnames=[SERVER_NAME_LABEL],
87+
)
88+
89+
7090
class FederationRemoteSendQueue(AbstractFederationSender):
7191
"""A drop in replacement for FederationSender"""
7292

@@ -111,23 +131,15 @@ def __init__(self, hs: "HomeServer"):
111131
# we make a new function, so we need to make a new function so the inner
112132
# lambda binds to the queue rather than to the name of the queue which
113133
# changes. ARGH.
114-
def register(name: str, queue: Sized) -> None:
115-
LaterGauge(
116-
name="synapse_federation_send_queue_%s_size" % (queue_name,),
117-
desc="",
118-
labelnames=[SERVER_NAME_LABEL],
119-
caller=lambda: {(self.server_name,): len(queue)},
134+
def register(queue_name: QueueNames, queue: Sized) -> None:
135+
queue_name_to_gauge_map[queue_name].register_hook(
136+
lambda: {(self.server_name,): len(queue)}
120137
)
121138

122-
for queue_name in [
123-
"presence_map",
124-
"keyed_edu",
125-
"keyed_edu_changed",
126-
"edus",
127-
"pos_time",
128-
"presence_destinations",
129-
]:
130-
register(queue_name, getattr(self, queue_name))
139+
for queue_name in QueueNames:
140+
queue = getattr(self, queue_name.value)
141+
assert isinstance(queue, Sized)
142+
register(queue_name, queue=queue)
131143

132144
self.clock.looping_call(self._clear_queue, 30 * 1000)
133145

synapse/federation/sender/__init__.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,24 @@
199199
labelnames=[SERVER_NAME_LABEL],
200200
)
201201

202+
transaction_queue_pending_destinations_gauge = LaterGauge(
203+
name="synapse_federation_transaction_queue_pending_destinations",
204+
desc="",
205+
labelnames=[SERVER_NAME_LABEL],
206+
)
207+
208+
transaction_queue_pending_pdus_gauge = LaterGauge(
209+
name="synapse_federation_transaction_queue_pending_pdus",
210+
desc="",
211+
labelnames=[SERVER_NAME_LABEL],
212+
)
213+
214+
transaction_queue_pending_edus_gauge = LaterGauge(
215+
name="synapse_federation_transaction_queue_pending_edus",
216+
desc="",
217+
labelnames=[SERVER_NAME_LABEL],
218+
)
219+
202220
# Time (in s) to wait before trying to wake up destinations that have
203221
# catch-up outstanding.
204222
# Please note that rate limiting still applies, so while the loop is
@@ -398,38 +416,28 @@ def __init__(self, hs: "HomeServer"):
398416
# map from destination to PerDestinationQueue
399417
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
400418

401-
LaterGauge(
402-
name="synapse_federation_transaction_queue_pending_destinations",
403-
desc="",
404-
labelnames=[SERVER_NAME_LABEL],
405-
caller=lambda: {
419+
transaction_queue_pending_destinations_gauge.register_hook(
420+
lambda: {
406421
(self.server_name,): sum(
407422
1
408423
for d in self._per_destination_queues.values()
409424
if d.transmission_loop_running
410425
)
411-
},
426+
}
412427
)
413-
414-
LaterGauge(
415-
name="synapse_federation_transaction_queue_pending_pdus",
416-
desc="",
417-
labelnames=[SERVER_NAME_LABEL],
418-
caller=lambda: {
428+
transaction_queue_pending_pdus_gauge.register_hook(
429+
lambda: {
419430
(self.server_name,): sum(
420431
d.pending_pdu_count() for d in self._per_destination_queues.values()
421432
)
422-
},
433+
}
423434
)
424-
LaterGauge(
425-
name="synapse_federation_transaction_queue_pending_edus",
426-
desc="",
427-
labelnames=[SERVER_NAME_LABEL],
428-
caller=lambda: {
435+
transaction_queue_pending_edus_gauge.register_hook(
436+
lambda: {
429437
(self.server_name,): sum(
430438
d.pending_edu_count() for d in self._per_destination_queues.values()
431439
)
432-
},
440+
}
433441
)
434442

435443
self._is_processing = False

synapse/handlers/presence.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,18 @@
173173
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
174174
)
175175

176+
presence_user_to_current_state_size_gauge = LaterGauge(
177+
name="synapse_handlers_presence_user_to_current_state_size",
178+
desc="",
179+
labelnames=[SERVER_NAME_LABEL],
180+
)
181+
182+
presence_wheel_timer_size_gauge = LaterGauge(
183+
name="synapse_handlers_presence_wheel_timer_size",
184+
desc="",
185+
labelnames=[SERVER_NAME_LABEL],
186+
)
187+
176188
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
177189
# "currently_active"
178190
LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -779,11 +791,8 @@ def __init__(self, hs: "HomeServer"):
779791
EduTypes.PRESENCE, self.incoming_presence
780792
)
781793

782-
LaterGauge(
783-
name="synapse_handlers_presence_user_to_current_state_size",
784-
desc="",
785-
labelnames=[SERVER_NAME_LABEL],
786-
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
794+
presence_user_to_current_state_size_gauge.register_hook(
795+
lambda: {(self.server_name,): len(self.user_to_current_state)}
787796
)
788797

789798
# The per-device presence state, maps user to devices to per-device presence state.
@@ -882,11 +891,8 @@ def __init__(self, hs: "HomeServer"):
882891
60 * 1000,
883892
)
884893

885-
LaterGauge(
886-
name="synapse_handlers_presence_wheel_timer_size",
887-
desc="",
888-
labelnames=[SERVER_NAME_LABEL],
889-
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
894+
presence_wheel_timer_size_gauge.register_hook(
895+
lambda: {(self.server_name,): len(self.wheel_timer)}
890896
)
891897

892898
# Used to handle sending of presence to newly joined users/servers

synapse/http/request_metrics.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
164164
return counts
165165

166166

167-
LaterGauge(
167+
in_flight_requests = LaterGauge(
168168
name="synapse_http_server_in_flight_requests_count",
169169
desc="",
170170
labelnames=["method", "servlet", SERVER_NAME_LABEL],
171-
caller=_get_in_flight_counts,
172171
)
172+
in_flight_requests.register_hook(_get_in_flight_counts)
173173

174174

175175
class RequestMetrics:

synapse/metrics/__init__.py

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
Dict,
3232
Generic,
3333
Iterable,
34+
List,
3435
Mapping,
3536
Optional,
3637
Sequence,
@@ -73,8 +74,6 @@
7374

7475
METRICS_PREFIX = "/_synapse/metrics"
7576

76-
all_gauges: Dict[str, Collector] = {}
77-
7877
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
7978

8079
SERVER_NAME_LABEL = "server_name"
@@ -163,42 +162,47 @@ class LaterGauge(Collector):
163162
name: str
164163
desc: str
165164
labelnames: Optional[StrSequence] = attr.ib(hash=False)
166-
# callback: should either return a value (if there are no labels for this metric),
167-
# or dict mapping from a label tuple to a value
168-
caller: Callable[
169-
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
170-
]
165+
# List of callbacks: each callback should either return a value (if there are no
166+
# labels for this metric), or dict mapping from a label tuple to a value
167+
_hooks: List[
168+
Callable[
169+
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
170+
]
171+
] = attr.ib(factory=list, hash=False)
171172

172173
def collect(self) -> Iterable[Metric]:
173174
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
174175
# (we don't enforce it here, one level up).
175176
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
176177

177-
try:
178-
calls = self.caller()
179-
except Exception:
180-
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
181-
yield g
182-
return
178+
for hook in self._hooks:
179+
try:
180+
hook_result = hook()
181+
except Exception:
182+
logger.exception(
183+
"Exception running callback for LaterGauge(%s)", self.name
184+
)
185+
yield g
186+
return
187+
188+
if isinstance(hook_result, (int, float)):
189+
g.add_metric([], hook_result)
190+
else:
191+
for k, v in hook_result.items():
192+
g.add_metric(k, v)
183193

184-
if isinstance(calls, (int, float)):
185-
g.add_metric([], calls)
186-
else:
187-
for k, v in calls.items():
188-
g.add_metric(k, v)
194+
yield g
189195

190-
yield g
196+
def register_hook(
197+
self,
198+
hook: Callable[
199+
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
200+
],
201+
) -> None:
202+
self._hooks.append(hook)
191203

192204
def __attrs_post_init__(self) -> None:
193-
self._register()
194-
195-
def _register(self) -> None:
196-
if self.name in all_gauges.keys():
197-
logger.warning("%s already registered, reregistering", self.name)
198-
REGISTRY.unregister(all_gauges.pop(self.name))
199-
200205
REGISTRY.register(self)
201-
all_gauges[self.name] = self
202206

203207

204208
# `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -250,7 +254,7 @@ def __init__(
250254
# Protects access to _registrations
251255
self._lock = threading.Lock()
252256

253-
self._register_with_collector()
257+
REGISTRY.register(self)
254258

255259
def register(
256260
self,
@@ -341,14 +345,6 @@ def collect(self) -> Iterable[Metric]:
341345
gauge.add_metric(labels=key, value=getattr(metrics, name))
342346
yield gauge
343347

344-
def _register_with_collector(self) -> None:
345-
if self.name in all_gauges.keys():
346-
logger.warning("%s already registered, reregistering", self.name)
347-
REGISTRY.unregister(all_gauges.pop(self.name))
348-
349-
REGISTRY.register(self)
350-
all_gauges[self.name] = self
351-
352348

353349
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
354350
"""

synapse/notifier.py

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,24 @@
8686
labelnames=["stream", SERVER_NAME_LABEL],
8787
)
8888

89+
90+
notifier_listeners_gauge = LaterGauge(
91+
name="synapse_notifier_listeners",
92+
desc="",
93+
labelnames=[SERVER_NAME_LABEL],
94+
)
95+
96+
notifier_rooms_gauge = LaterGauge(
97+
name="synapse_notifier_rooms",
98+
desc="",
99+
labelnames=[SERVER_NAME_LABEL],
100+
)
101+
notifier_users_gauge = LaterGauge(
102+
name="synapse_notifier_users",
103+
desc="",
104+
labelnames=[SERVER_NAME_LABEL],
105+
)
106+
89107
T = TypeVar("T")
90108

91109

@@ -281,28 +299,16 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]:
281299
)
282300
}
283301

284-
LaterGauge(
285-
name="synapse_notifier_listeners",
286-
desc="",
287-
labelnames=[SERVER_NAME_LABEL],
288-
caller=count_listeners,
289-
)
290-
291-
LaterGauge(
292-
name="synapse_notifier_rooms",
293-
desc="",
294-
labelnames=[SERVER_NAME_LABEL],
295-
caller=lambda: {
302+
notifier_listeners_gauge.register_hook(count_listeners)
303+
notifier_rooms_gauge.register_hook(
304+
lambda: {
296305
(self.server_name,): count(
297306
bool, list(self.room_to_user_streams.values())
298307
)
299-
},
308+
}
300309
)
301-
LaterGauge(
302-
name="synapse_notifier_users",
303-
desc="",
304-
labelnames=[SERVER_NAME_LABEL],
305-
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
310+
notifier_users_gauge.register_hook(
311+
lambda: {(self.server_name,): len(self.user_to_user_stream)}
306312
)
307313

308314
def add_replication_callback(self, cb: Callable[[], None]) -> None:

0 commit comments

Comments
 (0)