Skip to content

Commit ca07d0a

Browse files
authored
feat(op-node): chain replication via BOP_REPLAY env (#4)
* feat(op-node): UnsafeAllowOldPayloads flag Such flags allows to gossip and receive unsafe payloads which timestamp is older than 60 seconds. Useful for testing purposes. * feat(op-node): UnsafeIsChainReplication flag * test: disable derivation pipeline step * feat(op-node): use BOP_REPLAY flag to modify behaviour for chain replication * chore(op-node): bump fetch next N gateways to 6; more reliable * fix(op-node): use global variable instead of checking envs everytime
1 parent ea16f78 commit ca07d0a

8 files changed

Lines changed: 61 additions & 24 deletions

File tree

op-node/cmd/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/ethereum-optimism/optimism/op-node/flags"
1919
"github.com/ethereum-optimism/optimism/op-node/metrics"
2020
"github.com/ethereum-optimism/optimism/op-node/node"
21+
"github.com/ethereum-optimism/optimism/op-node/params"
2122
"github.com/ethereum-optimism/optimism/op-node/version"
2223
opservice "github.com/ethereum-optimism/optimism/op-service"
2324
"github.com/ethereum-optimism/optimism/op-service/cliapp"
@@ -94,6 +95,11 @@ func RollupNodeMain(ctx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.
9495
cfg.Rollup.LogDescription(log, chaincfg.L2ChainIDToNetworkDisplayName)
9596
}
9697

98+
// CHANGE(thedevbirb): assess whether we're in chain replication mode at startup.
99+
if _, ok := os.LookupEnv("BOP_REPLAY"); ok {
100+
params.BopReplay = true
101+
}
102+
97103
n, err := node.New(ctx.Context, cfg, log, VersionWithMeta, m)
98104
if err != nil {
99105
return nil, fmt.Errorf("unable to create the rollup node: %w", err)

op-node/node/node.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/ethereum-optimism/optimism/op-node/metrics"
2424
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
2525
"github.com/ethereum-optimism/optimism/op-node/p2p"
26+
"github.com/ethereum-optimism/optimism/op-node/params"
2627
"github.com/ethereum-optimism/optimism/op-node/rollup"
2728
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
2829
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
@@ -249,7 +250,7 @@ func (n *OpNode) initRegistry(ctx context.Context, cfg *Config) error {
249250
}
250251

251252
// Initially fetch the current gateway + n gateways into the future
252-
err = n.registrySource.FetchNextNGateways(ctx, 2, 3)
253+
err = n.registrySource.FetchNextNGateways(ctx, 6, 3)
253254
if err != nil {
254255
return fmt.Errorf("failed to fetch initial gateways: %w", err)
255256
}
@@ -260,7 +261,7 @@ func (n *OpNode) initRegistry(ctx context.Context, cfg *Config) error {
260261
fetchCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
261262
defer cancel()
262263

263-
if err := n.registrySource.FetchNextNGateways(fetchCtx, 2, 3); err != nil {
264+
if err := n.registrySource.FetchNextNGateways(fetchCtx, 6, 3); err != nil {
264265
n.log.Warn("registry fetch error", "err", err)
265266
}
266267
time.Sleep(time.Second)
@@ -621,6 +622,10 @@ func (n *OpNode) onEvent(ev event.Event) bool {
621622
}
622623

623624
func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
625+
// CHANGE(thedevbirb): allow chain replication without deviation due to L1 state.
626+
if params.BopReplay {
627+
return
628+
}
624629
n.tracer.OnNewL1Head(ctx, sig)
625630

626631
if n.l2Driver == nil {
@@ -635,6 +640,10 @@ func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
635640
}
636641

637642
func (n *OpNode) OnNewL1Safe(ctx context.Context, sig eth.L1BlockRef) {
643+
// CHANGE(thedevbirb): allow chain replication without deviation due to L1 state.
644+
if params.BopReplay {
645+
return
646+
}
638647
if n.l2Driver == nil {
639648
return
640649
}
@@ -647,6 +656,10 @@ func (n *OpNode) OnNewL1Safe(ctx context.Context, sig eth.L1BlockRef) {
647656
}
648657

649658
func (n *OpNode) OnNewL1Finalized(ctx context.Context, sig eth.L1BlockRef) {
659+
// CHANGE(thedevbirb): allow chain replication without deviation due to L1 state.
660+
if params.BopReplay {
661+
return
662+
}
650663
if n.l2Driver == nil {
651664
return
652665
}
@@ -689,7 +702,6 @@ func (n *OpNode) PublishNewFrag(ctx context.Context, from peer.ID, frag *eth.Sig
689702
}
690703

691704
func (n *OpNode) PublishSealFrag(ctx context.Context, from peer.ID, seal *eth.SignedSeal) error {
692-
693705
n.tracer.OnPublishSealFrag(ctx, from, seal)
694706

695707
// publish to p2p, if we are running p2p at all
@@ -705,7 +717,6 @@ func (n *OpNode) PublishSealFrag(ctx context.Context, from peer.ID, seal *eth.Si
705717
}
706718

707719
func (n *OpNode) PublishEnv(ctx context.Context, from peer.ID, env *eth.SignedEnv) error {
708-
709720
n.tracer.OnPublishEnv(ctx, from, env)
710721

711722
// publish to p2p, if we are running p2p at all
@@ -783,7 +794,8 @@ func (n *OpNode) OnEnv(ctx context.Context, from peer.ID, env *eth.SignedEnv) er
783794
}
784795

785796
func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
786-
if p2pNode := n.getP2PNodeIfEnabled(); p2pNode != nil && p2pNode.AltSyncEnabled() {
797+
// CHANGE(thedevbirb): for chain replication, ignoring sending p2p syncing requests which may block the event loop.
798+
if p2pNode := n.getP2PNodeIfEnabled(); p2pNode != nil && p2pNode.AltSyncEnabled() && !params.BopReplay {
787799
if unixTimeStale(start.Time, 12*time.Hour) {
788800
n.log.Debug(
789801
"ignoring request to sync L2 range, timestamp is too old for p2p",

op-node/p2p/gossip.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ import (
77
"encoding/binary"
88
"errors"
99
"fmt"
10-
"github.com/ethereum/go-ethereum/crypto"
1110
"sync"
1211
"time"
1312

13+
"github.com/ethereum/go-ethereum/crypto"
14+
1415
"github.com/golang/snappy"
1516
lru "github.com/hashicorp/golang-lru/v2"
1617
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -21,6 +22,7 @@ import (
2122
"github.com/ethereum/go-ethereum/common"
2223
"github.com/ethereum/go-ethereum/log"
2324

25+
"github.com/ethereum-optimism/optimism/op-node/params"
2426
"github.com/ethereum-optimism/optimism/op-node/rollup"
2527
"github.com/ethereum-optimism/optimism/op-service/eth"
2628
opsigner "github.com/ethereum-optimism/optimism/op-service/signer"
@@ -49,8 +51,10 @@ const (
4951
// Message domains, the msg id function uncompresses to keep data monomorphic,
5052
// but invalid compressed data will need a unique different id.
5153

52-
var MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
53-
var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
54+
var (
55+
MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
56+
MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
57+
)
5458

5559
type GossipSetupConfigurables interface {
5660
PeerScoringParams() *ScoringParams
@@ -404,7 +408,6 @@ func verifyGatewaySignature(log log.Logger, signatureBytes []byte, messageBytes
404408
}
405409

406410
func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, blockVersion eth.BlockVersion) pubsub.ValidatorEx {
407-
408411
// Seen block hashes per block height
409412
// uint64 -> *seenBlocks
410413
blockHeightLRU, err := lru.New[uint64, *seenBlocks](1000)
@@ -472,10 +475,13 @@ func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRunti
472475
// rounding down to seconds is fine here.
473476
now := uint64(time.Now().Unix())
474477

475-
// [REJECT] if the `payload.timestamp` is older than 60 seconds in the past
476-
if uint64(payload.Timestamp) < now-60 {
477-
log.Warn("payload is too old", "timestamp", uint64(payload.Timestamp))
478-
return pubsub.ValidationReject
478+
// CHANGE(thedevbirb): for chain replication, allow old blocks.
479+
if !params.BopReplay {
480+
// [REJECT] if the `payload.timestamp` is older than 60 seconds in the past
481+
if uint64(payload.Timestamp) < now-60 {
482+
log.Warn("payload is too old", "timestamp", uint64(payload.Timestamp))
483+
return pubsub.ValidationReject
484+
}
479485
}
480486

481487
// [REJECT] if the `payload.timestamp` is more than 5 seconds into the future
@@ -703,7 +709,7 @@ type publisher struct {
703709
var _ GossipOut = (*publisher)(nil)
704710

705711
func combinePeers(allPeers ...[]peer.ID) []peer.ID {
706-
var seen = make(map[peer.ID]bool)
712+
seen := make(map[peer.ID]bool)
707713
var res []peer.ID
708714
for _, peers := range allPeers {
709715
for _, p := range peers {
@@ -933,7 +939,6 @@ func newNewFragTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log
933939
validator,
934940
pubsub.WithValidatorTimeout(3*time.Second),
935941
pubsub.WithValidatorConcurrency(4))
936-
937942
if err != nil {
938943
return nil, fmt.Errorf("failed to register gossip topic: %w", err)
939944
}
@@ -971,7 +976,6 @@ func sealFragFragTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, l
971976
validator,
972977
pubsub.WithValidatorTimeout(3*time.Second),
973978
pubsub.WithValidatorConcurrency(4))
974-
975979
if err != nil {
976980
return nil, fmt.Errorf("failed to register gossip topic: %w", err)
977981
}
@@ -1009,7 +1013,6 @@ func newEnvTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log log
10091013
validator,
10101014
pubsub.WithValidatorTimeout(3*time.Second),
10111015
pubsub.WithValidatorConcurrency(4))
1012-
10131016
if err != nil {
10141017
return nil, fmt.Errorf("failed to register gossip topic: %w", err)
10151018
}
@@ -1047,7 +1050,6 @@ func newBlockTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log l
10471050
validator,
10481051
pubsub.WithValidatorTimeout(3*time.Second),
10491052
pubsub.WithValidatorConcurrency(4))
1050-
10511053
if err != nil {
10521054
return nil, fmt.Errorf("failed to register gossip topic: %w", err)
10531055
}
@@ -1080,8 +1082,10 @@ func newBlockTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log l
10801082
}, nil
10811083
}
10821084

1083-
type TopicSubscriber func(ctx context.Context, sub *pubsub.Subscription)
1084-
type MessageHandler func(ctx context.Context, from peer.ID, msg any) error
1085+
type (
1086+
TopicSubscriber func(ctx context.Context, sub *pubsub.Subscription)
1087+
MessageHandler func(ctx context.Context, from peer.ID, msg any) error
1088+
)
10851089

10861090
func NewFragHandler(onNewFrag func(ctx context.Context, from peer.ID, msg *eth.SignedNewFrag) error) MessageHandler {
10871091
return func(ctx context.Context, from peer.ID, msg any) error {

op-node/params/globals.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package params
2+
3+
// CHANGE(thedevbirb): A global variable set only at node startup that assess
4+
// whether the node is running in chain replication mode, leading to some
5+
// syncing and safety functionality to be disabled.
6+
var BopReplay = false

op-node/rollup/derive/pipeline.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"io"
8+
"os"
89

910
"github.com/ethereum/go-ethereum/common"
1011
"github.com/ethereum/go-ethereum/log"
@@ -174,6 +175,11 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
174175
}
175176
}()
176177

178+
// CHANGE(thedevbirb): for chain replication we must ignore deriving the chain from L1 data.
179+
if _, ok := os.LookupEnv("BOP_REPLAY"); ok {
180+
return nil, io.EOF
181+
}
182+
177183
// if any stages need to be reset, do that first.
178184
if dp.resetting < len(dp.stages) {
179185
if !dp.engineIsReset {

op-node/rollup/driver/state.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,8 @@ func (s *SyncDeriver) SyncStep() {
449449
return false
450450
} else {
451451
s.Emitter.Emit(rollup.CriticalErrorEvent{
452-
Err: fmt.Errorf("unexpected error on SyncStep event Drain: %w", err)})
452+
Err: fmt.Errorf("unexpected error on SyncStep event Drain: %w", err),
453+
})
453454
return false
454455
}
455456
}

op-node/rollup/sync/config.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ const (
2222
ELSyncString string = "execution-layer"
2323
)
2424

25-
var Modes = []Mode{CLSync, ELSync}
26-
var ModeStrings = []string{CLSyncString, ELSyncString}
25+
var (
26+
Modes = []Mode{CLSync, ELSync}
27+
ModeStrings = []string{CLSyncString, ELSyncString}
28+
)
2729

2830
func StringToMode(s string) (Mode, error) {
2931
switch strings.ToLower(s) {

op-node/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
8585
conductorRPCEndpoint := ctx.String(flags.ConductorRpcFlag.Name)
8686
cfg := &node.Config{
8787
L1: l1Endpoint,
88-
Registry: registryEndpoint,
88+
Registry: registryEndpoint,
8989
L2: l2Endpoint,
9090
Rollup: *rollupConfig,
9191
Driver: *driverConfig,

0 commit comments

Comments
 (0)