Skip to content

Commit d7df608

Browse files
authored
feat(proxyd): add threshold configs to handle safe and finalized block drift (#229)
* feat(proxyd): add threshold configs to handle safe and finalized block drift
1 parent a4d212b commit d7df608

7 files changed

+242
-7
lines changed

proxyd/backend.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,11 @@ type Backend struct {
155155
skipPeerCountCheck bool
156156
forcedCandidate bool
157157

158-
maxDegradedLatencyThreshold time.Duration
159-
maxLatencyThreshold time.Duration
160-
maxErrorRateThreshold float64
158+
safeBlockDriftThreshold uint64
159+
finalizedBlockDriftThreshold uint64
160+
maxDegradedLatencyThreshold time.Duration
161+
maxLatencyThreshold time.Duration
162+
maxErrorRateThreshold float64
161163

162164
latencySlidingWindow *sw.AvgSlidingWindow
163165
networkRequestsSlidingWindow *sw.AvgSlidingWindow
@@ -244,6 +246,18 @@ func WithSkipIsSyncingCheck(skipIsSyncingCheck bool) BackendOpt {
244246
}
245247
}
246248

249+
func WithSafeBlockDriftThreshold(safeBlockDriftThreshold uint64) BackendOpt {
250+
return func(b *Backend) {
251+
b.safeBlockDriftThreshold = safeBlockDriftThreshold
252+
}
253+
}
254+
255+
func WithFinalizedBlockDriftThreshold(finalizedBlockDriftThreshold uint64) BackendOpt {
256+
return func(b *Backend) {
257+
b.finalizedBlockDriftThreshold = finalizedBlockDriftThreshold
258+
}
259+
}
260+
247261
func WithConsensusSkipPeerCountCheck(skipPeerCountCheck bool) BackendOpt {
248262
return func(b *Backend) {
249263
b.skipPeerCountCheck = skipPeerCountCheck

proxyd/config.go

+3
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ type BackendConfig struct {
112112

113113
SkipIsSyncingCheck bool `toml:"skip_is_syncing_check"`
114114

115+
SafeBlockDriftThreshold uint64 `toml:"safe_block_drift_threshold"`
116+
FinalizedBlockDriftThreshold uint64 `toml:"finalized_block_drift_threshold"`
117+
115118
ConsensusSkipPeerCountCheck bool `toml:"consensus_skip_peer_count"`
116119
ConsensusForcedCandidate bool `toml:"consensus_forced_candidate"`
117120
ConsensusReceiptsTarget string `toml:"consensus_receipts_target"`

proxyd/consensus_poller.go

+18-4
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,8 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
400400

401401
// sanity check for latest, safe and finalized block tags
402402
expectedBlockTags := cp.checkExpectedBlockTags(
403+
be.safeBlockDriftThreshold,
404+
be.finalizedBlockDriftThreshold,
403405
latestBlockNumber,
404406
bs.safeBlockNumber, safeBlockNumber,
405407
bs.finalizedBlockNumber, finalizedBlockNumber)
@@ -420,15 +422,27 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
420422
}
421423

422424
// checkExpectedBlockTags for unexpected conditions on block tags
423-
// - finalized block number should never decrease
424-
// - safe block number should never decrease
425+
// - finalized block number should never decrease by more than finalizedBlockDriftThreshold
426+
// - safe block number should never decrease by more than safeBlockDriftThreshold
425427
// - finalized block should be <= safe block <= latest block
426428
func (cp *ConsensusPoller) checkExpectedBlockTags(
429+
safeBlockDriftThreshold uint64,
430+
finalizedBlockDriftThreshold uint64,
427431
currentLatest hexutil.Uint64,
428432
oldSafe hexutil.Uint64, currentSafe hexutil.Uint64,
429433
oldFinalized hexutil.Uint64, currentFinalized hexutil.Uint64) bool {
430-
return currentFinalized >= oldFinalized &&
431-
currentSafe >= oldSafe &&
434+
435+
minSafeBlockAllowance := oldSafe
436+
minFinalizedBlockAllowance := oldFinalized
437+
if minSafeBlockAllowance > hexutil.Uint64(safeBlockDriftThreshold) {
438+
minSafeBlockAllowance -= hexutil.Uint64(safeBlockDriftThreshold)
439+
}
440+
if minFinalizedBlockAllowance > hexutil.Uint64(finalizedBlockDriftThreshold) {
441+
minFinalizedBlockAllowance -= hexutil.Uint64(finalizedBlockDriftThreshold)
442+
}
443+
444+
return currentFinalized >= minFinalizedBlockAllowance &&
445+
currentSafe >= minSafeBlockAllowance &&
432446
currentFinalized <= currentSafe &&
433447
currentSafe <= currentLatest
434448
}

proxyd/example.config.toml

+6
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ consensus_receipts_target = "eth_getBlockReceipts"
8080
# Allow backends to skip eth_syncing checks, default false
8181
# skip_is_syncing_check = false
8282

83+
# Allow backends safe and finalized block to drift backward up to the threshold.
84+
# Default is 0, meaning no drift is allowed e.g. newSafe >= oldSafe and newFinalized >= oldFinalized
85+
# Drift allows newSafe >= (oldSafe - safe_block_drift_threshold) and newFinalized >= (oldFinalized - finalized_block_drift_threshold)
86+
# safe_block_drift_threshold = 0
87+
# finalized_block_drift_threshold = 0
88+
8389
[backends.alchemy]
8490
rpc_url = ""
8591
ws_url = ""

proxyd/integration_tests/consensus_custom_config_test.go

+163
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/ethereum-optimism/infra/proxyd"
1111
ms "github.com/ethereum-optimism/infra/proxyd/tools/mockserver/handler"
12+
"github.com/ethereum/go-ethereum/common/hexutil"
1213
"github.com/stretchr/testify/require"
1314
)
1415

@@ -129,3 +130,165 @@ func TestConsensusSkipSyncTest(t *testing.T) {
129130
require.Equal(t, 2, len(consensusGroup))
130131
})
131132
}
133+
134+
func TestConsensusBlockDriftThreshold(t *testing.T) {
135+
nodes, bg, _, shutdown := setupCustomConfig(t, "consensus_block_drift_threshold")
136+
defer nodes["node1"].mockBackend.Close()
137+
defer nodes["node2"].mockBackend.Close()
138+
defer shutdown()
139+
140+
ctx := context.Background()
141+
142+
// poll for updated consensus
143+
update := func() {
144+
for _, be := range bg.Backends {
145+
bg.Consensus.UpdateBackend(ctx, be)
146+
}
147+
bg.Consensus.UpdateBackendGroupConsensus(ctx)
148+
}
149+
150+
// convenient methods to manipulate state and mock responses
151+
reset := func() {
152+
for _, node := range nodes {
153+
node.handler.ResetOverrides()
154+
node.mockBackend.Reset()
155+
node.backend.ClearSlidingWindows()
156+
}
157+
bg.Consensus.ClearListeners()
158+
bg.Consensus.Reset()
159+
160+
}
161+
162+
initSetupAndAssetions := func() {
163+
reset()
164+
update()
165+
166+
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
167+
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
168+
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
169+
}
170+
171+
override := func(node string, method string, block string, response string) {
172+
if _, ok := nodes[node]; !ok {
173+
t.Fatalf("node %s does not exist in the nodes map", node)
174+
}
175+
nodes[node].handler.AddOverride(&ms.MethodTemplate{
176+
Method: method,
177+
Block: block,
178+
Response: response,
179+
})
180+
}
181+
182+
overrideBlock := func(node string, blockRequest string, blockResponse string) {
183+
override(node,
184+
"eth_getBlockByNumber",
185+
blockRequest,
186+
buildResponse(map[string]string{
187+
"number": blockResponse,
188+
"hash": "hash_" + blockResponse,
189+
}))
190+
}
191+
192+
overridePeerCount := func(node string, count int) {
193+
override(node, "net_peerCount", "", buildResponse(hexutil.Uint64(count).String()))
194+
}
195+
196+
// force ban node2 and make sure node1 is the only one in consensus
197+
useOnlyNode1 := func() {
198+
overridePeerCount("node2", 0)
199+
update()
200+
201+
consensusGroup := bg.Consensus.GetConsensusGroup()
202+
require.Equal(t, 1, len(consensusGroup))
203+
require.Contains(t, consensusGroup, nodes["node1"].backend)
204+
nodes["node1"].mockBackend.Reset()
205+
}
206+
207+
t.Run("allow backend if tags are messed if below tolerance - safe dropped", func(t *testing.T) {
208+
initSetupAndAssetions()
209+
210+
overrideBlock("node1", "safe", "0xe0") // 1 blocks behind is ok
211+
update()
212+
213+
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
214+
require.Equal(t, "0xe0", bg.Consensus.GetSafeBlockNumber().String())
215+
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
216+
217+
consensusGroup := bg.Consensus.GetConsensusGroup()
218+
require.Contains(t, consensusGroup, nodes["node1"].backend)
219+
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
220+
require.Equal(t, 2, len(consensusGroup))
221+
})
222+
223+
t.Run("ban backend if tags are messed above tolerance - safe dropped", func(t *testing.T) {
224+
initSetupAndAssetions()
225+
overrideBlock("node1", "safe", "0xdf") // 2 blocks behind is not ok
226+
update()
227+
228+
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
229+
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
230+
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
231+
232+
consensusGroup := bg.Consensus.GetConsensusGroup()
233+
require.NotContains(t, consensusGroup, nodes["node1"].backend)
234+
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
235+
require.Equal(t, 1, len(consensusGroup))
236+
})
237+
238+
t.Run("allow backend if tags are messed if below tolerance - finalized dropped", func(t *testing.T) {
239+
initSetupAndAssetions()
240+
overrideBlock("node1", "finalized", "0xbf") // finalized 2 blocks behind is ok
241+
update()
242+
243+
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
244+
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
245+
require.Equal(t, "0xbf", bg.Consensus.GetFinalizedBlockNumber().String())
246+
247+
consensusGroup := bg.Consensus.GetConsensusGroup()
248+
require.Contains(t, consensusGroup, nodes["node1"].backend)
249+
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
250+
require.Equal(t, 2, len(consensusGroup))
251+
})
252+
253+
t.Run("ban backend if tags are messed - finalized dropped", func(t *testing.T) {
254+
initSetupAndAssetions()
255+
overrideBlock("node1", "finalized", "0xbe") // finalized 3 blocks behind is not ok
256+
update()
257+
258+
require.Equal(t, "0x101", bg.Consensus.GetLatestBlockNumber().String())
259+
require.Equal(t, "0xe1", bg.Consensus.GetSafeBlockNumber().String())
260+
require.Equal(t, "0xc1", bg.Consensus.GetFinalizedBlockNumber().String())
261+
262+
consensusGroup := bg.Consensus.GetConsensusGroup()
263+
require.NotContains(t, consensusGroup, nodes["node1"].backend)
264+
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
265+
require.Equal(t, 1, len(consensusGroup))
266+
})
267+
268+
t.Run("recover after safe and finalized dropped", func(t *testing.T) {
269+
reset()
270+
useOnlyNode1()
271+
overrideBlock("node1", "latest", "0xd1")
272+
overrideBlock("node1", "safe", "0xb1")
273+
overrideBlock("node1", "finalized", "0x91")
274+
update()
275+
276+
consensusGroup := bg.Consensus.GetConsensusGroup()
277+
require.NotContains(t, consensusGroup, nodes["node1"].backend)
278+
require.True(t, bg.Consensus.IsBanned(nodes["node1"].backend))
279+
require.Equal(t, 0, len(consensusGroup))
280+
281+
// unban and see if it recovers
282+
bg.Consensus.Unban(nodes["node1"].backend)
283+
update()
284+
285+
consensusGroup = bg.Consensus.GetConsensusGroup()
286+
require.Contains(t, consensusGroup, nodes["node1"].backend)
287+
require.False(t, bg.Consensus.IsBanned(nodes["node1"].backend))
288+
require.Equal(t, 1, len(consensusGroup))
289+
290+
require.Equal(t, "0xd1", bg.Consensus.GetLatestBlockNumber().String())
291+
require.Equal(t, "0xb1", bg.Consensus.GetSafeBlockNumber().String())
292+
require.Equal(t, "0x91", bg.Consensus.GetFinalizedBlockNumber().String())
293+
})
294+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
[server]
2+
rpc_port = 8545
3+
4+
[backend]
5+
response_timeout_seconds = 1
6+
max_degraded_latency_threshold = "30ms"
7+
8+
[backends]
9+
[backends.node1]
10+
rpc_url = "$NODE1_URL"
11+
safe_block_drift_threshold = 1
12+
finalized_block_drift_threshold = 2
13+
14+
[backends.node2]
15+
rpc_url = "$NODE2_URL"
16+
17+
[backend_groups]
18+
[backend_groups.node]
19+
backends = ["node1", "node2"]
20+
routing_strategy = "consensus_aware"
21+
consensus_handler = "noop" # allow more control over the consensus poller for tests
22+
consensus_ban_period = "1m"
23+
consensus_max_update_threshold = "2m"
24+
consensus_max_block_lag = 8
25+
consensus_min_peer_count = 4
26+
27+
28+
[rpc_method_mappings]
29+
eth_call = "node"
30+
eth_chainId = "node"
31+
eth_blockNumber = "node"
32+
eth_getBlockByNumber = "node"
33+
consensus_getReceipts = "node"

proxyd/proxyd.go

+2
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ func Start(config *Config) (*Server, func(), error) {
194194
}
195195
opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
196196
opts = append(opts, WithSkipIsSyncingCheck(cfg.SkipIsSyncingCheck))
197+
opts = append(opts, WithSafeBlockDriftThreshold(cfg.SafeBlockDriftThreshold))
198+
opts = append(opts, WithFinalizedBlockDriftThreshold(cfg.FinalizedBlockDriftThreshold))
197199
opts = append(opts, WithConsensusSkipPeerCountCheck(cfg.ConsensusSkipPeerCountCheck))
198200
opts = append(opts, WithConsensusForcedCandidate(cfg.ConsensusForcedCandidate))
199201
opts = append(opts, WithWeight(cfg.Weight))

0 commit comments

Comments
 (0)