diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 28d46571..7b8cedbf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -161,6 +161,23 @@ jobs: sudo chmod +x /usr/local/bin/docker-compose docker-compose --version + - name: Test Truncated Data parsing + uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19 + if: ${{ !contains(fromJSON('["4.19-20240912.022020", "5.4-20240912.022020"]'), matrix.kernel) }} + with: + provision: 'false' + cmd: | + set -euxo pipefail + uname -a + cat /etc/issue + pushd /host + if [ -f "/var/lib/kyanos/btf/current.btf" ]; then + bash /host/testdata/test_truncated_data.sh 'sudo /host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf' + else + bash /host/testdata/test_truncated_data.sh 'sudo /host/kyanos/kyanos $kyanos_log_option' + fi + popd + - name: Test Kafka uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19 with: diff --git a/agent/conn/conntrack.go b/agent/conn/conntrack.go index 5f98f34c..dd8bfe0b 100644 --- a/agent/conn/conntrack.go +++ b/agent/conn/conntrack.go @@ -507,6 +507,29 @@ func (c *Connection4) OnSslDataEvent(data []byte, event *bpf.SslData, recordChan } } } + +func isSyscallFunctionMultiMessage(f bpf.AgentSourceFunctionT) bool { + return f == bpf.AgentSourceFunctionTKSyscallSendMMsg || + f == bpf.AgentSourceFunctionTKSyscallRecvMMsg || + f == bpf.AgentSourceFunctionTKSyscallWriteV || + f == bpf.AgentSourceFunctionTKSyscallReadV +} +func fillSyscallDataIfNeeded(data []byte, event *bpf.SyscallEventData, c *Connection4) []byte { + + if event.SyscallEvent.Ke.Len > event.SyscallEvent.BufSize { + common.ConntrackLog.Debugf("syscall read/write data too len and some data can't be captured, so we need to fill a fake data, len: %d, bufsize: %d", event.SyscallEvent.Ke.Len, event.SyscallEvent.BufSize) + + fakeData, ok := protocol.MakeNewFakeData(event.SyscallEvent.Ke.Len - event.SyscallEvent.BufSize) + if !ok { + fakeData = make([]byte, event.SyscallEvent.Ke.Len-event.SyscallEvent.BufSize) + } + data = append(data, fakeData...) + return data + } else { + return data + } +} + func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, recordChannel chan RecordWithConn) bool { addedToBuffer := true if len(data) > 0 { @@ -515,13 +538,23 @@ func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, r common.ConntrackLog.Warnf("%s is ssl, but receive syscall event with data!", c.ToString()) } } else { + data = fillSyscallDataIfNeeded(data, event, c) addedToBuffer = c.addDataToBufferAndTryParse(data, &event.SyscallEvent.Ke) } } else if event.SyscallEvent.GetSourceFunction() == bpf.AgentSourceFunctionTKSyscallSendfile { // sendfile has no data, so we need to fill a fake data common.ConntrackLog.Debug("sendfile has no data, so we need to fill a fake data") - fakeData := make([]byte, event.SyscallEvent.Ke.Len) + fakeData, ok := protocol.MakeNewFakeData(event.SyscallEvent.Ke.Len) + if !ok { + fakeData = make([]byte, event.SyscallEvent.Ke.Len) + } addedToBuffer = c.addDataToBufferAndTryParse(fakeData, &event.SyscallEvent.Ke) + } else if isSyscallFunctionMultiMessage(event.SyscallEvent.GetSourceFunction()) && !c.ssl { + common.ConntrackLog.Debug("syscall read/write multiple message and some data can't be captured, so we need to fill a fake data") + fakeData, ok := protocol.MakeNewFakeData(event.SyscallEvent.Ke.Len) + if ok { + addedToBuffer = c.addDataToBufferAndTryParse(fakeData, &event.SyscallEvent.Ke) + } } if !addedToBuffer { return false @@ -531,7 +564,6 @@ func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, r parser := c.GetProtocolParser(c.Protocol) if parser == nil { return true - panic("no protocol parser!") } records := parser.Match(c.ReqQueue, c.RespQueue) diff --git a/agent/conn/processor.go b/agent/conn/processor.go index d416d9e1..9af95854 100644 --- a/agent/conn/processor.go +++ b/agent/conn/processor.go @@ -309,10 +309,10 @@ func (p *Processor) run() { case event := <-p.firstPacketsEvents: p.handleFirstPacketEvent(event, recordChannel) case <-ticker.C: - p.processTimedKernEvents(recordChannel) - p.processTimedSyscallEvents(recordChannel) p.processTimedSslEvents(recordChannel) + p.processTimedKernEvents(recordChannel) p.processOldFirstPacketEvents(recordChannel) + p.processTimedSyscallEvents(recordChannel) } } } diff --git a/agent/protocol/fakedata.go b/agent/protocol/fakedata.go new file mode 100644 index 00000000..2e9a18fa --- /dev/null +++ b/agent/protocol/fakedata.go @@ -0,0 +1,42 @@ +package protocol + +import "strings" + +const fakeDataMarkPrefix = "__FAKE_DATA__" +const fakeDataMarkLen = len(fakeDataMarkPrefix) + 4 + +func MakeNewFakeData(size uint32) ([]byte, bool) { + if size < uint32(fakeDataMarkLen) { + return nil, false + } else { + buf := make([]byte, size) + copy(buf, fakeDataMarkPrefix) + size -= uint32(fakeDataMarkLen) + lenByte := []byte{byte(size >> 24), byte(size >> 16), byte(size >> 8), byte(size)} + buf[fakeDataMarkLen] = lenByte[0] + buf[fakeDataMarkLen+1] = lenByte[1] + buf[fakeDataMarkLen+2] = lenByte[2] + buf[fakeDataMarkLen+3] = lenByte[3] + return buf, true + } +} + +func fakeDataMarkIndex(buf []byte) (int, bool) { + if len(buf) < fakeDataMarkLen { + return -1, false + } + idx := strings.Index(string(buf), fakeDataMarkPrefix) + if idx < 0 { + return -1, false + } + return idx, true +} + +func getFakeDataSize(buf []byte, pos int) uint32 { + buf = buf[pos:] + if len(buf) < fakeDataMarkLen { + return 0 + } else { + return uint32(buf[fakeDataMarkLen])<<24 | uint32(buf[fakeDataMarkLen+1])<<16 | uint32(buf[fakeDataMarkLen+2])<<8 | uint32(buf[fakeDataMarkLen+3]) + } +} diff --git a/agent/protocol/http.go b/agent/protocol/http.go index e0f04f93..3099ac46 100644 --- a/agent/protocol/http.go +++ b/agent/protocol/http.go @@ -127,55 +127,90 @@ func (h *HTTPStreamParser) ParseRequest(buf string, messageType MessageType, tim } } -func (h HTTPStreamParser) ParseResponse(buf string, messageType MessageType, timestamp uint64, seq uint64) ParseResult { +func (h *HTTPStreamParser) ParseResponse(buf string, messageType MessageType, timestamp uint64, seq uint64, streamBuffer *buffer.StreamBuffer) ParseResult { reader := strings.NewReader(buf) bufioReader := bufio.NewReader(reader) resp, err := http.ReadResponse(bufioReader, nil) parseResult := ParseResult{} if err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF { - return ParseResult{ - ParseState: NeedsMoreData, - } - } else { - return ParseResult{ - ParseState: Invalid, - } + return h.handleReadResponseError(err, buf, streamBuffer, messageType, timestamp, seq) + } + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return h.handleReadBodyError(err, buf, streamBuffer, messageType, timestamp, seq) + } + + readIndex := common.GetBufioReaderReadIndex(bufioReader) + if readIndex == 0 && len(respBody) > 0 { + readIndex = len(buf) + } else if readIndex == 0 { + return ParseResult{ + ParseState: NeedsMoreData, } - } else { - respBody, err := io.ReadAll(resp.Body) - if err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF { - return ParseResult{ - ParseState: NeedsMoreData, - } - } else { - return ParseResult{ - ParseState: Invalid, - } - } + } + parseResult.ReadBytes = readIndex + parseResult.ParsedMessages = []ParsedMessage{ + &ParsedHttpResponse{ + FrameBase: NewFrameBase(timestamp, readIndex, seq), + buf: []byte(buf[:readIndex]), + }, + } + parseResult.ParseState = Success + return parseResult +} + +func (h *HTTPStreamParser) handleReadResponseError(err error, buf string, streamBuffer *buffer.StreamBuffer, messageType MessageType, timestamp uint64, seq uint64) ParseResult { + if err == io.EOF || err == io.ErrUnexpectedEOF { + return ParseResult{ + ParseState: NeedsMoreData, } - readIndex := common.GetBufioReaderReadIndex(bufioReader) - if readIndex == 0 && len(respBody) > 0 { - readIndex = len(buf) - } else if readIndex == 0 { - return ParseResult{ - ParseState: NeedsMoreData, - } + } else { + return ParseResult{ + ParseState: Invalid, } - parseResult.ReadBytes = readIndex + } +} + +func (h *HTTPStreamParser) handleReadBodyError(err error, buf string, streamBuffer *buffer.StreamBuffer, messageType MessageType, timestamp uint64, seq uint64) ParseResult { + parseResult := ParseResult{} + boundary := h.FindBoundary(streamBuffer, messageType, 0) + if boundary > 0 { + parseResult.ReadBytes = boundary parseResult.ParsedMessages = []ParsedMessage{ &ParsedHttpResponse{ - FrameBase: NewFrameBase(timestamp, readIndex, seq), - buf: []byte(buf[:readIndex]), + FrameBase: NewFrameBase(timestamp, boundary, seq), + buf: []byte(buf[:boundary]), }, } parseResult.ParseState = Success return parseResult + } else if fakeDataIdx, _ := fakeDataMarkIndex([]byte(buf)); fakeDataIdx != -1 { + fakeDataSize := getFakeDataSize([]byte(buf), fakeDataIdx) + if len(buf) >= fakeDataIdx+int(fakeDataSize)+fakeDataMarkLen { + parseResult.ReadBytes = fakeDataIdx + int(fakeDataSize) + fakeDataMarkLen + parseResult.ParsedMessages = []ParsedMessage{ + &ParsedHttpResponse{ + FrameBase: NewFrameBase(timestamp, parseResult.ReadBytes, seq), + buf: []byte(buf[:parseResult.ReadBytes]), + }, + } + parseResult.ParseState = Success + return parseResult + } + } + if err == io.EOF || err == io.ErrUnexpectedEOF { + return ParseResult{ + ParseState: NeedsMoreData, + } + } else { + return ParseResult{ + ParseState: Invalid, + } } } -func (h HTTPStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messageType MessageType) ParseResult { +func (h *HTTPStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messageType MessageType) ParseResult { head := streamBuffer.Head() buf := string(head.Buffer()) ts, ok := streamBuffer.FindTimestampBySeq(head.LeftBoundary()) @@ -188,7 +223,7 @@ func (h HTTPStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, message case Request: return h.ParseRequest(buf, messageType, ts, head.LeftBoundary()) case Response: - return h.ParseResponse(buf, messageType, ts, head.LeftBoundary()) + return h.ParseResponse(buf, messageType, ts, head.LeftBoundary(), streamBuffer) default: panic("messageType invalid") } diff --git a/agent/protocol/http_test.go b/agent/protocol/http_test.go index dfa86924..6992f963 100644 --- a/agent/protocol/http_test.go +++ b/agent/protocol/http_test.go @@ -153,7 +153,7 @@ func TestParseResponse(t *testing.T) { parser := protocol.HTTPStreamParser{} - parseResult := parser.ParseResponse(httpMessage, protocol.Response, 10, 20) + parseResult := parser.ParseResponse(httpMessage, protocol.Response, 10, 20, buffer) assert.Equal(t, protocol.Success, parseResult.ParseState) assert.Equal(t, 1, len(parseResult.ParsedMessages)) diff --git a/bpf/data_common.h b/bpf/data_common.h index 10f3eec1..567930d0 100644 --- a/bpf/data_common.h +++ b/bpf/data_common.h @@ -124,14 +124,6 @@ static bool __always_inline report_conn_evt(void* ctx, struct conn_info_t *conn_ } static __inline bool should_trace_conn(struct conn_info_t *conn_info) { - // conn_info->laddr.in4.sin_port - // bpf_printk("conn_info->laddr.in4.sin_port: %d, %d", - // conn_info->laddr.in4.sin_port,conn_info->raddr.in4.sin_port); - // if (conn_info->laddr.in4.sin_port == target_port || - // conn_info->raddr.in4.sin_port == target_port) { - // return true; - // } - return conn_info->protocol != kProtocolUnknown && conn_info->no_trace <= traceable ; } @@ -205,6 +197,8 @@ static void __always_inline report_syscall_buf(void* ctx, uint64_t seq, struct c evt->buf_size = amount_copied; size_t __len = sizeof(struct kern_evt) + sizeof(uint32_t) + amount_copied; bpf_perf_event_output(ctx, &syscall_rb, BPF_F_CURRENT_CPU, evt, __len); + + // bpf_printk("len:%d, amount_copied:%d", len, amount_copied); } static void __always_inline report_syscall_evt(void* ctx, uint64_t seq, struct conn_id_s_t *conn_id_s, uint32_t len, enum step_t step, struct data_args *args, bool prepend_length_header, uint32_t length_header) { report_syscall_buf(ctx, seq, conn_id_s, len, step, args->start_ts, args->end_ts - args->start_ts, args->buf, args->source_fn, prepend_length_header, length_header); @@ -266,6 +260,9 @@ static void __always_inline report_syscall_evt_vecs(void* ctx, uint64_t seq, str bytes_sent += iov_size; seq += iov_size; } + if (bytes_sent < total_size) { + report_syscall_buf_without_data(ctx, seq, conn_id_s, total_size - bytes_sent, step, args->start_ts, args->end_ts - args->start_ts, args->source_fn); + } } diff --git a/testdata/test_truncated_data.sh b/testdata/test_truncated_data.sh new file mode 100644 index 00000000..714e3306 --- /dev/null +++ b/testdata/test_truncated_data.sh @@ -0,0 +1,69 @@ +#!/usr/bin/env bash +. $(dirname "$0")/common.sh +set -ex + +CMD="$1" +DOCKER_REGISTRY="$2" +FILE_PREFIX="/tmp/kyanos" +CLIENT_LNAME="${FILE_PREFIX}_truncated_client.log" +SERVER_LNAME="${FILE_PREFIX}_truncated_server.log" + + +function test_client() { + if [ -z "$DOCKER_REGISTRY" ]; then + IMAGE_NAME="lobehub/lobe-chat:v1.46.7" + else + IMAGE_NAME=$DOCKER_REGISTRY"/lobehub/lobe-chat:v1.46.7" + fi + docker pull "$IMAGE_NAME" + + cname='lobe' + port=3210 + docker rm -f $cname + cid1=$(docker run --name $cname -p $port:$port -d "$IMAGE_NAME") + export cid1 + echo $cid1 + + timeout 30 ${CMD} watch --debug-output http --remote-ports $port 2>&1 | tee "${CLIENT_LNAME}" & + sleep 15 + curl http://localhost:3210 + wait + + cat "${CLIENT_LNAME}" + docker rm -f $cid1 || true + check_patterns_in_file "${CLIENT_LNAME}" "localhost:3210" +} + + +function test_server() { + if [ -z "$DOCKER_REGISTRY" ]; then + IMAGE_NAME="lobehub/lobe-chat:v1.46.7" + else + IMAGE_NAME=$DOCKER_REGISTRY"/lobehub/lobe-chat:v1.46.7" + fi + docker pull "$IMAGE_NAME" + + cname='lobe' + port=3210 + docker rm -f $cname + cid1=$(docker run --name $cname -p $port:$port -d "$IMAGE_NAME") + export cid1 + echo $cid1 + + timeout 30 ${CMD} watch --debug-output http --local-ports $port 2>&1 | tee "${SERVER_LNAME}" & + sleep 15 + curl http://localhost:3210 + wait + + cat "${SERVER_LNAME}" + docker rm -f $cid1 || true + check_patterns_in_file "${SERVER_LNAME}" "localhost:3210" +} + + +function main() { + test_client + test_server +} + +main \ No newline at end of file