diff --git a/hindsight-api-slim/hindsight_api/worker/poller.py b/hindsight-api-slim/hindsight_api/worker/poller.py index 73d09161f..05479c8a0 100644 --- a/hindsight-api-slim/hindsight_api/worker/poller.py +++ b/hindsight-api-slim/hindsight_api/worker/poller.py @@ -1090,7 +1090,21 @@ async def _log_progress_if_due(self): SUM(CASE WHEN task_payload IS NULL THEN 1 ELSE 0 END) AS payload_null, SUM(CASE WHEN next_retry_at IS NOT NULL AND next_retry_at > now() THEN 1 ELSE 0 END) AS retry_blocked, - SUM(CASE WHEN worker_id IS NOT NULL THEN 1 ELSE 0 END) AS assigned + SUM(CASE WHEN worker_id IS NOT NULL THEN 1 ELSE 0 END) AS assigned, + -- bank_id is NOT NULL (schema), so this IN-set + -- test is the exact negation of the claim + -- query's ``bank_id != ALL(busy_bank_ids)`` with + -- busy_bank_ids = the same DISTINCT processing set. + SUM(CASE WHEN operation_type = 'consolidation' + AND task_payload IS NOT NULL + AND worker_id IS NULL + AND (next_retry_at IS NULL OR next_retry_at <= now()) + AND bank_id IN ( + SELECT bank_id FROM {table} busy + WHERE busy.operation_type = 'consolidation' + AND busy.status = 'processing' + ) + THEN 1 ELSE 0 END) AS bank_serialized FROM {table} WHERE status = 'pending' GROUP BY operation_type @@ -1102,12 +1116,14 @@ async def _log_progress_if_due(self): for br in breakdown_rows: op_type = br["operation_type"] or "unknown" bucket = pending_breakdown.setdefault( - op_type, {"total": 0, "payload_null": 0, "retry_blocked": 0, "assigned": 0} + op_type, + {"total": 0, "payload_null": 0, "retry_blocked": 0, "assigned": 0, "bank_serialized": 0}, ) bucket["total"] += br["total"] bucket["payload_null"] += br["payload_null"] bucket["retry_blocked"] += br["retry_blocked"] bucket["assigned"] += br["assigned"] + bucket["bank_serialized"] += br["bank_serialized"] global_pending += br["total"] try: @@ -1225,15 +1241,42 @@ def _format_pool_stats(self) -> str: logger.debug(f"Pool stats unavailable: {e}") return "unavailable" + @staticmethod + def _claimable_from_bucket(b: dict[str, int]) -> int: + """Residual pending rows that pass *every* claim predicate. + + ``bank_serialized`` is the consolidation-specific exclusion the other + buckets miss: a pending consolidation op whose bank already has a + ``status='processing'`` consolidation is filtered out by the claim + query (``bank_id != ALL(busy_bank_ids)``) and can never be picked up + until that bank frees. Counting it keeps ``claimable`` honest — without + it a permanently bank-serialized op shows up as a phantom + ``claimable=1`` while nothing is ever claimed (#2359). The bucket is + defined as a subset of the otherwise-claimable rows, so the + subtraction stays non-negative; ``max(0, ...)`` is a belt-and-braces + floor against schema/portability surprises. + """ + return max( + 0, + b["total"] + - b["payload_null"] + - b["retry_blocked"] + - b["assigned"] + - b.get("bank_serialized", 0), + ) + def _log_pending_breakdown(self, breakdown: dict[str, dict[str, int]]) -> None: """Emit one [PENDING_BREAKDOWN] line bucketing pending rows by claimability. Each bucket mirrors a predicate in the claim query: - * payload_null - row has no task_payload (e.g. batch_retain parent - whose reconciliation never fired); claim query - skips it forever - * retry_blocked - next_retry_at is still in the future - * assigned - worker_id already set; another worker owns it + * payload_null - row has no task_payload (e.g. batch_retain parent + whose reconciliation never fired); claim query + skips it forever + * retry_blocked - next_retry_at is still in the future + * assigned - worker_id already set; another worker owns it + * bank_serialized - pending consolidation op whose bank already has a + 'processing' consolidation; the claim query + serializes it out until that bank frees (#2359) ``claimable`` is the residual that *should* be picked up on the next poll. If ``claimable > 0`` while workers report free slots, the bug is @@ -1246,11 +1289,11 @@ def _log_pending_breakdown(self, breakdown: dict[str, dict[str, int]]) -> None: parts = [] for op_type in sorted(breakdown): b = breakdown[op_type] - claimable = b["total"] - b["payload_null"] - b["retry_blocked"] - b["assigned"] + claimable = self._claimable_from_bucket(b) parts.append( f"{op_type}: total={b['total']} claimable={claimable} " f"payload_null={b['payload_null']} retry_blocked={b['retry_blocked']} " - f"assigned={b['assigned']}" + f"assigned={b['assigned']} bank_serialized={b.get('bank_serialized', 0)}" ) logger.info(f"[PENDING_BREAKDOWN] {' | '.join(parts)}") diff --git a/hindsight-api-slim/tests/test_worker.py b/hindsight-api-slim/tests/test_worker.py index 11433ed64..7f4915dbe 100644 --- a/hindsight-api-slim/tests/test_worker.py +++ b/hindsight-api-slim/tests/test_worker.py @@ -3747,3 +3747,59 @@ async def test_priority_respects_bank_serialization(self, pool, backend, clean_o high_pending_op, ) assert row["status"] == "pending" + + +class TestPendingBreakdownClaimable: + """_claimable_from_bucket must not report a bank-serialized consolidation op + as claimable — that phantom is exactly what made #2359 look like + 'claimable=1 assigned=0' while the worker never claimed anything. No DB. + """ + + def test_bank_serialized_consolidation_is_not_claimable(self): + from hindsight_api.worker import WorkerPoller + + # One stuck-bank pending op (bank_serialized), one payload_null parent, + # one genuinely free op -> only the free op is claimable. + bucket = { + "total": 3, + "payload_null": 1, + "retry_blocked": 0, + "assigned": 0, + "bank_serialized": 1, + } + assert WorkerPoller._claimable_from_bucket(bucket) == 1 + + def test_single_deadlocked_op_reports_zero_claimable(self): + from hindsight_api.worker import WorkerPoller + + # The reporter's exact shape: a lone pending consolidation whose bank is + # serialized by a dead worker's 'processing' op. Old formula said + # claimable=1 (phantom); now it is correctly 0. + bucket = { + "total": 1, + "payload_null": 0, + "retry_blocked": 0, + "assigned": 0, + "bank_serialized": 1, + } + assert WorkerPoller._claimable_from_bucket(bucket) == 0 + + def test_no_bank_serialized_key_preserves_legacy_formula(self): + from hindsight_api.worker import WorkerPoller + + # Buckets without the new key (e.g. non-consolidation op types) must + # behave exactly as before: total - payload_null - retry_blocked - assigned. + bucket = {"total": 5, "payload_null": 1, "retry_blocked": 1, "assigned": 1} + assert WorkerPoller._claimable_from_bucket(bucket) == 2 + + def test_claimable_is_floored_at_zero(self): + from hindsight_api.worker import WorkerPoller + + bucket = { + "total": 1, + "payload_null": 1, + "retry_blocked": 0, + "assigned": 0, + "bank_serialized": 1, + } + assert WorkerPoller._claimable_from_bucket(bucket) == 0