Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ COPY run.py /app/
# Port the application listens on
EXPOSE 8000

# Health check to ensure the container is responding to requests
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1

# Command to run the application using Uvicorn
# The command format is: uvicorn [module:app_object] --host [ip] --port [port]
# We use the standard uvicorn worker configuration
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Copy `.env.example` to `.env` (done automatically by `make dev-setup`) and fill
| `ENABLE_ETL_SCHEDULER` | | Set `true` to run the ETL on a schedule |
| `ETL_CRON` | | Cron expression (UTC) — takes precedence over `ETL_INTERVAL_MINUTES` |
| `ETL_INTERVAL_MINUTES` | | ETL polling interval in minutes (default `15`) |
| `SHUTDOWN_TIMEOUT_SECONDS` | | Graceful shutdown timeout in seconds (default `30`) |
| `BQ_ENABLED` | | Set `true` to enable BigQuery loading |
| `BQ_PROJECT_ID` | | GCP project ID |
| `BQ_DATASET` | | BigQuery dataset name |
Expand Down
4 changes: 3 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
[pytest]
minversion = 7.0
addopts = -ra -q
addopts = -ra -q -m "not integration"
testpaths = tests
markers =
integration: integration tests requiring a live database
9 changes: 8 additions & 1 deletion run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
from src.main import app # noqa: F401

if __name__ == "__main__":
uvicorn.run("src.main:app", host="0.0.0.0", port=8000)
from src.config import get_settings
settings = get_settings()
uvicorn.run(
"src.main:app",
host="0.0.0.0",
port=8000,
timeout_graceful_shutdown=settings.SHUTDOWN_TIMEOUT_SECONDS
)


24 changes: 24 additions & 0 deletions src/analytics/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,30 @@ def get_recent_scans(self, event_id: str, limit: int = 100) -> List[Dict[str, An
raise
finally:
session.close()

def get_scans_by_ticket_id(self, ticket_id: str, limit: int = 100) -> List[Dict[str, Any]]:
    """Get scan records for a specific ticket identifier.

    Args:
        ticket_id: Ticket identifier whose scan history is requested.
        limit: Maximum number of records to return (newest first).

    Returns:
        A list of dicts with keys ``id``, ``ticket_id``, ``event_id``,
        ``scan_timestamp`` (ISO-8601 string), ``is_valid`` and
        ``location``, ordered by scan timestamp descending.

    Raises:
        Re-raises any database error after logging it.
    """
    # Acquire the session *before* the try block: if get_session() itself
    # raised inside the try, the `finally` clause would reference an
    # unbound local `session` and mask the original error with a NameError.
    session = get_session()
    try:
        scans = (
            session.query(TicketScan)
            .filter(TicketScan.ticket_id == ticket_id)
            .order_by(desc(TicketScan.scan_timestamp))
            .limit(limit)
            .all()
        )
        return [{
            "id": scan.id,
            "ticket_id": scan.ticket_id,
            "event_id": scan.event_id,
            "scan_timestamp": scan.scan_timestamp.isoformat(),
            "is_valid": scan.is_valid,
            "location": scan.location
        } for scan in scans]
    except Exception as e:
        log_error("Failed to get scans by ticket_id", {
            "ticket_id": ticket_id,
            "error": str(e)
        })
        raise
    finally:
        session.close()

def get_recent_transfers(self, event_id: str, limit: int = 100) -> List[Dict[str, Any]]:
"""Get recent transfer records for an event."""
Expand Down
9 changes: 1 addition & 8 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from pydantic_settings import BaseSettings

class Settings(BaseSettings):


class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")

Expand Down Expand Up @@ -38,6 +33,7 @@ class Settings(BaseSettings):
POOL_SIZE: int = 5
POOL_MAX_OVERFLOW: int = 10
REPORT_CACHE_MINUTES: int = 60
SHUTDOWN_TIMEOUT_SECONDS: int = 30

SERVICE_API_KEY: str = "default_service_secret_change_me"
ADMIN_API_KEY: str = "default_admin_secret_change_me"
Expand All @@ -46,9 +42,6 @@ class Settings(BaseSettings):
"owerri,warri,uyo,akure,ilorin,sokoto,zaria,maiduguri,asaba,nnewi"
)

class Config:
env_file = ".env"

settings = Settings()


Expand Down
34 changes: 32 additions & 2 deletions src/fraud.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def check_fraud_rules(events: List[Dict[str, Any]]) -> List[str]:
triggered.add("duplicate_ticket_transfer")

# Rule 3: Excessive purchases by same user in a day (>5)
from datetime import date as date_type # noqa: PLC0415 – local import to avoid shadowing
from datetime import date as date_type # noqa: PLC0415
purchases_by_user_day: Dict[tuple[str, date_type], int] = {}
for event in events:
if event.get("type") == "purchase":
Expand All @@ -47,6 +47,31 @@ def check_fraud_rules(events: List[Dict[str, Any]]) -> List[str]:
if count > 5:
triggered.add("excessive_purchases_user_day")

# Rule 4: Impossible travel (different locations within 30 min)
scans_by_ticket: Dict[str, List[Dict[str, Any]]] = {}
for event in events:
if event.get("type") == "scan":
tid = str(event.get("ticket_id", ""))
scans_by_ticket.setdefault(tid, []).append(event)
for _tid, scans in scans_by_ticket.items():
scans.sort(key=lambda x: datetime.fromisoformat(str(x.get("timestamp", ""))))
for i in range(len(scans) - 1):
t1 = datetime.fromisoformat(str(scans[i].get("timestamp", "")))
t2 = datetime.fromisoformat(str(scans[i + 1].get("timestamp", "")))
loc1 = scans[i].get("location")
loc2 = scans[i + 1].get("location")
if loc1 != loc2 and (t2 - t1).total_seconds() <= 1800:
triggered.add("impossible_travel_scan")
break

# Rule 5: Bulk allocation (20% or more of event capacity)
for event in events:
if event.get("type") == "purchase":
qty = float(event.get("qty", 1))
capacity = float(event.get("capacity", 1000000))
if capacity > 0 and (qty / capacity) >= 0.2:
triggered.add("bulk_allocation_purchase")

return list(triggered)


Expand All @@ -58,7 +83,12 @@ def determine_severity(triggered_rules: List[str]) -> str:
if not triggered_rules:
return "none"

HIGH_RULES = {"too_many_purchases_same_ip", "excessive_purchases_user_day"}
HIGH_RULES = {
"too_many_purchases_same_ip",
"excessive_purchases_user_day",
"impossible_travel_scan",
"bulk_allocation_purchase",
}
MEDIUM_RULES = {"duplicate_ticket_transfer"}

s = set(triggered_rules)
Expand Down
26 changes: 22 additions & 4 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,14 @@ def on_startup() -> None:
@app.on_event("shutdown")
def on_shutdown() -> None:
global etl_scheduler
log_info("Shutdown initiated: waiting for in-flight requests and scheduler...")
if etl_scheduler is not None:
try:
etl_scheduler.shutdown(wait=False)
except Exception:
pass
# wait=True ensures running jobs complete before scheduler stops
etl_scheduler.shutdown(wait=True)
log_info("ETL scheduler shut down successfully.")
except Exception as exc:
log_error("Error during scheduler shutdown", {"error": str(exc)})


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -262,7 +265,7 @@ def generate_qr(payload: TicketRequest) -> Any:
encoded = base64.b64encode(buffer.read()).decode("utf-8")
QR_GENERATIONS_TOTAL.inc()
log_info("QR code generated successfully")
return QRResponse(qr_base64=encoded)
return QRResponse(qr_base64=encoded, token=json.dumps(data, separators=(",", ":")))


@app.post("/validate-qr", response_model=QRValidateResponse)
Expand All @@ -280,15 +283,30 @@ def validate_qr(payload: QRValidateRequest) -> QRValidateResponse:
if hmac.compare_digest(provided_sig, expected_sig):
QR_VALIDATIONS_TOTAL.labels(result="valid").inc()
log_info("QR validation successful", {"ticket_id": unsigned.get("ticket_id")})
analytics_service.log_ticket_scan(
ticket_id=str(unsigned.get("ticket_id") or "unknown"),
event_id=str(unsigned.get("event") or "unknown"),
is_valid=True
)
return QRValidateResponse(isValid=True, metadata=unsigned)
log_warning("Invalid QR signature", {"metadata": unsigned})
QR_VALIDATIONS_TOTAL.labels(result="invalid").inc()
analytics_service.log_ticket_scan(
ticket_id=str(unsigned.get("ticket_id") or "unknown"),
event_id=str(unsigned.get("event") or "unknown"),
is_valid=False
)
return QRValidateResponse(isValid=False)
except Exception as exc:
log_warning("Invalid QR validation attempt", {"error": str(exc)})
QR_VALIDATIONS_TOTAL.labels(result="error").inc()
return QRValidateResponse(isValid=False)

@app.get("/qr/scan-log/{ticket_id}")
def get_qr_scan_log(ticket_id: str, limit: int = 100) -> List[Dict[str, Any]]:
    """Returns the scan audit log for a specific ticket.

    Args:
        ticket_id: Ticket identifier whose scans are requested (path param).
        limit: Maximum number of scan records to return (query param;
            default 100, matching the service-layer default).
    """
    # Thin delegation to the analytics service, which handles session
    # lifecycle and error logging.
    return analytics_service.get_scans_by_ticket_id(ticket_id, limit=limit)


# ---------------------------------------------------------------------------
# Analytics endpoints
Expand Down
1 change: 1 addition & 0 deletions src/types_custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TicketRequest(BaseModel):
class QRResponse(BaseModel):
model_config = ConfigDict(extra="forbid")
qr_base64: str
token: str


class QRValidateRequest(BaseModel):
Expand Down
32 changes: 32 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,36 @@
import os
import pytest
from sqlalchemy import create_engine, text
from src.config import get_settings

# Provide a non-default test key so startup validation passes in test environments.
os.environ.setdefault("QR_SIGNING_KEY", "a" * 32)
# Force model training to skip in test environments
os.environ.setdefault("SKIP_MODEL_TRAINING", "true")

@pytest.fixture(scope="session")
def db_engine():
    """Session-scoped SQLAlchemy engine for integration tests.

    Yields one engine built from the configured DATABASE_URL and
    disposes it at the end of the test session.
    """
    engine = create_engine(get_settings().DATABASE_URL)
    try:
        yield engine
    finally:
        # try/finally guarantees pooled connections are released even if
        # the fixture generator is closed with an exception during teardown.
        engine.dispose()

@pytest.fixture
def clean_test_db(db_engine):
    """Truncates all tables before/after integration tests.

    Each table is truncated best-effort: a failure (e.g. the table does
    not exist yet on a first run) is deliberately ignored so setup does
    not block the test.
    """
    tables = ["event_sales_summary", "daily_ticket_sales", "etl_run_log"]

    def _truncate_all() -> None:
        # Shared by setup and teardown to avoid duplicating the loop.
        with db_engine.begin() as conn:
            for table in tables:
                try:
                    conn.execute(text(f"TRUNCATE TABLE {table} RESTART IDENTITY CASCADE"))
                except Exception:
                    # Tables might not exist yet if it's the first run
                    pass

    _truncate_all()
    yield
    _truncate_all()
103 changes: 103 additions & 0 deletions tests/e2e/test_qr_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import base64
import json
import pytest
from fastapi.testclient import TestClient
from prometheus_client import REGISTRY

from src.main import app
from src.config import get_settings

@pytest.fixture
def client(monkeypatch):
    """TestClient with a deterministic 32-char signing key and a fresh settings cache."""
    # Requirement: All tests must set QR_SIGNING_KEY to a 32-character test string
    monkeypatch.setenv("QR_SIGNING_KEY", "q" * 32)
    # Drop any settings instance cached before the env var was patched.
    get_settings.cache_clear()
    test_client = TestClient(app)
    return test_client

def test_qr_generate_validate_audit_flow(client):
    """
    End-to-end test for the QR lifecycle:
    1. Generate a signed QR token.
    2. Validate the token successfully.
    3. Validate a tampered token (invalid signature).
    4. Validate a token with extra fields (signature mismatch).
    5. Verify the audit log contains valid and invalid entries.
    6. Verify Prometheus metrics.
    """
    ticket_id = "E2E-TEST-001"
    event_name = "E2E-Festival"

    # --- Step 1: Generate QR ---
    gen_payload = {
        "ticket_id": ticket_id,
        "event": event_name,
        "user": "[email protected]"
    }
    resp = client.post("/generate-qr", json=gen_payload)

    # Assertions 1 & 2: Status 200, valid PNG, and token presence
    assert resp.status_code == 200
    data = resp.json()
    assert "qr_base64" in data
    assert "token" in data  # Extracted signed token requirement

    qr_content = base64.b64decode(data["qr_base64"])
    assert qr_content.startswith(b"\x89PNG"), "QR code must be a PNG image"

    token_str = data["token"]
    token_obj = json.loads(token_str)

    # Store initial metrics
    def get_metric(res):
        return REGISTRY.get_sample_value("qr_validations_total", {"result": res}) or 0

    m_valid_start = get_metric("valid")
    m_invalid_start = get_metric("invalid")

    # --- Step 2: Validate (Successful) ---
    val_resp = client.post("/validate-qr", json={"qr_text": token_str})

    # Assertion 3: Valid scan returns True and correct metadata
    assert val_resp.status_code == 200
    val_data = val_resp.json()
    assert val_data["isValid"] is True
    assert val_data["metadata"]["ticket_id"] == ticket_id
    assert val_data["metadata"]["event"] == event_name

    # --- Step 3: Validate (Tampered signature) ---
    tampered_token = token_obj.copy()
    tampered_token["sig"] = "invalid_signature_string"
    resp_tampered = client.post("/validate-qr", json={"qr_text": json.dumps(tampered_token)})

    # Assertion 4: Tampered signature returns False
    assert resp_tampered.status_code == 200
    assert resp_tampered.json()["isValid"] is False

    # --- Step 4: Validate (Extra field / Tampered payload) ---
    extra_field_token = token_obj.copy()
    extra_field_token["fraud"] = "injected"
    resp_extra = client.post("/validate-qr", json={"qr_text": json.dumps(extra_field_token)})

    # Assertion 5: Extra field (tampering) returns False
    assert resp_extra.status_code == 200
    assert resp_extra.json()["isValid"] is False

    # --- Step 5: Audit Log ---
    log_resp = client.get(f"/qr/scan-log/{ticket_id}")

    # Assertion 6: Audit log contains valid and invalid entries
    assert log_resp.status_code == 200
    logs = log_resp.json()
    assert len(logs) >= 2, "Should have at least one valid and one invalid log entry"

    # NOTE: `entry` instead of the ambiguous single-letter `l` (PEP 8 / E741).
    has_valid = any(entry["is_valid"] is True for entry in logs)
    has_invalid = any(entry["is_valid"] is False for entry in logs)
    assert has_valid and has_invalid, "Audit log must contain both valid and invalid attempts"

    # --- Step 6: Prometheus Metrics ---
    # Assertion 7: Valid scan incremented counter
    assert get_metric("valid") == m_valid_start + 1

    # Assertion 8: Invalid scans incremented counter (2 invalid attempts in steps 3-4)
    assert get_metric("invalid") >= m_invalid_start + 2
Loading
Loading