Skip to content

Commit ccf9cef

Browse files
committed
fix AddPeer timing issue, bgpserver interface changed.
1 parent 5c81074 commit ccf9cef

File tree

4 files changed

+101
-95
lines changed

4 files changed

+101
-95
lines changed

cmd/db-controller/main.go

+30-34
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,7 @@ func main() {
6060
}
6161
defer lockf.Close()
6262

63-
// for controlling the traffics that they're to the DB server port.
64-
// the function returns nil if the expected chain is already exist.
65-
nftConnect := nftables.NewDefaultConnector(logger)
66-
if err := nftConnect.CreateChain(chainNameForDBAclFlag); err != nil {
67-
panic(err)
68-
}
63+
logger.Info("Hello, Starting db-controller.")
6964

7065
// get my global ip address
7166
myHostAddress, err := getNetIFAddress(globalInterfaceNameFlag)
@@ -74,42 +69,40 @@ func main() {
7469
}
7570
logger.Debug("host address", "address", myHostAddress)
7671

77-
// create context
78-
ctx, cancel := context.WithCancel(context.Background())
79-
defer cancel()
72+
dbReplicaPassword, err := readDBReplicaPassword(dbReplicaPasswordFilePathFlag)
73+
if err != nil {
74+
panic(err)
75+
}
76+
77+
// for controlling the traffics that they're to the DB server port.
78+
// the function returns nil if the expected chain is already exist.
79+
nftConnect := nftables.NewDefaultConnector(logger)
80+
if err := nftConnect.CreateChain(chainNameForDBAclFlag); err != nil {
81+
panic(err)
82+
}
83+
84+
var bgpPeers []bgpserver.Peer
85+
for v := range strings.SplitSeq(bgpPeerAddressesFlag, ",") {
86+
bgpPeers = append(bgpPeers, bgpserver.Peer{
87+
Neighbor: v,
88+
RemoteAS: uint32(bgpAsNumberFlag),
89+
RemotePort: uint32(bgpServingPortFlag),
90+
KeepaliveIntervalSec: uint64(bgpKeepaliveIntervalSecFlag),
91+
})
92+
}
8093

81-
// start bgpserver
82-
logger.Debug("starting bgpserver", "hostaddress", myHostAddress)
8394
bgpServerConnect := bgpserver.NewDefaultConnector(
8495
logger,
8596
bgpserver.WithAsn(uint32(bgpAsNumberFlag)),
8697
bgpserver.WithRouterId(myHostAddress),
8798
bgpserver.WithListenPort(int32(bgpServingPortFlag)),
8899
bgpserver.WithGrpcPort(gobgpGrpcPortFlag),
100+
bgpserver.WithPeers(bgpPeers),
89101
)
90-
if err := bgpServerConnect.Start(ctx); err != nil {
91-
panic(err)
92-
}
93-
94-
// add peers to bgpserver
95-
peers := strings.Split(bgpPeerAddressesFlag, ",")
96-
if len(peers) != 2 {
97-
panic("insufficient bgp peer addresses")
98-
}
99-
for _, v := range peers {
100-
logger.Debug("adding peer", "neighbor", v)
101-
bgpServerConnect.AddPeer(
102-
v,
103-
uint32(bgpAsNumberFlag),
104-
uint32(bgpServingPortFlag),
105-
uint64(bgpKeepaliveIntervalSecFlag),
106-
)
107-
}
108102

109-
dbReplicaPassword, err := readDBReplicaPassword(dbReplicaPasswordFilePathFlag)
110-
if err != nil {
111-
panic(err)
112-
}
103+
// create context
104+
ctx, cancel := context.WithCancel(context.Background())
105+
defer cancel()
113106

114107
c := controller.NewController(
115108
logger,
@@ -129,7 +122,10 @@ func main() {
129122
wg.Add(1)
130123
go func(ctx context.Context, wg *sync.WaitGroup, c *controller.Controller) {
131124
defer wg.Done()
132-
c.Start(ctx, time.Second*time.Duration(mainPollingSpanSecondFlag))
125+
err := c.Start(ctx, time.Second*time.Duration(mainPollingSpanSecondFlag))
126+
if err != nil {
127+
panic(err)
128+
}
133129
}(ctx, wg, c)
134130

135131
if enablePrometheusExporterFlag {

pkg/bgpserver/connector.go

+59-36
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,28 @@ type Route struct {
3030
Community uint32
3131
}
3232

33+
type Peer struct {
34+
Neighbor string
35+
RemoteAS uint32
36+
RemotePort uint32
37+
KeepaliveIntervalSec uint64
38+
}
39+
3340
type Connector interface {
34-
Start(ctx context.Context) error
35-
AddPeer(neighbor string, remoteAS uint32, remotePort uint32, keepaliveIntervalSec uint64) error
41+
Start() error
3642
AddPath(prefix string, prefixLen uint32, nexthop string, community uint32) error
3743
ListPath() ([]Route, error)
44+
Stop()
3845
}
3946

4047
type bgpServerConnector struct {
4148
logger *slog.Logger
4249
server *gobgpserver.BgpServer
43-
ctx context.Context
4450
asn uint32
4551
routerId string
4652
listenPort int32
4753
grpcPort int
54+
peers []Peer
4855
}
4956

5057
func NewDefaultConnector(logger *slog.Logger, configs ...func(*bgpServerConnector)) Connector {
@@ -85,48 +92,62 @@ func WithGrpcPort(grpcPort int) func(*bgpServerConnector) {
8592
}
8693
}
8794

88-
func (bs *bgpServerConnector) Start(ctx context.Context) error {
89-
bs.ctx = ctx
95+
func WithPeers(peers []Peer) func(*bgpServerConnector) {
96+
return func(c *bgpServerConnector) {
97+
c.peers = peers
98+
}
99+
}
100+
101+
func (bs *bgpServerConnector) Start() error {
90102
go bs.server.Serve()
91-
return bs.server.StartBgp(ctx, &gobgpapi.StartBgpRequest{
103+
104+
err := bs.server.StartBgp(context.Background(), &gobgpapi.StartBgpRequest{
92105
Global: &gobgpapi.Global{
93106
Asn: bs.asn,
94107
RouterId: bs.routerId,
95108
ListenAddresses: []string{"0.0.0.0"},
96109
ListenPort: bs.listenPort,
97110
},
98111
})
99-
}
112+
if err != nil {
113+
return err
114+
}
100115

101-
func (bs *bgpServerConnector) AddPeer(neighbor string, remoteAS uint32, remotePort uint32, keepaliveIntervalSec uint64) error {
102-
p := &gobgpapi.Peer{
103-
Conf: &gobgpapi.PeerConf{
104-
NeighborAddress: neighbor,
105-
PeerAsn: remoteAS,
106-
},
107-
Transport: &gobgpapi.Transport{
108-
RemoteAddress: neighbor,
109-
RemotePort: remotePort,
110-
},
111-
Timers: &gobgpapi.Timers{
112-
Config: &gobgpapi.TimersConfig{
113-
KeepaliveInterval: keepaliveIntervalSec,
114-
HoldTime: keepaliveIntervalSec * 3,
116+
for _, peer := range bs.peers {
117+
p := &gobgpapi.Peer{
118+
Conf: &gobgpapi.PeerConf{
119+
NeighborAddress: peer.Neighbor,
120+
PeerAsn: peer.RemoteAS,
121+
},
122+
Transport: &gobgpapi.Transport{
123+
RemoteAddress: peer.Neighbor,
124+
RemotePort: peer.RemoteAS,
125+
},
126+
Timers: &gobgpapi.Timers{
127+
Config: &gobgpapi.TimersConfig{
128+
KeepaliveInterval: peer.KeepaliveIntervalSec,
129+
HoldTime: peer.KeepaliveIntervalSec * 3,
130+
},
115131
},
116-
},
117132

118-
// route reflector client is always on
119-
RouteReflector: &gobgpapi.RouteReflector{
120-
RouteReflectorClient: true,
121-
},
133+
// route reflector client is always on
134+
RouteReflector: &gobgpapi.RouteReflector{
135+
RouteReflectorClient: true,
136+
},
137+
}
138+
139+
err := bs.server.AddPeer(
140+
context.Background(),
141+
&gobgpapi.AddPeerRequest{
142+
Peer: p,
143+
},
144+
)
145+
if err != nil {
146+
return err
147+
}
122148
}
123149

124-
return bs.server.AddPeer(
125-
bs.ctx,
126-
&gobgpapi.AddPeerRequest{
127-
Peer: p,
128-
},
129-
)
150+
return nil
130151
}
131152

132153
func (bs *bgpServerConnector) AddPath(prefix string, prefixLen uint32, nexthop string, community uint32) error {
@@ -152,7 +173,7 @@ func (bs *bgpServerConnector) AddPath(prefix string, prefixLen uint32, nexthop s
152173
attrs = []*apb.Any{attrOrigin, attrNextHop, attrCommunities}
153174
}
154175

155-
_, err = bs.server.AddPath(bs.ctx, &gobgpapi.AddPathRequest{
176+
_, err = bs.server.AddPath(context.Background(), &gobgpapi.AddPathRequest{
156177
TableType: gobgpapi.TableType_GLOBAL,
157178
Path: &gobgpapi.Path{
158179
Family: &gobgpapi.Family{
@@ -170,15 +191,14 @@ func (bs *bgpServerConnector) AddPath(prefix string, prefixLen uint32, nexthop s
170191
func (bs *bgpServerConnector) ListPath() ([]Route, error) {
171192
var routes []Route
172193

173-
err := bs.server.ListPath(bs.ctx, &gobgpapi.ListPathRequest{
194+
err := bs.server.ListPath(context.Background(), &gobgpapi.ListPathRequest{
174195
TableType: gobgpapi.TableType_GLOBAL,
175196
Family: &gobgpapi.Family{
176197
Afi: gobgpapi.Family_AFI_IP,
177198
Safi: gobgpapi.Family_SAFI_UNICAST,
178199
},
179200
EnableFiltered: true,
180201
}, func(d *gobgpapi.Destination) {
181-
bs.logger.Debug("ListPath: found prefix", "prefix", d.Prefix)
182202
for _, path := range d.Paths {
183203
for _, attr := range path.GetPattrs() {
184204
m, err := attr.UnmarshalNew()
@@ -192,7 +212,6 @@ func (bs *bgpServerConnector) ListPath() ([]Route, error) {
192212
}
193213

194214
for _, comm := range ca.Communities {
195-
bs.logger.Debug("ListPath: found community", "community", EncodeCommunity(comm))
196215
routes = append(routes, Route{
197216
Prefix: d.Prefix,
198217
Community: comm,
@@ -205,6 +224,10 @@ func (bs *bgpServerConnector) ListPath() ([]Route, error) {
205224
return routes, err
206225
}
207226

227+
func (bs *bgpServerConnector) Stop() {
228+
bs.server.Stop()
229+
}
230+
208231
// EncodeCommunity converts plain community value to human readable notation(for example 65001:10)
209232
func EncodeCommunity(comm uint32) string {
210233
upper := comm >> 16

pkg/bgpserver/fake_connector.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package bgpserver
1616

1717
import (
18-
"context"
1918
"fmt"
2019
)
2120

@@ -29,11 +28,7 @@ func NewFakeBgpServerConnector() Connector {
2928
}
3029
}
3130

32-
func (bs *FakeBgpServerConnector) Start(_ context.Context) error {
33-
return nil
34-
}
35-
36-
func (bs *FakeBgpServerConnector) AddPeer(_ string, _ uint32, _ uint32, _ uint64) error {
31+
func (bs *FakeBgpServerConnector) Start() error {
3732
return nil
3833
}
3934

@@ -47,3 +42,6 @@ func (bs *FakeBgpServerConnector) AddPath(prefix string, prefixLen uint32, _ str
4742
func (bs *FakeBgpServerConnector) ListPath() ([]Route, error) {
4843
return nil, nil
4944
}
45+
46+
func (bs *FakeBgpServerConnector) Stop() {
47+
}

pkg/controller/controller.go

+8-19
Original file line numberDiff line numberDiff line change
@@ -184,18 +184,21 @@ func NewController(
184184
func (c *Controller) Start(
185185
ctx context.Context,
186186
ctrlerLoopInterval time.Duration,
187-
) {
188-
c.logger.Info("Hello, Starting db-controller.")
187+
) error {
188+
c.logger.Debug("controller: start bgpserver")
189+
if err := c.bgpServerConnector.Start(); err != nil {
190+
return err
191+
}
192+
defer c.bgpServerConnector.Stop()
189193

190194
ticker := time.NewTicker(ctrlerLoopInterval)
191195
defer ticker.Stop()
192196

193-
controllerLoop:
194197
for {
195198
select {
196199
case <-ctx.Done():
197-
c.onExit()
198-
break controllerLoop
200+
c.forceTransitionToFault()
201+
return nil
199202
case <-ticker.C:
200203
// random sleep to avoid global synchronization
201204
time.Sleep(time.Second * time.Duration(rand.Intn(2)+1))
@@ -250,15 +253,6 @@ func (c *Controller) decideNextState() State {
250253
}
251254
}
252255

253-
func (c *Controller) onExit() error {
254-
c.setState(StateFault)
255-
if err := c.triggerRunOnStateChangesToFault(); err != nil {
256-
c.logger.Info("failed to TriggerRunOnStateChanges while going to fault. Ignore errors.", "error", err)
257-
}
258-
259-
return nil
260-
}
261-
262256
func (c *Controller) onStateHandler(nextState State) error {
263257
if cannotTransitionTo(c.GetState(), nextState) {
264258
panic("unreachable controller state was picked")
@@ -291,12 +285,9 @@ func (c *Controller) preDecideNextStateHandler() error {
291285
if err != nil {
292286
return err
293287
}
294-
c.logger.Debug("preDecideNextStateHandler", "routes", fmt.Sprintf("%#v", routes))
295288

296289
currentNeighbors := newNeighborSet()
297290
for _, route := range routes {
298-
c.logger.Debug(fmt.Sprintf("found route %s %s", route.Prefix, bgpserver.EncodeCommunity(route.Community)))
299-
300291
// parse community
301292
state, ok := bgpCommunityToState[route.Community]
302293
if !ok {
@@ -315,12 +306,10 @@ func (c *Controller) preDecideNextStateHandler() error {
315306

316307
// skip self originated route
317308
if addr == c.hostAddress {
318-
c.logger.Debug("it is my self host address, skipping.")
319309
continue
320310
}
321311

322312
if !slices.Contains(currentNeighbors[state], neighbor(addr)) {
323-
c.logger.Debug(fmt.Sprintf("appending address %s to state %s", addr, state))
324313
currentNeighbors[state] = append(currentNeighbors[state], neighbor(addr))
325314
}
326315
}

0 commit comments

Comments
 (0)