Skip to content

Commit f9b6f32

Browse files
authored
fix: remove metric units that are interpreted as ratios (#49)
Metrics with a unit of "1" are interpreted as ratios by the prometheus exporter, which has the side effect of appending `_ratio` to the metric name. This is supposed to only be done for gauges but is actually applied to counters too. See https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/prometheus for the transformations applied by the prometheus exporter. Since we were using "1" for dimensionless counts I have simply removed the units. This PR also hooks up the SentMessages, SentMessageErrors, SentRequests, SentRequestErrors, OutboundRequestLatency and SentBytes DHT metrics.
1 parent 0a1371b commit f9b6f32

File tree

8 files changed

+98
-36
lines changed

8 files changed

+98
-36
lines changed

dht.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ func New(h host.Host, cfg *Config) (*DHT, error) {
132132
rtr := &router{
133133
host: h,
134134
protocolID: cfg.ProtocolID,
135-
tracer: d.tele.Tracer,
135+
tele: d.tele,
136+
clk: cfg.Clock,
136137
}
137138
d.kad, err = coord.NewCoordinator(kadt.PeerID(d.host.ID()), rtr, d.rt, coordCfg)
138139
if err != nil {

internal/coord/routing/bootstrap.go

-4
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig) (
138138
b.counterFindSent, err = cfg.Meter.Int64Counter(
139139
"bootstrap_find_sent",
140140
metric.WithDescription("Total number of find closer nodes requests sent by the bootstrap state machine"),
141-
metric.WithUnit("1"),
142141
)
143142
if err != nil {
144143
return nil, fmt.Errorf("create bootstrap_find_sent counter: %w", err)
@@ -147,7 +146,6 @@ func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig) (
147146
b.counterFindSucceeded, err = cfg.Meter.Int64Counter(
148147
"bootstrap_find_succeeded",
149148
metric.WithDescription("Total number of find closer nodes requests sent by the bootstrap state machine that were successful"),
150-
metric.WithUnit("1"),
151149
)
152150
if err != nil {
153151
return nil, fmt.Errorf("create bootstrap_find_succeeded counter: %w", err)
@@ -156,7 +154,6 @@ func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig) (
156154
b.counterFindFailed, err = cfg.Meter.Int64Counter(
157155
"bootstrap_find_failed",
158156
metric.WithDescription("Total number of find closer nodes requests sent by the bootstrap state machine that failed"),
159-
metric.WithUnit("1"),
160157
)
161158
if err != nil {
162159
return nil, fmt.Errorf("create bootstrap_find_failed counter: %w", err)
@@ -165,7 +162,6 @@ func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig) (
165162
b.gaugeRunning, err = cfg.Meter.Int64ObservableGauge(
166163
"bootstrap_running",
167164
metric.WithDescription("Whether or not the bootstrap is running"),
168-
metric.WithUnit("1"),
169165
metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
170166
if b.running.Load() {
171167
o.Observe(1)

internal/coord/routing/explore.go

-4
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,6 @@ func NewExplore[K kad.Key[K], N kad.NodeID[K]](self N, rt RoutingTableCpl[K, N],
186186
e.counterFindSent, err = cfg.Meter.Int64Counter(
187187
"explore_find_sent",
188188
metric.WithDescription("Total number of find closer nodes requests sent by the explore state machine"),
189-
metric.WithUnit("1"),
190189
)
191190
if err != nil {
192191
return nil, fmt.Errorf("create explore_find_sent counter: %w", err)
@@ -195,7 +194,6 @@ func NewExplore[K kad.Key[K], N kad.NodeID[K]](self N, rt RoutingTableCpl[K, N],
195194
e.counterFindSucceeded, err = cfg.Meter.Int64Counter(
196195
"explore_find_succeeded",
197196
metric.WithDescription("Total number of find closer nodes requests sent by the explore state machine that were successful"),
198-
metric.WithUnit("1"),
199197
)
200198
if err != nil {
201199
return nil, fmt.Errorf("create explore_find_succeeded counter: %w", err)
@@ -204,7 +202,6 @@ func NewExplore[K kad.Key[K], N kad.NodeID[K]](self N, rt RoutingTableCpl[K, N],
204202
e.counterFindFailed, err = cfg.Meter.Int64Counter(
205203
"explore_find_failed",
206204
metric.WithDescription("Total number of find closer nodes requests sent by the explore state machine that failed"),
207-
metric.WithUnit("1"),
208205
)
209206
if err != nil {
210207
return nil, fmt.Errorf("create explore_find_failed counter: %w", err)
@@ -213,7 +210,6 @@ func NewExplore[K kad.Key[K], N kad.NodeID[K]](self N, rt RoutingTableCpl[K, N],
213210
e.gaugeRunning, err = cfg.Meter.Int64ObservableGauge(
214211
"explore_running",
215212
metric.WithDescription("Whether or not the an explore is running for a cpl"),
216-
metric.WithUnit("1"),
217213
metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
218214
if e.running.Load() {
219215
o.Observe(1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set)))

internal/coord/routing/include.go

-5
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ func NewInclude[K kad.Key[K], N kad.NodeID[K]](rt kad.RoutingTable[K, N], cfg *I
149149
in.counterChecksSent, err = cfg.Meter.Int64Counter(
150150
"include_checks_sent",
151151
metric.WithDescription("Total number of connectivity checks sent by the include state machine"),
152-
metric.WithUnit("1"),
153152
)
154153
if err != nil {
155154
return nil, fmt.Errorf("create include_checks_sent counter: %w", err)
@@ -158,7 +157,6 @@ func NewInclude[K kad.Key[K], N kad.NodeID[K]](rt kad.RoutingTable[K, N], cfg *I
158157
in.counterChecksPassed, err = cfg.Meter.Int64Counter(
159158
"include_checks_passed",
160159
metric.WithDescription("Total number of connectivity checks sent by the include state machine that were successful"),
161-
metric.WithUnit("1"),
162160
)
163161
if err != nil {
164162
return nil, fmt.Errorf("create include_checks_passed counter: %w", err)
@@ -167,7 +165,6 @@ func NewInclude[K kad.Key[K], N kad.NodeID[K]](rt kad.RoutingTable[K, N], cfg *I
167165
in.counterChecksFailed, err = cfg.Meter.Int64Counter(
168166
"include_checks_failed",
169167
metric.WithDescription("Total number of connectivity checks sent by the include state machine that failed"),
170-
metric.WithUnit("1"),
171168
)
172169
if err != nil {
173170
return nil, fmt.Errorf("create include_checks_failed counter: %w", err)
@@ -176,7 +173,6 @@ func NewInclude[K kad.Key[K], N kad.NodeID[K]](rt kad.RoutingTable[K, N], cfg *I
176173
in.counterCandidatesDroppedCapacity, err = cfg.Meter.Int64Counter(
177174
"include_candidates_dropped_capacity",
178175
metric.WithDescription("Total number of nodes that were not added to the candidate queue because it was already at maximum capacity"),
179-
metric.WithUnit("1"),
180176
)
181177
if err != nil {
182178
return nil, fmt.Errorf("create include_candidates_dropped_capacity counter: %w", err)
@@ -185,7 +181,6 @@ func NewInclude[K kad.Key[K], N kad.NodeID[K]](rt kad.RoutingTable[K, N], cfg *I
185181
in.gaugeCandidateCount, err = cfg.Meter.Int64ObservableGauge(
186182
"include_candidate_count",
187183
metric.WithDescription("Total number of nodes in the include state machine's candidate queue"),
188-
metric.WithUnit("1"),
189184
metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
190185
o.Observe(in.candidateCount.Load())
191186
return nil

internal/coord/routing/probe.go

-4
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ func NewProbe[K kad.Key[K], N kad.NodeID[K]](rt RoutingTableCpl[K, N], cfg *Prob
177177
p.counterChecksSent, err = cfg.Meter.Int64Counter(
178178
"probe_checks_sent",
179179
metric.WithDescription("Total number of connectivity checks sent by the probe state machine"),
180-
metric.WithUnit("1"),
181180
)
182181
if err != nil {
183182
return nil, fmt.Errorf("create probe_checks_sent counter: %w", err)
@@ -186,7 +185,6 @@ func NewProbe[K kad.Key[K], N kad.NodeID[K]](rt RoutingTableCpl[K, N], cfg *Prob
186185
p.counterChecksPassed, err = cfg.Meter.Int64Counter(
187186
"probe_checks_passed",
188187
metric.WithDescription("Total number of connectivity checks sent by the probe state machine that were successful"),
189-
metric.WithUnit("1"),
190188
)
191189
if err != nil {
192190
return nil, fmt.Errorf("create probe_checks_passed counter: %w", err)
@@ -195,7 +193,6 @@ func NewProbe[K kad.Key[K], N kad.NodeID[K]](rt RoutingTableCpl[K, N], cfg *Prob
195193
p.counterChecksFailed, err = cfg.Meter.Int64Counter(
196194
"probe_checks_failed",
197195
metric.WithDescription("Total number of connectivity checks sent by the probe state machine that failed"),
198-
metric.WithUnit("1"),
199196
)
200197
if err != nil {
201198
return nil, fmt.Errorf("create probe_checks_failed counter: %w", err)
@@ -204,7 +201,6 @@ func NewProbe[K kad.Key[K], N kad.NodeID[K]](rt RoutingTableCpl[K, N], cfg *Prob
204201
p.gaugePendingCount, err = cfg.Meter.Int64ObservableGauge(
205202
"probe_pending_count",
206203
metric.WithDescription("Total number of nodes being monitored by the probe state machine"),
207-
metric.WithUnit("1"),
208204
metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
209205
o.Observe(p.pendingCount.Load())
210206
return nil

pb/msg.aux.go

+60
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package pb
33
import (
44
"bytes"
55
"fmt"
6+
math_bits "math/bits"
67

78
"github.com/libp2p/go-libp2p/core/peer"
89
ma "github.com/multiformats/go-multiaddr"
@@ -114,6 +115,41 @@ func (m *Message) CloserNodes() []kadt.PeerID {
114115
return ids
115116
}
116117

118+
func (m *Message) Size() (n int) {
119+
if m == nil {
120+
return 0
121+
}
122+
var l int
123+
_ = l
124+
if m.Type != 0 {
125+
n += 1 + sovDht(uint64(m.Type))
126+
}
127+
l = len(m.Key)
128+
if l > 0 {
129+
n += 1 + l + sovDht(uint64(l))
130+
}
131+
if m.Record != nil {
132+
l = m.Record.Size()
133+
n += 1 + l + sovDht(uint64(l))
134+
}
135+
if len(m.CloserPeers) > 0 {
136+
for _, e := range m.CloserPeers {
137+
l = e.Size()
138+
n += 1 + l + sovDht(uint64(l))
139+
}
140+
}
141+
if len(m.ProviderPeers) > 0 {
142+
for _, e := range m.ProviderPeers {
143+
l = e.Size()
144+
n += 1 + l + sovDht(uint64(l))
145+
}
146+
}
147+
if m.ClusterLevelRaw != 0 {
148+
n += 1 + sovDht(uint64(m.ClusterLevelRaw))
149+
}
150+
return n
151+
}
152+
117153
// Addresses returns the Multiaddresses associated with the Message_Peer entry
118154
func (m *Message_Peer) Addresses() []ma.Multiaddr {
119155
if m == nil {
@@ -133,3 +169,27 @@ func (m *Message_Peer) Addresses() []ma.Multiaddr {
133169

134170
return maddrs
135171
}
172+
173+
func (m *Message_Peer) Size() (n int) {
174+
if m == nil {
175+
return 0
176+
}
177+
var l int
178+
_ = l
179+
l = len(m.Id)
180+
n += 1 + l + sovDht(uint64(l))
181+
if len(m.Addrs) > 0 {
182+
for _, b := range m.Addrs {
183+
l = len(b)
184+
n += 1 + l + sovDht(uint64(l))
185+
}
186+
}
187+
if m.Connection != 0 {
188+
n += 1 + sovDht(uint64(m.Connection))
189+
}
190+
return n
191+
}
192+
193+
func sovDht(x uint64) (n int) {
194+
return (math_bits.Len64(x|1) + 6) / 7
195+
}

router.go

+26-8
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"time"
88

9+
"github.com/benbjohnson/clock"
910
"github.com/libp2p/go-libp2p/core/host"
1011
"github.com/libp2p/go-libp2p/core/network"
1112
"github.com/libp2p/go-libp2p/core/peer"
@@ -31,8 +32,10 @@ type router struct {
3132
// [protocol]: https://docs.libp2p.io/concepts/fundamentals/protocols/
3233
protocolID protocol.ID
3334

34-
// an open telemetry tacer instance
35-
tracer trace.Tracer
35+
// tele holds a reference to a telemetry struct
36+
tele *Telemetry
37+
38+
clk clock.Clock
3639
}
3740

3841
var _ coordt.Router[kadt.Key, kadt.PeerID, *pb.Message] = (*router)(nil)
@@ -43,7 +46,7 @@ func (r *router) SendMessage(ctx context.Context, to kadt.PeerID, req *pb.Messag
4346
trace.WithAttributes(tele.AttrPeerID(to.String())),
4447
trace.WithAttributes(tele.AttrKey(base64.RawStdEncoding.EncodeToString(req.GetKey()))),
4548
}
46-
ctx, span := r.tracer.Start(ctx, "router.SendMessage", spanOpts...)
49+
ctx, span := r.tele.Tracer.Start(ctx, "router.SendMessage", spanOpts...)
4750
defer func() {
4851
if err != nil {
4952
span.RecordError(err)
@@ -70,27 +73,42 @@ func (r *router) SendMessage(ctx context.Context, to kadt.PeerID, req *pb.Messag
7073
w := pbio.NewDelimitedWriter(s)
7174
reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
7275

76+
if !req.ExpectResponse() {
77+
err = w.WriteMsg(req)
78+
r.tele.SentMessages.Add(ctx, 1)
79+
if err != nil {
80+
r.tele.SentMessageErrors.Add(ctx, 1)
81+
return nil, fmt.Errorf("write message: %w", err)
82+
}
83+
r.tele.SentBytes.Record(ctx, int64(req.Size()))
84+
return nil, nil
85+
}
86+
87+
start := r.clk.Now()
88+
7389
err = w.WriteMsg(req)
90+
r.tele.SentRequests.Add(ctx, 1)
7491
if err != nil {
92+
r.tele.SentRequestErrors.Add(ctx, 1)
7593
return nil, fmt.Errorf("write message: %w", err)
7694
}
77-
78-
if !req.ExpectResponse() {
79-
return nil, nil
80-
}
95+
r.tele.SentBytes.Record(ctx, int64(req.Size()))
8196

8297
span.End()
83-
ctx, span = r.tracer.Start(ctx, "router.ReadMessage", spanOpts...)
98+
ctx, span = r.tele.Tracer.Start(ctx, "router.ReadMessage", spanOpts...)
8499

85100
data, err := reader.ReadMsg()
86101
if err != nil {
102+
r.tele.SentRequestErrors.Add(ctx, 1)
87103
return nil, fmt.Errorf("read message: %w", err)
88104
}
89105

90106
protoResp := pb.Message{}
91107
if err = proto.Unmarshal(data, &protoResp); err != nil {
108+
r.tele.SentRequestErrors.Add(ctx, 1)
92109
return nil, err
93110
}
111+
r.tele.OutboundRequestLatency.Record(ctx, float64(r.clk.Since(start))/float64(time.Millisecond))
94112

95113
for _, info := range protoResp.CloserPeersAddrInfos() {
96114
_ = r.addToPeerStore(ctx, info, time.Hour) // TODO: replace hard coded time.Hour with config

telemetry.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ type Telemetry struct {
2424
ReceivedBytes metric.Int64Histogram
2525
InboundRequestLatency metric.Float64Histogram
2626
OutboundRequestLatency metric.Float64Histogram
27-
SentMessages metric.Int64Counter
27+
SentMessages metric.Int64Counter // number of messages sent that did not expect a response
2828
SentMessageErrors metric.Int64Counter
29-
SentRequests metric.Int64Counter
29+
SentRequests metric.Int64Counter // number of messages sent that expected a response
3030
SentRequestErrors metric.Int64Counter
3131
SentBytes metric.Int64Histogram
3232
LRUCache metric.Int64Counter
@@ -63,12 +63,12 @@ func NewTelemetry(meterProvider metric.MeterProvider, tracerProvider trace.Trace
6363

6464
// Initalize metrics for the DHT
6565

66-
t.ReceivedMessages, err = meter.Int64Counter("received_messages", metric.WithDescription("Total number of messages received per RPC"), metric.WithUnit("1"))
66+
t.ReceivedMessages, err = meter.Int64Counter("received_messages", metric.WithDescription("Total number of messages received per RPC"))
6767
if err != nil {
6868
return nil, fmt.Errorf("received_messages counter: %w", err)
6969
}
7070

71-
t.ReceivedMessageErrors, err = meter.Int64Counter("received_message_errors", metric.WithDescription("Total number of errors for messages received per RPC"), metric.WithUnit("1"))
71+
t.ReceivedMessageErrors, err = meter.Int64Counter("received_message_errors", metric.WithDescription("Total number of errors for messages received per RPC"))
7272
if err != nil {
7373
return nil, fmt.Errorf("received_message_errors counter: %w", err)
7474
}
@@ -88,22 +88,22 @@ func NewTelemetry(meterProvider metric.MeterProvider, tracerProvider trace.Trace
8888
return nil, fmt.Errorf("outbound_request_latency histogram: %w", err)
8989
}
9090

91-
t.SentMessages, err = meter.Int64Counter("sent_messages", metric.WithDescription("Total number of messages sent per RPC"), metric.WithUnit("1"))
91+
t.SentMessages, err = meter.Int64Counter("sent_messages", metric.WithDescription("Total number of messages sent per RPC"))
9292
if err != nil {
9393
return nil, fmt.Errorf("sent_messages counter: %w", err)
9494
}
9595

96-
t.SentMessageErrors, err = meter.Int64Counter("sent_message_errors", metric.WithDescription("Total number of errors for messages sent per RPC"), metric.WithUnit("1"))
96+
t.SentMessageErrors, err = meter.Int64Counter("sent_message_errors", metric.WithDescription("Total number of errors for messages sent per RPC"))
9797
if err != nil {
9898
return nil, fmt.Errorf("sent_message_errors counter: %w", err)
9999
}
100100

101-
t.SentRequests, err = meter.Int64Counter("sent_requests", metric.WithDescription("Total number of requests sent per RPC"), metric.WithUnit("1"))
101+
t.SentRequests, err = meter.Int64Counter("sent_requests", metric.WithDescription("Total number of requests sent per RPC"))
102102
if err != nil {
103103
return nil, fmt.Errorf("sent_requests counter: %w", err)
104104
}
105105

106-
t.SentRequestErrors, err = meter.Int64Counter("sent_request_errors", metric.WithDescription("Total number of errors for requests sent per RPC"), metric.WithUnit("1"))
106+
t.SentRequestErrors, err = meter.Int64Counter("sent_request_errors", metric.WithDescription("Total number of errors for requests sent per RPC"))
107107
if err != nil {
108108
return nil, fmt.Errorf("sent_request_errors counter: %w", err)
109109
}
@@ -113,12 +113,12 @@ func NewTelemetry(meterProvider metric.MeterProvider, tracerProvider trace.Trace
113113
return nil, fmt.Errorf("sent_bytes histogram: %w", err)
114114
}
115115

116-
t.LRUCache, err = meter.Int64Counter("lru_cache", metric.WithDescription("Cache hit or miss counter"), metric.WithUnit("1"))
116+
t.LRUCache, err = meter.Int64Counter("lru_cache", metric.WithDescription("Cache hit or miss counter"))
117117
if err != nil {
118118
return nil, fmt.Errorf("lru_cache counter: %w", err)
119119
}
120120

121-
t.NetworkSize, err = meter.Int64Counter("network_size", metric.WithDescription("Network size estimation"), metric.WithUnit("1"))
121+
t.NetworkSize, err = meter.Int64Counter("network_size", metric.WithDescription("Network size estimation"))
122122
if err != nil {
123123
return nil, fmt.Errorf("network_size counter: %w", err)
124124
}

0 commit comments

Comments
 (0)