From c9c9ba9f8e46c2d6aef424f590c4ed0ad4c8562e Mon Sep 17 00:00:00 2001 From: nicolas <48695862+merklefruit@users.noreply.github.com> Date: Wed, 27 Aug 2025 16:18:28 +0200 Subject: [PATCH 1/6] chore: build with race debug --- Dockerfile | 12 +++++++----- build/ci.go | 21 +++++++++------------ 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/Dockerfile b/Dockerfile index 9b70e9e8a0..4ed668256a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,9 +4,10 @@ ARG VERSION="" ARG BUILDNUM="" # Build Geth in a stock Go builder container -FROM golang:1.24-alpine AS builder +FROM golang:1.24-bookworm AS builder -RUN apk add --no-cache gcc musl-dev linux-headers git +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential git ca-certificates && rm -rf /var/lib/apt/lists/* # Get dependencies - will also be cached if we won't change go.mod/go.sum COPY go.mod /go-ethereum/ @@ -14,12 +15,13 @@ COPY go.sum /go-ethereum/ RUN cd /go-ethereum && go mod download ADD . /go-ethereum -RUN cd /go-ethereum && go run build/ci.go install -static ./cmd/geth +RUN cd /go-ethereum && go run build/ci.go install ./cmd/geth # Pull Geth into a second stage deploy alpine container -FROM alpine:latest +FROM debian:bookworm-slim -RUN apk add --no-cache ca-certificates +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential git ca-certificates && rm -rf /var/lib/apt/lists/* COPY --from=builder /go-ethereum/build/bin/geth /usr/local/bin/ EXPOSE 8545 8546 30303 30303/udp diff --git a/build/ci.go b/build/ci.go index 7fa226ef1f..d70ad5e0e5 100644 --- a/build/ci.go +++ b/build/ci.go @@ -182,10 +182,10 @@ func main() { func doInstall(cmdline []string) { var ( - dlgo = flag.Bool("dlgo", false, "Download Go and build with it") - arch = flag.String("arch", "", "Architecture to cross build for") - cc = flag.String("cc", "", "C compiler to cross build with") - staticlink = flag.Bool("static", false, "Create statically-linked executable") + dlgo = flag.Bool("dlgo", false, "Download Go and build with it") + arch = flag.String("arch", "", "Architecture to cross build for") + cc = flag.String("cc", "", "C compiler to cross build with") + // staticlink = flag.Bool("static", false, "Create statically-linked executable") ) flag.CommandLine.Parse(cmdline) env := build.Env() @@ -205,10 +205,11 @@ func doInstall(cmdline []string) { } // Configure the build. - gobuild := tc.Go("build", buildFlags(env, *staticlink, buildTags)...) + gobuild := tc.Go("build", buildFlags(env, false, buildTags)...) // We use -trimpath to avoid leaking local paths into the built executables. - gobuild.Args = append(gobuild.Args, "-trimpath") + // NOTE(thedevbirb): add also -race flag + gobuild.Args = append(gobuild.Args, "-trimpath", "-race") // Show packages during build. gobuild.Args = append(gobuild.Args, "-v") @@ -376,9 +377,7 @@ func doCheckTidy() { // doCheckGenerate ensures that re-generating generated files does not cause // any mutations in the source file tree. func doCheckGenerate() { - var ( - cachedir = flag.String("cachedir", "./build/cache", "directory for caching binaries.") - ) + cachedir := flag.String("cachedir", "./build/cache", "directory for caching binaries.") // Compute the origin hashes of all the files var hashes map[string][32]byte @@ -444,9 +443,7 @@ func doCheckBadDeps() { // doLint runs golangci-lint on requested packages. func doLint(cmdline []string) { - var ( - cachedir = flag.String("cachedir", "./build/cache", "directory for caching golangci-lint binary.") - ) + cachedir := flag.String("cachedir", "./build/cache", "directory for caching golangci-lint binary.") flag.CommandLine.Parse(cmdline) packages := []string{"./..."} if len(flag.CommandLine.Args()) > 0 { From ff6f3c2b7f705896cb4a5e5612b1fb2fbe540138 Mon Sep 17 00:00:00 2001 From: nicolas <48695862+merklefruit@users.noreply.github.com> Date: Wed, 27 Aug 2025 17:28:10 +0200 Subject: [PATCH 2/6] fix: concurrent state DB to prevent data races with RPC --- core/blockchain.go | 13 +- core/blockchain_insert.go | 31 +++- core/blockchain_reader.go | 24 ++- core/state/concurrent_statedb.go | 249 ++++++++++++++++++++++++++ core/state/concurrent_statedb_test.go | 204 +++++++++++++++++++++ eth/api_backend.go | 4 +- eth/catalyst/api.go | 18 +- 7 files changed, 513 insertions(+), 30 deletions(-) create mode 100644 core/state/concurrent_statedb.go create mode 100644 core/state/concurrent_statedb_test.go diff --git a/core/blockchain.go b/core/blockchain.go index c53c65e68c..241e95dccf 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -248,8 +248,7 @@ type BlockChain struct { currentSnapBlock atomic.Pointer[types.Header] // Current head of snap-sync currentFinalBlock atomic.Pointer[types.Header] // Latest (consensus) finalized block currentSafeBlock atomic.Pointer[types.Header] // Latest (consensus) safe block - currentUnsealedBlock *types.UnsealedBlock // Current unsealed block - unsealedBlockDbState *state.StateDB // StateDB for the current unsealed block + unsealedBlockDbState *state.ConcurrentStateDB // Concurrent StateDB for the current unsealed block (includes metadata) bodyCache *lru.Cache[common.Hash, *types.Body] bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue] @@ -343,8 +342,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.currentFinalBlock.Store(nil) bc.currentSafeBlock.Store(nil) - bc.currentUnsealedBlock = nil - // Update chain info data metrics chainInfoGauge.Update(metrics.GaugeInfoValue{"chain_id": bc.chainConfig.ChainID.String()}) @@ -653,14 +650,16 @@ func (bc *BlockChain) SetCurrentUnsealedBlock(block *types.UnsealedBlock) error if err != nil { return err } - bc.unsealedBlockDbState = newState - bc.currentUnsealedBlock = block + + // Create a concurrent StateDB wrapper for thread-safe access + // This ensures state and metadata stay synchronized + concurrentState := state.NewConcurrentStateDB(newState, block) + bc.unsealedBlockDbState = concurrentState return nil } func (bc *BlockChain) ResetCurrentUnsealedBlock() { - bc.currentUnsealedBlock = nil bc.unsealedBlockDbState = nil } diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index 13bd56e152..7e09088b20 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -178,7 +178,7 @@ func (it *insertIterator) remaining() int { } func (bc *BlockChain) InsertNewFrag(frag types.Frag) error { - currentUnsealedBlock := bc.CurrentUnsealedBlock() + currentUnsealedBlock := bc.CurrentUnsealedBlockMetadata() if bc.unsealedBlockDbState == nil { return fmt.Errorf("unsealed block state db not set") @@ -211,21 +211,36 @@ func (bc *BlockChain) InsertNewFrag(frag types.Frag) error { Withdrawals: []*types.Withdrawal{}, }) - res, err := bc.Processor().ProcessWithCumulativeGas(block, bc.unsealedBlockDbState, bc.vmConfig, &bc.currentUnsealedBlock.CumulativeGasUsed) + // Begin write transaction to get exclusive access to the state + workingState, err := bc.unsealedBlockDbState.BeginWriteTransaction() + if err != nil { + return fmt.Errorf("failed to begin write transaction: %w", err) + } + + // Process the frag with the working state + res, err := bc.Processor().ProcessWithCumulativeGas(block, workingState.StateDB, bc.vmConfig, &workingState.Metadata.CumulativeGasUsed) if err != nil { + // Rollback on error + bc.unsealedBlockDbState.RollbackWriteTransaction() return err } + // Update the metadata atomically with the state changes for _, receipt := range res.Receipts { - currentUnsealedBlock.CumulativeBlobGasUsed += receipt.BlobGasUsed + workingState.Metadata.CumulativeBlobGasUsed += receipt.BlobGasUsed } - currentUnsealedBlock.Frags = append(currentUnsealedBlock.Frags, frag) - currentUnsealedBlock.LastSequenceNumber = &frag.Seq - currentUnsealedBlock.Receipts = append(currentUnsealedBlock.Receipts, res.Receipts...) - currentUnsealedBlock.Logs = append(currentUnsealedBlock.Logs, res.Logs...) - currentUnsealedBlock.CumulativeGasUsed = res.GasUsed + workingState.Metadata.Frags = append(workingState.Metadata.Frags, frag) + workingState.Metadata.LastSequenceNumber = &frag.Seq + workingState.Metadata.Receipts = append(workingState.Metadata.Receipts, res.Receipts...) + workingState.Metadata.Logs = append(workingState.Metadata.Logs, res.Logs...) + workingState.Metadata.CumulativeGasUsed = res.GasUsed + + // Commit the write transaction (both state and metadata) + if err := bc.unsealedBlockDbState.CommitWriteTransaction(); err != nil { + return fmt.Errorf("failed to commit write transaction: %w", err) + } return nil } diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 0e2fc8a2d8..977409ec5b 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -62,12 +62,28 @@ func (bc *BlockChain) CurrentSafeBlock() *types.Header { return bc.currentSafeBlock.Load() } -func (bc *BlockChain) CurrentUnsealedBlock() *types.UnsealedBlock { - return bc.currentUnsealedBlock +func (bc *BlockChain) CurrentUnsealedBlockState() *state.StateDB { + if bc.unsealedBlockDbState == nil { + return nil + } + snapshot := bc.unsealedBlockDbState.GetReadSnapshot() + if snapshot == nil { + return nil + } + return snapshot.StateDB } -func (bc *BlockChain) CurrentUnsealedBlockState() *state.StateDB { - return bc.unsealedBlockDbState +// CurrentUnsealedBlockMetadata returns the current unsealed block metadata +// This is safe to call concurrently and will always be in sync with the state +func (bc *BlockChain) CurrentUnsealedBlockMetadata() *types.UnsealedBlock { + if bc.unsealedBlockDbState == nil { + return nil + } + snapshot := bc.unsealedBlockDbState.GetReadSnapshot() + if snapshot == nil { + return nil + } + return snapshot.Metadata } // HasHeader checks if a block header is present in the database or not, caching diff --git a/core/state/concurrent_statedb.go b/core/state/concurrent_statedb.go new file mode 100644 index 0000000000..55a4b751b9 --- /dev/null +++ b/core/state/concurrent_statedb.go @@ -0,0 +1,249 @@ +package state + +import ( + "fmt" + "sync" + "sync/atomic" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +// UnsealedBlockState represents the complete state of an unsealed block +// including both the StateDB and metadata, ensuring they stay synchronized +type UnsealedBlockState struct { + // The StateDB containing account balances, storage, etc. + StateDB *StateDB + + // Metadata about the unsealed block + Metadata *types.UnsealedBlock +} + +// ConcurrentStateDB provides thread-safe access to a StateDB with copy-on-write semantics +// and integrated unsealed block metadata management +type ConcurrentStateDB struct { + mu sync.RWMutex + + // The base state that all snapshots are derived from + baseState *UnsealedBlockState + + // Current working state for writes + workingState *UnsealedBlockState + + // Snapshot counter for tracking versions + snapshotCounter uint64 + + // Channel for notifying when new snapshots are available + snapshotChan chan *UnsealedBlockState + + // Flag indicating if the state is being modified + isModifying atomic.Bool +} + +// NewConcurrentStateDB creates a new concurrent StateDB wrapper +func NewConcurrentStateDB(baseState *StateDB, metadata *types.UnsealedBlock) *ConcurrentStateDB { + return &ConcurrentStateDB{ + baseState: &UnsealedBlockState{ + StateDB: baseState, + Metadata: metadata, + }, + workingState: &UnsealedBlockState{ + StateDB: baseState.Copy(), + Metadata: copyUnsealedBlock(metadata), + }, + snapshotChan: make(chan *UnsealedBlockState, 100), // Buffer for multiple readers + } +} + +// copyUnsealedBlock creates a deep copy of an UnsealedBlock +func copyUnsealedBlock(ub *types.UnsealedBlock) *types.UnsealedBlock { + if ub == nil { + return nil + } + + // Copy the Env + var envCopy *types.Env + if ub.Env != nil { + envCopy = &types.Env{ + Number: ub.Env.Number, + ParentHash: ub.Env.ParentHash, + Beneficiary: ub.Env.Beneficiary, + Timestamp: ub.Env.Timestamp, + GasLimit: ub.Env.GasLimit, + Basefee: ub.Env.Basefee, + Difficulty: ub.Env.Difficulty, + Prevrandao: ub.Env.Prevrandao, + ExtraData: append([]byte{}, ub.Env.ExtraData...), + ParentBeaconBlockRoot: ub.Env.ParentBeaconBlockRoot, + } + } + + // Copy Frags + fragsCopy := make([]types.Frag, len(ub.Frags)) + for i, frag := range ub.Frags { + fragsCopy[i] = types.Frag{ + BlockNumber: frag.BlockNumber, + Seq: frag.Seq, + IsLast: frag.IsLast, + Txs: append([]*types.Transaction{}, frag.Txs...), + } + } + + // Copy LastSequenceNumber + var lastSeqCopy *uint64 + if ub.LastSequenceNumber != nil { + seq := *ub.LastSequenceNumber + lastSeqCopy = &seq + } + + // Copy Receipts + receiptsCopy := make(types.Receipts, len(ub.Receipts)) + for i, receipt := range ub.Receipts { + receiptCopy := *receipt + receiptsCopy[i] = &receiptCopy + } + + // Copy Logs + logsCopy := make([]*types.Log, len(ub.Logs)) + for i, log := range ub.Logs { + logCopy := *log + logsCopy[i] = &logCopy + } + + return &types.UnsealedBlock{ + Env: envCopy, + Frags: fragsCopy, + LastSequenceNumber: lastSeqCopy, + Hash: ub.Hash, + Receipts: receiptsCopy, + Logs: logsCopy, + CumulativeGasUsed: ub.CumulativeGasUsed, + CumulativeBlobGasUsed: ub.CumulativeBlobGasUsed, + } +} + +// GetReadSnapshot returns a read-only snapshot of the current state +// This is safe to call concurrently with writes +func (cs *ConcurrentStateDB) GetReadSnapshot() *UnsealedBlockState { + cs.mu.RLock() + defer cs.mu.RUnlock() + + // Create a copy of the current working state for reading + snapshot := &UnsealedBlockState{ + StateDB: cs.workingState.StateDB.Copy(), + Metadata: copyUnsealedBlock(cs.workingState.Metadata), + } + + // Increment snapshot counter + atomic.AddUint64(&cs.snapshotCounter, 1) + + return snapshot +} + +// BeginWriteTransaction starts a write transaction +// Only one write transaction can be active at a time +func (cs *ConcurrentStateDB) BeginWriteTransaction() (*UnsealedBlockState, error) { + if !cs.isModifying.CompareAndSwap(false, true) { + return nil, ErrConcurrentModification + } + + cs.mu.Lock() + + // Create a new working state for this transaction + cs.workingState = &UnsealedBlockState{ + StateDB: cs.workingState.StateDB.Copy(), + Metadata: copyUnsealedBlock(cs.workingState.Metadata), + } + + return cs.workingState, nil +} + +// CommitWriteTransaction commits the write transaction and makes it available to readers +func (cs *ConcurrentStateDB) CommitWriteTransaction() error { + defer func() { + cs.mu.Unlock() + cs.isModifying.Store(false) + }() + + // Notify readers that a new snapshot is available + select { + case cs.snapshotChan <- &UnsealedBlockState{ + StateDB: cs.workingState.StateDB.Copy(), + Metadata: copyUnsealedBlock(cs.workingState.Metadata), + }: + default: + // Channel is full, readers will get the latest state on next read + log.Debug("Snapshot notification channel full, readers will get latest state on next read") + } + + return nil +} + +// RollbackWriteTransaction rolls back the write transaction +func (cs *ConcurrentStateDB) RollbackWriteTransaction() { + defer func() { + cs.mu.Unlock() + cs.isModifying.Store(false) + }() + + // Restore the working state to the previous version + cs.workingState = &UnsealedBlockState{ + StateDB: cs.baseState.StateDB.Copy(), + Metadata: copyUnsealedBlock(cs.baseState.Metadata), + } +} + +// GetLatestSnapshot returns the most recent snapshot available +func (cs *ConcurrentStateDB) GetLatestSnapshot() *UnsealedBlockState { + cs.mu.RLock() + defer cs.mu.RUnlock() + + // Try to get a snapshot from the channel first + select { + case snapshot := <-cs.snapshotChan: + return snapshot + default: + // No snapshot in channel, return current working state + return &UnsealedBlockState{ + StateDB: cs.workingState.StateDB.Copy(), + Metadata: copyUnsealedBlock(cs.workingState.Metadata), + } + } +} + +// WaitForSnapshot waits for the next snapshot to become available +func (cs *ConcurrentStateDB) WaitForSnapshot() *UnsealedBlockState { + snapshot := <-cs.snapshotChan + return snapshot +} + +// GetBaseState returns the base state (for internal use) +func (cs *ConcurrentStateDB) GetBaseState() *UnsealedBlockState { + return cs.baseState +} + +// GetWorkingState returns the current working state (for internal use) +func (cs *ConcurrentStateDB) GetWorkingState() *UnsealedBlockState { + return cs.workingState +} + +// IsModifying returns true if a write transaction is currently active +func (cs *ConcurrentStateDB) IsModifying() bool { + return cs.isModifying.Load() +} + +// GetSnapshotCounter returns the current snapshot counter +func (cs *ConcurrentStateDB) GetSnapshotCounter() uint64 { + return atomic.LoadUint64(&cs.snapshotCounter) +} + +// Close closes the concurrent StateDB and cleans up resources +func (cs *ConcurrentStateDB) Close() error { + close(cs.snapshotChan) + return nil +} + +// Error definitions +var ( + ErrConcurrentModification = fmt.Errorf("concurrent modification not allowed") +) diff --git a/core/state/concurrent_statedb_test.go b/core/state/concurrent_statedb_test.go new file mode 100644 index 0000000000..35899dd2cc --- /dev/null +++ b/core/state/concurrent_statedb_test.go @@ -0,0 +1,204 @@ +package state + +import ( + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/tracing" + "github.com/holiman/uint256" + "github.com/stretchr/testify/assert" +) + +func TestConcurrentStateDB_BasicOperations(t *testing.T) { + // Create a base state + baseState, err := New(common.Hash{}, NewDatabaseForTesting()) + assert.NoError(t, err) + + // Set some initial data + addr := common.HexToAddress("0x123") + baseState.SetBalance(addr, uint256.NewInt(100), tracing.BalanceChangeUnspecified) + + // Create concurrent StateDB + cs := NewConcurrentStateDB(baseState) + defer cs.Close() + + // Test read snapshot + snapshot := cs.GetReadSnapshot() + assert.NotNil(t, snapshot) + assert.Equal(t, uint256.NewInt(100), snapshot.GetBalance(addr)) + + // Test write transaction + workingState, err := cs.BeginWriteTransaction() + assert.NoError(t, err) + assert.True(t, cs.IsModifying()) + + // Modify the working state + workingState.SetBalance(addr, uint256.NewInt(200), tracing.BalanceChangeUnspecified) + + // Commit the transaction + err = cs.CommitWriteTransaction() + assert.NoError(t, err) + assert.False(t, cs.IsModifying()) + + // Verify the change is visible in new snapshots + newSnapshot := cs.GetReadSnapshot() + assert.Equal(t, uint256.NewInt(200), newSnapshot.GetBalance(addr)) + + // Original snapshot should still show old value + assert.Equal(t, uint256.NewInt(100), snapshot.GetBalance(addr)) +} + +func TestConcurrentStateDB_ConcurrentReads(t *testing.T) { + // Create a base state + baseState, err := New(common.Hash{}, NewDatabaseForTesting()) + assert.NoError(t, err) + + // Set some initial data + addr := common.HexToAddress("0x123") + baseState.SetBalance(addr, uint256.NewInt(100), tracing.BalanceChangeUnspecified) + + // Create concurrent StateDB + cs := NewConcurrentStateDB(baseState) + defer cs.Close() + + // Test concurrent reads + var wg sync.WaitGroup + readCount := 100 + + for i := 0; i < readCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + snapshot := cs.GetReadSnapshot() + assert.Equal(t, uint256.NewInt(100), snapshot.GetBalance(addr)) + }() + } + + wg.Wait() +} + +func TestConcurrentStateDB_WriteTransactionExclusivity(t *testing.T) { + // Create a base state + baseState, err := New(common.Hash{}, NewDatabaseForTesting()) + assert.NoError(t, err) + + // Create concurrent StateDB + cs := NewConcurrentStateDB(baseState) + defer cs.Close() + + // Start first write transaction + _, err = cs.BeginWriteTransaction() + assert.NoError(t, err) + assert.True(t, cs.IsModifying()) + + // Try to start second write transaction (should fail) + workingState2, err := cs.BeginWriteTransaction() + assert.Error(t, err) + assert.Equal(t, ErrConcurrentModification, err) + assert.Nil(t, workingState2) + + // Commit first transaction + err = cs.CommitWriteTransaction() + assert.NoError(t, err) + assert.False(t, cs.IsModifying()) + + // Now should be able to start second transaction + _, err = cs.BeginWriteTransaction() + assert.NoError(t, err) + assert.True(t, cs.IsModifying()) + + // Clean up + cs.RollbackWriteTransaction() +} + +func TestConcurrentStateDB_Rollback(t *testing.T) { + // Create a base state + baseState, err := New(common.Hash{}, NewDatabaseForTesting()) + assert.NoError(t, err) + + // Set initial data + addr := common.HexToAddress("0x123") + baseState.SetBalance(addr, uint256.NewInt(100), tracing.BalanceChangeUnspecified) + + // Create concurrent StateDB + cs := NewConcurrentStateDB(baseState) + defer cs.Close() + + // Start write transaction + workingState, err := cs.BeginWriteTransaction() + assert.NoError(t, err) + + // Modify state + workingState.SetBalance(addr, uint256.NewInt(200), tracing.BalanceChangeUnspecified) + + // Rollback + cs.RollbackWriteTransaction() + assert.False(t, cs.IsModifying()) + + // Verify rollback worked + snapshot := cs.GetReadSnapshot() + assert.Equal(t, uint256.NewInt(100), snapshot.GetBalance(addr)) +} + +func TestConcurrentStateDB_SnapshotNotifications(t *testing.T) { + // Create a base state + baseState, err := New(common.Hash{}, NewDatabaseForTesting()) + assert.NoError(t, err) + + // Create concurrent StateDB + cs := NewConcurrentStateDB(baseState) + defer cs.Close() + + // Start write transaction + workingState, err := cs.BeginWriteTransaction() + assert.NoError(t, err) + + // Modify state + addr := common.HexToAddress("0x123") + workingState.SetBalance(addr, uint256.NewInt(200), tracing.BalanceChangeUnspecified) + + // Start goroutine to wait for snapshot + var receivedSnapshot *StateDB + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + receivedSnapshot = cs.WaitForSnapshot() + }() + + // Give some time for goroutine to start + time.Sleep(10 * time.Millisecond) + + // Commit transaction + err = cs.CommitWriteTransaction() + assert.NoError(t, err) + + // Wait for snapshot to be received + wg.Wait() + + // Verify snapshot was received + assert.NotNil(t, receivedSnapshot) + assert.Equal(t, uint256.NewInt(200), receivedSnapshot.GetBalance(addr)) +} + +func TestConcurrentStateDB_SnapshotCounter(t *testing.T) { + // Create a base state + baseState, err := New(common.Hash{}, NewDatabaseForTesting()) + assert.NoError(t, err) + + // Create concurrent StateDB + cs := NewConcurrentStateDB(baseState) + defer cs.Close() + + initialCounter := cs.GetSnapshotCounter() + + // Get a few snapshots + cs.GetReadSnapshot() + cs.GetReadSnapshot() + cs.GetReadSnapshot() + + finalCounter := cs.GetSnapshotCounter() + assert.Equal(t, initialCounter+3, finalCounter) +} diff --git a/eth/api_backend.go b/eth/api_backend.go index 023325625d..9740bbfe23 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -205,7 +205,7 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B if number == rpc.LatestBlockNumber { stateDb := b.eth.BlockChain().CurrentUnsealedBlockState() if stateDb != nil { - return stateDb, b.eth.BlockChain().CurrentUnsealedBlock().TempHeader(), nil + return stateDb, b.eth.BlockChain().CurrentUnsealedBlockMetadata().TempHeader(), nil } } @@ -473,5 +473,5 @@ func (b *EthAPIBackend) Genesis() *types.Block { } func (b *EthAPIBackend) GetUnsealedBlock() *types.UnsealedBlock { - return b.eth.blockchain.CurrentUnsealedBlock() + return b.eth.blockchain.CurrentUnsealedBlockMetadata() } diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index d930244e9f..dbb5cee3c6 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -387,7 +387,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl if rawdb.ReadCanonicalHash(api.eth.ChainDb(), block.NumberU64()) != update.HeadBlockHash { // Block is not canonical, set head. if latestValid, err := api.eth.BlockChain().SetCanonical(block); err != nil { - bc := api.eth.BlockChain().CurrentUnsealedBlock() + bc := api.eth.BlockChain().CurrentUnsealedBlockMetadata() if bc != nil { if bc.Env.Number == block.NumberU64() { log.Info("Ignoring current unsealed block", "number", block.NumberU64(), "hash", update.HeadBlockHash, "age", common.PrettyAge(time.Unix(int64(block.Time()), 0)), "have", api.eth.BlockChain().CurrentBlock().Number) @@ -1371,13 +1371,13 @@ func (api *ConsensusAPI) NewFragV0(frag engine.SignedNewFrag) (string, error) { } func (api *ConsensusAPI) newFragV0(f engine.SignedNewFrag) (string, error) { - ub := api.eth.BlockChain().CurrentUnsealedBlock() + ub := api.eth.BlockChain().CurrentUnsealedBlockMetadata() if err := api.ValidateNewFragV0(f, ub); err != nil { log.Error("frag is invalid", "error", err) return engine.INVALID, err } - if f.Frag.BlockNumber < api.eth.BlockChain().CurrentUnsealedBlock().Env.Number { + if f.Frag.BlockNumber < api.eth.BlockChain().CurrentUnsealedBlockMetadata().Env.Number { return engine.VALID, nil } @@ -1440,7 +1440,7 @@ func (api *ConsensusAPI) ValidateNewFragV0(frag engine.SignedNewFrag, currentUns func (api *ConsensusAPI) SealFragV0(seal engine.SignedSeal) (string, error) { log.Info("seal received", "forBlock", seal.Seal.BlockNumber, "current", api.eth.BlockChain().CurrentBlock().Number, "seal", seal.Seal) - if api.eth.BlockChain().CurrentUnsealedBlock() != nil && api.eth.BlockChain().CurrentUnsealedBlock().Env.Number > seal.Seal.BlockNumber { + if api.eth.BlockChain().CurrentUnsealedBlockMetadata() != nil && api.eth.BlockChain().CurrentUnsealedBlockMetadata().Env.Number > seal.Seal.BlockNumber { log.Info("seal was outdated, dropping") return engine.VALID, nil } @@ -1479,7 +1479,7 @@ func (api *ConsensusAPI) sealFragV0(seal engine.SignedSeal) (string, error) { } func (api *ConsensusAPI) ValidateSealFragV0(preSealedBlock *types.Block, seal engine.SignedSeal) error { - if !types.IsOpened(api.eth.BlockChain().CurrentUnsealedBlock()) { + if !types.IsOpened(api.eth.BlockChain().CurrentUnsealedBlockMetadata()) { return errors.New("no unsealed block in progress") } @@ -1511,8 +1511,8 @@ func (api *ConsensusAPI) ValidateSealFragV0(preSealedBlock *types.Block, seal en return fmt.Errorf("gas limit mismatch, expected %v, got %v", preSealedBlock.GasLimit(), seal.Seal.GasLimit) } - if len(api.eth.BlockChain().CurrentUnsealedBlock().Frags) != int(seal.Seal.TotalFrags) { - return fmt.Errorf("total frags mismatch, expected %v, got %v", len(api.eth.BlockChain().CurrentUnsealedBlock().Frags), seal.Seal.TotalFrags) + if len(api.eth.BlockChain().CurrentUnsealedBlockMetadata().Frags) != int(seal.Seal.TotalFrags) { + return fmt.Errorf("total frags mismatch, expected %v, got %v", len(api.eth.BlockChain().CurrentUnsealedBlockMetadata().Frags), seal.Seal.TotalFrags) } return nil @@ -1537,7 +1537,7 @@ func (api *ConsensusAPI) EnvV0(env engine.SignedEnv) (string, error) { func (api *ConsensusAPI) envV0(env engine.SignedEnv) (string, error) { if err := api.ValidateEnvV0(env); err != nil { - if api.eth.BlockChain().CurrentUnsealedBlock() != nil && api.eth.BlockChain().CurrentUnsealedBlock().Env.Number < env.Env.Number { + if api.eth.BlockChain().CurrentUnsealedBlockMetadata() != nil && api.eth.BlockChain().CurrentUnsealedBlockMetadata().Env.Number < env.Env.Number { api.eth.BlockChain().ResetCurrentUnsealedBlock() } else { return engine.INVALID, err @@ -1570,7 +1570,7 @@ func (api *ConsensusAPI) ValidateEnvV0(env engine.SignedEnv) error { parentHeader := parent.Header() // Check that there's no unsealed block in progress - if api.eth.BlockChain().CurrentUnsealedBlock() != nil { + if api.eth.BlockChain().CurrentUnsealedBlockMetadata() != nil { return errors.New("cannot open a new unsealed block while there's one already in progress") } From 0dd7400d86660f6cab0cabda897cf6df5b673fe3 Mon Sep 17 00:00:00 2001 From: nicolas <48695862+merklefruit@users.noreply.github.com> Date: Thu, 28 Aug 2025 10:16:53 +0200 Subject: [PATCH 3/6] Revert "fix: concurrent state DB to prevent data races with RPC" This reverts commit ff6f3c2b7f705896cb4a5e5612b1fb2fbe540138. --- core/blockchain.go | 13 +- core/blockchain_insert.go | 31 +--- core/blockchain_reader.go | 24 +-- core/state/concurrent_statedb.go | 249 -------------------------- core/state/concurrent_statedb_test.go | 204 --------------------- eth/api_backend.go | 4 +- eth/catalyst/api.go | 18 +- 7 files changed, 30 insertions(+), 513 deletions(-) delete mode 100644 core/state/concurrent_statedb.go delete mode 100644 core/state/concurrent_statedb_test.go diff --git a/core/blockchain.go b/core/blockchain.go index 241e95dccf..c53c65e68c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -248,7 +248,8 @@ type BlockChain struct { currentSnapBlock atomic.Pointer[types.Header] // Current head of snap-sync currentFinalBlock atomic.Pointer[types.Header] // Latest (consensus) finalized block currentSafeBlock atomic.Pointer[types.Header] // Latest (consensus) safe block - unsealedBlockDbState *state.ConcurrentStateDB // Concurrent StateDB for the current unsealed block (includes metadata) + currentUnsealedBlock *types.UnsealedBlock // Current unsealed block + unsealedBlockDbState *state.StateDB // StateDB for the current unsealed block bodyCache *lru.Cache[common.Hash, *types.Body] bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue] @@ -342,6 +343,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.currentFinalBlock.Store(nil) bc.currentSafeBlock.Store(nil) + bc.currentUnsealedBlock = nil + // Update chain info data metrics chainInfoGauge.Update(metrics.GaugeInfoValue{"chain_id": bc.chainConfig.ChainID.String()}) @@ -650,16 +653,14 @@ func (bc *BlockChain) SetCurrentUnsealedBlock(block *types.UnsealedBlock) error if err != nil { return err } - - // Create a concurrent StateDB wrapper for thread-safe access - // This ensures state and metadata stay synchronized - concurrentState := state.NewConcurrentStateDB(newState, block) - bc.unsealedBlockDbState = concurrentState + bc.unsealedBlockDbState = newState + bc.currentUnsealedBlock = block return nil } func (bc *BlockChain) ResetCurrentUnsealedBlock() { + bc.currentUnsealedBlock = nil bc.unsealedBlockDbState = nil } diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index 7e09088b20..13bd56e152 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -178,7 +178,7 @@ func (it *insertIterator) remaining() int { } func (bc *BlockChain) InsertNewFrag(frag types.Frag) error { - currentUnsealedBlock := bc.CurrentUnsealedBlockMetadata() + currentUnsealedBlock := bc.CurrentUnsealedBlock() if bc.unsealedBlockDbState == nil { return fmt.Errorf("unsealed block state db not set") @@ -211,36 +211,21 @@ func (bc *BlockChain) InsertNewFrag(frag types.Frag) error { Withdrawals: []*types.Withdrawal{}, }) - // Begin write transaction to get exclusive access to the state - workingState, err := bc.unsealedBlockDbState.BeginWriteTransaction() - if err != nil { - return fmt.Errorf("failed to begin write transaction: %w", err) - } - - // Process the frag with the working state - res, err := bc.Processor().ProcessWithCumulativeGas(block, workingState.StateDB, bc.vmConfig, &workingState.Metadata.CumulativeGasUsed) + res, err := bc.Processor().ProcessWithCumulativeGas(block, bc.unsealedBlockDbState, bc.vmConfig, &bc.currentUnsealedBlock.CumulativeGasUsed) if err != nil { - // Rollback on error - bc.unsealedBlockDbState.RollbackWriteTransaction() return err } - // Update the metadata atomically with the state changes for _, receipt := range res.Receipts { - workingState.Metadata.CumulativeBlobGasUsed += receipt.BlobGasUsed + currentUnsealedBlock.CumulativeBlobGasUsed += receipt.BlobGasUsed } - workingState.Metadata.Frags = append(workingState.Metadata.Frags, frag) - workingState.Metadata.LastSequenceNumber = &frag.Seq - workingState.Metadata.Receipts = append(workingState.Metadata.Receipts, res.Receipts...) - workingState.Metadata.Logs = append(workingState.Metadata.Logs, res.Logs...) - workingState.Metadata.CumulativeGasUsed = res.GasUsed - - // Commit the write transaction (both state and metadata) - if err := bc.unsealedBlockDbState.CommitWriteTransaction(); err != nil { - return fmt.Errorf("failed to commit write transaction: %w", err) - } + currentUnsealedBlock.Frags = append(currentUnsealedBlock.Frags, frag) + currentUnsealedBlock.LastSequenceNumber = &frag.Seq + currentUnsealedBlock.Receipts = append(currentUnsealedBlock.Receipts, res.Receipts...) + currentUnsealedBlock.Logs = append(currentUnsealedBlock.Logs, res.Logs...) + currentUnsealedBlock.CumulativeGasUsed = res.GasUsed return nil } diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 977409ec5b..0e2fc8a2d8 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -62,28 +62,12 @@ func (bc *BlockChain) CurrentSafeBlock() *types.Header { return bc.currentSafeBlock.Load() } -func (bc *BlockChain) CurrentUnsealedBlockState() *state.StateDB { - if bc.unsealedBlockDbState == nil { - return nil - } - snapshot := bc.unsealedBlockDbState.GetReadSnapshot() - if snapshot == nil { - return nil - } - return snapshot.StateDB +func (bc *BlockChain) CurrentUnsealedBlock() *types.UnsealedBlock { + return bc.currentUnsealedBlock } -// CurrentUnsealedBlockMetadata returns the current unsealed block metadata -// This is safe to call concurrently and will always be in sync with the state -func (bc *BlockChain) CurrentUnsealedBlockMetadata() *types.UnsealedBlock { - if bc.unsealedBlockDbState == nil { - return nil - } - snapshot := bc.unsealedBlockDbState.GetReadSnapshot() - if snapshot == nil { - return nil - } - return snapshot.Metadata +func (bc *BlockChain) CurrentUnsealedBlockState() *state.StateDB { + return bc.unsealedBlockDbState } // HasHeader checks if a block header is present in the database or not, caching diff --git a/core/state/concurrent_statedb.go b/core/state/concurrent_statedb.go deleted file mode 100644 index 55a4b751b9..0000000000 --- a/core/state/concurrent_statedb.go +++ /dev/null @@ -1,249 +0,0 @@ -package state - -import ( - "fmt" - "sync" - "sync/atomic" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" -) - -// UnsealedBlockState represents the complete state of an unsealed block -// including both the StateDB and metadata, ensuring they stay synchronized -type UnsealedBlockState struct { - // The StateDB containing account balances, storage, etc. - StateDB *StateDB - - // Metadata about the unsealed block - Metadata *types.UnsealedBlock -} - -// ConcurrentStateDB provides thread-safe access to a StateDB with copy-on-write semantics -// and integrated unsealed block metadata management -type ConcurrentStateDB struct { - mu sync.RWMutex - - // The base state that all snapshots are derived from - baseState *UnsealedBlockState - - // Current working state for writes - workingState *UnsealedBlockState - - // Snapshot counter for tracking versions - snapshotCounter uint64 - - // Channel for notifying when new snapshots are available - snapshotChan chan *UnsealedBlockState - - // Flag indicating if the state is being modified - isModifying atomic.Bool -} - -// NewConcurrentStateDB creates a new concurrent StateDB wrapper -func NewConcurrentStateDB(baseState *StateDB, metadata *types.UnsealedBlock) *ConcurrentStateDB { - return &ConcurrentStateDB{ - baseState: &UnsealedBlockState{ - StateDB: baseState, - Metadata: metadata, - }, - workingState: &UnsealedBlockState{ - StateDB: baseState.Copy(), - Metadata: copyUnsealedBlock(metadata), - }, - snapshotChan: make(chan *UnsealedBlockState, 100), // Buffer for multiple readers - } -} - -// copyUnsealedBlock creates a deep copy of an UnsealedBlock -func copyUnsealedBlock(ub *types.UnsealedBlock) *types.UnsealedBlock { - if ub == nil { - return nil - } - - // Copy the Env - var envCopy *types.Env - if ub.Env != nil { - envCopy = &types.Env{ - Number: ub.Env.Number, - ParentHash: ub.Env.ParentHash, - Beneficiary: ub.Env.Beneficiary, - Timestamp: ub.Env.Timestamp, - GasLimit: ub.Env.GasLimit, - Basefee: ub.Env.Basefee, - Difficulty: ub.Env.Difficulty, - Prevrandao: ub.Env.Prevrandao, - ExtraData: append([]byte{}, ub.Env.ExtraData...), - ParentBeaconBlockRoot: ub.Env.ParentBeaconBlockRoot, - } - } - - // Copy Frags - fragsCopy := make([]types.Frag, len(ub.Frags)) - for i, frag := range ub.Frags { - fragsCopy[i] = types.Frag{ - BlockNumber: frag.BlockNumber, - Seq: frag.Seq, - IsLast: frag.IsLast, - Txs: append([]*types.Transaction{}, frag.Txs...), - } - } - - // Copy LastSequenceNumber - var lastSeqCopy *uint64 - if ub.LastSequenceNumber != nil { - seq := *ub.LastSequenceNumber - lastSeqCopy = &seq - } - - // Copy Receipts - receiptsCopy := make(types.Receipts, len(ub.Receipts)) - for i, receipt := range ub.Receipts { - receiptCopy := *receipt - receiptsCopy[i] = &receiptCopy - } - - // Copy Logs - logsCopy := make([]*types.Log, len(ub.Logs)) - for i, log := range ub.Logs { - logCopy := *log - logsCopy[i] = &logCopy - } - - return &types.UnsealedBlock{ - Env: envCopy, - Frags: fragsCopy, - LastSequenceNumber: lastSeqCopy, - Hash: ub.Hash, - Receipts: receiptsCopy, - Logs: logsCopy, - CumulativeGasUsed: ub.CumulativeGasUsed, - CumulativeBlobGasUsed: ub.CumulativeBlobGasUsed, - } -} - -// GetReadSnapshot returns a read-only snapshot of the current state -// This is safe to call concurrently with writes -func (cs *ConcurrentStateDB) GetReadSnapshot() *UnsealedBlockState { - cs.mu.RLock() - defer cs.mu.RUnlock() - - // Create a copy of the current working state for reading - snapshot := &UnsealedBlockState{ - StateDB: cs.workingState.StateDB.Copy(), - Metadata: copyUnsealedBlock(cs.workingState.Metadata), - } - - // Increment snapshot counter - atomic.AddUint64(&cs.snapshotCounter, 1) - - return snapshot -} - -// BeginWriteTransaction starts a write transaction -// Only one write transaction can be active at a time -func (cs *ConcurrentStateDB) BeginWriteTransaction() (*UnsealedBlockState, error) { - if !cs.isModifying.CompareAndSwap(false, true) { - return nil, ErrConcurrentModification - } - - cs.mu.Lock() - - // Create a new working state for this transaction - cs.workingState = &UnsealedBlockState{ - StateDB: cs.workingState.StateDB.Copy(), - Metadata: copyUnsealedBlock(cs.workingState.Metadata), - } - - return cs.workingState, nil -} - -// CommitWriteTransaction commits the write transaction and makes it available to readers -func (cs *ConcurrentStateDB) CommitWriteTransaction() error { - defer func() { - cs.mu.Unlock() - cs.isModifying.Store(false) - }() - - // Notify readers that a new snapshot is available - select { - case cs.snapshotChan <- &UnsealedBlockState{ - StateDB: cs.workingState.StateDB.Copy(), - Metadata: copyUnsealedBlock(cs.workingState.Metadata), - }: - default: - // Channel is full, readers will get the latest state on next read - log.Debug("Snapshot notification channel full, readers will get latest state on next read") - } - - return nil -} - -// RollbackWriteTransaction rolls back the write transaction -func (cs *ConcurrentStateDB) RollbackWriteTransaction() { - defer func() { - cs.mu.Unlock() - cs.isModifying.Store(false) - }() - - // Restore the working state to the previous version - cs.workingState = &UnsealedBlockState{ - StateDB: cs.baseState.StateDB.Copy(), - Metadata: copyUnsealedBlock(cs.baseState.Metadata), - } -} - -// GetLatestSnapshot returns the most recent snapshot available -func (cs *ConcurrentStateDB) GetLatestSnapshot() *UnsealedBlockState { - cs.mu.RLock() - defer cs.mu.RUnlock() - - // Try to get a snapshot from the channel first - select { - case snapshot := <-cs.snapshotChan: - return snapshot - default: - // No snapshot in channel, return current working state - return &UnsealedBlockState{ - StateDB: cs.workingState.StateDB.Copy(), - Metadata: copyUnsealedBlock(cs.workingState.Metadata), - } - } -} - -// WaitForSnapshot waits for the next snapshot to become available -func (cs *ConcurrentStateDB) WaitForSnapshot() *UnsealedBlockState { - snapshot := <-cs.snapshotChan - return snapshot -} - -// GetBaseState returns the base state (for internal use) -func (cs *ConcurrentStateDB) GetBaseState() *UnsealedBlockState { - return cs.baseState -} - -// GetWorkingState returns the current working state (for internal use) -func (cs *ConcurrentStateDB) GetWorkingState() *UnsealedBlockState { - return cs.workingState -} - -// IsModifying returns true if a write transaction is currently active -func (cs *ConcurrentStateDB) IsModifying() bool { - return cs.isModifying.Load() -} - -// GetSnapshotCounter returns the current snapshot counter -func (cs *ConcurrentStateDB) GetSnapshotCounter() uint64 { - return atomic.LoadUint64(&cs.snapshotCounter) -} - -// Close closes the concurrent StateDB and cleans up resources -func (cs *ConcurrentStateDB) Close() error { - close(cs.snapshotChan) - return nil -} - -// Error definitions -var ( - ErrConcurrentModification = fmt.Errorf("concurrent modification not allowed") -) diff --git a/core/state/concurrent_statedb_test.go b/core/state/concurrent_statedb_test.go deleted file mode 100644 index 35899dd2cc..0000000000 --- a/core/state/concurrent_statedb_test.go +++ /dev/null @@ -1,204 +0,0 @@ -package state - -import ( - "sync" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/tracing" - "github.com/holiman/uint256" - "github.com/stretchr/testify/assert" -) - -func TestConcurrentStateDB_BasicOperations(t *testing.T) { - // Create a base state - baseState, err := New(common.Hash{}, NewDatabaseForTesting()) - assert.NoError(t, err) - - // Set some initial data - addr := common.HexToAddress("0x123") - baseState.SetBalance(addr, uint256.NewInt(100), tracing.BalanceChangeUnspecified) - - // Create concurrent StateDB - cs := NewConcurrentStateDB(baseState) - defer cs.Close() - - // Test read snapshot - snapshot := cs.GetReadSnapshot() - assert.NotNil(t, snapshot) - assert.Equal(t, uint256.NewInt(100), snapshot.GetBalance(addr)) - - // Test write transaction - workingState, err := cs.BeginWriteTransaction() - assert.NoError(t, err) - assert.True(t, cs.IsModifying()) - - // Modify the working state - workingState.SetBalance(addr, uint256.NewInt(200), tracing.BalanceChangeUnspecified) - - // Commit the transaction - err = cs.CommitWriteTransaction() - assert.NoError(t, err) - assert.False(t, cs.IsModifying()) - - // Verify the change is visible in new snapshots - newSnapshot := cs.GetReadSnapshot() - assert.Equal(t, uint256.NewInt(200), newSnapshot.GetBalance(addr)) - - // Original snapshot should still show old value - assert.Equal(t, uint256.NewInt(100), snapshot.GetBalance(addr)) -} - -func TestConcurrentStateDB_ConcurrentReads(t *testing.T) { - // Create a base state - baseState, err := New(common.Hash{}, NewDatabaseForTesting()) - assert.NoError(t, err) - - // Set some initial data - addr := common.HexToAddress("0x123") - baseState.SetBalance(addr, uint256.NewInt(100), tracing.BalanceChangeUnspecified) - - // Create concurrent StateDB - cs := NewConcurrentStateDB(baseState) - defer cs.Close() - - // Test concurrent reads - var wg sync.WaitGroup - readCount := 100 - - for i := 0; i < readCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - snapshot := cs.GetReadSnapshot() - assert.Equal(t, uint256.NewInt(100), snapshot.GetBalance(addr)) - }() - } - - wg.Wait() -} - -func TestConcurrentStateDB_WriteTransactionExclusivity(t *testing.T) { - // Create a base state - baseState, err := New(common.Hash{}, NewDatabaseForTesting()) - assert.NoError(t, err) - - // Create concurrent StateDB - cs := NewConcurrentStateDB(baseState) - defer cs.Close() - - // Start first write transaction - _, err = cs.BeginWriteTransaction() - assert.NoError(t, err) - assert.True(t, cs.IsModifying()) - - // Try to start second write transaction (should fail) - workingState2, err := cs.BeginWriteTransaction() - assert.Error(t, err) - assert.Equal(t, ErrConcurrentModification, err) - assert.Nil(t, workingState2) - - // Commit first transaction - err = cs.CommitWriteTransaction() - assert.NoError(t, err) - assert.False(t, cs.IsModifying()) - - // Now should be able to start second transaction - _, err = cs.BeginWriteTransaction() - assert.NoError(t, err) - assert.True(t, cs.IsModifying()) - - // Clean up - cs.RollbackWriteTransaction() -} - -func TestConcurrentStateDB_Rollback(t *testing.T) { - // Create a base state - baseState, err := New(common.Hash{}, NewDatabaseForTesting()) - assert.NoError(t, err) - - // Set initial data - addr := common.HexToAddress("0x123") - baseState.SetBalance(addr, uint256.NewInt(100), tracing.BalanceChangeUnspecified) - - // Create concurrent StateDB - cs := NewConcurrentStateDB(baseState) - defer cs.Close() - - // Start write transaction - workingState, err := cs.BeginWriteTransaction() - assert.NoError(t, err) - - // Modify state - workingState.SetBalance(addr, uint256.NewInt(200), tracing.BalanceChangeUnspecified) - - // Rollback - cs.RollbackWriteTransaction() - assert.False(t, cs.IsModifying()) - - // Verify rollback worked - snapshot := cs.GetReadSnapshot() - assert.Equal(t, uint256.NewInt(100), snapshot.GetBalance(addr)) -} - -func TestConcurrentStateDB_SnapshotNotifications(t *testing.T) { - // Create a base state - baseState, err := New(common.Hash{}, NewDatabaseForTesting()) - assert.NoError(t, err) - - // Create concurrent StateDB - cs := NewConcurrentStateDB(baseState) - defer cs.Close() - - // Start write transaction - workingState, err := cs.BeginWriteTransaction() - assert.NoError(t, err) - - // Modify state - addr := common.HexToAddress("0x123") - workingState.SetBalance(addr, uint256.NewInt(200), tracing.BalanceChangeUnspecified) - - // Start goroutine to wait for snapshot - var receivedSnapshot *StateDB - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - receivedSnapshot = cs.WaitForSnapshot() - }() - - // Give some time for goroutine to start - time.Sleep(10 * time.Millisecond) - - // Commit transaction - err = cs.CommitWriteTransaction() - assert.NoError(t, err) - - // Wait for snapshot to be received - wg.Wait() - - // Verify snapshot was received - assert.NotNil(t, receivedSnapshot) - assert.Equal(t, uint256.NewInt(200), receivedSnapshot.GetBalance(addr)) -} - -func TestConcurrentStateDB_SnapshotCounter(t *testing.T) { - // Create a base state - baseState, err := New(common.Hash{}, NewDatabaseForTesting()) - assert.NoError(t, err) - - // Create concurrent StateDB - cs := NewConcurrentStateDB(baseState) - defer cs.Close() - - initialCounter := cs.GetSnapshotCounter() - - // Get a few snapshots - cs.GetReadSnapshot() - cs.GetReadSnapshot() - cs.GetReadSnapshot() - - finalCounter := cs.GetSnapshotCounter() - assert.Equal(t, initialCounter+3, finalCounter) -} diff --git a/eth/api_backend.go b/eth/api_backend.go index 9740bbfe23..023325625d 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -205,7 +205,7 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B if number == rpc.LatestBlockNumber { stateDb := b.eth.BlockChain().CurrentUnsealedBlockState() if stateDb != nil { - return stateDb, b.eth.BlockChain().CurrentUnsealedBlockMetadata().TempHeader(), nil + return stateDb, b.eth.BlockChain().CurrentUnsealedBlock().TempHeader(), nil } } @@ -473,5 +473,5 @@ func (b *EthAPIBackend) Genesis() *types.Block { } func (b *EthAPIBackend) GetUnsealedBlock() *types.UnsealedBlock { - return b.eth.blockchain.CurrentUnsealedBlockMetadata() + return b.eth.blockchain.CurrentUnsealedBlock() } diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index dbb5cee3c6..d930244e9f 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -387,7 +387,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl if rawdb.ReadCanonicalHash(api.eth.ChainDb(), block.NumberU64()) != update.HeadBlockHash { // Block is not canonical, set head. if latestValid, err := api.eth.BlockChain().SetCanonical(block); err != nil { - bc := api.eth.BlockChain().CurrentUnsealedBlockMetadata() + bc := api.eth.BlockChain().CurrentUnsealedBlock() if bc != nil { if bc.Env.Number == block.NumberU64() { log.Info("Ignoring current unsealed block", "number", block.NumberU64(), "hash", update.HeadBlockHash, "age", common.PrettyAge(time.Unix(int64(block.Time()), 0)), "have", api.eth.BlockChain().CurrentBlock().Number) @@ -1371,13 +1371,13 @@ func (api *ConsensusAPI) NewFragV0(frag engine.SignedNewFrag) (string, error) { } func (api *ConsensusAPI) newFragV0(f engine.SignedNewFrag) (string, error) { - ub := api.eth.BlockChain().CurrentUnsealedBlockMetadata() + ub := api.eth.BlockChain().CurrentUnsealedBlock() if err := api.ValidateNewFragV0(f, ub); err != nil { log.Error("frag is invalid", "error", err) return engine.INVALID, err } - if f.Frag.BlockNumber < api.eth.BlockChain().CurrentUnsealedBlockMetadata().Env.Number { + if f.Frag.BlockNumber < api.eth.BlockChain().CurrentUnsealedBlock().Env.Number { return engine.VALID, nil } @@ -1440,7 +1440,7 @@ func (api *ConsensusAPI) ValidateNewFragV0(frag engine.SignedNewFrag, currentUns func (api *ConsensusAPI) SealFragV0(seal engine.SignedSeal) (string, error) { log.Info("seal received", "forBlock", seal.Seal.BlockNumber, "current", api.eth.BlockChain().CurrentBlock().Number, "seal", seal.Seal) - if api.eth.BlockChain().CurrentUnsealedBlockMetadata() != nil && api.eth.BlockChain().CurrentUnsealedBlockMetadata().Env.Number > seal.Seal.BlockNumber { + if api.eth.BlockChain().CurrentUnsealedBlock() != nil && api.eth.BlockChain().CurrentUnsealedBlock().Env.Number > seal.Seal.BlockNumber { log.Info("seal was outdated, dropping") return engine.VALID, nil } @@ -1479,7 +1479,7 @@ func (api *ConsensusAPI) sealFragV0(seal engine.SignedSeal) (string, error) { } func (api *ConsensusAPI) ValidateSealFragV0(preSealedBlock *types.Block, seal engine.SignedSeal) error { - if !types.IsOpened(api.eth.BlockChain().CurrentUnsealedBlockMetadata()) { + if !types.IsOpened(api.eth.BlockChain().CurrentUnsealedBlock()) { return errors.New("no unsealed block in progress") } @@ -1511,8 +1511,8 @@ func (api *ConsensusAPI) ValidateSealFragV0(preSealedBlock *types.Block, seal en return fmt.Errorf("gas limit mismatch, expected %v, got %v", preSealedBlock.GasLimit(), seal.Seal.GasLimit) } - if len(api.eth.BlockChain().CurrentUnsealedBlockMetadata().Frags) != int(seal.Seal.TotalFrags) { - return fmt.Errorf("total frags mismatch, expected %v, got %v", len(api.eth.BlockChain().CurrentUnsealedBlockMetadata().Frags), seal.Seal.TotalFrags) + if len(api.eth.BlockChain().CurrentUnsealedBlock().Frags) != int(seal.Seal.TotalFrags) { + return fmt.Errorf("total frags mismatch, expected %v, got %v", len(api.eth.BlockChain().CurrentUnsealedBlock().Frags), seal.Seal.TotalFrags) } return nil @@ -1537,7 +1537,7 @@ func (api *ConsensusAPI) EnvV0(env engine.SignedEnv) (string, error) { func (api *ConsensusAPI) envV0(env engine.SignedEnv) (string, error) { if err := api.ValidateEnvV0(env); err != nil { - if api.eth.BlockChain().CurrentUnsealedBlockMetadata() != nil && api.eth.BlockChain().CurrentUnsealedBlockMetadata().Env.Number < env.Env.Number { + if api.eth.BlockChain().CurrentUnsealedBlock() != nil && api.eth.BlockChain().CurrentUnsealedBlock().Env.Number < env.Env.Number { api.eth.BlockChain().ResetCurrentUnsealedBlock() } else { return engine.INVALID, err @@ -1570,7 +1570,7 @@ func (api *ConsensusAPI) ValidateEnvV0(env engine.SignedEnv) error { parentHeader := parent.Header() // Check that there's no unsealed block in progress - if api.eth.BlockChain().CurrentUnsealedBlockMetadata() != nil { + if api.eth.BlockChain().CurrentUnsealedBlock() != nil { return errors.New("cannot open a new unsealed block while there's one already in progress") } From 7ca818672312970d7be1235b538b92a5c85b3aac Mon Sep 17 00:00:00 2001 From: nicolas <48695862+merklefruit@users.noreply.github.com> Date: Thu, 28 Aug 2025 18:10:56 +0200 Subject: [PATCH 4/6] fix: add shared lock for unsealed state --- core/blockchain.go | 19 +++++++++++++------ eth/api_backend.go | 13 ++++++++++--- eth/catalyst/api.go | 13 ++++++------- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index c53c65e68c..4242dcb335 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -244,12 +244,14 @@ type BlockChain struct { // Readers don't need to take it, they can just read the database. chainmu *syncx.ClosableMutex - currentBlock atomic.Pointer[types.Header] // Current head of the chain - currentSnapBlock atomic.Pointer[types.Header] // Current head of snap-sync - currentFinalBlock atomic.Pointer[types.Header] // Latest (consensus) finalized block - currentSafeBlock atomic.Pointer[types.Header] // Latest (consensus) safe block - currentUnsealedBlock *types.UnsealedBlock // Current unsealed block - unsealedBlockDbState *state.StateDB // StateDB for the current unsealed block + currentBlock atomic.Pointer[types.Header] // Current head of the chain + currentSnapBlock atomic.Pointer[types.Header] // Current head of snap-sync + currentFinalBlock atomic.Pointer[types.Header] // Latest (consensus) finalized block + currentSafeBlock atomic.Pointer[types.Header] // Latest (consensus) safe block + + currentUnsealedBlock *types.UnsealedBlock // Current unsealed block + unsealedBlockDbState *state.StateDB // StateDB for the current unsealed block + unsealedBlockLock sync.Mutex // Lock for the unsealedBlock bodyCache *lru.Cache[common.Hash, *types.Body] bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue] @@ -2551,3 +2553,8 @@ func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) { func (bc *BlockChain) GetTrieFlushInterval() time.Duration { return time.Duration(bc.flushInterval.Load()) } + +// UnsealedBlockLock returns a pointer to the unsealed block lock +func (bc *BlockChain) UnsealedBlockLock() *sync.Mutex { + return &bc.unsealedBlockLock +} diff --git a/eth/api_backend.go b/eth/api_backend.go index 023325625d..ce4954cefe 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -203,9 +203,16 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B // Latest state is the current open unselaed block if number == rpc.LatestBlockNumber { - stateDb := b.eth.BlockChain().CurrentUnsealedBlockState() - if stateDb != nil { - return stateDb, b.eth.BlockChain().CurrentUnsealedBlock().TempHeader(), nil + // if the lock is held, we can't access the unsealed block + if b.eth.BlockChain().UnsealedBlockLock().TryLock() { + defer b.eth.BlockChain().UnsealedBlockLock().Unlock() + + stateDb := b.eth.BlockChain().CurrentUnsealedBlockState() + if stateDb != nil { + return stateDb.Copy(), b.eth.BlockChain().CurrentUnsealedBlock().TempHeader(), nil + } + } else { + log.Warn("unsealed block lock is held, falling back to latest state") } } diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index d930244e9f..a2f519e279 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -160,7 +160,6 @@ type ConsensusAPI struct { forkchoiceLock sync.Mutex // Lock for the forkChoiceUpdated method newPayloadLock sync.Mutex // Lock for the NewPayload method - unsealedBlockLock sync.Mutex // Lock for the unsealedBlock } // NewConsensusAPI creates a new consensus api for the given backend. @@ -1357,13 +1356,13 @@ func validateRequests(requests [][]byte) error { func (api *ConsensusAPI) NewFragV0(frag engine.SignedNewFrag) (string, error) { log.Info("new frag received", "forBlock", frag.Frag.BlockNumber, "current", api.eth.BlockChain().CurrentBlock().Number) - api.unsealedBlockLock.Lock() + api.eth.BlockChain().UnsealedBlockLock().Lock() res, err := api.newFragV0(frag) if err != nil { log.Error("failed to insert new frag, discarding unsealed block", "error", err) api.eth.BlockChain().ResetCurrentUnsealedBlock() } - api.unsealedBlockLock.Unlock() + api.eth.BlockChain().UnsealedBlockLock().Unlock() log.Info("new frag handled successfully") @@ -1445,13 +1444,13 @@ func (api *ConsensusAPI) SealFragV0(seal engine.SignedSeal) (string, error) { return engine.VALID, nil } - api.unsealedBlockLock.Lock() + api.eth.BlockChain().UnsealedBlockLock().Lock() res, err := api.sealFragV0(seal) if err != nil { log.Error("failed to seal block, discarding unsealed block", "error", err) api.eth.BlockChain().ResetCurrentUnsealedBlock() } - api.unsealedBlockLock.Unlock() + api.eth.BlockChain().UnsealedBlockLock().Unlock() return res, err } @@ -1521,14 +1520,14 @@ func (api *ConsensusAPI) ValidateSealFragV0(preSealedBlock *types.Block, seal en func (api *ConsensusAPI) EnvV0(env engine.SignedEnv) (string, error) { log.Info("env received", "forBlock", env.Env.Number, "current", api.eth.BlockChain().CurrentBlock().Number, "env", env.Env) - api.unsealedBlockLock.Lock() + api.eth.BlockChain().UnsealedBlockLock().Lock() res, err := api.envV0(env) if err != nil { log.Error("failed to open unsealed block, discarding unsealed block", "error", err) api.eth.BlockChain().ResetCurrentUnsealedBlock() log.Error("EnvV0 failed", "error", err) } - api.unsealedBlockLock.Unlock() + api.eth.BlockChain().UnsealedBlockLock().Unlock() log.Info("env handled successfully") From f8c548bfd16ac0037d53697478829c4b9eb25719 Mon Sep 17 00:00:00 2001 From: nicolas <48695862+merklefruit@users.noreply.github.com> Date: Fri, 29 Aug 2025 09:54:51 +0200 Subject: [PATCH 5/6] chore: revert dockerfile changes --- Dockerfile | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/Dockerfile b/Dockerfile index 4ed668256a..9b70e9e8a0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,10 +4,9 @@ ARG VERSION="" ARG BUILDNUM="" # Build Geth in a stock Go builder container -FROM golang:1.24-bookworm AS builder +FROM golang:1.24-alpine AS builder -RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential git ca-certificates && rm -rf /var/lib/apt/lists/* +RUN apk add --no-cache gcc musl-dev linux-headers git # Get dependencies - will also be cached if we won't change go.mod/go.sum COPY go.mod /go-ethereum/ @@ -15,13 +14,12 @@ COPY go.sum /go-ethereum/ RUN cd /go-ethereum && go mod download ADD . /go-ethereum -RUN cd /go-ethereum && go run build/ci.go install ./cmd/geth +RUN cd /go-ethereum && go run build/ci.go install -static ./cmd/geth # Pull Geth into a second stage deploy alpine container -FROM debian:bookworm-slim +FROM alpine:latest -RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential git ca-certificates && rm -rf /var/lib/apt/lists/* +RUN apk add --no-cache ca-certificates COPY --from=builder /go-ethereum/build/bin/geth /usr/local/bin/ EXPOSE 8545 8546 30303 30303/udp From 99e62fafcd45c264e3dfd4e8f77f5882566d7c42 Mon Sep 17 00:00:00 2001 From: nicolas <48695862+merklefruit@users.noreply.github.com> Date: Fri, 29 Aug 2025 09:55:13 +0200 Subject: [PATCH 6/6] chore: revert build/ci changes --- build/ci.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/build/ci.go b/build/ci.go index d70ad5e0e5..7fa226ef1f 100644 --- a/build/ci.go +++ b/build/ci.go @@ -182,10 +182,10 @@ func main() { func doInstall(cmdline []string) { var ( - dlgo = flag.Bool("dlgo", false, "Download Go and build with it") - arch = flag.String("arch", "", "Architecture to cross build for") - cc = flag.String("cc", "", "C compiler to cross build with") - // staticlink = flag.Bool("static", false, "Create statically-linked executable") + dlgo = flag.Bool("dlgo", false, "Download Go and build with it") + arch = flag.String("arch", "", "Architecture to cross build for") + cc = flag.String("cc", "", "C compiler to cross build with") + staticlink = flag.Bool("static", false, "Create statically-linked executable") ) flag.CommandLine.Parse(cmdline) env := build.Env() @@ -205,11 +205,10 @@ func doInstall(cmdline []string) { } // Configure the build. - gobuild := tc.Go("build", buildFlags(env, false, buildTags)...) + gobuild := tc.Go("build", buildFlags(env, *staticlink, buildTags)...) // We use -trimpath to avoid leaking local paths into the built executables. - // NOTE(thedevbirb): add also -race flag - gobuild.Args = append(gobuild.Args, "-trimpath", "-race") + gobuild.Args = append(gobuild.Args, "-trimpath") // Show packages during build. gobuild.Args = append(gobuild.Args, "-v") @@ -377,7 +376,9 @@ func doCheckTidy() { // doCheckGenerate ensures that re-generating generated files does not cause // any mutations in the source file tree. func doCheckGenerate() { - cachedir := flag.String("cachedir", "./build/cache", "directory for caching binaries.") + var ( + cachedir = flag.String("cachedir", "./build/cache", "directory for caching binaries.") + ) // Compute the origin hashes of all the files var hashes map[string][32]byte @@ -443,7 +444,9 @@ func doCheckBadDeps() { // doLint runs golangci-lint on requested packages. func doLint(cmdline []string) { - cachedir := flag.String("cachedir", "./build/cache", "directory for caching golangci-lint binary.") + var ( + cachedir = flag.String("cachedir", "./build/cache", "directory for caching golangci-lint binary.") + ) flag.CommandLine.Parse(cmdline) packages := []string{"./..."} if len(flag.CommandLine.Args()) > 0 {