-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathresend.go
149 lines (115 loc) · 3.11 KB
/
resend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package otr3
import (
"sync"
"time"
)
// resendInterval is the window, counted back from now, inside which the last
// send must have happened for queued messages to be retransmitted.
const resendInterval = time.Minute

// retransmitFlag selects whether, and in which form, queued messages are
// retransmitted.
type retransmitFlag int

const (
	// noRetransmit disables retransmission.
	noRetransmit retransmitFlag = iota
	// retransmitWithPrefix retransmits messages after passing them through the
	// resend message transform.
	retransmitWithPrefix
	// retransmitExact retransmits messages unchanged.
	retransmitExact
)

// defaultResentPrefix is the marker the default transform puts in front of a
// retransmitted message.
var defaultResentPrefix = []byte("[resent] ")
// messageToResend is one entry of the resend queue: a plaintext message
// together with the opaque values that were supplied when it was queued.
type messageToResend struct {
	// m is the plaintext to send again; later stores a copy made with makeCopy.
	m MessagePlaintext
	// opaque holds the caller-supplied values; retransmit hands them to
	// messageEvent once the queued messages have been regenerated.
	opaque []interface{}
}
// resendContext holds the retransmission state: the retransmission policy, an
// optional message transform, a flag marking a retransmission in progress,
// and the queue of messages waiting to be sent again.
type resendContext struct {
	// mayRetransmit selects whether, and in which form, queued messages are
	// retransmitted.
	mayRetransmit retransmitFlag
	// messageTransform, when non-nil, is used instead of
	// defaultResendMessageTransform for messages retransmitted with a prefix.
	messageTransform func([]byte) []byte
	// retransmitting is set for the duration of retransmit; later ignores new
	// messages while it is true.
	retransmitting bool
	// messages is the resend queue. later, pending and clear take the
	// embedded RWMutex before touching m.
	messages struct {
		m []messageToResend
		sync.RWMutex
	}
}
// later queues a copy of msg, together with its opaque values, so it can be
// retransmitted at a later point. Nothing is queued while a retransmission is
// in progress.
func (r *resendContext) later(msg MessagePlaintext, opaque ...interface{}) {
	if r.retransmitting {
		return
	}

	queue := &r.messages
	queue.Lock()
	defer queue.Unlock()

	if queue.m == nil {
		queue.m = make([]messageToResend, 0, 5)
	}
	entry := messageToResend{m: makeCopy(msg), opaque: opaque}
	queue.m = append(queue.m, entry)
}
// pending returns a snapshot of the resend queue. The returned slice is a
// fresh copy, so changing its elements does not affect the queue; it is
// non-nil even when the queue is empty.
func (r *resendContext) pending() []messageToResend {
	r.messages.RLock()
	defer r.messages.RUnlock()

	queued := r.messages.m
	snapshot := make([]messageToResend, len(queued))
	for i := range queued {
		snapshot[i] = queued[i]
	}
	return snapshot
}
// clear empties the resend queue by resetting it to nil.
func (r *resendContext) clear() {
	queue := &r.messages
	queue.Lock()
	queue.m = nil
	queue.Unlock()
}
// shouldRetransmit reports whether there is at least one queued message and
// the retransmission policy is anything other than noRetransmit.
//
// The queue is inspected under the read lock, like every other accessor of
// r.messages.m, so the check does not race with later or clear.
func (r *resendContext) shouldRetransmit() bool {
	r.messages.RLock()
	defer r.messages.RUnlock()

	return len(r.messages.m) > 0 && r.mayRetransmit != noRetransmit
}
// startRetransmitting sets the retransmitting flag. While the flag is set,
// later returns without queueing anything.
func (r *resendContext) startRetransmitting() {
	r.retransmitting = true
}
// endRetransmitting resets the retransmitting flag, so later queues messages
// again.
func (r *resendContext) endRetransmitting() {
	r.retransmitting = false
}
// defaultResendMessageTransform returns a new slice holding
// defaultResentPrefix followed by msg.
//
// The result never shares memory with defaultResentPrefix. Appending
// directly onto the package-level prefix would return the prefix itself for
// an empty msg, and could let successive results overwrite each other if the
// prefix slice ever had spare capacity.
func defaultResendMessageTransform(msg []byte) []byte {
	out := make([]byte, 0, len(defaultResentPrefix)+len(msg))
	out = append(out, defaultResentPrefix...)
	return append(out, msg...)
}
// resendMessageTransformer returns the transform to apply to messages that
// are retransmitted with a prefix: the one configured on the conversation's
// resend context if set, defaultResendMessageTransform otherwise.
func (c *Conversation) resendMessageTransformer() func([]byte) []byte {
	if custom := c.resend.messageTransform; custom != nil {
		return custom
	}
	return defaultResendMessageTransform
}
// lastMessage hands msg and its opaque values to the conversation's resend
// context, which queues them via later.
func (c *Conversation) lastMessage(msg MessagePlaintext, opaque ...interface{}) {
	c.resend.later(msg, opaque...)
}
// updateMayRetransmitTo sets the retransmission policy of the conversation's
// resend context to f.
func (c *Conversation) updateMayRetransmitTo(f retransmitFlag) {
	c.resend.mayRetransmit = f
}
// shouldRetransmit reports whether the conversation should retransmit now:
// the resend context must have something to retransmit, and the heartbeat's
// lastSent time must lie within the last resendInterval.
func (c *Conversation) shouldRetransmit() bool {
	if !c.resend.shouldRetransmit() {
		return false
	}

	cutoff := time.Now().Add(-resendInterval)
	return c.heartbeat.lastSent.After(cutoff)
}
// maybeRetransmit retransmits the queued messages when shouldRetransmit
// allows it and returns whatever retransmit returns. Otherwise it returns
// nil, nil.
func (c *Conversation) maybeRetransmit() ([]messageWithHeader, error) {
	if c.shouldRetransmit() {
		return c.retransmit()
	}
	return nil, nil
}
// retransmit takes every queued message off the resend queue, turns each one
// into a data message wrapped with a message header, and returns the wrapped
// messages in queue order.
//
// When the policy is retransmitWithPrefix each message first goes through the
// resend message transform and MessageEventMessageResent is signalled for it;
// otherwise the message is sent unchanged and MessageEventMessageSent is
// signalled. After the events, the last-sent time is updated.
//
// If genDataMsg fails, the error is returned with a nil slice. The queue has
// already been cleared at that point, and no events are signalled.
func (c *Conversation) retransmit() ([]messageWithHeader, error) {
	queued := c.resend.pending()
	c.resend.clear()

	withPrefix := c.resend.mayRetransmit == retransmitWithPrefix

	// later ignores messages while the flag is set.
	// NOTE(review): presumably this keeps genDataMsg from re-queueing the
	// messages being retransmitted - confirm against genDataMsg.
	c.resend.startRetransmitting()
	defer c.resend.endRetransmitting()

	wrapped := make([]messageWithHeader, 0, len(queued))
	for i := range queued {
		plain := queued[i].m
		if withPrefix {
			plain = c.resendMessageTransformer()(plain)
		}

		dataMsg, _, err := c.genDataMsg(plain)
		if err != nil {
			return nil, err
		}

		// It is actually safe to ignore this error, since the only possible error
		// here is a problem with generating the instance tags for the message header,
		// which we already do once in genDataMsg
		toSend, _ := c.wrapMessageHeader(msgTypeData, dataMsg.serialize(c.version))
		wrapped = append(wrapped, toSend)
	}

	event := MessageEventMessageSent
	if withPrefix {
		event = MessageEventMessageResent
	}
	for i := range queued {
		c.messageEvent(event, queued[i].opaque...)
	}

	c.updateLastSent()
	return wrapped, nil
}