forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreverse_dns.go
109 lines (96 loc) · 2.7 KB
/
reverse_dns.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
//go:generate ../../../tools/readme_config_includer/generator
package reverse_dns
import (
_ "embed"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/parallel"
"github.com/influxdata/telegraf/plugins/processors"
)
// sampleConfig holds the contents of sample.conf, embedded into the binary at
// build time. It is the string returned by (*ReverseDNS).SampleConfig.
//
//go:embed sample.conf
var sampleConfig string
// lookupEntry is one element of the `lookup` configuration list. asyncAdd
// reads an address from Tag and/or Field and stores the first lookup result
// under Dest.
type lookupEntry struct {
	// Tag names the metric tag whose value is passed to the reverse DNS
	// cache. An empty string disables the tag lookup for this entry.
	Tag string `toml:"tag"`
	// Field names the metric field whose value is passed to the reverse DNS
	// cache. Only string-typed field values are looked up. An empty string
	// disables the field lookup for this entry.
	Field string `toml:"field"`
	// Dest is the key that receives the first lookup result: it is added as
	// a field for Field lookups and as a tag for Tag lookups.
	Dest string `toml:"dest"`
}
// ReverseDNS is the streaming processor registered under the name
// "reverse_dns". For every configured lookup it passes a tag value or string
// field value to the reverse DNS cache and stores the first result back on the
// metric.
type ReverseDNS struct {
	// reverseDNSCache performs the lookups. It is created in Start and
	// stopped in Stop.
	reverseDNSCache *ReverseDNSCache
	// acc is the accumulator that was handed to Start.
	acc telegraf.Accumulator
	// parallel runs asyncAdd on the metrics enqueued by Add. Start creates it
	// via parallel.NewOrdered or parallel.NewUnordered depending on Ordered.
	parallel parallel.Parallel
	// Lookups lists the tag/field lookups applied to each metric.
	Lookups []lookupEntry `toml:"lookup"`
	// CacheTTL is passed as the first argument to NewReverseDNSCache;
	// presumably how long a cached result stays valid (confirm against the
	// cache implementation). newReverseDNS defaults it to 24 hours.
	CacheTTL config.Duration `toml:"cache_ttl"`
	// LookupTimeout is passed as the second argument to NewReverseDNSCache;
	// presumably the per-lookup time limit (confirm against the cache
	// implementation). newReverseDNS defaults it to 1 minute.
	LookupTimeout config.Duration `toml:"lookup_timeout"`
	// MaxParallelLookups is passed both to NewReverseDNSCache and to the
	// parallel runner constructor in Start. newReverseDNS defaults it to 10.
	MaxParallelLookups int `toml:"max_parallel_lookups"`
	// Ordered selects parallel.NewOrdered instead of parallel.NewUnordered in
	// Start; presumably this keeps output metrics in input order (confirm
	// against the parallel package).
	Ordered bool `toml:"ordered"`
	// Log is used to report lookup errors. It is excluded from TOML and is
	// not assigned anywhere in this file; presumably the framework injects it.
	Log telegraf.Logger `toml:"-"`
}
// SampleConfig returns the sample configuration embedded from sample.conf.
func (*ReverseDNS) SampleConfig() string {
	return sampleConfig
}
// Start stores the accumulator, creates the reverse DNS cache from the
// configured TTL, timeout and parallelism, and creates the parallel runner
// that will invoke asyncAdd for each enqueued metric. The runner is ordered
// when r.Ordered is set and unordered otherwise. It always returns nil.
func (r *ReverseDNS) Start(acc telegraf.Accumulator) error {
	// Third argument to parallel.NewOrdered; presumably the capacity of its
	// ordering queue (confirm against the parallel package).
	const orderedQueueSize = 10000

	r.acc = acc

	ttl := time.Duration(r.CacheTTL)
	timeout := time.Duration(r.LookupTimeout)
	workers := r.MaxParallelLookups // max parallel reverse-dns lookups
	r.reverseDNSCache = NewReverseDNSCache(ttl, timeout, workers)

	if !r.Ordered {
		r.parallel = parallel.NewUnordered(acc, r.asyncAdd, workers)
		return nil
	}

	r.parallel = parallel.NewOrdered(acc, r.asyncAdd, orderedQueueSize, workers)
	return nil
}
// Stop stops the parallel runner first and then the reverse DNS cache. Both
// are only assigned in Start within this file, so Stop relies on Start having
// run beforehand.
func (r *ReverseDNS) Stop() {
	r.parallel.Stop()
	r.reverseDNSCache.Stop()
}
// Add enqueues the metric on the parallel runner created in Start; the
// lookups themselves are done by asyncAdd, which Start hands to that runner.
// The accumulator argument is ignored and the returned error is always nil.
func (r *ReverseDNS) Add(metric telegraf.Metric, _ telegraf.Accumulator) error {
	r.parallel.Enqueue(metric)
	return nil
}
// asyncAdd applies every configured lookup to the metric and returns it as a
// single-element slice.
//
// For each entry, a non-empty Field is handled first: if the metric has that
// field and its value is a string, the value is looked up and the first
// result is added as field Dest. A non-empty Tag is handled next in the same
// way, with the first result added as tag Dest. Lookup errors are logged and
// end processing of the current entry, so a failed field lookup also skips
// the tag lookup of that entry. Empty lookup results leave the metric
// unchanged.
func (r *ReverseDNS) asyncAdd(metric telegraf.Metric) []telegraf.Metric {
	for _, entry := range r.Lookups {
		if entry.Field != "" {
			raw, found := metric.GetField(entry.Field)
			// Non-string field values are silently ignored.
			if addr, isString := raw.(string); found && isString {
				names, err := r.reverseDNSCache.Lookup(addr)
				if err != nil {
					r.Log.Errorf("lookup error: %v", err)
					continue
				}
				if len(names) > 0 {
					metric.AddField(entry.Dest, names[0])
				}
			}
		}

		if entry.Tag == "" {
			continue
		}
		addr, found := metric.GetTag(entry.Tag)
		if !found {
			continue
		}
		names, err := r.reverseDNSCache.Lookup(addr)
		if err != nil {
			r.Log.Errorf("lookup error: %v", err)
			continue
		}
		if len(names) > 0 {
			metric.AddTag(entry.Dest, names[0])
		}
	}

	return []telegraf.Metric{metric}
}
func init() {
processors.AddStreaming("reverse_dns", func() telegraf.StreamingProcessor {
return newReverseDNS()
})
}
// newReverseDNS returns a ReverseDNS with its defaults applied: a 24 hour
// cache TTL, a 1 minute lookup timeout and at most 10 parallel lookups. All
// other fields are left at their zero values.
func newReverseDNS() *ReverseDNS {
	const (
		defaultCacheTTL           = 24 * time.Hour
		defaultLookupTimeout      = 1 * time.Minute
		defaultMaxParallelLookups = 10
	)

	return &ReverseDNS{
		CacheTTL:           config.Duration(defaultCacheTTL),
		LookupTimeout:      config.Duration(defaultLookupTimeout),
		MaxParallelLookups: defaultMaxParallelLookups,
	}
}