Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pubsub browser peer discovery #133

Merged
merged 13 commits into from
May 7, 2024
Merged
75 changes: 43 additions & 32 deletions go-peer/chatroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (
// ChatRoomBufSize is the number of incoming messages to buffer for each topic.
const ChatRoomBufSize = 128

// Topic used to broadcast browser WebRTC addresses
const PubSubDiscoveryTopic string = "universal-connectivity-browser-peer-discovery"

const ChatTopic string = "universal-connectivity"
const ChatFileTopic string = "universal-connectivity-file"

// ChatRoom represents a subscription to a single PubSub topic. Messages
// can be published to the topic with ChatRoom.Publish, and received
// messages are pushed to the Messages channel.
Expand All @@ -23,13 +29,15 @@ type ChatRoom struct {
Messages chan *ChatMessage
SysMessages chan *ChatMessage

ctx context.Context
h host.Host
ps *pubsub.PubSub
chatTopic *pubsub.Topic
chatSub *pubsub.Subscription
fileTopic *pubsub.Topic
fileSub *pubsub.Subscription
ctx context.Context
h host.Host
ps *pubsub.PubSub
chatTopic *pubsub.Topic
chatSub *pubsub.Subscription
fileTopic *pubsub.Topic
fileSub *pubsub.Subscription
peerDiscoveryTopic *pubsub.Topic
peerDiscoverySub *pubsub.Subscription

roomName string
nick string
Expand All @@ -44,9 +52,9 @@ type ChatMessage struct {

// JoinChatRoom tries to subscribe to the PubSub topic for the room name, returning
// a ChatRoom on success.
func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string, roomName string) (*ChatRoom, error) {
func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string) (*ChatRoom, error) {
// join the pubsub chatTopic
chatTopic, err := ps.Join(chatTopicName(roomName))
chatTopic, err := ps.Join(ChatTopic)
if err != nil {
return nil, err
}
Expand All @@ -58,7 +66,7 @@ func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname
}

// join the pubsub fileTopic
fileTopic, err := ps.Join(fileTopicName(roomName))
fileTopic, err := ps.Join(ChatFileTopic)
if err != nil {
return nil, err
}
Expand All @@ -69,18 +77,31 @@ func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname
return nil, err
}

// join the pubsub peer disovery topic
peerDiscoveryTopic, err := ps.Join(PubSubDiscoveryTopic)
if err != nil {
return nil, err
}

// and subscribe to it
peerDiscoverySub, err := peerDiscoveryTopic.Subscribe()
if err != nil {
return nil, err
}

cr := &ChatRoom{
ctx: ctx,
h: h,
ps: ps,
chatTopic: chatTopic,
chatSub: chatSub,
fileTopic: fileTopic,
fileSub: fileSub,
nick: nickname,
roomName: roomName,
Messages: make(chan *ChatMessage, ChatRoomBufSize),
SysMessages: make(chan *ChatMessage, ChatRoomBufSize),
ctx: ctx,
h: h,
ps: ps,
chatTopic: chatTopic,
chatSub: chatSub,
fileTopic: fileTopic,
fileSub: fileSub,
peerDiscoveryTopic: peerDiscoveryTopic,
peerDiscoverySub: peerDiscoverySub,
nick: nickname,
Messages: make(chan *ChatMessage, ChatRoomBufSize),
SysMessages: make(chan *ChatMessage, ChatRoomBufSize),
}

// start reading messages from the subscription in a loop
Expand All @@ -94,7 +115,7 @@ func (cr *ChatRoom) Publish(message string) error {
}

func (cr *ChatRoom) ListPeers() []peer.ID {
return cr.ps.ListPeers(chatTopicName(cr.roomName))
return cr.ps.ListPeers(ChatTopic)
}

// readLoop pulls messages from the pubsub chat/file topic and handles them.
Expand Down Expand Up @@ -187,13 +208,3 @@ func (cr *ChatRoom) requestFile(toPeer peer.ID, fileID []byte) ([]byte, error) {

return fileBody, nil
}

// chatTopicName returns the name of the pubsub topic for the chat room.
func chatTopicName(roomName string) string {
return roomName
}

// fileTopicName returns the name of the pubsub topic used for sending/recieving files in the chat room.
func fileTopicName(roomName string) string {
return fmt.Sprintf("%s-file", roomName)
}
16 changes: 7 additions & 9 deletions go-peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ func NewDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Mult
}

// Borrowed from https://medium.com/rahasak/libp2p-pubsub-peer-discovery-with-kademlia-dht-c8b131550ac7
func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous string) {
// Only used by Go peer to find each other.
// TODO: since this isn't implemented on the Rust or the JS side, can probably be removed
func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT) {
routingDiscovery := routing.NewRoutingDiscovery(dht)

discovery.Advertise(ctx, routingDiscovery, rendezvous)
discovery.Advertise(ctx, routingDiscovery, DiscoveryServiceTag)

ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
Expand All @@ -86,7 +88,7 @@ func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous str
return
case <-ticker.C:

peers, err := discovery.FindPeers(ctx, routingDiscovery, rendezvous)
peers, err := discovery.FindPeers(ctx, routingDiscovery, DiscoveryServiceTag)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -115,7 +117,6 @@ func LogMsgf(f string, msg ...any) {
func main() {
// parse some flags to set our nickname and the room to join
nickFlag := flag.String("nick", "", "nickname to use in chat. will be generated if empty")
roomFlag := flag.String("room", "universal-connectivity", "name of chat room to join")
idPath := flag.String("identity", "identity.key", "path to the private key (PeerID) file")
certPath := flag.String("tls-cert-path", "", "path to the tls cert file (for websockets)")
keyPath := flag.String("tls-key-path", "", "path to the tls key file (for websockets")
Expand Down Expand Up @@ -194,11 +195,8 @@ func main() {
nick = defaultNick(h.ID())
}

// join the room from the cli flag, or the flag default
room := *roomFlag

// join the chat room
cr, err := JoinChatRoom(ctx, h, ps, nick, room)
cr, err := JoinChatRoom(ctx, h, ps, nick)
if err != nil {
panic(err)
}
Expand All @@ -213,7 +211,7 @@ func main() {
}

// setup peer discovery
go Discover(ctx, h, dht, "universal-connectivity")
go Discover(ctx, h, dht)

// setup local mDNS discovery
if err := setupDiscovery(h); err != nil {
Expand Down
50 changes: 32 additions & 18 deletions js-peer/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion js-peer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
"@libp2p/identify": "^1.0.19",
"@libp2p/interface-pubsub": "^4.0.1",
"@libp2p/kad-dht": "^12.0.13",
"@libp2p/pubsub-peer-discovery": "^10.0.2",
"@libp2p/webrtc": "^4.0.28",
"@libp2p/websockets": "^8.0.20",
"@libp2p/webtransport": "^4.0.28",
"@multiformats/multiaddr": "^12.2.1",
"datastore-idb": "^2.1.9",
"debug": "^4.3.4",
"it-length-prefixed": "^9.0.4",
"it-map": "^3.1.0",
Expand Down
7 changes: 5 additions & 2 deletions js-peer/src/context/chat-ctx.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import React, { createContext, useContext, useEffect, useState } from 'react';
import { useLibp2pContext } from './ctx';
import type { Message } from '@libp2p/interface'
import { CHAT_FILE_TOPIC, CHAT_TOPIC, FILE_EXCHANGE_PROTOCOL } from '@/lib/constants'
import { CHAT_FILE_TOPIC, CHAT_TOPIC, FILE_EXCHANGE_PROTOCOL, PUBSUB_PEER_DISCOVERY } from '@/lib/constants'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { pipe } from 'it-pipe'
Expand Down Expand Up @@ -58,8 +58,11 @@ export const ChatProvider = ({ children }: any) => {
chatFileMessageCB(evt, topic, data)
break
}
case PUBSUB_PEER_DISCOVERY: {
break
}
default: {
throw new Error(`Unexpected gossipsub topic: ${topic}`)
console.error(`Unexpected event %o on gossipsub topic: ${topic}`, evt)
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions js-peer/src/context/peer-ctx.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { PeerId } from '@libp2p/interface'

export interface PeerStats {
peerIds: PeerId[]
connected: boolean
connections: Connection[]
latency: number
}
Expand All @@ -17,7 +16,6 @@ export interface PeerContextInterface {
export const peerContext = createContext<PeerContextInterface>({
peerStats: {
peerIds: [],
connected: true,
connections: [],
latency: 0
},
Expand All @@ -31,7 +29,6 @@ export const usePeerContext = () => {
export const PeerProvider = ({ children }: { children: ReactNode }) => {
const [peerStats, setPeerStats] = useState<PeerStats>({
peerIds: [],
connected: false,
connections: [],
latency: 0
});
Expand Down
1 change: 1 addition & 0 deletions js-peer/src/lib/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export const CHAT_TOPIC = "universal-connectivity"
export const CHAT_FILE_TOPIC = "universal-connectivity-file"
export const PUBSUB_PEER_DISCOVERY = "universal-connectivity-browser-peer-discovery"
export const FILE_EXCHANGE_PROTOCOL = "/universal-connectivity-file/1"

export const CIRCUIT_RELAY_CODE = 290
Expand Down
Loading