From 1ca278b861f1f13477a121c82c9071f027a1741f Mon Sep 17 00:00:00 2001 From: Yorling Date: Sun, 3 Jan 2021 21:22:47 +0800 Subject: [PATCH] feat: add metric for packet sending Now service will send data points when sending packets, so we can calculate packet drop ratio for each target host even if no packets are received. --- icmp/pinger.go | 42 ++++++++++++++++++++++++++++++++++++------ loop/loop.go | 43 +++++++++++++++++++++++++------------------ main.go | 5 ++++- 3 files changed, 65 insertions(+), 25 deletions(-) diff --git a/icmp/pinger.go b/icmp/pinger.go index b06fbdc..9e6cb7c 100644 --- a/icmp/pinger.go +++ b/icmp/pinger.go @@ -13,6 +13,7 @@ import ( "github.com/shallowclouds/omil/metric" ) +// Monitor sends and receives ICMP packet to the specified host. type Monitor struct { from, to string host string @@ -22,6 +23,13 @@ type Monitor struct { timeout time.Duration } +// NewMonitor creates a ICMP network monitor. +// `host` is the target hostname need to test. +// `from` is the name of this server, used as metric tag `from`. +// `to` is the name of target host, used as metric tag `to`. +// `interval` specifies the time interval to send ICMP packets. +// `timeout` specifies the time to end the loop, 0 for infinite. +// `client` is the metric client to send data points. func NewMonitor(host, from, to string, interval, timeout time.Duration, client metric.Client) (*Monitor, error) { var err error if from == "" { @@ -40,7 +48,7 @@ func NewMonitor(host, from, to string, interval, timeout time.Duration, client m } if client == nil { - return nil, fmt.Errorf("nil metric client") + return nil, errors.New("nil metric client") } return &Monitor{ @@ -53,21 +61,25 @@ func NewMonitor(host, from, to string, interval, timeout time.Duration, client m }, nil } -func (m *Monitor) Start(ctx context.Context) error { +// Start starts the loop to test network and send data points. +// +// Return an error if loop failed. +func (m *Monitor) Start(_ context.Context) error { pinger, err := ping.NewPinger(m.host) if err != nil { - return errors.Wrap(err, "failed to create pinger") + return errors.WithMessage(err, "failed to create pinger") } if m.timeout != 0 { pinger.Timeout = m.timeout } pinger.Interval = m.interval - // true for ICMP + // True for ICMP pinger.SetPrivileged(true) pinger.Size = 64 - // use startTime to calculate packet sent time: startTime + interval * packet_sequence_number + // Use startTime to calculate packet sent time: startTime + interval * packet_sequence_number, + // as will cant put the send time in the ICMP packet data at present. // TODO: use a more accurate and graceful way var startTime time.Time @@ -90,9 +102,27 @@ func (m *Monitor) Start(ctx context.Context) error { m.pinger = pinger + sendTicker := time.NewTicker(m.interval) + defer sendTicker.Stop() + startTime = time.Now() + go func() { + // Metrics for sending packets. + for t := range sendTicker.C { + if t.IsZero() { + break + } + m.client.Metric("ICMP", t, map[string]string{ + "from": m.from, + "to": m.to, + "host": m.host, + }, map[string]interface{}{ + "sent": 1, + }) + } + }() if err := pinger.Run(); err != nil { - return errors.Wrap(err, "failed to run pinger") + return errors.WithMessage(err, "failed to run pinger") } return nil diff --git a/loop/loop.go b/loop/loop.go index bea3a50..7a533f0 100644 --- a/loop/loop.go +++ b/loop/loop.go @@ -34,35 +34,42 @@ func Loop(ctx context.Context, monitors []*icmp.Monitor) (err error) { cancel() }() + restart := true + var mu sync.RWMutex + go func() { + <-ctx.Done() + mu.Lock() + restart = false + mu.Unlock() + for _, monitor := range monitors { + logrus.Infof("stopping monitor %s", monitor.Name()) + if err := monitor.Stop(); err != nil { + logrus.WithError(err).Error("failed to stop monitor") + } + } + }() for _, monitor := range monitors { wg.Add(1) - monitor := monitor + m := monitor go func() { - restart := true - var mu sync.RWMutex - go func() { - <-ctx.Done() - logrus.Infof("stopping monitor %s", monitor.Name()) - if err := monitor.Stop(); err != nil { - logrus.WithError(err).Error("failed to stop monitor") - } - mu.Lock() - restart = false - mu.Unlock() - }() for { - if err := monitor.Start(ctx); err != nil { - logrus.WithError(err).Error("failed to run monitor") - } mu.RLock() if !restart { - logrus.Infof("exiting monitor %s", monitor.Name()) + logrus.Infof("exiting monitor %s", m.Name()) wg.Done() break } mu.RUnlock() + if err := m.Start(ctx); err != nil { + logrus.WithError(err).Error("failed to run monitor") + } time.Sleep(restartInterval) - logrus.Infof("restarting monitor %s", monitor.Name()) + + mu.RLock() + if restart { + logrus.Infof("restarting monitor %s", m.Name()) + } + mu.RUnlock() } }() } diff --git a/main.go b/main.go index b1fba99..0771ef3 100644 --- a/main.go +++ b/main.go @@ -33,7 +33,10 @@ func mainAction(ctx *cli.Context) error { monitors := make([]*icmp.Monitor, 0, len(conf.Targets)) for _, t := range conf.Targets { - monitor, err := icmp.NewMonitor(t.Host, conf.Hostname, t.Name, time.Second, time.Hour*12, metricClient) + // As pinger stores all packets data in memory, + // so too long timeout may cause high memory usage. + // Just let it stop and restart. + monitor, err := icmp.NewMonitor(t.Host, conf.Hostname, t.Name, time.Second, time.Minute*60, metricClient) if err != nil { logrus.WithError(err).WithFields(logrus.Fields{ "target_host": t.Host,