Skip to content

Commit e0bc64e

Browse files
committed
feat: 加上流量背压机制的选项
1 parent 329e57a commit e0bc64e

File tree

9 files changed

+149
-33
lines changed

9 files changed

+149
-33
lines changed

bytes_pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func init() {
4949
// go debug()
5050
}
5151

52-
func debug() {
52+
func debugPool() {
5353
for {
5454
time.Sleep(time.Second * 1)
5555
slog.Info("debug", "index7AllocCount", atomic.LoadInt64(&index7AllocCount),

conn.go

Lines changed: 47 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@ import (
1414
)
1515

1616
type Conn struct {
17-
fd int64
18-
wbufList []*[]byte // write buffer, 为了理精细控制内存使用量
19-
mu sync.Mutex
20-
safeConns *safeConns[Conn]
21-
task driver.TaskExecutor
22-
eventLoop core.PollingApi
23-
readTimer *time.Timer
24-
writeTimer *time.Timer
25-
session any // 会话数据
26-
readBufferSize int // 读缓冲区大小
17+
fd int64
18+
wbufList []*[]byte // write buffer, 为了理精细控制内存使用量
19+
mu sync.Mutex
20+
safeConns *safeConns[Conn]
21+
task driver.TaskExecutor
22+
eventLoop core.PollingApi
23+
readTimer *time.Timer
24+
writeTimer *time.Timer
25+
session any // 会话数据
26+
27+
// 如果再加字段,可以改成对options的指针的访问, 目前只是浪费了8个字节
28+
readBufferSize int // 读缓冲区大小
29+
flowBackPressureRemoveRead bool // 流量背压机制,当连接的写缓冲区满了,会移除读事件
2730
}
2831

2932
func (c *Conn) SetNoDelay(nodelay bool) error {
@@ -36,7 +39,7 @@ func (c *Conn) getFd() int {
3639

3740
func newConn(fd int, safeConns *safeConns[Conn],
3841
task selectTasks, taskType TaskType,
39-
eventLoop core.PollingApi, readBufferSize int) *Conn {
42+
eventLoop core.PollingApi, readBufferSize int, flowBackPressureRemoveRead bool) *Conn {
4043
var taskExecutor driver.TaskExecutor
4144
switch taskType {
4245
case TaskTypeInConnectionGoroutine:
@@ -50,11 +53,12 @@ func newConn(fd int, safeConns *safeConns[Conn],
5053
}
5154

5255
return &Conn{
53-
fd: int64(fd),
54-
safeConns: safeConns,
55-
task: taskExecutor,
56-
eventLoop: eventLoop,
57-
readBufferSize: readBufferSize,
56+
fd: int64(fd),
57+
safeConns: safeConns,
58+
task: taskExecutor,
59+
eventLoop: eventLoop,
60+
readBufferSize: readBufferSize,
61+
flowBackPressureRemoveRead: flowBackPressureRemoveRead,
5862
}
5963
}
6064

@@ -109,7 +113,7 @@ func (c *Conn) writeToSocket(data []byte) (int, error) {
109113
if err == syscall.EAGAIN {
110114
return 0, err // 资源暂时不可用
111115
}
112-
return n, err // 其他错误直接返回
116+
return 0, err // 其他错误直接返回
113117

114118
}
115119

@@ -179,10 +183,19 @@ func (c *Conn) handlePartialWrite(data *[]byte, n int, needAppend bool) error {
179183
*data = (*data)[:len(*data)-n]
180184
}
181185

182-
if err := c.eventLoop.AddWrite(c.getFd()); err != nil {
183-
slog.Error("failed to add write event", "error", err)
184-
return err
186+
// 部分写入成功,或者全部失败
187+
// 如果启用了流量背压机制且有部分写入,先删除读事件
188+
if c.flowBackPressureRemoveRead {
189+
if delErr := c.eventLoop.DelRead(c.getFd()); delErr != nil {
190+
slog.Error("failed to delete read event", "error", delErr)
191+
}
192+
} else {
193+
if err := c.eventLoop.AddWrite(c.getFd()); err != nil {
194+
slog.Error("failed to add write event", "error", err)
195+
return err
196+
}
185197
}
198+
186199
return nil
187200
}
188201

@@ -201,7 +214,9 @@ func (c *Conn) Write(data []byte) (int, error) {
201214
if len(c.wbufList) == 0 {
202215
n, err := c.writeToSocket(data)
203216
if errors.Is(err, core.EAGAIN) || errors.Is(err, core.EINTR) || err == nil {
204-
// 部分写入成功,或者全部失败
217+
if n == len(data) {
218+
return n, nil
219+
}
205220
// 把剩余数据放到缓冲区
206221
if err := c.handlePartialWrite(&data, n, true); err != nil {
207222
c.close()
@@ -223,8 +238,14 @@ func (c *Conn) Write(data []byte) (int, error) {
223238
for i < len(c.wbufList) {
224239
wbuf := c.wbufList[i]
225240
n, err := c.writeToSocket(*wbuf)
226-
if errors.Is(err, core.EAGAIN) || errors.Is(err, core.EINTR) {
227-
// 部分写入,移动剩余数据到缓冲区开始位置
241+
if errors.Is(err, core.EAGAIN) || errors.Is(err, core.EINTR) || err == nil /*写入成功,也有n != len(*wbuf)的情况*/ {
242+
if n == len(*wbuf) {
243+
putBytes(wbuf)
244+
c.wbufList[i] = nil
245+
i++
246+
continue
247+
}
248+
// 移动剩余数据到缓冲区开始位置
228249
if err := c.handlePartialWrite(wbuf, n, false); err != nil {
229250
c.close()
230251
return 0, err
@@ -236,18 +257,13 @@ func (c *Conn) Write(data []byte) (int, error) {
236257
return len(data), nil
237258
}
238259

239-
if err != nil {
240-
c.close()
241-
return 0, err
242-
}
243-
244-
putBytes(wbuf)
245-
c.wbufList[i] = nil
246-
i++
260+
c.close()
261+
return n, err
247262
}
248263

249264
// 所有数据都已写入
250265
c.wbufList = c.wbufList[:0]
266+
// 如果启用了流量背压机制,重新添加读事件
251267
if err := c.eventLoop.ResetRead(c.getFd()); err != nil {
252268
slog.Error("failed to reset read event", "error", err)
253269
}

core/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type PollingApi interface {
4747
AddRead(fd int) error
4848
AddWrite(fd int) error
4949
ResetRead(fd int) error
50+
DelRead(fd int) error
5051
Del(fd int) error
5152
Poll(tv time.Duration, cb func(int, State, error)) (retVal int, err error)
5253
Free()

core/api_epoll.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ func Create(triggerType TriggerType) (la PollingApi, err error) {
8181
slog.Info("create epoll", "triggerType", triggerType)
8282
e.events = make([]syscall.EpollEvent, 1024)
8383
e.rev, e.wev, e.dwEv, e.resetEv = getReadWriteDeleteReset(triggerType == TriggerTypeEdge)
84+
if e.dwEv == 0 {
85+
panic("dwEv is 0")
86+
}
8487
return &e, nil
8588
}
8689

@@ -136,6 +139,18 @@ func (e *eventPollState) DelWrite(fd int) error {
136139
return nil
137140
}
138141

142+
// 删除读事件
143+
func (e *eventPollState) DelRead(fd int) error {
144+
if fd > 0 {
145+
// 移除读事件,只保留写事件
146+
return syscall.EpollCtl(e.epfd, syscall.EPOLL_CTL_MOD, fd, &syscall.EpollEvent{
147+
Fd: int32(fd),
148+
Events: uint32(syscall.EPOLLOUT),
149+
})
150+
}
151+
return nil
152+
}
153+
139154
// 删除事件
140155
func (e *eventPollState) Del(fd int) error {
141156
return syscall.EpollCtl(e.epfd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{Fd: int32(fd)})

core/api_iocp.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ func (i *iocp) ResetRead(fd int) error {
5656
return nil
5757
}
5858

59+
func (i *iocp) DelRead(fd int) error {
60+
i.events[fd] &^= READ
61+
return nil
62+
}
63+
5964
func (i *iocp) Del(fd int) error {
6065
delete(i.events, fd)
6166
return nil

core/api_kqueue.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,17 @@ func (as *eventPollState) ResetRead(fd int) error {
7979
return err
8080
}
8181

82+
func (as *eventPollState) DelRead(fd int) error {
83+
if fd == -1 {
84+
return nil
85+
}
86+
87+
_, err := unix.Kevent(as.kqfd, []unix.Kevent_t{
88+
{Ident: uint64(fd), Flags: unix.EV_DELETE, Filter: unix.EVFILT_READ},
89+
}, nil, nil)
90+
return err
91+
}
92+
8293
func (as *eventPollState) Del(fd int) error {
8394
// _, err := unix.Kevent(as.kqfd, []unix.Kevent_t{
8495
// {Ident: uint64(fd), Flags: unix.EV_DELETE, Filter: unix.EVFILT_READ},

example/flow_backpressure/main.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"time"
8+
9+
"github.com/antlabs/pulse"
10+
)
11+
12+
type handler struct {
13+
name string
14+
}
15+
16+
func (h *handler) OnOpen(c *pulse.Conn) {
17+
slog.Info("connection opened", "name", h.name)
18+
}
19+
20+
func (h *handler) OnData(c *pulse.Conn, data []byte) {
21+
slog.Info("received data", "name", h.name, "size", len(data))
22+
23+
// 模拟回显大量数据,可能触发流量背压
24+
largeResponse := make([]byte, 10240) // 10KB 响应
25+
copy(largeResponse, data)
26+
27+
n, err := c.Write(largeResponse)
28+
if err != nil {
29+
slog.Error("failed to write response", "error", err)
30+
return
31+
}
32+
slog.Info("sent response", "bytes", n)
33+
}
34+
35+
func (h *handler) OnClose(c *pulse.Conn, err error) {
36+
if err != nil {
37+
slog.Info("connection closed with error", "name", h.name, "error", err)
38+
} else {
39+
slog.Info("connection closed normally", "name", h.name)
40+
}
41+
}
42+
43+
func main() {
44+
// 创建启用流量背压删除读事件机制的服务器
45+
server, err := pulse.NewMultiEventLoop(
46+
context.Background(),
47+
pulse.WithCallback(&handler{name: "flow-backpressure-server"}),
48+
pulse.WithTaskType(pulse.TaskTypeInEventLoop),
49+
pulse.WithTriggerType(pulse.TriggerTypeEdge),
50+
pulse.WithLogLevel(slog.LevelInfo),
51+
pulse.WithFlowBackPressureRemoveRead(true), // 启用流量背压删除读事件机制
52+
)
53+
if err != nil {
54+
panic(err)
55+
}
56+
57+
go func() {
58+
fmt.Println("Server starting on :8080 with flow backpressure (remove read events) enabled")
59+
if err := server.ListenAndServe(":8080"); err != nil {
60+
panic(err)
61+
}
62+
}()
63+
64+
// 保持服务器运行
65+
time.Sleep(time.Hour)
66+
}

multi_event_loops.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ func (e *MultiEventLoop) ListenAndServe(addr string) error {
137137
c2 := newConn(fd, &safeConns, e.localTask,
138138
e.options.taskType,
139139
e.eventLoops[index],
140-
e.options.eventLoopReadBufferSize)
140+
e.options.eventLoopReadBufferSize,
141+
e.options.flowBackPressureRemoveRead)
141142
safeConns.Add(fd, c2)
142143
e.options.callback.OnOpen(c2)
143144
err = e.eventLoops[index].AddRead(fd)

multi_event_loops_options.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func WithFlowBackPressure(enable bool) func(*Options) {
100100
}
101101

102102
// 设置流量背压机制,当连接的写缓冲区满了,会移除读事件,直到写缓冲区有空闲空间
103+
// 第二种背压机制会比第一种背压机制更高效, 7945hx cpu上,第二种是3.4GB/s的读写 第一种是3.0GB/s的读写
103104
func WithFlowBackPressureRemoveRead(enable bool) func(*Options) {
104105
return func(o *Options) {
105106
o.flowBackPressureRemoveRead = enable

0 commit comments

Comments
 (0)