Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/p2p/metrics.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions internal/p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type Metrics struct {
// queue for a specific flow (i.e. Channel).
//metrics:The size of messages sent over a peer's queue for a specific p2p Channel.
PeerQueueMsgSize metrics.Gauge `metrics_labels:"ch_id" metric_name:"router_channel_queue_msg_size"`

PeerChannelSend metrics.Counter
}

type metricsLabelCache struct {
Expand Down
38 changes: 38 additions & 0 deletions internal/p2p/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"math/rand"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -1002,6 +1003,17 @@ func (m *PeerManager) Scores() map[types.NodeID]PeerScore {
return scores
}

func (m *PeerManager) Score(id types.NodeID) int {
m.mtx.Lock()
defer m.mtx.Unlock()

peer, ok := m.store.Get(id)
if ok {
return int(peer.Score())
}
return -1
}

// Status returns the status for a peer, primarily for testing.
func (m *PeerManager) Status(id types.NodeID) PeerStatus {
m.mtx.Lock()
Expand All @@ -1014,6 +1026,32 @@ func (m *PeerManager) Status(id types.NodeID) PeerStatus {
}
}

func (m *PeerManager) State(id types.NodeID) string {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit- should it be States since it could be multiple

m.mtx.Lock()
defer m.mtx.Unlock()

states := []string{}
if _, ok := m.ready[id]; ok {
states = append(states, "ready")
}
if _, ok := m.dialing[id]; ok {
states = append(states, "dialing")
}
if _, ok := m.upgrading[id]; ok {
states = append(states, "upgrading")
}
if _, ok := m.connected[id]; ok {
states = append(states, "connected")
}
if _, ok := m.evict[id]; ok {
states = append(states, "evict")
}
if _, ok := m.evicting[id]; ok {
states = append(states, "evicting")
}
return strings.Join(states, ",")
}

// findUpgradeCandidate looks for a lower-scored peer that we could evict
// to make room for the given peer. Returns an empty ID if none is found.
// If the peer is already being upgraded to, we return that same upgrade.
Expand Down
7 changes: 6 additions & 1 deletion internal/p2p/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (r *Router) routeChannel(
r.metrics.RouterPeerQueueSend.Observe(time.Since(start).Seconds())

case <-q.closed():
r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID)
r.logger.Error("dropping message for unconnected peer", "peer", envelope.To, "channel", chID)

case <-ctx.Done():
return
Expand Down Expand Up @@ -940,6 +940,11 @@ func (r *Router) sendPeer(ctx context.Context, peerID types.NodeID, conn Connect
return err
}

r.metrics.PeerSendBytesTotal.With(
"chID", fmt.Sprintf("%d", envelope.ChannelID),
"peer_id", string(peerID),
"message_type", "").Add(1)

r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)

case <-peerQueue.closed():
Expand Down
4 changes: 3 additions & 1 deletion internal/rpc/core/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ type consensusState interface {

type peerManager interface {
Peers() []types.NodeID
Score(types.NodeID) int
State(types.NodeID) string
Addresses(types.NodeID) []p2p.NodeAddress
}

//----------------------------------------------
// ----------------------------------------------
// Environment contains objects and interfaces used by the RPC. It is expected
// to be setup once during startup.
type Environment struct {
Expand Down
15 changes: 11 additions & 4 deletions internal/rpc/core/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func (env *Environment) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo,
peerList := env.PeerManager.Peers()

peers := make([]coretypes.Peer, 0, len(peerList))
peerConnections := make([]coretypes.PeerConnection, 0, len(peerList))
for _, peer := range peerList {
addrs := env.PeerManager.Addresses(peer)
if len(addrs) == 0 {
Expand All @@ -24,13 +25,19 @@ func (env *Environment) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo,
ID: peer,
URL: addrs[0].String(),
})
peerConnections = append(peerConnections, coretypes.PeerConnection{
ID: peer,
State: env.PeerManager.State(peer),
Score: env.PeerManager.Score(peer),
})
}

return &coretypes.ResultNetInfo{
Listening: env.IsListening,
Listeners: env.Listeners,
NPeers: len(peers),
Peers: peers,
Listening: env.IsListening,
Listeners: env.Listeners,
NPeers: len(peers),
Peers: peers,
PeerConnections: peerConnections,
}, nil
}

Expand Down
16 changes: 12 additions & 4 deletions rpc/coretypes/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@ func (s *ResultStatus) TxIndexEnabled() bool {

// Info about peer connections
type ResultNetInfo struct {
Listening bool `json:"listening"`
Listeners []string `json:"listeners"`
NPeers int `json:"n_peers,string"`
Peers []Peer `json:"peers"`
Listening bool `json:"listening"`
Listeners []string `json:"listeners"`
NPeers int `json:"n_peers,string"`
Peers []Peer `json:"peers"`
PeerConnections []PeerConnection `json:"peer_connections"`
}

// Log from dialing seeds
Expand All @@ -196,6 +197,13 @@ type Peer struct {
URL string `json:"url"`
}

// A peer connection
type PeerConnection struct {
ID types.NodeID `json:"node_id"`
State string `json:"state"`
Score int `json:"score,string"`
}

// Validators for a height.
type ResultValidators struct {
BlockHeight int64 `json:"block_height,string"`
Expand Down