Skip to content

Commit cdf1855

Browse files
committed
WIP
1 parent ed3596e commit cdf1855

File tree

1 file changed

+129
-0
lines changed
  • x-pack/apm-server/sampling/eventstorage

1 file changed

+129
-0
lines changed
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,129 @@
1+
package eventstorage
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
7+
"github.com/dgraph-io/badger/v2"
8+
9+
"github.com/elastic/apm-data/model/modelpb"
10+
)
11+
12+
// User-meta bytes stored on badger entries; readers in this file compare
// them against item.UserMeta() to tell sampling decisions from trace events.
//
// NOTE(axw) these values (and their meanings) must remain stable
// over time, to avoid misinterpreting historical data.
const (
	badgerEntryMetaTraceSampled   = 's'
	badgerEntryMetaTraceUnsampled = 'u'
	badgerEntryMetaTraceEvent     = 'e'
)
19+
20+
type badgerStorage struct {
21+
db *badger.DB
22+
codec Codec
23+
enabled bool
24+
mu sync.RWMutex
25+
}
26+
27+
type BadgerMigrationRW struct {
28+
s *badgerStorage
29+
nextRW RW
30+
}
31+
32+
// WriteTraceSampled records the tail-sampling decision for the given trace ID.
33+
func (rw *BadgerMigrationRW) WriteTraceSampled(traceID string, sampled bool) error {
34+
return rw.nextRW.WriteTraceSampled(traceID, sampled)
35+
}
36+
37+
// IsTraceSampled reports whether traceID belongs to a trace that is sampled
38+
// or unsampled. If no sampling decision has been recorded, IsTraceSampled
39+
// returns ErrNotFound.
40+
func (rw *BadgerMigrationRW) IsTraceSampled(traceID string) (bool, error) {
41+
sampled, err := rw.nextRW.IsTraceSampled(traceID)
42+
if err != ErrNotFound {
43+
return sampled, err
44+
}
45+
rw.s.mu.RLock()
46+
defer rw.s.mu.RUnlock()
47+
if !rw.s.enabled {
48+
return sampled, err
49+
}
50+
51+
txn := rw.s.db.NewTransaction(false)
52+
defer txn.Discard()
53+
item, err := txn.Get([]byte(traceID))
54+
if err != nil {
55+
if err == badger.ErrKeyNotFound {
56+
return false, ErrNotFound
57+
}
58+
return false, err
59+
}
60+
return item.UserMeta() == badgerEntryMetaTraceSampled, nil
61+
}
62+
63+
func (rw *BadgerMigrationRW) WriteTraceEvent(traceID string, id string, event *modelpb.APMEvent) error {
64+
return rw.nextRW.WriteTraceEvent(traceID, id, event)
65+
}
66+
67+
// DeleteTraceEvent deletes the trace event from storage.
68+
func (rw *BadgerMigrationRW) DeleteTraceEvent(traceID, id string) error {
69+
// FIXME: inclined to not delete from badger.
70+
// At worst it will produce duplicates if apm-server restarts without persisting pubsub checkpoint.
71+
return rw.nextRW.DeleteTraceEvent(traceID, id)
72+
}
73+
74+
// ReadTraceEvents reads trace events with the given trace ID from storage into out.
75+
func (rw *BadgerMigrationRW) ReadTraceEvents(traceID string, out *modelpb.Batch) error {
76+
err := rw.nextRW.ReadTraceEvents(traceID, out)
77+
78+
rw.s.mu.RLock()
79+
defer rw.s.mu.RUnlock()
80+
if !rw.s.enabled {
81+
return err
82+
}
83+
84+
txn := rw.s.db.NewTransaction(false)
85+
opts := badger.DefaultIteratorOptions
86+
opts.Prefix = append([]byte(traceID), ':')
87+
88+
// 1st pass: check whether there exist keys matching the prefix.
89+
// Do not prefetch values so that the check is done in-memory.
90+
// This is to optimize for cases when it is a miss.
91+
opts.PrefetchValues = false
92+
iter := txn.NewIterator(opts)
93+
iter.Rewind()
94+
if !iter.Valid() {
95+
iter.Close()
96+
return nil
97+
}
98+
iter.Close()
99+
100+
// 2nd pass: this is only done when there exist keys matching the prefix.
101+
// Fetch the events with PrefetchValues for performance.
102+
// This is to optimize for cases when it is a hit.
103+
opts.PrefetchValues = true
104+
iter = txn.NewIterator(opts)
105+
defer iter.Close()
106+
for iter.Rewind(); iter.Valid(); iter.Next() {
107+
item := iter.Item()
108+
if item.IsDeletedOrExpired() {
109+
continue
110+
}
111+
switch item.UserMeta() {
112+
case badgerEntryMetaTraceEvent:
113+
event := &modelpb.APMEvent{}
114+
if err := item.Value(func(data []byte) error {
115+
if err := rw.s.codec.DecodeEvent(data, event); err != nil {
116+
return fmt.Errorf("codec failed to decode event: %w", err)
117+
}
118+
return nil
119+
}); err != nil {
120+
return err
121+
}
122+
*out = append(*out, event)
123+
default:
124+
// Unknown entry meta: ignore.
125+
continue
126+
}
127+
}
128+
return nil
129+
}

0 commit comments

Comments
 (0)