From 58cc5adcc62842beec598846f2338bd50fe11195 Mon Sep 17 00:00:00 2001 From: Seth Bunce Date: Thu, 5 Nov 2020 15:11:57 -0800 Subject: [PATCH 1/3] memmetrics: simplify locking and solve data race This package was updating counters concurrently because of incorrect locking. There's not really a reason to have two separate locks or try to optimize with RW locks. This change replaces the two RW locks with one exclusive lock. The RTMetrics struct methods always acquire this exclusive lock. This is simple and will be easier to keep correct as the code evolves. --- memmetrics/roundtrip.go | 77 +++++++++++++++++------------------- memmetrics/roundtrip_test.go | 35 ++++------------ 2 files changed, 45 insertions(+), 67 deletions(-) diff --git a/memmetrics/roundtrip.go b/memmetrics/roundtrip.go index 34b39691..5cbf0779 100644 --- a/memmetrics/roundtrip.go +++ b/memmetrics/roundtrip.go @@ -15,16 +15,15 @@ import ( // are a rolling window histograms with defined precision as well. // See RTOptions for more detail on parameters. type RTMetrics struct { - total *RollingCounter - netErrors *RollingCounter - statusCodes map[int]*RollingCounter - statusCodesLock sync.RWMutex - histogram *RollingHDRHistogram - histogramLock sync.RWMutex - - newCounter NewCounterFn - newHist NewRollingHistogramFn - clock timetools.TimeProvider + // lock protects all data members. + lock sync.Mutex + total *RollingCounter + netErrors *RollingCounter + statusCodes map[int]*RollingCounter + histogram *RollingHDRHistogram + newCounter NewCounterFn + newHist NewRollingHistogramFn + clock timetools.TimeProvider } type rrOptSetter func(r *RTMetrics) error @@ -65,8 +64,7 @@ func RTClock(clock timetools.TimeProvider) rrOptSetter { // NewRTMetrics returns new instance of metrics collector. func NewRTMetrics(settings ...rrOptSetter) (*RTMetrics, error) { m := &RTMetrics{ - statusCodes: make(map[int]*RollingCounter), - statusCodesLock: sync.RWMutex{}, + statusCodes: make(map[int]*RollingCounter), } for _, s := range settings { if err := s(m); err != nil { @@ -113,14 +111,10 @@ func NewRTMetrics(settings ...rrOptSetter) (*RTMetrics, error) { // Export Returns a new RTMetrics which is a copy of the current one func (m *RTMetrics) Export() *RTMetrics { - m.statusCodesLock.RLock() - defer m.statusCodesLock.RUnlock() - m.histogramLock.RLock() - defer m.histogramLock.RUnlock() + m.lock.Lock() + defer m.lock.Unlock() export := &RTMetrics{} - export.statusCodesLock = sync.RWMutex{} - export.histogramLock = sync.RWMutex{} export.total = m.total.Clone() export.netErrors = m.netErrors.Clone() exportStatusCodes := map[int]*RollingCounter{} @@ -140,12 +134,16 @@ func (m *RTMetrics) Export() *RTMetrics { // CounterWindowSize gets total windows size func (m *RTMetrics) CounterWindowSize() time.Duration { + m.lock.Lock() + defer m.lock.Unlock() return m.total.WindowSize() } // NetworkErrorRatio calculates the amont of network errors such as time outs and dropped connection // that occurred in the given time window compared to the total requests count. 
func (m *RTMetrics) NetworkErrorRatio() float64 { + m.lock.Lock() + defer m.lock.Unlock() if m.total.Count() == 0 { return 0 } @@ -154,10 +152,10 @@ func (m *RTMetrics) NetworkErrorRatio() float64 { // ResponseCodeRatio calculates ratio of count(startA to endA) / count(startB to endB) func (m *RTMetrics) ResponseCodeRatio(startA, endA, startB, endB int) float64 { + m.lock.Lock() + defer m.lock.Unlock() a := int64(0) b := int64(0) - m.statusCodesLock.RLock() - defer m.statusCodesLock.RUnlock() for code, v := range m.statusCodes { if code < endA && code >= startA { a += v.Count() @@ -174,6 +172,9 @@ func (m *RTMetrics) ResponseCodeRatio(startA, endA, startB, endB int) float64 { // Append append a metric func (m *RTMetrics) Append(other *RTMetrics) error { + m.lock.Lock() + defer m.lock.Unlock() + if m == other { return errors.New("RTMetrics cannot append to self") } @@ -188,10 +189,6 @@ func (m *RTMetrics) Append(other *RTMetrics) error { copied := other.Export() - m.statusCodesLock.Lock() - defer m.statusCodesLock.Unlock() - m.histogramLock.Lock() - defer m.histogramLock.Unlock() for code, c := range copied.statusCodes { o, ok := m.statusCodes[code] if ok { @@ -208,6 +205,9 @@ func (m *RTMetrics) Append(other *RTMetrics) error { // Record records a metric func (m *RTMetrics) Record(code int, duration time.Duration) { + m.lock.Lock() + defer m.lock.Unlock() + m.total.Inc(1) if code == http.StatusGatewayTimeout || code == http.StatusBadGateway { m.netErrors.Inc(1) @@ -218,19 +218,24 @@ func (m *RTMetrics) Record(code int, duration time.Duration) { // TotalCount returns total count of processed requests collected. func (m *RTMetrics) TotalCount() int64 { + m.lock.Lock() + defer m.lock.Unlock() return m.total.Count() } // NetworkErrorCount returns total count of processed requests observed func (m *RTMetrics) NetworkErrorCount() int64 { + m.lock.Lock() + defer m.lock.Unlock() return m.netErrors.Count() } // StatusCodesCounts returns map with counts of the response codes func (m *RTMetrics) StatusCodesCounts() map[int]int64 { + m.lock.Lock() + defer m.lock.Unlock() + sc := make(map[int]int64) - m.statusCodesLock.RLock() - defer m.statusCodesLock.RUnlock() for k, v := range m.statusCodes { if v.Count() != 0 { sc[k] = v.Count() @@ -241,40 +246,32 @@ func (m *RTMetrics) StatusCodesCounts() map[int]int64 { // LatencyHistogram computes and returns resulting histogram with latencies observed. func (m *RTMetrics) LatencyHistogram() (*HDRHistogram, error) { - m.histogramLock.Lock() - defer m.histogramLock.Unlock() + m.lock.Lock() + defer m.lock.Unlock() return m.histogram.Merged() } // Reset reset metrics func (m *RTMetrics) Reset() { - m.statusCodesLock.Lock() - defer m.statusCodesLock.Unlock() - m.histogramLock.Lock() - defer m.histogramLock.Unlock() + m.lock.Lock() + defer m.lock.Unlock() m.histogram.Reset() m.total.Reset() m.netErrors.Reset() m.statusCodes = make(map[int]*RollingCounter) } +// WARNING: Lock must be held before calling. func (m *RTMetrics) recordLatency(d time.Duration) error { - m.histogramLock.Lock() - defer m.histogramLock.Unlock() return m.histogram.RecordLatencies(d, 1) } +// WARNING: Lock must be held before calling. 
func (m *RTMetrics) recordStatusCode(statusCode int) error { - m.statusCodesLock.Lock() if c, ok := m.statusCodes[statusCode]; ok { c.Inc(1) - m.statusCodesLock.Unlock() return nil } - m.statusCodesLock.Unlock() - - m.statusCodesLock.Lock() - defer m.statusCodesLock.Unlock() // Check if another goroutine has written our counter already if c, ok := m.statusCodes[statusCode]; ok { diff --git a/memmetrics/roundtrip_test.go b/memmetrics/roundtrip_test.go index 2de6be23..e301d97f 100644 --- a/memmetrics/roundtrip_test.go +++ b/memmetrics/roundtrip_test.go @@ -2,7 +2,6 @@ package memmetrics import ( "runtime" - "sync" "testing" "time" @@ -75,8 +74,11 @@ func TestAppend(t *testing.T) { } func TestConcurrentRecords(t *testing.T) { - // This test asserts a race condition which requires parallelism + // This test asserts a race condition which requires concurrency. Set + // GOMAXPROCS high for this test, then restore after test completes. + n := runtime.GOMAXPROCS(0) runtime.GOMAXPROCS(100) + defer runtime.GOMAXPROCS(n) rr, err := NewRTMetrics(RTClock(testutils.GetClock())) require.NoError(t, err) @@ -84,7 +86,7 @@ func TestConcurrentRecords(t *testing.T) { for code := 0; code < 100; code++ { for numRecords := 0; numRecords < 10; numRecords++ { go func(statusCode int) { - _ = rr.recordStatusCode(statusCode) + rr.Record(statusCode, time.Second) }(code) } } @@ -92,11 +94,9 @@ func TestConcurrentRecords(t *testing.T) { func TestRTMetricExportReturnsNewCopy(t *testing.T) { a := RTMetrics{ - clock: &timetools.RealTime{}, - statusCodes: map[int]*RollingCounter{}, - statusCodesLock: sync.RWMutex{}, - histogram: &RollingHDRHistogram{}, - histogramLock: sync.RWMutex{}, + clock: &timetools.RealTime{}, + statusCodes: map[int]*RollingCounter{}, + histogram: &RollingHDRHistogram{}, } var err error @@ -129,23 +129,4 @@ func TestRTMetricExportReturnsNewCopy(t *testing.T) { assert.NotNil(t, b.newCounter) assert.NotNil(t, b.newHist) assert.NotNil(t, b.clock) - - // a and b should have different locks - locksSucceed := make(chan bool) - go func() { - a.statusCodesLock.Lock() - b.statusCodesLock.Lock() - a.histogramLock.Lock() - b.histogramLock.Lock() - locksSucceed <- true - }() - - for { - select { - case <-locksSucceed: - return - case <-time.After(10 * time.Second): - t.FailNow() - } - } } From a71cfa13268265e3cfa83c2db1cc742b58716426 Mon Sep 17 00:00:00 2001 From: Seth Bunce Date: Fri, 6 Nov 2020 10:23:35 -0800 Subject: [PATCH 2/3] add benchmark for recording status code On a 3ghz intel CPU we do zero allocs and record takes 255ns. --- memmetrics/roundtrip_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/memmetrics/roundtrip_test.go b/memmetrics/roundtrip_test.go index e301d97f..6385d46b 100644 --- a/memmetrics/roundtrip_test.go +++ b/memmetrics/roundtrip_test.go @@ -11,6 +11,25 @@ import ( "github.com/vulcand/oxy/testutils" ) +func BenchmarkRecord(b *testing.B) { + b.ReportAllocs() + + rr, err := NewRTMetrics(RTClock(testutils.GetClock())) + require.NoError(b, err) + + // warm up metrics. Adding a new code can do allocations, but in the steady + // state recording a code is cheap. We want to measure the steady state. 
+ const codes = 100 + for code := 0; code < codes; code++ { + rr.Record(code, time.Second) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rr.Record(i%codes, time.Second) + } +} + func TestDefaults(t *testing.T) { rr, err := NewRTMetrics(RTClock(testutils.GetClock())) require.NoError(t, err) From b4943291a6049899daf60f96c2d04189b0749886 Mon Sep 17 00:00:00 2001 From: Seth Bunce Date: Mon, 9 Nov 2020 09:16:20 -0800 Subject: [PATCH 3/3] add concurrent benchmark for recording the status code --- memmetrics/roundtrip_test.go | 37 ++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/memmetrics/roundtrip_test.go b/memmetrics/roundtrip_test.go index 6385d46b..022b945e 100644 --- a/memmetrics/roundtrip_test.go +++ b/memmetrics/roundtrip_test.go @@ -2,6 +2,7 @@ package memmetrics import ( "runtime" + "sync" "testing" "time" @@ -30,6 +31,42 @@ func BenchmarkRecord(b *testing.B) { } } +func BenchmarkRecordConcurrently(b *testing.B) { + b.ReportAllocs() + + rr, err := NewRTMetrics(RTClock(testutils.GetClock())) + require.NoError(b, err) + + // warm up metrics. Adding a new code can do allocations, but in the steady + // state recording a code is cheap. We want to measure the steady state. + const codes = 100 + for code := 0; code < codes; code++ { + rr.Record(code, time.Second) + } + + concurrency := runtime.NumCPU() + b.Logf("NumCPU: %d, Concurrency: %d, GOMAXPROCS: %d", + runtime.NumCPU(), concurrency, runtime.GOMAXPROCS(0)) + wg := sync.WaitGroup{} + wg.Add(concurrency) + perG := b.N/concurrency + if perG == 0 { + perG = 1 + } + + b.ResetTimer() + for i := 0; i < concurrency; i++ { + go func() { + for j := 0; j < perG; j++ { + rr.Record(j%codes, time.Second) + } + wg.Done() + }() + } + + wg.Wait() +} + func TestDefaults(t *testing.T) { rr, err := NewRTMetrics(RTClock(testutils.GetClock())) require.NoError(t, err)
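
Note for reviewers (illustrative only, not part of the patches): the locking convention adopted in PATCH 1/3 is "exported methods take the single exclusive mutex; unexported helpers document that the caller must already hold it". Below is a minimal, self-contained Go sketch of that pattern under stated assumptions. The type, field, and function names are placeholders invented for illustration; this is not the patched RTMetrics implementation.

package main

import (
	"fmt"
	"sync"
)

// counters stands in for a metrics holder: one exclusive mutex guards
// every field, mirroring the convention adopted in PATCH 1/3.
type counters struct {
	mu          sync.Mutex // protects all fields below
	total       int64
	statusCodes map[int]int64
}

func newCounters() *counters {
	return &counters{statusCodes: make(map[int]int64)}
}

// Record is the exported-style entry point: it acquires the lock once and
// then calls helpers that assume the lock is already held.
func (c *counters) Record(code int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.total++
	c.recordStatusCode(code)
}

// recordStatusCode must only be called with c.mu held, the same
// "WARNING: Lock must be held before calling" convention as the patch.
func (c *counters) recordStatusCode(code int) {
	c.statusCodes[code]++
}

// TotalCount takes the lock even for a simple read, trading a little
// throughput for code that stays obviously race-free as it evolves.
func (c *counters) TotalCount() int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.total
}

func main() {
	c := newCounters()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(code int) {
			defer wg.Done()
			c.Record(200 + code%5)
		}(i)
	}
	wg.Wait()
	fmt.Println("total:", c.TotalCount())
}

Running the sketch or the package tests under the race detector (for example "go run -race ." on the sketch, or "go test -race ./memmetrics/" in the repository) is how a regression in this convention would surface; the new benchmarks can be exercised with something like "go test -run '^$' -bench Record ./memmetrics/".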