Skip to content

Commit

Permalink
Add sync mode p2p CLI flag (#2186)
Browse files Browse the repository at this point in the history
* Add sync mode p2p CLI flag

* Return struct instead of interface

* Change flag usage string

* Remove SnapServer interface

* Revert makefile

* Minor fixes

* Update docs
  • Loading branch information
weiihann authored Sep 30, 2024
1 parent 0455bf0 commit 61a20c7
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 119 deletions.
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ feedernode: juno-cached
--p2p-feeder-node \
--p2p-addr=/ip4/0.0.0.0/tcp/7777 \
--p2p-private-key="5f6cdc3aebcc74af494df054876100368ef6126e3a33fa65b90c765b381ffc37a0a63bbeeefab0740f24a6a38dabb513b9233254ad0020c721c23e69bc820089" \
--metrics-port=9090
--metrics-port=9090 \
--pprof \
--pprof-port=9095

node1: juno-cached
# todo remove rm before merge
Expand All @@ -129,7 +131,10 @@ node1: juno-cached
--p2p-peers=/ip4/127.0.0.1/tcp/7777/p2p/12D3KooWLdURCjbp1D7hkXWk6ZVfcMDPtsNnPHuxoTcWXFtvrxGG \
--p2p-addr=/ip4/0.0.0.0/tcp/7778 \
--p2p-private-key="8aeffc26c3c371565dbe634c5248ae26f4fa5c33bc8f7328ac95e73fb94eaf263550f02449521f7cf64af17d248c5f170be46c06986a29803124c0819cb8fac3" \
--metrics-port=9091
--metrics-port=9091 \
--pprof \
--pprof-port=9096 \
--p2p-sync-mode="snap"

# --p2p-peers=/ip4/127.0.0.1/tcp/7778/p2p/12D3KooWDQVMmK6cQrfFcWUoFF8Ch5vYegfwiP5Do2SFC2NAXeBk \
Expand Down
4 changes: 4 additions & 0 deletions cmd/juno/juno.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ const (
callMaxStepsF = "rpc-call-max-steps"
corsEnableF = "rpc-cors-enable"
versionedConstantsFileF = "versioned-constants-file"
p2pSyncModeF = "p2p-sync-mode"

defaultConfig = ""
defaulHost = "localhost"
Expand Down Expand Up @@ -119,6 +120,7 @@ const (
defaultGwTimeout = 5 * time.Second
defaultCorsEnable = false
defaultVersionedConstantsFile = ""
defaultP2pSyncMode = "full"

configFlagUsage = "The YAML configuration file."
logLevelFlagUsage = "Options: trace, debug, info, warn, error."
Expand Down Expand Up @@ -152,6 +154,7 @@ const (
p2pFeederNodeUsage = "EXPERIMENTAL: Run juno as a feeder node which will only sync from feeder gateway and gossip the new" +
" blocks to the network."
p2pPrivateKeyUsage = "EXPERIMENTAL: Hexadecimal representation of a private key on the Ed25519 elliptic curve."
p2pSyncModeUsage = "EXPERIMENTAL: Synchronization mode: 'full' (default), 'snap'"
metricsUsage = "Enables the Prometheus metrics endpoint on the default port."
metricsHostUsage = "The interface on which the Prometheus endpoint will listen for requests."
metricsPortUsage = "The port on which the Prometheus endpoint will listen for requests."
Expand Down Expand Up @@ -335,6 +338,7 @@ func NewCmd(config *node.Config, run func(*cobra.Command, []string) error) *cobr
junoCmd.Flags().String(p2pPeersF, defaultP2pPeers, p2pPeersUsage)
junoCmd.Flags().Bool(p2pFeederNodeF, defaultP2pFeederNode, p2pFeederNodeUsage)
junoCmd.Flags().String(p2pPrivateKey, defaultP2pPrivateKey, p2pPrivateKeyUsage)
junoCmd.Flags().String(p2pSyncModeF, defaultP2pSyncMode, p2pSyncModeUsage)
junoCmd.Flags().Bool(metricsF, defaultMetrics, metricsUsage)
junoCmd.Flags().String(metricsHostF, defaulHost, metricsHostUsage)
junoCmd.Flags().Uint16(metricsPortF, defaultMetricsPort, metricsPortUsage)
Expand Down
21 changes: 11 additions & 10 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ type Config struct {
MetricsHost string `mapstructure:"metrics-host"`
MetricsPort uint16 `mapstructure:"metrics-port"`

P2P bool `mapstructure:"p2p"`
P2PAddr string `mapstructure:"p2p-addr"`
P2PPublicAddr string `mapstructure:"p2p-public-addr"`
P2PPeers string `mapstructure:"p2p-peers"`
P2PFeederNode bool `mapstructure:"p2p-feeder-node"`
P2PPrivateKey string `mapstructure:"p2p-private-key"`
P2P bool `mapstructure:"p2p"`
P2PAddr string `mapstructure:"p2p-addr"`
P2PPublicAddr string `mapstructure:"p2p-public-addr"`
P2PPeers string `mapstructure:"p2p-peers"`
P2PFeederNode bool `mapstructure:"p2p-feeder-node"`
P2PPrivateKey string `mapstructure:"p2p-private-key"`
P2PSyncMode p2p.SyncMode `mapstructure:"p2p-sync-mode"`

MaxVMs uint `mapstructure:"max-vms"`
MaxVMQueue uint `mapstructure:"max-vm-queue"`
Expand Down Expand Up @@ -127,7 +128,7 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen

chain := blockchain.New(database, &cfg.Network)

//TODO: close a blockchain? better way?
// TODO: close a blockchain? better way?
services = append(services, blockchain.NewBlockchainCloser(chain, log))

// Verify that cfg.Network is compatible with the database.
Expand Down Expand Up @@ -170,12 +171,12 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen
// Do not start the feeder synchronisation
synchronizer = nil
}
if os.Getenv("JUNO_P2P_NO_SYNC") != "" {
if os.Getenv("JUNO_P2P_NO_SYNC") != "" { // TODO(weiihann): remove this in the future
log.Warnw("Got 'JUNO_P2P_NO_SYNC' to not syncing from p2p network")
synchronizer = nil
}
p2pService, err = p2p.New(cfg.P2PAddr, cfg.P2PPublicAddr, version, cfg.P2PPeers, cfg.P2PPrivateKey, cfg.P2PFeederNode,
chain, &cfg.Network, log, database)
cfg.P2PSyncMode, chain, &cfg.Network, log, database)
if err != nil {
return nil, fmt.Errorf("set up p2p service: %w", err)
}
Expand Down Expand Up @@ -372,7 +373,7 @@ func (n *Node) Run(ctx context.Context) {
}

<-ctx.Done()
//TODO: chain.Close() - which service should do this?
// TODO: chain.Close() - which service should do this?
n.log.Infow("Shutting down Juno...")
}

Expand Down
74 changes: 74 additions & 0 deletions p2p/downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package p2p

import (
"context"
"os"
"sync/atomic"

"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/sync"
"github.com/NethermindEth/juno/utils"
"github.com/libp2p/go-libp2p/core/host"
)

type Downloader struct {
isFeeder bool
mode atomic.Uint32
baseSyncer *SyncService
snapSyncer *SnapSyncer
log utils.SimpleLogger
}

func NewDownloader(isFeeder bool, syncMode SyncMode, p2pHost host.Host, network *utils.Network, bc *blockchain.Blockchain, log utils.SimpleLogger) *Downloader {
dl := &Downloader{
isFeeder: isFeeder,
log: log,
}

dl.baseSyncer = newSyncService(bc, p2pHost, network, log)

var snapSyncer *SnapSyncer
if syncMode == SnapSync {
snapSyncer = NewSnapSyncer(dl.baseSyncer.Client(), bc, log)
}
dl.snapSyncer = snapSyncer

// TODO: when syncing becomes more mature, we need a way to dynamically determine which sync mode to use
// For now, we will use the sync mode that is passed in the constructor
dl.mode.Store(uint32(syncMode))

return dl
}

func (d *Downloader) Start(ctx context.Context) error {
// Feeder node doesn't sync using P2P
if d.isFeeder {
return nil
}

d.log.Infow("Downloader start", "mode", d.getMode())
if d.getMode() == SnapSync {
// TODO: a hack, remove this
if os.Getenv("JUNO_P2P_NO_SYNC") == "" {
err := d.snapSyncer.Run(ctx)
if err != nil {
d.log.Errorw("Snapsyncer failed to start")
return err
}
} else {
d.log.Infow("Syncing is disabled")
return nil
}
}

d.baseSyncer.Start(ctx)
return nil
}

func (d *Downloader) getMode() SyncMode {
return SyncMode(d.mode.Load())
}

func (d *Downloader) WithListener(l sync.EventListener) {
d.baseSyncer.WithListener(l)
}
74 changes: 74 additions & 0 deletions p2p/modes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package p2p

import (
"encoding"
"fmt"

"github.com/spf13/pflag"
)

// The following are necessary for Cobra and Viper, respectively, to unmarshal
// CLI/config parameters properly.
var (
_ pflag.Value = (*SyncMode)(nil)
_ encoding.TextUnmarshaler = (*SyncMode)(nil)
)

// SyncMode represents the synchronisation mode of the downloader.
// It is a uint32 as it is used with atomic operations.
type SyncMode uint32

const (
FullSync SyncMode = iota // Synchronize by downloading blocks and applying them to the chain sequentially
SnapSync // Download the chain and the state via snap protocol
)

func (s SyncMode) IsValid() bool {
return s == FullSync || s == SnapSync
}

func (s SyncMode) String() string {
switch s {
case FullSync:
return "full"
case SnapSync:
return "snap"
default:
return "unknown"
}
}

func (s SyncMode) Type() string {
return "SyncMode"
}

func (s SyncMode) MarshalYAML() (interface{}, error) {
return s.String(), nil
}

func (s *SyncMode) Set(mode string) error {
switch mode {
case "full":
*s = FullSync
case "snap":
*s = SnapSync
default:
return fmt.Errorf("unknown sync mode %q, want \"full\" or \"snap\"", mode)
}
return nil
}

func (s SyncMode) MarshalText() ([]byte, error) {
switch s {
case FullSync:
return []byte("full"), nil
case SnapSync:
return []byte("snap"), nil
default:
return nil, fmt.Errorf("unknown sync mode %d", s)
}
}

func (s *SyncMode) UnmarshalText(text []byte) error {
return s.Set(string(text))
}
55 changes: 18 additions & 37 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/NethermindEth/juno/service"
"math/rand"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -50,15 +48,11 @@ type Service struct {
topics map[string]*pubsub.Topic
topicsLock sync.RWMutex

synchroniser *syncService
snapSyncher service.Service
//snapServer *snapServer

feederNode bool
downloader *Downloader
database db.DB
}

func New(addr, publicAddr, version, peers, privKeyStr string, feederNode bool, bc *blockchain.Blockchain, snNetwork *utils.Network,
func New(addr, publicAddr, version, peers, privKeyStr string, feederNode bool, syncMode SyncMode, bc *blockchain.Blockchain, snNetwork *utils.Network,
log utils.SimpleLogger, database db.DB,
) (*Service, error) {
if addr == "" {
Expand Down Expand Up @@ -116,10 +110,10 @@ func New(addr, publicAddr, version, peers, privKeyStr string, feederNode bool, b
// Todo: try to understand what will happen if user passes a multiaddr with p2p public and a private key which doesn't match.
// For example, a user passes the following multiaddr: --p2p-addr=/ip4/0.0.0.0/tcp/7778/p2p/(SomePublicKey) and also passes a
// --p2p-private-key="SomePrivateKey". However, the private public key pair don't match, in this case what will happen?
return NewWithHost(p2pHost, peers, feederNode, bc, snNetwork, log, database)
return NewWithHost(p2pHost, peers, feederNode, syncMode, bc, snNetwork, log, database)
}

func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchain.Blockchain, snNetwork *utils.Network,
func NewWithHost(p2phost host.Host, peers string, feederNode bool, syncMode SyncMode, bc *blockchain.Blockchain, snNetwork *utils.Network,
log utils.SimpleLogger, database db.DB,
) (*Service, error) {
var (
Expand Down Expand Up @@ -150,22 +144,19 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai
return nil, err
}

// todo: reconsider initialising synchroniser here because if node is a feedernode we should not create an instance of it.

synchroniser := newSyncService(bc, p2phost, snNetwork, log)
downloader := NewDownloader(feederNode, syncMode, p2phost, snNetwork, bc, log)
handler := starknet.NewHandler(bc, log)
handler.WithSnapsyncSupport(NewSnapServer(bc, log))
handler.WithSnapsyncSupport(NewSnapServer(bc, log)) // TODO: initialize the snap server in the starknet handler

s := &Service{
synchroniser: synchroniser,
snapSyncher: NewSnapSyncer(synchroniser, bc, log),
log: log,
host: p2phost,
network: snNetwork,
dht: p2pdht,
feederNode: feederNode,
topics: make(map[string]*pubsub.Topic),
handler: handler,
database: database,
downloader: downloader,
log: log,
host: p2phost,
network: snNetwork,
dht: p2pdht,
topics: make(map[string]*pubsub.Topic),
handler: handler,
database: database,
}
return s, nil
}
Expand Down Expand Up @@ -264,18 +255,8 @@ func (s *Service) Run(ctx context.Context) error {

s.setProtocolHandlers()

if !s.feederNode {
//s.synchroniser.start(ctx)
if os.Getenv("JUNO_P2P_NO_SYNC") == "" {
err := s.snapSyncher.Run(ctx)
if err != nil {
s.log.Errorw("Snapsyncer failed to start")
return err
}
} else {
s.log.Infow("Syncing is disabled")
}
}
// Start the syncing process
s.downloader.Start(ctx)

<-ctx.Done()
if err := s.persistPeers(); err != nil {
Expand Down Expand Up @@ -416,7 +397,7 @@ func (s *Service) SetProtocolHandler(pid protocol.ID, handler func(network.Strea

func (s *Service) WithListener(l junoSync.EventListener) {
runMetrics(s.host.Peerstore())
s.synchroniser.WithListener(l)
s.downloader.WithListener(l)
}

// persistPeers stores the given peers in the peers database
Expand Down
Loading

0 comments on commit 61a20c7

Please sign in to comment.