-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchannel.go
114 lines (97 loc) · 2.01 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package pubsub
import (
"iter"
"sync"
"github.com/antoniomika/syncmap"
)
// ChannelDirection describes the direction of data flow for a client attached
// to a channel. Channel.Handle never delivers messages to clients whose
// direction is ChannelDirectionInput.
type ChannelDirection int

// String returns the lowercase name of the direction: "input", "output" or
// "inputoutput". Any value outside the declared constants yields "unknown"
// rather than panicking with an index-out-of-range error, so it is safe to
// log directions that were never validated.
func (d ChannelDirection) String() string {
	switch d {
	case ChannelDirectionInput:
		return "input"
	case ChannelDirectionOutput:
		return "output"
	case ChannelDirectionInputOutput:
		return "inputoutput"
	default:
		return "unknown"
	}
}

// Supported channel directions.
const (
	ChannelDirectionInput ChannelDirection = iota
	ChannelDirectionOutput
	ChannelDirectionInputOutput
)
// ChannelAction identifies what a ChannelMessage represents: a data payload
// or a close notification.
type ChannelAction int

// String returns the lowercase name of the action: "data" or "close". Any
// value outside the declared constants yields "unknown" rather than panicking
// with an index-out-of-range error.
func (d ChannelAction) String() string {
	switch d {
	case ChannelActionData:
		return "data"
	case ChannelActionClose:
		return "close"
	default:
		return "unknown"
	}
}

// Supported channel actions. The constants are explicitly typed as
// ChannelAction (they were previously untyped integers), so they carry the
// String method and cannot be silently mixed with unrelated integer values.
const (
	ChannelActionData ChannelAction = iota
	ChannelActionClose
)
// ChannelMessage is the unit of data sent on a Channel's Data channel and
// forwarded by Channel.Handle to the channel's clients.
type ChannelMessage struct {
// Data is the message payload as raw bytes.
Data []byte
// ClientID is compared with each client's ID by Channel.Handle so that a
// message is not delivered back to the client it came from, unless that
// client has Replay set.
ClientID string
// Direction is a ChannelDirection value attached to the message.
// NOTE(review): not read anywhere in this file; confirm its meaning against
// the code that produces messages.
Direction ChannelDirection
// Action is a ChannelAction value ("data" or "close") attached to the message.
// NOTE(review): not read anywhere in this file; confirm how consumers use it.
Action ChannelAction
}
// NewChannel builds a Channel for the given topic with a fresh Done signal
// channel, an unbuffered Data channel and an empty client map. It does not
// start the dispatch goroutine; call Handle for that.
func NewChannel(topic string) *Channel {
	ch := new(Channel)
	ch.Topic = topic
	ch.Done = make(chan struct{})
	ch.Data = make(chan ChannelMessage)
	ch.Clients = syncmap.New[string, *Client]()
	return ch
}
/*
Channel is a container for a topic. It holds the clients attached to the
topic and a data channel on which messages for the topic are received.
NewChannel initialises the channels and the client map; Handle starts the
goroutine that forwards messages to clients, and Cleanup signals it to stop.
*/
type Channel struct {
// Topic is the topic string this channel was created for.
Topic string
// Done is closed by Cleanup; the goroutine started by Handle exits when it
// observes the close.
Done chan struct{}
// Data carries incoming messages; the goroutine started by Handle receives
// from it and forwards each message to the clients.
Data chan ChannelMessage
// Clients maps a string key to each attached client.
// NOTE(review): the key is presumably the client ID; confirm at the insert site.
Clients *syncmap.Map[string, *Client]
// handleOnce ensures Handle starts its goroutine only once.
handleOnce sync.Once
// cleanupOnce ensures Cleanup closes Done only once.
cleanupOnce sync.Once
}
// GetClients returns a range-over-func iterator over the channel's client
// map, yielding each key together with its *Client. Iteration is delegated to
// the Range method of the underlying map.
func (c *Channel) GetClients() iter.Seq2[string, *Client] {
	var clients iter.Seq2[string, *Client] = c.Clients.Range
	return clients
}
// Cleanup closes the channel's Done channel, which signals the goroutine
// started by Handle to exit. The close is guarded by cleanupOnce, so calling
// Cleanup repeatedly does not panic on a double close.
func (c *Channel) Cleanup() {
	closeDone := func() {
		close(c.Done)
	}
	c.cleanupOnce.Do(closeDone)
}
// Handle starts the channel's dispatch goroutine. Only the first call has an
// effect; later calls are no-ops (guarded by handleOnce).
//
// The goroutine receives messages from c.Data and forwards each one to every
// client returned by GetClients, except:
//   - clients whose Direction is ChannelDirectionInput, and
//   - the client whose ID equals the message's ClientID, unless that client
//     has Replay set.
//
// Deliveries for one message run concurrently, one goroutine per client, and
// are all awaited before the next message is read. A delivery is abandoned if
// the client's Done channel or c.Done is closed first.
//
// If c.Data is closed, the Data channel of each eligible client is closed
// (once per client) and the goroutine then waits for c.Done. When c.Done is
// closed the goroutine exits and calls Cleanup on every client.
func (c *Channel) Handle() {
	c.handleOnce.Do(func() {
		go func() {
			defer func() {
				for _, client := range c.GetClients() {
					client.Cleanup()
				}
			}()

			// incoming is set to nil once c.Data has been observed closed. A
			// closed channel is always ready to receive, so selecting on it
			// again would spin this loop at full CPU and spawn goroutines on
			// every pass. Receiving from a nil channel blocks forever, which
			// leaves c.Done as the only case that can wake the select.
			incoming := c.Data

			for {
				select {
				case <-c.Done:
					return
				case data, ok := <-incoming:
					if !ok {
						incoming = nil
					}

					var wg sync.WaitGroup
					for _, client := range c.GetClients() {
						// Skip publish-only clients and, unless replay is
						// requested, the client that sent this message.
						if client.Direction == ChannelDirectionInput || (client.ID == data.ClientID && !client.Replay) {
							continue
						}

						wg.Add(1)
						go func() {
							defer wg.Done()

							if !ok {
								// c.Data was closed: propagate the close to
								// the client, guarding against a double close.
								client.onceData.Do(func() {
									close(client.Data)
								})
								return
							}

							// Deliver, but give up if either side shuts down
							// so a stalled client cannot block forever.
							select {
							case client.Data <- data:
							case <-client.Done:
							case <-c.Done:
							}
						}()
					}
					wg.Wait()
				}
			}
		}()
	})
}