Skip to content

Commit

Permalink
Merge pull request #40 from hengyoush/fix-bug
Browse files Browse the repository at this point in the history
Delay processing the record to prevent missing any kernel events that have not been collected yet
  • Loading branch information
hengyoush authored Sep 14, 2024
2 parents 91e4664 + 364d289 commit 7179b86
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 86 deletions.
2 changes: 2 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ func startPerfeventReader(reader io.Closer, consumeFunction func(perf.Record) er
if err := consumeFunction(record); err != nil {
log.Errorf("[dataReader] handleKernEvt err: %s\n", err)
continue
} else if record.LostSamples > 0 {
log.Warnf("[dataReader] lost sample: %d", record.LostSamples)
}
}
}()
Expand Down
8 changes: 8 additions & 0 deletions agent/analysis/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
}
if hasReadSyscallEvents && hasWriteSyscallEvents {
annotatedRecord.blackBoxDuration = float64(writeSyscallEvents[len(writeSyscallEvents)-1].GetTimestamp()) - float64(readSyscallEvents[0].GetTimestamp())
} else {
annotatedRecord.blackBoxDuration = float64(egressMessage.TimestampNs()) - float64(ingressMessage.TimestampNs())
}
if hasUserCopyEvents && hasTcpInEvents {
annotatedRecord.readFromSocketBufferDuration = float64(userCopyEvents[len(userCopyEvents)-1].GetTimestamp()) - float64(tcpInEvents[0].GetTimestamp())
Expand All @@ -213,14 +215,20 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
} else {
if hasWriteSyscallEvents {
annotatedRecord.startTs = writeSyscallEvents[0].GetTimestamp()
} else {
annotatedRecord.startTs = egressMessage.TimestampNs()
}
if hasReadSyscallEvents {
annotatedRecord.endTs = readSyscallEvents[len(readSyscallEvents)-1].GetTimestamp()
} else {
annotatedRecord.endTs = ingressMessage.TimestampNs()
}
annotatedRecord.reqSize = egressMessage.ByteSize()
annotatedRecord.respSize = ingressMessage.ByteSize()
if hasReadSyscallEvents && hasWriteSyscallEvents {
annotatedRecord.totalDuration = float64(annotatedRecord.endTs) - float64(annotatedRecord.startTs)
} else {
annotatedRecord.totalDuration = float64(ingressMessage.TimestampNs()) - float64(egressMessage.TimestampNs())
}
if hasNicInEvents && hasDevOutEvents {
annotatedRecord.blackBoxDuration = float64(nicIngressEvents[len(nicIngressEvents)-1].GetTimestamp()) - float64(devOutSyscallEvents[0].GetTimestamp())
Expand Down
16 changes: 11 additions & 5 deletions agent/buffer/stream_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func (sb *StreamBuffer) Buffers() []*Buffer {
return sb.buffers
}

func (sb *StreamBuffer) position0() int {
func (sb *StreamBuffer) Position0() int {
if sb.IsEmpty() {
return 0
}
return int(sb.buffers[0].seq)
}

func (sb *StreamBuffer) positionN() int {
func (sb *StreamBuffer) PositionN() int {
if sb.IsEmpty() {
return 0
}
Expand Down Expand Up @@ -79,6 +79,12 @@ func (sb *StreamBuffer) RemovePrefix(length int) {
}
}
}
func (sb *StreamBuffer) RemoveHead() {
sb.RemovePrefix(sb.Head().Len())
}
func (sb *StreamBuffer) IsContinugous() bool {
return len(sb.buffers) == 1
}
func (sb *StreamBuffer) shrinkHeadBuffer() {
if sb.IsEmpty() {
return
Expand All @@ -90,7 +96,7 @@ func (sb *StreamBuffer) shrinkHeadBuffer() {
}
func (sb *StreamBuffer) shrinkBufferUntilSizeBelowCapacity() {
var lastDelete *Buffer
for !sb.IsEmpty() && sb.positionN()-sb.position0() > sb.capacity {
for !sb.IsEmpty() && sb.PositionN()-sb.Position0() > sb.capacity {
lastDelete = sb.buffers[0]
sb.buffers = sb.buffers[1:]
}
Expand Down Expand Up @@ -133,10 +139,10 @@ func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) {
sb.buffers = append(sb.buffers, newBuffer)
return
}
if sb.position0()-int(seq) >= maxBytesGap {
if sb.Position0()-int(seq) >= maxBytesGap {
return
}
if int(seq)-sb.positionN() >= maxBytesGap {
if int(seq)-sb.PositionN() >= maxBytesGap {
sb.Clear()
sb.buffers = append(sb.buffers, newBuffer)
sb.updateTimestamp(seq, timestamp)
Expand Down
118 changes: 68 additions & 50 deletions agent/conn/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ type Connection4 struct {
Status ConnStatus
TCPHandshakeStatus

reqStreamBuffer *buffer.StreamBuffer
respStreamBuffer *buffer.StreamBuffer
ReqQueue []protocol.ParsedMessage
RespQueue []protocol.ParsedMessage
StreamEvents *KernEventStream
reqStreamBuffer *buffer.StreamBuffer
respStreamBuffer *buffer.StreamBuffer
ReqQueue []protocol.ParsedMessage
lastReqMadeProgressTime int64
lastRespMadeProgressTime int64
RespQueue []protocol.ParsedMessage
StreamEvents *KernEventStream

MessageFilter protocol.ProtocolFilter
LatencyFilter protocol.LatencyFilter
Expand Down Expand Up @@ -189,43 +191,6 @@ func (c *Connection4) ProtocolInferred() bool {
return (c.Protocol != bpf.AgentTrafficProtocolTKProtocolUnknown) && (c.Protocol != bpf.AgentTrafficProtocolTKProtocolUnset)
}

func (c *Connection4) submitRecord(record protocol.Record) {
var needSubmit bool

needSubmit = c.MessageFilter.FilterByProtocol(c.Protocol)
var duration uint64
if c.IsServerSide() {
duration = record.Request().TimestampNs() - record.Response().TimestampNs()
} else {
duration = record.Response().TimestampNs() - record.Request().TimestampNs()
}

needSubmit = needSubmit && c.LatencyFilter.Filter(float64(duration)/1000000)
needSubmit = needSubmit &&
c.SizeFilter.FilterByReqSize(int64(record.Request().ByteSize())) &&
c.SizeFilter.FilterByRespSize(int64(record.Response().ByteSize()))
if parser := c.GetProtocolParser(c.Protocol); needSubmit && parser != nil {
var parsedRequest, parsedResponse protocol.ParsedMessage
if c.MessageFilter.FilterByRequest() {
parsedRequest = record.Request()
}
if c.MessageFilter.FilterByResponse() {
parsedResponse = record.Response()
}
if parsedRequest != nil || parsedResponse != nil {
needSubmit = c.MessageFilter.Filter(parsedRequest, parsedResponse)
} else {
needSubmit = true
}

} else {
needSubmit = false
}
if needSubmit {
RecordFunc(record, c)
}
}

func (c *Connection4) extractSockKeys() (bpf.AgentSockKey, bpf.AgentSockKey) {
var key bpf.AgentSockKey
key.Dip = common.BytesToInt[uint32](c.RemoteIp)
Expand Down Expand Up @@ -339,7 +304,7 @@ func (c *Connection4) OnKernEvent(event *bpf.AgentKernEvt) bool {
}
return true
}
func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData) {
func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, recordChannel chan RecordWithConn) {
isReq, _ := isReq(c, &event.SyscallEvent.Ke)
if isReq {
c.reqStreamBuffer.Add(event.SyscallEvent.Ke.Seq, data, event.SyscallEvent.Ke.Ts)
Expand All @@ -357,8 +322,10 @@ func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData) {
}

records := parser.Match(&c.ReqQueue, &c.RespQueue)
for _, each := range records {
c.submitRecord(each)
if len(records) != 0 {
for _, record := range records {
recordChannel <- RecordWithConn{record, c}
}
}
}

Expand All @@ -378,8 +345,11 @@ func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messa
startPos = 0
}
streamBuffer.RemovePrefix(startPos)
originPos := streamBuffer.Position0()
// var parseState protocol.ParseState
for !stop && !streamBuffer.IsEmpty() {
parseResult := parser.ParseStream(streamBuffer, messageType)
// parseState = parseResult.ParseState
switch parseResult.ParseState {
case protocol.Success:
if c.Role == bpf.AgentEndpointRoleTKRoleUnknown && len(parseResult.ParsedMessages) > 0 {
Expand All @@ -392,27 +362,75 @@ func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messa
log.Debugf("Update %s role", c.ToString())
c.resetParseProgress()
} else {
*resultQueue = append(*resultQueue, parseResult.ParsedMessages...)
streamBuffer.RemovePrefix(parseResult.ReadBytes)
if len(parseResult.ParsedMessages) > 0 && parseResult.ParsedMessages[0].IsReq() != (messageType == protocol.Request) {
streamBuffer.RemovePrefix(parseResult.ReadBytes)
} else {
*resultQueue = append(*resultQueue, parseResult.ParsedMessages...)
streamBuffer.RemovePrefix(parseResult.ReadBytes)
}
}
case protocol.Invalid:
pos := parser.FindBoundary(streamBuffer, messageType, 1)
if pos != -1 {
streamBuffer.RemovePrefix(pos)
stop = false
} else {
stop = true
removed := c.checkProgress(streamBuffer)
if removed {
log.Debugf("Invalid, %s Removed streambuffer head due to stuck", c.ToString())
stop = false
} else {
stop = true
}
}
case protocol.NeedsMoreData:
stop = true
removed := c.checkProgress(streamBuffer)
if removed {
log.Debugf("Needs more data, %s Removed streambuffer head due to stuck", c.ToString())
stop = false
} else {
stop = true
}
case protocol.Ignore:
stop = false
streamBuffer.RemovePrefix(parseResult.ReadBytes)
default:
panic("invalid parse state!")
}
}

curProgress := streamBuffer.Position0()
if streamBuffer.IsEmpty() || curProgress != int(originPos) {
c.updateProgressTime(streamBuffer)
}
// if parseState == protocol.Invalid {
// streamBuffer.Clear()
// }
}
func (c *Connection4) updateProgressTime(sb *buffer.StreamBuffer) {
if c.reqStreamBuffer == sb {
c.lastReqMadeProgressTime = time.Now().UnixMilli()
} else {
c.lastRespMadeProgressTime = time.Now().UnixMilli()
}
}
func (c *Connection4) getLastProgressTime(sb *buffer.StreamBuffer) int64 {
if c.reqStreamBuffer == sb {
return c.lastReqMadeProgressTime
} else {
return c.lastRespMadeProgressTime
}
}
func (c *Connection4) checkProgress(sb *buffer.StreamBuffer) bool {
if c.getLastProgressTime(sb) == 0 {
c.updateProgressTime(sb)
return false
}
if time.Now().UnixMilli()-c.getLastProgressTime(sb) > 1000 {
sb.RemoveHead()
return true
} else {
return false
}
}

func isReq(conn *Connection4, event *bpf.AgentKernEvt) (bool, bool) {
Expand Down
3 changes: 3 additions & 0 deletions agent/conn/kern_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func (s *KernEventStream) AddKernEvent(event *bpf.AgentKernEvt) {
timestamp: event.Ts,
step: event.Step,
})
if len(kernEvtSlice) > s.maxLen {
log.Debugf("kern event stream size: %d exceed maxLen", len(kernEvtSlice))
}
for len(kernEvtSlice) > s.maxLen {
kernEvtSlice = kernEvtSlice[1:]
}
Expand Down
12 changes: 9 additions & 3 deletions agent/conn/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ type Processor struct {
messageFilter protocol.ProtocolFilter
latencyFilter protocol.LatencyFilter
protocol.SizeFilter
side common.SideEnum
side common.SideEnum
recordProcessor *RecordsProcessor
}

func initProcessor(name string, wg *sync.WaitGroup, ctx context.Context, connManager *ConnManager, filter protocol.ProtocolFilter,
Expand All @@ -81,6 +82,9 @@ func initProcessor(name string, wg *sync.WaitGroup, ctx context.Context, connMan
p.latencyFilter = latencyFilter
p.SizeFilter = sizeFilter
p.side = side
p.recordProcessor = &RecordsProcessor{
records: make([]RecordWithConn, 0),
}
return p
}

Expand All @@ -96,6 +100,8 @@ func (p *Processor) AddKernEvent(record *bpf.AgentKernEvt) {
p.kernEvents <- record
}
func (p *Processor) run() {
recordChannel := make(chan RecordWithConn)
go p.recordProcessor.Run(recordChannel, time.NewTicker(1*time.Second))
for {
select {
case <-p.ctx.Done():
Expand Down Expand Up @@ -169,7 +175,7 @@ func (p *Processor) run() {
if conn.Protocol != bpf.AgentTrafficProtocolTKProtocolUnknown {
for _, sysEvent := range conn.TempSyscallEvents {
log.Debugf("%s process temp syscall events before infer\n", conn.ToString())
conn.OnSyscallEvent(sysEvent.Buf, sysEvent)
conn.OnSyscallEvent(sysEvent.Buf, sysEvent, recordChannel)
}
conn.UpdateConnectionTraceable(true)
}
Expand Down Expand Up @@ -207,7 +213,7 @@ func (p *Processor) run() {
if conn != nil && conn.ProtocolInferred() {
log.Debugf("[syscall][len=%d]%s | %s", event.SyscallEvent.BufSize, conn.ToString(), string(event.Buf))

conn.OnSyscallEvent(event.Buf, event)
conn.OnSyscallEvent(event.Buf, event, recordChannel)
} else if conn != nil && conn.Protocol == bpf.AgentTrafficProtocolTKProtocolUnset {
conn.AddSyscallEvent(event)
log.Debugf("[syscall][protocol unset][len=%d]%s | %s", event.SyscallEvent.BufSize, conn.ToString(), string(event.Buf))
Expand Down
Loading

0 comments on commit 7179b86

Please sign in to comment.