Commit 452c0ca

Use pebble

1 parent 29b3a63 commit 452c0ca

4 files changed: +131 -103 lines changed

x-pack/apm-server/main.go (+2 -2)

@@ -122,11 +122,11 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
 	}
 
 	storageDir := paths.Resolve(paths.Data, tailSamplingStorageDir)
-	badgerDB, err = getBadgerDB(storageDir)
+	db, err := getPebbleDB(storageDir)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get Badger database: %w", err)
 	}
-	readWriters := getStorage(badgerDB)
+	readWriters := getPebbleStorage(db)
 
 	policies := make([]sampling.Policy, len(tailSamplingConfig.Policies))
 	for i, in := range tailSamplingConfig.Policies {
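getPebbleDB and getPebbleStorage are referenced here but defined outside this hunk, so the diff does not show how the Pebble database is actually opened. As a point of reference only, here is a minimal sketch of what such a helper could look like with the cockroachdb/pebble API; the function name, package name, and empty Options are assumptions, not code from this commit:

package sketch

import "github.com/cockroachdb/pebble"

// getPebbleDB is a hypothetical sketch: open (or create) a Pebble database
// rooted at the tail-sampling storage directory. The real helper in this
// commit may configure pebble.Options differently.
func getPebbleDB(storageDir string) (*pebble.DB, error) {
	// pebble.Open creates the directory if needed and recovers any
	// existing WAL and SSTables found there.
	return pebble.Open(storageDir, &pebble.Options{})
}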

x-pack/apm-server/sampling/config.go (+3 -3)

@@ -234,9 +234,9 @@ func (config DataStreamConfig) validate() error {
 }
 
 func (config StorageConfig) validate() error {
-	if config.DB == nil {
-		return errors.New("DB unspecified")
-	}
+	//if config.DB == nil {
+	//	return errors.New("DB unspecified")
+	//}
 	if config.Storage == nil {
 		return errors.New("Storage unspecified")
 	}

x-pack/apm-server/sampling/eventstoragepebble/storage.go (+87 -58)

@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"github.com/cockroachdb/pebble"
 	"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -90,6 +91,9 @@ type ReadWriter struct {
 	// This must not be used in write operations, as keys are expected to
 	// be unmodified until the end of a transaction.
 	readKeyBuf []byte
+
+	mu    sync.Mutex
+	batch *pebble.Batch
 }
 
 // Close closes the writer. Any writes that have not been flushed may be lost.
@@ -143,7 +147,7 @@ func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) {
 	// return false, err
 	//}
 	//return item.UserMeta() == entryMetaTraceSampled, nil
-	return false, nil
+	return false, eventstorage.ErrNotFound
 }
 
 // WriteTraceEvent writes a trace event to storage.
@@ -161,64 +165,84 @@ func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *modelpb.
 	buf.WriteByte(':')
 	buf.WriteString(id)
 	key := buf.Bytes()
-	return rw.s.db.Set(key, data, pebble.NoSync)
-	//rw.writeEntry(badger.NewEntry(key, data).WithMeta(entryMetaTraceEvent), opts)
+
+	//return rw.s.db.Set(key, data, pebble.NoSync)
+	return rw.writeEntry(key, data)
+}
+
+func (rw *ReadWriter) writeEntry(key, data []byte) error {
+	rw.mu.Lock()
+	defer rw.mu.Unlock()
+	if rw.batch == nil {
+		rw.batch = rw.s.db.NewIndexedBatch()
+	}
+	if err := rw.batch.Set(key, data, pebble.NoSync); err != nil {
+		return err
+	}
+
+	if rw.batch.Len() > 2000 {
+		err := rw.batch.Commit(pebble.Sync)
+		rw.batch.Close()
+		rw.batch = nil
+		return err
+	}
+	return nil
+
+	//
+	//rw.pendingWrites++
+	//entrySize := estimateSize(e)
+	//// The badger database has an async size reconciliation, with a 1 minute
+	//// ticker that keeps the lsm and vlog sizes updated in an in-memory map.
+	//// It's OK to call call s.db.Size() on the hot path, since the memory
+	//// lookup is cheap.
+	//lsm, vlog := rw.s.db.Size()
+	//
+	//// there are multiple ReadWriters writing to the same storage so add
+	//// the entry size and consider the new value to avoid TOCTOU issues.
+	//pendingSize := rw.s.pendingSize.Add(entrySize)
+	//rw.pendingSize += entrySize
+	//
+	//if current := pendingSize + lsm + vlog; opts.StorageLimitInBytes != 0 && current >= opts.StorageLimitInBytes {
+	//	// flush what we currently have and discard the current entry
+	//	if err := rw.Flush(); err != nil {
+	//		return err
+	//	}
+	//	return fmt.Errorf("%w (current: %d, limit: %d)", ErrLimitReached, current, opts.StorageLimitInBytes)
+	//}
+	//
+	//if rw.pendingWrites >= 200 {
+	//	// Attempt to flush if there are 200 or more uncommitted writes.
+	//	// This ensures calls to ReadTraceEvents are not slowed down;
+	//	// ReadTraceEvents uses an iterator, which must sort all keys
+	//	// of uncommitted writes.
+	//	// The 200 value yielded a good balance between read and write speed:
+	//	// https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643
+	//	if err := rw.Flush(); err != nil {
+	//		return err
+	//	}
+	//
+	//	// the current ReadWriter flushed the transaction and reset the pendingSize so add
+	//	// the entrySize again.
+	//	rw.pendingSize += entrySize
+	//	rw.s.pendingSize.Add(entrySize)
+	//}
+	//
+	//err := rw.txn.SetEntry(e.WithTTL(opts.TTL))
+	//
+	//// If the transaction is already too big to accommodate the new entry, flush
+	//// the existing transaction and set the entry on a new one, otherwise,
+	//// returns early.
+	//if err != badger.ErrTxnTooBig {
+	//	return err
+	//}
+	//if err := rw.Flush(); err != nil {
+	//	return err
+	//}
+	//rw.pendingSize += entrySize
+	//rw.s.pendingSize.Add(entrySize)
+	//return rw.txn.SetEntry(e.WithTTL(opts.TTL))
 }
 
-//func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error {
-//	rw.pendingWrites++
-//	entrySize := estimateSize(e)
-//	// The badger database has an async size reconciliation, with a 1 minute
-//	// ticker that keeps the lsm and vlog sizes updated in an in-memory map.
-//	// It's OK to call call s.db.Size() on the hot path, since the memory
-//	// lookup is cheap.
-//	lsm, vlog := rw.s.db.Size()
-//
-//	// there are multiple ReadWriters writing to the same storage so add
-//	// the entry size and consider the new value to avoid TOCTOU issues.
-//	pendingSize := rw.s.pendingSize.Add(entrySize)
-//	rw.pendingSize += entrySize
-//
-//	if current := pendingSize + lsm + vlog; opts.StorageLimitInBytes != 0 && current >= opts.StorageLimitInBytes {
-//		// flush what we currently have and discard the current entry
-//		if err := rw.Flush(); err != nil {
-//			return err
-//		}
-//		return fmt.Errorf("%w (current: %d, limit: %d)", ErrLimitReached, current, opts.StorageLimitInBytes)
-//	}
-//
-//	if rw.pendingWrites >= 200 {
-//		// Attempt to flush if there are 200 or more uncommitted writes.
-//		// This ensures calls to ReadTraceEvents are not slowed down;
-//		// ReadTraceEvents uses an iterator, which must sort all keys
-//		// of uncommitted writes.
-//		// The 200 value yielded a good balance between read and write speed:
-//		// https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643
-//		if err := rw.Flush(); err != nil {
-//			return err
-//		}
-//
-//		// the current ReadWriter flushed the transaction and reset the pendingSize so add
-//		// the entrySize again.
-//		rw.pendingSize += entrySize
-//		rw.s.pendingSize.Add(entrySize)
-//	}
-//
-//	err := rw.txn.SetEntry(e.WithTTL(opts.TTL))
-//
-//	// If the transaction is already too big to accommodate the new entry, flush
-//	// the existing transaction and set the entry on a new one, otherwise,
-//	// returns early.
-//	if err != badger.ErrTxnTooBig {
-//		return err
-//	}
-//	if err := rw.Flush(); err != nil {
-//		return err
-//	}
-//	rw.pendingSize += entrySize
-//	rw.s.pendingSize.Add(entrySize)
-//	return rw.txn.SetEntry(e.WithTTL(opts.TTL))
-//}
 //
 //func estimateSize(e *badger.Entry) int64 {
 //	// See badger WithValueThreshold option
@@ -261,7 +285,12 @@ func (rw *ReadWriter) DeleteTraceEvent(traceID, id string) error {
 
 // ReadTraceEvents reads trace events with the given trace ID from storage into out.
 func (rw *ReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error {
-	iter, err := rw.s.db.NewIter(&pebble.IterOptions{
+	rw.mu.Lock()
+	defer rw.mu.Unlock()
+	if rw.batch == nil {
+		rw.batch = rw.s.db.NewIndexedBatch()
+	}
+	iter, err := rw.batch.NewIter(&pebble.IterOptions{
 		LowerBound: append([]byte(traceID), ':'),
 		UpperBound: append([]byte(traceID), ';'),
 	})
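Taken together, these storage.go changes route writes through a mutex-guarded *pebble.Batch created with NewIndexedBatch, commit the batch once it grows past roughly 2000 bytes, and point the ReadTraceEvents iterator at the same batch so it can see uncommitted writes. Below is a standalone sketch of that pattern, not the commit's code: the type name batchedRW, the visit callback, and the threshold constant are illustrative, and it assumes a pebble version in which Batch.NewIter returns (*Iterator, error), as the hunk above does.

package sketch

import (
	"sync"

	"github.com/cockroachdb/pebble"
)

type batchedRW struct {
	db    *pebble.DB
	mu    sync.Mutex
	batch *pebble.Batch
}

// set buffers a key/value pair in an indexed batch and commits the batch
// once its in-memory representation exceeds a small size threshold.
func (rw *batchedRW) set(key, value []byte) error {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	if rw.batch == nil {
		// An indexed batch (unlike a plain batch) can serve reads of its
		// own uncommitted writes via NewIter.
		rw.batch = rw.db.NewIndexedBatch()
	}
	if err := rw.batch.Set(key, value, pebble.NoSync); err != nil {
		return err
	}
	// Batch.Len reports the size of the batch representation in bytes;
	// the 2000-byte flush threshold mirrors the diff and is arbitrary.
	if rw.batch.Len() > 2000 {
		err := rw.batch.Commit(pebble.Sync)
		rw.batch.Close()
		rw.batch = nil
		return err
	}
	return nil
}

// scanPrefix iterates over keys of the form "<traceID>:<id>" through the
// batch, so entries written but not yet committed remain visible.
func (rw *batchedRW) scanPrefix(traceID string, visit func(key, value []byte) error) error {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	if rw.batch == nil {
		rw.batch = rw.db.NewIndexedBatch()
	}
	iter, err := rw.batch.NewIter(&pebble.IterOptions{
		LowerBound: append([]byte(traceID), ':'),
		UpperBound: append([]byte(traceID), ';'), // ';' is the byte after ':'
	})
	if err != nil {
		return err
	}
	defer iter.Close()
	for iter.First(); iter.Valid(); iter.Next() {
		if err := visit(iter.Key(), iter.Value()); err != nil {
			return err
		}
	}
	return iter.Error()
}

One consequence of this design: with a plain batch the iterator would only see committed data, which is why the indexed variant is used on both the write path and before creating the read iterator.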

x-pack/apm-server/sampling/processor.go (+39 -40)

@@ -14,7 +14,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/dgraph-io/badger/v2"
 	"github.com/pkg/errors"
 	"golang.org/x/sync/errgroup"
 
@@ -117,11 +116,11 @@ func (p *Processor) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) {
 	p.groups.mu.RUnlock()
 	monitoring.ReportInt(V, "dynamic_service_groups", int64(numDynamicGroups))
 
-	monitoring.ReportNamespace(V, "storage", func() {
-		lsmSize, valueLogSize := p.config.DB.Size()
-		monitoring.ReportInt(V, "lsm_size", int64(lsmSize))
-		monitoring.ReportInt(V, "value_log_size", int64(valueLogSize))
-	})
+	//monitoring.ReportNamespace(V, "storage", func() {
+	//	lsmSize, valueLogSize := p.config.DB.Size()
+	//	monitoring.ReportInt(V, "lsm_size", int64(lsmSize))
+	//	monitoring.ReportInt(V, "value_log_size", int64(valueLogSize))
+	//})
 	monitoring.ReportNamespace(V, "events", func() {
 		monitoring.ReportInt(V, "processed", atomic.LoadInt64(&p.eventMetrics.processed))
 		monitoring.ReportInt(V, "dropped", atomic.LoadInt64(&p.eventMetrics.dropped))
@@ -390,40 +389,40 @@ func (p *Processor) Run() error {
 			}
 		}
 	})
-	g.Go(func() error {
-		// Protect this goroutine from running concurrently when 2 TBS processors are active
-		// as badger GC is not concurrent safe.
-		select {
-		case <-p.stopping:
-			return nil
-		case gcCh <- struct{}{}:
-		}
-		defer func() {
-			<-gcCh
-		}()
-		// This goroutine is responsible for periodically garbage
-		// collecting the Badger value log, using the recommended
-		// discard ratio of 0.5.
-		ticker := time.NewTicker(p.config.StorageGCInterval)
-		defer ticker.Stop()
-		for {
-			select {
-			case <-p.stopping:
-				return nil
-			case <-ticker.C:
-				const discardRatio = 0.5
-				var err error
-				for err == nil {
-					// Keep garbage collecting until there are no more rewrites,
-					// or garbage collection fails.
-					err = p.config.DB.RunValueLogGC(discardRatio)
-				}
-				if err != nil && err != badger.ErrNoRewrite {
-					return err
-				}
-			}
-		}
-	})
+	//g.Go(func() error {
+	//	// Protect this goroutine from running concurrently when 2 TBS processors are active
+	//	// as badger GC is not concurrent safe.
+	//	select {
+	//	case <-p.stopping:
+	//		return nil
+	//	case gcCh <- struct{}{}:
+	//	}
+	//	defer func() {
+	//		<-gcCh
+	//	}()
+	//	// This goroutine is responsible for periodically garbage
+	//	// collecting the Badger value log, using the recommended
+	//	// discard ratio of 0.5.
+	//	ticker := time.NewTicker(p.config.StorageGCInterval)
+	//	defer ticker.Stop()
+	//	for {
+	//		select {
+	//		case <-p.stopping:
+	//			return nil
+	//		case <-ticker.C:
+	//			const discardRatio = 0.5
+	//			var err error
+	//			for err == nil {
+	//				// Keep garbage collecting until there are no more rewrites,
+	//				// or garbage collection fails.
+	//				err = p.config.DB.RunValueLogGC(discardRatio)
+	//			}
+	//			if err != nil && err != badger.ErrNoRewrite {
+	//				return err
+	//			}
+	//		}
+	//	}
+	//})
 	g.Go(func() error {
 		// Subscribe to remotely sampled trace IDs. This is cancelled immediately when
 		// Stop is called. But it is possible that both old and new subscriber goroutines
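The goroutine commented out above was Badger's value-log GC loop. Pebble has no separate value log; obsolete data is reclaimed by its background compactions, so there is no RunValueLogGC counterpart for the processor to drive. A hedged sketch of the storage housekeeping that plausibly remains on shutdown follows; closeStorage and where it would be called from are assumptions, not part of this commit.

package sketch

import "github.com/cockroachdb/pebble"

// closeStorage flushes memtable contents to SSTables and closes the DB.
// Compaction is Pebble's own background responsibility; nothing periodic
// needs to run inside the processor's errgroup.
func closeStorage(db *pebble.DB) error {
	if err := db.Flush(); err != nil {
		return err
	}
	return db.Close()
}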
