diff --git a/src/analytics/service.py b/src/analytics/service.py
index d8c97ba..91764ca 100644
--- a/src/analytics/service.py
+++ b/src/analytics/service.py
@@ -2,10 +2,10 @@
 import json
 import logging
 import time
-from datetime import datetime, timedelta
+from datetime import date, datetime, timedelta
 from typing import Any, Dict, List, Optional, Tuple
 
-from sqlalchemy import asc, desc, func, text
+from sqlalchemy import asc, desc, extract, func, text
 from sqlalchemy.orm import Session
 
 from src.analytics.models import (
@@ -540,7 +540,61 @@ def get_trending_events(self, limit: int = 10, hours: int = 24) -> List[Dict[str
         _trending_cache = (rows, time.monotonic() + _TRENDING_CACHE_TTL)
         return rows[:limit]
 
-    def _update_analytics_stats(self, event_id: str,
+    def get_scan_heatmap(
+        self,
+        event_id: str,
+        filter_date: Optional[date] = None,
+    ) -> Dict[str, Any]:
+        """Return hourly scan-density data (24 buckets) for an event.
+
+        Optionally scoped to a single calendar day via *filter_date*.
+        Hours with no scans are filled with a count of 0 so the response
+        always contains exactly 24 entries.
+        """
+        session = None
+        try:
+            session = get_session()
+
+            hour_expr = extract("hour", TicketScan.scan_timestamp)
+            query = (
+                session.query(
+                    hour_expr.label("hour"),
+                    func.count(TicketScan.id).label("scan_count"),
+                )
+                .filter(TicketScan.event_id == event_id)
+            )
+
+            if filter_date is not None:
+                query = query.filter(
+                    func.date(TicketScan.scan_timestamp) == filter_date.isoformat()
+                )
+
+            rows = query.group_by(hour_expr).all()
+
+            hour_counts: Dict[int, int] = {
+                int(row.hour): int(row.scan_count) for row in rows
+            }
+
+            data = [
+                {"hour": h, "scan_count": hour_counts.get(h, 0)}
+                for h in range(24)
+            ]
+
+            peak_hour = max(range(24), key=lambda h: hour_counts.get(h, 0))
+
+            return {"event_id": event_id, "data": data, "peak_hour": peak_hour}
+
+        except Exception as e:
+            log_error("Failed to get scan heatmap", {
+                "event_id": event_id,
+                "error": str(e),
+            })
+            raise
+        finally:
+            if session:
+                session.close()
+
+    def _update_analytics_stats(self, event_id: str,
                                 increment_scan: bool = False, is_valid: bool = True,
                                 increment_transfer: bool = False, is_successful: bool = True,
                                 increment_invalid: bool = False):
diff --git a/src/main.py b/src/main.py
index 7c6c318..7aa06ae 100644
--- a/src/main.py
+++ b/src/main.py
@@ -14,7 +14,7 @@
 from fastapi.staticfiles import StaticFiles
 from slowapi.errors import RateLimitExceeded
 
-from src.auth.dependencies import require_admin_key
+from src.auth.dependencies import require_admin_key, require_service_key
 from src.analytics.service import analytics_service
 from src.chat import ChatMessage, EscalationEvent, chat_manager
 
@@ -70,6 +70,8 @@
     AnalyticsScansResponse,
     AnalyticsStatsQuery,
     AnalyticsTransfersResponse,
+    HeatmapQuery,
+    HeatmapResponse,
     ChatEscalateRequest,
     ChatEscalateResponse,
     ChatEscalationsResponse,
@@ -455,6 +457,39 @@ def get_invalid_attempts(
         raise HTTPException(status_code=500, detail=f"Failed to retrieve invalid attempts: {exc}")
 
 
+@app.get("/stats/heatmap", response_model=HeatmapResponse)
+def get_scan_heatmap(
+    query: Annotated[HeatmapQuery, Query()],
+    _: str = Depends(require_service_key),
+) -> HeatmapResponse:
+    """Return hourly scan density for an event (24 buckets, zero-filled).
+
+    Useful for capacity planning and staffing decisions.
+    Optionally scope to a single calendar day with the *date* parameter.
+    Protected by SERVICE_API_KEY bearer auth.
+    """
+    log_info("Scan heatmap requested", {
+        "event_id": query.event_id,
+        "date": str(query.date) if query.date else None,
+    })
+    try:
+        result = analytics_service.get_scan_heatmap(
+            event_id=query.event_id,
+            filter_date=query.date,
+        )
+        return HeatmapResponse(
+            event_id=result["event_id"],
+            data=result["data"],
+            peak_hour=result["peak_hour"],
+        )
+    except Exception as exc:
+        log_error("Failed to retrieve scan heatmap", {
+            "event_id": query.event_id,
+            "error": str(exc),
+        })
+        raise HTTPException(status_code=500, detail=f"Failed to retrieve scan heatmap: {exc}")
+
+
 # ---------------------------------------------------------------------------
 # Fraud + scalper prediction
 # ---------------------------------------------------------------------------
diff --git a/src/types_custom.py b/src/types_custom.py
index b780cc4..858e2ca 100644
--- a/src/types_custom.py
+++ b/src/types_custom.py
@@ -263,6 +263,25 @@ class AnalyticsInvalidAttemptsResponse(BaseModel):
     to_ts: Optional[datetime] = Field(None, description="End datetime filter applied")
 
 
+class HeatmapEntry(BaseModel):
+    model_config = ConfigDict(extra="forbid")
+    hour: int = Field(..., ge=0, le=23, description="Hour of day (0-23)")
+    scan_count: int = Field(..., ge=0, description="Number of scans in this hour")
+
+
+class HeatmapQuery(BaseModel):
+    model_config = ConfigDict(extra="forbid")
+    event_id: str = Field(..., min_length=1, description="Event UUID to scope the heatmap")
+    date: Optional[date] = Field(None, description="Optional ISO date (YYYY-MM-DD) to scope to a specific day")
+
+
+class HeatmapResponse(BaseModel):
+    model_config = ConfigDict(extra="forbid")
+    event_id: str
+    data: List[HeatmapEntry] = Field(..., description="24-entry array of hourly scan counts (hours 0-23)")
+    peak_hour: int = Field(..., ge=0, le=23, description="Hour with the highest scan count")
+
+
 class RootResponse(BaseModel):
     model_config = ConfigDict(extra="forbid")
     message: str
diff --git a/tests/test_heatmap.py b/tests/test_heatmap.py
new file mode 100644
index 0000000..f68f6f9
--- /dev/null
+++ b/tests/test_heatmap.py
@@ -0,0 +1,282 @@
+"""Tests for GET /stats/heatmap — hourly scan density per event."""
+from datetime import date, datetime
+from unittest.mock import MagicMock, patch
+
+import pytest
+from fastapi.testclient import TestClient
+
+from src.analytics.service import AnalyticsService
+from src.config import settings
+from src.main import app
+
+client = TestClient(app)
+
+SERVICE_HEADERS = {"Authorization": f"Bearer {settings.SERVICE_API_KEY}"}
+
+
+# ---------------------------------------------------------------------------
+# Service-layer unit tests
+# ---------------------------------------------------------------------------
+
+
+class TestGetScanHeatmap:
+    def setup_method(self):
+        self.service = AnalyticsService()
+
+    def _make_row(self, hour: int, count: int):
+        row = MagicMock()
+        row.hour = hour
+        row.scan_count = count
+        return row
+
+    def _mock_session(self, rows):
+        """Return a mock session whose query chain yields *rows*."""
+        mock_session = MagicMock()
+        (
+            mock_session.query.return_value
+            .filter.return_value
+            .group_by.return_value
+            .all.return_value
+        ) = rows
+        # Chained .filter().filter().group_by().all() for date-scoped queries
+        (
+            mock_session.query.return_value
+            .filter.return_value
+            .filter.return_value
+            .group_by.return_value
+            .all.return_value
+        ) = rows
+        return mock_session
+
+    def test_returns_24_entries_always(self):
+        """Response data must contain exactly 24 hourly entries."""
+        rows = [self._make_row(14, 320), self._make_row(15, 400)]
+        mock_session = self._mock_session(rows)
+
+        with patch("src.analytics.service.get_session", return_value=mock_session):
+            result = self.service.get_scan_heatmap("event-abc")
+
+        assert len(result["data"]) == 24
+
+    def test_zero_fill_missing_hours(self):
+        """Hours with no scans must appear with scan_count == 0."""
+        rows = [self._make_row(10, 50)]
+        mock_session = self._mock_session(rows)
+
+        with patch("src.analytics.service.get_session", return_value=mock_session):
+            result = self.service.get_scan_heatmap("event-abc")
+
+        data = {entry["hour"]: entry["scan_count"] for entry in result["data"]}
+        assert data[10] == 50
+        # All other hours should be 0
+        for h in range(24):
+            if h != 10:
+                assert data[h] == 0, f"hour {h} should be 0, got {data[h]}"
+
+    def test_correct_hour_bucketing(self):
+        """Scan counts must be placed in the correct hour bucket."""
+        rows = [
+            self._make_row(0, 5),
+            self._make_row(8, 100),
+            self._make_row(23, 77),
+        ]
+        mock_session = self._mock_session(rows)
+
+        with patch("src.analytics.service.get_session", return_value=mock_session):
+            result = self.service.get_scan_heatmap("event-abc")
+
+        data = {entry["hour"]: entry["scan_count"] for entry in result["data"]}
+        assert data[0] == 5
+        assert data[8] == 100
+        assert data[23] == 77
+
+    def test_peak_hour_is_highest_count_hour(self):
+        """peak_hour must point to the hour with the maximum scan count."""
+        rows = [
+            self._make_row(14, 300),
+            self._make_row(15, 500),
+            self._make_row(16, 200),
+        ]
+        mock_session = self._mock_session(rows)
+
+        with patch("src.analytics.service.get_session", return_value=mock_session):
+            result = self.service.get_scan_heatmap("event-abc")
+
+        assert result["peak_hour"] == 15
+
+    def test_peak_hour_defaults_to_zero_when_no_scans(self):
+        """When there are no scans, peak_hour should be 0 (first of all-zero hours)."""
+        mock_session = self._mock_session([])
+
+        with patch("src.analytics.service.get_session", return_value=mock_session):
+            result = self.service.get_scan_heatmap("event-abc")
+
+        assert result["peak_hour"] == 0
+        assert all(entry["scan_count"] == 0 for entry in result["data"])
+
+    def test_event_id_propagated(self):
+        """event_id in the result must match the one passed in."""
+        mock_session = self._mock_session([])
+
+        with patch("src.analytics.service.get_session", return_value=mock_session):
+            result = self.service.get_scan_heatmap("event-xyz-123")
+
+        assert result["event_id"] == "event-xyz-123"
+
+    def test_date_filter_applied(self):
+        """When filter_date is given, the query chain uses the extra .filter() call."""
+        rows = [self._make_row(9, 42)]
+        mock_session = MagicMock()
+
+        # Configure only the date-scoped chain so the extra .filter() call can be asserted below.
+        (
+            mock_session.query.return_value
+            .filter.return_value
+            .filter.return_value
+            .group_by.return_value
+            .all.return_value
+        ) = rows
+
+        with patch("src.analytics.service.get_session", return_value=mock_session):
+            result = self.service.get_scan_heatmap("event-abc", filter_date=date(2026, 3, 28))
+
+        # Verify the second .filter() was called (date scoping)
+        mock_session.query.return_value.filter.return_value.filter.assert_called_once()
+        assert result["data"][9]["scan_count"] == 42
+
+    def test_session_always_closed(self):
+        """Session must be closed even when an exception is raised."""
+        mock_session = MagicMock()
+        mock_session.query.side_effect = RuntimeError("db exploded")
+
+        with patch("src.analytics.service.get_session", return_value=mock_session):
+            with pytest.raises(RuntimeError):
+                self.service.get_scan_heatmap("event-abc")
+
+        mock_session.close.assert_called_once()
+
+
+# ---------------------------------------------------------------------------
+# API endpoint tests
+# ---------------------------------------------------------------------------
+
+
+class TestHeatmapEndpoint:
+    """Tests for GET /stats/heatmap via TestClient."""
+
+    _sample_result = {
+        "event_id": "event-001",
+        "data": [{"hour": h, "scan_count": 10 if h == 14 else 0} for h in range(24)],
+        "peak_hour": 14,
+    }
+
+    def test_missing_auth_returns_401(self):
+        response = client.get("/stats/heatmap", params={"event_id": "event-001"})
+        assert response.status_code == 401
+
+    def test_invalid_token_returns_403(self):
+        headers = {"Authorization": "Bearer completely_wrong_token"}
+        response = client.get(
+            "/stats/heatmap",
+            params={"event_id": "event-001"},
+            headers=headers,
+        )
+        assert response.status_code == 403
+
+    def test_missing_event_id_returns_422(self):
+        response = client.get("/stats/heatmap", headers=SERVICE_HEADERS)
+        assert response.status_code == 422
+
+    def test_valid_request_returns_200(self):
+        with patch(
+            "src.main.analytics_service.get_scan_heatmap",
+            return_value=self._sample_result,
+        ):
+            response = client.get(
+                "/stats/heatmap",
+                params={"event_id": "event-001"},
+                headers=SERVICE_HEADERS,
+            )
+
+        assert response.status_code == 200
+
+    def test_response_shape(self):
+        with patch(
+            "src.main.analytics_service.get_scan_heatmap",
+            return_value=self._sample_result,
+        ):
+            response = client.get(
+                "/stats/heatmap",
+                params={"event_id": "event-001"},
+                headers=SERVICE_HEADERS,
+            )
+
+        body = response.json()
+        assert body["event_id"] == "event-001"
+        assert body["peak_hour"] == 14
+        assert len(body["data"]) == 24
+
+    def test_all_24_hours_present_in_response(self):
+        with patch(
+            "src.main.analytics_service.get_scan_heatmap",
+            return_value=self._sample_result,
+        ):
+            response = client.get(
+                "/stats/heatmap",
+                params={"event_id": "event-001"},
+                headers=SERVICE_HEADERS,
+            )
+
+        hours = [entry["hour"] for entry in response.json()["data"]]
+        assert hours == list(range(24))
+
+    def test_date_param_forwarded_to_service(self):
+        captured = {}
+
+        def fake_heatmap(event_id, filter_date=None):
+            captured["filter_date"] = filter_date
+            return {
+                "event_id": event_id,
+                "data": [{"hour": h, "scan_count": 0} for h in range(24)],
+                "peak_hour": 0,
+            }
+
+        with patch("src.main.analytics_service.get_scan_heatmap", side_effect=fake_heatmap):
+            response = client.get(
+                "/stats/heatmap",
+                params={"event_id": "event-001", "date": "2026-03-28"},
+                headers=SERVICE_HEADERS,
+            )
+
+        assert response.status_code == 200
+        assert captured["filter_date"] == date(2026, 3, 28)
+
+    def test_service_error_returns_500(self):
+        with patch(
+            "src.main.analytics_service.get_scan_heatmap",
+            side_effect=RuntimeError("db failure"),
+        ):
+            response = client.get(
+                "/stats/heatmap",
+                params={"event_id": "event-001"},
+                headers=SERVICE_HEADERS,
+            )
+
+        assert response.status_code == 500
+        assert "Failed to retrieve scan heatmap" in response.json()["detail"]
+
+    def test_peak_hour_accuracy(self):
+        """peak_hour in the response must match the hour with the max scan_count."""
+        data = [{"hour": h, "scan_count": 0} for h in range(24)]
+        data[17]["scan_count"] = 999
+        result = {"event_id": "event-001", "data": data, "peak_hour": 17}
+
+        with patch("src.main.analytics_service.get_scan_heatmap", return_value=result):
+            response = client.get(
+                "/stats/heatmap",
+                params={"event_id": "event-001"},
+                headers=SERVICE_HEADERS,
+            )
+
+        assert response.json()["peak_hour"] == 17
+        assert response.json()["data"][17]["scan_count"] == 999
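
Example usage, as a minimal sketch: the endpoint path, query parameters, auth scheme, and
response shape come from the patch above, while the host, port, and the use of the
`requests` library are illustrative assumptions, not part of the change.

    import os
    import requests

    resp = requests.get(
        "http://localhost:8000/stats/heatmap",
        params={"event_id": "event-001", "date": "2026-03-28"},  # "date" is optional
        headers={"Authorization": f"Bearer {os.environ['SERVICE_API_KEY']}"},
        timeout=10,
    )
    resp.raise_for_status()
    payload = resp.json()
    # payload["data"] always contains 24 zero-filled hourly buckets;
    # payload["peak_hour"] is the hour with the highest scan count.
    print(payload["peak_hour"], payload["data"][payload["peak_hour"]])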