Commit 9605745

TBS: drop and recreate badger db after exceeding storage limit for TTL time
1 parent 5a3dd8d commit 9605745

3 files changed: +260 −0 lines changed
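
This commit adds a recovery path to tail-based sampling (TBS) storage: when the badger DB size stays at or above the configured storage limit for at least the TTL, the storage manager drops the on-disk DB and recreates it rather than staying pinned at the limit. Before the diffs, here is a minimal sketch of the detection idea implemented by RunDropLoop below, assuming only the standard library; the limitExceededTracker name and its fields are invented for illustration and are not part of the commit.

package main

import (
    "fmt"
    "time"
)

// limitExceededTracker is a hypothetical illustration of RunDropLoop's core idea:
// remember when usage first crossed the limit, reset the marker as soon as usage
// recovers, and only signal a drop once the limit has been exceeded continuously
// for at least ttl.
type limitExceededTracker struct {
    firstExceeded time.Time
    ttl           time.Duration
    limit         uint64
}

// observe reports whether size has stayed at or above limit for at least ttl.
func (t *limitExceededTracker) observe(size uint64, now time.Time) bool {
    if size < t.limit {
        t.firstExceeded = time.Time{} // usage recovered: reset the window
        return false
    }
    if t.firstExceeded.IsZero() {
        t.firstExceeded = now
    }
    return now.Sub(t.firstExceeded) >= t.ttl
}

func main() {
    tracker := limitExceededTracker{ttl: 3 * time.Second, limit: 100}
    start := time.Now()
    for i := 0; i <= 4; i++ {
        now := start.Add(time.Duration(i) * time.Second)
        // Size stays over the limit, so the tracker fires once 3 seconds have elapsed.
        fmt.Printf("t+%ds drop=%v\n", i, tracker.observe(120, now))
    }
}

The real loop additionally ticks at min(1 minute, TTL), because badger reports its size with roughly a one-minute lag.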

3 files changed

+260
-0
lines changed

x-pack/apm-server/sampling/eventstorage/storage_manager.go

+134

@@ -5,6 +5,7 @@
 package eventstorage

 import (
+    "errors"
     "os"
     "path/filepath"
     "sync"
@@ -34,9 +35,14 @@ type StorageManager struct {
     storage *Storage
     rw      *ShardedReadWriter

+    // mu guards db, storage, and rw swaps.
+    mu sync.RWMutex
     // subscriberPosMu protects the subscriber file from concurrent RW.
     subscriberPosMu sync.Mutex

+    // dropLoopCh acts as a mutex to ensure that there is only 1 active RunDropLoop per StorageManager,
+    // as it is possible that 2 separate RunDropLoop are created by 2 TBS processors during a hot reload.
+    dropLoopCh chan struct{}
     // gcLoopCh acts as a mutex to ensure only 1 gc loop is running per StorageManager,
     // as it is possible that 2 separate RunGCLoop are created by 2 TBS processors during a hot reload.
     gcLoopCh chan struct{}
@@ -46,6 +52,7 @@
 func NewStorageManager(storageDir string) (*StorageManager, error) {
     sm := &StorageManager{
         storageDir: storageDir,
+        dropLoopCh: make(chan struct{}, 1),
         gcLoopCh:   make(chan struct{}, 1),
         logger:     logp.NewLogger(logs.Sampling),
     }
@@ -107,16 +114,131 @@ func (s *StorageManager) runValueLogGC(discardRatio float64) error {
     return s.db.RunValueLogGC(discardRatio)
 }

+// RunDropLoop runs a loop that detects whether the storage limit has been exceeded for at least ttl.
+// If so, it drops and recreates the underlying badger DB.
+// The loop stops when it receives from stopping.
+func (s *StorageManager) RunDropLoop(stopping <-chan struct{}, ttl time.Duration, storageLimitInBytes uint64) error {
+    select {
+    case <-stopping:
+        return nil
+    case s.dropLoopCh <- struct{}{}:
+    }
+    defer func() {
+        <-s.dropLoopCh
+    }()
+
+    if storageLimitInBytes == 0 {
+        <-stopping
+        return nil
+    }
+
+    timer := time.NewTicker(min(time.Minute, ttl)) // Evaluate db size every minute, as badger reports it with ~1m lag; use min to facilitate testing.
+    defer timer.Stop()
+    var firstExceeded time.Time
+    for {
+        select {
+        case <-stopping:
+            return nil
+        case <-timer.C:
+            lsm, vlog := s.Size()
+            if uint64(lsm+vlog) >= storageLimitInBytes { // FIXME: Add a bit of buffer? Is s.storage.pendingSize reliable enough?
+                now := time.Now()
+                if firstExceeded.IsZero() {
+                    firstExceeded = now
+                }
+                if now.Sub(firstExceeded) >= ttl {
+                    s.logger.Warnf("badger db size has exceeded storage limit for over TTL, please consider increasing sampling.tail.storage_limit; dropping and recreating badger db to recover")
+                    s.DropAndRecreate()
+                    s.logger.Info("badger db dropped and recreated")
+                }
+            } else {
+                firstExceeded = time.Time{}
+            }
+        }
+    }
+}
+
 func (s *StorageManager) Close() error {
+    s.mu.RLock()
+    defer s.mu.RUnlock()
     s.rw.Close()
     return s.db.Close()
 }

+// Reset initializes db, storage, and rw.
+func (s *StorageManager) Reset() error {
+    db, err := OpenBadger(s.storageDir, -1)
+    if err != nil {
+        return err
+    }
+    s.db = db
+    s.storage = New(db, ProtobufCodec{})
+    s.rw = s.storage.NewShardedReadWriter()
+    return nil
+}
+
 // Size returns the db size
+//
+// Caller should either be the main Run loop or should be holding RLock already.
 func (s *StorageManager) Size() (lsm, vlog int64) {
     return s.db.Size()
 }

+func (s *StorageManager) runValueLogGC(discardRatio float64) error {
+    s.mu.RLock()
+    defer s.mu.RUnlock()
+    return s.db.RunValueLogGC(discardRatio)
+}
+
+// DropAndRecreate deletes the underlying badger DB at the file system level, and replaces it with a new badger DB.
+func (s *StorageManager) DropAndRecreate() {
+    s.mu.Lock()
+    s.rw.Close()
+    err := s.db.Close()
+    if err != nil {
+        s.logger.With(logp.Error(err)).Error("error closing badger db during drop and recreate")
+    }
+
+    s.subscriberPosMu.Lock()
+    backupPath := filepath.Join(filepath.Dir(s.storageDir), filepath.Base(s.storageDir)+".old")
+    // FIXME: what if backupPath already exists?
+    err = os.Rename(s.storageDir, backupPath)
+    if err != nil {
+        s.logger.With(logp.Error(err)).Error("error renaming old badger db during drop and recreate")
+    }
+
+    // Since the subscriber position file lives in the same tail sampling directory as the badger DB,
+    // recreate the tail sampling dir and move the subscriber position file back, as it is not part of the DB.
+    var mode os.FileMode
+    stat, err := os.Stat(backupPath)
+    if err != nil {
+        mode = 0700
+        s.logger.With(logp.Error(err)).Error("error stat backup path during drop and recreate")
+    } else {
+        mode = stat.Mode()
+    }
+    err = os.Mkdir(s.storageDir, mode)
+    if err != nil {
+        s.logger.With(logp.Error(err)).Error("error mkdir storage dir during drop and recreate")
+    }
+    err = os.Rename(filepath.Join(backupPath, subscriberPositionFile), filepath.Join(s.storageDir, subscriberPositionFile))
+    if err != nil && !errors.Is(err, os.ErrNotExist) {
+        s.logger.With(logp.Error(err)).Error("error moving subscriber position file during drop and recreate")
+    }
+
+    err = s.Reset() // FIXME: this is likely fatal. Return error to crash the processor.
+    if err != nil {
+        s.logger.With(logp.Error(err)).Error("error creating new badger db during drop and recreate")
+    }
+    s.subscriberPosMu.Unlock()
+    s.mu.Unlock()
+
+    err = os.RemoveAll(backupPath)
+    if err != nil {
+        s.logger.With(logp.Error(err)).Error("error removing old badger db during drop and recreate")
+    }
+}
+
 func (s *StorageManager) ReadSubscriberPosition() ([]byte, error) {
     s.subscriberPosMu.Lock()
     defer s.subscriberPosMu.Unlock()
@@ -142,26 +264,38 @@ type ManagedReadWriter struct {
 }

 func (s *ManagedReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error {
+    s.sm.mu.RLock()
+    defer s.sm.mu.RUnlock()
     return s.sm.rw.ReadTraceEvents(traceID, out)
 }

 func (s *ManagedReadWriter) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent, opts WriterOpts) error {
+    s.sm.mu.RLock()
+    defer s.sm.mu.RUnlock()
     return s.sm.rw.WriteTraceEvent(traceID, id, event, opts)
 }

 func (s *ManagedReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error {
+    s.sm.mu.RLock()
+    defer s.sm.mu.RUnlock()
     return s.sm.rw.WriteTraceSampled(traceID, sampled, opts)
 }

 func (s *ManagedReadWriter) IsTraceSampled(traceID string) (bool, error) {
+    s.sm.mu.RLock()
+    defer s.sm.mu.RUnlock()
     return s.sm.rw.IsTraceSampled(traceID)
 }

 func (s *ManagedReadWriter) DeleteTraceEvent(traceID, id string) error {
+    s.sm.mu.RLock()
+    defer s.sm.mu.RUnlock()
     return s.sm.rw.DeleteTraceEvent(traceID, id)
 }

 func (s *ManagedReadWriter) Flush() error {
+    s.sm.mu.RLock()
+    defer s.sm.mu.RUnlock()
     return s.sm.rw.Flush()
 }
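
Note on the locking scheme above: every ManagedReadWriter method takes sm.mu.RLock, so trace reads and writes only contend with DropAndRecreate, which holds the exclusive Lock while it closes and swaps db, storage, and rw. Below is a self-contained sketch of that read-mostly swap pattern, assuming the same sync.RWMutex semantics; guardedResource and its fields are illustrative names, not part of the commit.

package main

import (
    "fmt"
    "sync"
)

// guardedResource mirrors the StorageManager locking pattern: ordinary operations
// share an RLock, while a swap takes the exclusive Lock so no reader can observe
// a half-replaced handle. The names are illustrative only.
type guardedResource struct {
    mu     sync.RWMutex
    handle string // stands in for db, storage, and rw
}

func (g *guardedResource) use() string {
    g.mu.RLock()
    defer g.mu.RUnlock()
    return g.handle
}

func (g *guardedResource) swap(newHandle string) {
    g.mu.Lock()
    defer g.mu.Unlock()
    // Closing the old handle and recreating it on disk would happen here.
    g.handle = newHandle
}

func main() {
    g := &guardedResource{handle: "old-db"}
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _ = g.use() // readers only contend with swap, not with each other
        }()
    }
    g.swap("new-db")
    wg.Wait()
    fmt.Println(g.use())
}

The trade-off is that a drop-and-recreate briefly blocks all readers and writers, which the commit accepts since it only happens after the storage limit has been exceeded for a full TTL.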

x-pack/apm-server/sampling/processor.go

+3

@@ -382,6 +382,9 @@ func (p *Processor) Run() error {
     g.Go(func() error {
         return p.config.DB.RunGCLoop(p.stopping, p.config.StorageGCInterval)
     })
+    g.Go(func() error {
+        return p.config.DB.RunDropLoop(p.stopping, p.config.TTL, p.config.StorageLimit)
+    })
     g.Go(func() error {
         // Subscribe to remotely sampled trace IDs. This is cancelled immediately when
         // Stop is called. But it is possible that both old and new subscriber goroutines
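
The processor change is just wiring: RunDropLoop is started in the same error group as RunGCLoop and shuts down when p.stopping is closed. A rough, self-contained sketch of that pattern, assuming golang.org/x/sync/errgroup (which the surrounding g.Go calls suggest) and an invented runUntilStopped helper:

package main

import (
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

// runUntilStopped stands in for RunGCLoop and RunDropLoop: it blocks until the
// stopping channel is closed, then returns nil. The helper is invented for this sketch.
func runUntilStopped(name string, stopping <-chan struct{}) error {
    <-stopping
    fmt.Println(name, "stopped")
    return nil
}

func main() {
    stopping := make(chan struct{})
    var g errgroup.Group
    // Mirrors how Processor.Run wires the new drop loop next to the existing GC loop:
    // each loop runs in its own goroutine, and g.Wait returns the first error.
    g.Go(func() error { return runUntilStopped("gc loop", stopping) })
    g.Go(func() error { return runUntilStopped("drop loop", stopping) })

    time.AfterFunc(100*time.Millisecond, func() { close(stopping) })
    if err := g.Wait(); err != nil {
        fmt.Println("run error:", err)
    }
}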

x-pack/apm-server/sampling/processor_test.go

+123

@@ -776,6 +776,129 @@ func TestProcessRemoteTailSamplingPersistence(t *testing.T) {
     assert.Equal(t, `{"index_name":1}`, string(data))
 }

+func TestDropAndRecreate(t *testing.T) {
+    // This test ensures that if badger is stuck at storage limit for TTL,
+    // DB is dropped and recreated.
+    if testing.Short() {
+        t.Skip("skipping slow test")
+    }
+
+    writeBatch := func(t *testing.T, n int, c sampling.Config, assertBatch func(b modelpb.Batch)) *sampling.Processor {
+        processor, err := sampling.NewProcessor(c)
+        require.NoError(t, err)
+        go processor.Run()
+        defer processor.Stop(context.Background())
+        batch := make(modelpb.Batch, 0, n)
+        for i := 0; i < n; i++ {
+            traceID := uuid.Must(uuid.NewV4()).String()
+            batch = append(batch, &modelpb.APMEvent{
+                Trace: &modelpb.Trace{Id: traceID},
+                Event: &modelpb.Event{Duration: uint64(123 * time.Millisecond)},
+                Span: &modelpb.Span{
+                    Type: "type",
+                    Id:   traceID,
+                },
+            })
+        }
+        err = processor.ProcessBatch(context.Background(), &batch)
+        require.NoError(t, err)
+        assertBatch(batch)
+        return processor
+    }
+
+    for _, tc := range []struct {
+        name                string
+        subscriberPosExists bool
+    }{
+        {
+            name:                "subscriber_position_not_exist",
+            subscriberPosExists: false,
+        },
+        {
+            name:                "subscriber_position_exists",
+            subscriberPosExists: true,
+        },
+    } {
+        t.Run(tc.name, func(t *testing.T) {
+            config := newTempdirConfig(t)
+            config.StorageGCInterval = time.Hour // effectively disable GC
+
+            config.FlushInterval = 10 * time.Millisecond
+            subscriberChan := make(chan string)
+            subscriber := pubsubtest.SubscriberChan(subscriberChan)
+            config.Elasticsearch = pubsubtest.Client(nil, subscriber)
+            subscriberPositionFile := filepath.Join(config.StorageDir, "subscriber_position.json")
+
+            // Write 5K span events and close the DB to persist to disk the storage
+            // size and assert that none are reported immediately.
+            writeBatch(t, 5000, config, func(b modelpb.Batch) {
+                assert.Empty(t, b, fmt.Sprintf("expected empty but size is %d", len(b)))
+
+                subscriberChan <- "0102030405060708090a0b0c0d0e0f10"
+                assert.Eventually(t, func() bool {
+                    b, err := os.ReadFile(subscriberPositionFile)
+                    return err == nil && string(b) == `{"index_name":1}`
+                }, time.Second, 100*time.Millisecond)
+            })
+            assert.NoError(t, config.Storage.Flush())
+            config.Storage.Close()
+            assert.NoError(t, config.DB.Close())
+
+            if !tc.subscriberPosExists {
+                err := os.Remove(subscriberPositionFile)
+                assert.NoError(t, err)
+            }
+
+            // Open a new instance of the badgerDB and check the size.
+            var err error
+            config.DB, err = eventstorage.NewStorageManager(config.StorageDir)
+            require.NoError(t, err)
+            t.Cleanup(func() { config.DB.Close() })
+
+            lsm, vlog := config.DB.Size()
+            assert.GreaterOrEqual(t, lsm+vlog, int64(1024))
+
+            sstFilenames := func() []string {
+                entries, _ := os.ReadDir(config.StorageDir)
+
+                var ssts []string
+                for _, entry := range entries {
+                    name := entry.Name()
+                    if strings.HasSuffix(name, ".sst") {
+                        ssts = append(ssts, name)
+                    }
+                }
+                sort.Strings(ssts)
+                return ssts
+            }
+            assert.NotEqual(t, "000000.sst", sstFilenames()[0])
+
+            config.Elasticsearch = pubsubtest.Client(nil, nil)
+
+            config.StorageLimit = uint64(lsm) - 100
+            config.TTL = time.Second
+            processor, err := sampling.NewProcessor(config)
+            require.NoError(t, err)
+            go processor.Run()
+            defer processor.Stop(context.Background())
+
+            var filenames []string
+            assert.Eventually(t, func() bool {
+                filenames = sstFilenames()
+                return len(filenames) == 0
+            }, 10*time.Second, 500*time.Millisecond, filenames)
+
+            b, err := os.ReadFile(subscriberPositionFile)
+            assert.NoError(t, err)
+            if tc.subscriberPosExists {
+                assert.Equal(t, `{"index_name":1}`, string(b))
+            } else {
+                assert.Equal(t, "{}", string(b))
+            }
+        })
+    }
+}
+
 func TestGracefulShutdown(t *testing.T) {
     config := newTempdirConfig(t)
     sampleRate := 0.5
