Skip to content

Commit 148c7d7

Browse files
committed
perf: 优化下流量暴力压测
1 parent 8ffbc71 commit 148c7d7

File tree

4 files changed

+64
-20
lines changed

4 files changed

+64
-20
lines changed

bytes_pool.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ func putBytes(bytes *[]byte) {
9898
if cap(*bytes)%page != 0 {
9999
index-- // 向前挪一格, 可以保证空间是够的
100100
}
101-
// fmt.Printf("putBytes index: %d\n", index)
102101
smallPools[index].Put(bytes)
103102
}
104103

@@ -107,7 +106,6 @@ func putBytes(bytes *[]byte) {
107106
var bigPools = make([]sync.Pool, 0, 4)
108107
var bigPoolsSize = []int{
109108
512 * 1024,
110-
1024 * 1024,
111109
}
112110

113111
func init() {

conn.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package pulse
22

33
import (
44
"errors"
5-
"log"
5+
"log/slog"
66
"net"
77
"sync"
88
"sync/atomic"
@@ -79,11 +79,13 @@ func (c *Conn) close() {
7979
c.safeConns.Del(int(oldFd))
8080
if err := core.Close(int(oldFd)); err != nil {
8181
// Log the error but don't panic as this is a cleanup function
82-
log.Printf("failed to close fd %d: %v", oldFd, err)
82+
slog.Error("failed to close fd", "fd", oldFd, "error", err)
8383
}
8484
}
8585
for _, wbuf := range c.wbufList {
86-
putBytes(wbuf)
86+
if wbuf != nil {
87+
putBytes(wbuf)
88+
}
8789
}
8890
c.wbufList = c.wbufList[:0]
8991
}
@@ -129,13 +131,19 @@ func (c *Conn) Write(data []byte) (int, error) {
129131
if errors.Is(err, core.EAGAIN) || errors.Is(err, core.EINTR) || err == nil {
130132
// 部分写入成功,或者全部失败
131133
// 把剩余数据放到缓冲区
134+
if n < 0 {
135+
n = 0
136+
}
132137
if n < len(data) {
133138
newBuf := getBytes(len(data) - n)
134139
copy(*newBuf, data[n:])
135-
c.wbufList = append(c.wbufList, newBuf)
136140
if err := c.eventLoop.AddWrite(c.getFd()); err != nil {
137-
log.Printf("failed to add write event: %v", err)
141+
// 如果事件注册失败,释放缓冲区
142+
putBytes(newBuf)
143+
slog.Error("failed to add write event", "error", err)
144+
return n, err
138145
}
146+
c.wbufList = append(c.wbufList, newBuf)
139147
}
140148
return n, nil
141149
}
@@ -161,18 +169,27 @@ func (c *Conn) Write(data []byte) (int, error) {
161169
if errors.Is(err, core.EAGAIN) || errors.Is(err, core.EINTR) || err == nil {
162170
if n < len(*wbuf) {
163171
// 部分写入,移动剩余数据到缓冲区开始位置
164-
copy(*wbuf, (*wbuf)[n:])
165-
*wbuf = (*wbuf)[:len(*wbuf)-n]
172+
if n > 0 {
173+
newBuf := getBytes(len(*wbuf) - n)
174+
copy(*newBuf, (*wbuf)[n:])
175+
putBytes(wbuf)
176+
c.wbufList[i] = newBuf
177+
}
166178
// 释放已处理完的缓冲区
167179
for j := lastIndex; j < i; j++ {
180+
if c.wbufList[j] == nil {
181+
continue
182+
}
168183
putBytes(c.wbufList[j])
184+
c.wbufList[j] = nil
169185
}
170186
// 移动未处理的缓冲区到列表开始位置
171187
copy(c.wbufList, c.wbufList[i:])
172188
c.wbufList = c.wbufList[:len(c.wbufList)-i]
173189
return len(data), nil
174190
}
175191
putBytes(wbuf)
192+
c.wbufList[i] = nil
176193
lastIndex = i + 1
177194
continue
178195
}
@@ -185,14 +202,14 @@ func (c *Conn) Write(data []byte) (int, error) {
185202
// 所有数据都已写入
186203
c.wbufList = c.wbufList[:0]
187204
if err := c.eventLoop.ResetRead(c.getFd()); err != nil {
188-
log.Printf("failed to reset read event: %v", err)
205+
slog.Error("failed to reset read event", "error", err)
189206
}
190207
return len(data), nil
191208
}
192209

193210
func (c *Conn) flush() {
194211
if _, err := c.Write(nil); err != nil {
195-
log.Printf("failed to flush write buffer: %v", err)
212+
slog.Error("failed to flush write buffer", "error", err)
196213
}
197214
}
198215

@@ -229,7 +246,7 @@ func handleData(c *Conn, options *Options, rawData []byte) {
229246
}
230247
return true
231248
}); err != nil {
232-
log.Printf("failed to add task: %v", err)
249+
slog.Error("failed to add task", "error", err)
233250
// 释放newBytes since task failed
234251
if newBytes != nil {
235252
putBytes(newBytes)

multi_event_loops.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package pulse
33
import (
44
"context"
55
"errors"
6+
"io"
67
"log"
78
"log/slog"
89
"net"
@@ -43,6 +44,17 @@ func (m *MultiEventLoop) initDefaultSetting() {
4344
if m.options.eventLoopReadBufferSize == 0 {
4445
m.options.eventLoopReadBufferSize = defEventLoopReadBufferSize
4546
}
47+
48+
if m.options.maxSocketReadTimes == 0 {
49+
if m.options.triggerType == TriggerTypeLevel {
50+
// 水平触发模式下使用默认值
51+
m.options.maxSocketReadTimes = defMaxSocketReadTimes
52+
} else {
53+
// 边缘触发模式下不限制读取次数
54+
m.options.maxSocketReadTimes = -1
55+
}
56+
m.options.maxSocketReadTimes = defMaxSocketReadTimes
57+
}
4658
}
4759

4860
func NewMultiEventLoop(ctx context.Context, options ...func(*Options)) (e *MultiEventLoop, err error) {
@@ -133,14 +145,15 @@ func (e *MultiEventLoop) ListenAndServe(addr string) error {
133145
if _, err := eventLoop.Poll(0, func(fd int, state core.State, err error) {
134146

135147
c := safeConns.GetUnsafe(fd)
148+
// c := safeConns.Get(fd)
136149
// slog.Debug("poll", "fd", fd, "state", state, "err", err)
137150
if err != nil {
138151
if errors.Is(err, core.EAGAIN) {
139152
return
140153
}
141154
if c != nil {
142-
e.options.callback.OnClose(c, err)
143155
c.Close()
156+
e.options.callback.OnClose(c, err)
144157
}
145158
return
146159
}
@@ -167,7 +180,11 @@ func (e *MultiEventLoop) ListenAndServe(addr string) error {
167180
}
168181

169182
func (e *MultiEventLoop) doRead(c *Conn, rbuf []byte) {
170-
for {
183+
for i := 0; ; i++ {
184+
if e.options.maxSocketReadTimes > 0 && i >= e.options.maxSocketReadTimes {
185+
return
186+
}
187+
171188
// 循环读取数据
172189
c.mu.Lock()
173190
n, err := core.Read(c.getFd(), rbuf)
@@ -189,6 +206,9 @@ func (e *MultiEventLoop) doRead(c *Conn, rbuf []byte) {
189206
}
190207

191208
if n == 0 {
209+
// 如果不是这个错误直接关闭连接
210+
c.Close()
211+
e.options.callback.OnClose(c, io.EOF)
192212
return
193213
}
194214
if n > 0 {

multi_event_loops_options.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ var (
1111
defTaskMax = 30000
1212
defTaskInitCount = 8
1313
defEventLoopReadBufferSize = 1024 * 4
14+
defMaxSocketReadTimes = 1
1415
)
1516

1617
type TaskType int
@@ -38,12 +39,20 @@ type taskConfig struct {
3839

3940
// 边缘触发
4041
type Options struct {
41-
callback Callback
42-
task taskConfig
43-
level slog.Level
44-
taskType TaskType
45-
triggerType core.TriggerType
46-
eventLoopReadBufferSize int
42+
callback Callback // 回调函数
43+
task taskConfig // 协程池配置
44+
level slog.Level // 日志级别
45+
taskType TaskType // 任务类型
46+
triggerType core.TriggerType // 触发类型, 水平触发还是边缘触发
47+
eventLoopReadBufferSize int // event loop中读buffer的大小
48+
maxSocketReadTimes int // socket单次最大读取次数
49+
}
50+
51+
// 最大读取次数
52+
func WithMaxSocketReadTimes(maxSocketReadTimes int) func(*Options) {
53+
return func(o *Options) {
54+
o.maxSocketReadTimes = maxSocketReadTimes
55+
}
4756
}
4857

4958
// 设置回调函数

0 commit comments

Comments
 (0)