From f1b76fa03fd2b6461d51bd53db35f633bd44ae8e Mon Sep 17 00:00:00 2001 From: Jon Durbin Date: Tue, 3 Mar 2026 06:22:34 -0500 Subject: [PATCH 1/5] Bonus for solo instance of public TEE chutes to encourage keeping them running. --- api/constants.py | 3 ++ api/miner/router.py | 18 +++++----- chute_autoscaler.py | 88 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 100 insertions(+), 9 deletions(-) diff --git a/api/constants.py b/api/constants.py index 983d1d9f..80f083ae 100644 --- a/api/constants.py +++ b/api/constants.py @@ -98,6 +98,9 @@ class NoncePurpose(str, Enum): # TEE bonus. TEE_BONUS = 2.25 +# Last-instance bonus for public TEE chutes (sole active instance gets this multiplier). +LAST_PUBLIC_TEE_INSTANCE_BONUS = 2.0 + # Duration for instance disablement when consecutive errors are hit (increases linearly until max). INSTANCE_DISABLE_BASE_TIMEOUT = 90 diff --git a/api/miner/router.py b/api/miner/router.py index 57e344a5..fa2c7577 100644 --- a/api/miner/router.py +++ b/api/miner/router.py @@ -265,14 +265,16 @@ async def list_active_instances( """ query = text(""" SELECT - instance_id, - miner_hotkey, - chute_id, - activated_at, - COALESCE(compute_multiplier, 1.0) as compute_multiplier - FROM instances - WHERE active = true - AND verified = true + i.instance_id, + i.miner_hotkey, + i.chute_id, + i.activated_at, + COALESCE(ich.compute_multiplier, i.compute_multiplier, 1.0) as compute_multiplier + FROM instances i + LEFT JOIN instance_compute_history ich + ON ich.instance_id = i.instance_id AND ich.ended_at IS NULL + WHERE i.active = true + AND i.verified = true """) result = await session.execute(query) return [ diff --git a/chute_autoscaler.py b/chute_autoscaler.py index 10cae99c..4427034b 100644 --- a/chute_autoscaler.py +++ b/chute_autoscaler.py @@ -915,6 +915,83 @@ async def refresh_instance_compute_multipliers(chute_ids: List[str] = None): else: logger.info("No compute_multiplier updates needed") + # Last-instance bonus for public TEE chutes. + # For each public+tee chute, the expected ICH value is: + # instances.compute_multiplier * LAST_PUBLIC_TEE_INSTANCE_BONUS (if sole active instance) + # instances.compute_multiplier (if >1 active instances) + # Compare against the actual open ICH record and update only when they differ. + from api.constants import LAST_PUBLIC_TEE_INSTANCE_BONUS + + # Track the latest compute_multiplier for each instance (may have been updated above). + updated_multipliers = {iid: mult for iid, mult in updates} + + # Group instances by chute for public+tee chutes. + pub_tee_instances = defaultdict(list) + for inst in instances: + chute = chutes.get(inst.chute_id) + if chute and chute.public and chute.tee: + pub_tee_instances[inst.chute_id].append(inst) + + if pub_tee_instances: + # Batch-load current open ICH values for all relevant instances. + all_pub_tee_instance_ids = [ + inst.instance_id for inst_list in pub_tee_instances.values() for inst in inst_list + ] + ich_result = await session.execute( + text(""" + SELECT instance_id, compute_multiplier + FROM instance_compute_history + WHERE instance_id = ANY(:ids) AND ended_at IS NULL + """), + {"ids": all_pub_tee_instance_ids}, + ) + current_ich = {row.instance_id: float(row.compute_multiplier) for row in ich_result} + + ich_updates = 0 + for chute_id, chute_instances in pub_tee_instances.items(): + is_sole = len(chute_instances) == 1 + for inst in chute_instances: + # Use the latest compute_multiplier (from refresh above, or original if not updated). + base = float( + updated_multipliers.get(inst.instance_id, inst.compute_multiplier) or 1.0 + ) + # Thrash-penalized instances never receive the bonus — force back to base + # so any previously-applied bonus gets reconciled away. + is_thrashing = inst.instance_id in thrash_penalty_instances + expected = ( + base * LAST_PUBLIC_TEE_INSTANCE_BONUS + if is_sole and not is_thrashing + else base + ) + actual = current_ich.get(inst.instance_id) + + if actual is not None and abs(actual - expected) > 0.01: + await session.execute( + text(""" + WITH closed AS ( + UPDATE instance_compute_history + SET ended_at = NOW() + WHERE instance_id = :instance_id AND ended_at IS NULL + RETURNING instance_id + ) + INSERT INTO instance_compute_history (instance_id, compute_multiplier, started_at) + VALUES (:instance_id, :expected, NOW()) + """), + {"instance_id": inst.instance_id, "expected": expected}, + ) + ich_updates += 1 + action = "applied" if is_sole else "removed" + logger.info( + f"Last-instance bonus {action} for {inst.instance_id} on chute {chute_id}: " + f"ICH {actual:.2f} -> {expected:.2f}" + ) + + if ich_updates: + await session.commit() + logger.success( + f"Updated ICH for {ich_updates} instances (last-instance bonus adjustments)" + ) + async def _log_thrashing_instances(): """ @@ -1995,7 +2072,12 @@ def is_rl_significant(rl_count: float, completed_count: float) -> bool: # Calculate effective compute multiplier for each chute (for CSV export and logging) # This mirrors the logic in api/chute/util.py:calculate_effective_compute_multiplier # but uses ctx.boost (which may not be saved to DB yet in dry-run mode) - from api.constants import PRIVATE_INSTANCE_BONUS, INTEGRATED_SUBNET_BONUS, TEE_BONUS + from api.constants import ( + PRIVATE_INSTANCE_BONUS, + INTEGRATED_SUBNET_BONUS, + TEE_BONUS, + LAST_PUBLIC_TEE_INSTANCE_BONUS, + ) from api.chute.util import INTEGRATED_SUBNETS for ctx in contexts.values(): @@ -2036,6 +2118,10 @@ def is_rl_significant(rl_count: float, completed_count: float) -> bool: if ctx.tee: total *= TEE_BONUS + # Last-instance bonus (sole active instance of public TEE chute) + if ctx.public and ctx.tee and ctx.current_count == 1: + total *= LAST_PUBLIC_TEE_INSTANCE_BONUS + ctx.effective_multiplier = total ctx.cm_delta_ratio = total / base_mult if base_mult > 0 else 1.0 From 511bb8fe8e0d164267c3532165fd8e147d0a118f Mon Sep 17 00:00:00 2001 From: Jon Durbin Date: Tue, 3 Mar 2026 07:00:47 -0500 Subject: [PATCH 2/5] stale version cleanup --- chute_autoscaler.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/chute_autoscaler.py b/chute_autoscaler.py index 4427034b..e8352655 100644 --- a/chute_autoscaler.py +++ b/chute_autoscaler.py @@ -1240,6 +1240,50 @@ async def manage_rolling_updates( await notify_deleted(instance, message=reason) await invalidate_instance_cache(instance.chute_id, instance_id=instance.instance_id) + # Cleanup orphaned version-mismatched instances that have no rolling update record. + # This catches instances that slip through when a rolling update record is deleted + # before all old-version instances are cleaned up. + orphaned = ( + await session.execute( + text(""" + SELECT i.instance_id, i.chute_id, i.version, c.version AS current_version + FROM instances i + JOIN chutes c ON c.chute_id = i.chute_id + WHERE i.active = true + AND i.version != c.version + AND NOT EXISTS ( + SELECT 1 FROM rolling_updates ru WHERE ru.chute_id = i.chute_id + ) + """) + ) + ).fetchall() + + if orphaned: + orphan_ids = [row.instance_id for row in orphaned] + logger.warning( + f"Found {len(orphan_ids)} orphaned version-mismatched instances with no rolling update, purging: " + f"{[(row.instance_id, row.version, row.current_version) for row in orphaned]}" + ) + await session.execute( + text( + "UPDATE instance_audit SET deletion_reason = :reason, valid_termination = true " + "WHERE instance_id = ANY(:ids)" + ), + {"reason": "Orphaned version mismatch (no rolling update)", "ids": orphan_ids}, + ) + # Load and delete instances individually so the delete trigger fires for ICH. + orphan_instances = ( + await session.execute( + select(Instance).where(Instance.instance_id.in_(orphan_ids)) + ) + ).scalars().all() + for instance in orphan_instances: + await session.delete(instance) + await session.commit() + for instance in orphan_instances: + await notify_deleted(instance, message="Orphaned version mismatch (no rolling update)") + await invalidate_instance_cache(instance.chute_id, instance_id=instance.instance_id) + async def query_prometheus_batch( queries: Dict[str, str], prometheus_url: str = PROMETHEUS_URL From 7b85ed3c6886bec04142c10ef5c6b4372d02d1f3 Mon Sep 17 00:00:00 2001 From: Jon Durbin Date: Tue, 3 Mar 2026 07:02:18 -0500 Subject: [PATCH 3/5] lint --- chute_autoscaler.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/chute_autoscaler.py b/chute_autoscaler.py index e8352655..856a71c8 100644 --- a/chute_autoscaler.py +++ b/chute_autoscaler.py @@ -1273,15 +1273,21 @@ async def manage_rolling_updates( ) # Load and delete instances individually so the delete trigger fires for ICH. orphan_instances = ( - await session.execute( - select(Instance).where(Instance.instance_id.in_(orphan_ids)) + ( + await session.execute( + select(Instance).where(Instance.instance_id.in_(orphan_ids)) + ) ) - ).scalars().all() + .scalars() + .all() + ) for instance in orphan_instances: await session.delete(instance) await session.commit() for instance in orphan_instances: - await notify_deleted(instance, message="Orphaned version mismatch (no rolling update)") + await notify_deleted( + instance, message="Orphaned version mismatch (no rolling update)" + ) await invalidate_instance_cache(instance.chute_id, instance_id=instance.instance_id) From ddb64aba8b9ae66611b2f441b264f1deeb33094e Mon Sep 17 00:00:00 2001 From: Jon Durbin Date: Wed, 4 Mar 2026 08:32:08 -0500 Subject: [PATCH 4/5] Fix sub revenue query for non-standard quota values. --- ...918104132_per_day_private_instance_rev.sql | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/api/migrations/20250918104132_per_day_private_instance_rev.sql b/api/migrations/20250918104132_per_day_private_instance_rev.sql index d61e32f3..21efcd79 100644 --- a/api/migrations/20250918104132_per_day_private_instance_rev.sql +++ b/api/migrations/20250918104132_per_day_private_instance_rev.sql @@ -31,68 +31,68 @@ instance_daily_costs AS ( -- Calculate hours for this specific day CASE -- Instance runs through the entire day - WHEN DATE(i.activated_at) < d.date - AND DATE(COALESCE(i.stop_billing_at, NOW())) > d.date + WHEN DATE(i.activated_at) < d.date + AND DATE(COALESCE(i.stop_billing_at, NOW())) > d.date THEN 24.0 - + -- Instance starts and ends on the same day - WHEN DATE(i.activated_at) = d.date + WHEN DATE(i.activated_at) = d.date AND DATE(COALESCE(i.stop_billing_at, NOW())) = d.date THEN EXTRACT(EPOCH FROM ( COALESCE(i.stop_billing_at, NOW()) - i.activated_at )) / 3600.0 - + -- Instance starts on this day but continues - WHEN DATE(i.activated_at) = d.date + WHEN DATE(i.activated_at) = d.date AND DATE(COALESCE(i.stop_billing_at, NOW())) > d.date THEN EXTRACT(EPOCH FROM ( DATE_TRUNC('day', i.activated_at) + INTERVAL '1 day' - i.activated_at )) / 3600.0 - + -- Instance ends on this day - WHEN DATE(i.activated_at) < d.date + WHEN DATE(i.activated_at) < d.date AND DATE(COALESCE(i.stop_billing_at, NOW())) = d.date THEN EXTRACT(EPOCH FROM ( COALESCE(i.stop_billing_at, NOW()) - DATE_TRUNC('day', COALESCE(i.stop_billing_at, NOW())) )) / 3600.0 - + ELSE 0 END AS hours_on_day, - + -- Calculate the revenue for this day CASE -- Instance runs through the entire day - WHEN DATE(i.activated_at) < d.date - AND DATE(COALESCE(i.stop_billing_at, NOW())) > d.date + WHEN DATE(i.activated_at) < d.date + AND DATE(COALESCE(i.stop_billing_at, NOW())) > d.date THEN 24.0 * i.hourly_rate - + -- Instance starts and ends on the same day - WHEN DATE(i.activated_at) = d.date + WHEN DATE(i.activated_at) = d.date AND DATE(COALESCE(i.stop_billing_at, NOW())) = d.date THEN EXTRACT(EPOCH FROM ( COALESCE(i.stop_billing_at, NOW()) - i.activated_at )) / 3600.0 * i.hourly_rate - + -- Instance starts on this day but continues - WHEN DATE(i.activated_at) = d.date + WHEN DATE(i.activated_at) = d.date AND DATE(COALESCE(i.stop_billing_at, NOW())) > d.date THEN EXTRACT(EPOCH FROM ( DATE_TRUNC('day', i.activated_at) + INTERVAL '1 day' - i.activated_at )) / 3600.0 * i.hourly_rate - + -- Instance ends on this day - WHEN DATE(i.activated_at) < d.date + WHEN DATE(i.activated_at) < d.date AND DATE(COALESCE(i.stop_billing_at, NOW())) = d.date THEN EXTRACT(EPOCH FROM ( COALESCE(i.stop_billing_at, NOW()) - DATE_TRUNC('day', COALESCE(i.stop_billing_at, NOW())) )) / 3600.0 * i.hourly_rate - + ELSE 0 END AS daily_revenue - + FROM date_series d CROSS JOIN instance_audit i - WHERE + WHERE -- Only include instances that are billed and not deleted i.billed_to IS NOT NULL AND i.deleted_at IS NULL @@ -128,7 +128,7 @@ FROM ( sum(case when quota = 300 then 3 when quota = 2000 then 10 - else 20 + when quota = 5000 then 20 end) as new_subscriber_revenue FROM invocation_quotas WHERE quota > 200 From d050eb4f3d4b649333c1e0f2610b7533288ac7d6 Mon Sep 17 00:00:00 2001 From: Jon Durbin Date: Wed, 4 Mar 2026 13:31:51 -0500 Subject: [PATCH 5/5] Redis failover. --- api/config/__init__.py | 14 +++- api/safe_redis.py | 153 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 162 insertions(+), 5 deletions(-) diff --git a/api/config/__init__.py b/api/config/__init__.py index 9fa8a795..db7257d8 100644 --- a/api/config/__init__.py +++ b/api/config/__init__.py @@ -143,7 +143,7 @@ async def s3_client(self): # Base redis settings. redis_host: str = Field( - default="172.16.0.100", + default="172.16.0.22", validation_alias="PRIMARY_REDIS_HOST", ) redis_port: int = Field( @@ -158,6 +158,8 @@ async def s3_client(self): redis_op_timeout: float = float( os.getenv("REDIS_OP_TIMEOUT", os.getenv("REDIS_SOCKET_TIMEOUT", "2.5")) ) + redis_fallback_host: Optional[str] = os.getenv("REDIS_FALLBACK_HOST", "172.16.0.23") + redis_primary_probe_interval: float = float(os.getenv("REDIS_PRIMARY_PROBE_INTERVAL", "30.0")) _redis_client: Optional[redis.Redis] = None _lite_redis_client: Optional[redis.Redis] = None @@ -172,6 +174,12 @@ async def s3_client(self): def redis_url(self) -> str: return f"redis://:{self.redis_password}@{self.redis_host}:{self.redis_port}/{self.redis_db}" + def _safe_redis_common_kwargs(self) -> dict: + return dict( + fallback_host=self.redis_fallback_host, + primary_probe_interval=self.redis_primary_probe_interval, + ) + @property def redis_client(self) -> redis.Redis: if self._redis_client is None: @@ -188,6 +196,7 @@ def redis_client(self) -> redis.Redis: health_check_interval=30, retry_on_timeout=True, retry=Retry(ConstantBackoff(0.5), 2), + **self._safe_redis_common_kwargs(), ) return self._redis_client @@ -207,6 +216,7 @@ def lite_redis_client(self) -> redis.Redis: health_check_interval=30, retry_on_timeout=True, retry=Retry(ConstantBackoff(0.5), 2), + **self._safe_redis_common_kwargs(), ) return self._lite_redis_client @@ -226,6 +236,7 @@ def billing_redis_client(self) -> redis.Redis: health_check_interval=30, retry_on_timeout=True, retry=Retry(ConstantBackoff(0.5), 2), + **self._safe_redis_common_kwargs(), ) return self._billing_redis_client @@ -246,6 +257,7 @@ def cm_redis_client(self) -> list[redis.Redis]: health_check_interval=30, retry_on_timeout=True, retry=Retry(ConstantBackoff(0.5), 2), + **self._safe_redis_common_kwargs(), ) for idx in range(self.cm_redis_shard_count) ] diff --git a/api/safe_redis.py b/api/safe_redis.py index a0d9e5ca..c0d14003 100644 --- a/api/safe_redis.py +++ b/api/safe_redis.py @@ -1,6 +1,7 @@ import socket import asyncio import inspect +import time import traceback import concurrent.futures from typing import Any, Optional @@ -70,8 +71,9 @@ def pool_stats(pool) -> str: return "pool_stats_unavailable" -def wrap_pipeline(pipe, default=None, timeout: float = 0.5): - """Make pipeline.execute() fail-open.""" +def wrap_pipeline(pipe, default=None, timeout: float = 0.5, owner=None): + """Make pipeline.execute() fail-open. If owner (SafeRedis) is provided, + pipeline failures/successes contribute to failover tracking.""" loop = asyncio.get_running_loop() start = loop.time() orig_execute = pipe.execute @@ -81,13 +83,19 @@ async def safe_execute(*args, **kwargs): try: value = await asyncio.wait_for(asyncio.shield(task), timeout) elapsed = loop.time() - start + if owner is not None: + owner._record_success() if elapsed > 0.25: logger.debug(f"SafeRedis: slow pipleine elapsed={elapsed * 1000:.1f}ms") return value except asyncio.TimeoutError: task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None) + if owner is not None: + owner._record_failure() logger.error("SafeRedis: pipeline.execute fail-open wait_for asyncio.TimeoutError") except FAIL_OPEN_EXCEPTIONS as exc: + if owner is not None: + owner._record_failure() error_detail = str(exc) if not error_detail.strip(): error_detail = traceback.format_exc() @@ -161,11 +169,25 @@ def __init__( health_check_interval: int = 30, retry_on_timeout: bool = False, retry: Any = None, + consecutive_failure_limit: int = 5, + pool_reset_cooldown: float = 5.0, + fallback_host: Optional[str] = None, + primary_probe_interval: float = 30.0, **kwargs, ): self.default = default self.timeout = op_timeout - self.client = redis.Redis( + self._consecutive_failures = 0 + self._consecutive_failure_limit = consecutive_failure_limit + self._pool_reset_cooldown = pool_reset_cooldown + self._last_pool_reset: float = 0.0 + self._primary_host = host + self._fallback_host = fallback_host + self._active_host = host + self._on_primary = True + self._primary_probe_interval = primary_probe_interval + self._primary_probe_task: Optional[asyncio.Task] = None + self._redis_kwargs = dict( host=host, port=port, db=db, @@ -179,12 +201,131 @@ def __init__( retry=retry, **kwargs, ) + self.client = redis.Redis(**self._redis_kwargs) + + def _record_success(self): + if self._consecutive_failures > 0: + logger.info( + f"SafeRedis: connection recovered on host={self._active_host} " + f"after {self._consecutive_failures} consecutive failures" + ) + self._consecutive_failures = 0 + + def _record_failure(self): + self._consecutive_failures += 1 + if self._consecutive_failures >= self._consecutive_failure_limit: + self._maybe_reset_pool() + + def _maybe_reset_pool(self): + now = time.monotonic() + if now - self._last_pool_reset < self._pool_reset_cooldown: + return + self._last_pool_reset = now + port = self._redis_kwargs.get("port", "?") + db = self._redis_kwargs.get("db", "?") + + # Decide which host to reconnect to. + if self._on_primary and self._fallback_host: + new_host = self._fallback_host + self._on_primary = False + logger.warning( + f"SafeRedis: {self._consecutive_failures} consecutive failures on " + f"primary={self._primary_host} port={port} db={db}, " + f"failing over to fallback={new_host}" + ) + elif not self._on_primary: + # Already on fallback and still failing — try primary again. + new_host = self._primary_host + self._on_primary = True + logger.warning( + f"SafeRedis: {self._consecutive_failures} consecutive failures on " + f"fallback={self._active_host} port={port} db={db}, " + f"trying primary={new_host} again" + ) + else: + # No fallback configured, just reset the pool on the same host. + new_host = self._active_host + logger.warning( + f"SafeRedis: {self._consecutive_failures} consecutive failures on " + f"host={self._active_host} port={port} db={db}, resetting connection pool" + ) + + self._active_host = new_host + try: + old_pool = self.client.connection_pool + self.client = redis.Redis(**{**self._redis_kwargs, "host": new_host}) + asyncio.ensure_future(self._close_old_pool(old_pool)) + except Exception: + logger.error(f"SafeRedis: pool reset failed: {traceback.format_exc()}") + self._consecutive_failures = 0 + + # If we just moved off primary, start probing for primary recovery. + if not self._on_primary and self._fallback_host: + self._start_primary_probe() + + def _start_primary_probe(self): + if self._primary_probe_task and not self._primary_probe_task.done(): + return + self._primary_probe_task = asyncio.ensure_future(self._probe_primary_loop()) + + async def _probe_primary_loop(self): + port = self._redis_kwargs.get("port", "?") + db = self._redis_kwargs.get("db", "?") + logger.info( + f"SafeRedis: starting primary probe for {self._primary_host}:{port} db={db} " + f"every {self._primary_probe_interval}s" + ) + while not self._on_primary: + await asyncio.sleep(self._primary_probe_interval) + if self._on_primary: + break + probe = None + try: + probe = redis.Redis( + **{ + **self._redis_kwargs, + "host": self._primary_host, + "max_connections": 1, + } + ) + pong = await asyncio.wait_for(probe.ping(), timeout=2.0) + if pong: + logger.info( + f"SafeRedis: primary {self._primary_host}:{port} db={db} is back, switching over" + ) + old_pool = self.client.connection_pool + self._active_host = self._primary_host + self._on_primary = True + self.client = redis.Redis(**{**self._redis_kwargs, "host": self._primary_host}) + asyncio.ensure_future(self._close_old_pool(old_pool)) + self._consecutive_failures = 0 + break + except Exception: + logger.debug( + f"SafeRedis: primary probe {self._primary_host}:{port} still unreachable" + ) + finally: + if probe is not None: + try: + await probe.aclose() + except Exception: + pass + logger.info(f"SafeRedis: primary probe loop ended, on_primary={self._on_primary}") + + @staticmethod + async def _close_old_pool(pool): + try: + await pool.disconnect() + except Exception: + pass async def get_with_status(self, key): try: result = await self.client.get(key) + self._record_success() return True, result except FAIL_OPEN_EXCEPTIONS as exc: + self._record_failure() error_detail = str(exc) if not error_detail.strip(): error_detail = traceback.format_exc() @@ -203,6 +344,7 @@ def wrapper(*args, **kwargs): try: result = attr(*args, **kwargs) except FAIL_OPEN_EXCEPTIONS as exc: + self._record_failure() error_detail = str(exc) if not error_detail.strip(): error_detail = traceback.format_exc() @@ -212,7 +354,7 @@ def wrapper(*args, **kwargs): return self.default if is_pipeline(result): - return wrap_pipeline(result, self.default, timeout=self.timeout * 3) + return wrap_pipeline(result, self.default, timeout=self.timeout * 3, owner=self) if inspect.isawaitable(result): @@ -224,6 +366,7 @@ async def safe_coro(): try: value = await asyncio.wait_for(asyncio.shield(task), timeout) elapsed = loop.time() - start + self._record_success() if elapsed > 0.25: logger.debug( f"SafeRedis: slow call {name} elapsed={elapsed * 1000:.1f}ms " @@ -232,6 +375,7 @@ async def safe_coro(): return value except asyncio.TimeoutError: elapsed = loop.time() - start + self._record_failure() task.add_done_callback( lambda t: t.exception() if not t.cancelled() else None ) @@ -242,6 +386,7 @@ async def safe_coro(): return self.default except FAIL_OPEN_EXCEPTIONS as exc: elapsed = loop.time() - start + self._record_failure() error_detail = str(exc) if not error_detail.strip(): error_detail = traceback.format_exc()