Skip to content

Commit

Permalink
fix: big syscall data truncated may lead to failing to parse HTTP mes…
Browse files Browse the repository at this point in the history
…sage (#274)

fix: handle big syscall data (truncated) properly

When we fail to read the body, it might be due to the response being too large, causing syscall data
to be missing when transferred to user space. Here, we attempt to find a boundary. If found, that's
ideal and we return immediately. Otherwise, we try to locate a Fake Data Mark (FDM). When user space
detects missing data from the kernel (possibly due to exceeding MAX_MSG_SIZE or situations like
readv/writev where a buffer array is read/written at once), it supplements with fake data in user
space. At the beginning of this fake data, an FDM is set, which is a special string. Following the
FDM, the length of the supplemental fake data (minus the length of the FDM) is written.
  • Loading branch information
hengyoush authored Jan 23, 2025
1 parent 927d0dc commit 42267e4
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 46 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,23 @@ jobs:
sudo chmod +x /usr/local/bin/docker-compose
docker-compose --version
- name: Test Truncated Data parsing
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
if: ${{ !contains(fromJSON('["4.19-20240912.022020", "5.4-20240912.022020"]'), matrix.kernel) }}
with:
provision: 'false'
cmd: |
set -euxo pipefail
uname -a
cat /etc/issue
pushd /host
if [ -f "/var/lib/kyanos/btf/current.btf" ]; then
bash /host/testdata/test_truncated_data.sh 'sudo /host/kyanos/kyanos $kyanos_log_option --btf /var/lib/kyanos/btf/current.btf'
else
bash /host/testdata/test_truncated_data.sh 'sudo /host/kyanos/kyanos $kyanos_log_option'
fi
popd
- name: Test Kafka
uses: cilium/little-vm-helper@97c89f004bd0ab4caeacfe92ebc956e13e362e6b # v0.0.19
with:
Expand Down
36 changes: 34 additions & 2 deletions agent/conn/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,29 @@ func (c *Connection4) OnSslDataEvent(data []byte, event *bpf.SslData, recordChan
}
}
}

func isSyscallFunctionMultiMessage(f bpf.AgentSourceFunctionT) bool {
return f == bpf.AgentSourceFunctionTKSyscallSendMMsg ||
f == bpf.AgentSourceFunctionTKSyscallRecvMMsg ||
f == bpf.AgentSourceFunctionTKSyscallWriteV ||
f == bpf.AgentSourceFunctionTKSyscallReadV
}
func fillSyscallDataIfNeeded(data []byte, event *bpf.SyscallEventData, c *Connection4) []byte {

if event.SyscallEvent.Ke.Len > event.SyscallEvent.BufSize {
common.ConntrackLog.Debugf("syscall read/write data too len and some data can't be captured, so we need to fill a fake data, len: %d, bufsize: %d", event.SyscallEvent.Ke.Len, event.SyscallEvent.BufSize)

fakeData, ok := protocol.MakeNewFakeData(event.SyscallEvent.Ke.Len - event.SyscallEvent.BufSize)
if !ok {
fakeData = make([]byte, event.SyscallEvent.Ke.Len-event.SyscallEvent.BufSize)
}
data = append(data, fakeData...)
return data
} else {
return data
}
}

func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, recordChannel chan RecordWithConn) bool {
addedToBuffer := true
if len(data) > 0 {
Expand All @@ -515,13 +538,23 @@ func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, r
common.ConntrackLog.Warnf("%s is ssl, but receive syscall event with data!", c.ToString())
}
} else {
data = fillSyscallDataIfNeeded(data, event, c)
addedToBuffer = c.addDataToBufferAndTryParse(data, &event.SyscallEvent.Ke)
}
} else if event.SyscallEvent.GetSourceFunction() == bpf.AgentSourceFunctionTKSyscallSendfile {
// sendfile has no data, so we need to fill a fake data
common.ConntrackLog.Debug("sendfile has no data, so we need to fill a fake data")
fakeData := make([]byte, event.SyscallEvent.Ke.Len)
fakeData, ok := protocol.MakeNewFakeData(event.SyscallEvent.Ke.Len)
if !ok {
fakeData = make([]byte, event.SyscallEvent.Ke.Len)
}
addedToBuffer = c.addDataToBufferAndTryParse(fakeData, &event.SyscallEvent.Ke)
} else if isSyscallFunctionMultiMessage(event.SyscallEvent.GetSourceFunction()) && !c.ssl {
common.ConntrackLog.Debug("syscall read/write multiple message and some data can't be captured, so we need to fill a fake data")
fakeData, ok := protocol.MakeNewFakeData(event.SyscallEvent.Ke.Len)
if ok {
addedToBuffer = c.addDataToBufferAndTryParse(fakeData, &event.SyscallEvent.Ke)
}
}
if !addedToBuffer {
return false
Expand All @@ -531,7 +564,6 @@ func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, r
parser := c.GetProtocolParser(c.Protocol)
if parser == nil {
return true
panic("no protocol parser!")
}

records := parser.Match(c.ReqQueue, c.RespQueue)
Expand Down
4 changes: 2 additions & 2 deletions agent/conn/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ func (p *Processor) run() {
case event := <-p.firstPacketsEvents:
p.handleFirstPacketEvent(event, recordChannel)
case <-ticker.C:
p.processTimedKernEvents(recordChannel)
p.processTimedSyscallEvents(recordChannel)
p.processTimedSslEvents(recordChannel)
p.processTimedKernEvents(recordChannel)
p.processOldFirstPacketEvents(recordChannel)
p.processTimedSyscallEvents(recordChannel)
}
}
}
Expand Down
42 changes: 42 additions & 0 deletions agent/protocol/fakedata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package protocol

import "strings"

const fakeDataMarkPrefix = "__FAKE_DATA__"
const fakeDataMarkLen = len(fakeDataMarkPrefix) + 4

func MakeNewFakeData(size uint32) ([]byte, bool) {
if size < uint32(fakeDataMarkLen) {
return nil, false
} else {
buf := make([]byte, size)
copy(buf, fakeDataMarkPrefix)
size -= uint32(fakeDataMarkLen)
lenByte := []byte{byte(size >> 24), byte(size >> 16), byte(size >> 8), byte(size)}
buf[fakeDataMarkLen] = lenByte[0]
buf[fakeDataMarkLen+1] = lenByte[1]
buf[fakeDataMarkLen+2] = lenByte[2]
buf[fakeDataMarkLen+3] = lenByte[3]
return buf, true
}
}

func fakeDataMarkIndex(buf []byte) (int, bool) {
if len(buf) < fakeDataMarkLen {
return -1, false
}
idx := strings.Index(string(buf), fakeDataMarkPrefix)
if idx < 0 {
return -1, false
}
return idx, true
}

func getFakeDataSize(buf []byte, pos int) uint32 {
buf = buf[pos:]
if len(buf) < fakeDataMarkLen {
return 0
} else {
return uint32(buf[fakeDataMarkLen])<<24 | uint32(buf[fakeDataMarkLen+1])<<16 | uint32(buf[fakeDataMarkLen+2])<<8 | uint32(buf[fakeDataMarkLen+3])
}
}
101 changes: 68 additions & 33 deletions agent/protocol/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,55 +127,90 @@ func (h *HTTPStreamParser) ParseRequest(buf string, messageType MessageType, tim
}
}

func (h HTTPStreamParser) ParseResponse(buf string, messageType MessageType, timestamp uint64, seq uint64) ParseResult {
func (h *HTTPStreamParser) ParseResponse(buf string, messageType MessageType, timestamp uint64, seq uint64, streamBuffer *buffer.StreamBuffer) ParseResult {
reader := strings.NewReader(buf)
bufioReader := bufio.NewReader(reader)
resp, err := http.ReadResponse(bufioReader, nil)
parseResult := ParseResult{}
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
return ParseResult{
ParseState: NeedsMoreData,
}
} else {
return ParseResult{
ParseState: Invalid,
}
return h.handleReadResponseError(err, buf, streamBuffer, messageType, timestamp, seq)
}

respBody, err := io.ReadAll(resp.Body)
if err != nil {
return h.handleReadBodyError(err, buf, streamBuffer, messageType, timestamp, seq)
}

readIndex := common.GetBufioReaderReadIndex(bufioReader)
if readIndex == 0 && len(respBody) > 0 {
readIndex = len(buf)
} else if readIndex == 0 {
return ParseResult{
ParseState: NeedsMoreData,
}
} else {
respBody, err := io.ReadAll(resp.Body)
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
return ParseResult{
ParseState: NeedsMoreData,
}
} else {
return ParseResult{
ParseState: Invalid,
}
}
}
parseResult.ReadBytes = readIndex
parseResult.ParsedMessages = []ParsedMessage{
&ParsedHttpResponse{
FrameBase: NewFrameBase(timestamp, readIndex, seq),
buf: []byte(buf[:readIndex]),
},
}
parseResult.ParseState = Success
return parseResult
}

func (h *HTTPStreamParser) handleReadResponseError(err error, buf string, streamBuffer *buffer.StreamBuffer, messageType MessageType, timestamp uint64, seq uint64) ParseResult {
if err == io.EOF || err == io.ErrUnexpectedEOF {
return ParseResult{
ParseState: NeedsMoreData,
}
readIndex := common.GetBufioReaderReadIndex(bufioReader)
if readIndex == 0 && len(respBody) > 0 {
readIndex = len(buf)
} else if readIndex == 0 {
return ParseResult{
ParseState: NeedsMoreData,
}
} else {
return ParseResult{
ParseState: Invalid,
}
parseResult.ReadBytes = readIndex
}
}

func (h *HTTPStreamParser) handleReadBodyError(err error, buf string, streamBuffer *buffer.StreamBuffer, messageType MessageType, timestamp uint64, seq uint64) ParseResult {
parseResult := ParseResult{}
boundary := h.FindBoundary(streamBuffer, messageType, 0)
if boundary > 0 {
parseResult.ReadBytes = boundary
parseResult.ParsedMessages = []ParsedMessage{
&ParsedHttpResponse{
FrameBase: NewFrameBase(timestamp, readIndex, seq),
buf: []byte(buf[:readIndex]),
FrameBase: NewFrameBase(timestamp, boundary, seq),
buf: []byte(buf[:boundary]),
},
}
parseResult.ParseState = Success
return parseResult
} else if fakeDataIdx, _ := fakeDataMarkIndex([]byte(buf)); fakeDataIdx != -1 {
fakeDataSize := getFakeDataSize([]byte(buf), fakeDataIdx)
if len(buf) >= fakeDataIdx+int(fakeDataSize)+fakeDataMarkLen {
parseResult.ReadBytes = fakeDataIdx + int(fakeDataSize) + fakeDataMarkLen
parseResult.ParsedMessages = []ParsedMessage{
&ParsedHttpResponse{
FrameBase: NewFrameBase(timestamp, parseResult.ReadBytes, seq),
buf: []byte(buf[:parseResult.ReadBytes]),
},
}
parseResult.ParseState = Success
return parseResult
}
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
return ParseResult{
ParseState: NeedsMoreData,
}
} else {
return ParseResult{
ParseState: Invalid,
}
}
}

func (h HTTPStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messageType MessageType) ParseResult {
func (h *HTTPStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messageType MessageType) ParseResult {
head := streamBuffer.Head()
buf := string(head.Buffer())
ts, ok := streamBuffer.FindTimestampBySeq(head.LeftBoundary())
Expand All @@ -188,7 +223,7 @@ func (h HTTPStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, message
case Request:
return h.ParseRequest(buf, messageType, ts, head.LeftBoundary())
case Response:
return h.ParseResponse(buf, messageType, ts, head.LeftBoundary())
return h.ParseResponse(buf, messageType, ts, head.LeftBoundary(), streamBuffer)
default:
panic("messageType invalid")
}
Expand Down
2 changes: 1 addition & 1 deletion agent/protocol/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestParseResponse(t *testing.T) {

parser := protocol.HTTPStreamParser{}

parseResult := parser.ParseResponse(httpMessage, protocol.Response, 10, 20)
parseResult := parser.ParseResponse(httpMessage, protocol.Response, 10, 20, buffer)

assert.Equal(t, protocol.Success, parseResult.ParseState)
assert.Equal(t, 1, len(parseResult.ParsedMessages))
Expand Down
13 changes: 5 additions & 8 deletions bpf/data_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,6 @@ static bool __always_inline report_conn_evt(void* ctx, struct conn_info_t *conn_
}

static __inline bool should_trace_conn(struct conn_info_t *conn_info) {
// conn_info->laddr.in4.sin_port
// bpf_printk("conn_info->laddr.in4.sin_port: %d, %d",
// conn_info->laddr.in4.sin_port,conn_info->raddr.in4.sin_port);
// if (conn_info->laddr.in4.sin_port == target_port ||
// conn_info->raddr.in4.sin_port == target_port) {
// return true;
// }

return conn_info->protocol != kProtocolUnknown && conn_info->no_trace <= traceable ;
}

Expand Down Expand Up @@ -205,6 +197,8 @@ static void __always_inline report_syscall_buf(void* ctx, uint64_t seq, struct c
evt->buf_size = amount_copied;
size_t __len = sizeof(struct kern_evt) + sizeof(uint32_t) + amount_copied;
bpf_perf_event_output(ctx, &syscall_rb, BPF_F_CURRENT_CPU, evt, __len);

// bpf_printk("len:%d, amount_copied:%d", len, amount_copied);
}
static void __always_inline report_syscall_evt(void* ctx, uint64_t seq, struct conn_id_s_t *conn_id_s, uint32_t len, enum step_t step, struct data_args *args, bool prepend_length_header, uint32_t length_header) {
report_syscall_buf(ctx, seq, conn_id_s, len, step, args->start_ts, args->end_ts - args->start_ts, args->buf, args->source_fn, prepend_length_header, length_header);
Expand Down Expand Up @@ -266,6 +260,9 @@ static void __always_inline report_syscall_evt_vecs(void* ctx, uint64_t seq, str
bytes_sent += iov_size;
seq += iov_size;
}
if (bytes_sent < total_size) {
report_syscall_buf_without_data(ctx, seq, conn_id_s, total_size - bytes_sent, step, args->start_ts, args->end_ts - args->start_ts, args->source_fn);
}
}


Expand Down
69 changes: 69 additions & 0 deletions testdata/test_truncated_data.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env bash
. $(dirname "$0")/common.sh
set -ex

CMD="$1"
DOCKER_REGISTRY="$2"
FILE_PREFIX="/tmp/kyanos"
CLIENT_LNAME="${FILE_PREFIX}_truncated_client.log"
SERVER_LNAME="${FILE_PREFIX}_truncated_server.log"


function test_client() {
if [ -z "$DOCKER_REGISTRY" ]; then
IMAGE_NAME="lobehub/lobe-chat:v1.46.7"
else
IMAGE_NAME=$DOCKER_REGISTRY"/lobehub/lobe-chat:v1.46.7"
fi
docker pull "$IMAGE_NAME"

cname='lobe'
port=3210
docker rm -f $cname
cid1=$(docker run --name $cname -p $port:$port -d "$IMAGE_NAME")
export cid1
echo $cid1

timeout 30 ${CMD} watch --debug-output http --remote-ports $port 2>&1 | tee "${CLIENT_LNAME}" &
sleep 15
curl http://localhost:3210
wait

cat "${CLIENT_LNAME}"
docker rm -f $cid1 || true
check_patterns_in_file "${CLIENT_LNAME}" "localhost:3210"
}


function test_server() {
if [ -z "$DOCKER_REGISTRY" ]; then
IMAGE_NAME="lobehub/lobe-chat:v1.46.7"
else
IMAGE_NAME=$DOCKER_REGISTRY"/lobehub/lobe-chat:v1.46.7"
fi
docker pull "$IMAGE_NAME"

cname='lobe'
port=3210
docker rm -f $cname
cid1=$(docker run --name $cname -p $port:$port -d "$IMAGE_NAME")
export cid1
echo $cid1

timeout 30 ${CMD} watch --debug-output http --local-ports $port 2>&1 | tee "${SERVER_LNAME}" &
sleep 15
curl http://localhost:3210
wait

cat "${SERVER_LNAME}"
docker rm -f $cid1 || true
check_patterns_in_file "${SERVER_LNAME}" "localhost:3210"
}


function main() {
test_client
test_server
}

main

0 comments on commit 42267e4

Please sign in to comment.