Skip to content

Commit

Permalink
feat: add metric for packet sending
Browse files Browse the repository at this point in the history
The service now sends a data point each time it sends a packet, so we can calculate the packet drop ratio for each target host even if no packets are received.
  • Loading branch information
shallowclouds committed Jan 3, 2021
1 parent 24b8020 commit 1ca278b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 25 deletions.
42 changes: 36 additions & 6 deletions icmp/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/shallowclouds/omil/metric"
)

// Monitor sends ICMP packets to, and receives them from, the specified host.
type Monitor struct {
from, to string
host string
Expand All @@ -22,6 +23,13 @@ type Monitor struct {
timeout time.Duration
}

// NewMonitor creates an ICMP network monitor.
// `host` is the hostname of the target to test.
// `from` is the name of this server, used as metric tag `from`.
// `to` is the name of target host, used as metric tag `to`.
// `interval` specifies the time interval to send ICMP packets.
// `timeout` specifies the time to end the loop, 0 for infinite.
// `client` is the metric client to send data points.
func NewMonitor(host, from, to string, interval, timeout time.Duration, client metric.Client) (*Monitor, error) {
var err error
if from == "" {
Expand All @@ -40,7 +48,7 @@ func NewMonitor(host, from, to string, interval, timeout time.Duration, client m
}

if client == nil {
return nil, fmt.Errorf("nil metric client")
return nil, errors.New("nil metric client")
}

return &Monitor{
Expand All @@ -53,21 +61,25 @@ func NewMonitor(host, from, to string, interval, timeout time.Duration, client m
}, nil
}

func (m *Monitor) Start(ctx context.Context) error {
// Start starts the loop to test network and send data points.
//
// Return an error if loop failed.
func (m *Monitor) Start(_ context.Context) error {
pinger, err := ping.NewPinger(m.host)
if err != nil {
return errors.Wrap(err, "failed to create pinger")
return errors.WithMessage(err, "failed to create pinger")
}

if m.timeout != 0 {
pinger.Timeout = m.timeout
}
pinger.Interval = m.interval
// true for ICMP
// True for ICMP
pinger.SetPrivileged(true)
pinger.Size = 64

// use startTime to calculate packet sent time: startTime + interval * packet_sequence_number
// Use startTime to calculate packet sent time: startTime + interval * packet_sequence_number,
// as we can't put the send time in the ICMP packet data at present.
// TODO: use a more accurate and graceful way
var startTime time.Time

Expand All @@ -90,9 +102,27 @@ func (m *Monitor) Start(ctx context.Context) error {

m.pinger = pinger

sendTicker := time.NewTicker(m.interval)
defer sendTicker.Stop()

startTime = time.Now()
go func() {
// Metrics for sending packets.
for t := range sendTicker.C {
if t.IsZero() {
break
}
m.client.Metric("ICMP", t, map[string]string{
"from": m.from,
"to": m.to,
"host": m.host,
}, map[string]interface{}{
"sent": 1,
})
}
}()
if err := pinger.Run(); err != nil {
return errors.Wrap(err, "failed to run pinger")
return errors.WithMessage(err, "failed to run pinger")
}

return nil
Expand Down
43 changes: 25 additions & 18 deletions loop/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,42 @@ func Loop(ctx context.Context, monitors []*icmp.Monitor) (err error) {
cancel()
}()

restart := true
var mu sync.RWMutex
go func() {
<-ctx.Done()
mu.Lock()
restart = false
mu.Unlock()
for _, monitor := range monitors {
logrus.Infof("stopping monitor %s", monitor.Name())
if err := monitor.Stop(); err != nil {
logrus.WithError(err).Error("failed to stop monitor")
}
}
}()
for _, monitor := range monitors {
wg.Add(1)
monitor := monitor
m := monitor
go func() {
restart := true
var mu sync.RWMutex
go func() {
<-ctx.Done()
logrus.Infof("stopping monitor %s", monitor.Name())
if err := monitor.Stop(); err != nil {
logrus.WithError(err).Error("failed to stop monitor")
}
mu.Lock()
restart = false
mu.Unlock()
}()
for {
if err := monitor.Start(ctx); err != nil {
logrus.WithError(err).Error("failed to run monitor")
}
mu.RLock()
if !restart {
logrus.Infof("exiting monitor %s", monitor.Name())
logrus.Infof("exiting monitor %s", m.Name())
wg.Done()
break
}
mu.RUnlock()
if err := m.Start(ctx); err != nil {
logrus.WithError(err).Error("failed to run monitor")
}
time.Sleep(restartInterval)
logrus.Infof("restarting monitor %s", monitor.Name())

mu.RLock()
if restart {
logrus.Infof("restarting monitor %s", m.Name())
}
mu.RUnlock()
}
}()
}
Expand Down
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ func mainAction(ctx *cli.Context) error {

monitors := make([]*icmp.Monitor, 0, len(conf.Targets))
for _, t := range conf.Targets {
monitor, err := icmp.NewMonitor(t.Host, conf.Hostname, t.Name, time.Second, time.Hour*12, metricClient)
// The pinger stores all packet data in memory,
// so too long a timeout may cause high memory usage.
// Just let it stop and restart.
monitor, err := icmp.NewMonitor(t.Host, conf.Hostname, t.Name, time.Second, time.Minute*60, metricClient)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"target_host": t.Host,
Expand Down

0 comments on commit 1ca278b

Please sign in to comment.