Commit 457284c

fix: prevent concurrent state overwrites (lost update bug) (#1532)
* fix: prevent concurrent state overwrites (lost update bug)

Root cause: when two consensus workers process transactions for the same contract concurrently, update_contract_state() does a read-modify-write on the full JSONB blob. The second commit silently overwrites the first's changes because it reads stale state. This caused 336+ lost submissions in Rally production (March 2026).

Fixes applied:
1. Advisory locks in all claim CTEs (claim_next_transaction, claim_next_finalization, claim_next_appeal) via pg_try_advisory_xact_lock(hashtext(to_address)) — closes the TOCTOU window where two workers both pass the NOT EXISTS check before either commits blocked_at.
2. SELECT FOR UPDATE + populate_existing() in update_contract_state() — serializes concurrent writes and ensures fresh state is read from the DB, preventing cross-field clobber (accepted vs finalized).

Also adds:
- DB-level regression tests (test_concurrent_state_update.py)
- E2E state integrity test (Counter contract + invariant check)
- CI runs with CONSENSUS_WORKERS=3 for realistic multi-worker testing

* fix(ci): re-create validators before state integrity test

The endpoint load test step runs sim_deleteAllValidators and sim_clearDbTables, leaving no validators for the subsequent state integrity test, which then fails.

---------

Co-authored-by: Cristiam Da Silva <[email protected]>
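The lost update described above is the classic unsynchronized read-modify-write race. It can be reproduced in miniature with plain Python threads — an illustrative toy only, not the simulator's code; the dict stands in for the JSONB state column and the barrier forces both "workers" to read before either writes:

```python
import threading

# Toy stand-in for the contract row's JSONB "state" column.
db = {"state": {"accepted": {}}}
barrier = threading.Barrier(2)

def worker(tx_name):
    # 1. Read the full blob (both workers see the same stale copy).
    snapshot = {k: dict(v) for k, v in db["state"].items()}
    barrier.wait()  # both reads complete before either write
    # 2. Modify the local copy.
    snapshot["accepted"][tx_name] = "scored"
    # 3. Write the full blob back: the second write clobbers the first.
    db["state"] = snapshot

threads = [threading.Thread(target=worker, args=(n,)) for n in ("tx_a", "tx_b")]
for t in threads:
    t.start()
for t in threads:
    t.join()

survivors = sorted(db["state"]["accepted"])
assert len(survivors) == 1  # one of tx_a / tx_b was silently erased
```

The same pattern at the database level is what erased the Rally submissions: each worker re-wrote the whole blob from a snapshot that predated the other worker's commit.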
1 parent b157f07 commit 457284c

7 files changed

Lines changed: 588 additions & 1 deletion


.github/workflows/load-test-oha.yml

Lines changed: 39 additions & 1 deletion
@@ -84,8 +84,10 @@ jobs:
           restore-keys: |
             ${{ runner.os }}-buildx-

-      - name: Run Docker Compose
+      - name: Run Docker Compose with multiple workers
         run: docker compose up -d
+        env:
+          CONSENSUS_WORKERS: 3

       - name: Wait for services to be up
         run: |
@@ -491,6 +493,42 @@ jobs:
             echo "✅ HTML report generated: api_test_report.html"
           fi

+      - name: Re-setup validators for state integrity test
+        run: |
+          echo "Re-creating validators (previous step cleaned them up)..."
+          for i in {1..5}; do
+            echo "Creating validator $i/5..."
+            response=$(curl -s -X POST http://localhost:4000/api \
+              -H "Content-Type: application/json" \
+              -d '{
+                "jsonrpc": "2.0",
+                "method": "sim_createRandomValidator",
+                "params": [1],
+                "id": '"$i"'
+              }')
+            if echo "$response" | grep -q '"result"'; then
+              echo "✅ Validator $i created"
+            else
+              echo "❌ Failed to create validator $i: $response"
+            fi
+            sleep 2
+          done
+
+          echo "Waiting 10 seconds for validators to be registered..."
+          sleep 10
+
+          # Verify validators exist
+          response=$(curl -s -X POST http://localhost:4000/api \
+            -H "Content-Type: application/json" \
+            -d '{"jsonrpc":"2.0","method":"sim_countValidators","params":[],"id":100}')
+          echo "Validator count: $response"
+
+      - name: Run State Integrity Test (lost update detection)
+        run: |
+          cd tests/load
+          pip install genlayer-py requests
+          python3 test_state_integrity.py http://localhost:4000/api --txs 20
+
       - name: Shutdown Docker Compose
         if: always()
         run: docker compose down -v

backend/consensus/worker.py

Lines changed: 6 additions & 0 deletions
@@ -200,6 +200,7 @@ async def claim_next_finalization(self, session: Session) -> Optional[dict]:
                 AND t2.blocked_at > NOW() - CAST(:timeout AS INTERVAL)
                 AND t2.hash != t.hash
             )
+            AND pg_try_advisory_xact_lock(hashtext(t.to_address))
             ORDER BY t.created_at ASC
             FOR UPDATE SKIP LOCKED
         ),
@@ -305,6 +306,7 @@ async def claim_next_appeal(self, session: Session) -> Optional[dict]:
                 AND t2.blocked_at > NOW() - CAST(:timeout AS INTERVAL)
                 AND t2.hash != t.hash
             )
+            AND pg_try_advisory_xact_lock(hashtext(t.to_address))
             ORDER BY t.created_at ASC
             FOR UPDATE SKIP LOCKED
         ),
@@ -410,6 +412,10 @@ async def claim_next_transaction(self, session: Session) -> Optional[dict]:
                 AND t2.blocked_at > NOW() - CAST(:timeout AS INTERVAL)
                 AND t2.hash != t.hash
             )
+            -- Atomic per-contract lock to close TOCTOU window in NOT EXISTS.
+            -- Under READ COMMITTED, two workers can both pass NOT EXISTS before
+            -- either commits blocked_at. This advisory lock prevents that race.
+            AND pg_try_advisory_xact_lock(hashtext(t.to_address))
             ORDER BY CASE WHEN t.type = 3 THEN 0 ELSE 1 END, t.created_at ASC
             FOR UPDATE SKIP LOCKED
         ),
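pg_try_advisory_xact_lock is non-blocking: a worker that fails to take the per-contract lock simply skips that contract's rows for this claim cycle instead of waiting, and the lock is released automatically when the claiming transaction ends. A toy model of those semantics — helper names are hypothetical, and Python's hash() stands in for Postgres's hashtext():

```python
import threading

# Per-key, non-blocking locks, released at "transaction end" by an
# explicit release call (Postgres does this automatically at commit/abort).
_locks: dict[int, threading.Lock] = {}
_registry_guard = threading.Lock()

def try_advisory_lock(key: int) -> bool:
    """Like pg_try_advisory_xact_lock: returns False instead of blocking."""
    with _registry_guard:
        lock = _locks.setdefault(key, threading.Lock())
    return lock.acquire(blocking=False)

def release_advisory_lock(key: int) -> None:
    """Stands in for the lock release Postgres performs at transaction end."""
    _locks[key].release()

# Key derived from the contract address, mirroring hashtext(t.to_address).
key = hash("0xcontract") & 0x7FFFFFFF

assert try_advisory_lock(key) is True    # worker A claims the contract
assert try_advisory_lock(key) is False   # worker B skips it, does not block
release_advisory_lock(key)               # A's "transaction" commits
assert try_advisory_lock(key) is True    # the contract can be claimed again
```

Because the check sits in the claim CTE's WHERE clause alongside FOR UPDATE SKIP LOCKED, at most one worker per contract can pass the NOT EXISTS check and commit blocked_at in any given cycle.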

backend/database_handler/contract_processor.py

Lines changed: 2 additions & 0 deletions
@@ -34,6 +34,8 @@ def update_contract_state(
         contract = (
             self.session.query(CurrentState)
             .filter_by(id=contract_address)
+            .with_for_update()
+            .populate_existing()
             .one_or_none()
         )

test_concurrent_state_update.py

Lines changed: 239 additions & 0 deletions
@@ -0,0 +1,239 @@
"""
Regression test for the lost update bug in ContractProcessor.update_contract_state().

The bug: update_contract_state() does a read-modify-write on the full JSONB blob.
When two sessions call it concurrently for the same contract, the second commit
silently overwrites the first's changes because it re-reads stale state.

This is the root cause of 336+ lost submissions in Rally production (March 2026).
See: Rally2/docs/genvm-state-mismatch-bug.md

Production scenario:
- Worker A accepts TX-A → writes accepted_state with TX-A's submission
- Worker B accepts TX-B → reads the SAME pre-TX-A state → writes accepted_state
  with TX-B's submission → TX-A's submission is silently erased
"""

import threading

import pytest
from sqlalchemy import Engine
from sqlalchemy.orm import sessionmaker

from backend.database_handler.contract_processor import ContractProcessor
from backend.database_handler.models import CurrentState


CONTRACT_ADDRESS = "0xrace_test_contract"

INITIAL_STATE = {
    "accepted": {"slot_a": "original_a"},
    "finalized": {"slot_f": "original_f"},
}


def _setup_contract(engine: Engine):
    """Insert a contract with initial state."""
    Session_ = sessionmaker(bind=engine)
    with Session_() as s:
        contract = CurrentState(
            id=CONTRACT_ADDRESS,
            data={"state": INITIAL_STATE},
        )
        s.add(contract)
        s.commit()


def _read_state(engine: Engine) -> dict:
    """Read the current contract state from a fresh session."""
    Session_ = sessionmaker(bind=engine)
    with Session_() as s:
        row = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one()
        return row.data["state"]


# ---------------------------------------------------------------------------
# Test 1: Two concurrent accepted_state updates — must both survive
# ---------------------------------------------------------------------------


@pytest.mark.xfail(
    reason="update_contract_state does full-field replacement by design. "
    "Same-field concurrent writes are prevented upstream by advisory locks "
    "in worker.py claim CTEs (pg_try_advisory_xact_lock). "
    "This test documents the limitation — will pass if state merging is added.",
    strict=True,
)
def test_concurrent_accepted_updates_preserve_both(engine: Engine):
    """
    Two workers both write accepted_state for the same contract concurrently.
    Worker A adds submission_A, Worker B adds submission_B.

    This scenario is prevented in production by advisory locks at the worker
    claim level. The update_contract_state API does full replacement, so if
    two callers pass different complete dicts, the second always wins.
    """
    _setup_contract(engine)

    barrier = threading.Barrier(2, timeout=5)
    errors = []

    def worker_a():
        try:
            Session_ = sessionmaker(bind=engine)
            with Session_() as s:
                cp = ContractProcessor(s)
                # Read current state (sees original)
                contract = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one()
                _ = contract.data  # force load
                barrier.wait()  # synchronize with worker B
                # Write accepted_state with submission_A
                cp.update_contract_state(
                    CONTRACT_ADDRESS,
                    accepted_state={"slot_a": "original_a", "submission_A": "scored"},
                )
        except Exception as e:
            errors.append(("A", e))

    def worker_b():
        try:
            Session_ = sessionmaker(bind=engine)
            with Session_() as s:
                cp = ContractProcessor(s)
                # Read current state (sees original — same as worker A)
                contract = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one()
                _ = contract.data  # force load
                barrier.wait()  # synchronize with worker A
                # Write accepted_state with submission_B
                cp.update_contract_state(
                    CONTRACT_ADDRESS,
                    accepted_state={"slot_a": "original_a", "submission_B": "scored"},
                )
        except Exception as e:
            errors.append(("B", e))

    t_a = threading.Thread(target=worker_a)
    t_b = threading.Thread(target=worker_b)
    t_a.start()
    t_b.start()
    t_a.join(timeout=10)
    t_b.join(timeout=10)

    assert not errors, f"Worker errors: {errors}"

    state = _read_state(engine)

    has_a = "submission_A" in state["accepted"]
    has_b = "submission_B" in state["accepted"]

    assert has_a and has_b, (
        f"Lost update: concurrent accepted_state writes must both survive. "
        f"has_A={has_a}, has_B={has_b}, state={state['accepted']}"
    )


# ---------------------------------------------------------------------------
# Test 2: accepted + finalized concurrent updates — must both survive
# ---------------------------------------------------------------------------


def test_concurrent_accepted_and_finalized_preserve_both(engine: Engine):
    """
    Worker A writes accepted_state, Worker B writes finalized_state concurrently.

    CORRECT behavior: both fields must reflect their respective updates.
    This test FAILS until the cross-field clobber bug is fixed.
    """
    _setup_contract(engine)

    barrier = threading.Barrier(2, timeout=5)
    errors = []

    def writer_accepted():
        try:
            Session_ = sessionmaker(bind=engine)
            with Session_() as s:
                cp = ContractProcessor(s)
                contract = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one()
                _ = contract.data
                barrier.wait()
                cp.update_contract_state(
                    CONTRACT_ADDRESS,
                    accepted_state={"slot_a": "updated_by_accepted_writer"},
                )
        except Exception as e:
            errors.append(("accepted", e))

    def writer_finalized():
        try:
            Session_ = sessionmaker(bind=engine)
            with Session_() as s:
                cp = ContractProcessor(s)
                contract = s.query(CurrentState).filter_by(id=CONTRACT_ADDRESS).one()
                _ = contract.data
                barrier.wait()
                cp.update_contract_state(
                    CONTRACT_ADDRESS,
                    finalized_state={"slot_f": "updated_by_finalized_writer"},
                )
        except Exception as e:
            errors.append(("finalized", e))

    t1 = threading.Thread(target=writer_accepted)
    t2 = threading.Thread(target=writer_finalized)
    t1.start()
    t2.start()
    t1.join(timeout=10)
    t2.join(timeout=10)

    assert not errors, f"Worker errors: {errors}"

    state = _read_state(engine)

    accepted_updated = state["accepted"].get("slot_a") == "updated_by_accepted_writer"
    finalized_updated = (
        state["finalized"].get("slot_f") == "updated_by_finalized_writer"
    )

    assert accepted_updated and finalized_updated, (
        f"Cross-field clobber: concurrent accepted + finalized writes must both survive. "
        f"accepted={state['accepted']}, finalized={state['finalized']}"
    )


# ---------------------------------------------------------------------------
# Test 3: Sequential updates — sanity check (should always pass)
# ---------------------------------------------------------------------------


def test_sequential_updates_preserve_all_state(engine: Engine):
    """
    Baseline: sequential updates don't lose data.
    This should always pass regardless of the bug.
    """
    _setup_contract(engine)

    Session_ = sessionmaker(bind=engine)

    with Session_() as s:
        cp = ContractProcessor(s)
        cp.update_contract_state(
            CONTRACT_ADDRESS,
            accepted_state={"slot_a": "original_a", "submission_A": "scored"},
        )

    with Session_() as s:
        cp = ContractProcessor(s)
        cp.update_contract_state(
            CONTRACT_ADDRESS,
            accepted_state={
                "slot_a": "original_a",
                "submission_A": "scored",
                "submission_B": "scored",
            },
        )

    state = _read_state(engine)
    assert state["accepted"]["submission_A"] == "scored"
    assert state["accepted"]["submission_B"] == "scored"
    assert state["finalized"] == {"slot_f": "original_f"}

tests/load/contracts/counter.py

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
# v0.1.0
# { "Depends": "py-genlayer:1jb45aa8ynh2a9c9xn3b7qqh8sm5q93hwfp7jqmwsfhh8jpz09h6" }
from genlayer import *


class Counter(gl.Contract):
    count: int

    def __init__(self):
        self.count = 0

    @gl.public.write
    def increment(self) -> None:
        self.count += 1

    @gl.public.view
    def get_count(self) -> int:
        return self.count
tests/load/run_state_integrity_test.sh

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
#!/bin/bash
#
# State integrity test — detects the lost update bug.
#
# Requires CONSENSUS_WORKERS >= 2 to trigger the race condition.
# With 1 worker, all transactions are serialized and the bug cannot manifest.
#
# Usage:
#   # Start with multiple workers:
#   CONSENSUS_WORKERS=3 docker compose up -d
#
#   # Run the test:
#   ./run_state_integrity_test.sh [API_URL] [NUM_TXS]
#
#   # Example with more transactions for higher detection probability:
#   ./run_state_integrity_test.sh http://localhost:4000/api 50

set -e

API_URL="${1:-http://localhost:4000/api}"
NUM_TXS="${2:-20}"

# Ensure /api suffix
API_URL="${API_URL%/}"
if [[ ! "$API_URL" == */api ]]; then
    API_URL="${API_URL}/api"
fi

echo "=== State Integrity Test ==="
echo "API: $API_URL"
echo "Transactions: $NUM_TXS"
echo ""

# Check if services are running
if ! curl -s -X POST -H "Content-Type: application/json" \
    -d '{"jsonrpc":"2.0","method":"ping","params":[],"id":1}' \
    "$API_URL" 2>/dev/null | grep -q "OK"; then
    echo "ERROR: RPC server not running at $API_URL"
    echo "Start with: CONSENSUS_WORKERS=3 docker compose up -d"
    exit 1
fi

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
python3 "$SCRIPT_DIR/test_state_integrity.py" "$API_URL" --txs "$NUM_TXS"
