From 42267e4ed9838733daeb75294dfe1f1ea5b246e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=83=88=E9=A6=99?= Date: Fri, 24 Jan 2025 03:08:42 +0800 Subject: [PATCH] fix: big syscall data truncated may lead to failing to parse HTTP message (#274) fix: handle big syscall data (truncated) properly When we fail to read the body, it might be due to the response being too large, causing syscall data to be missing when transferred to user space. Here, we attempt to find a boundary. If found, that's ideal and we return immediately. Otherwise, we try to locate a Fake Data Mark (FDM). When user space detects missing data from the kernel (possibly due to exceeding MAX_MSG_SIZE or situations like readv/writev where a buffer array is read/written at once), it supplements with fake data in user space. At the beginning of this fake data, an FDM is set, which is a special string. Following the FDM, the length of the supplemental fake data (minus the length of the FDM) is written. --- .github/workflows/test.yml | 17 ++++++ agent/conn/conntrack.go | 36 +++++++++++- agent/conn/processor.go | 4 +- agent/protocol/fakedata.go | 42 +++++++++++++ agent/protocol/http.go | 101 +++++++++++++++++++++----------- agent/protocol/http_test.go | 2 +- bpf/data_common.h | 13 ++-- testdata/test_truncated_data.sh | 69 ++++++++++++++++++++++ 8 files changed, 238 insertions(+), 46 deletions(-) create mode 100644 agent/protocol/fakedata.go create mode 100644 testdata/test_truncated_data.sh diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 28d46571..7b8cedbf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -161,6 +161,23 @@ jobs: sudo chmod +x /usr/local/bin/docker-compose docker-compose --version + - name: Test Truncated Data parsing + uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19 + if: ${{ !contains(fromJSON('["4.19-20240912.022020", "5.4-20240912.022020"]'), matrix.kernel) }} + with: + provision: 'false' + cmd: | + set 
-euxo pipefail + uname -a + cat /etc/issue + pushd /host + if [ -f "/var/lib/kyanos/btf/current.btf" ]; then + bash /host/testdata/test_truncated_data.sh 'sudo /host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf' + else + bash /host/testdata/test_truncated_data.sh 'sudo /host/kyanos/kyanos $kyanos_log_option' + fi + popd + - name: Test Kafka uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19 with: diff --git a/agent/conn/conntrack.go b/agent/conn/conntrack.go index 5f98f34c..dd8bfe0b 100644 --- a/agent/conn/conntrack.go +++ b/agent/conn/conntrack.go @@ -507,6 +507,29 @@ func (c *Connection4) OnSslDataEvent(data []byte, event *bpf.SslData, recordChan } } } + +func isSyscallFunctionMultiMessage(f bpf.AgentSourceFunctionT) bool { + return f == bpf.AgentSourceFunctionTKSyscallSendMMsg || + f == bpf.AgentSourceFunctionTKSyscallRecvMMsg || + f == bpf.AgentSourceFunctionTKSyscallWriteV || + f == bpf.AgentSourceFunctionTKSyscallReadV +} +func fillSyscallDataIfNeeded(data []byte, event *bpf.SyscallEventData, c *Connection4) []byte { + + if event.SyscallEvent.Ke.Len > event.SyscallEvent.BufSize { + common.ConntrackLog.Debugf("syscall read/write data too len and some data can't be captured, so we need to fill a fake data, len: %d, bufsize: %d", event.SyscallEvent.Ke.Len, event.SyscallEvent.BufSize) + + fakeData, ok := protocol.MakeNewFakeData(event.SyscallEvent.Ke.Len - event.SyscallEvent.BufSize) + if !ok { + fakeData = make([]byte, event.SyscallEvent.Ke.Len-event.SyscallEvent.BufSize) + } + data = append(data, fakeData...) 
+ return data + } else { + return data + } +} + func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, recordChannel chan RecordWithConn) bool { addedToBuffer := true if len(data) > 0 { @@ -515,13 +538,23 @@ func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, r common.ConntrackLog.Warnf("%s is ssl, but receive syscall event with data!", c.ToString()) } } else { + data = fillSyscallDataIfNeeded(data, event, c) addedToBuffer = c.addDataToBufferAndTryParse(data, &event.SyscallEvent.Ke) } } else if event.SyscallEvent.GetSourceFunction() == bpf.AgentSourceFunctionTKSyscallSendfile { // sendfile has no data, so we need to fill a fake data common.ConntrackLog.Debug("sendfile has no data, so we need to fill a fake data") - fakeData := make([]byte, event.SyscallEvent.Ke.Len) + fakeData, ok := protocol.MakeNewFakeData(event.SyscallEvent.Ke.Len) + if !ok { + fakeData = make([]byte, event.SyscallEvent.Ke.Len) + } addedToBuffer = c.addDataToBufferAndTryParse(fakeData, &event.SyscallEvent.Ke) + } else if isSyscallFunctionMultiMessage(event.SyscallEvent.GetSourceFunction()) && !c.ssl { + common.ConntrackLog.Debug("syscall read/write multiple message and some data can't be captured, so we need to fill a fake data") + fakeData, ok := protocol.MakeNewFakeData(event.SyscallEvent.Ke.Len) + if ok { + addedToBuffer = c.addDataToBufferAndTryParse(fakeData, &event.SyscallEvent.Ke) + } } if !addedToBuffer { return false @@ -531,7 +564,6 @@ func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, r parser := c.GetProtocolParser(c.Protocol) if parser == nil { return true - panic("no protocol parser!") } records := parser.Match(c.ReqQueue, c.RespQueue) diff --git a/agent/conn/processor.go b/agent/conn/processor.go index d416d9e1..9af95854 100644 --- a/agent/conn/processor.go +++ b/agent/conn/processor.go @@ -309,10 +309,10 @@ func (p *Processor) run() { case event := <-p.firstPacketsEvents: 
const (
	// fakeDataMarkPrefix is the marker string placed at the very start of a
	// block of fake data.
	fakeDataMarkPrefix = "__FAKE_DATA__"

	// fakeDataMarkLen is the size in bytes of the complete Fake Data Mark
	// (FDM): the prefix followed by a 4-byte big-endian length field.
	fakeDataMarkLen = len(fakeDataMarkPrefix) + 4
)

// MakeNewFakeData returns a buffer of exactly size bytes that begins with a
// Fake Data Mark. The mark's length field holds size minus fakeDataMarkLen,
// i.e. the number of filler bytes that follow the mark.
//
// It returns (nil, false) when size is too small to hold the mark.
func MakeNewFakeData(size uint32) ([]byte, bool) {
	if size < uint32(fakeDataMarkLen) {
		return nil, false
	}

	buf := make([]byte, size)
	copy(buf, fakeDataMarkPrefix)

	// The length field sits directly after the prefix, inside the
	// fakeDataMarkLen bytes that the size check above guarantees. Writing it
	// at offset fakeDataMarkLen instead would index past the end of buf
	// whenever size is less than fakeDataMarkLen+4.
	payload := size - uint32(fakeDataMarkLen)
	lenField := buf[len(fakeDataMarkPrefix):fakeDataMarkLen]
	lenField[0] = byte(payload >> 24)
	lenField[1] = byte(payload >> 16)
	lenField[2] = byte(payload >> 8)
	lenField[3] = byte(payload)
	return buf, true
}

// fakeDataMarkIndex returns the offset of the first fakeDataMarkPrefix in buf.
// It returns (-1, false) when buf is shorter than a complete mark or does not
// contain the prefix.
func fakeDataMarkIndex(buf []byte) (int, bool) {
	if len(buf) < fakeDataMarkLen {
		return -1, false
	}
	idx := strings.Index(string(buf), fakeDataMarkPrefix)
	if idx < 0 {
		return -1, false
	}
	return idx, true
}

// getFakeDataSize decodes the length field of the Fake Data Mark that starts
// at buf[pos] and returns the number of filler bytes following the mark.
//
// It returns 0 when pos is outside buf or when fewer than fakeDataMarkLen
// bytes are available from pos, so a truncated mark never causes a panic.
func getFakeDataSize(buf []byte, pos int) uint32 {
	if pos < 0 || pos > len(buf) || len(buf)-pos < fakeDataMarkLen {
		return 0
	}
	lenField := buf[pos+len(fakeDataMarkPrefix) : pos+fakeDataMarkLen]
	return uint32(lenField[0])<<24 |
		uint32(lenField[1])<<16 |
		uint32(lenField[2])<<8 |
		uint32(lenField[3])
}
ParseResult { +func (h *HTTPStreamParser) ParseResponse(buf string, messageType MessageType, timestamp uint64, seq uint64, streamBuffer *buffer.StreamBuffer) ParseResult { reader := strings.NewReader(buf) bufioReader := bufio.NewReader(reader) resp, err := http.ReadResponse(bufioReader, nil) parseResult := ParseResult{} if err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF { - return ParseResult{ - ParseState: NeedsMoreData, - } - } else { - return ParseResult{ - ParseState: Invalid, - } + return h.handleReadResponseError(err, buf, streamBuffer, messageType, timestamp, seq) + } + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return h.handleReadBodyError(err, buf, streamBuffer, messageType, timestamp, seq) + } + + readIndex := common.GetBufioReaderReadIndex(bufioReader) + if readIndex == 0 && len(respBody) > 0 { + readIndex = len(buf) + } else if readIndex == 0 { + return ParseResult{ + ParseState: NeedsMoreData, } - } else { - respBody, err := io.ReadAll(resp.Body) - if err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF { - return ParseResult{ - ParseState: NeedsMoreData, - } - } else { - return ParseResult{ - ParseState: Invalid, - } - } + } + parseResult.ReadBytes = readIndex + parseResult.ParsedMessages = []ParsedMessage{ + &ParsedHttpResponse{ + FrameBase: NewFrameBase(timestamp, readIndex, seq), + buf: []byte(buf[:readIndex]), + }, + } + parseResult.ParseState = Success + return parseResult +} + +func (h *HTTPStreamParser) handleReadResponseError(err error, buf string, streamBuffer *buffer.StreamBuffer, messageType MessageType, timestamp uint64, seq uint64) ParseResult { + if err == io.EOF || err == io.ErrUnexpectedEOF { + return ParseResult{ + ParseState: NeedsMoreData, } - readIndex := common.GetBufioReaderReadIndex(bufioReader) - if readIndex == 0 && len(respBody) > 0 { - readIndex = len(buf) - } else if readIndex == 0 { - return ParseResult{ - ParseState: NeedsMoreData, - } + } else { + return ParseResult{ + ParseState: 
Invalid, } - parseResult.ReadBytes = readIndex + } +} + +func (h *HTTPStreamParser) handleReadBodyError(err error, buf string, streamBuffer *buffer.StreamBuffer, messageType MessageType, timestamp uint64, seq uint64) ParseResult { + parseResult := ParseResult{} + boundary := h.FindBoundary(streamBuffer, messageType, 0) + if boundary > 0 { + parseResult.ReadBytes = boundary parseResult.ParsedMessages = []ParsedMessage{ &ParsedHttpResponse{ - FrameBase: NewFrameBase(timestamp, readIndex, seq), - buf: []byte(buf[:readIndex]), + FrameBase: NewFrameBase(timestamp, boundary, seq), + buf: []byte(buf[:boundary]), }, } parseResult.ParseState = Success return parseResult + } else if fakeDataIdx, _ := fakeDataMarkIndex([]byte(buf)); fakeDataIdx != -1 { + fakeDataSize := getFakeDataSize([]byte(buf), fakeDataIdx) + if len(buf) >= fakeDataIdx+int(fakeDataSize)+fakeDataMarkLen { + parseResult.ReadBytes = fakeDataIdx + int(fakeDataSize) + fakeDataMarkLen + parseResult.ParsedMessages = []ParsedMessage{ + &ParsedHttpResponse{ + FrameBase: NewFrameBase(timestamp, parseResult.ReadBytes, seq), + buf: []byte(buf[:parseResult.ReadBytes]), + }, + } + parseResult.ParseState = Success + return parseResult + } + } + if err == io.EOF || err == io.ErrUnexpectedEOF { + return ParseResult{ + ParseState: NeedsMoreData, + } + } else { + return ParseResult{ + ParseState: Invalid, + } } } -func (h HTTPStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messageType MessageType) ParseResult { +func (h *HTTPStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messageType MessageType) ParseResult { head := streamBuffer.Head() buf := string(head.Buffer()) ts, ok := streamBuffer.FindTimestampBySeq(head.LeftBoundary()) @@ -188,7 +223,7 @@ func (h HTTPStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, message case Request: return h.ParseRequest(buf, messageType, ts, head.LeftBoundary()) case Response: - return h.ParseResponse(buf, messageType, ts, head.LeftBoundary()) + return 
h.ParseResponse(buf, messageType, ts, head.LeftBoundary(), streamBuffer) default: panic("messageType invalid") } diff --git a/agent/protocol/http_test.go b/agent/protocol/http_test.go index dfa86924..6992f963 100644 --- a/agent/protocol/http_test.go +++ b/agent/protocol/http_test.go @@ -153,7 +153,7 @@ func TestParseResponse(t *testing.T) { parser := protocol.HTTPStreamParser{} - parseResult := parser.ParseResponse(httpMessage, protocol.Response, 10, 20) + parseResult := parser.ParseResponse(httpMessage, protocol.Response, 10, 20, buffer) assert.Equal(t, protocol.Success, parseResult.ParseState) assert.Equal(t, 1, len(parseResult.ParsedMessages)) diff --git a/bpf/data_common.h b/bpf/data_common.h index 10f3eec1..567930d0 100644 --- a/bpf/data_common.h +++ b/bpf/data_common.h @@ -124,14 +124,6 @@ static bool __always_inline report_conn_evt(void* ctx, struct conn_info_t *conn_ } static __inline bool should_trace_conn(struct conn_info_t *conn_info) { - // conn_info->laddr.in4.sin_port - // bpf_printk("conn_info->laddr.in4.sin_port: %d, %d", - // conn_info->laddr.in4.sin_port,conn_info->raddr.in4.sin_port); - // if (conn_info->laddr.in4.sin_port == target_port || - // conn_info->raddr.in4.sin_port == target_port) { - // return true; - // } - return conn_info->protocol != kProtocolUnknown && conn_info->no_trace <= traceable ; } @@ -205,6 +197,8 @@ static void __always_inline report_syscall_buf(void* ctx, uint64_t seq, struct c evt->buf_size = amount_copied; size_t __len = sizeof(struct kern_evt) + sizeof(uint32_t) + amount_copied; bpf_perf_event_output(ctx, &syscall_rb, BPF_F_CURRENT_CPU, evt, __len); + + // bpf_printk("len:%d, amount_copied:%d", len, amount_copied); } static void __always_inline report_syscall_evt(void* ctx, uint64_t seq, struct conn_id_s_t *conn_id_s, uint32_t len, enum step_t step, struct data_args *args, bool prepend_length_header, uint32_t length_header) { report_syscall_buf(ctx, seq, conn_id_s, len, step, args->start_ts, args->end_ts - 
#!/usr/bin/env bash
# Runs kyanos against a lobe-chat container and checks that the HTTP exchange
# with it shows up in the kyanos output, once filtering by remote port
# (client side) and once by local port (server side).
#
# Usage: test_truncated_data.sh "<kyanos command line>" [docker registry]
. "$(dirname "$0")/common.sh"
set -ex

CMD="$1"
DOCKER_REGISTRY="$2"
FILE_PREFIX="/tmp/kyanos"
CLIENT_LNAME="${FILE_PREFIX}_truncated_client.log"
SERVER_LNAME="${FILE_PREFIX}_truncated_server.log"

# run_truncated_case PORT_FLAG LOG_FILE
#   PORT_FLAG  kyanos port filter option (--remote-ports or --local-ports)
#   LOG_FILE   file that receives the kyanos output and is checked afterwards
# Starts the container, watches HTTP traffic for 30 seconds, issues one
# request while the watcher is running, then verifies the log.
function run_truncated_case() {
    local port_flag="$1"
    local log_file="$2"
    local cname='lobe'
    local port=3210

    IMAGE_NAME="lobehub/lobe-chat:v1.46.7"
    if [ -n "$DOCKER_REGISTRY" ]; then
        IMAGE_NAME="${DOCKER_REGISTRY}/${IMAGE_NAME}"
    fi
    docker pull "$IMAGE_NAME"

    # A leftover container may or may not exist; never let the pre-clean
    # abort the run under `set -e`.
    docker rm -f "$cname" || true
    cid1=$(docker run --name "$cname" -p "$port:$port" -d "$IMAGE_NAME")
    export cid1
    echo "$cid1"

    # ${CMD} is deliberately unquoted: it carries a whole command line that
    # must be split into words.
    timeout 30 ${CMD} watch --debug-output http "$port_flag" "$port" 2>&1 | tee "$log_file" &
    sleep 15
    curl "http://localhost:${port}"
    wait

    cat "$log_file"
    docker rm -f "$cid1" || true
    check_patterns_in_file "$log_file" "localhost:${port}"
}

# Client-side view: filter on the remote port of the connection.
function test_client() {
    run_truncated_case --remote-ports "${CLIENT_LNAME}"
}

# Server-side view: filter on the local port of the connection.
function test_server() {
    run_truncated_case --local-ports "${SERVER_LNAME}"
}

function main() {
    test_client
    test_server
}

main