feat: compute tbs entry size before writing and flush to avoid limit error #12120

Merged: 26 commits from feat/discarded-txn into main on Jan 18, 2024

Commits (the changes shown below are from the first commit, 1d22c37):
1d22c37  feat: compute tbs entry size before writing and flush to avoid limit …  (kruskall, Nov 29, 2023)
bb819bc  Merge branch 'main' into feat/discarded-txn  (kruskall, Dec 5, 2023)
a16a4ce  fix: move pendingSize to storage and make it atomic  (kruskall, Dec 5, 2023)
613e58f  refactor: remove default pending size  (kruskall, Dec 5, 2023)
958ffc5  fix: track readwriters pending size and resolve race conditions  (kruskall, Dec 5, 2023)
c19c88e  lint: remove unused method  (kruskall, Dec 5, 2023)
9c9cca2  fix: add base transaction size and add more comments  (kruskall, Dec 5, 2023)
40c17a2  test: fix storage limit test  (kruskall, Dec 5, 2023)
25f0a4c  lint: remove unused ErrLimitReached  (kruskall, Dec 5, 2023)
52767f8  fix: do not add entrySize twice  (kruskall, Dec 6, 2023)
de0230b  Merge branch 'main' into feat/discarded-txn  (kruskall, Jan 4, 2024)
ac8f074  docs: add pendingSize comment  (kruskall, Jan 4, 2024)
d6fece5  lint: fix linter issues  (kruskall, Jan 4, 2024)
af8a170  fix: respect storage limit  (kruskall, Jan 5, 2024)
5842784  fix: handle 0 storage limit (unlimited)  (kruskall, Jan 5, 2024)
57c734b  Merge branch 'main' into feat/discarded-txn  (kruskall, Jan 5, 2024)
bc0188e  fix: flush what we have before discarding the transaction  (kruskall, Jan 5, 2024)
94aaa7e  fix: do not discard txn twice  (kruskall, Jan 11, 2024)
c0cd5ff  test: fix error message typo  (kruskall, Jan 11, 2024)
0848890  fix: update pendingSize after flush and refactor for clarity  (kruskall, Jan 11, 2024)
4a351af  Merge branch 'main' into feat/discarded-txn  (kruskall, Jan 11, 2024)
5519367  Merge branch 'main' into feat/discarded-txn  (kruskall, Jan 17, 2024)
a71b50d  docs: update test comment  (kruskall, Jan 17, 2024)
7113324  Merge branch 'main' into feat/discarded-txn  (kruskall, Jan 17, 2024)
22b613f  docs: readd db.size comment  (kruskall, Jan 17, 2024)
6bb59ca  Merge branch 'main' into feat/discarded-txn  (kruskall, Jan 18, 2024)
48 changes: 28 additions & 20 deletions x-pack/apm-server/sampling/eventstorage/storage.go
@@ -20,6 +20,8 @@ const (
 	entryMetaTraceSampled   = 's'
 	entryMetaTraceUnsampled = 'u'
 	entryMetaTraceEvent     = 'e'
+
+	defaultPendingSize = 1024
 )

 var (
@@ -65,8 +67,9 @@ func (s *Storage) NewShardedReadWriter() *ShardedReadWriter {
 // The returned ReadWriter must be closed when it is no longer needed.
 func (s *Storage) NewReadWriter() *ReadWriter {
 	return &ReadWriter{
-		s:   s,
-		txn: s.db.NewTransaction(true),
+		s:           s,
+		txn:         s.db.NewTransaction(true),
+		pendingSize: defaultPendingSize,
 	}
 }

Review thread on the pendingSize default (resolved):

Member: Instead of defaulting to 1024, should we perhaps use s.db.Size()? And rather than calculating pendingSize, maybe we could track the remaining capacity, i.e. StorageLimitInBytes - Size()?

kruskall (author): I've removed the default 1024. For tracking I'd prefer to use pendingSize, but I have no strong opinion on it.
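For contrast, here is a minimal sketch of the reviewer's alternative: deriving a remaining-capacity budget from the limit rather than accumulating pending bytes. The function name and shape are hypothetical (the PR ultimately kept pendingSize tracking); lsm and vlog stand for the two sizes returned by badger's db.Size().

```go
// remainingCapacity derives the write budget from what is still left under
// the storage limit, instead of accumulating what is pending. lsm and vlog
// are badger's LSM-tree and value-log sizes in bytes, from db.Size().
func remainingCapacity(storageLimitInBytes, lsm, vlog int64) int64 {
	return storageLimitInBytes - (lsm + vlog)
}

// An entry would then be admitted only while it still fits, e.g.:
//
//	if entrySize > remainingCapacity(opts.StorageLimitInBytes, lsm, vlog) {
//		// flush, or reject the write
//	}
```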

@@ -105,6 +108,7 @@ type ReadWriter struct {
 	// be unmodified until the end of a transaction.
 	readKeyBuf    []byte
 	pendingWrites int
+	pendingSize   int64
 }

 // Close closes the writer. Any writes that have not been flushed may be lost.
@@ -126,19 +130,10 @@ func (rw *ReadWriter) Close() {
 // files exceeds the configured threshold.
 func (rw *ReadWriter) Flush(limit int64) error {
 	const flushErrFmt = "failed to flush pending writes: %w"
-	if current, limitReached := rw.s.limitReached(limit); limitReached {
-		// Discard the txn and re-create it if the soft limit has been reached.
-		rw.txn.Discard()
-		rw.txn = rw.s.db.NewTransaction(true)
-		rw.pendingWrites = 0
-		return fmt.Errorf(
-			flushErrFmt+" (current: %d, limit: %d)",
-			ErrLimitReached, current, limit,
-		)
-	}
 	err := rw.txn.Commit()
 	rw.txn = rw.s.db.NewTransaction(true)
 	rw.pendingWrites = 0
+	rw.pendingSize = defaultPendingSize
 	if err != nil {
 		return fmt.Errorf(flushErrFmt, err)
 	}

Review thread on resetting rw.pendingSize (resolved):

Member: Like in ReadWriter.writeEntry, this should be coordinated across all ReadWriters.

kruskall (author): I've changed this a bit. IIUC, when flushing we're just committing the current ReadWriter's transaction, which is independent from the other ReadWriters, so we need to track both the total underlying storage pendingSize (the sum of all the ReadWriters' pendingSizes) and the per-ReadWriter pendingSize. When a ReadWriter flushes, we reset its current pendingSize and subtract it from the storage pendingSize.
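A minimal sketch of the coordination the author describes, assuming a shared atomic counter on the storage plus a plain per-writer counter (the type and method names here are illustrative; the real change lands across later commits in this PR, e.g. a16a4ce and 958ffc5):

```go
package eventstorage

import "sync/atomic"

// sharedPending is the storage-level total, updated atomically because
// many ReadWriters write concurrently.
type sharedPending struct {
	total atomic.Int64 // sum of every ReadWriter's uncommitted bytes
}

// writerPending is per-ReadWriter state; each ReadWriter is used by one
// goroutine at a time, so its own counter needs no synchronization.
type writerPending struct {
	shared      *sharedPending
	pendingSize int64 // this ReadWriter's uncommitted bytes
}

// record accounts for a newly written, not-yet-committed entry on both counters.
func (w *writerPending) record(entrySize int64) {
	w.pendingSize += entrySize
	w.shared.total.Add(entrySize)
}

// flushed resets accounting after this writer commits its transaction:
// its share is subtracted from the shared total and its own counter zeroed.
func (w *writerPending) flushed() {
	w.shared.total.Add(-w.pendingSize)
	w.pendingSize = 0
}
```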
@@ -185,18 +180,26 @@ func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *modelpb.

 func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error {
 	rw.pendingWrites++
-	err := rw.txn.SetEntry(e.WithTTL(opts.TTL))
-	// Attempt to flush if there are 200 or more uncommitted writes.
-	// This ensures calls to ReadTraceEvents are not slowed down;
-	// ReadTraceEvents uses an iterator, which must sort all keys
-	// of uncommitted writes.
-	// The 200 value yielded a good balance between read and write speed:
-	// https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643
-	if rw.pendingWrites >= 200 {
+	entrySize := int64(estimateSize(e)) + 10
+	lsm, vlog := rw.s.db.Size()
+
+	if rw.pendingSize+entrySize+lsm+vlog >= opts.StorageLimitInBytes {
+		if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
+			return err
+		}
+	} else if rw.pendingWrites >= 200 {
+		// Attempt to flush if there are 200 or more uncommitted writes.
+		// This ensures calls to ReadTraceEvents are not slowed down;
+		// ReadTraceEvents uses an iterator, which must sort all keys
+		// of uncommitted writes.
+		// The 200 value yielded a good balance between read and write speed:
+		// https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643
 		if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
 			return err
 		}
 	}
+	err := rw.txn.SetEntry(e.WithTTL(opts.TTL))

 	// If the transaction is already too big to accommodate the new entry, flush
 	// the existing transaction and set the entry on a new one, otherwise,
 	// returns early.
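To make the new admission check concrete, here is a hedged, self-contained restatement of its logic with made-up numbers (not the merged code; the zero-limit guard reflects the later commit 5842784, "handle 0 storage limit (unlimited)"):

```go
package main

import "fmt"

// wouldExceedLimit restates writeEntry's new check: a write triggers a flush
// when the bytes already pending in the transaction, plus this entry's
// estimated size, plus badger's LSM-tree and value-log sizes, reach the limit.
func wouldExceedLimit(pendingSize, entrySize, lsm, vlog, limit int64) bool {
	if limit == 0 {
		return false // unlimited storage
	}
	return pendingSize+entrySize+lsm+vlog >= limit
}

func main() {
	// 1 MiB pending, a 4 KiB entry, 40 MiB LSM, 10 MiB value log.
	fmt.Println(wouldExceedLimit(1<<20, 4<<10, 40<<20, 10<<20, 52<<20)) // false: fits under 52 MiB
	fmt.Println(wouldExceedLimit(1<<20, 4<<10, 40<<20, 10<<20, 51<<20)) // true: flush first
}
```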
@@ -206,9 +209,14 @@ func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error {
 	if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
 		return err
 	}
+	rw.pendingSize += entrySize
 	return rw.txn.SetEntry(e)
 }

+func estimateSize(e *badger.Entry) int {
+	return len(e.Key) + len(e.Value) + 12 + 2
+}
+
 // DeleteTraceEvent deletes the trace event from storage.
 func (rw *ReadWriter) DeleteTraceEvent(traceID, id string) error {
 	key := append(append([]byte(traceID), ':'), id...)
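A note on the constants in estimateSize (my reading, not stated in the PR): badger stores per-entry metadata alongside the key and value, so the flat 12 + 2 bytes here, and the extra 10 added at the call site in writeEntry, look like rough allowances for that header overhead rather than exact accounting. A small sanity check of the estimate, assuming the badger v2 module that apm-server's event storage imports:

```go
package main

import (
	"fmt"

	badger "github.com/dgraph-io/badger/v2"
)

// Copied from the PR for illustration: key and value bytes plus a flat
// allowance (12 + 2) for badger's per-entry metadata.
func estimateSize(e *badger.Entry) int {
	return len(e.Key) + len(e.Value) + 12 + 2
}

func main() {
	// Hypothetical entry: a "traceID:spanID" key and a small JSON value.
	e := badger.NewEntry([]byte("trace-id:span-id"), []byte(`{"event":"..."}`))
	fmt.Println(estimateSize(e)) // 16 + 15 + 12 + 2 = 45 bytes estimated
}
```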