Skip to content

Commit 761be1c

Browse files
committed
Reset the rows, fix memory leaks, reduce reallocations
1 parent eb9b104 commit 761be1c

File tree

1 file changed

+43
-30
lines changed

1 file changed

+43
-30
lines changed

pkg/engine/compat.go

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ var (
3333

3434
func newStreamsResultBuilder() *streamsResultBuilder {
3535
return &streamsResultBuilder{
36-
data: make(logqlmodel.Streams, 0),
37-
streams: make(map[string]int),
36+
data: make(logqlmodel.Streams, 0),
37+
streams: make(map[string]int),
38+
rowsBuffer: &rowsBuffer{},
3839
}
3940
}
4041

@@ -44,7 +45,7 @@ type streamsResultBuilder struct {
4445
count int
4546

4647
// buffer of rows to be reused between calls to CollectRecord to reduce reallocations of slices and builders
47-
rowsBuffer rowsBuffer
48+
rowsBuffer *rowsBuffer
4849
}
4950

5051
type rowsBuffer struct {
@@ -56,42 +57,52 @@ type rowsBuffer struct {
5657
parsedBuilders []*labels.Builder
5758
}
5859

59-
func (p *rowsBuffer) prepareFor(newLen int) {
60-
if newLen <= p.len {
61-
p.timestamps = p.timestamps[:newLen]
62-
clear(p.timestamps)
63-
64-
p.lines = p.lines[:newLen]
65-
clear(p.lines)
60+
func (buf *rowsBuffer) prepareFor(newLen int) {
61+
if newLen == buf.len {
62+
return
63+
}
6664

67-
p.lbsBuilders = p.lbsBuilders[:newLen]
68-
p.metadataBuilders = p.metadataBuilders[:newLen]
69-
p.parsedBuilders = p.parsedBuilders[:newLen]
65+
if newLen < buf.len {
66+
// free not used items at the end of the slices so they can be GC-ed
67+
clear(buf.timestamps[newLen:buf.len])
68+
clear(buf.lines[newLen:buf.len])
69+
clear(buf.lbsBuilders[newLen:buf.len])
70+
clear(buf.metadataBuilders[newLen:buf.len])
71+
clear(buf.parsedBuilders[newLen:buf.len])
7072

71-
for i := range newLen {
72-
p.lbsBuilders[i].Reset(labels.EmptyLabels())
73-
p.metadataBuilders[i].Reset(labels.EmptyLabels())
74-
p.parsedBuilders[i].Reset(labels.EmptyLabels())
75-
}
73+
// shrink to the new length, no need to zero the items as it was done before via resetRow(i)
74+
buf.timestamps = buf.timestamps[:newLen]
75+
buf.lines = buf.lines[:newLen]
76+
buf.lbsBuilders = buf.lbsBuilders[:newLen]
77+
buf.metadataBuilders = buf.metadataBuilders[:newLen]
78+
buf.parsedBuilders = buf.parsedBuilders[:newLen]
7679

77-
p.len = newLen
80+
buf.len = newLen
7881

7982
return
8083
}
8184

82-
p.timestamps = make([]time.Time, newLen)
83-
p.lines = make([]string, newLen)
84-
p.lbsBuilders = make([]*labels.Builder, newLen)
85-
p.metadataBuilders = make([]*labels.Builder, newLen)
86-
p.parsedBuilders = make([]*labels.Builder, newLen)
87-
88-
for i := range newLen {
89-
p.lbsBuilders[i] = labels.NewBuilder(labels.EmptyLabels())
90-
p.metadataBuilders[i] = labels.NewBuilder(labels.EmptyLabels())
91-
p.parsedBuilders[i] = labels.NewBuilder(labels.EmptyLabels())
85+
// newLen > buf.len
86+
numRowsToAdd := newLen - buf.len
87+
buf.timestamps = append(buf.timestamps, make([]time.Time, numRowsToAdd)...)
88+
buf.lines = append(buf.lines, make([]string, numRowsToAdd)...)
89+
buf.lbsBuilders = append(buf.lbsBuilders, make([]*labels.Builder, numRowsToAdd)...)
90+
buf.metadataBuilders = append(buf.metadataBuilders, make([]*labels.Builder, numRowsToAdd)...)
91+
buf.parsedBuilders = append(buf.parsedBuilders, make([]*labels.Builder, numRowsToAdd)...)
92+
for i := buf.len; i < newLen; i++ {
93+
buf.lbsBuilders[i] = labels.NewBuilder(labels.EmptyLabels())
94+
buf.metadataBuilders[i] = labels.NewBuilder(labels.EmptyLabels())
95+
buf.parsedBuilders[i] = labels.NewBuilder(labels.EmptyLabels())
9296
}
97+
buf.len = newLen
98+
}
9399

94-
p.len = newLen
100+
func (buf *rowsBuffer) resetRow(i int) {
101+
buf.timestamps[i] = time.Time{}
102+
buf.lines[i] = ""
103+
buf.lbsBuilders[i].Reset(labels.EmptyLabels())
104+
buf.metadataBuilders[i].Reset(labels.EmptyLabels())
105+
buf.parsedBuilders[i].Reset(labels.EmptyLabels())
95106
}
96107

97108
func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
@@ -192,6 +203,7 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
192203
line := b.rowsBuffer.lines[rowIdx]
193204
// Ignore rows that don't have stream labels, log line, or timestamp
194205
if line == "" || ts.IsZero() || lbs.IsEmpty() {
206+
b.rowsBuffer.resetRow(rowIdx)
195207
continue
196208
}
197209

@@ -201,6 +213,7 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.Record) {
201213
StructuredMetadata: logproto.FromLabelsToLabelAdapters(b.rowsBuffer.metadataBuilders[rowIdx].Labels()),
202214
Parsed: logproto.FromLabelsToLabelAdapters(b.rowsBuffer.parsedBuilders[rowIdx].Labels()),
203215
}
216+
b.rowsBuffer.resetRow(rowIdx)
204217

205218
// Add entry to appropriate stream
206219
key := lbs.String()

0 commit comments

Comments
 (0)