From 5de12288f30e0a71e8b5374e8ed4e4ef49657cfc Mon Sep 17 00:00:00 2001 From: Joe Nudell Date: Wed, 20 May 2026 15:41:49 -0400 Subject: [PATCH] improve error handling and robustness --- app/server/handlers/redaction.py | 12 +- app/server/tasks/finalize.py | 48 +++++++- app/server/tasks/format.py | 62 +++++++++- conftest.py | 19 +++ tests/unit/test_callback.py | 5 - tests/unit/test_case_helper.py | 93 ++++++++++++++- tests/unit/test_finalize.py | 196 ++++++++++++++++++++++++++++++- tests/unit/test_format.py | 64 ++++++++++ 8 files changed, 478 insertions(+), 21 deletions(-) diff --git a/app/server/handlers/redaction.py b/app/server/handlers/redaction.py index de07d4d..5587aeb 100644 --- a/app/server/handlers/redaction.py +++ b/app/server/handlers/redaction.py @@ -305,9 +305,17 @@ async def _get_doc_result( doc = await store.get_result_doc(doc_id) if not doc: errors = getattr(final_task_result, "errors", []) + # If we get here without any recorded errors, the pipeline + # finished cleanly but the result key is no longer present + # in Redis -- almost always because it expired or was + # evicted between completion and this poll. Make that + # explicit so the caller knows to resubmit rather than + # treat this as a silent pipeline bug. error_message = ( - "Redaction job completed but document " - "is missing and no specific errors were recorded." + "Redaction completed, but the redacted document is no " + "longer available in the result store. The result has " + "likely expired or been evicted from cache; please " + "resubmit the redaction request." ) if errors: error_message = str(errors) diff --git a/app/server/tasks/finalize.py b/app/server/tasks/finalize.py index d11d022..87029ea 100644 --- a/app/server/tasks/finalize.py +++ b/app/server/tasks/finalize.py @@ -13,7 +13,7 @@ from ..config import config from ..db import DocumentStatus from ..generated.models import OutputFormat, RedactionTarget -from .callback import CallbackTaskResult +from .callback import CallbackTaskResult, get_result_sync from .metrics import ( celery_counters, record_task_failure, @@ -68,7 +68,45 @@ def finalize( """Finalize the redaction process.""" format_result = callback_result.formatted - celery_counters.record_job(bool(format_result.errors)) + # Start from whatever the upstream pipeline reported. + final_errors: list[ProcessingError] = list(format_result.errors) + + # If the pipeline thinks it succeeded, verify that the redacted document + # is actually retrievable from the result store. The chain can be + # "successful" while the result doc is absent (e.g. a Redis write that + # silently failed earlier, or the key was evicted under memory pressure + # before finalize ran). Surfacing this here keeps the experiments DB and + # the poll API in agreement: both will report ERROR with a clear cause + # instead of disagreeing about whether the document exists. + if not final_errors: + try: + doc = get_result_sync( + format_result.jurisdiction_id, + format_result.case_id, + format_result.document_id, + ) + except Exception as e: + logger.exception("Failed to verify presence of redaction result in store") + final_errors.append( + ProcessingError.from_exception("finalize.verify_result", e) + ) + else: + if doc is None: + final_errors.append( + ProcessingError( + message=( + "Redaction pipeline reported success but the " + "redacted document was not found in the result " + "store. The result may have failed to persist " + "or has been evicted/expired from cache before " + "finalize ran." + ), + task="finalize.verify_result", + exception="MissingResultDocument", + ) + ) + + celery_counters.record_job(bool(final_errors)) if config.experiments.enabled: with config.experiments.store.driver.sync_session() as session: @@ -76,8 +114,8 @@ def finalize( jurisdiction_id=format_result.jurisdiction_id, case_id=format_result.case_id, document_id=format_result.document_id, - status="ERROR" if format_result.errors else "COMPLETE", - error=format_errors(format_result.errors), + status="ERROR" if final_errors else "COMPLETE", + error=format_errors(final_errors), ) session.add(status) session.commit() @@ -116,7 +154,7 @@ def finalize( jurisdiction_id=format_result.jurisdiction_id, case_id=format_result.case_id, document_id=format_result.document_id, - errors=format_result.errors, + errors=final_errors, next_task_id=str(next_task) if next_task else None, ) diff --git a/app/server/tasks/format.py b/app/server/tasks/format.py index 7c68b24..679e464 100644 --- a/app/server/tasks/format.py +++ b/app/server/tasks/format.py @@ -47,6 +47,15 @@ class FormatTaskResult(BaseModel): errors: list[ProcessingError] = [] +class ResultStoreWriteError(Exception): + """Raised when persisting the formatted result to the result store fails. + + Wraps the underlying exception so the format task can distinguish a + failure to *write* the result from other format-time failures and surface + a more specific ProcessingError after retries are exhausted. + """ + + register_type(FormatTask) register_type(FormatTaskResult) @@ -98,12 +107,22 @@ def format( else: document = format_document(params, redact_result) - save_result_sync( - redact_result.jurisdiction_id, - redact_result.case_id, - redact_result.document_id, - document, - ) + try: + save_result_sync( + redact_result.jurisdiction_id, + redact_result.case_id, + redact_result.document_id, + document, + ) + except Exception as save_exc: + # Persisting the formatted result to the result store is the step + # that determines whether downstream consumers can actually find + # the document, so we tag this failure separately. Wrapping the + # original exception keeps the traceback intact while making the + # final ProcessingError unambiguous. + raise ResultStoreWriteError( + "Failed to persist redacted document to the result store" + ) from save_exc return FormatTaskResult( jurisdiction_id=redact_result.jurisdiction_id, @@ -111,6 +130,37 @@ def format( document_id=redact_result.document_id, errors=redact_result.errors, ) + except ResultStoreWriteError as e: + # Transient Redis hiccups are common, so we still retry. After + # exhausting retries, surface a ProcessingError tagged with the + # specific subsystem that failed (rather than a generic "format" + # error) so operators can tell write failures apart from + # rendering/encoding bugs. + cause = e.__cause__ + # `__cause__` is typed as ``BaseException | None``; in practice the + # ``raise ... from save_exc`` site always chains an ``Exception``, + # but narrow defensively so mypy and ``ProcessingError.from_exception`` + # agree, and fall back to ``e`` if anything ever chains a + # ``BaseException`` (e.g. ``SystemExit``). + underlying: Exception = cause if isinstance(cause, Exception) else e + if format.request.retries < format.max_retries: + logger.warning( + f"Failed to save format result: {underlying}, will be retried." + ) + raise format.retry(exc=underlying) from e + logger.error( + f"Exhausted retries saving format result for {redact_result.document_id}" + ) + logger.exception(underlying) + return FormatTaskResult( + jurisdiction_id=redact_result.jurisdiction_id, + case_id=redact_result.case_id, + document_id=redact_result.document_id, + errors=[ + *redact_result.errors, + ProcessingError.from_exception("format.save_result", underlying), + ], + ) except Exception as e: if format.request.retries < format.max_retries: logger.warning(f"Format task failed: {e}, will be retried.") diff --git a/conftest.py b/conftest.py index 56e6430..8fef225 100644 --- a/conftest.py +++ b/conftest.py @@ -54,6 +54,25 @@ """ +@pytest.fixture(scope="session", autouse=True) +def _init_celery_counters() -> None: + """Initialize the Celery custom metrics counters for the test session. + + In production these counters are wired up by the ``worker_process_init`` + signal handler in ``app.server.tasks.queue`` when a Celery worker + process boots. Tests execute tasks eagerly via ``task.s(...).apply()`` + and never start a real worker process, so that signal never fires -- + which leaves ``celery_counters`` with no ``task_complete_counter`` + attribute and causes any task that triggers ``on_failure`` / + ``on_success`` to blow up with ``AttributeError`` deep inside Celery's + tracer. Initialize the counters once here so every test sees a + consistent, fully-wired metrics surface. + """ + from app.server.tasks.metrics import celery_counters + + celery_counters.init() + + @pytest.fixture def logger() -> logging.Logger: """Logging for tests.""" diff --git a/tests/unit/test_callback.py b/tests/unit/test_callback.py index a35239e..5c6ba1c 100644 --- a/tests/unit/test_callback.py +++ b/tests/unit/test_callback.py @@ -13,7 +13,6 @@ ProcessingError, callback, ) -from app.server.tasks.metrics import celery_counters def test_callback_no_callback_no_error(): @@ -31,7 +30,6 @@ def test_callback_no_callback_no_error(): ), ) - celery_counters.init() cb = CallbackTask(callback_url=None) result = callback.s(fmt_result, cb).apply() @@ -53,7 +51,6 @@ def test_callback_no_callback_with_error(): document=None, ) - celery_counters.init() cb = CallbackTask(callback_url=None) result = callback.s(fmt_result, cb).apply() @@ -107,7 +104,6 @@ def test_callback_with_callback_no_error(fake_redis_store: FakeRedis): ], ) - celery_counters.init() cb = CallbackTask(callback_url="http://callback.test.local") result = callback.s(fmt_result, cb).apply() @@ -156,7 +152,6 @@ def test_callback_with_callback_with_error(fake_redis_store: FakeRedis): ], ) - celery_counters.init() cb = CallbackTask(callback_url="http://callback.test.local") result = callback.s(fmt_result, cb).apply() diff --git a/tests/unit/test_case_helper.py b/tests/unit/test_case_helper.py index 73e015c..6180c69 100644 --- a/tests/unit/test_case_helper.py +++ b/tests/unit/test_case_helper.py @@ -1,6 +1,10 @@ -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock + +import pytest from app.server.case_helper import summarize_state +from app.server.handlers.redaction import _get_doc_result +from app.server.tasks import ProcessingError def _result(state: str, name: str, result_value=None) -> MagicMock: @@ -104,3 +108,90 @@ def test_summarize_single_success(): assert summary.simple_state == "SUCCESS" assert summary.dominant_task_name == "fetch" assert summary.result is tasks[0] + + +# --- _get_doc_result: SUCCESS-but-missing-doc messaging ----------------------- +# +# These tests cover the handler branch where Celery reports SUCCESS but +# `get_result_doc` returns None. The user-facing error message should +# clearly indicate that the result has likely expired or been evicted, +# rather than the older "no specific errors were recorded" wording which +# read like a silent pipeline bug. + + +@pytest.mark.asyncio +async def test_get_doc_result_missing_doc_reports_expiry_message(): + finalize_result = MagicMock() + # No `errors` attribute -> getattr default kicks in -> empty list. + del finalize_result.errors + + finalize_async = MagicMock() + finalize_async.state = "SUCCESS" + finalize_async.name = "finalize" + finalize_async.result = finalize_result + + store = MagicMock() + store.get_result_doc = AsyncMock(return_value=None) + + with pytest.MonkeyPatch.context() as mp: + # Patch `get_result` to return our synthetic AsyncResult. + mp.setattr( + "app.server.handlers.redaction.get_result", + lambda task_id: finalize_async, + ) + result = await _get_doc_result( + store=store, + jurisdiction_id="jur1", + case_id="case1", + doc_id="doc1", + task_ids=["finalize-task-id"], + masked_subjects=[], + ) + + body = result.root + assert body.status == "ERROR" + assert "expired" in body.error.lower() or "evicted" in body.error.lower() + assert "resubmit" in body.error.lower() + + +@pytest.mark.asyncio +async def test_get_doc_result_missing_doc_prefers_recorded_errors(): + """When the dominant task actually recorded errors, those take + precedence over the expiry-style fallback message. + """ + finalize_result = MagicMock() + finalize_result.errors = [ + ProcessingError( + message="Boom", + task="format.save_result", + exception="RuntimeError", + ) + ] + + finalize_async = MagicMock() + finalize_async.state = "SUCCESS" + finalize_async.name = "finalize" + finalize_async.result = finalize_result + + store = MagicMock() + store.get_result_doc = AsyncMock(return_value=None) + + with pytest.MonkeyPatch.context() as mp: + mp.setattr( + "app.server.handlers.redaction.get_result", + lambda task_id: finalize_async, + ) + result = await _get_doc_result( + store=store, + jurisdiction_id="jur1", + case_id="case1", + doc_id="doc1", + task_ids=["finalize-task-id"], + masked_subjects=[], + ) + + body = result.root + assert body.status == "ERROR" + # Recorded error wins; expiry fallback should not appear. + assert "expired" not in body.error.lower() + assert "format.save_result" in body.error diff --git a/tests/unit/test_finalize.py b/tests/unit/test_finalize.py index 13f422a..25379f5 100644 --- a/tests/unit/test_finalize.py +++ b/tests/unit/test_finalize.py @@ -1,6 +1,8 @@ +import json from unittest.mock import patch from celery.result import AsyncResult +from fakeredis import FakeRedis from app.server.db import DocumentStatus from app.server.generated.models import DocumentLink, OutputDocument @@ -14,8 +16,26 @@ ) -def test_finalize_no_experiments_success(config): +def _seed_result_doc(fake_redis_store: FakeRedis, doc_id: str = "doc1") -> None: + """Pre-populate the result store with a successful redacted document. + + `finalize` verifies the result doc actually landed in the store before + declaring success, so tests that simulate a successful chain need to + seed the same key that `format` would have written. + """ + doc = OutputDocument( + root=DocumentLink( + documentId=doc_id, + attachmentType="LINK", + url="http://blob.test.local/abc123", + ) + ) + fake_redis_store.set(f"jur1:case1:result:{doc_id}", doc.model_dump_json()) + + +def test_finalize_no_experiments_success(config, fake_redis_store: FakeRedis): config.experiments.enabled = False + _seed_result_doc(fake_redis_store) cb = CallbackTaskResult( status_code=200, @@ -94,8 +114,9 @@ def test_finalize_no_experiments_failed(config): ) -def test_finalize_experiments_success(config, exp_db): +def test_finalize_experiments_success(config, exp_db, fake_redis_store: FakeRedis): config.experiments.enabled = True + _seed_result_doc(fake_redis_store) cb = CallbackTaskResult( status_code=200, @@ -205,9 +226,180 @@ def test_finalize_experiments_failed(config, exp_db): ) +def test_finalize_detects_missing_result_doc( + config, exp_db, fake_redis_store: FakeRedis +): + """When the pipeline reports no errors but the result doc is absent + from the store, finalize should: + * append a ``finalize.verify_result`` ProcessingError, + * write ``status="ERROR"`` to the experiments DB, + * return that error in ``FinalizeTaskResult.errors``. + This keeps the DB and the poll API in agreement. + """ + config.experiments.enabled = True + + # NB: no `jur1:case1:result:doc1` key is set in fake_redis_store -- + # this simulates a write that silently failed or a key that was + # evicted before finalize ran. + + cb = CallbackTaskResult( + status_code=200, + response="ok", + formatted=FormatTaskResult( + jurisdiction_id="jur1", + case_id="case1", + document_id="doc1", + errors=[], + ), + ) + ft = FinalizeTask( + jurisdiction_id="jur1", + case_id="case1", + subject_ids=[], + renderer="PDF", + ) + + result = finalize.s(cb, ft).apply() + final = result.get() + + assert isinstance(final, FinalizeTaskResult) + assert len(final.errors) == 1 + assert final.errors[0].task == "finalize.verify_result" + assert final.errors[0].exception == "MissingResultDocument" + + with exp_db.sync_session() as session: + ds = ( + session.query(DocumentStatus) + .filter_by( + jurisdiction_id="jur1", + case_id="case1", + document_id="doc1", + ) + .all() + ) + assert len(ds) == 1 + assert ds[0].status == "ERROR" + # Error JSON should mention the verify task so operators can + # tell this apart from upstream pipeline failures. + assert ds[0].error is not None + recorded = json.loads(ds[0].error) + assert recorded[0]["task"] == "finalize.verify_result" + + +def test_finalize_writes_complete_when_result_doc_present( + config, exp_db, fake_redis_store: FakeRedis +): + """When the pipeline reports no errors *and* the result doc is present + in the store, finalize should write ``status="COMPLETE"`` and not + invent a spurious error. + """ + config.experiments.enabled = True + + doc = OutputDocument( + root=DocumentLink( + documentId="doc1", + attachmentType="LINK", + url="http://blob.test.local/abc123", + ) + ) + fake_redis_store.set("jur1:case1:result:doc1", doc.model_dump_json()) + + cb = CallbackTaskResult( + status_code=200, + response="ok", + formatted=FormatTaskResult( + jurisdiction_id="jur1", + case_id="case1", + document_id="doc1", + errors=[], + ), + ) + ft = FinalizeTask( + jurisdiction_id="jur1", + case_id="case1", + subject_ids=[], + renderer="PDF", + ) + + result = finalize.s(cb, ft).apply() + final = result.get() + + assert isinstance(final, FinalizeTaskResult) + assert final.errors == [] + + with exp_db.sync_session() as session: + ds = ( + session.query(DocumentStatus) + .filter_by( + jurisdiction_id="jur1", + case_id="case1", + document_id="doc1", + ) + .all() + ) + assert len(ds) == 1 + assert ds[0].status == "COMPLETE" + assert ds[0].error is None + + +def test_finalize_preserves_upstream_errors_without_verifying( + config, exp_db, fake_redis_store: FakeRedis +): + """If upstream already recorded errors, the verify-result probe should + be skipped (we already know the pipeline failed) and the existing + errors should be reported as-is. + """ + config.experiments.enabled = True + + upstream_err = ProcessingError( + message="Boom", + task="format", + exception="ValueError", + ) + + cb = CallbackTaskResult( + status_code=200, + response="ok", + formatted=FormatTaskResult( + jurisdiction_id="jur1", + case_id="case1", + document_id="doc1", + errors=[upstream_err], + ), + ) + ft = FinalizeTask( + jurisdiction_id="jur1", + case_id="case1", + subject_ids=[], + renderer="PDF", + ) + + result = finalize.s(cb, ft).apply() + final = result.get() + + assert isinstance(final, FinalizeTaskResult) + # No "finalize.verify_result" error layered on top. + assert len(final.errors) == 1 + assert final.errors[0].task == "format" + + with exp_db.sync_session() as session: + ds = ( + session.query(DocumentStatus) + .filter_by( + jurisdiction_id="jur1", + case_id="case1", + document_id="doc1", + ) + .all() + ) + assert len(ds) == 1 + assert ds[0].status == "ERROR" + + @patch("app.server.tasks.controller.chain") def test_finalize_no_experiments_more_objects(chain_mock, config, fake_redis_store): config.experiments.enabled = False + _seed_result_doc(fake_redis_store) chain_mock.return_value.apply_async.return_value = AsyncResult("new_task_id") diff --git a/tests/unit/test_format.py b/tests/unit/test_format.py index 393b1a9..84e6eb5 100644 --- a/tests/unit/test_format.py +++ b/tests/unit/test_format.py @@ -216,3 +216,67 @@ def test_format_errors(): ) ], ) + + +def test_format_save_result_failure_tagged_distinctly( + monkeypatch, fake_redis_store: FakeRedis +): + """A failure in `save_result_sync` should surface as a ProcessingError + tagged with ``task="format.save_result"`` (not the generic ``"format"``) + so operators can distinguish a result-store write failure from a + rendering/encoding bug. + """ + fake_redis_store.set("abc123", b"content") + + # Prevent retries from running so the failure is reported immediately. + monkeypatch.setattr(format, "max_retries", 0) + + redact_result = RedactionTaskResult( + jurisdiction_id="jur1", + case_id="case1", + document_id="doc1", + file_storage_id="abc123", + errors=[], + renderer=OutputFormat.PDF, + ) + + with patch( + "app.server.tasks.format.save_result_sync", + side_effect=RuntimeError("redis unreachable"), + ): + result = format.s(redact_result, FormatTask()).apply() + + final = cast(FormatTaskResult, result.get()) + assert len(final.errors) == 1 + err = final.errors[0] + assert err.task == "format.save_result" + assert err.exception == "RuntimeError" + assert "redis unreachable" in err.message + + +def test_format_render_failure_keeps_generic_format_tag( + monkeypatch, fake_redis_store: FakeRedis +): + """A non-save failure inside ``format`` should still be tagged with the + generic ``"format"`` task name, leaving ``"format.save_result"`` + reserved for result-store write failures. + """ + monkeypatch.setattr(format, "max_retries", 0) + + # No content saved under the file_storage_id -> format_document raises + # ValueError("No redacted content") inside the outer try, which is + # *not* a ResultStoreWriteError. + redact_result = RedactionTaskResult( + jurisdiction_id="jur1", + case_id="case1", + document_id="doc1", + file_storage_id="missing", + errors=[], + renderer=OutputFormat.PDF, + ) + + result = format.s(redact_result, FormatTask()).apply() + final = cast(FormatTaskResult, result.get()) + assert len(final.errors) == 1 + assert final.errors[0].task == "format" + assert final.errors[0].exception == "ValueError"