Skip to content

Commit

Permalink
fix: fix memory leak (#281)
Browse files Browse the repository at this point in the history
* fix: fix memory leak

fix: fix test_filter_by_remote_port test

* fix: fix index out of range error

---------

Signed-off-by: 烈香 <[email protected]>
  • Loading branch information
hengyoush authored Jan 28, 2025
1 parent 4532e6b commit 237c9e3
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 73 deletions.
4 changes: 2 additions & 2 deletions agent/analysis/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
if !traceDevEvent {
annotatedRecord.TotalDuration = annotatedRecord.BlackBoxDuration
}
if !traceSocketEvent && hasNicInEvents && canCalculateReadPathTime {
if !traceSocketEvent && hasNicInEvents && canCalculateReadPathTime && hasReadSyscallEvents {
if nicInTimestamp, _, ok := events.nicIngressEvents[0].GetMinIfItmestampAttr(); ok {
annotatedRecord.ReadFromSocketBufferDuration = float64(events.readSyscallEvents[len(events.readSyscallEvents)-1].GetEndTs() - uint64(nicInTimestamp))
}
Expand Down Expand Up @@ -305,7 +305,7 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
if hasTcpInEvents && hasNicInEvents && canCalculateReadPathTime {
annotatedRecord.CopyToSocketBufferDuration = float64(events.tcpInEvents[len(events.tcpInEvents)-1].GetStartTs() - events.nicIngressEvents[0].GetStartTs())
}
if !traceSocketEvent && hasNicInEvents && canCalculateReadPathTime {
if !traceSocketEvent && hasNicInEvents && canCalculateReadPathTime && hasReadSyscallEvents {
if _nicIngressTimestamp, _, ok := events.nicIngressEvents[0].GetMinIfItmestampAttr(); ok {
annotatedRecord.ReadFromSocketBufferDuration = float64(events.readSyscallEvents[len(events.readSyscallEvents)-1].GetEndTs() - uint64(_nicIngressTimestamp))
}
Expand Down
48 changes: 22 additions & 26 deletions agent/conn/kern_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

type KernEventStream struct {
conn *Connection4
kernEvents map[bpf.AgentStepT][]KernEvent
kernEvents map[bpf.AgentStepT]*common.RingBuffer
kernEventsMu sync.RWMutex
sslInEvents []SslEvent
sslOutEvents []SslEvent
Expand All @@ -32,7 +32,7 @@ type KernEventStream struct {
func NewKernEventStream(conn *Connection4, maxLen int) *KernEventStream {
stream := &KernEventStream{
conn: conn,
kernEvents: make(map[bpf.AgentStepT][]KernEvent),
kernEvents: make(map[bpf.AgentStepT]*common.RingBuffer),
maxLen: maxLen,
}
monitor.RegisterMetricExporter(stream)
Expand Down Expand Up @@ -93,31 +93,31 @@ func (s *KernEventStream) AddKernEvent(event *bpf.AgentKernEvt) bool {
s.discardEventsIfNeeded()
if event.Len > 0 {
if _, ok := s.kernEvents[event.Step]; !ok {
s.kernEvents[event.Step] = make([]KernEvent, 0)
s.kernEvents[event.Step] = common.NewRingBuffer(s.maxLen)
}

kernEvtSlice := s.kernEvents[event.Step]
index, found := slices.BinarySearchFunc(kernEvtSlice, KernEvent{seq: uint64(event.Seq)}, func(i KernEvent, j KernEvent) int {
return cmp.Compare(i.seq, j.seq)
kernEvtRingBuffer := s.kernEvents[event.Step]
index, found := kernEvtRingBuffer.BinarySearch(KernEvent{seq: uint64(event.Seq)}, func(i any, j any) int {
return cmp.Compare(i.(KernEvent).seq, j.(KernEvent).seq)
})
isNicEvnt := event.Step == bpf.AgentStepTDEV_OUT || event.Step == bpf.AgentStepTDEV_IN

var kernEvent *KernEvent
if found {
oldKernEvent := &kernEvtSlice[index]
_oldKernEvent, _ := kernEvtRingBuffer.ReadIndex(index)
oldKernEvent := _oldKernEvent.(KernEvent)
if oldKernEvent.startTs > event.Ts && !isNicEvnt {
// this is a duplicate event which belongs to a future conn
oldKernEvent.seq = uint64(event.Seq)
oldKernEvent.len = int(event.Len)
oldKernEvent.startTs = event.Ts
oldKernEvent.tsDelta = event.TsDelta
oldKernEvent.step = event.Step
kernEvent = oldKernEvent
kernEvent = &oldKernEvent
} else if !isNicEvnt {
kernEvent = &kernEvtSlice[index]
return false
} else {
kernEvent = &kernEvtSlice[index]
kernEvent = &oldKernEvent
}
} else {
kernEvent = &KernEvent{
Expand All @@ -143,17 +143,11 @@ func (s *KernEventStream) AddKernEvent(event *bpf.AgentKernEvt) bool {
}
}
if !found {
kernEvtSlice = slices.Insert(kernEvtSlice, index, *kernEvent)
}
if len(kernEvtSlice) > s.maxLen {
if common.ConntrackLog.Level >= logrus.DebugLevel {
common.ConntrackLog.Debugf("kern event stream size: %d exceed maxLen", len(kernEvtSlice))
if err := kernEvtRingBuffer.Insert(index, *kernEvent); err != nil {
common.ConntrackLog.Debugf("kern event stream size: %d exceed maxLen", kernEvtRingBuffer.MaxCapacity())
return false
}
}
for len(kernEvtSlice) > s.maxLen {
kernEvtSlice = kernEvtSlice[1:]
}
s.kernEvents[event.Step] = kernEvtSlice
}
return true
}
Expand Down Expand Up @@ -200,17 +194,19 @@ func (s *KernEventStream) FindEventsBySeqAndLen(step bpf.AgentStepT, seq uint64,
start := seq
end := start + uint64(len)
result := make([]KernEvent, 0)
for _, each := range events {
events.ForEach(func(i any) bool {
each := i.(KernEvent)
if each.seq <= start && each.seq+uint64(each.len) > start {
result = append(result, each)
} else if each.seq < end && each.seq+uint64(each.len) >= end {
result = append(result, each)
} else if each.seq >= start && each.seq+uint64(each.len) <= end {
result = append(result, each)
} else if each.seq > end {
break
return false
}
}
return true
})
return result
}

Expand Down Expand Up @@ -272,12 +268,12 @@ func (s *KernEventStream) discardEventsBySeq(seq uint64, egress bool) {
if !egress && !bpf.IsIngressStep(step) {
continue
}
index, _ := slices.BinarySearchFunc(events, KernEvent{seq: seq}, func(i KernEvent, j KernEvent) int {
return cmp.Compare(i.seq, j.seq)
index, _ := events.BinarySearch(KernEvent{seq: seq}, func(i any, j any) int {
return cmp.Compare(i.(KernEvent).seq, j.(KernEvent).seq)
})
discardIdx := index
if discardIdx > 0 {
s.kernEvents[step] = events[discardIdx:]
events.DiscardBeforeIndex(discardIdx)
// common.ConntrackLog.Debugf("Discarded kern events, step: %d(egress: %v) events num: %d, cur len: %d", step, egress, discardIdx, len(s.kernEvents[step]))
}
}
Expand Down Expand Up @@ -407,7 +403,7 @@ var _ monitor.MetricExporter = &KernEventStream{}
func (s *KernEventStream) ExportMetrics() monitor.MetricMap {
allEventsNum := 0
for _, events := range s.kernEvents {
allEventsNum += len(events)
allEventsNum += events.Size()
}
return monitor.MetricMap{
"events_num": float64(allEventsNum),
Expand Down
89 changes: 48 additions & 41 deletions agent/conn/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ type Processor struct {
side common.SideEnum
recordProcessor *RecordsProcessor
conntrackCloseWaitTimeMills int
tempKernEvents []TimedEvent
tempSyscallEvents []TimedSyscallEvent
tempSslEvents []TimedSslEvent
tempFirstPacketEvents []TimedFirstPacketEvent
tempKernEvents *common.RingBuffer
tempSyscallEvents *common.RingBuffer
tempSslEvents *common.RingBuffer
tempFirstPacketEvents *common.RingBuffer
}

type TimedEvent struct {
Expand Down Expand Up @@ -149,10 +149,10 @@ func initProcessor(name string, wg *sync.WaitGroup, ctx context.Context, connMan
records: make([]RecordWithConn, 0),
}
p.conntrackCloseWaitTimeMills = conntrackCloseWaitTimeMills
p.tempKernEvents = make([]TimedEvent, 0, 100) // Preallocate with a capacity of 100
p.tempSyscallEvents = make([]TimedSyscallEvent, 0, 100) // Preallocate with a capacity of 100
p.tempFirstPacketEvents = make([]TimedFirstPacketEvent, 0, 100)
p.tempSslEvents = make([]TimedSslEvent, 0, 100) // Preallocate with a capacity of 100
p.tempKernEvents = common.NewRingBuffer(1000) // Preallocate with a capacity of 100
p.tempSyscallEvents = common.NewRingBuffer(1000) // Preallocate with a capacity of 100
p.tempFirstPacketEvents = common.NewRingBuffer(100)
p.tempSslEvents = common.NewRingBuffer(100) // Preallocate with a capacity of 100
return p
}

Expand Down Expand Up @@ -319,7 +319,7 @@ func (p *Processor) run() {

func (p *Processor) handleFirstPacketEvent(event *agentKernEvtWithConn, recordChannel chan RecordWithConn) {
// Add event to the temporary queue
p.tempFirstPacketEvents = append(p.tempFirstPacketEvents, TimedFirstPacketEvent{event: event, timestamp: time.Now()})
p.tempFirstPacketEvents.Write(TimedFirstPacketEvent{event: event, timestamp: time.Now()})
// Process events in the queue that have been there for more than 100ms
p.processOldFirstPacketEvents(recordChannel)
}
Expand All @@ -330,16 +330,17 @@ func (p *Processor) processTimedFirstPacketEvents(recordChannel chan RecordWithC

func (p *Processor) processOldFirstPacketEvents(recordChannel chan RecordWithConn) {
now := time.Now()
lastIndex := 0
for i := 0; i < len(p.tempFirstPacketEvents); i++ {
if now.Sub(p.tempFirstPacketEvents[i].timestamp) > 100*time.Millisecond {
p.processFirstPacketEvent(p.tempFirstPacketEvents[i].event, recordChannel)
lastIndex = i + 1
} else {
for !p.tempFirstPacketEvents.IsEmpty() {
_event, err := p.tempFirstPacketEvents.Peek()
if err != nil {
break
}
event := _event.(TimedFirstPacketEvent)
if now.Sub(event.timestamp) > 100*time.Millisecond {
p.processFirstPacketEvent(event.event, recordChannel)
p.tempFirstPacketEvents.Read()
}
}
p.tempFirstPacketEvents = p.tempFirstPacketEvents[lastIndex:]
}

func (p *Processor) processFirstPacketEvent(event *agentKernEvtWithConn, recordChannel chan RecordWithConn) {
Expand All @@ -351,7 +352,7 @@ func (p *Processor) processFirstPacketEvent(event *agentKernEvtWithConn, recordC

func (p *Processor) handleKernEvent(event *bpf.AgentKernEvt, recordChannel chan RecordWithConn) {
// Add event to the temporary queue
p.tempKernEvents = append(p.tempKernEvents, TimedEvent{event: event, timestamp: time.Now()})
p.tempKernEvents.Write(TimedEvent{event: event, timestamp: time.Now()})

// Process events in the queue that have been there for more than 100ms
p.processOldKernEvents(recordChannel)
Expand All @@ -363,16 +364,19 @@ func (p *Processor) processTimedKernEvents(recordChannel chan RecordWithConn) {

func (p *Processor) processOldKernEvents(recordChannel chan RecordWithConn) {
now := time.Now()
lastIndex := 0
for i := 0; i < len(p.tempKernEvents); i++ {
if now.Sub(p.tempKernEvents[i].timestamp) > 100*time.Millisecond {
p.processKernEvent(p.tempKernEvents[i].event, recordChannel)
lastIndex = i + 1
for !p.tempKernEvents.IsEmpty() {
_event, err := p.tempKernEvents.Peek()
if err != nil {
break
}
event := _event.(TimedEvent)
if now.Sub(event.timestamp) > 100*time.Millisecond {
p.processKernEvent(event.event, recordChannel)
p.tempKernEvents.Read()
} else {
break
}
}
p.tempKernEvents = p.tempKernEvents[lastIndex:]
}

func (p *Processor) processKernEvent(event *bpf.AgentKernEvt, recordChannel chan RecordWithConn) {
Expand Down Expand Up @@ -426,7 +430,7 @@ func (p *Processor) processKernEvent(event *bpf.AgentKernEvt, recordChannel chan

func (p *Processor) handleSyscallEvent(event *bpf.SyscallEventData, recordChannel chan RecordWithConn) {
// Add event to the temporary queue
p.tempSyscallEvents = append(p.tempSyscallEvents, TimedSyscallEvent{event: event, timestamp: time.Now()})
p.tempSyscallEvents.Write(TimedSyscallEvent{event: event, timestamp: time.Now()})

// Process events in the queue that have been there for more than 100ms
p.processOldSyscallEvents(recordChannel)
Expand All @@ -439,16 +443,19 @@ func (p *Processor) processTimedSyscallEvents(recordChannel chan RecordWithConn)

func (p *Processor) processOldSyscallEvents(recordChannel chan RecordWithConn) {
now := time.Now()
lastIndex := 0
for i := 0; i < len(p.tempSyscallEvents); i++ {
if now.Sub(p.tempSyscallEvents[i].timestamp) > 100*time.Millisecond {
p.processSyscallEvent(p.tempSyscallEvents[i].event, recordChannel)
lastIndex = i + 1
for !p.tempSyscallEvents.IsEmpty() {
_event, err := p.tempSyscallEvents.Peek()
if err != nil {
break
}
event := _event.(TimedSyscallEvent)
if now.Sub(event.timestamp) > 100*time.Millisecond {
p.processSyscallEvent(event.event, recordChannel)
p.tempSyscallEvents.Read()
} else {
break
}
}
p.tempSyscallEvents = p.tempSyscallEvents[lastIndex:]
}

func (p *Processor) processSyscallEvent(event *bpf.SyscallEventData, recordChannel chan RecordWithConn) {
Expand All @@ -475,10 +482,7 @@ func (p *Processor) processSyscallEvent(event *bpf.SyscallEventData, recordChann
common.BPFEventLog.Debugf("[syscall][len=%d][ts=%d][fn=%d]%s | %s", max(event.SyscallEvent.BufSize, event.SyscallEvent.Ke.Len), event.SyscallEvent.Ke.Ts, event.SyscallEvent.GetSourceFunction(), conn.ToString(), string(event.Buf))
}

addedToBuffer := conn.OnSyscallEvent(event.Buf, event, recordChannel)
if addedToBuffer {
conn.AddSyscallEvent(event)
}
conn.OnSyscallEvent(event.Buf, event, recordChannel)
} else if conn.Protocol == bpf.AgentTrafficProtocolTKProtocolUnset {
conn.AddSyscallEvent(event)
if common.BPFEventLog.Level >= logrus.DebugLevel {
Expand All @@ -498,7 +502,7 @@ func (p *Processor) processSyscallEvent(event *bpf.SyscallEventData, recordChann

func (p *Processor) handleSslEvent(event *bpf.SslData, recordChannel chan RecordWithConn) {
// Add event to the temporary queue
p.tempSslEvents = append(p.tempSslEvents, TimedSslEvent{event: event, timestamp: time.Now()})
p.tempSslEvents.Write(TimedSslEvent{event: event, timestamp: time.Now()})

// Process events in the queue that have been there for more than 100ms
p.processOldSslEvents(recordChannel)
Expand All @@ -510,16 +514,19 @@ func (p *Processor) processTimedSslEvents(recordChannel chan RecordWithConn) {

func (p *Processor) processOldSslEvents(recordChannel chan RecordWithConn) {
now := time.Now()
lastIndex := 0
for i := 0; i < len(p.tempSslEvents); i++ {
if now.Sub(p.tempSslEvents[i].timestamp) > 100*time.Millisecond {
p.processSslEvent(p.tempSslEvents[i].event, recordChannel)
lastIndex = i + 1
for !p.tempSslEvents.IsEmpty() {
_event, err := p.tempSslEvents.Peek()
if err != nil {
break
}
event := _event.(TimedSslEvent)
if now.Sub(event.timestamp) > 100*time.Millisecond {
p.processSslEvent(event.event, recordChannel)
p.tempSslEvents.Read()
} else {
break
}
}
p.tempSslEvents = p.tempSslEvents[lastIndex:]
}

func (p *Processor) processSslEvent(event *bpf.SslData, recordChannel chan RecordWithConn) {
Expand Down
Loading

0 comments on commit 237c9e3

Please sign in to comment.