Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions hindsight-api-slim/hindsight_api/worker/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,21 @@ async def _log_progress_if_due(self):
SUM(CASE WHEN task_payload IS NULL THEN 1 ELSE 0 END) AS payload_null,
SUM(CASE WHEN next_retry_at IS NOT NULL AND next_retry_at > now()
THEN 1 ELSE 0 END) AS retry_blocked,
SUM(CASE WHEN worker_id IS NOT NULL THEN 1 ELSE 0 END) AS assigned
SUM(CASE WHEN worker_id IS NOT NULL THEN 1 ELSE 0 END) AS assigned,
-- bank_id is NOT NULL (schema), so this IN-set
-- test is the exact negation of the claim
-- query's ``bank_id != ALL(busy_bank_ids)`` with
-- busy_bank_ids = the same DISTINCT processing set.
SUM(CASE WHEN operation_type = 'consolidation'
AND task_payload IS NOT NULL
AND worker_id IS NULL
AND (next_retry_at IS NULL OR next_retry_at <= now())
AND bank_id IN (
SELECT bank_id FROM {table} busy
WHERE busy.operation_type = 'consolidation'
AND busy.status = 'processing'
)
THEN 1 ELSE 0 END) AS bank_serialized
FROM {table}
WHERE status = 'pending'
GROUP BY operation_type
Expand All @@ -1102,12 +1116,14 @@ async def _log_progress_if_due(self):
for br in breakdown_rows:
op_type = br["operation_type"] or "unknown"
bucket = pending_breakdown.setdefault(
op_type, {"total": 0, "payload_null": 0, "retry_blocked": 0, "assigned": 0}
op_type,
{"total": 0, "payload_null": 0, "retry_blocked": 0, "assigned": 0, "bank_serialized": 0},
)
bucket["total"] += br["total"]
bucket["payload_null"] += br["payload_null"]
bucket["retry_blocked"] += br["retry_blocked"]
bucket["assigned"] += br["assigned"]
bucket["bank_serialized"] += br["bank_serialized"]
global_pending += br["total"]

try:
Expand Down Expand Up @@ -1225,15 +1241,42 @@ def _format_pool_stats(self) -> str:
logger.debug(f"Pool stats unavailable: {e}")
return "unavailable"

@staticmethod
def _claimable_from_bucket(b: dict[str, int]) -> int:
"""Residual pending rows that pass *every* claim predicate.

``bank_serialized`` is the consolidation-specific exclusion the other
buckets miss: a pending consolidation op whose bank already has a
``status='processing'`` consolidation is filtered out by the claim
query (``bank_id != ALL(busy_bank_ids)``) and can never be picked up
until that bank frees. Counting it keeps ``claimable`` honest — without
it a permanently bank-serialized op shows up as a phantom
``claimable=1`` while nothing is ever claimed (#2359). The bucket is
defined as a subset of the otherwise-claimable rows, so the
subtraction stays non-negative; ``max(0, ...)`` is a belt-and-braces
floor against schema/portability surprises.
"""
return max(
0,
b["total"]
- b["payload_null"]
- b["retry_blocked"]
- b["assigned"]
- b.get("bank_serialized", 0),
)

def _log_pending_breakdown(self, breakdown: dict[str, dict[str, int]]) -> None:
"""Emit one [PENDING_BREAKDOWN] line bucketing pending rows by claimability.

Each bucket mirrors a predicate in the claim query:
* payload_null - row has no task_payload (e.g. batch_retain parent
whose reconciliation never fired); claim query
skips it forever
* retry_blocked - next_retry_at is still in the future
* assigned - worker_id already set; another worker owns it
* payload_null - row has no task_payload (e.g. batch_retain parent
whose reconciliation never fired); claim query
skips it forever
* retry_blocked - next_retry_at is still in the future
* assigned - worker_id already set; another worker owns it
* bank_serialized - pending consolidation op whose bank already has a
'processing' consolidation; the claim query
serializes it out until that bank frees (#2359)

``claimable`` is the residual that *should* be picked up on the next
poll. If ``claimable > 0`` while workers report free slots, the bug is
Expand All @@ -1246,11 +1289,11 @@ def _log_pending_breakdown(self, breakdown: dict[str, dict[str, int]]) -> None:
parts = []
for op_type in sorted(breakdown):
b = breakdown[op_type]
claimable = b["total"] - b["payload_null"] - b["retry_blocked"] - b["assigned"]
claimable = self._claimable_from_bucket(b)
parts.append(
f"{op_type}: total={b['total']} claimable={claimable} "
f"payload_null={b['payload_null']} retry_blocked={b['retry_blocked']} "
f"assigned={b['assigned']}"
f"assigned={b['assigned']} bank_serialized={b.get('bank_serialized', 0)}"
)
logger.info(f"[PENDING_BREAKDOWN] {' | '.join(parts)}")

Expand Down
56 changes: 56 additions & 0 deletions hindsight-api-slim/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3747,3 +3747,59 @@ async def test_priority_respects_bank_serialization(self, pool, backend, clean_o
high_pending_op,
)
assert row["status"] == "pending"


class TestPendingBreakdownClaimable:
"""_claimable_from_bucket must not report a bank-serialized consolidation op
as claimable — that phantom is exactly what made #2359 look like
'claimable=1 assigned=0' while the worker never claimed anything. No DB.
"""

def test_bank_serialized_consolidation_is_not_claimable(self):
from hindsight_api.worker import WorkerPoller

# One stuck-bank pending op (bank_serialized), one payload_null parent,
# one genuinely free op -> only the free op is claimable.
bucket = {
"total": 3,
"payload_null": 1,
"retry_blocked": 0,
"assigned": 0,
"bank_serialized": 1,
}
assert WorkerPoller._claimable_from_bucket(bucket) == 1

def test_single_deadlocked_op_reports_zero_claimable(self):
from hindsight_api.worker import WorkerPoller

# The reporter's exact shape: a lone pending consolidation whose bank is
# serialized by a dead worker's 'processing' op. Old formula said
# claimable=1 (phantom); now it is correctly 0.
bucket = {
"total": 1,
"payload_null": 0,
"retry_blocked": 0,
"assigned": 0,
"bank_serialized": 1,
}
assert WorkerPoller._claimable_from_bucket(bucket) == 0

def test_no_bank_serialized_key_preserves_legacy_formula(self):
from hindsight_api.worker import WorkerPoller

# Buckets without the new key (e.g. non-consolidation op types) must
# behave exactly as before: total - payload_null - retry_blocked - assigned.
bucket = {"total": 5, "payload_null": 1, "retry_blocked": 1, "assigned": 1}
assert WorkerPoller._claimable_from_bucket(bucket) == 2

def test_claimable_is_floored_at_zero(self):
from hindsight_api.worker import WorkerPoller

bucket = {
"total": 1,
"payload_null": 1,
"retry_blocked": 0,
"assigned": 0,
"bank_serialized": 1,
}
assert WorkerPoller._claimable_from_bucket(bucket) == 0
Loading