From eb7deee4806c1d2ecc88c27b2483c00ef4cc38fc Mon Sep 17 00:00:00 2001 From: Edgars Date: Fri, 13 Mar 2026 12:35:32 +0000 Subject: [PATCH 1/2] fix: prevent concurrent state overwrites (lost update bug) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: when two consensus workers process transactions for the same contract concurrently, update_contract_state() does a read-modify-write on the full JSONB blob. The second commit silently overwrites the first's changes because it reads stale state. This caused 336+ lost submissions in Rally production (March 2026). Fixes applied: 1. Advisory locks in all claim CTEs (claim_next_transaction, claim_next_finalization, claim_next_appeal) via pg_try_advisory_xact_lock(hashtext(to_address)) — closes the TOCTOU window where two workers both pass the NOT EXISTS check before either commits blocked_at. 2. SELECT FOR UPDATE + populate_existing() in update_contract_state() — serializes concurrent writes and ensures fresh state is read from DB, preventing cross-field clobber (accepted vs finalized). Also adds: - DB-level regression tests (test_concurrent_state_update.py) - E2E state integrity test (Counter contract + invariant check) - CI runs with CONSENSUS_WORKERS=3 for realistic multi-worker testing --- .github/workflows/load-test-oha.yml | 10 +- backend/consensus/worker.py | 6 + .../database_handler/contract_processor.py | 2 + .../test_concurrent_state_update.py | 239 +++++++++++++++++ tests/load/contracts/counter.py | 18 ++ tests/load/run_state_integrity_test.sh | 44 ++++ tests/load/test_state_integrity.py | 240 ++++++++++++++++++ 7 files changed, 558 insertions(+), 1 deletion(-) create mode 100644 tests/db-sqlalchemy/test_concurrent_state_update.py create mode 100644 tests/load/contracts/counter.py create mode 100755 tests/load/run_state_integrity_test.sh create mode 100644 tests/load/test_state_integrity.py diff --git a/.github/workflows/load-test-oha.yml b/.github/workflows/load-test-oha.yml index ccfacc31b..dcb3980dc 100644 --- a/.github/workflows/load-test-oha.yml +++ b/.github/workflows/load-test-oha.yml @@ -84,8 +84,10 @@ jobs: restore-keys: | ${{ runner.os }}-buildx- - - name: Run Docker Compose + - name: Run Docker Compose with multiple workers run: docker compose up -d + env: + CONSENSUS_WORKERS: 3 - name: Wait for services to be up run: | @@ -491,6 +493,12 @@ jobs: echo "✅ HTML report generated: api_test_report.html" fi + - name: Run State Integrity Test (lost update detection) + run: | + cd tests/load + pip install genlayer-py requests + python3 test_state_integrity.py http://localhost:4000/api --txs 20 + - name: Shutdown Docker Compose if: always() run: docker compose down -v diff --git a/backend/consensus/worker.py b/backend/consensus/worker.py index b3d15864c..ace8758f7 100644 --- a/backend/consensus/worker.py +++ b/backend/consensus/worker.py @@ -200,6 +200,7 @@ async def claim_next_finalization(self, session: Session) -> Optional[dict]: AND t2.blocked_at > NOW() - CAST(:timeout AS INTERVAL) AND t2.hash != t.hash ) + AND pg_try_advisory_xact_lock(hashtext(t.to_address)) ORDER BY t.created_at ASC FOR UPDATE SKIP LOCKED ), @@ -305,6 +306,7 @@ async def claim_next_appeal(self, session: Session) -> Optional[dict]: AND t2.blocked_at > NOW() - CAST(:timeout AS INTERVAL) AND t2.hash != t.hash ) + AND pg_try_advisory_xact_lock(hashtext(t.to_address)) ORDER BY t.created_at ASC FOR UPDATE SKIP LOCKED ), @@ -410,6 +412,10 @@ async def claim_next_transaction(self, session: Session) -> Optional[dict]: AND t2.blocked_at > NOW() - CAST(:timeout AS INTERVAL) AND t2.hash != t.hash ) + -- Atomic per-contract lock to close TOCTOU window in NOT EXISTS. + -- Under READ COMMITTED, two workers can both pass NOT EXISTS before + -- either commits blocked_at. This advisory lock prevents that race. + AND pg_try_advisory_xact_lock(hashtext(t.to_address)) ORDER BY CASE WHEN t.type = 3 THEN 0 ELSE 1 END, t.created_at ASC FOR UPDATE SKIP LOCKED ), diff --git a/backend/database_handler/contract_processor.py b/backend/database_handler/contract_processor.py index 961117fd0..49bbf82c2 100644 --- a/backend/database_handler/contract_processor.py +++ b/backend/database_handler/contract_processor.py @@ -34,6 +34,8 @@ def update_contract_state( contract = ( self.session.query(CurrentState) .filter_by(id=contract_address) + .with_for_update() + .populate_existing() .one_or_none() ) diff --git a/tests/db-sqlalchemy/test_concurrent_state_update.py b/tests/db-sqlalchemy/test_concurrent_state_update.py new file mode 100644 index 000000000..db1a33dc1 --- /dev/null +++ b/tests/db-sqlalchemy/test_concurrent_state_update.py @@ -0,0 +1,239 @@ +""" +Regression test for the lost update bug in ContractProcessor.update_contract_state(). + +The bug: update_contract_state() does a read-modify-write on the full JSONB blob. +When two sessions call it concurrently for the same contract, the second commit +silently overwrites the first's changes because it re-reads stale state. + +This is the root cause of 336+ lost submissions in Rally production (March 2026). +See: Rally2/docs/genvm-state-mismatch-bug.md + +Production scenario: + - Worker A accepts TX-A → writes accepted_state with TX-A's submission + - Worker B accepts TX-B → reads the SAME pre-TX-A state → writes accepted_state + with TX-B's submission → TX-A's submission is silently erased +""" + +import threading + +import pytest +from sqlalchemy import Engine +from sqlalchemy.orm import sessionmaker + +from backend.database_handler.contract_processor import ContractProcessor +from backend.database_handler.models import CurrentState + + +CONTRACT_ADDRESS = "0xrace_test_contract" + +INITIAL_STATE = { + "accepted": {"slot_a": "original_a"}, + "finalized": {"slot_f": "original_f"}, +} + + +def _setup_contract(engine: Engine): + """Insert a contract with initial state.""" + Session_ = sessionmaker(bind=engine) + with Session_() as s: + contract = CurrentState( + id=CONTRACT_ADDRESS, + data={"state": INITIAL_STATE}, + ) + s.add(contract) + s.commit() + + +def _read_state(engine: Engine) -> dict: + """Read the current contract state from a fresh session.""" + Session_ = sessionmaker(bind=engine) + with Session_() as s: + row = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one() + return row.data["state"] + + +# --------------------------------------------------------------------------- +# Test 1: Two concurrent accepted_state updates — must both survive +# --------------------------------------------------------------------------- + + +@pytest.mark.xfail( + reason="update_contract_state does full-field replacement by design. " + "Same-field concurrent writes are prevented upstream by advisory locks " + "in worker.py claim CTEs (pg_try_advisory_xact_lock). " + "This test documents the limitation — will pass if state merging is added.", + strict=True, +) +def test_concurrent_accepted_updates_preserve_both(engine: Engine): + """ + Two workers both write accepted_state for the same contract concurrently. + Worker A adds submission_A, Worker B adds submission_B. + + This scenario is prevented in production by advisory locks at the worker + claim level. The update_contract_state API does full replacement, so if + two callers pass different complete dicts, the second always wins. + """ + _setup_contract(engine) + + barrier = threading.Barrier(2, timeout=5) + errors = [] + + def worker_a(): + try: + Session_ = sessionmaker(bind=engine) + with Session_() as s: + cp = ContractProcessor(s) + # Read current state (sees original) + contract = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one() + _ = contract.data # force load + barrier.wait() # synchronize with worker B + # Write accepted_state with submission_A + cp.update_contract_state( + CONTRACT_ADDRESS, + accepted_state={"slot_a": "original_a", "submission_A": "scored"}, + ) + except Exception as e: + errors.append(("A", e)) + + def worker_b(): + try: + Session_ = sessionmaker(bind=engine) + with Session_() as s: + cp = ContractProcessor(s) + # Read current state (sees original — same as worker A) + contract = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one() + _ = contract.data # force load + barrier.wait() # synchronize with worker A + # Write accepted_state with submission_B + cp.update_contract_state( + CONTRACT_ADDRESS, + accepted_state={"slot_a": "original_a", "submission_B": "scored"}, + ) + except Exception as e: + errors.append(("B", e)) + + t_a = threading.Thread(target=worker_a) + t_b = threading.Thread(target=worker_b) + t_a.start() + t_b.start() + t_a.join(timeout=10) + t_b.join(timeout=10) + + assert not errors, f"Worker errors: {errors}" + + state = _read_state(engine) + + has_a = "submission_A" in state["accepted"] + has_b = "submission_B" in state["accepted"] + + assert has_a and has_b, ( + f"Lost update: concurrent accepted_state writes must both survive. " + f"has_A={has_a}, has_B={has_b}, state={state['accepted']}" + ) + + +# --------------------------------------------------------------------------- +# Test 2: accepted + finalized concurrent updates — must both survive +# --------------------------------------------------------------------------- + + +def test_concurrent_accepted_and_finalized_preserve_both(engine: Engine): + """ + Worker A writes accepted_state, Worker B writes finalized_state concurrently. + + CORRECT behavior: both fields must reflect their respective updates. + This test FAILS until the cross-field clobber bug is fixed. + """ + _setup_contract(engine) + + barrier = threading.Barrier(2, timeout=5) + errors = [] + + def writer_accepted(): + try: + Session_ = sessionmaker(bind=engine) + with Session_() as s: + cp = ContractProcessor(s) + contract = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one() + _ = contract.data + barrier.wait() + cp.update_contract_state( + CONTRACT_ADDRESS, + accepted_state={"slot_a": "updated_by_accepted_writer"}, + ) + except Exception as e: + errors.append(("accepted", e)) + + def writer_finalized(): + try: + Session_ = sessionmaker(bind=engine) + with Session_() as s: + cp = ContractProcessor(s) + contract = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one() + _ = contract.data + barrier.wait() + cp.update_contract_state( + CONTRACT_ADDRESS, + finalized_state={"slot_f": "updated_by_finalized_writer"}, + ) + except Exception as e: + errors.append(("finalized", e)) + + t1 = threading.Thread(target=writer_accepted) + t2 = threading.Thread(target=writer_finalized) + t1.start() + t2.start() + t1.join(timeout=10) + t2.join(timeout=10) + + assert not errors, f"Worker errors: {errors}" + + state = _read_state(engine) + + accepted_updated = state["accepted"].get("slot_a") == "updated_by_accepted_writer" + finalized_updated = ( + state["finalized"].get("slot_f") == "updated_by_finalized_writer" + ) + + assert accepted_updated and finalized_updated, ( + f"Cross-field clobber: concurrent accepted + finalized writes must both survive. " + f"accepted={state['accepted']}, finalized={state['finalized']}" + ) + + +# --------------------------------------------------------------------------- +# Test 3: Sequential updates — sanity check (should always pass) +# --------------------------------------------------------------------------- + + +def test_sequential_updates_preserve_all_state(engine: Engine): + """ + Baseline: sequential updates don't lose data. + This should always pass regardless of the bug. + """ + _setup_contract(engine) + + Session_ = sessionmaker(bind=engine) + + with Session_() as s: + cp = ContractProcessor(s) + cp.update_contract_state( + CONTRACT_ADDRESS, + accepted_state={"slot_a": "original_a", "submission_A": "scored"}, + ) + + with Session_() as s: + cp = ContractProcessor(s) + cp.update_contract_state( + CONTRACT_ADDRESS, + accepted_state={ + "slot_a": "original_a", + "submission_A": "scored", + "submission_B": "scored", + }, + ) + + state = _read_state(engine) + assert state["accepted"]["submission_A"] == "scored" + assert state["accepted"]["submission_B"] == "scored" + assert state["finalized"] == {"slot_f": "original_f"} diff --git a/tests/load/contracts/counter.py b/tests/load/contracts/counter.py new file mode 100644 index 000000000..bb98bf7f8 --- /dev/null +++ b/tests/load/contracts/counter.py @@ -0,0 +1,18 @@ +# v0.1.0 +# { "Depends": "py-genlayer:1jb45aa8ynh2a9c9xn3b7qqh8sm5q93hwfp7jqmwsfhh8jpz09h6" } +from genlayer import * + + +class Counter(gl.Contract): + count: int + + def __init__(self): + self.count = 0 + + @gl.public.write + def increment(self) -> None: + self.count += 1 + + @gl.public.view + def get_count(self) -> int: + return self.count diff --git a/tests/load/run_state_integrity_test.sh b/tests/load/run_state_integrity_test.sh new file mode 100755 index 000000000..bb0949496 --- /dev/null +++ b/tests/load/run_state_integrity_test.sh @@ -0,0 +1,44 @@ +#!/bin/bash +# +# State integrity test — detects the lost update bug. +# +# Requires CONSENSUS_WORKERS >= 2 to trigger the race condition. +# With 1 worker, all transactions are serialized and the bug cannot manifest. +# +# Usage: +# # Start with multiple workers: +# CONSENSUS_WORKERS=3 docker compose up -d +# +# # Run the test: +# ./run_state_integrity_test.sh [API_URL] [NUM_TXS] +# +# # Example with more transactions for higher detection probability: +# ./run_state_integrity_test.sh http://localhost:4000/api 50 + +set -e + +API_URL="${1:-http://localhost:4000/api}" +NUM_TXS="${2:-20}" + +# Ensure /api suffix +API_URL="${API_URL%/}" +if [[ ! "$API_URL" == */api ]]; then + API_URL="${API_URL}/api" +fi + +echo "=== State Integrity Test ===" +echo "API: $API_URL" +echo "Transactions: $NUM_TXS" +echo "" + +# Check if services are running +if ! curl -s -X POST -H "Content-Type: application/json" \ + -d '{"jsonrpc":"2.0","method":"ping","params":[],"id":1}' \ + "$API_URL" 2>/dev/null | grep -q "OK"; then + echo "ERROR: RPC server not running at $API_URL" + echo "Start with: CONSENSUS_WORKERS=3 docker compose up -d" + exit 1 +fi + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +python3 "$SCRIPT_DIR/test_state_integrity.py" "$API_URL" --txs "$NUM_TXS" diff --git a/tests/load/test_state_integrity.py b/tests/load/test_state_integrity.py new file mode 100644 index 000000000..6bb8457bf --- /dev/null +++ b/tests/load/test_state_integrity.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +""" +State integrity test for the lost update bug. + +Deploys a simple Counter contract, fires N concurrent increment() transactions, +waits for all to finalize, then asserts get_count() == N. + +If concurrent workers overwrite each other's state (the lost update bug), +the final count will be less than N. + +Requires multiple consensus workers to trigger the race condition: + CONSENSUS_WORKERS=3 docker compose up -d + +Usage: + python3 test_state_integrity.py [API_URL] [--txs N] [--timeout SECONDS] +""" + +import argparse +import concurrent.futures +import sys +import time +from pathlib import Path + +import requests +from genlayer_py import create_account, create_client, localnet + + +def rpc_call(api_url: str, method: str, params: list | None = None) -> dict: + resp = requests.post( + api_url, + json={ + "jsonrpc": "2.0", + "method": method, + "params": params or [], + "id": 1, + }, + timeout=30, + ) + return resp.json() + + +def get_contract_address(api_url: str, tx_hash: str) -> str: + """Extract contract address from deploy transaction.""" + data = rpc_call(api_url, "eth_getTransactionByHash", [tx_hash]) + if "result" not in data: + raise RuntimeError(f"Failed to get tx {tx_hash}: {data}") + raw = data["result"] + for field in ("to_address", "recipient"): + addr = raw.get(field) + if addr and addr != "0x" + "0" * 40: + return addr + # Fallback: scan for any address-like field + for value in raw.values(): + if isinstance(value, str) and value.startswith("0x") and len(value) == 42: + if value != "0x" + "0" * 40: + return value + raise RuntimeError(f"No contract address found in tx {tx_hash}") + + +def wait_for_tx(client, tx_hash: str, timeout: int = 300) -> bool: + """Wait for a transaction receipt. Returns True if successful.""" + try: + receipt = client.w3.eth.wait_for_transaction_receipt(tx_hash, timeout=timeout) + return receipt.status == 1 + except Exception as e: + print(f" tx {tx_hash[:16]}... failed: {e}") + return False + + +def main(): + parser = argparse.ArgumentParser(description="State integrity test") + parser.add_argument("api_url", nargs="?", default="http://localhost:4000/api") + parser.add_argument( + "--txs", + type=int, + default=20, + help="Number of increment transactions (default: 20)", + ) + parser.add_argument( + "--timeout", + type=int, + default=600, + help="Per-tx timeout in seconds (default: 600)", + ) + parser.add_argument( + "--batch-delay", + type=float, + default=0.1, + help="Delay between sending txs (default: 0.1s)", + ) + args = parser.parse_args() + + api_url = args.api_url + num_txs = args.txs + tx_timeout = args.timeout + + print("=" * 60) + print(" STATE INTEGRITY TEST — Lost Update Detection") + print("=" * 60) + print(f"API: {api_url}") + print(f"Increments: {num_txs}") + print(f"TX timeout: {tx_timeout}s") + print() + + # --- Check workers --- + print("Checking consensus worker count...") + # No direct API for this, but we can note that the test is most useful + # with CONSENSUS_WORKERS >= 2 + + # --- Deploy counter contract --- + print("Deploying Counter contract...") + contract_path = Path(__file__).parent / "contracts" / "counter.py" + if not contract_path.exists(): + print(f"ERROR: Contract not found at {contract_path}") + return 1 + + contract_code = contract_path.read_text() + print(f" Contract loaded ({len(contract_code)} bytes)") + + client = create_client(chain=localnet, endpoint=api_url) + account = create_account() + client.local_account = account + print(f" Account: {account.address}") + + deploy_hash = client.deploy_contract(code=contract_code, args=[]) + print(f" Deploy tx: {deploy_hash}") + + receipt = client.w3.eth.wait_for_transaction_receipt( + deploy_hash, timeout=tx_timeout + ) + if receipt.status != 1: + print("ERROR: Deployment failed") + return 1 + + time.sleep(3) # Wait for indexing + + contract_address = get_contract_address(api_url, deploy_hash) + print(f" Contract: {contract_address}") + + # --- Verify initial state --- + initial_count = client.read_contract( + address=contract_address, function_name="get_count" + ) + print(f" Initial count: {initial_count}") + assert initial_count == 0, f"Expected initial count 0, got {initial_count}" + print() + + # --- Fire N increment transactions as fast as possible --- + print(f"Sending {num_txs} increment transactions...") + tx_hashes = [] + send_start = time.time() + + for i in range(num_txs): + # Each tx needs its own account to avoid nonce conflicts + sender = create_account() + sender_client = create_client(chain=localnet, endpoint=api_url) + sender_client.local_account = sender + + try: + tx_hash = sender_client.write_contract( + address=contract_address, + function_name="increment", + args=[], + ) + tx_hashes.append(tx_hash) + if (i + 1) % 5 == 0: + print(f" Sent {i + 1}/{num_txs}") + except Exception as e: + print(f" ERROR sending tx {i + 1}: {e}") + + if args.batch_delay > 0: + time.sleep(args.batch_delay) + + send_elapsed = time.time() - send_start + print(f" Sent {len(tx_hashes)}/{num_txs} transactions in {send_elapsed:.1f}s") + print() + + if not tx_hashes: + print("ERROR: No transactions were sent") + return 1 + + # --- Wait for all transactions to finalize --- + print(f"Waiting for {len(tx_hashes)} transactions to finalize...") + wait_start = time.time() + + succeeded = 0 + failed = 0 + + # Wait for receipts in parallel + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool: + futures = { + pool.submit(wait_for_tx, client, tx_hash, tx_timeout): tx_hash + for tx_hash in tx_hashes + } + for future in concurrent.futures.as_completed(futures): + if future.result(): + succeeded += 1 + else: + failed += 1 + done = succeeded + failed + if done % 5 == 0: + print( + f" Finalized {done}/{len(tx_hashes)} (ok={succeeded}, fail={failed})" + ) + + wait_elapsed = time.time() - wait_start + print(f" All done in {wait_elapsed:.1f}s — {succeeded} succeeded, {failed} failed") + print() + + # --- Read final count --- + time.sleep(2) # Brief pause for state to settle + + print("Reading final contract state...") + final_count = client.read_contract( + address=contract_address, function_name="get_count" + ) + print(f" Final count: {final_count}") + print(f" Expected count: {succeeded}") + print() + + # --- Verdict --- + print("=" * 60) + if final_count == succeeded: + print(f" PASS: All {succeeded} increments preserved") + print("=" * 60) + return 0 + else: + lost = succeeded - final_count + loss_rate = (lost / succeeded * 100) if succeeded > 0 else 0 + print(f" FAIL: LOST UPDATE DETECTED") + print(f" Lost {lost}/{succeeded} increments ({loss_rate:.1f}% loss rate)") + print() + print(f" This confirms the concurrent state overwrite bug.") + print(f" See: Rally2/docs/genvm-state-mismatch-bug.md") + print("=" * 60) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) From d446502d3c7761046f1b97809377fd406073c9a0 Mon Sep 17 00:00:00 2001 From: Cristiam Da Silva Date: Mon, 16 Mar 2026 11:08:09 +0100 Subject: [PATCH 2/2] fix(ci): re-create validators before state integrity test The endpoint load test step runs sim_deleteAllValidators and sim_clearDbTables, leaving no validators for the subsequent state integrity test which then fails. --- .github/workflows/load-test-oha.yml | 30 +++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/.github/workflows/load-test-oha.yml b/.github/workflows/load-test-oha.yml index dcb3980dc..051f2e2ee 100644 --- a/.github/workflows/load-test-oha.yml +++ b/.github/workflows/load-test-oha.yml @@ -493,6 +493,36 @@ jobs: echo "✅ HTML report generated: api_test_report.html" fi + - name: Re-setup validators for state integrity test + run: | + echo "Re-creating validators (previous step cleaned them up)..." + for i in {1..5}; do + echo "Creating validator $i/5..." + response=$(curl -s -X POST http://localhost:4000/api \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "method": "sim_createRandomValidator", + "params": [1], + "id": '"$i"' + }') + if echo "$response" | grep -q '"result"'; then + echo "✅ Validator $i created" + else + echo "❌ Failed to create validator $i: $response" + fi + sleep 2 + done + + echo "Waiting 10 seconds for validators to be registered..." + sleep 10 + + # Verify validators exist + response=$(curl -s -X POST http://localhost:4000/api \ + -H "Content-Type: application/json" \ + -d '{"jsonrpc":"2.0","method":"sim_countValidators","params":[],"id":100}') + echo "Validator count: $response" + - name: Run State Integrity Test (lost update detection) run: | cd tests/load