diff --git a/internal/consensus/metrics.gen.go b/internal/consensus/metrics.gen.go index 37bac369b..3d5510159 100644 --- a/internal/consensus/metrics.gen.go +++ b/internal/consensus/metrics.gen.go @@ -20,6 +20,18 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "height", Help: "Height of the chain.", }, labels).With(labelsAndValues...), + GossipDataCount: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "gossip_data_count", + Help: "", + }, append(labels, "peer_id", "branch")).With(labelsAndValues...), + GossipVotesCount: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "gossip_votes_count", + Help: "", + }, append(labels, "peer_id", "branch")).With(labelsAndValues...), ValidatorLastSignedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -162,25 +174,25 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Namespace: namespace, Subsystem: MetricsSubsystem, Name: "proposal_block_created_on_propose", - Help: "Number of proposal blocks created on propose received.", + Help: "", }, append(labels, "success")).With(labelsAndValues...), ProposalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, Name: "proposal_txs", - Help: "Number of txs in a proposal.", + Help: "", }, labels).With(labelsAndValues...), ProposalMissingTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, Name: "proposal_missing_txs", - Help: "Number of missing txs when trying to create proposal.", + Help: "", }, labels).With(labelsAndValues...), MissingTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, Name: "missing_txs", - Help: "Number of missing txs when a proposal is received.", + Help: "", }, append(labels, "proposer_address")).With(labelsAndValues...), QuorumPrevoteDelay: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, @@ -237,29 +249,35 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { func NopMetrics() *Metrics { return &Metrics{ - Height: discard.NewGauge(), - ValidatorLastSignedHeight: discard.NewGauge(), - Rounds: discard.NewGauge(), - RoundDuration: discard.NewHistogram(), - Validators: discard.NewGauge(), - ValidatorsPower: discard.NewGauge(), - ValidatorPower: discard.NewGauge(), - ValidatorMissedBlocks: discard.NewGauge(), - MissingValidators: discard.NewGauge(), - MissingValidatorsPower: discard.NewGauge(), - ByzantineValidators: discard.NewGauge(), - ByzantineValidatorsPower: discard.NewGauge(), - BlockIntervalSeconds: discard.NewHistogram(), - NumTxs: discard.NewGauge(), - BlockSizeBytes: discard.NewHistogram(), - TotalTxs: discard.NewGauge(), - CommittedHeight: discard.NewGauge(), - BlockSyncing: discard.NewGauge(), - StateSyncing: discard.NewGauge(), - BlockParts: discard.NewCounter(), - StepDuration: discard.NewHistogram(), - BlockGossipReceiveLatency: discard.NewHistogram(), - BlockGossipPartsReceived: discard.NewCounter(), + Height: discard.NewGauge(), + GossipDataCount: discard.NewCounter(), + GossipVotesCount: discard.NewCounter(), + ValidatorLastSignedHeight: discard.NewGauge(), + Rounds: discard.NewGauge(), + RoundDuration: discard.NewHistogram(), + Validators: discard.NewGauge(), + ValidatorsPower: discard.NewGauge(), + ValidatorPower: discard.NewGauge(), + ValidatorMissedBlocks: discard.NewGauge(), + MissingValidators: discard.NewGauge(), + MissingValidatorsPower: discard.NewGauge(), + ByzantineValidators: discard.NewGauge(), + ByzantineValidatorsPower: discard.NewGauge(), + BlockIntervalSeconds: discard.NewHistogram(), + NumTxs: discard.NewGauge(), + BlockSizeBytes: discard.NewHistogram(), + TotalTxs: discard.NewGauge(), + CommittedHeight: discard.NewGauge(), + BlockSyncing: discard.NewGauge(), + StateSyncing: discard.NewGauge(), + BlockParts: discard.NewCounter(), + StepDuration: discard.NewHistogram(), + BlockGossipReceiveLatency: discard.NewHistogram(), + BlockGossipPartsReceived: discard.NewCounter(), + ProposalBlockCreatedOnPropose: discard.NewCounter(), + ProposalTxs: discard.NewGauge(), + ProposalMissingTxs: discard.NewGauge(), + MissingTxs: discard.NewGauge(), QuorumPrevoteDelay: discard.NewGauge(), FullPrevoteDelay: discard.NewGauge(), ProposalTimestampDifference: discard.NewHistogram(), @@ -268,9 +286,5 @@ func NopMetrics() *Metrics { ProposalCreateCount: discard.NewCounter(), RoundVotingPowerPercent: discard.NewGauge(), LateVotes: discard.NewCounter(), - ProposalBlockCreatedOnPropose: discard.NewCounter(), - ProposalTxs: discard.NewGauge(), - ProposalMissingTxs: discard.NewGauge(), - MissingTxs: discard.NewGauge(), } } diff --git a/internal/consensus/metrics.go b/internal/consensus/metrics.go index 0b8617be8..49e22da89 100644 --- a/internal/consensus/metrics.go +++ b/internal/consensus/metrics.go @@ -24,6 +24,9 @@ type Metrics struct { // Height of the chain. Height metrics.Gauge + GossipDataCount metrics.Counter `metrics_labels:"peer_id, branch"` + GossipVotesCount metrics.Counter `metrics_labels:"peer_id, branch"` + // Last height signed by this validator if the node is a validator. ValidatorLastSignedHeight metrics.Gauge `metrics_labels:"validator_address"` diff --git a/internal/consensus/peer_state.go b/internal/consensus/peer_state.go index f081eada1..6a3b2206f 100644 --- a/internal/consensus/peer_state.go +++ b/internal/consensus/peer_state.go @@ -396,6 +396,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { // ignore duplicates or decreases if CompareHRS(msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step) <= 0 { + ps.logger.Error(fmt.Sprintf("received duplicate new round step msg %d %d %d %d %d %d", msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step), "peer", ps.peerID) return } diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 83d7a536d..449732572 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -315,10 +315,12 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) { } func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error { - return stateCh.Send(ctx, p2p.Envelope{ + err := stateCh.Send(ctx, p2p.Envelope{ Broadcast: true, Message: makeRoundStepMessage(rs), }) + r.logger.Info(fmt.Sprintf("broadcasting %d-%d-%d: %s", rs.Height, rs.Round, rs.Step, err)) + return err } func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error { @@ -506,10 +508,16 @@ func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh * timer := time.NewTimer(0) defer timer.Stop() + defer func() { + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "6").Add(1) + }() OUTER_LOOP: for { if !r.IsRunning() { + r.logger.Error("[TMDEBUG] gossipDataRoutine reactor not running", "peer", ps.peerID) return } @@ -517,6 +525,7 @@ OUTER_LOOP: select { case <-ctx.Done(): + r.logger.Error("[TMDEBUG] gossipDataRoutine outer done", "peer", ps.peerID) return case <-timer.C: } @@ -530,7 +539,7 @@ OUTER_LOOP: part := rs.ProposalBlockParts.GetPart(index) partProto, err := part.ToProto() if err != nil { - logger.Error("failed to convert block part to proto", "err", err) + r.logger.Error("[TMDEBUG] gossipDataRoutine failed to convert block part to proto", "peer", ps.peerID, "err", err) return } @@ -543,10 +552,14 @@ OUTER_LOOP: Part: *partProto, }, }); err != nil { + r.logger.Error("[TMDEBUG] gossipDataRoutine send part error", "peer", ps.peerID, "err", err) return } ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "1").Add(1) continue OUTER_LOOP } } @@ -572,15 +585,24 @@ OUTER_LOOP: // Continue the loop since prs is a copy and not effected by this // initialization. + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "2").Add(1) continue OUTER_LOOP } r.gossipDataForCatchup(ctx, rs, prs, ps, dataCh) + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "3").Add(1) continue OUTER_LOOP } // if height and round don't match, sleep if (rs.Height != prs.Height) || (rs.Round != prs.Round) { + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "4").Add(1) continue OUTER_LOOP } @@ -602,6 +624,7 @@ OUTER_LOOP: Proposal: *propProto, }, }); err != nil { + r.logger.Error("[TMDEBUG] gossipDataRoutine send proposal error", "peer", ps.peerID, "err", err) return } @@ -627,10 +650,14 @@ OUTER_LOOP: ProposalPol: *pPolProto, }, }); err != nil { + r.logger.Error("[TMDEBUG] gossipDataRoutine send POL error", "peer", ps.peerID, "err", err) return } } } + r.Metrics.GossipDataCount.With( + "peer_id", string(ps.peerID), + "branch", "5").Add(1) } } @@ -741,14 +768,22 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh timer := time.NewTimer(0) defer timer.Stop() + defer func() { + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "6").Add(1) + }() + logCounter := 0 for { if !r.IsRunning() { + r.logger.Error("[TMDEBUG] gossipVotesRoutine reactor not running", "peer", ps.peerID) return } select { case <-ctx.Done(): + r.logger.Error("[TMDEBUG] gossipVotesRoutine done", "peer", ps.peerID) return default: } @@ -756,28 +791,42 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh rs := r.getRoundState() prs := ps.GetRoundState() + enteredB1 := false // if height matches, then send LastCommit, Prevotes, and Precommits if rs.Height == prs.Height { + enteredB1 = true if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps, voteCh); err != nil { + r.logger.Error("[TMDEBUG] gossipVotesRoutine height match error", "error", err, "peer", ps.peerID) return } else if ok { + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "1").Add(1) continue } } + enteredB2 := false // special catchup logic -- if peer is lagging by height 1, send LastCommit if prs.Height != 0 && rs.Height == prs.Height+1 { + enteredB2 = true if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil { + r.logger.Error("[TMDEBUG] gossipVotesRoutine special catchup error", "error", err, "peer", ps.peerID) return } else if ok { logger.Debug("picked rs.LastCommit to send", "height", prs.Height) + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "2").Add(1) continue } } // catchup logic -- if peer is lagging by more than 1, send Commit blockStoreBase := r.state.blockStore.Base() + enteredCatchup := false if blockStoreBase > 0 && prs.Height != 0 && rs.Height >= prs.Height+2 && prs.Height >= blockStoreBase { + enteredCatchup = true // Load the block's extended commit for prs.Height, which contains precommit // signatures for prs.Height. var ec *types.ExtendedCommit @@ -787,19 +836,36 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh ec = r.state.blockStore.LoadBlockCommit(prs.Height).WrappedExtendedCommit() } if ec == nil { + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "3").Add(1) continue } if ok, err := r.pickSendVote(ctx, ps, ec, voteCh); err != nil { + r.logger.Error("[TMDEBUG] gossipVotesRoutine catchup error", "error", err, "peer", ps.peerID) return } else if ok { logger.Debug("picked Catchup commit to send", "height", prs.Height) + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "4").Add(1) continue } } + if logCounter%300 == 0 { + // print every 10s + r.logger.Error(fmt.Sprintf("not gossiping votes %d %d %d %t %t %t", blockStoreBase, prs.Height, rs.Height, enteredB1, enteredB2, enteredCatchup), "peer", ps.peerID) + r.logger.Error(ps.PRS.String(), "peer", ps.peerID) + } + logCounter++ + r.Metrics.GossipVotesCount.With( + "peer_id", string(ps.peerID), + "branch", "5").Add(1) timer.Reset(r.state.config.PeerGossipSleepDuration) select { case <-ctx.Done(): + r.logger.Error("[TMDEBUG] gossipVotesRoutine done 2", "peer", ps.peerID) return case <-timer.C: } @@ -945,7 +1011,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh // the peer. During peer removal, we remove the peer for our set of peers and // signal to all spawned goroutines to gracefully exit in a non-blocking manner. func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, chans channelBundle) { - r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) + r.logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) r.mtx.Lock() defer r.mtx.Unlock() @@ -976,12 +1042,14 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda go func() { select { case <-ctx.Done(): + r.logger.Error("[TMDEBUG] canceled", "peer", peerUpdate.NodeID) return case <-r.readySignal: } // do nothing if the peer has // stopped while we've been waiting. if !ps.IsRunning() { + r.logger.Error("[TMDEBUG] not running", "peer", peerUpdate.NodeID) return } // start goroutines for this peer @@ -996,6 +1064,8 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda } }() + } else { + r.logger.Error("[TMDEBUG] peer status up received while PeerState is running") } case p2p.PeerStatusDown: diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 70c2dc44c..d7821f151 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -49,6 +49,7 @@ var ( ) var msgQueueSize = 1000 +var heartbeatIntervalInSecs = 10 // msgs from the reactor which may update the state type msgInfo struct { @@ -490,6 +491,7 @@ func (cs *State) OnStart(ctx context.Context) error { // now start the receiveRoutine go cs.receiveRoutine(ctx, 0) + go cs.heartbeater(ctx) // schedule the first round! // use GetRoundState so we don't race the receiveRoutine for access @@ -894,6 +896,23 @@ func (cs *State) newStep() { } } +func (cs *State) heartbeater(ctx context.Context) { + for { + select { + case <-time.After(time.Duration(heartbeatIntervalInSecs) * time.Second): + cs.fireHeartbeatEvent() + case <-ctx.Done(): + return + } + } +} + +func (cs *State) fireHeartbeatEvent() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.evsw.FireEvent(types.EventNewRoundStepValue, &cs.RoundState) +} + //----------------------------------------- // the main go routines @@ -1595,7 +1614,7 @@ func (cs *State) enterPrevote(ctx context.Context, height int64, round int32, en cs.newStep() }() - logger.Info("entering prevote step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step), "time", time.Now().UnixMilli()) + logger.Info("entering prevote step", "entry", entryLabel, "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step), "time", time.Now().UnixMilli()) // Sign and broadcast vote as necessary cs.doPrevote(ctx, height, round) @@ -1837,7 +1856,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, return } - logger.Info("entering precommit step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step), "time", time.Now().UnixMilli()) + logger.Info("entering precommit step", "entry", entryLabel, "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step), "time", time.Now().UnixMilli()) defer func() { // Done enterPrecommit: diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index 901597fb6..bcc97fd4f 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -119,6 +119,8 @@ type MConnection struct { created time.Time // time of creation _maxPacketMsgSize int + + setMet func(time.Time, string) } // MConnConfig is a MConnection configuration. @@ -163,6 +165,7 @@ func NewMConnection( onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig, + setMet func(time.Time, string), ) *MConnection { mconn := &MConnection{ logger: logger, @@ -178,6 +181,7 @@ func NewMConnection( config: config, created: time.Now(), cancel: func() {}, + setMet: setMet, } mconn.BaseService = *service.NewBaseService(logger, "MConnection", mconn) @@ -720,6 +724,9 @@ func (ch *channel) nextPacketMsg() tmp2p.PacketMsg { func (ch *channel) writePacketMsgTo(w io.Writer) (n int, err error) { packet := ch.nextPacketMsg() n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet)) + if err == nil { + ch.conn.setMet(time.Now(), fmt.Sprintf("%d", ch.desc.ID)) + } atomic.AddInt64(&ch.recentlySent, int64(n)) return } diff --git a/internal/p2p/conn/connection_test.go b/internal/p2p/conn/connection_test.go index 5a604cd23..602dec42b 100644 --- a/internal/p2p/conn/connection_test.go +++ b/internal/p2p/conn/connection_test.go @@ -43,7 +43,7 @@ func createMConnectionWithCallbacks( cfg.PingInterval = 250 * time.Millisecond cfg.PongTimeout = 500 * time.Millisecond chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} - c := NewMConnection(logger, conn, chDescs, onReceive, onError, cfg) + c := NewMConnection(logger, conn, chDescs, onReceive, onError, cfg, nil) return c } @@ -435,7 +435,7 @@ func newClientAndServerConnsForReadErrors( } logger := log.NewNopLogger() - mconnClient := NewMConnection(logger.With("module", "client"), client, chDescs, onReceive, onError, DefaultMConnConfig()) + mconnClient := NewMConnection(logger.With("module", "client"), client, chDescs, onReceive, onError, DefaultMConnConfig(), nil) err := mconnClient.Start(ctx) require.NoError(t, err) diff --git a/internal/p2p/metrics.gen.go b/internal/p2p/metrics.gen.go index cbfba29d9..a2c233eff 100644 --- a/internal/p2p/metrics.gen.go +++ b/internal/p2p/metrics.gen.go @@ -32,6 +32,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "peer_send_bytes_total", Help: "Number of bytes per channel sent to a given peer.", }, append(labels, "peer_id", "chID", "message_type")).With(labelsAndValues...), + PeerNumChannels: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peer_num_channels", + Help: "Number of channels open for peer", + }, append(labels, "peer_id")).With(labelsAndValues...), PeerPendingSendBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -68,6 +74,30 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "peer_queue_msg_size", Help: "The size of messages sent over a peer's queue for a specific p2p Channel.", }, append(labels, "ch_id")).With(labelsAndValues...), + PeerChannelSend: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peer_channel_send", + Help: "", + }, labels).With(labelsAndValues...), + LastEnqueuedAt: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "last_enqueued_at", + Help: "", + }, append(labels, "ch_id", "peer_id")).With(labelsAndValues...), + LastSentAt: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "last_sent_at", + Help: "", + }, append(labels, "ch_id", "peer_id")).With(labelsAndValues...), + LastWrittenAt: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "last_written_at", + Help: "", + }, append(labels, "ch_id")).With(labelsAndValues...), } } @@ -76,11 +106,16 @@ func NopMetrics() *Metrics { Peers: discard.NewGauge(), PeerReceiveBytesTotal: discard.NewCounter(), PeerSendBytesTotal: discard.NewCounter(), + PeerNumChannels: discard.NewGauge(), PeerPendingSendBytes: discard.NewGauge(), RouterPeerQueueRecv: discard.NewHistogram(), RouterPeerQueueSend: discard.NewHistogram(), RouterChannelQueueSend: discard.NewHistogram(), PeerQueueDroppedMsgs: discard.NewCounter(), PeerQueueMsgSize: discard.NewGauge(), + PeerChannelSend: discard.NewCounter(), + LastEnqueuedAt: discard.NewGauge(), + LastSentAt: discard.NewGauge(), + LastWrittenAt: discard.NewGauge(), } } diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index b45f128e5..9e803cb75 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -32,6 +32,8 @@ type Metrics struct { PeerReceiveBytesTotal metrics.Counter `metrics_labels:"peer_id, chID, message_type"` // Number of bytes per channel sent to a given peer. PeerSendBytesTotal metrics.Counter `metrics_labels:"peer_id, chID, message_type"` + // Number of channels open for peer + PeerNumChannels metrics.Gauge `metrics_labels:"peer_id"` // Number of bytes pending being sent to a given peer. PeerPendingSendBytes metrics.Gauge `metrics_labels:"peer_id"` @@ -59,6 +61,12 @@ type Metrics struct { // queue for a specific flow (i.e. Channel). //metrics:The size of messages sent over a peer's queue for a specific p2p Channel. PeerQueueMsgSize metrics.Gauge `metrics_labels:"ch_id" metric_name:"router_channel_queue_msg_size"` + + PeerChannelSend metrics.Counter + + LastEnqueuedAt metrics.Gauge `metrics_labels:"ch_id, peer_id"` + LastSentAt metrics.Gauge `metrics_labels:"ch_id, peer_id"` + LastWrittenAt metrics.Gauge `metrics_labels:"ch_id"` } type metricsLabelCache struct { diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 5be75ec84..c8f3dda5e 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -7,6 +7,7 @@ import ( "math" "math/rand" "sort" + "strings" "sync" "time" @@ -1002,6 +1003,17 @@ func (m *PeerManager) Scores() map[types.NodeID]PeerScore { return scores } +func (m *PeerManager) Score(id types.NodeID) int { + m.mtx.Lock() + defer m.mtx.Unlock() + + peer, ok := m.store.Get(id) + if ok { + return int(peer.Score()) + } + return -1 +} + // Status returns the status for a peer, primarily for testing. func (m *PeerManager) Status(id types.NodeID) PeerStatus { m.mtx.Lock() @@ -1014,6 +1026,32 @@ func (m *PeerManager) Status(id types.NodeID) PeerStatus { } } +func (m *PeerManager) State(id types.NodeID) string { + m.mtx.Lock() + defer m.mtx.Unlock() + + states := []string{} + if _, ok := m.ready[id]; ok { + states = append(states, "ready") + } + if _, ok := m.dialing[id]; ok { + states = append(states, "dialing") + } + if _, ok := m.upgrading[id]; ok { + states = append(states, "upgrading") + } + if _, ok := m.connected[id]; ok { + states = append(states, "connected") + } + if _, ok := m.evict[id]; ok { + states = append(states, "evict") + } + if _, ok := m.evicting[id]; ok { + states = append(states, "evicting") + } + return strings.Join(states, ",") +} + // findUpgradeCandidate looks for a lower-scored peer that we could evict // to make room for the given peer. Returns an empty ID if none is found. // If the peer is already being upgraded to, we return that same upgrade. diff --git a/internal/p2p/router.go b/internal/p2p/router.go index cb0d32713..708a0ec50 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -324,13 +324,18 @@ func (r *Router) routeChannel( errCh <-chan PeerError, wrapper Wrapper, ) { + defer func() { + r.logger.Error("[TMDEBUG] exited", "channel", chID) + }() for { select { case envelope, ok := <-outCh: if !ok { + r.logger.Error("[TMDEBUG] receiving envelope failed, dropping message", "channel", chID) return } if envelope.IsZero() { + r.logger.Error("[TMDEBUG] envelope is zero, dropping message", "channel", chID) continue } @@ -351,16 +356,21 @@ func (r *Router) routeChannel( // collect peer queues to pass the message via var queues []queue + var peerIDs []types.NodeID if envelope.Broadcast { r.peerMtx.RLock() queues = make([]queue, 0, len(r.peerQueues)) + peerIDs = make([]types.NodeID, 0, len(r.peerQueues)) for nodeID, q := range r.peerQueues { peerChs := r.peerChannels[nodeID] // check whether the peer is receiving on that channel if _, ok := peerChs[chID]; ok { queues = append(queues, q) + peerIDs = append(peerIDs, nodeID) + } else { + r.logger.Error("[TMDEBUG] broadcast mode block dropping message because peer is not ok", "peer", envelope.To, "channel", chID) } } @@ -373,13 +383,14 @@ func (r *Router) routeChannel( if ok { peerChs := r.peerChannels[envelope.To] + r.metrics.PeerNumChannels.With("peer_id", string(envelope.To)).Set(float64(len(peerChs))) // check whether the peer is receiving on that channel _, contains = peerChs[chID] } r.peerMtx.RUnlock() if !ok { - r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID) + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because we cannot find peer", "peer", envelope.To, "channel", chID) continue } @@ -388,30 +399,35 @@ func (r *Router) routeChannel( // peer doesn't have available. This is a known issue due to // how peer subscriptions work: // https://github.com/tendermint/tendermint/issues/6598 + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because contains is false (peer is not receiving on channel)", "peer", envelope.To, "channel", chID) continue } queues = []queue{q} + peerIDs = []types.NodeID{envelope.To} } // send message to peers - for _, q := range queues { + for idx, q := range queues { start := time.Now().UTC() select { case q.enqueue() <- envelope: + r.metrics.LastEnqueuedAt.With("peer_id", string(peerIDs[idx]), "ch_id", fmt.Sprintf("%d", chID)).Set(float64(time.Now().UnixMilli())) r.metrics.RouterPeerQueueSend.Observe(time.Since(start).Seconds()) case <-q.closed(): - r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID) + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because q is closed", "peer", envelope.To, "channel", chID) case <-ctx.Done(): + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because context is done", "peer", envelope.To, "channel", chID) return } } case peerError, ok := <-errCh: if !ok { + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because peer err", "channel", chID) return } @@ -431,6 +447,7 @@ func (r *Router) routeChannel( } case <-ctx.Done(): + r.logger.Error("[TMDEBUG] dropping message for unconnected peer because context is done 2", "channel", chID) return } } @@ -752,7 +769,9 @@ func (r *Router) handshakePeer( } nodeInfo := r.nodeInfoProducer() + r.logger.Info(fmt.Sprintf("[TMDEBUG] handshaking %s with channels %s", expectID, nodeInfo.Channels)) peerInfo, peerKey, err := conn.Handshake(ctx, *nodeInfo, r.privKey) + r.logger.Info(fmt.Sprintf("[TMDEBUG] handshaked %s with channels %s", peerInfo.NodeID, peerInfo.Channels)) if err != nil { return peerInfo, err } @@ -940,6 +959,13 @@ func (r *Router) sendPeer(ctx context.Context, peerID types.NodeID, conn Connect return err } + r.metrics.LastSentAt.With("peer_id", string(peerID), "ch_id", fmt.Sprintf("%d", envelope.ChannelID)).Set(float64(time.Now().UnixMilli())) + + r.metrics.PeerSendBytesTotal.With( + "chID", fmt.Sprintf("%d", envelope.ChannelID), + "peer_id", string(peerID), + "message_type", "").Add(1) + r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message) case <-peerQueue.closed(): diff --git a/internal/p2p/transport_mconn.go b/internal/p2p/transport_mconn.go index 5cc10fd8f..08105902a 100644 --- a/internal/p2p/transport_mconn.go +++ b/internal/p2p/transport_mconn.go @@ -9,6 +9,7 @@ import ( "net" "strconv" "sync" + "time" "golang.org/x/net/netutil" @@ -51,6 +52,8 @@ type MConnTransport struct { lastConnsMutex *sync.Mutex lastConns map[string]net.Conn + + Met *Metrics } // NewMConnTransport sets up a new MConnection transport. This uses the @@ -61,6 +64,7 @@ func NewMConnTransport( mConnConfig conn.MConnConfig, channelDescs []*ChannelDescriptor, options MConnTransportOptions, + met *Metrics, ) *MConnTransport { return &MConnTransport{ logger: logger, @@ -70,6 +74,7 @@ func NewMConnTransport( channelDescs: channelDescs, lastConns: make(map[string]net.Conn), lastConnsMutex: &sync.Mutex{}, + Met: met, } } @@ -178,7 +183,7 @@ func (m *MConnTransport) Accept(ctx context.Context) (Connection, error) { } else { m.logger.Error("accepting a connection whose remote address is not TCPAddr") } - return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs), nil + return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs, m.Met), nil } } @@ -217,7 +222,7 @@ func (m *MConnTransport) Dial(ctx context.Context, endpoint *Endpoint) (Connecti } } - return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs), nil + return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs, m.Met), nil } // Close implements Transport. @@ -274,6 +279,7 @@ type mConnConnection struct { errorCh chan error doneCh chan struct{} closeOnce sync.Once + Met *Metrics mconn *conn.MConnection // set during Handshake() } @@ -290,6 +296,7 @@ func newMConnConnection( conn net.Conn, mConnConfig conn.MConnConfig, channelDescs []*ChannelDescriptor, + met *Metrics, ) *mConnConnection { return &mConnConnection{ logger: logger, @@ -299,6 +306,7 @@ func newMConnConnection( receiveCh: make(chan mConnMessage), errorCh: make(chan error, 1), // buffered to avoid onError leak doneCh: make(chan struct{}), + Met: met, } } @@ -414,6 +422,9 @@ func (c *mConnConnection) handshake( c.onReceive, c.onError, c.mConnConfig, + func(t time.Time, s string) { + c.Met.LastWrittenAt.With("ch_id", s).Set(float64(t.UnixMilli())) + }, ) return mconn, peerInfo, secretConn.RemotePubKey(), nil diff --git a/internal/p2p/transport_mconn_test.go b/internal/p2p/transport_mconn_test.go index c478dbe1d..28d5bfeaa 100644 --- a/internal/p2p/transport_mconn_test.go +++ b/internal/p2p/transport_mconn_test.go @@ -24,6 +24,7 @@ func init() { conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{}, + nil, ) err := transport.Listen(&p2p.Endpoint{ Protocol: p2p.MConnProtocol, @@ -46,6 +47,7 @@ func TestMConnTransport_AcceptBeforeListen(t *testing.T) { p2p.MConnTransportOptions{ MaxAcceptedConnections: 2, }, + nil, ) t.Cleanup(func() { _ = transport.Close() @@ -69,6 +71,7 @@ func TestMConnTransport_AcceptMaxAcceptedConnections(t *testing.T) { p2p.MConnTransportOptions{ MaxAcceptedConnections: 2, }, + nil, ) t.Cleanup(func() { _ = transport.Close() @@ -158,6 +161,7 @@ func TestMConnTransport_Listen(t *testing.T) { conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{{ID: chID, Priority: 1}}, p2p.MConnTransportOptions{}, + nil, ) // Transport should not listen on any endpoints yet. diff --git a/internal/rpc/core/env.go b/internal/rpc/core/env.go index 124525f26..f52d732e3 100644 --- a/internal/rpc/core/env.go +++ b/internal/rpc/core/env.go @@ -59,10 +59,12 @@ type consensusState interface { type peerManager interface { Peers() []types.NodeID + Score(types.NodeID) int + State(types.NodeID) string Addresses(types.NodeID) []p2p.NodeAddress } -//---------------------------------------------- +// ---------------------------------------------- // Environment contains objects and interfaces used by the RPC. It is expected // to be setup once during startup. type Environment struct { diff --git a/internal/rpc/core/net.go b/internal/rpc/core/net.go index b18f1e2fc..124deb757 100644 --- a/internal/rpc/core/net.go +++ b/internal/rpc/core/net.go @@ -14,6 +14,7 @@ func (env *Environment) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo, peerList := env.PeerManager.Peers() peers := make([]coretypes.Peer, 0, len(peerList)) + peerConnections := make([]coretypes.PeerConnection, 0, len(peerList)) for _, peer := range peerList { addrs := env.PeerManager.Addresses(peer) if len(addrs) == 0 { @@ -24,13 +25,19 @@ func (env *Environment) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo, ID: peer, URL: addrs[0].String(), }) + peerConnections = append(peerConnections, coretypes.PeerConnection{ + ID: peer, + State: env.PeerManager.State(peer), + Score: env.PeerManager.Score(peer), + }) } return &coretypes.ResultNetInfo{ - Listening: env.IsListening, - Listeners: env.Listeners, - NPeers: len(peers), - Peers: peers, + Listening: env.IsListening, + Listeners: env.Listeners, + NPeers: len(peers), + Peers: peers, + PeerConnections: peerConnections, }, nil } diff --git a/node/node.go b/node/node.go index 7c381eea3..164b6ab7c 100644 --- a/node/node.go +++ b/node/node.go @@ -324,7 +324,7 @@ func makeNode( node.rpcEnv.ConsensusState = csState csReactor := consensus.NewReactor( - logger, + logger.With("module", "csreactor"), csState, peerManager.Subscribe, eventBus, diff --git a/node/setup.go b/node/setup.go index cf65856dc..05699c4b2 100644 --- a/node/setup.go +++ b/node/setup.go @@ -299,6 +299,7 @@ func createRouter( p2p.MConnTransportOptions{ MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections), }, + p2pMetrics, ) ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(cfg.P2P.ListenAddress)) diff --git a/rpc/coretypes/responses.go b/rpc/coretypes/responses.go index ef740f89d..cde125fa2 100644 --- a/rpc/coretypes/responses.go +++ b/rpc/coretypes/responses.go @@ -174,10 +174,11 @@ func (s *ResultStatus) TxIndexEnabled() bool { // Info about peer connections type ResultNetInfo struct { - Listening bool `json:"listening"` - Listeners []string `json:"listeners"` - NPeers int `json:"n_peers,string"` - Peers []Peer `json:"peers"` + Listening bool `json:"listening"` + Listeners []string `json:"listeners"` + NPeers int `json:"n_peers,string"` + Peers []Peer `json:"peers"` + PeerConnections []PeerConnection `json:"peer_connections"` } // Log from dialing seeds @@ -196,6 +197,13 @@ type Peer struct { URL string `json:"url"` } +// A peer connection +type PeerConnection struct { + ID types.NodeID `json:"node_id"` + State string `json:"state"` + Score int `json:"score,string"` +} + // Validators for a height. type ResultValidators struct { BlockHeight int64 `json:"block_height,string"`