Skip to content

Commit 46abc98

Browse files
committed
feat: implement transport address
We need a way to specify the list of peer IDs to contact in the application configuration, along with transport type + address to use. In the past, we have the static peer lists in each transport, which map a peer ID to an address. But we need to be able to dynamically dial peers not listed. libp2p has the multiaddr which is a compact way to specify this: - /ip4/45.32.195.135/udp/4001/quic-v1/p2p/12D3KooWKX2W1HwKSxrRRWnHLtpATmevMWRoms3GxmgWYxEZMWav - Contains the IP address, the port, protocol (quic-v1), and peer ID. - https://github.com/multiformats/go-multiaddr/blob/master/protocols.go - Has a hardcoded list of multiaddr types (protocols). Reasons to not use the multiaddr: - Our transports do not map 1-1 with the multiaddr transports - We don't have a central place with all protocols registered Use something called a tptaddr instead: - {transport-type-id}|{addr} - ignore everything after the first | - strip the {transport-type-id}, lookup the transport w/ that type id, and pass addr to DialPeer. - add TransportTypeId to the transport interface - simplified transport type id: - udp-quic: dial using the udp quic controller (bifrost/transport/udp) - ws: websocket controller (supports both ws and wss) - ws|wss://my.url.com/ Add controller which maps peer IDs to transports: - Handle EstablishLinkWithPeer directives - Create a LookupTptAddr directive to lookup the list of tptaddr for a peer. - For each tptaddr with the peer create a DialTptAddr - Requests transports establish a link specifically using that tptaddr. - The transports can parse the tptaddr and check if it matches. - Expect that the results of DialTptAddr will also be pushed to EstablishLinkWithPeer automatically by the transport. See transport/controller/establish-link.go which will add a link waiter for any link that matches that peer id. - Once all directives become idle mark the resolver as idle. End to end process: 1. Someone creates EstablishLinkWithPeer 2. The tptaddr controller creates LookupTptAddr with the peer id 3. The tpt address storage controller returns the list of transport addresses 4. The tptaddr controller creates DialTptAddr directives for each address 5. The transports that resolve those directives also resolve EstablishLinkWithPeer. Signed-off-by: Christian Stewart <[email protected]>
1 parent 9d5648b commit 46abc98

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2291
-11
lines changed

agent/controller/config.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"github.com/aperturerobotics/bifrost/util/confparse"
66
"github.com/aperturerobotics/controllerbus/config"
77
"github.com/libp2p/go-libp2p/core/crypto"
8-
"google.golang.org/protobuf/proto"
98
)
109

1110
// ConfigID is the identifier for the config type.
@@ -23,7 +22,7 @@ func (c *Config) EqualsConfig(c2 config.Config) bool {
2322
return false
2423
}
2524

26-
return proto.Equal(c, oc)
25+
return c.EqualVT(oc)
2726
}
2827

2928
// Validate validates the configuration.

core/core_bus.go

+6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
stream_echo "github.com/aperturerobotics/bifrost/stream/echo"
1313
stream_forwarding "github.com/aperturerobotics/bifrost/stream/forwarding"
1414
stream_listening "github.com/aperturerobotics/bifrost/stream/listening"
15+
tptaddr_controller "github.com/aperturerobotics/bifrost/tptaddr/controller"
16+
tptaddr_static "github.com/aperturerobotics/bifrost/tptaddr/static"
1517
iproctpt "github.com/aperturerobotics/bifrost/transport/inproc"
1618
udptpt "github.com/aperturerobotics/bifrost/transport/udp"
1719
wtpt "github.com/aperturerobotics/bifrost/transport/websocket"
@@ -65,4 +67,8 @@ func AddFactories(b bus.Bus, sr *static.Resolver) {
6567
// entity graph
6668
sr.AddFactory(egc.NewFactory(b))
6769
sr.AddFactory(bifrosteg.NewFactory(b))
70+
71+
// tptaddr
72+
sr.AddFactory(tptaddr_controller.NewFactory(b))
73+
sr.AddFactory(tptaddr_static.NewFactory(b))
6874
}

link/establish-link-ex.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package link
2+
3+
import (
4+
"context"
5+
6+
"github.com/aperturerobotics/bifrost/peer"
7+
"github.com/aperturerobotics/controllerbus/bus"
8+
)
9+
10+
// EstablishLinkWithPeerEx executes a EstablishLinkWithPeer directive.
11+
// Returns a release function.
12+
func EstablishLinkWithPeerEx(
13+
ctx context.Context,
14+
b bus.Bus,
15+
localPeerID, remotePeerID peer.ID,
16+
returnIfIdle bool,
17+
) (Link, func(), error) {
18+
estl, _, ref, err := bus.ExecWaitValue[EstablishLinkWithPeerValue](
19+
ctx,
20+
b,
21+
NewEstablishLinkWithPeer(
22+
localPeerID, remotePeerID,
23+
),
24+
returnIfIdle,
25+
nil,
26+
nil,
27+
)
28+
if err != nil {
29+
return nil, func() {}, err
30+
}
31+
32+
return estl, ref.Release, nil
33+
}

link/establish-link.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
package link
22

33
import (
4-
"errors"
54
"time"
65

76
"github.com/aperturerobotics/bifrost/peer"
87
"github.com/aperturerobotics/controllerbus/directive"
8+
"github.com/pkg/errors"
99
)
1010

1111
// holdOpenDur is the default hold open duration
1212
var holdOpenDur = time.Second * 10
1313

1414
// EstablishLinkWithPeer is a directive to establish a link with a peer.
15+
//
16+
// Value: Link
1517
type EstablishLinkWithPeer interface {
1618
// Directive indicates EstablishLinkWithPeer is a directive.
1719
directive.Directive
@@ -24,6 +26,9 @@ type EstablishLinkWithPeer interface {
2426
EstablishLinkTargetPeerId() peer.ID
2527
}
2628

29+
// EstablishLinkWithPeerValue is the type emitted when resolving EstablishLinkWithPeer.
30+
type EstablishLinkWithPeerValue = Link
31+
2732
// establishLinkWithPeer implements EstablishLinkWithPeer with a peer ID constraint.
2833
type establishLinkWithPeer struct {
2934
src, dest peer.ID
@@ -48,7 +53,7 @@ func (d *establishLinkWithPeer) EstablishLinkSourcePeerId() peer.ID {
4853
// This is a cursory validation to see if the values "look correct."
4954
func (d *establishLinkWithPeer) Validate() error {
5055
if len(d.dest) == 0 {
51-
return errors.New("peer id of destination required")
56+
return errors.Wrap(peer.ErrEmptyPeerID, "destination")
5257
}
5358

5459
return nil
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

stream/api/accept/config.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"github.com/aperturerobotics/bifrost/protocol"
66
"github.com/aperturerobotics/bifrost/util/confparse"
77
"github.com/aperturerobotics/controllerbus/config"
8-
"google.golang.org/protobuf/proto"
98
)
109

1110
// ConfigID is the string used to identify this config object.
@@ -55,7 +54,7 @@ func (c *Config) EqualsConfig(other config.Config) bool {
5554
return false
5655
}
5756

58-
return proto.Equal(ot, c)
57+
return ot.EqualVT(c)
5958
}
6059

6160
var _ config.Config = ((*Config)(nil))

stream/echo/echo.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (c *Controller) GetControllerInfo() *controller.Info {
6060
)
6161
}
6262

63-
// Execute executes the forwarding controller.
63+
// Execute executes the echo controller.
6464
// Returning nil ends execution.
6565
// Returning an error triggers a retry with backoff.
6666
func (c *Controller) Execute(ctx context.Context) error {

stream/echo/factory.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ const ControllerID = "bifrost/stream/echo"
1313
// Version is the controller version.
1414
var Version = semver.MustParse("0.0.1")
1515

16-
// Factory constructs a forwarding controller
16+
// Factory constructs a controller
1717
type Factory struct {
1818
// bus is the controller bus
1919
bus bus.Bus
2020
}
2121

22-
// NewFactory builds a forwarding factory.
22+
// NewFactory builds a factory.
2323
func NewFactory(bus bus.Bus) *Factory {
2424
return &Factory{bus: bus}
2525
}

stream/listening/config.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/aperturerobotics/controllerbus/config"
88
ma "github.com/multiformats/go-multiaddr"
99
"github.com/pkg/errors"
10-
"google.golang.org/protobuf/proto"
1110
)
1211

1312
// ConfigID is the string used to identify this config object.
@@ -73,7 +72,7 @@ func (c *Config) EqualsConfig(other config.Config) bool {
7372
return false
7473
}
7574

76-
return proto.Equal(ot, c)
75+
return ot.EqualVT(c)
7776
}
7877

7978
var _ config.Config = ((*Config)(nil))

testbed/testbed.go

+9
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ type Testbed struct {
2626
Bus bus.Bus
2727
// PrivKey is the private key.
2828
PrivKey crypto.PrivKey
29+
// PeerID is the peer id for private key.
30+
PeerID peer.ID
2931
// Release releases the testbed.
3032
Release func()
3133
}
@@ -73,6 +75,13 @@ func NewTestbed(ctx context.Context, le *logrus.Entry, opts TestbedOpts) (*Testb
7375
return nil, err
7476
}
7577
}
78+
if t.PrivKey != nil {
79+
pid, err := peer.IDFromPrivateKey(t.PrivKey)
80+
if err != nil {
81+
return nil, err
82+
}
83+
t.PeerID = pid
84+
}
7685

7786
if !opts.NoPeer {
7887
// start peer controller

tptaddr/controller/config.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package tptaddr_controller
2+
3+
import (
4+
"github.com/aperturerobotics/controllerbus/config"
5+
)
6+
7+
// ConfigID is the string used to identify this config object.
8+
const ConfigID = ControllerID
9+
10+
// Validate validates the configuration.
11+
func (c *Config) Validate() error {
12+
return nil
13+
}
14+
15+
// GetConfigID returns the unique string for this configuration type.
16+
func (c *Config) GetConfigID() string {
17+
return ConfigID
18+
}
19+
20+
// EqualsConfig checks if the config is equal to another.
21+
func (c *Config) EqualsConfig(other config.Config) bool {
22+
ot, ok := other.(*Config)
23+
if !ok {
24+
return false
25+
}
26+
27+
return ot.EqualVT(c)
28+
}
29+
30+
var _ config.Config = ((*Config)(nil))

tptaddr/controller/config.pb.go

+139
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)