Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion .github/workflows/load-test-oha.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ jobs:
restore-keys: |
${{ runner.os }}-buildx-

- name: Run Docker Compose
- name: Run Docker Compose with multiple workers
run: docker compose up -d
env:
CONSENSUS_WORKERS: 3

- name: Wait for services to be up
run: |
Expand Down Expand Up @@ -491,6 +493,42 @@ jobs:
echo "✅ HTML report generated: api_test_report.html"
fi

- name: Re-setup validators for state integrity test
run: |
echo "Re-creating validators (previous step cleaned them up)..."
for i in {1..5}; do
echo "Creating validator $i/5..."
response=$(curl -s -X POST http://localhost:4000/api \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"method": "sim_createRandomValidator",
"params": [1],
"id": '"$i"'
}')
if echo "$response" | grep -q '"result"'; then
echo "✅ Validator $i created"
else
echo "❌ Failed to create validator $i: $response"
fi
sleep 2
done

echo "Waiting 10 seconds for validators to be registered..."
sleep 10

# Verify validators exist
response=$(curl -s -X POST http://localhost:4000/api \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"sim_countValidators","params":[],"id":100}')
echo "Validator count: $response"

- name: Run State Integrity Test (lost update detection)
run: |
cd tests/load
pip install genlayer-py requests
python3 test_state_integrity.py http://localhost:4000/api --txs 20

- name: Shutdown Docker Compose
if: always()
run: docker compose down -v
6 changes: 6 additions & 0 deletions backend/consensus/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ async def claim_next_finalization(self, session: Session) -> Optional[dict]:
AND t2.blocked_at > NOW() - CAST(:timeout AS INTERVAL)
AND t2.hash != t.hash
)
AND pg_try_advisory_xact_lock(hashtext(t.to_address))
ORDER BY t.created_at ASC
FOR UPDATE SKIP LOCKED
),
Expand Down Expand Up @@ -305,6 +306,7 @@ async def claim_next_appeal(self, session: Session) -> Optional[dict]:
AND t2.blocked_at > NOW() - CAST(:timeout AS INTERVAL)
AND t2.hash != t.hash
)
AND pg_try_advisory_xact_lock(hashtext(t.to_address))
ORDER BY t.created_at ASC
FOR UPDATE SKIP LOCKED
),
Expand Down Expand Up @@ -410,6 +412,10 @@ async def claim_next_transaction(self, session: Session) -> Optional[dict]:
AND t2.blocked_at > NOW() - CAST(:timeout AS INTERVAL)
AND t2.hash != t.hash
)
-- Atomic per-contract lock to close TOCTOU window in NOT EXISTS.
-- Under READ COMMITTED, two workers can both pass NOT EXISTS before
-- either commits blocked_at. This advisory lock prevents that race.
AND pg_try_advisory_xact_lock(hashtext(t.to_address))
ORDER BY CASE WHEN t.type = 3 THEN 0 ELSE 1 END, t.created_at ASC
FOR UPDATE SKIP LOCKED
),
Expand Down
2 changes: 2 additions & 0 deletions backend/database_handler/contract_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def update_contract_state(
contract = (
self.session.query(CurrentState)
.filter_by(id=contract_address)
.with_for_update()
.populate_existing()
.one_or_none()
)

Expand Down
239 changes: 239 additions & 0 deletions tests/db-sqlalchemy/test_concurrent_state_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
"""
Regression test for the lost update bug in ContractProcessor.update_contract_state().

The bug: update_contract_state() does a read-modify-write on the full JSONB blob.
When two sessions call it concurrently for the same contract, the second commit
silently overwrites the first's changes because it re-reads stale state.

This is the root cause of 336+ lost submissions in Rally production (March 2026).
See: Rally2/docs/genvm-state-mismatch-bug.md

Production scenario:
- Worker A accepts TX-A → writes accepted_state with TX-A's submission
- Worker B accepts TX-B → reads the SAME pre-TX-A state → writes accepted_state
with TX-B's submission → TX-A's submission is silently erased
"""

import threading

import pytest
from sqlalchemy import Engine
from sqlalchemy.orm import sessionmaker

from backend.database_handler.contract_processor import ContractProcessor
from backend.database_handler.models import CurrentState


# Fixed contract address shared by every test in this module; each test
# re-inserts the row via _setup_contract() before exercising it.
CONTRACT_ADDRESS = "0xrace_test_contract"

# Baseline contract state seeded by _setup_contract(). The tests assert that
# concurrent writers do not clobber each other's changes to these two fields.
INITIAL_STATE = {
    "accepted": {"slot_a": "original_a"},
    "finalized": {"slot_f": "original_f"},
}


def _setup_contract(engine: Engine):
    """Seed the database with the test contract in its baseline state."""
    session_factory = sessionmaker(bind=engine)
    with session_factory() as session:
        session.add(
            CurrentState(id=CONTRACT_ADDRESS, data={"state": INITIAL_STATE})
        )
        session.commit()


def _read_state(engine: Engine) -> dict:
    """Fetch the contract's persisted state through a brand-new session."""
    session_factory = sessionmaker(bind=engine)
    with session_factory() as session:
        record = (
            session.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one()
        )
        return record.data["state"]


# ---------------------------------------------------------------------------
# Test 1: Two concurrent accepted_state updates — must both survive
# ---------------------------------------------------------------------------


@pytest.mark.xfail(
    reason="update_contract_state does full-field replacement by design. "
    "Same-field concurrent writes are prevented upstream by advisory locks "
    "in worker.py claim CTEs (pg_try_advisory_xact_lock). "
    "This test documents the limitation — will pass if state merging is added.",
    strict=True,
)
def test_concurrent_accepted_updates_preserve_both(engine: Engine):
    """
    Two workers both write accepted_state for the same contract concurrently.
    Worker A adds submission_A, Worker B adds submission_B.

    This scenario is prevented in production by advisory locks at the worker
    claim level. The update_contract_state API does full replacement, so if
    two callers pass different complete dicts, the second always wins.
    """
    _setup_contract(engine)

    barrier = threading.Barrier(2, timeout=5)
    errors = []

    def _worker(label: str, accepted_state: dict) -> None:
        """Read the (still original) state, rendezvous at the barrier so both
        workers hold the same stale snapshot, then write accepted_state."""
        try:
            Session_ = sessionmaker(bind=engine)
            with Session_() as s:
                cp = ContractProcessor(s)
                contract = (
                    s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one()
                )
                _ = contract.data  # force load before the barrier
                barrier.wait()  # synchronize with the other worker
                cp.update_contract_state(
                    CONTRACT_ADDRESS, accepted_state=accepted_state
                )
        except Exception as e:
            errors.append((label, e))

    t_a = threading.Thread(
        target=_worker,
        args=("A", {"slot_a": "original_a", "submission_A": "scored"}),
    )
    t_b = threading.Thread(
        target=_worker,
        args=("B", {"slot_a": "original_a", "submission_B": "scored"}),
    )
    t_a.start()
    t_b.start()
    t_a.join(timeout=10)
    t_b.join(timeout=10)

    # Fail fast if a writer deadlocked instead of exiting within the timeout;
    # otherwise the suite would continue with half-written state (or hang at
    # interpreter shutdown).
    assert not t_a.is_alive(), "worker A did not finish"
    assert not t_b.is_alive(), "worker B did not finish"

    assert not errors, f"Worker errors: {errors}"

    state = _read_state(engine)

    has_a = "submission_A" in state["accepted"]
    has_b = "submission_B" in state["accepted"]

    assert has_a and has_b, (
        f"Lost update: concurrent accepted_state writes must both survive. "
        f"has_A={has_a}, has_B={has_b}, state={state['accepted']}"
    )


# ---------------------------------------------------------------------------
# Test 2: accepted + finalized concurrent updates — must both survive
# ---------------------------------------------------------------------------


def test_concurrent_accepted_and_finalized_preserve_both(engine: Engine):
    """
    Worker A writes accepted_state, Worker B writes finalized_state concurrently.

    CORRECT behavior: both fields must reflect their respective updates.
    This test FAILS until the cross-field clobber bug is fixed.
    """
    _setup_contract(engine)

    barrier = threading.Barrier(2, timeout=5)
    errors = []

    def writer_accepted():
        # Read the original state, rendezvous, then write accepted_state only.
        try:
            Session_ = sessionmaker(bind=engine)
            with Session_() as s:
                cp = ContractProcessor(s)
                contract = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one()
                _ = contract.data  # force load before the barrier
                barrier.wait()
                cp.update_contract_state(
                    CONTRACT_ADDRESS,
                    accepted_state={"slot_a": "updated_by_accepted_writer"},
                )
        except Exception as e:
            errors.append(("accepted", e))

    def writer_finalized():
        # Read the original state, rendezvous, then write finalized_state only.
        try:
            Session_ = sessionmaker(bind=engine)
            with Session_() as s:
                cp = ContractProcessor(s)
                contract = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one()
                _ = contract.data  # force load before the barrier
                barrier.wait()
                cp.update_contract_state(
                    CONTRACT_ADDRESS,
                    finalized_state={"slot_f": "updated_by_finalized_writer"},
                )
        except Exception as e:
            errors.append(("finalized", e))

    t1 = threading.Thread(target=writer_accepted)
    t2 = threading.Thread(target=writer_finalized)
    t1.start()
    t2.start()
    t1.join(timeout=10)
    t2.join(timeout=10)

    # Fail fast if a writer deadlocked instead of exiting within the timeout;
    # a silently-alive thread would leave half-written state behind.
    assert not t1.is_alive(), "accepted writer did not finish"
    assert not t2.is_alive(), "finalized writer did not finish"

    assert not errors, f"Worker errors: {errors}"

    state = _read_state(engine)

    accepted_updated = state["accepted"].get("slot_a") == "updated_by_accepted_writer"
    finalized_updated = (
        state["finalized"].get("slot_f") == "updated_by_finalized_writer"
    )

    assert accepted_updated and finalized_updated, (
        f"Cross-field clobber: concurrent accepted + finalized writes must both survive. "
        f"accepted={state['accepted']}, finalized={state['finalized']}"
    )


# ---------------------------------------------------------------------------
# Test 3: Sequential updates — sanity check (should always pass)
# ---------------------------------------------------------------------------


def test_sequential_updates_preserve_all_state(engine: Engine):
    """
    Baseline: sequential updates don't lose data.
    This should always pass regardless of the bug.
    """
    _setup_contract(engine)

    session_factory = sessionmaker(bind=engine)

    first_write = {"slot_a": "original_a", "submission_A": "scored"}
    second_write = dict(first_write, submission_B="scored")

    # Apply the two updates back-to-back, each in its own session, mirroring
    # two workers that happen not to overlap in time.
    for accepted in (first_write, second_write):
        with session_factory() as session:
            ContractProcessor(session).update_contract_state(
                CONTRACT_ADDRESS, accepted_state=accepted
            )

    state = _read_state(engine)
    assert state["accepted"]["submission_A"] == "scored"
    assert state["accepted"]["submission_B"] == "scored"
    assert state["finalized"] == {"slot_f": "original_f"}
18 changes: 18 additions & 0 deletions tests/load/contracts/counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# v0.1.0
# { "Depends": "py-genlayer:1jb45aa8ynh2a9c9xn3b7qqh8sm5q93hwfp7jqmwsfhh8jpz09h6" }
from genlayer import *


class Counter(gl.Contract):
    """Minimal load-test contract: a single persistent integer counter."""

    # Persistent contract storage: current counter value.
    count: int

    def __init__(self) -> None:
        self.count = 0

    @gl.public.write
    def increment(self) -> None:
        """Increase the counter by one (state-mutating call)."""
        self.count += 1

    @gl.public.view
    def get_count(self) -> int:
        """Return the current counter value (read-only call)."""
        return self.count
44 changes: 44 additions & 0 deletions tests/load/run_state_integrity_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/bin/bash
#
# State integrity test — detects the lost update bug.
#
# Needs CONSENSUS_WORKERS >= 2 to trigger the race condition: a single worker
# serializes every transaction, so the bug can never manifest.
#
# Usage:
#   # Start with multiple workers:
#   CONSENSUS_WORKERS=3 docker compose up -d
#
#   # Run the test:
#   ./run_state_integrity_test.sh [API_URL] [NUM_TXS]
#
#   # More transactions => higher detection probability:
#   ./run_state_integrity_test.sh http://localhost:4000/api 50

set -e

API_URL="${1:-http://localhost:4000/api}"
NUM_TXS="${2:-20}"

# Normalize the URL: drop any trailing slash, then guarantee the /api suffix.
API_URL="${API_URL%/}"
case "$API_URL" in
    */api) ;;
    *) API_URL="${API_URL}/api" ;;
esac

echo "=== State Integrity Test ==="
echo "API: $API_URL"
echo "Transactions: $NUM_TXS"
echo ""

# Probe the RPC endpoint before launching the test driver.
if ! curl -s -X POST -H "Content-Type: application/json" \
    -d '{"jsonrpc":"2.0","method":"ping","params":[],"id":1}' \
    "$API_URL" 2>/dev/null | grep -q "OK"; then
    echo "ERROR: RPC server not running at $API_URL"
    echo "Start with: CONSENSUS_WORKERS=3 docker compose up -d"
    exit 1
fi

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
python3 "$SCRIPT_DIR/test_state_integrity.py" "$API_URL" --txs "$NUM_TXS"
Loading
Loading