Skip to content

Commit

Permalink
Add support for stopping multicast receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
g3force committed Feb 2, 2021
1 parent d0adb20 commit 1d370ff
Showing 1 changed file with 33 additions and 4 deletions.
37 changes: 33 additions & 4 deletions pkg/sslnet/multicast_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const maxDatagramSize = 8192

type MulticastReceiver struct {
activeIfis map[string]bool
connections []*net.UDPConn
running bool
consumer func([]byte)
mutex sync.Mutex
SkipInterfaces []string
Expand All @@ -24,11 +26,23 @@ func NewMulticastReceiver(consumer func([]byte)) (r *MulticastReceiver) {
}

func (r *MulticastReceiver) Start(multicastAddress string) {
go r.Receive(multicastAddress)
r.running = true
go r.receive(multicastAddress)
}

func (r *MulticastReceiver) Receive(multicastAddress string) {
for {
func (r *MulticastReceiver) Stop() {
r.mutex.Lock()
defer r.mutex.Unlock()
r.running = false
for _, c := range r.connections {
if err := c.Close(); err != nil {
log.Println("Could not close connection: ", err)
}
}
}

func (r *MulticastReceiver) receive(multicastAddress string) {
for r.isRunning() {
ifis, _ := net.Interfaces()
for _, ifi := range ifis {
if ifi.Flags&net.FlagMulticast == 0 || // No multicast support
Expand All @@ -46,6 +60,12 @@ func (r *MulticastReceiver) Receive(multicastAddress string) {
}
}

func (r *MulticastReceiver) isRunning() bool {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.running
}

func (r *MulticastReceiver) skipInterface(ifiName string) bool {
for _, skipIfi := range r.SkipInterfaces {
if skipIfi == ifiName {
Expand All @@ -67,13 +87,14 @@ func (r *MulticastReceiver) receiveOnInterface(multicastAddress string, ifi net.
log.Printf("Could not listen at %v: %v", multicastAddress, err)
return
}

if err := listener.SetReadBuffer(maxDatagramSize); err != nil {
log.Println("Could not set read buffer: ", err)
}

r.mutex.Lock()
r.connections = append(r.connections, listener)
r.activeIfis[ifi.Name] = true
defer delete(r.activeIfis, ifi.Name)
r.mutex.Unlock()

log.Printf("Listening on %s (%s)", multicastAddress, ifi.Name)
Expand All @@ -94,4 +115,12 @@ func (r *MulticastReceiver) receiveOnInterface(multicastAddress string, ifi net.
if err := listener.Close(); err != nil {
log.Println("Could not close listener: ", err)
}
r.mutex.Lock()
delete(r.activeIfis, ifi.Name)
for i, c := range r.connections {
if c == listener {
r.connections = append(r.connections[:i], r.connections[i+1:]...)
}
}
r.mutex.Unlock()
}

0 comments on commit 1d370ff

Please sign in to comment.