Skip to content

Commit

Permalink
inlet/bmp: initial support for BMP protocol
Browse files Browse the repository at this point in the history
At first, there was a tentative to use BMP collector implementation
from bio-rd. However, this current implementation is using GoBGP
instead:

- BMP is very simple from a protocol point of view. The hard work is
  mostly around decoding. Both bio-rd and GoBGP can decode, but for
  testing, GoBGP is able to generate messages as well (this is its
  primary purpose, I suppose parsing was done for testing purpose).
  Using only one library is always better. An alternative would be
  GoBMP, but it also only do parsing.
- Logging and metrics can be customized easily (but the work was done
  for bio-rd, so not a real argument).
- bio-rd is an application and there is no API stability (and I did
  that too)
- GoBGP supports FlowSpec, which may be useful in the future for the
  DDoS part. Again, one library for everything is better (but
  honestly, GoBGP as a lib is not the best part of it, maybe
  github.com/jwhited/corebgp would be a better fit while keeping GoBGP
  for decoding/encoding).

There was a huge effort around having a RIB which is efficient
memory-wise (data are interned to save memory), performant during
reads, while being decent during insertions. We rely on a patched
version of Kentik's Patricia trees to be able to apply mutations to
the tree.

There was several tentatives to implement some kind of graceful
restart, but ultimetaly, the design is kept simple: when a BMP
connection goes down, routes will be removed after a configurable
time. If the connection comes back up, then it is just considered new.
It would have been ideal to rely on EoR markers, but the RFC is
unclear about them, and they are likely to be per peer, making it
difficult to know what to do if one peer is back, but not the other.

Remaining tasks:

- [ ] Confirm support for LocRIB
- [ ] Import data in ClickHouse
- [ ] Make data available in the frontend

Fix #52
  • Loading branch information
vincentbernat committed Sep 26, 2022
1 parent b1478eb commit c769bb5
Show file tree
Hide file tree
Showing 66 changed files with 4,075 additions and 94 deletions.
8 changes: 5 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ inlet/flow/decoder/flow-ANY.pb.go: inlet/flow/decoder/flow-$(FLOW_VERSION).pb.go
done
inlet/flow/decoder/flow-$(FLOW_VERSION).pb.go: inlet/flow/data/schemas/flow-$(FLOW_VERSION).proto | $(PROTOC_GEN_GO) ; $(info $(M) compiling protocol buffers definition…)
$Q $(PROTOC) -I=. --plugin=$(PROTOC_GEN_GO) --go_out=module=$(MODULE):. $<
$Q sed -i.bkp s/FlowMessagev./FlowMessage/g $@ && rm $@.bkp
$Q sed -i.bkp s/v$(FLOW_VERSION)//g $@ && rm $@.bkp

common/clickhousedb/mocks/mock_driver.go: $(MOCKGEN) ; $(info $(M) generate mocks for ClickHouse driver…)
$Q echo '//go:build !release' > $@
Expand Down Expand Up @@ -125,9 +125,8 @@ changelog.md: docs/99-changelog.md # To be used by GitHub actions only.

# Tests

TEST_TARGETS := test-bench test-short test-verbose test-race
TEST_TARGETS := test-short test-verbose test-race
.PHONY: $(TEST_TARGETS) check test tests
test-bench: ARGS=-run=__absolutelynothing__ -bench=. ## Run benchmarks
test-short: ARGS=-short ## Run only short tests
test-verbose: ARGS=-v ## Run tests in verbose mode with coverage reporting
test-race: CGO_ENABLED=1 ARGS=-race ## Run tests with race detector
Expand All @@ -138,6 +137,9 @@ check test tests: fmt lint $(GENERATED) | $(GOTESTSUM) ; $(info $(M) running $(N
$Q $(GOTESTSUM) --junitfile test/tests.xml -- \
-timeout $(TIMEOUT)s \
$(ARGS) $(PKGS)
.PHONY: test-bench
test-bench: $(GENERATED) ; $(info $(M) running benchmarks…) @ ## Run benchmarks
$Q $(GOTESTSUM) -f standard-quiet -- --timeout $(TIMEOUT)s -run=__absolutelynothing__ -bench=. $(PKGS)

COVERAGE_MODE = atomic
.PHONY: test-coverage test-coverage-xml test-coverage-lcov
Expand Down
35 changes: 35 additions & 0 deletions akvorado.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,35 @@ demo-exporter:
20: "core"
21: "core"
listen: 0.0.0.0:161
bmp: &bmp
target: akvorado-inlet:10179
routes:
- prefixes: 192.0.2.0/24,2a01:db8:cafe:1::/64
aspath: 64501
communities: 65401:10,65401:12
- prefixes: 203.0.113.0/24,2a01:db8:cafe:2::/64
aspath: 65401
communities: 65401:10,65401:13
- prefixes: 216.58.206.0/24,2a00:1450:4007:807::2000/124
aspath: 174,1299,15169
- prefixes: 179.60.192.0/24,2a03:2880:f130:83:face:b00c:0::/112
aspath: 1299,1299,32934
- prefixes: 198.38.120.0/23,2a00:86c0:115:115::/112
aspath: 5511,1299,1299,32934
- prefixes: 23.33.27.0/24,2a02:26f0:9100:28:0:17c0::/112
aspath: 174,174,174,20940
- prefixes: 52.84.175.0/24,2600:9000:218d:4a00:15:74db::/112
aspath: 16509
- prefixes: 199.232.178.0/29,2a04:4e42:1d::/126
aspath: 1299,54113
- prefixes: 52.223.202.128/27
aspath: 16509,46489
- prefixes: 138.231.0.0/16
aspath: 1299,174,2269,2269
- prefixes: 0.0.0.0/0
aspath: 174
- prefixes: ::/0
aspath: 1299
flows: &flows1
samplingrate: 50000
target: akvorado-inlet:2055
Expand Down Expand Up @@ -351,6 +380,8 @@ demo-exporter:
20: "core"
21: "core"
listen: 0.0.0.0:161
bmp:
<<: *bmp
flows:
<<: *flows1
seed: 100
Expand All @@ -362,6 +393,8 @@ demo-exporter:
20: "core"
21: "core"
listen: 0.0.0.0:161
bmp:
<<: *bmp
flows:
<<: *flows1
seed: 200
Expand All @@ -373,6 +406,8 @@ demo-exporter:
20: "core"
21: "core"
listen: 0.0.0.0:161
bmp:
<<: *bmp
flows:
<<: *flows1
seed: 300
12 changes: 12 additions & 0 deletions cmd/demo-exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"akvorado/common/http"
"akvorado/common/reporter"
"akvorado/demoexporter"
"akvorado/demoexporter/bmp"
"akvorado/demoexporter/flows"
"akvorado/demoexporter/snmp"
)
Expand All @@ -22,6 +23,7 @@ type DemoExporterConfiguration struct {
HTTP http.Configuration
DemoExporter demoexporter.Configuration `mapstructure:",squash" yaml:",inline"`
SNMP snmp.Configuration
BMP bmp.Configuration
Flows flows.Configuration
}

Expand All @@ -31,6 +33,9 @@ func (c *DemoExporterConfiguration) Reset() {
HTTP: http.DefaultConfiguration(),
Reporting: reporter.DefaultConfiguration(),
DemoExporter: demoexporter.DefaultConfiguration(),
SNMP: snmp.DefaultConfiguration(),
BMP: bmp.DefaultConfiguration(),
Flows: flows.DefaultConfiguration(),
}
}

Expand Down Expand Up @@ -89,6 +94,12 @@ func demoExporterStart(r *reporter.Reporter, config DemoExporterConfiguration, c
if err != nil {
return fmt.Errorf("unable to initialize SNMP component: %w", err)
}
bmpComponent, err := bmp.New(r, config.BMP, bmp.Dependencies{
Daemon: daemonComponent,
})
if err != nil {
return fmt.Errorf("unable to initialize BMP component: %w", err)
}
flowsComponent, err := flows.New(r, config.Flows, flows.Dependencies{
Daemon: daemonComponent,
})
Expand Down Expand Up @@ -116,6 +127,7 @@ func demoExporterStart(r *reporter.Reporter, config DemoExporterConfiguration, c
components := []interface{}{
httpComponent,
snmpComponent,
bmpComponent,
flowsComponent,
demoExporterComponent,
}
Expand Down
13 changes: 12 additions & 1 deletion cmd/inlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"akvorado/common/daemon"
"akvorado/common/http"
"akvorado/common/reporter"
"akvorado/inlet/bmp"
"akvorado/inlet/core"
"akvorado/inlet/flow"
"akvorado/inlet/geoip"
Expand All @@ -24,6 +25,7 @@ type InletConfiguration struct {
HTTP http.Configuration
Flow flow.Configuration
SNMP snmp.Configuration
BMP bmp.Configuration
GeoIP geoip.Configuration
Kafka kafka.Configuration
Core core.Configuration
Expand All @@ -36,6 +38,7 @@ func (c *InletConfiguration) Reset() {
Reporting: reporter.DefaultConfiguration(),
Flow: flow.DefaultConfiguration(),
SNMP: snmp.DefaultConfiguration(),
BMP: bmp.DefaultConfiguration(),
GeoIP: geoip.DefaultConfiguration(),
Kafka: kafka.DefaultConfiguration(),
Core: core.DefaultConfiguration(),
Expand Down Expand Up @@ -105,6 +108,12 @@ func inletStart(r *reporter.Reporter, config InletConfiguration, checkOnly bool)
if err != nil {
return fmt.Errorf("unable to initialize SNMP component: %w", err)
}
bmpComponent, err := bmp.New(r, config.BMP, bmp.Dependencies{
Daemon: daemonComponent,
})
if err != nil {
return fmt.Errorf("unable to initialize BMP component: %w", err)
}
geoipComponent, err := geoip.New(r, config.GeoIP, geoip.Dependencies{
Daemon: daemonComponent,
})
Expand All @@ -120,7 +129,8 @@ func inletStart(r *reporter.Reporter, config InletConfiguration, checkOnly bool)
coreComponent, err := core.New(r, config.Core, core.Dependencies{
Daemon: daemonComponent,
Flow: flowComponent,
Snmp: snmpComponent,
SNMP: snmpComponent,
BMP: bmpComponent,
GeoIP: geoipComponent,
Kafka: kafkaComponent,
HTTP: httpComponent,
Expand All @@ -142,6 +152,7 @@ func inletStart(r *reporter.Reporter, config InletConfiguration, checkOnly bool)
components := []interface{}{
httpComponent,
snmpComponent,
bmpComponent,
geoipComponent,
kafkaComponent,
coreComponent,
Expand Down
134 changes: 134 additions & 0 deletions common/helpers/intern.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// SPDX-FileCopyrightText: 2022 Free Mobile
// SPDX-License-Identifier: AGPL-3.0-only

package helpers

// InternValue is the interface that should be implemented by types
// used in an intern pool. Also, it should be immutable.
type InternValue[T any] interface {
Hash() uint64
Equal(T) bool
}

// InternReference is a reference to an interned value. 0 is not a
// valid reference value.
type InternReference[T any] uint32

// InternPool keeps values in a pool by storing only one distinct copy
// of each. Values will be referred as an uint32 (implemented as an
// index).
type InternPool[T InternValue[T]] struct {
values []internValue[T]
availableIndexes []InternReference[T]
valueIndexes map[uint64]InternReference[T]
}

// internValue is the value stored in an intern pool. It adds resource
// keeping to the raw value.
type internValue[T InternValue[T]] struct {
next InternReference[T] // next value with the same hash
previous InternReference[T] // previous value with the same hash
refCount uint32

value T
}

// NewInternPool creates a new intern pool.
func NewInternPool[T InternValue[T]]() *InternPool[T] {
return &InternPool[T]{
values: make([]internValue[T], 1), // first slot is reserved
availableIndexes: make([]InternReference[T], 0),
valueIndexes: make(map[uint64]InternReference[T]),
}
}

// Get retrieves a (copy of the) value from the intern pool using its reference.
func (p *InternPool[T]) Get(ref InternReference[T]) T {
return p.values[ref].value
}

// Take removes a value from the intern pool. If this is the last
// used reference, it will be deleted from the pool.
func (p *InternPool[T]) Take(ref InternReference[T]) {
value := &p.values[ref]
value.refCount--
if value.refCount == 0 {
p.availableIndexes = append(p.availableIndexes, ref)
if value.previous > 0 {
// Not the first one, link previous to next
p.values[value.previous].next = value.next
p.values[value.next].previous = value.previous
return
}
hash := value.value.Hash()
if value.next > 0 {
// We are the first one of a chain, move the pointer to the next one
p.valueIndexes[hash] = value.next
p.values[value.next].previous = 0
return
}
// Last case, we are the last one, let's find our hash and delete us from here
delete(p.valueIndexes, hash)
}
}

// Put adds a value to the intern pool, returning its reference.
func (p *InternPool[T]) Put(value T) InternReference[T] {
v := internValue[T]{
value: value,
refCount: 1,
previous: 0,
next: 0,
}

// Allocate a new index
newIndex := func() InternReference[T] {
availCount := len(p.availableIndexes)
if availCount > 0 {
index := p.availableIndexes[availCount-1]
p.availableIndexes = p.availableIndexes[:availCount-1]
return index
}
if len(p.values) == cap(p.values) {
// We need to extend capacity first
temp := make([]internValue[T], len(p.values), (cap(p.values)+1)*2)
copy(temp, p.values)
p.values = temp
}
index := len(p.values)
p.values = p.values[:index+1]
return InternReference[T](index)
}

// Check if we have already something
hash := value.Hash()
if index := p.valueIndexes[hash]; index > 0 {
prevIndex := index
for index > 0 {
if p.values[index].value.Equal(value) {
p.values[index].refCount++
return index
}
prevIndex = index
index = p.values[index].next
}

// We have a collision, add to the chain
index = newIndex()
v.previous = prevIndex
p.values[prevIndex].next = index
p.values[index] = v
return index
}

// Add a new one
index := newIndex()
p.values[index] = v
p.valueIndexes[hash] = index
return index
}

// Len returns the number of elements in the pool.
func (p *InternPool[T]) Len() int {
return len(p.values) - len(p.availableIndexes) - 1
}
Loading

0 comments on commit c769bb5

Please sign in to comment.