diff --git a/cmd/autoscaler/main.go b/cmd/autoscaler/main.go index 910a0a6..4244caa 100644 --- a/cmd/autoscaler/main.go +++ b/cmd/autoscaler/main.go @@ -18,13 +18,14 @@ func main() { fmt.Println("Failed creating p2p node") return } + fmt.Printf("PEER ID: %v\n", self.ID()) server := &AutoScalerServer{self: self} server.mainLoop = setAppGatewayExample path, handler := pbcon.NewAutoscalerServiceHandler(server) mux.Handle(path, handler) log.Println("Autoscaler RPC Server Started") http.ListenAndServe( - "localhost:8080", + ":8080", // Use h2c so we can serve HTTP/2 without TLS. h2c.NewHandler(mux, &http2.Server{}), ) diff --git a/cmd/autoscaler/server.go b/cmd/autoscaler/server.go index bda784f..e41195c 100644 --- a/cmd/autoscaler/server.go +++ b/cmd/autoscaler/server.go @@ -4,6 +4,9 @@ import ( "context" "fmt" "log" + "net/http" + "net/url" + "strings" "time" "connectrpc.com/connect" @@ -12,9 +15,11 @@ import ( tpraft "github.com/comrade-coop/apocryph/pkg/raft" "github.com/hashicorp/raft" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" ) -const RAFT_P2P_PORT = 9999 +const RAFT_P2P_PORT = 32500 const RAFT_PATH = "" type AutoScalerServer struct { @@ -22,42 +27,57 @@ type AutoScalerServer struct { node *tpraft.RaftNode store *tpraft.KVStore peers []string + started bool self host.Host nodeGateway string mainLoop func(*AutoScalerServer) } -func (s *AutoScalerServer) ConnectCluster(c context.Context, req *connect.Request[pb.ConnectClusterRequest]) (*connect.Response[pb.ConnectClusterResponse], error) { - log.Println("Forming a Raft Cluster with the following providers:", req.Msg.Servers) +func (s *AutoScalerServer) TriggerNode(c context.Context, req *connect.Request[pb.ConnectClusterRequest]) (*connect.Response[pb.TriggerNodeResponse], error) { + if s.started == false { + log.Println("Node Triggered") + go s.BoostrapCluster(req) + s.started = true + } + return connect.NewResponse(&pb.TriggerNodeResponse{PeerID: s.self.ID().String()}), nil +} - var peers []*host.Host - for _, addr := range req.Msg.Servers { - addr = fmt.Sprintf("/ip4/%v/tcp/%v", addr, RAFT_P2P_PORT) - log.Printf("Adding Peer:%v\n", addr) - peer, err := tpraft.NewPeer(addr) +func (s *AutoScalerServer) BoostrapCluster(req *connect.Request[pb.ConnectClusterRequest]) error { + + peerIDs, err := s.FetchPeerIDsFromServers(req) + if err != nil { + return fmt.Errorf("Failed to fetch PeerIDs: %v", err) + } + + var peers []*peer.AddrInfo + for serverAddr, addr := range peerIDs { + addr := GetMultiAddr(serverAddr, addr) + if addr == "" { + return fmt.Errorf("Failed to parse server address: %v", serverAddr) + } + + maddr, err := multiaddr.NewMultiaddr(addr) if err != nil { - return connect.NewResponse(&pb.ConnectClusterResponse{ - Success: false, - Error: fmt.Sprintf("Failed creating peer %v: %v", addr, err), - }), nil + return fmt.Errorf("Failed to parse multiaddr: %v: %v", addr, err) } - peers = append(peers, &peer) - log.Printf("Peer %s ID:%s \n", addr, peer.ID().String()) + + peerInfo, err := peer.AddrInfoFromP2pAddr(maddr) + if err != nil { + return fmt.Errorf("Failed to extract peer info from %v, Error: %v", addr, err) + } + + peers = append(peers, peerInfo) + log.Printf("Added Peer %v ID:%s \n", peerInfo.Addrs, peerInfo.ID.String()) } node, err := tpraft.NewRaftNode(s.self, peers, RAFT_PATH) if err != nil { log.Println("Error:Could not Creat Raft Node") - return connect.NewResponse(&pb.ConnectClusterResponse{ - Success: false, - Error: fmt.Sprintf("Failed Creating Raft Node %v\n", err), - }), nil - + return fmt.Errorf("Failed Creating Raft Node %v\n", err) } // create the KVStore store, err := tpraft.NewKVStore(node) - s.node = node s.store = store s.peers = req.Msg.Servers @@ -65,14 +85,44 @@ func (s *AutoScalerServer) ConnectCluster(c context.Context, req *connect.Reques err = s.waitLeaderElection(req.Msg.Timeout) if err != nil { - response := &pb.ConnectClusterResponse{Success: false, Error: err.Error()} - return connect.NewResponse(response), nil + return err } + return nil +} + +func (s *AutoScalerServer) ConnectCluster(c context.Context, req *connect.Request[pb.ConnectClusterRequest]) (*connect.Response[pb.ConnectClusterResponse], error) { + log.Println("Forming a Raft Cluster with the following providers:", req.Msg.Servers) + s.started = true + err := s.BoostrapCluster(req) + if err != nil { + return connect.NewResponse(&pb.ConnectClusterResponse{ + Success: false, + Error: fmt.Sprintf("Failed Bootstraping Cluster: %v\n", err), + }), nil + } response := &pb.ConnectClusterResponse{Success: true} return connect.NewResponse(response), nil } +func (s *AutoScalerServer) FetchPeerIDsFromServers(req *connect.Request[pb.ConnectClusterRequest]) (map[string]string, error) { + peerIDs := make(map[string]string) + + for _, addr := range req.Msg.Servers { + client := pbcon.NewAutoscalerServiceClient( + http.DefaultClient, + addr) + + resp, err := client.TriggerNode(context.Background(), req) + if err != nil { + return nil, fmt.Errorf("failed to get PeerID from server %v: %w", addr, err) + } + peerIDs[addr] = resp.Msg.PeerID + } + + return peerIDs, nil +} + func (s *AutoScalerServer) waitLeaderElection(timeout uint32) error { log.Printf("Waiting for leader election with %v seoncds timout ...", timeout) @@ -98,12 +148,14 @@ func (s *AutoScalerServer) waitLeaderElection(timeout uint32) error { switch obs.Data.(type) { case raft.RaftState: if leaderAddr, _ := s.node.Raft.LeaderWithID(); leaderAddr != "" { + fmt.Printf("Leader Elected: %v\n", leaderAddr) go s.mainLoop(s) return nil } } case <-ticker.C: if leaderAddr, _ := s.node.Raft.LeaderWithID(); leaderAddr != "" { + fmt.Printf("Leader Elected: %v\n", leaderAddr) go s.mainLoop(s) return nil } @@ -131,15 +183,33 @@ func setAppGatewayExample(s *AutoScalerServer) { for { if s.node.Raft.State() == raft.Leader { log.Println("Setting the domain value to the current apocryph node gateway") - err := s.store.Set("www.test.com", s.nodeGateway) + err := s.store.Set("www.test.com", "http://localhost:8080") // leadership could be changed right before setting the value if _, ok := err.(*tpraft.NotLeaderError); ok { newLeaderAddr, newLeaderID := s.node.Raft.LeaderWithID() log.Printf("Leadership changed to %v:%v", newLeaderAddr, newLeaderID) - } else { + } else if err != nil { log.Printf("Failed setting key:%v: %v\n", "www.test.com", err) } } - time.Sleep(5 * time.Second) + time.Sleep(10 * time.Second) + } +} + +func GetMultiAddr(addr, peerID string) string { + // Parse the URL + parsedURL, err := url.Parse(addr) + if err != nil { + fmt.Println("Error parsing URL:", err) + return "" + } + hostPort := parsedURL.Host + parts := strings.Split(hostPort, ":") + if len(parts) != 2 { + log.Println("Invalid host:port format") + return "" } + ip := parts[0] + // Change the port to the RAFT_P2P_PORT + return fmt.Sprintf("/ip4/%s/tcp/%v/p2p/%s", ip, RAFT_P2P_PORT, peerID) } diff --git a/cmd/trustedpods/autoscaler.go b/cmd/trustedpods/autoscaler.go index 1692798..5cc1d3a 100644 --- a/cmd/trustedpods/autoscaler.go +++ b/cmd/trustedpods/autoscaler.go @@ -25,8 +25,11 @@ var autoscalerCmd = &cobra.Command{ client := pbcon.NewAutoscalerServiceClient( http.DefaultClient, baseUrl) - log.Printf("%v\n", nodesAddrs) - request := connect.NewRequest(&pb.ConnectClusterRequest{NodeGateway: baseUrl, Servers: nodesAddrs, Timeout: 10}) + log.Printf("Connecting Clusters:%v\n", nodesAddrs) + request := connect.NewRequest(&pb.ConnectClusterRequest{NodeGateway: baseUrl, Servers: nodesAddrs, Timeout: 30}) + // Add the Host that is needed for the ingress to route the autoscaler pod + request.Header().Set("Host", "autoscaler.local") + log.Printf("Sending Request to: %v\n", baseUrl) response, err := client.ConnectCluster(context.Background(), request) if err != nil { return fmt.Errorf("Error connecting to cluster:%v", err) diff --git a/package.json b/package.json index 7860d8b..a00c8cc 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "directories": { "test": "test" }, + "packageManager": "npm@10.7.0", "scripts": { "lint": "eslint pkg/**/*.\\{ts,js\\} test/**/*.\\{ts,js\\} --ignore-pattern **/*.config.ts", "fmt": "eslint --fix pkg/**/*.\\{ts,js\\} test/**/*.\\{ts,js\\} --ignore-pattern **/*.config.ts", @@ -49,10 +50,5 @@ "helia": "^4.2.1", "private-ip": "^3.0.2" }, - "workspaces": [ - "test/e2e/webui", - "pkg/ipfs-ts", - "pkg/proto-ts", - "pkg/abi-ts" - ] + "workspaces": ["test/e2e/webui", "pkg/ipfs-ts", "pkg/proto-ts", "pkg/abi-ts"] } diff --git a/pkg/proto/autoscaler.pb.go b/pkg/proto/autoscaler.pb.go index 8905450..a48379b 100644 --- a/pkg/proto/autoscaler.pb.go +++ b/pkg/proto/autoscaler.pb.go @@ -9,6 +9,7 @@ package proto import ( + _ "github.com/golang/protobuf/ptypes/empty" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" @@ -141,37 +142,96 @@ func (x *ConnectClusterResponse) GetError() string { return "" } +type TriggerNodeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + PeerID string `protobuf:"bytes,1,opt,name=peerID,proto3" json:"peerID,omitempty"` +} + +func (x *TriggerNodeResponse) Reset() { + *x = TriggerNodeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_autoscaler_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TriggerNodeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TriggerNodeResponse) ProtoMessage() {} + +func (x *TriggerNodeResponse) ProtoReflect() protoreflect.Message { + mi := &file_autoscaler_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TriggerNodeResponse.ProtoReflect.Descriptor instead. +func (*TriggerNodeResponse) Descriptor() ([]byte, []int) { + return file_autoscaler_proto_rawDescGZIP(), []int{2} +} + +func (x *TriggerNodeResponse) GetPeerID() string { + if x != nil { + return x.PeerID + } + return "" +} + var File_autoscaler_proto protoreflect.FileDescriptor var file_autoscaler_proto_rawDesc = []byte{ 0x0a, 0x10, 0x61, 0x75, 0x74, 0x6f, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1c, 0x61, 0x70, 0x6f, 0x63, 0x72, 0x79, 0x70, 0x68, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x30, 0x2e, 0x61, 0x75, 0x74, 0x6f, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, - 0x22, 0x6d, 0x0a, 0x15, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x20, 0x0a, 0x0b, 0x6e, 0x6f, 0x64, - 0x65, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, - 0x6e, 0x6f, 0x64, 0x65, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x73, - 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, - 0x48, 0x0a, 0x16, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, - 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, - 0x65, 0x73, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x32, 0x90, 0x01, 0x0a, 0x11, 0x41, 0x75, - 0x74, 0x6f, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, - 0x7b, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x12, 0x33, 0x2e, 0x61, 0x70, 0x6f, 0x63, 0x72, 0x79, 0x70, 0x68, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2e, 0x76, 0x30, 0x2e, 0x61, 0x75, 0x74, 0x6f, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, - 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x34, 0x2e, 0x61, 0x70, 0x6f, 0x63, 0x72, 0x79, 0x70, - 0x68, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x30, 0x2e, 0x61, 0x75, 0x74, 0x6f, 0x73, - 0x63, 0x61, 0x6c, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x43, 0x6c, 0x75, - 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x2c, 0x5a, 0x2a, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6d, 0x72, 0x61, - 0x64, 0x65, 0x2d, 0x63, 0x6f, 0x6f, 0x70, 0x2f, 0x61, 0x70, 0x6f, 0x63, 0x72, 0x79, 0x70, 0x68, - 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x6d, 0x0a, + 0x15, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x20, 0x0a, 0x0b, 0x6e, 0x6f, 0x64, 0x65, 0x47, 0x61, + 0x74, 0x65, 0x77, 0x61, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6e, 0x6f, 0x64, + 0x65, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x48, 0x0a, 0x16, + 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, + 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, + 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x2d, 0x0a, 0x13, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, + 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, + 0x06, 0x70, 0x65, 0x65, 0x72, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, + 0x65, 0x65, 0x72, 0x49, 0x44, 0x32, 0x87, 0x02, 0x0a, 0x11, 0x41, 0x75, 0x74, 0x6f, 0x73, 0x63, + 0x61, 0x6c, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7b, 0x0a, 0x0e, 0x43, + 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x33, 0x2e, + 0x61, 0x70, 0x6f, 0x63, 0x72, 0x79, 0x70, 0x68, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, + 0x30, 0x2e, 0x61, 0x75, 0x74, 0x6f, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x34, 0x2e, 0x61, 0x70, 0x6f, 0x63, 0x72, 0x79, 0x70, 0x68, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x30, 0x2e, 0x61, 0x75, 0x74, 0x6f, 0x73, 0x63, 0x61, 0x6c, 0x65, + 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x75, 0x0a, 0x0b, 0x54, 0x72, 0x69, 0x67, + 0x67, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x33, 0x2e, 0x61, 0x70, 0x6f, 0x63, 0x72, 0x79, + 0x70, 0x68, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x30, 0x2e, 0x61, 0x75, 0x74, 0x6f, + 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x43, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x31, 0x2e, 0x61, + 0x70, 0x6f, 0x63, 0x72, 0x79, 0x70, 0x68, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x30, + 0x2e, 0x61, 0x75, 0x74, 0x6f, 0x73, 0x63, 0x61, 0x6c, 0x65, 0x72, 0x2e, 0x54, 0x72, 0x69, 0x67, + 0x67, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, + 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, + 0x6d, 0x72, 0x61, 0x64, 0x65, 0x2d, 0x63, 0x6f, 0x6f, 0x70, 0x2f, 0x61, 0x70, 0x6f, 0x63, 0x72, + 0x79, 0x70, 0x68, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -186,16 +246,19 @@ func file_autoscaler_proto_rawDescGZIP() []byte { return file_autoscaler_proto_rawDescData } -var file_autoscaler_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_autoscaler_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_autoscaler_proto_goTypes = []interface{}{ (*ConnectClusterRequest)(nil), // 0: apocryph.proto.v0.autoscaler.ConnectClusterRequest (*ConnectClusterResponse)(nil), // 1: apocryph.proto.v0.autoscaler.ConnectClusterResponse + (*TriggerNodeResponse)(nil), // 2: apocryph.proto.v0.autoscaler.TriggerNodeResponse } var file_autoscaler_proto_depIdxs = []int32{ 0, // 0: apocryph.proto.v0.autoscaler.AutoscalerService.ConnectCluster:input_type -> apocryph.proto.v0.autoscaler.ConnectClusterRequest - 1, // 1: apocryph.proto.v0.autoscaler.AutoscalerService.ConnectCluster:output_type -> apocryph.proto.v0.autoscaler.ConnectClusterResponse - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type + 0, // 1: apocryph.proto.v0.autoscaler.AutoscalerService.TriggerNode:input_type -> apocryph.proto.v0.autoscaler.ConnectClusterRequest + 1, // 2: apocryph.proto.v0.autoscaler.AutoscalerService.ConnectCluster:output_type -> apocryph.proto.v0.autoscaler.ConnectClusterResponse + 2, // 3: apocryph.proto.v0.autoscaler.AutoscalerService.TriggerNode:output_type -> apocryph.proto.v0.autoscaler.TriggerNodeResponse + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name @@ -231,6 +294,18 @@ func file_autoscaler_proto_init() { return nil } } + file_autoscaler_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TriggerNodeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -238,7 +313,7 @@ func file_autoscaler_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_autoscaler_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/proto/protoconnect/autoscaler.connect.go b/pkg/proto/protoconnect/autoscaler.connect.go index 50cc945..14e0f39 100644 --- a/pkg/proto/protoconnect/autoscaler.connect.go +++ b/pkg/proto/protoconnect/autoscaler.connect.go @@ -38,18 +38,23 @@ const ( // AutoscalerServiceConnectClusterProcedure is the fully-qualified name of the AutoscalerService's // ConnectCluster RPC. AutoscalerServiceConnectClusterProcedure = "/apocryph.proto.v0.autoscaler.AutoscalerService/ConnectCluster" + // AutoscalerServiceTriggerNodeProcedure is the fully-qualified name of the AutoscalerService's + // TriggerNode RPC. + AutoscalerServiceTriggerNodeProcedure = "/apocryph.proto.v0.autoscaler.AutoscalerService/TriggerNode" ) // These variables are the protoreflect.Descriptor objects for the RPCs defined in this package. var ( autoscalerServiceServiceDescriptor = proto.File_autoscaler_proto.Services().ByName("AutoscalerService") autoscalerServiceConnectClusterMethodDescriptor = autoscalerServiceServiceDescriptor.Methods().ByName("ConnectCluster") + autoscalerServiceTriggerNodeMethodDescriptor = autoscalerServiceServiceDescriptor.Methods().ByName("TriggerNode") ) // AutoscalerServiceClient is a client for the apocryph.proto.v0.autoscaler.AutoscalerService // service. type AutoscalerServiceClient interface { ConnectCluster(context.Context, *connect.Request[proto.ConnectClusterRequest]) (*connect.Response[proto.ConnectClusterResponse], error) + TriggerNode(context.Context, *connect.Request[proto.ConnectClusterRequest]) (*connect.Response[proto.TriggerNodeResponse], error) } // NewAutoscalerServiceClient constructs a client for the @@ -69,12 +74,19 @@ func NewAutoscalerServiceClient(httpClient connect.HTTPClient, baseURL string, o connect.WithSchema(autoscalerServiceConnectClusterMethodDescriptor), connect.WithClientOptions(opts...), ), + triggerNode: connect.NewClient[proto.ConnectClusterRequest, proto.TriggerNodeResponse]( + httpClient, + baseURL+AutoscalerServiceTriggerNodeProcedure, + connect.WithSchema(autoscalerServiceTriggerNodeMethodDescriptor), + connect.WithClientOptions(opts...), + ), } } // autoscalerServiceClient implements AutoscalerServiceClient. type autoscalerServiceClient struct { connectCluster *connect.Client[proto.ConnectClusterRequest, proto.ConnectClusterResponse] + triggerNode *connect.Client[proto.ConnectClusterRequest, proto.TriggerNodeResponse] } // ConnectCluster calls apocryph.proto.v0.autoscaler.AutoscalerService.ConnectCluster. @@ -82,10 +94,16 @@ func (c *autoscalerServiceClient) ConnectCluster(ctx context.Context, req *conne return c.connectCluster.CallUnary(ctx, req) } +// TriggerNode calls apocryph.proto.v0.autoscaler.AutoscalerService.TriggerNode. +func (c *autoscalerServiceClient) TriggerNode(ctx context.Context, req *connect.Request[proto.ConnectClusterRequest]) (*connect.Response[proto.TriggerNodeResponse], error) { + return c.triggerNode.CallUnary(ctx, req) +} + // AutoscalerServiceHandler is an implementation of the // apocryph.proto.v0.autoscaler.AutoscalerService service. type AutoscalerServiceHandler interface { ConnectCluster(context.Context, *connect.Request[proto.ConnectClusterRequest]) (*connect.Response[proto.ConnectClusterResponse], error) + TriggerNode(context.Context, *connect.Request[proto.ConnectClusterRequest]) (*connect.Response[proto.TriggerNodeResponse], error) } // NewAutoscalerServiceHandler builds an HTTP handler from the service implementation. It returns @@ -100,10 +118,18 @@ func NewAutoscalerServiceHandler(svc AutoscalerServiceHandler, opts ...connect.H connect.WithSchema(autoscalerServiceConnectClusterMethodDescriptor), connect.WithHandlerOptions(opts...), ) + autoscalerServiceTriggerNodeHandler := connect.NewUnaryHandler( + AutoscalerServiceTriggerNodeProcedure, + svc.TriggerNode, + connect.WithSchema(autoscalerServiceTriggerNodeMethodDescriptor), + connect.WithHandlerOptions(opts...), + ) return "/apocryph.proto.v0.autoscaler.AutoscalerService/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case AutoscalerServiceConnectClusterProcedure: autoscalerServiceConnectClusterHandler.ServeHTTP(w, r) + case AutoscalerServiceTriggerNodeProcedure: + autoscalerServiceTriggerNodeHandler.ServeHTTP(w, r) default: http.NotFound(w, r) } @@ -116,3 +142,7 @@ type UnimplementedAutoscalerServiceHandler struct{} func (UnimplementedAutoscalerServiceHandler) ConnectCluster(context.Context, *connect.Request[proto.ConnectClusterRequest]) (*connect.Response[proto.ConnectClusterResponse], error) { return nil, connect.NewError(connect.CodeUnimplemented, errors.New("apocryph.proto.v0.autoscaler.AutoscalerService.ConnectCluster is not implemented")) } + +func (UnimplementedAutoscalerServiceHandler) TriggerNode(context.Context, *connect.Request[proto.ConnectClusterRequest]) (*connect.Response[proto.TriggerNodeResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.New("apocryph.proto.v0.autoscaler.AutoscalerService.TriggerNode is not implemented")) +} diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index 7ffa6a6..eff6e0d 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -11,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p" raftp2p "github.com/libp2p/go-libp2p-raft" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" ) @@ -34,11 +35,12 @@ type RaftNode struct { LogStore raft.LogStore } -func NewRaftNode(host host.Host, peers []*host.Host, raftDir string) (*RaftNode, error) { +func NewRaftNode(host host.Host, peers []*peer.AddrInfo, raftDir string) (*RaftNode, error) { for _, peerPtr := range peers { peer := *peerPtr - if peer.Addrs()[0].String() != host.Addrs()[0].String() { - host.Peerstore().AddAddrs(peer.ID(), peer.Addrs(), peerstore.PermanentAddrTTL) + // skip adding self + if peer.Addrs[0].String() != host.Addrs()[0].String() { + host.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) } } @@ -79,8 +81,8 @@ func NewRaftNode(host host.Host, peers []*host.Host, raftDir string) (*RaftNode, peer := *peerPtr servers = append(servers, raft.Server{ Suffrage: raft.Voter, - ID: raft.ServerID(peer.ID().String()), - Address: raft.ServerAddress(peer.ID().String()), + ID: raft.ServerID(peer.ID.String()), + Address: raft.ServerAddress(peer.ID.String()), }) } serversCfg := raft.Configuration{Servers: servers} diff --git a/proto/autoscaler.proto b/proto/autoscaler.proto index 7b6e31c..362a21e 100644 --- a/proto/autoscaler.proto +++ b/proto/autoscaler.proto @@ -5,19 +5,25 @@ package apocryph.proto.v0.autoscaler; option go_package = "github.com/comrade-coop/apocryph/pkg/proto"; +import "google/protobuf/empty.proto"; service AutoscalerService { rpc ConnectCluster(ConnectClusterRequest) returns (ConnectClusterResponse); + rpc TriggerNode(ConnectClusterRequest) returns (TriggerNodeResponse); } -message ConnectClusterRequest{ +message ConnectClusterRequest { // could get it from within the cluster maybe? string nodeGateway = 1; repeated string servers = 2; uint32 timeout = 3; } -message ConnectClusterResponse{ +message ConnectClusterResponse { bool success = 1; string error = 2; } + +message TriggerNodeResponse { + string peerID =1; +} diff --git a/test/e2e/autoscaler/run-test.sh b/test/e2e/autoscaler/run-test.sh index 78e95a0..5067acb 100755 --- a/test/e2e/autoscaler/run-test.sh +++ b/test/e2e/autoscaler/run-test.sh @@ -1,6 +1,9 @@ #!/bin/bash cd "$(dirname "$0")" +set -v + +sudo chmod o+rw /run/containerd/containerd.sock trap 'kill $(jobs -p) &>/dev/null' EXIT @@ -13,7 +16,6 @@ which minikube >/dev/null; which helmfile >/dev/null; which helm >/dev/null; whi which docker >/dev/null which virtualbox >/dev/null -set -v # based on https://stackoverflow.com/a/31269848 / https://bobcopeland.com/blog/2012/10/goto-in-bash/ if [ -n "$1" ]; then @@ -22,7 +24,6 @@ if [ -n "$1" ]; then exit fi -sudo chmod o+rw /run/containerd/containerd.sock ## 0: Set up the external environment @@ -62,6 +63,8 @@ helmfile sync -f ../minikube || { while ! kubectl get -n keda endpoints ingress- minikube profile list +sleep 5 + ## 4.0: Register the providers in the marketplace minikube profile c1 @@ -112,19 +115,46 @@ go run ../../../cmd/trustedpods registry get --config ../../integration/registry --ipfs /ip4/127.0.0.1/tcp/5004 \ ## 5.0: Deploy the autoscaler to the providers using their p2p multiaddr -minikube profile c1 +set -v +rm -f ~/.apocryph/deployment/* +PAYMENT_CONTRACT=$(cat ../../../contracts/broadcast/Deploy.s.sol/31337/run-latest.json | jq -r '.returns.payment.value') +REGISTRY_CONTRACT=$(cat ../../../contracts/broadcast/Deploy.s.sol/31337/run-latest.json | jq -r '.returns.registry.value') +PUBLISHER_KEY=$(docker logs anvil | awk '/Private Keys/ {flag=1; next} flag && /^\(2\)/ {print $2; exit}') +FUNDS=10000000000000000000000 + +minikube profile c1 source swarm-connect.sh PROVIDER_ETH=0x70997970C51812dc3A010C7d01b50e0d17dc79C8 #TODO= anvil.accounts[1] +echo $PROVIDER_IPFS + +go run ../../../cmd/trustedpods/ pod deploy ../common/manifest-autoscaler.yaml \ + --ethereum-key "$PUBLISHER_KEY" \ + --payment-contract "$PAYMENT_CONTRACT" \ + --registry-contract "$REGISTRY_CONTRACT" \ + --funds "$FUNDS" \ + --upload-images=true \ + --mint-funds \ + --provider /p2p/"$PROVIDER_IPFS" \ + --provider-eth "$PROVIDER_ETH" + +sleep 5 + +## 5.2: deploy to the second cluster +minikube profile c2 +source swarm-connect.sh +# for now just remove the deployment file to avoid uploading instead of deploying +rm -f ~/.apocryph/deployment/* + PUBLISHER_KEY=$(docker logs anvil | awk '/Private Keys/ {flag=1; next} flag && /^\(2\)/ {print $2; exit}') PAYMENT_CONTRACT=$(cat ../../../contracts/broadcast/Deploy.s.sol/31337/run-latest.json | jq -r '.returns.payment.value') REGISTRY_CONTRACT=$(cat ../../../contracts/broadcast/Deploy.s.sol/31337/run-latest.json | jq -r '.returns.registry.value') FUNDS=10000000000000000000000 -set +v -set -x +PROVIDER_ETH=0x23618e81E3f5cdF7f54C3d65f7FBc0aBf5B21E8f #TODO= anvil.accounts[7] +echo $PROVIDER_IPFS go run ../../../cmd/trustedpods/ pod deploy ../common/manifest-autoscaler.yaml \ --ethereum-key "$PUBLISHER_KEY" \ @@ -136,4 +166,40 @@ go run ../../../cmd/trustedpods/ pod deploy ../common/manifest-autoscaler.yaml \ --provider /p2p/"$PROVIDER_IPFS" \ --provider-eth "$PROVIDER_ETH" +sleep 5 + +## 5.3: deploy to the third cluster +minikube profile c3 +source swarm-connect.sh +rm -f ~/.apocryph/deployment/* + +PUBLISHER_KEY=$(docker logs anvil | awk '/Private Keys/ {flag=1; next} flag && /^\(2\)/ {print $2; exit}') +PAYMENT_CONTRACT=$(cat ../../../contracts/broadcast/Deploy.s.sol/31337/run-latest.json | jq -r '.returns.payment.value') +REGISTRY_CONTRACT=$(cat ../../../contracts/broadcast/Deploy.s.sol/31337/run-latest.json | jq -r '.returns.registry.value') +FUNDS=10000000000000000000000 + +PROVIDER_ETH=0xa0Ee7A142d267C1f36714E4a8F75612F20a79720 #TODO= anvil.accounts[8] +FUNDS=10000000000000000000000 + +echo $PROVIDER_IPFS + +go run ../../../cmd/trustedpods/ pod deploy ../common/manifest-autoscaler.yaml \ + --ethereum-key "$PUBLISHER_KEY" \ + --payment-contract "$PAYMENT_CONTRACT" \ + --registry-contract "$REGISTRY_CONTRACT" \ + --funds "$FUNDS" \ + --upload-images=true \ + --mint-funds \ + --provider /p2p/"$PROVIDER_IPFS" \ + --provider-eth "$PROVIDER_ETH" + +## 6.0: Connect the cluster + +minikube profile c1 +C1_INGRESS_URL=$(minikube service -n keda ingress-nginx-controller --url=true | head -n 1); echo $C1_INGRESS_URL +minikube profile c2 +C2_INGRESS_URL=$(minikube service -n keda ingress-nginx-controller --url=true | head -n 1); echo $C2_INGRESS_URL +minikube profile c3 +C3_INGRESS_URL=$(minikube service -n keda ingress-nginx-controller --url=true | head -n 1); echo $C3_INGRESS_URL +go run ../../../cmd/trustedpods/ autoscale --url "$C1_INGRESS_URL" --providers "$C1_INGRESS_URL","$C2_INGRESS_URL","$C3_INGRESS_URL" diff --git a/test/e2e/autoscaler/swarm-connect.sh b/test/e2e/autoscaler/swarm-connect.sh index 75e0165..1abc8c5 100755 --- a/test/e2e/autoscaler/swarm-connect.sh +++ b/test/e2e/autoscaler/swarm-connect.sh @@ -1,15 +1,20 @@ #!/bin/bash ## Configure provider/in-cluster IPFS and publisher IPFS ## -pkill -f "kubectl port-forward" pkill ipfs +pkill -f "kubectl port-forward" + +PORT_5004="" +PROVIDER_IPFS="" +IPFS_DAEMON="" { while ! kubectl get -n ipfs endpoints ipfs-rpc -o json | jq '.subsets[].addresses[].ip' &>/dev/null; do sleep 1; done; } O_IPFS_PATH=$IPFS_PATH export IPFS_PATH=$(mktemp ipfs.XXXX --tmpdir -d) -[ "$PORT_5004" == "" ] && { PORT_5004="yes" ; kubectl port-forward --namespace ipfs svc/ipfs-rpc 5004:5001 & sleep 0.5; } +kubectl port-forward --namespace ipfs svc/ipfs-rpc 5004:5001 & + echo /ip4/127.0.0.1/tcp/5004 > $IPFS_PATH/api SWARM_ADDRESSES=$(minikube service -n ipfs ipfs-swarm --url | head -n 1 | sed -E 's|http://(.+):(.+)|["/ip4/\1/tcp/\2", "/ip4/\1/udp/\2/quic", "/ip4/\1/udp/\2/quic-v1", "/ip4/\1/udp/\2/quic-v1/webtransport"]|') @@ -30,9 +35,8 @@ ipfs id &>/dev/null || ipfs init ipfs config --json Experimental.Libp2pStreamMounting true -[ -n "$IPFS_DAEMON" ] || { IPFS_DAEMON=yes; ipfs daemon & { while ! [ -f ${IPFS_PATH:-~/.ipfs}/api ]; do sleep 0.1; done; } 2>/dev/null; } +ipfs daemon & { while ! [ -f ${IPFS_PATH:-~/.ipfs}/api ]; do sleep 0.1; done; } echo "$SWARM_ADDRESSES" | jq -r '.[] + "/p2p/'"$PROVIDER_IPFS"'"' | xargs -n 1 ipfs swarm connect || true sleep 5 - diff --git a/test/integration/raft/main.go b/test/integration/raft/main.go index eb2c177..d6bbe42 100644 --- a/test/integration/raft/main.go +++ b/test/integration/raft/main.go @@ -5,7 +5,7 @@ import ( "time" "github.com/comrade-coop/apocryph/pkg/raft" - "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" ) func main() { @@ -21,7 +21,21 @@ func main() { defer peer2.Close() defer peer3.Close() - peers := []*host.Host{&peer1, &peer2, &peer3} + peerInfo1, err := peer.AddrInfoFromString(fmt.Sprintf("/ip4/127.0.0.1/tcp/9997/p2p/%v", peer1.ID())) + if err != nil { + panic("failed to create AddrInfo from address") + } + peerInfo2, err := peer.AddrInfoFromString(fmt.Sprintf("/ip4/127.0.0.1/tcp/9998/p2p/%v", peer2.ID())) + if err != nil { + panic("failed to create AddrInfo from address") + } + peerInfo3, err := peer.AddrInfoFromString(fmt.Sprintf("/ip4/127.0.0.1/tcp/9999/p2p/%v", peer3.ID())) + if err != nil { + panic("failed to create AddrInfo from address") + } + + peers := []*peer.AddrInfo{peerInfo1, peerInfo2, peerInfo3} + node1, err := raft.NewRaftNode(peer1, peers, "") if err != nil { fmt.Printf("Error:%s", err) diff --git a/turbo.json b/turbo.json index 3c2aa64..5b80432 100644 --- a/turbo.json +++ b/turbo.json @@ -1,6 +1,6 @@ { "$schema": "https://turbo.build/schema.json", - "pipeline": { + "tasks": { "build": { "dependsOn": ["//#build-images"] },