Skip to content

Commit f3bafec

Browse files
metrics: make meter updates lock-free (ethereum#21446)
1 parent 54add42 commit f3bafec

File tree

3 files changed

+37
-38
lines changed

3 files changed

+37
-38
lines changed

metrics/ewma.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"math"
55
"sync"
66
"sync/atomic"
7+
"time"
78
)
89

910
// EWMAs continuously calculate an exponentially-weighted moving average
@@ -85,7 +86,7 @@ type StandardEWMA struct {
8586
func (a *StandardEWMA) Rate() float64 {
8687
a.mutex.Lock()
8788
defer a.mutex.Unlock()
88-
return a.rate * float64(1e9)
89+
return a.rate * float64(time.Second)
8990
}
9091

9192
// Snapshot returns a read-only copy of the EWMA.
@@ -98,7 +99,7 @@ func (a *StandardEWMA) Snapshot() EWMA {
9899
func (a *StandardEWMA) Tick() {
99100
count := atomic.LoadInt64(&a.uncounted)
100101
atomic.AddInt64(&a.uncounted, -count)
101-
instantRate := float64(count) / float64(5e9)
102+
instantRate := float64(count) / float64(5*time.Second)
102103
a.mutex.Lock()
103104
defer a.mutex.Unlock()
104105
if a.init {

metrics/meter.go

+31-34
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package metrics
22

33
import (
44
"sync"
5+
"sync/atomic"
56
"time"
67
)
78

@@ -101,6 +102,7 @@ func NewRegisteredMeterForced(name string, r Registry) Meter {
101102
// MeterSnapshot is a read-only copy of another Meter.
102103
type MeterSnapshot struct {
103104
count int64
105+
temp int64
104106
rate1, rate5, rate15, rateMean float64
105107
}
106108

@@ -149,7 +151,7 @@ func (NilMeter) Rate1() float64 { return 0.0 }
149151
// Rate5 is a no-op.
150152
func (NilMeter) Rate5() float64 { return 0.0 }
151153

152-
// Rate15is a no-op.
154+
// Rate15 is a no-op.
153155
func (NilMeter) Rate15() float64 { return 0.0 }
154156

155157
// RateMean is a no-op.
@@ -167,7 +169,7 @@ type StandardMeter struct {
167169
snapshot *MeterSnapshot
168170
a1, a5, a15 EWMA
169171
startTime time.Time
170-
stopped bool
172+
stopped uint32
171173
}
172174

173175
func newStandardMeter() *StandardMeter {
@@ -182,69 +184,54 @@ func newStandardMeter() *StandardMeter {
182184

183185
// Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
184186
func (m *StandardMeter) Stop() {
185-
m.lock.Lock()
186-
stopped := m.stopped
187-
m.stopped = true
188-
m.lock.Unlock()
189-
if !stopped {
187+
stopped := atomic.SwapUint32(&m.stopped, 1)
188+
if stopped != 1 {
190189
arbiter.Lock()
191190
delete(arbiter.meters, m)
192191
arbiter.Unlock()
193192
}
194193
}
195194

196195
// Count returns the number of events recorded.
196+
// It updates the meter to be as accurate as possible
197197
func (m *StandardMeter) Count() int64 {
198-
m.lock.RLock()
199-
count := m.snapshot.count
200-
m.lock.RUnlock()
201-
return count
198+
m.lock.Lock()
199+
defer m.lock.Unlock()
200+
m.updateMeter()
201+
return m.snapshot.count
202202
}
203203

204204
// Mark records the occurrence of n events.
205205
func (m *StandardMeter) Mark(n int64) {
206-
m.lock.Lock()
207-
defer m.lock.Unlock()
208-
if m.stopped {
209-
return
210-
}
211-
m.snapshot.count += n
212-
m.a1.Update(n)
213-
m.a5.Update(n)
214-
m.a15.Update(n)
215-
m.updateSnapshot()
206+
atomic.AddInt64(&m.snapshot.temp, n)
216207
}
217208

218209
// Rate1 returns the one-minute moving average rate of events per second.
219210
func (m *StandardMeter) Rate1() float64 {
220211
m.lock.RLock()
221-
rate1 := m.snapshot.rate1
222-
m.lock.RUnlock()
223-
return rate1
212+
defer m.lock.RUnlock()
213+
return m.snapshot.rate1
224214
}
225215

226216
// Rate5 returns the five-minute moving average rate of events per second.
227217
func (m *StandardMeter) Rate5() float64 {
228218
m.lock.RLock()
229-
rate5 := m.snapshot.rate5
230-
m.lock.RUnlock()
231-
return rate5
219+
defer m.lock.RUnlock()
220+
return m.snapshot.rate5
232221
}
233222

234223
// Rate15 returns the fifteen-minute moving average rate of events per second.
235224
func (m *StandardMeter) Rate15() float64 {
236225
m.lock.RLock()
237-
rate15 := m.snapshot.rate15
238-
m.lock.RUnlock()
239-
return rate15
226+
defer m.lock.RUnlock()
227+
return m.snapshot.rate15
240228
}
241229

242230
// RateMean returns the meter's mean rate of events per second.
243231
func (m *StandardMeter) RateMean() float64 {
244232
m.lock.RLock()
245-
rateMean := m.snapshot.rateMean
246-
m.lock.RUnlock()
247-
return rateMean
233+
defer m.lock.RUnlock()
234+
return m.snapshot.rateMean
248235
}
249236

250237
// Snapshot returns a read-only copy of the meter.
@@ -264,9 +251,19 @@ func (m *StandardMeter) updateSnapshot() {
264251
snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds()
265252
}
266253

254+
func (m *StandardMeter) updateMeter() {
255+
// should only run with write lock held on m.lock
256+
n := atomic.LoadInt64(&m.snapshot.temp)
257+
m.snapshot.count += n
258+
m.a1.Update(n)
259+
m.a5.Update(n)
260+
m.a15.Update(n)
261+
}
262+
267263
func (m *StandardMeter) tick() {
268264
m.lock.Lock()
269265
defer m.lock.Unlock()
266+
m.updateMeter()
270267
m.a1.Tick()
271268
m.a5.Tick()
272269
m.a15.Tick()
@@ -282,7 +279,7 @@ type meterArbiter struct {
282279
ticker *time.Ticker
283280
}
284281

285-
var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})}
282+
var arbiter = meterArbiter{ticker: time.NewTicker(5 * time.Second), meters: make(map[*StandardMeter]struct{})}
286283

287284
// Ticks meters on the scheduled interval
288285
func (ma *meterArbiter) tick() {

metrics/meter_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func TestGetOrRegisterMeter(t *testing.T) {
1717
r := NewRegistry()
1818
NewRegisteredMeter("foo", r).Mark(47)
1919
if m := GetOrRegisterMeter("foo", r); m.Count() != 47 {
20-
t.Fatal(m)
20+
t.Fatal(m.Count())
2121
}
2222
}
2323

@@ -29,10 +29,11 @@ func TestMeterDecay(t *testing.T) {
2929
defer ma.ticker.Stop()
3030
m := newStandardMeter()
3131
ma.meters[m] = struct{}{}
32-
go ma.tick()
3332
m.Mark(1)
33+
ma.tickMeters()
3434
rateMean := m.RateMean()
3535
time.Sleep(100 * time.Millisecond)
36+
ma.tickMeters()
3637
if m.RateMean() >= rateMean {
3738
t.Error("m.RateMean() didn't decrease")
3839
}

0 commit comments

Comments
 (0)