Skip to content

Commit

Permalink
feat: make autoscaler trigger the rest of the instances
Browse files Browse the repository at this point in the history
The client selects any instance and provides it with all the apocryph
node gateways; that instance then triggers the rest and forms the cluster,
instead of the client sending multiple connect requests to every provider.
  • Loading branch information
revoltez committed Jul 23, 2024
1 parent f31a4d0 commit ef6b429
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 83 deletions.
3 changes: 2 additions & 1 deletion cmd/autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ func main() {
fmt.Println("Failed creating p2p node")
return
}
fmt.Printf("PEER ID: %v\n", self.ID())
server := &AutoScalerServer{self: self}
server.mainLoop = setAppGatewayExample
path, handler := pbcon.NewAutoscalerServiceHandler(server)
mux.Handle(path, handler)
log.Println("Autoscaler RPC Server Started")
http.ListenAndServe(
"localhost:8080",
":8080",
// Use h2c so we can serve HTTP/2 without TLS.
h2c.NewHandler(mux, &http2.Server{}),
)
Expand Down
120 changes: 95 additions & 25 deletions cmd/autoscaler/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"fmt"
"log"
"net/http"
"net/url"
"strings"
"time"

"connectrpc.com/connect"
Expand All @@ -12,67 +15,114 @@ import (
tpraft "github.com/comrade-coop/apocryph/pkg/raft"
"github.com/hashicorp/raft"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)

const RAFT_P2P_PORT = 9999
const RAFT_P2P_PORT = 32500
const RAFT_PATH = ""

// AutoScalerServer implements the Autoscaler connect-RPC service. It embeds
// UnimplementedAutoscalerServiceHandler for forward compatibility with
// handler methods it does not implement.
type AutoScalerServer struct {
pbcon.UnimplementedAutoscalerServiceHandler
// node is the Raft consensus node; nil until BoostrapCluster runs.
node *tpraft.RaftNode
// store is the replicated key/value store built on top of node.
store *tpraft.KVStore
// peers holds the raw server addresses from the ConnectCluster request.
peers []string
// started records that a bootstrap has been triggered so it only happens
// once. NOTE(review): read and written without synchronization from the
// RPC handlers — TODO confirm access is effectively serialized.
started bool
// self is this instance's libp2p host.
self host.Host
// nodeGateway is the apocryph node gateway URL taken from the request.
nodeGateway string
// mainLoop is launched after leader election (e.g. setAppGatewayExample).
mainLoop func(*AutoScalerServer)
}

func (s *AutoScalerServer) ConnectCluster(c context.Context, req *connect.Request[pb.ConnectClusterRequest]) (*connect.Response[pb.ConnectClusterResponse], error) {
log.Println("Forming a Raft Cluster with the following providers:", req.Msg.Servers)
// TriggerNode asks this instance to bootstrap the Raft cluster exactly once.
// The first call launches BoostrapCluster in the background and marks the
// server as started; later calls are no-ops. The response always carries
// this host's libp2p peer ID so the caller can assemble p2p multiaddrs.
//
// NOTE(review): s.started is checked and set without synchronization, so two
// truly concurrent first calls could both launch a bootstrap — TODO confirm
// handler serialization or guard with sync.Once/a mutex.
func (s *AutoScalerServer) TriggerNode(c context.Context, req *connect.Request[pb.ConnectClusterRequest]) (*connect.Response[pb.TriggerNodeResponse], error) {
	if !s.started {
		log.Println("Node Triggered")
		// Mark started before launching the goroutine so a quick second
		// call does not start a duplicate bootstrap.
		s.started = true
		go s.BoostrapCluster(req)
	}
	return connect.NewResponse(&pb.TriggerNodeResponse{PeerID: s.self.ID().String()}), nil
}

var peers []*host.Host
for _, addr := range req.Msg.Servers {
addr = fmt.Sprintf("/ip4/%v/tcp/%v", addr, RAFT_P2P_PORT)
log.Printf("Adding Peer:%v\n", addr)
peer, err := tpraft.NewPeer(addr)
// BoostrapCluster bootstraps the Raft cluster: it triggers every server in
// the request to obtain its libp2p peer ID, converts each server address and
// peer ID into a p2p multiaddr, creates the Raft node and the replicated KV
// store, and waits for a leader to be elected.
//
// NOTE(review): this hunk renders a unified diff without +/- markers, so
// several removed lines (the old connect.NewResponse error returns and the
// old peer-append/log lines) appear interleaved with their replacements; the
// duplicated, unreachable returns below are diff artifacts, not live code.
func (s *AutoScalerServer) BoostrapCluster(req *connect.Request[pb.ConnectClusterRequest]) error {

peerIDs, err := s.FetchPeerIDsFromServers(req)
if err != nil {
return fmt.Errorf("Failed to fetch PeerIDs: %v", err)
}

// Build an AddrInfo for every triggered server, keyed server address -> peer ID.
var peers []*peer.AddrInfo
for serverAddr, addr := range peerIDs {
// GetMultiAddr swaps the URL's port for RAFT_P2P_PORT and appends the peer ID.
addr := GetMultiAddr(serverAddr, addr)
if addr == "" {
return fmt.Errorf("Failed to parse server address: %v", serverAddr)
}

maddr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return connect.NewResponse(&pb.ConnectClusterResponse{
Success: false,
Error: fmt.Sprintf("Failed creating peer %v: %v", addr, err),
}), nil
return fmt.Errorf("Failed to parse multiaddr: %v: %v", addr, err)
}
peers = append(peers, &peer)
log.Printf("Peer %s ID:%s \n", addr, peer.ID().String())

peerInfo, err := peer.AddrInfoFromP2pAddr(maddr)
if err != nil {
return fmt.Errorf("Failed to extract peer info from %v, Error: %v", addr, err)
}

peers = append(peers, peerInfo)
log.Printf("Added Peer %v ID:%s \n", peerInfo.Addrs, peerInfo.ID.String())
}

node, err := tpraft.NewRaftNode(s.self, peers, RAFT_PATH)
if err != nil {
log.Println("Error:Could not Creat Raft Node")
return connect.NewResponse(&pb.ConnectClusterResponse{
Success: false,
Error: fmt.Sprintf("Failed Creating Raft Node %v\n", err),
}), nil

return fmt.Errorf("Failed Creating Raft Node %v\n", err)
}

// create the KVStore
// NOTE(review): the error returned by NewKVStore is never checked — err is
// overwritten by the waitLeaderElection call below. TODO add an explicit
// error check here.
store, err := tpraft.NewKVStore(node)

s.node = node
s.store = store
s.peers = req.Msg.Servers
s.nodeGateway = req.Msg.NodeGateway

// Block until a leader is elected (or the request's timeout elapses);
// on success this also launches s.mainLoop in a goroutine.
err = s.waitLeaderElection(req.Msg.Timeout)
if err != nil {
response := &pb.ConnectClusterResponse{Success: false, Error: err.Error()}
return connect.NewResponse(response), nil
return err
}

return nil
}

// ConnectCluster forms a Raft cluster with the providers listed in the
// request. Bootstrapping runs synchronously and the outcome is reported in
// the response payload (Success/Error) rather than as a transport error.
func (s *AutoScalerServer) ConnectCluster(c context.Context, req *connect.Request[pb.ConnectClusterRequest]) (*connect.Response[pb.ConnectClusterResponse], error) {
	log.Println("Forming a Raft Cluster with the following providers:", req.Msg.Servers)
	// Mark started up front so a concurrent TriggerNode call does not launch
	// a second bootstrap while this one is in flight.
	s.started = true
	err := s.BoostrapCluster(req)
	if err != nil {
		// Reset the flag so a later TriggerNode/ConnectCluster can retry;
		// otherwise a failed bootstrap would leave the node permanently
		// marked as started and unable to ever join a cluster.
		s.started = false
		return connect.NewResponse(&pb.ConnectClusterResponse{
			Success: false,
			Error:   fmt.Sprintf("Failed Bootstrapping Cluster: %v", err),
		}), nil
	}
	response := &pb.ConnectClusterResponse{Success: true}
	return connect.NewResponse(response), nil
}

// FetchPeerIDsFromServers calls TriggerNode on every server listed in the
// request and collects the returned libp2p peer IDs, keyed by server
// address. Forwarding the full ConnectClusterRequest lets each triggered
// instance bootstrap itself with the same cluster membership.
func (s *AutoScalerServer) FetchPeerIDsFromServers(req *connect.Request[pb.ConnectClusterRequest]) (map[string]string, error) {
	peerIDs := make(map[string]string, len(req.Msg.Servers))

	// Use a client with a timeout so one unresponsive server cannot hang the
	// whole bootstrap (http.DefaultClient never times out).
	httpClient := &http.Client{Timeout: 30 * time.Second}

	for _, addr := range req.Msg.Servers {
		client := pbcon.NewAutoscalerServiceClient(
			httpClient,
			addr)

		resp, err := client.TriggerNode(context.Background(), req)
		if err != nil {
			return nil, fmt.Errorf("failed to get PeerID from server %v: %w", addr, err)
		}
		peerIDs[addr] = resp.Msg.PeerID
	}

	return peerIDs, nil
}

func (s *AutoScalerServer) waitLeaderElection(timeout uint32) error {

log.Printf("Waiting for leader election with %v seoncds timout ...", timeout)
Expand All @@ -98,12 +148,14 @@ func (s *AutoScalerServer) waitLeaderElection(timeout uint32) error {
switch obs.Data.(type) {
case raft.RaftState:
if leaderAddr, _ := s.node.Raft.LeaderWithID(); leaderAddr != "" {
fmt.Printf("Leader Elected: %v\n", leaderAddr)
go s.mainLoop(s)
return nil
}
}
case <-ticker.C:
if leaderAddr, _ := s.node.Raft.LeaderWithID(); leaderAddr != "" {
fmt.Printf("Leader Elected: %v\n", leaderAddr)
go s.mainLoop(s)
return nil
}
Expand Down Expand Up @@ -131,15 +183,33 @@ func setAppGatewayExample(s *AutoScalerServer) {
for {
if s.node.Raft.State() == raft.Leader {
log.Println("Setting the domain value to the current apocryph node gateway")
err := s.store.Set("www.test.com", s.nodeGateway)
err := s.store.Set("www.test.com", "http://localhost:8080")
// leadership could be changed right before setting the value
if _, ok := err.(*tpraft.NotLeaderError); ok {
newLeaderAddr, newLeaderID := s.node.Raft.LeaderWithID()
log.Printf("Leadership changed to %v:%v", newLeaderAddr, newLeaderID)
} else {
} else if err != nil {
log.Printf("Failed setting key:%v: %v\n", "www.test.com", err)
}
}
time.Sleep(5 * time.Second)
time.Sleep(10 * time.Second)
}
}

// GetMultiAddr converts an HTTP(S) server URL (e.g. "http://1.2.3.4:8080")
// plus a libp2p peer ID into a p2p multiaddr string of the form
// "/ip4/<host>/tcp/<RAFT_P2P_PORT>/p2p/<peerID>". The URL's own port is
// discarded and replaced with RAFT_P2P_PORT, where the Raft libp2p transport
// listens. Returns "" on any parse failure.
//
// NOTE(review): assumes the URL host is a bare IPv4 address with an explicit
// port; an IPv6 literal or a host name would fail the len(parts) check or
// yield an invalid /ip4/ component — TODO confirm callers only pass ip:port
// gateways.
func GetMultiAddr(addr, peerID string) string {
	parsedURL, err := url.Parse(addr)
	if err != nil {
		// Log via the log package (consistent with the rest of the file)
		// and include the offending input for diagnosis.
		log.Printf("Error parsing URL %q: %v", addr, err)
		return ""
	}
	hostPort := parsedURL.Host
	parts := strings.Split(hostPort, ":")
	if len(parts) != 2 {
		log.Printf("Invalid host:port format in %q", hostPort)
		return ""
	}
	ip := parts[0]
	// Swap in the fixed Raft p2p port; the URL's port is irrelevant here.
	return fmt.Sprintf("/ip4/%s/tcp/%v/p2p/%s", ip, RAFT_P2P_PORT, peerID)
}
7 changes: 5 additions & 2 deletions cmd/trustedpods/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ var autoscalerCmd = &cobra.Command{
client := pbcon.NewAutoscalerServiceClient(
http.DefaultClient,
baseUrl)
log.Printf("%v\n", nodesAddrs)
request := connect.NewRequest(&pb.ConnectClusterRequest{NodeGateway: baseUrl, Servers: nodesAddrs, Timeout: 10})
log.Printf("Connecting Clusters:%v\n", nodesAddrs)
request := connect.NewRequest(&pb.ConnectClusterRequest{NodeGateway: baseUrl, Servers: nodesAddrs, Timeout: 30})
// Add the Host that is needed for the ingress to route the autoscaler pod
request.Header().Set("Host", "autoscaler.local")
log.Printf("Sending Request to: %v\n", baseUrl)
response, err := client.ConnectCluster(context.Background(), request)
if err != nil {
return fmt.Errorf("Error connecting to cluster:%v", err)
Expand Down
8 changes: 2 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"directories": {
"test": "test"
},
"packageManager": "[email protected]",
"scripts": {
"lint": "eslint pkg/**/*.\\{ts,js\\} test/**/*.\\{ts,js\\} --ignore-pattern **/*.config.ts",
"fmt": "eslint --fix pkg/**/*.\\{ts,js\\} test/**/*.\\{ts,js\\} --ignore-pattern **/*.config.ts",
Expand Down Expand Up @@ -49,10 +50,5 @@
"helia": "^4.2.1",
"private-ip": "^3.0.2"
},
"workspaces": [
"test/e2e/webui",
"pkg/ipfs-ts",
"pkg/proto-ts",
"pkg/abi-ts"
]
"workspaces": ["test/e2e/webui", "pkg/ipfs-ts", "pkg/proto-ts", "pkg/abi-ts"]
}
Loading

0 comments on commit ef6b429

Please sign in to comment.