Skip to content

Commit 12cb418

Browse files
committed
fix: fix some bugs
1 parent 8eccd76 commit 12cb418

File tree

3 files changed

+20
-12
lines changed

3 files changed

+20
-12
lines changed

client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (cli *Client) reconnect(sess *Session) {
7777
addr := sess.RemoteAddr()
7878
policy := cli.opts.ReconnectPolicy
7979

80-
for policy.Retry() && cli.needReconnect(sess) {
80+
for cli.needReconnect(sess) && policy.Retry() {
8181
if s, err := cli.Dial(addr.Network(), addr.String()); err == nil {
8282
*sess = *s // replace old session
8383
return

manager.go

+3
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ func NewManager(handler Handler, opts *NewManagerOptions) *Manager {
6868
closed: make(chan struct{}),
6969
closeDone: make(chan struct{}),
7070
}
71+
if opts.KeepaliveTick != 0 {
72+
m.keepaliveTicker = time.NewTicker(opts.KeepaliveTick)
73+
}
7174

7275
return m
7376
}

session.go

+16-11
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,17 @@ var idGenerator int64
3535

3636
func NewSession(c Conn, mgr ConnManager, user User, handler Handler) *Session {
3737
sess := &Session{
38-
id: atomic.AddInt64(&idGenerator, 1),
39-
c: c,
40-
mgr: mgr,
41-
user: user,
42-
handler: handler,
43-
dataLock: &sync.RWMutex{},
44-
data: make(map[string]interface{}),
45-
reqLock: &sync.RWMutex{},
46-
requests: make(map[int64]chan Packet),
47-
closed: make(chan struct{}),
38+
id: atomic.AddInt64(&idGenerator, 1),
39+
c: c,
40+
mgr: mgr,
41+
user: user,
42+
handler: handler,
43+
dataLock: &sync.RWMutex{},
44+
data: make(map[string]interface{}),
45+
reqLock: &sync.RWMutex{},
46+
requests: make(map[int64]chan Packet),
47+
closed: make(chan struct{}),
48+
lastPackTs: time.Now(),
4849
}
4950

5051
go sess.readPacket()
@@ -79,7 +80,9 @@ func (s *Session) readPacket() {
7980
if ok {
8081
ch <- packet
8182
close(ch)
82-
continue
83+
s.reqLock.Lock()
84+
delete(s.requests, packet.Id())
85+
s.reqLock.Unlock()
8386
} else {
8487
go s.handler.Handle(packet, s)
8588
}
@@ -135,6 +138,7 @@ func (s *Session) SendRequest(p Packet) (<-chan Packet, error) {
135138
ch := make(chan Packet, 1)
136139
s.reqLock.Lock()
137140
s.requests[p.Id()] = ch
141+
s.reqLock.Unlock()
138142

139143
if err := s.SendPacket(p); err != nil {
140144
return nil, err
@@ -147,6 +151,7 @@ func (s *Session) SendRequestTimeout(p Packet, timeout time.Duration) (Packet, e
147151
ch := make(chan Packet, 1)
148152
s.reqLock.Lock()
149153
s.requests[p.Id()] = ch
154+
s.reqLock.Unlock()
150155

151156
if err := s.SendPacket(p); err != nil {
152157
return nil, err

0 commit comments

Comments
 (0)