-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
347 lines (287 loc) · 7.69 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
package mp
import (
"errors"
"io"
"net"
"strings"
"sync"
)
// Error texts produced by serverClient.Authenticate.
const (
// errStringAuthDenied is the error text used when the server's Authenticator
// rejects the name/secret pair.
errStringAuthDenied = "authentication didn't recognize name+secret pair"
// errStringBadName is the error text used when the requested client name
// fails isClientNameValid.
errStringBadName = "client name is invalid"
)
// isClientNameValid reports whether name may be used as a client name: it must
// be non-empty and must not contain a ':' character.
func isClientNameValid(name string) bool {
	if name == "" {
		return false
	}
	return strings.IndexRune(name, ':') < 0
}
// serverClient is the server-side state kept for one client connection.
type serverClient struct {
// conn is the underlying connection; Close closes it.
conn io.ReadWriteCloser
// name is the client's name. It is empty until AuthenticateAndRun stores
// the name returned by a successful Authenticate.
name string
// server is the Server this client was created for.
server *Server
// translator reads Messages from, and writes Messages to, the client.
translator MessageTranslator
// writeLock is held by WriteMessage for the duration of each write.
writeLock sync.Mutex
// authed is set to true by a successful Authenticate; Run panics if it is
// still false.
authed bool
}
// newUnauthedServerClient builds a serverClient for connection c that belongs
// to server s and exchanges messages through t. The returned client has no
// name and is not yet authenticated.
func newUnauthedServerClient(c io.ReadWriteCloser, s *Server, t MessageTranslator) *serverClient {
	client := new(serverClient)
	client.conn = c
	client.server = s
	client.translator = t
	return client
}
// Authenticate reads the first message from the client and validates it as an
// authentication request.
//
// The message must carry MetaAuth; its OtherClient field is taken as the
// requested client name and its Data field as the secret. The name must pass
// isClientNameValid and the pair must be accepted by the server's
// Authenticator. On success the client is marked as authenticated and the
// name is returned.
//
// On a read failure or a non-MetaAuth message the returned name is empty. On a
// rejected name or rejected credentials the requested name is returned
// alongside the error.
func (s *serverClient) Authenticate() (name string, err error) {
	first, readErr := s.translator.ReadMessage()
	if readErr != nil {
		return "", readErr
	}
	if first.Meta != MetaAuth {
		return "", errors.New("expected MetaAuth as first message")
	}

	requested := first.OtherClient
	if !isClientNameValid(requested) {
		return requested, errors.New(errStringBadName)
	}
	if !s.server.auth(requested, first.Data) {
		return requested, errors.New(errStringAuthDenied)
	}

	s.authed = true
	return requested, nil
}
// Close closes the client's connection and then asks the server to deregister
// the client and notify the remaining clients.
//
// Deregistration happens even if closing the connection fails. The returned
// error is whatever closing the connection reported.
func (s *serverClient) Close() error {
	err := s.conn.Close()
	s.server.removeClientAndNotify(s)
	return err
}
// Run reads messages from this client and forwards each one to the client
// named in its OtherClient field. Before forwarding, OtherClient is rewritten
// to this client's name so the recipient can tell who sent the message.
//
// If the target is not registered, or writing to it fails, the message is sent
// back to this client with Meta set to MetaNoSuchConnection and OtherClient
// set to the name of the target that could not be reached.
//
// Run panics if the client has not been authenticated. It only returns when
// reading from, or writing to, this client fails, and returns that error.
func (s *serverClient) Run() error {
	if !s.authed {
		panic("Trying to run an unauthed serverClient")
	}
	server := s.server
	for {
		msg, err := s.translator.ReadMessage()
		if err != nil {
			return err
		}

		// Remember the intended recipient: OtherClient is overwritten with the
		// sender's name before forwarding, and must be restored if delivery
		// fails so the failure notice names the unreachable target.
		target := msg.OtherClient
		delivered := false
		if other, found := server.findClient(target); found {
			msg.OtherClient = s.name
			delivered = other.WriteMessage(msg) == nil
		}
		if delivered {
			continue
		}

		msg.Meta = MetaNoSuchConnection
		msg.OtherClient = target
		// Write through s.WriteMessage rather than the translator directly:
		// other clients' goroutines write to this client via WriteMessage, so
		// this write must take the same lock.
		if err := s.WriteMessage(msg); err != nil {
			return err
		}
	}
	// Compat with earlier versions of Go
	return errors.New("Internal error: Run exited prematurely")
}
// AuthenticateAndRun drives the whole lifetime of a client connection: it
// authenticates the client, registers it with the server under the
// authenticated name, acknowledges the login with a MetaAuthOk message and
// then runs the forwarding loop until it fails.
//
// The client is always closed before AuthenticateAndRun returns. The returned
// error is the first failure encountered (authentication, registration,
// acknowledgement, or the error that ended Run).
func (s *serverClient) AuthenticateAndRun() error {
	// Close both closes the connection and deregisters the client, so the
	// error paths below must not close the connection themselves; doing so
	// would close it a second time.
	defer s.Close()

	name, err := s.Authenticate()
	if err != nil {
		return err
	}
	s.name = name

	if !s.server.addClient(s) {
		return errors.New("error adding new client")
	}

	ack := &Message{
		Meta:        MetaAuthOk,
		OtherClient: s.name,
	}
	if err := s.WriteMessage(ack); err != nil {
		return err
	}
	return s.Run()
}
// WriteMessage sends msg to the client through its translator while holding
// the client's write lock, so that calls from different goroutines do not
// overlap. It returns the translator's error, if any.
func (s *serverClient) WriteMessage(msg *Message) error {
	mu := &s.writeLock
	mu.Lock()
	defer mu.Unlock()

	return s.translator.WriteMessage(msg)
}
// Authenticator is a function that reports whether the given name/secret pair
// is valid, i.e. whether the sender of that pair should be allowed to connect
// to the server. The server calls it with the name and data carried by a
// client's authentication message.
//
// `secret` may be nil.
type Authenticator func(name string, secret []byte) bool
// TranslatorMaker is a function that, given a Reader and a Writer, returns a
// MessageTranslator that reads from the Reader and writes to the Writer. The
// server calls it once per accepted connection, passing the connection as
// both arguments.
type TranslatorMaker func(io.Reader, io.Writer) MessageTranslator
// Server is an implementation of a MessagePassing Server. It accepts client
// connections, authenticates them, and forwards messages between clients by
// name.
type Server struct {
// listener is set by Listen and closed by Close.
listener io.Closer
// auth decides whether a client's name/secret pair is accepted.
auth Authenticator
// translatorMaker builds the MessageTranslator for each accepted connection.
translatorMaker TranslatorMaker
// clients holds the registered clients, keyed by client name.
clients map[string]*serverClient
// closingClients holds, per client name, a WaitGroup that is released once
// that client's close notification has been broadcast.
closingClients map[string]*sync.WaitGroup // see [1]
// clientsLock guards clients, closingClients and closed.
clientsLock sync.Mutex
// closed is set by Close; addClient refuses new clients once it is true.
closed bool
}
// [1] -- There's a race when one client disconnects and another connects
// immediately (with the same name). The easiest way to solve this is to wait
// for the "client closed" message to be fully broadcast before the new client
// is registered and can be reached by other clients.
// NewServer returns a Server that authenticates clients with auth and builds a
// MessageTranslator for every connection with maker. The server has no
// clients and no listener until Listen is called.
func NewServer(auth Authenticator, maker TranslatorMaker) *Server {
	srv := new(Server)
	srv.auth = auth
	srv.translatorMaker = maker
	srv.clients = map[string]*serverClient{}
	srv.closingClients = map[string]*sync.WaitGroup{}
	return srv
}
// addClient registers client under client.name and reports whether it was
// registered. It returns false only when the server has been closed.
//
// Registration has to cope with the name already being in use:
//   - a client with that name is registered: close it, then re-check;
//   - a client with that name is still closing: wait for it, then re-check;
//   - otherwise: register the new client.
//
// The server state is re-examined from scratch after every wait, because the
// lock is not held while waiting.
func (s *Server) addClient(client *serverClient) bool {
	name := client.name

	// The lock is managed by hand rather than with defer: it has to be
	// released around running.Close() and closing.Wait(), and a deferred
	// Unlock could then unlock an already-unlocked mutex if either panicked.
	s.clientsLock.Lock()
	for !s.closed {
		if running, inUse := s.clients[name]; inUse {
			s.clientsLock.Unlock()
			running.Close()
			s.clientsLock.Lock()
			continue
		}
		if closing, pending := s.closingClients[name]; pending {
			s.clientsLock.Unlock()
			closing.Wait()
			s.clientsLock.Lock()
			continue
		}
		s.clients[name] = client
		s.clientsLock.Unlock()
		return true
	}
	s.clientsLock.Unlock()
	return false
}
// findClient looks up the registered client called name. The boolean result
// reports whether such a client is currently registered.
func (s *Server) findClient(name string) (*serverClient, bool) {
	s.clientsLock.Lock()
	defer s.clientsLock.Unlock()

	match, registered := s.clients[name]
	return match, registered
}
// queueClientForClose moves client out of the set of registered clients and
// into the 'closing' set.
//
// It returns (nil, false) without changing anything when client is not the
// client currently registered under its name (for example because it was
// never registered, was already queued for close, or has been replaced).
// Otherwise it returns a WaitGroup, stored in closingClients under the
// client's name, on which Done must be called exactly once when the close has
// been fully processed.
//
// It panics if another client with the same name is already in the closing
// set, which would mean the server's bookkeeping is corrupt.
func (s *Server) queueClientForClose(client *serverClient) (*sync.WaitGroup, bool) {
	s.clientsLock.Lock()
	defer s.clientsLock.Unlock()

	name := client.name
	if registered, ok := s.clients[name]; !ok || registered != client {
		return nil, false
	}
	if _, closing := s.closingClients[name]; closing {
		panic("Somehow we have two clients with the same name closing simultaneously")
	}

	// Deregister the client. Without this a closed client would stay
	// reachable through findClient forever, a second Close on it would be
	// queued again, and addClient would never stop trying to evict it when
	// its name is reused.
	delete(s.clients, name)

	wg := new(sync.WaitGroup)
	wg.Add(1)
	s.closingClients[name] = wg
	return wg, true
}
// removeClientFromClosingMap drops the closing-set entry stored under c's name
// and, if there was one, signals its WaitGroup so that anyone waiting for the
// close to finish can proceed. Done is called after the lock is released.
func (s *Server) removeClientFromClosingMap(c *serverClient) {
	key := c.name

	s.clientsLock.Lock()
	wg, pending := s.closingClients[key]
	if !pending {
		s.clientsLock.Unlock()
		return
	}
	delete(s.closingClients, key)
	s.clientsLock.Unlock()

	wg.Done()
}
// removeClientAndNotify queues c for close and, if that succeeds, broadcasts a
// MetaClientClosed message carrying c's name. The closing-set entry for c is
// removed once the broadcast has finished. If c cannot be queued for close,
// nothing is broadcast.
func (s *Server) removeClientAndNotify(c *serverClient) {
	if _, queued := s.queueClientForClose(c); !queued {
		return
	}
	defer s.removeClientFromClosingMap(c)

	s.broadcastMessage(&Message{
		Meta:        MetaClientClosed,
		OtherClient: c.name,
	})
}
// snapshotClients returns a newly allocated slice holding every client that is
// registered with the server at the time of the call, in no particular order.
func (s *Server) snapshotClients() []*serverClient {
	s.clientsLock.Lock()
	defer s.clientsLock.Unlock()

	all := make([]*serverClient, 0, len(s.clients))
	for _, client := range s.clients {
		all = append(all, client)
	}
	return all
}
// broadcastMessage sends m to every client that is registered with the server
// at the moment the broadcast starts. A client that registers while the
// broadcast is in progress does *NOT* receive the message.
//
// Delivery is best effort: write errors for individual clients are ignored.
// If the server has been closed there are no registered clients left, so the
// message is simply dropped.
func (s *Server) broadcastMessage(m *Message) {
	for _, recipient := range s.snapshotClients() {
		recipient.WriteMessage(m)
	}
}
// Close closes and shuts down the server. It closes the listener passed to
// Listen (if any), which causes Listen() on the current instance to exit,
// marks the server as closed so that no new clients are registered, and
// severs the server's connection to all Clients.
//
// Close may be called on a server on which Listen was never called; in that
// case there is no listener to close.
func (s *Server) Close() {
	// listener is only set once Listen has been called.
	if s.listener != nil {
		s.listener.Close()
	}

	// Detach the client map under the lock, then close the clients without
	// holding it: closing a client calls back into the server, which takes
	// the lock again.
	s.clientsLock.Lock()
	clients := s.clients
	s.clients = make(map[string]*serverClient)
	s.closed = true
	s.clientsLock.Unlock()

	for _, client := range clients {
		client.Close()
	}
}
// Listen accepts new Client connections from l until accepting fails, and
// returns the error that Accept reported. Every accepted connection is
// authenticated and served in its own goroutine.
//
// When Listen exits, the Server continues to serve Clients that are already
// connected.
func (s *Server) Listen(l net.Listener) error {
	s.listener = l
	// Only the listener is closed on exit, not the server: the server works
	// perfectly well without the ability to accept new clients.
	defer l.Close()

	for {
		conn, err := l.Accept()
		if err != nil {
			return err
		}
		translator := s.translatorMaker(conn, conn)
		client := newUnauthedServerClient(conn, s, translator)
		go client.AuthenticateAndRun()
	}
	// compat with Go 1.0
	panic(unreachableCode)
}