diff --git a/gohangout.go b/gohangout.go index 436759a..03bc40c 100644 --- a/gohangout.go +++ b/gohangout.go @@ -114,15 +114,14 @@ func reload() { klog.Errorf("could not parse config, ignore reload: %v", err) return } + klog.Info("stop old inputs") + inputs.stop() + boxes, err := buildPluginLink(gohangoutConfig) if err != nil { klog.Errorf("build plugin link error, ignore reload: %v", err) return } - - klog.Info("stop old inputs") - inputs.stop() - inputs = gohangoutInputs(boxes) klog.Info("start new inputs") go inputs.start() diff --git a/input/input_box.go b/input/input_box.go index 65bf60d..33a24e9 100644 --- a/input/input_box.go +++ b/input/input_box.go @@ -157,6 +157,6 @@ func (box *InputBox) shutdown() { // Shutdown shutdowns the inputs and outputs func (box *InputBox) Shutdown() { - box.shutdown() box.stop = true + box.shutdown() } diff --git a/input/tcp_input.go b/input/tcp_input.go index e077d54..d342606 100644 --- a/input/tcp_input.go +++ b/input/tcp_input.go @@ -19,6 +19,8 @@ type TCPInput struct { l net.Listener messages chan []byte stop bool + + connections []net.Conn } func readLine(scanner *bufio.Scanner, c net.Conn, messages chan<- []byte) { @@ -86,6 +88,7 @@ func newTCPInput(config map[interface{}]interface{}) topology.Input { } klog.Error(err) } else { + p.connections = append(p.connections, conn) scanner := bufio.NewScanner(conn) if v, ok := config["max_length"]; ok { max := v.(int) @@ -109,5 +112,8 @@ func (p *TCPInput) ReadOneEvent() map[string]interface{} { func (p *TCPInput) Shutdown() { p.stop = true p.l.Close() + for _, conn := range p.connections { + conn.Close() + } close(p.messages) }