diff --git a/config/config.go b/config/config.go index 75129622..36161833 100644 --- a/config/config.go +++ b/config/config.go @@ -95,15 +95,12 @@ func DefaultConfig() *Config { RawRESTListener: fmt.Sprintf("localhost:%d", defaultRESTPort), GtwConnTimeout: defaultGatewayConnectionTimeout, Service: &service.Config{ - Genesis: defaultGenesisTime, - EpochDuration: defaultEpochDuration, - PhaseShift: defaultPhaseShift, - CycleGap: defaultCycleGap, - MemoryLayers: defaultMemoryLayers, - ConnAcksThreshold: defaultConnAcksThreshold, - BroadcastAcksThreshold: defaultBroadcastAcksThreshold, - BroadcastNumRetries: defaultBroadcastNumRetries, - BroadcastRetriesInterval: defaultBroadcastRetriesInterval, + Genesis: defaultGenesisTime, + EpochDuration: defaultEpochDuration, + PhaseShift: defaultPhaseShift, + CycleGap: defaultCycleGap, + MemoryLayers: defaultMemoryLayers, + ConnAcksThreshold: defaultConnAcksThreshold, }, CoreService: &coreServiceConfig{ MemoryLayers: defaultMemoryLayers, diff --git a/gateway/broadcaster/broadcaster.go b/gateway/broadcaster/broadcaster.go deleted file mode 100644 index 89a36f93..00000000 --- a/gateway/broadcaster/broadcaster.go +++ /dev/null @@ -1,120 +0,0 @@ -package broadcaster - -import ( - "context" - "errors" - "fmt" - "sync" - "time" - - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" - "github.com/spacemeshos/smutil/log" - "google.golang.org/genproto/googleapis/rpc/code" - "google.golang.org/grpc" - - "github.com/spacemeshos/poet/gateway" -) - -const DefaultBroadcastTimeout = 30 * time.Second - -// Broadcaster is responsible for broadcasting proofs to a list of Spacemesh-compatible gateway nodes. -type Broadcaster struct { - clients []pb.GatewayServiceClient - connections []*grpc.ClientConn - broadcastTimeout time.Duration - broadcastAckThreshold uint -} - -// New instantiate a new Broadcaster for a given list of gateway nodes addresses. -// disableBroadcast allows to create a disabled Broadcaster instance. -// connTimeout set the timeout per gRPC connection attempt to a node. -// broadcastTimeout set the timeout per proof broadcast. -// broadcastAcksThreshold set the lower-bound of required successful proof broadcasts. If not met, a warning will be logged. -func New(connections []*grpc.ClientConn, disableBroadcast bool, broadcastTimeout time.Duration, broadcastAcksThreshold uint) (*Broadcaster, error) { - if disableBroadcast { - log.Info("Broadcast is disabled") - return &Broadcaster{}, nil - } - - if broadcastAcksThreshold < 1 { - return nil, errors.New("successful broadcast threshold must be greater than 0") - } - if len(connections) < int(broadcastAcksThreshold) { - return nil, fmt.Errorf("number of gateway connections (%d) must be greater or equal than the successful broadcast threshold (%d)", len(connections), broadcastAcksThreshold) - } - - clients := make([]pb.GatewayServiceClient, len(connections)) - for i, conn := range connections { - clients[i] = pb.NewGatewayServiceClient(conn) - } - - return &Broadcaster{ - clients: clients, - connections: connections, - broadcastTimeout: broadcastTimeout, - broadcastAckThreshold: broadcastAcksThreshold, - }, nil -} - -// BroadcastProof broadcasts a serialized proof of a given round. -func (b *Broadcaster) BroadcastProof(msg []byte, roundID string, members [][]byte) error { - if b.clients == nil { - log.Info("Broadcast is disabled, not broadcasting round %v proof", roundID) - return nil - } - pbMsg := &pb.BroadcastPoetRequest{Data: msg} - ctx, cancel := context.WithTimeout(context.Background(), b.broadcastTimeout) - defer cancel() - - responses := make([]*pb.BroadcastPoetResponse, len(b.clients)) - errs := make([]error, len(b.clients)) - - start := time.Now() - var wg sync.WaitGroup - wg.Add(len(b.clients)) - for i, client := range b.clients { - i := i - client := client - go func() { - defer wg.Done() - responses[i], errs[i] = client.BroadcastPoet(ctx, pbMsg) - }() - } - wg.Wait() - elapsed := time.Since(start) - - // Count successful responses and concatenate errors of non-successful ones. - var numAcks int - var errors []error - for i := range b.clients { - res := responses[i] - err := errs[i] - - if err != nil { - err = fmt.Errorf("failed to broadcast via gateway node at \"%v\" after %v: %w", - b.connections[i].Target(), elapsed, err) - } else if code.Code(res.Status.Code) != code.Code_OK { - err = fmt.Errorf("failed to broadcast via gateway node at \"%v\" after %v: node response: %s (%s)", - b.connections[i].Target(), elapsed, code.Code(res.Status.Code).String(), res.Status.GetMessage()) - } else { - // Valid response. - numAcks++ - continue - } - errors = append(errors, err) - } - - retErr := gateway.NewMultiError(errors) - // If some requests failed, log it. - if len(errors) > 0 { - log.Warning("Round %v proof broadcast failed on %d/%d gateway nodes: %v", roundID, len(errors), len(b.clients), retErr) - } - - // If successful broadcasts threshold wasn't met, return error - if numAcks < int(b.broadcastAckThreshold) { - return retErr - } - - log.Info("Round %v proof broadcast completed successfully after %v, num of members: %d, proof size: %d", roundID, elapsed, len(members), len(msg)) - return nil -} diff --git a/gateway/broadcaster/broadcaster_test.go b/gateway/broadcaster/broadcaster_test.go deleted file mode 100644 index 5936f022..00000000 --- a/gateway/broadcaster/broadcaster_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package broadcaster - -import ( - "testing" - - "github.com/stretchr/testify/require" - "google.golang.org/grpc" -) - -func TestNew(t *testing.T) { - req := require.New(t) - - b, err := New([]*grpc.ClientConn(nil), true, DefaultBroadcastTimeout, 0) - req.NotNil(b) - req.NoError(err) - - b, err = New([]*grpc.ClientConn(nil), false, DefaultBroadcastTimeout, 0) - req.Nil(b) - req.EqualError(err, "successful broadcast threshold must be greater than 0") - - b, err = New(make([]*grpc.ClientConn, 1), false, DefaultBroadcastTimeout, 2) - req.Nil(b) - req.EqualError(err, "number of gateway connections (1) must be greater or equal than the successful broadcast threshold (2)") -} diff --git a/integration/server.go b/integration/server.go index f18fd161..470854a3 100644 --- a/integration/server.go +++ b/integration/server.go @@ -24,7 +24,6 @@ type ServerConfig struct { CycleGap time.Duration DebugLog bool Reset bool - DisableBroadcast bool RESTListen string GatewayAddresses []string GtwConnTimeout time.Duration @@ -79,9 +78,6 @@ func (cfg *ServerConfig) genArgs() []string { if cfg.Reset { args = append(args, "--reset") } - if cfg.DisableBroadcast { - args = append(args, "--disablebroadcast") - } return args } diff --git a/poet.go b/poet.go index 2a8e1d7a..d79ea1d9 100644 --- a/poet.go +++ b/poet.go @@ -92,9 +92,12 @@ func poetMain() error { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() - if err := server.StartServer(ctx, cfg); err != nil { - log.Error("failed to start server: %v", err) - return err + server, err := server.New(*cfg) + if err != nil { + return fmt.Errorf("failed to create server: %w", err) + } + if err := server.Start(ctx); err != nil { + return fmt.Errorf("failure in server: %w", err) } return nil diff --git a/poet_test.go b/poet_test.go index b0d8d3cc..ca646d27 100644 --- a/poet_test.go +++ b/poet_test.go @@ -130,7 +130,6 @@ func TestHarness_CrashRecovery(t *testing.T) { req.NoError(err) cfg.Genesis = time.Now().Add(5 * time.Second) cfg.Reset = true - cfg.DisableBroadcast = true cfg.GatewayAddresses = []string{target} cfg.GtwConnTimeout = time.Second diff --git a/release/proto/go/rpc/api/api.pb.go b/release/proto/go/rpc/api/api.pb.go index dc9b681d..78b29cbf 100644 --- a/release/proto/go/rpc/api/api.pb.go +++ b/release/proto/go/rpc/api/api.pb.go @@ -10,6 +10,7 @@ import ( _ "google.golang.org/genproto/googleapis/api/annotations" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + durationpb "google.golang.org/protobuf/types/known/durationpb" reflect "reflect" sync "sync" ) @@ -299,8 +300,9 @@ type SubmitResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - RoundId string `protobuf:"bytes,1,opt,name=roundId,proto3" json:"roundId,omitempty"` - Hash []byte `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"` + RoundId string `protobuf:"bytes,1,opt,name=roundId,proto3" json:"roundId,omitempty"` + Hash []byte `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"` + RoundEnd *durationpb.Duration `protobuf:"bytes,3,opt,name=round_end,json=roundEnd,proto3" json:"round_end,omitempty"` } func (x *SubmitResponse) Reset() { @@ -349,6 +351,13 @@ func (x *SubmitResponse) GetHash() []byte { return nil } +func (x *SubmitResponse) GetRoundEnd() *durationpb.Duration { + if x != nil { + return x.RoundEnd + } + return nil +} + type GetInfoRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -513,20 +522,83 @@ func (x *MembershipProof) GetProof() [][]byte { return nil } +type MerkleProof struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Root []byte `protobuf:"bytes,1,opt,name=root,proto3" json:"root,omitempty"` + ProvenLeaves [][]byte `protobuf:"bytes,2,rep,name=proven_leaves,json=provenLeaves,proto3" json:"proven_leaves,omitempty"` + ProofNodes [][]byte `protobuf:"bytes,3,rep,name=proof_nodes,json=proofNodes,proto3" json:"proof_nodes,omitempty"` +} + +func (x *MerkleProof) Reset() { + *x = MerkleProof{} + if protoimpl.UnsafeEnabled { + mi := &file_rpc_api_api_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MerkleProof) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MerkleProof) ProtoMessage() {} + +func (x *MerkleProof) ProtoReflect() protoreflect.Message { + mi := &file_rpc_api_api_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MerkleProof.ProtoReflect.Descriptor instead. +func (*MerkleProof) Descriptor() ([]byte, []int) { + return file_rpc_api_api_proto_rawDescGZIP(), []int{9} +} + +func (x *MerkleProof) GetRoot() []byte { + if x != nil { + return x.Root + } + return nil +} + +func (x *MerkleProof) GetProvenLeaves() [][]byte { + if x != nil { + return x.ProvenLeaves + } + return nil +} + +func (x *MerkleProof) GetProofNodes() [][]byte { + if x != nil { + return x.ProofNodes + } + return nil +} + type PoetProof struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Phi []byte `protobuf:"bytes,1,opt,name=phi,proto3" json:"phi,omitempty"` - ProvenLeaves [][]byte `protobuf:"bytes,2,rep,name=provenLeaves,proto3" json:"provenLeaves,omitempty"` - ProofNodes [][]byte `protobuf:"bytes,3,rep,name=proofNodes,proto3" json:"proofNodes,omitempty"` + Proof *MerkleProof `protobuf:"bytes,1,opt,name=proof,proto3" json:"proof,omitempty"` + Members [][]byte `protobuf:"bytes,2,rep,name=members,proto3" json:"members,omitempty"` + Leaves uint64 `protobuf:"varint,3,opt,name=leaves,proto3" json:"leaves,omitempty"` } func (x *PoetProof) Reset() { *x = PoetProof{} if protoimpl.UnsafeEnabled { - mi := &file_rpc_api_api_proto_msgTypes[9] + mi := &file_rpc_api_api_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -539,7 +611,7 @@ func (x *PoetProof) String() string { func (*PoetProof) ProtoMessage() {} func (x *PoetProof) ProtoReflect() protoreflect.Message { - mi := &file_rpc_api_api_proto_msgTypes[9] + mi := &file_rpc_api_api_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -552,26 +624,128 @@ func (x *PoetProof) ProtoReflect() protoreflect.Message { // Deprecated: Use PoetProof.ProtoReflect.Descriptor instead. func (*PoetProof) Descriptor() ([]byte, []int) { - return file_rpc_api_api_proto_rawDescGZIP(), []int{9} + return file_rpc_api_api_proto_rawDescGZIP(), []int{10} } -func (x *PoetProof) GetPhi() []byte { +func (x *PoetProof) GetProof() *MerkleProof { if x != nil { - return x.Phi + return x.Proof } return nil } -func (x *PoetProof) GetProvenLeaves() [][]byte { +func (x *PoetProof) GetMembers() [][]byte { if x != nil { - return x.ProvenLeaves + return x.Members } return nil } -func (x *PoetProof) GetProofNodes() [][]byte { +func (x *PoetProof) GetLeaves() uint64 { if x != nil { - return x.ProofNodes + return x.Leaves + } + return 0 +} + +type GetProofRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RoundId string `protobuf:"bytes,1,opt,name=round_id,json=roundId,proto3" json:"round_id,omitempty"` +} + +func (x *GetProofRequest) Reset() { + *x = GetProofRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_rpc_api_api_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetProofRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetProofRequest) ProtoMessage() {} + +func (x *GetProofRequest) ProtoReflect() protoreflect.Message { + mi := &file_rpc_api_api_proto_msgTypes[11] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetProofRequest.ProtoReflect.Descriptor instead. +func (*GetProofRequest) Descriptor() ([]byte, []int) { + return file_rpc_api_api_proto_rawDescGZIP(), []int{11} +} + +func (x *GetProofRequest) GetRoundId() string { + if x != nil { + return x.RoundId + } + return "" +} + +type GetProofResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Proof *PoetProof `protobuf:"bytes,1,opt,name=proof,proto3" json:"proof,omitempty"` + Pubkey []byte `protobuf:"bytes,2,opt,name=pubkey,proto3" json:"pubkey,omitempty"` +} + +func (x *GetProofResponse) Reset() { + *x = GetProofResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_rpc_api_api_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetProofResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetProofResponse) ProtoMessage() {} + +func (x *GetProofResponse) ProtoReflect() protoreflect.Message { + mi := &file_rpc_api_api_proto_msgTypes[12] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetProofResponse.ProtoReflect.Descriptor instead. +func (*GetProofResponse) Descriptor() ([]byte, []int) { + return file_rpc_api_api_proto_rawDescGZIP(), []int{12} +} + +func (x *GetProofResponse) GetProof() *PoetProof { + if x != nil { + return x.Proof + } + return nil +} + +func (x *GetProofResponse) GetPubkey() []byte { + if x != nil { + return x.Pubkey } return nil } @@ -582,6 +756,8 @@ var file_rpc_api_api_proto_rawDesc = []byte{ 0x0a, 0x11, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x70, 0x69, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x61, 0x70, 0x69, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xcc, 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2a, 0x0a, 0x10, 0x67, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, @@ -616,60 +792,84 @@ var file_rpc_api_api_proto_rawDesc = []byte{ 0x65, 0x6e, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x63, 0x68, 0x61, 0x6c, 0x6c, 0x65, 0x6e, 0x67, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, - 0x75, 0x72, 0x65, 0x22, 0x3e, 0x0a, 0x0e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, + 0x75, 0x72, 0x65, 0x22, 0x76, 0x0a, 0x0e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x68, - 0x61, 0x73, 0x68, 0x22, 0x10, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x89, 0x01, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x66, - 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x6f, 0x70, 0x65, - 0x6e, 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, - 0x6f, 0x70, 0x65, 0x6e, 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x49, 0x64, 0x12, 0x2e, 0x0a, 0x12, 0x65, - 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x73, 0x49, 0x64, - 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x12, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, - 0x6e, 0x67, 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x73, 0x49, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x73, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x50, 0x75, 0x62, 0x4b, 0x65, - 0x79, 0x22, 0x51, 0x0a, 0x0f, 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x50, - 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x05, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x12, 0x0a, 0x04, 0x72, 0x6f, - 0x6f, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x72, 0x6f, 0x6f, 0x74, 0x12, 0x14, - 0x0a, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x05, 0x70, - 0x72, 0x6f, 0x6f, 0x66, 0x22, 0x61, 0x0a, 0x09, 0x50, 0x6f, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x6f, - 0x66, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x68, 0x69, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, - 0x70, 0x68, 0x69, 0x12, 0x22, 0x0a, 0x0c, 0x70, 0x72, 0x6f, 0x76, 0x65, 0x6e, 0x4c, 0x65, 0x61, - 0x76, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0c, 0x70, 0x72, 0x6f, 0x76, 0x65, - 0x6e, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x6f, 0x66, - 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0a, 0x70, 0x72, 0x6f, - 0x6f, 0x66, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x32, 0xc4, 0x02, 0x0a, 0x04, 0x50, 0x6f, 0x65, 0x74, - 0x12, 0x44, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x11, 0x2e, 0x61, 0x70, 0x69, 0x2e, - 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x61, - 0x70, 0x69, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x14, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0e, 0x22, 0x09, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x74, - 0x61, 0x72, 0x74, 0x3a, 0x01, 0x2a, 0x12, 0x64, 0x0a, 0x0d, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, - 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x12, 0x19, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x55, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x47, - 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1c, - 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x16, 0x22, 0x11, 0x2f, 0x76, 0x31, 0x2f, 0x75, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x67, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x3a, 0x01, 0x2a, 0x12, 0x48, 0x0a, 0x06, - 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x12, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x75, 0x62, - 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x61, 0x70, 0x69, - 0x2e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x15, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0f, 0x22, 0x0a, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x75, 0x62, - 0x6d, 0x69, 0x74, 0x3a, 0x01, 0x2a, 0x12, 0x46, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x66, - 0x6f, 0x12, 0x13, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x65, 0x74, - 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x10, 0x82, 0xd3, - 0xe4, 0x93, 0x02, 0x0a, 0x12, 0x08, 0x2f, 0x76, 0x31, 0x2f, 0x69, 0x6e, 0x66, 0x6f, 0x42, 0x75, - 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x2e, 0x61, 0x70, 0x69, 0x42, 0x08, 0x41, 0x70, 0x69, 0x50, 0x72, - 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x73, 0x70, 0x61, 0x63, 0x65, 0x6d, 0x65, 0x73, 0x68, 0x6f, 0x73, 0x2f, 0x70, 0x6f, - 0x65, 0x74, 0x2f, 0x72, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x67, 0x6f, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x70, 0x69, 0xa2, 0x02, 0x03, 0x41, 0x58, - 0x58, 0xaa, 0x02, 0x03, 0x41, 0x70, 0x69, 0xca, 0x02, 0x03, 0x41, 0x70, 0x69, 0xe2, 0x02, 0x0f, - 0x41, 0x70, 0x69, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, - 0x02, 0x03, 0x41, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x73, 0x68, 0x12, 0x36, 0x0a, 0x09, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x65, 0x6e, 0x64, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x52, 0x08, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x45, 0x6e, 0x64, 0x22, 0x10, 0x0a, 0x0e, 0x47, + 0x65, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x89, 0x01, + 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x6f, 0x70, 0x65, 0x6e, 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x49, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6f, 0x70, 0x65, 0x6e, 0x52, 0x6f, 0x75, 0x6e, + 0x64, 0x49, 0x64, 0x12, 0x2e, 0x0a, 0x12, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6e, 0x67, + 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x73, 0x49, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, + 0x12, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x52, 0x6f, 0x75, 0x6e, 0x64, 0x73, + 0x49, 0x64, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x50, 0x75, + 0x62, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x22, 0x51, 0x0a, 0x0f, 0x4d, 0x65, 0x6d, + 0x62, 0x65, 0x72, 0x73, 0x68, 0x69, 0x70, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x14, 0x0a, 0x05, + 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x69, 0x6e, 0x64, + 0x65, 0x78, 0x12, 0x12, 0x0a, 0x04, 0x72, 0x6f, 0x6f, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x04, 0x72, 0x6f, 0x6f, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, + 0x03, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x22, 0x67, 0x0a, 0x0b, + 0x4d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x12, 0x0a, 0x04, 0x72, + 0x6f, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x72, 0x6f, 0x6f, 0x74, 0x12, + 0x23, 0x0a, 0x0d, 0x70, 0x72, 0x6f, 0x76, 0x65, 0x6e, 0x5f, 0x6c, 0x65, 0x61, 0x76, 0x65, 0x73, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0c, 0x70, 0x72, 0x6f, 0x76, 0x65, 0x6e, 0x4c, 0x65, + 0x61, 0x76, 0x65, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x5f, 0x6e, 0x6f, + 0x64, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0a, 0x70, 0x72, 0x6f, 0x6f, 0x66, + 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x22, 0x65, 0x0a, 0x09, 0x50, 0x6f, 0x65, 0x74, 0x50, 0x72, 0x6f, + 0x6f, 0x66, 0x12, 0x26, 0x0a, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x10, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x4d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x50, 0x72, + 0x6f, 0x6f, 0x66, 0x52, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, + 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x6d, + 0x62, 0x65, 0x72, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x6c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x6c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x22, 0x2c, 0x0a, 0x0f, + 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x19, 0x0a, 0x08, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x49, 0x64, 0x22, 0x50, 0x0a, 0x10, 0x47, 0x65, + 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x24, + 0x0a, 0x05, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, + 0x61, 0x70, 0x69, 0x2e, 0x50, 0x6f, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x05, 0x70, + 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x75, 0x62, 0x6b, 0x65, 0x79, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x70, 0x75, 0x62, 0x6b, 0x65, 0x79, 0x32, 0x9c, 0x03, 0x0a, + 0x04, 0x50, 0x6f, 0x65, 0x74, 0x12, 0x44, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x11, + 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x12, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x14, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0e, 0x22, 0x09, 0x2f, + 0x76, 0x31, 0x2f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x3a, 0x01, 0x2a, 0x12, 0x64, 0x0a, 0x0d, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x12, 0x19, 0x2e, 0x61, + 0x70, 0x69, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x55, 0x70, + 0x64, 0x61, 0x74, 0x65, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x1c, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x16, 0x22, 0x11, 0x2f, 0x76, 0x31, + 0x2f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x67, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x3a, 0x01, + 0x2a, 0x12, 0x48, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x12, 0x2e, 0x61, 0x70, + 0x69, 0x2e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x13, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x15, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0f, 0x22, 0x0a, 0x2f, 0x76, + 0x31, 0x2f, 0x73, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x3a, 0x01, 0x2a, 0x12, 0x46, 0x0a, 0x07, 0x47, + 0x65, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x13, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x65, 0x74, + 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x61, 0x70, + 0x69, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x10, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0a, 0x12, 0x08, 0x2f, 0x76, 0x31, 0x2f, 0x69, + 0x6e, 0x66, 0x6f, 0x12, 0x56, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, + 0x14, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x47, 0x65, 0x74, 0x50, + 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1d, 0x82, 0xd3, + 0xe4, 0x93, 0x02, 0x17, 0x12, 0x15, 0x2f, 0x76, 0x31, 0x2f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x73, + 0x2f, 0x7b, 0x72, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x69, 0x64, 0x7d, 0x42, 0x75, 0x0a, 0x07, 0x63, + 0x6f, 0x6d, 0x2e, 0x61, 0x70, 0x69, 0x42, 0x08, 0x41, 0x70, 0x69, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x50, 0x01, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, + 0x70, 0x61, 0x63, 0x65, 0x6d, 0x65, 0x73, 0x68, 0x6f, 0x73, 0x2f, 0x70, 0x6f, 0x65, 0x74, 0x2f, + 0x72, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, + 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x70, 0x69, 0xa2, 0x02, 0x03, 0x41, 0x58, 0x58, 0xaa, 0x02, + 0x03, 0x41, 0x70, 0x69, 0xca, 0x02, 0x03, 0x41, 0x70, 0x69, 0xe2, 0x02, 0x0f, 0x41, 0x70, 0x69, + 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x03, 0x41, + 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -684,7 +884,7 @@ func file_rpc_api_api_proto_rawDescGZIP() []byte { return file_rpc_api_api_proto_rawDescData } -var file_rpc_api_api_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_rpc_api_api_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_rpc_api_api_proto_goTypes = []interface{}{ (*StartRequest)(nil), // 0: api.StartRequest (*StartResponse)(nil), // 1: api.StartResponse @@ -695,22 +895,31 @@ var file_rpc_api_api_proto_goTypes = []interface{}{ (*GetInfoRequest)(nil), // 6: api.GetInfoRequest (*GetInfoResponse)(nil), // 7: api.GetInfoResponse (*MembershipProof)(nil), // 8: api.MembershipProof - (*PoetProof)(nil), // 9: api.PoetProof + (*MerkleProof)(nil), // 9: api.MerkleProof + (*PoetProof)(nil), // 10: api.PoetProof + (*GetProofRequest)(nil), // 11: api.GetProofRequest + (*GetProofResponse)(nil), // 12: api.GetProofResponse + (*durationpb.Duration)(nil), // 13: google.protobuf.Duration } var file_rpc_api_api_proto_depIdxs = []int32{ - 0, // 0: api.Poet.Start:input_type -> api.StartRequest - 2, // 1: api.Poet.UpdateGateway:input_type -> api.UpdateGatewayRequest - 4, // 2: api.Poet.Submit:input_type -> api.SubmitRequest - 6, // 3: api.Poet.GetInfo:input_type -> api.GetInfoRequest - 1, // 4: api.Poet.Start:output_type -> api.StartResponse - 3, // 5: api.Poet.UpdateGateway:output_type -> api.UpdateGatewayResponse - 5, // 6: api.Poet.Submit:output_type -> api.SubmitResponse - 7, // 7: api.Poet.GetInfo:output_type -> api.GetInfoResponse - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 13, // 0: api.SubmitResponse.round_end:type_name -> google.protobuf.Duration + 9, // 1: api.PoetProof.proof:type_name -> api.MerkleProof + 10, // 2: api.GetProofResponse.proof:type_name -> api.PoetProof + 0, // 3: api.Poet.Start:input_type -> api.StartRequest + 2, // 4: api.Poet.UpdateGateway:input_type -> api.UpdateGatewayRequest + 4, // 5: api.Poet.Submit:input_type -> api.SubmitRequest + 6, // 6: api.Poet.GetInfo:input_type -> api.GetInfoRequest + 11, // 7: api.Poet.GetProof:input_type -> api.GetProofRequest + 1, // 8: api.Poet.Start:output_type -> api.StartResponse + 3, // 9: api.Poet.UpdateGateway:output_type -> api.UpdateGatewayResponse + 5, // 10: api.Poet.Submit:output_type -> api.SubmitResponse + 7, // 11: api.Poet.GetInfo:output_type -> api.GetInfoResponse + 12, // 12: api.Poet.GetProof:output_type -> api.GetProofResponse + 8, // [8:13] is the sub-list for method output_type + 3, // [3:8] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_rpc_api_api_proto_init() } @@ -828,6 +1037,18 @@ func file_rpc_api_api_proto_init() { } } file_rpc_api_api_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MerkleProof); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_rpc_api_api_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PoetProof); i { case 0: return &v.state @@ -839,6 +1060,30 @@ func file_rpc_api_api_proto_init() { return nil } } + file_rpc_api_api_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetProofRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_rpc_api_api_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetProofResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -846,7 +1091,7 @@ func file_rpc_api_api_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_rpc_api_api_proto_rawDesc, NumEnums: 0, - NumMessages: 10, + NumMessages: 13, NumExtensions: 0, NumServices: 1, }, diff --git a/release/proto/go/rpc/api/api.pb.gw.go b/release/proto/go/rpc/api/api.pb.gw.go index af11fe25..7922c782 100644 --- a/release/proto/go/rpc/api/api.pb.gw.go +++ b/release/proto/go/rpc/api/api.pb.gw.go @@ -151,6 +151,58 @@ func local_request_Poet_GetInfo_0(ctx context.Context, marshaler runtime.Marshal } +func request_Poet_GetProof_0(ctx context.Context, marshaler runtime.Marshaler, client PoetClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq GetProofRequest + var metadata runtime.ServerMetadata + + var ( + val string + ok bool + err error + _ = err + ) + + val, ok = pathParams["round_id"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "round_id") + } + + protoReq.RoundId, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "round_id", err) + } + + msg, err := client.GetProof(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_Poet_GetProof_0(ctx context.Context, marshaler runtime.Marshaler, server PoetServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq GetProofRequest + var metadata runtime.ServerMetadata + + var ( + val string + ok bool + err error + _ = err + ) + + val, ok = pathParams["round_id"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "round_id") + } + + protoReq.RoundId, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "round_id", err) + } + + msg, err := server.GetProof(ctx, &protoReq) + return msg, metadata, err + +} + // RegisterPoetHandlerServer registers the http handlers for service Poet to "mux". // UnaryRPC :call PoetServer directly. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. @@ -257,6 +309,31 @@ func RegisterPoetHandlerServer(ctx context.Context, mux *runtime.ServeMux, serve }) + mux.Handle("GET", pattern_Poet_GetProof_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/api.Poet/GetProof", runtime.WithHTTPPathPattern("/v1/proofs/{round_id}")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_Poet_GetProof_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Poet_GetProof_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + return nil } @@ -386,6 +463,28 @@ func RegisterPoetHandlerClient(ctx context.Context, mux *runtime.ServeMux, clien }) + mux.Handle("GET", pattern_Poet_GetProof_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/api.Poet/GetProof", runtime.WithHTTPPathPattern("/v1/proofs/{round_id}")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Poet_GetProof_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_Poet_GetProof_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + return nil } @@ -397,6 +496,8 @@ var ( pattern_Poet_Submit_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "submit"}, "")) pattern_Poet_GetInfo_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "info"}, "")) + + pattern_Poet_GetProof_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2}, []string{"v1", "proofs", "round_id"}, "")) ) var ( @@ -407,4 +508,6 @@ var ( forward_Poet_Submit_0 = runtime.ForwardResponseMessage forward_Poet_GetInfo_0 = runtime.ForwardResponseMessage + + forward_Poet_GetProof_0 = runtime.ForwardResponseMessage ) diff --git a/release/proto/go/rpc/api/api_grpc.pb.go b/release/proto/go/rpc/api/api_grpc.pb.go index a1463a74..027453b8 100644 --- a/release/proto/go/rpc/api/api_grpc.pb.go +++ b/release/proto/go/rpc/api/api_grpc.pb.go @@ -33,6 +33,8 @@ type PoetClient interface { // GetInfo returns general information concerning the service, // including its identity pubkey. GetInfo(ctx context.Context, in *GetInfoRequest, opts ...grpc.CallOption) (*GetInfoResponse, error) + // GetProof returns the generated proof for given round id. + GetProof(ctx context.Context, in *GetProofRequest, opts ...grpc.CallOption) (*GetProofResponse, error) } type poetClient struct { @@ -79,6 +81,15 @@ func (c *poetClient) GetInfo(ctx context.Context, in *GetInfoRequest, opts ...gr return out, nil } +func (c *poetClient) GetProof(ctx context.Context, in *GetProofRequest, opts ...grpc.CallOption) (*GetProofResponse, error) { + out := new(GetProofResponse) + err := c.cc.Invoke(ctx, "/api.Poet/GetProof", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // PoetServer is the server API for Poet service. // All implementations should embed UnimplementedPoetServer // for forward compatibility @@ -94,6 +105,8 @@ type PoetServer interface { // GetInfo returns general information concerning the service, // including its identity pubkey. GetInfo(context.Context, *GetInfoRequest) (*GetInfoResponse, error) + // GetProof returns the generated proof for given round id. + GetProof(context.Context, *GetProofRequest) (*GetProofResponse, error) } // UnimplementedPoetServer should be embedded to have forward compatible implementations. @@ -112,6 +125,9 @@ func (UnimplementedPoetServer) Submit(context.Context, *SubmitRequest) (*SubmitR func (UnimplementedPoetServer) GetInfo(context.Context, *GetInfoRequest) (*GetInfoResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetInfo not implemented") } +func (UnimplementedPoetServer) GetProof(context.Context, *GetProofRequest) (*GetProofResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetProof not implemented") +} // UnsafePoetServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to PoetServer will @@ -196,6 +212,24 @@ func _Poet_GetInfo_Handler(srv interface{}, ctx context.Context, dec func(interf return interceptor(ctx, in, info, handler) } +func _Poet_GetProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetProofRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PoetServer).GetProof(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/api.Poet/GetProof", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PoetServer).GetProof(ctx, req.(*GetProofRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Poet_ServiceDesc is the grpc.ServiceDesc for Poet service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -219,6 +253,10 @@ var Poet_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetInfo", Handler: _Poet_GetInfo_Handler, }, + { + MethodName: "GetProof", + Handler: _Poet_GetProof_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "rpc/api/api.proto", diff --git a/release/proto/openapiv2/rpc/api/api.swagger.json b/release/proto/openapiv2/rpc/api/api.swagger.json index ba2e97be..998a6725 100644 --- a/release/proto/openapiv2/rpc/api/api.swagger.json +++ b/release/proto/openapiv2/rpc/api/api.swagger.json @@ -39,6 +39,37 @@ ] } }, + "/v1/proofs/{roundId}": { + "get": { + "summary": "GetProof returns the generated proof for given round id.", + "operationId": "Poet_GetProof", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/apiGetProofResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "roundId", + "in": "path", + "required": true, + "type": "string" + } + ], + "tags": [ + "Poet" + ] + } + }, "/v1/start": { "post": { "summary": "Start is used to start the service.", @@ -158,6 +189,60 @@ } } }, + "apiGetProofResponse": { + "type": "object", + "properties": { + "proof": { + "$ref": "#/definitions/apiPoetProof" + }, + "pubkey": { + "type": "string", + "format": "byte" + } + } + }, + "apiMerkleProof": { + "type": "object", + "properties": { + "root": { + "type": "string", + "format": "byte" + }, + "provenLeaves": { + "type": "array", + "items": { + "type": "string", + "format": "byte" + } + }, + "proofNodes": { + "type": "array", + "items": { + "type": "string", + "format": "byte" + } + } + } + }, + "apiPoetProof": { + "type": "object", + "properties": { + "proof": { + "$ref": "#/definitions/apiMerkleProof" + }, + "members": { + "type": "array", + "items": { + "type": "string", + "format": "byte" + } + }, + "leaves": { + "type": "string", + "format": "uint64" + } + } + }, "apiStartRequest": { "type": "object", "properties": { @@ -205,6 +290,9 @@ "hash": { "type": "string", "format": "byte" + }, + "roundEnd": { + "type": "string" } } }, diff --git a/rpc/api/api.proto b/rpc/api/api.proto index 2a25726a..337c6814 100644 --- a/rpc/api/api.proto +++ b/rpc/api/api.proto @@ -1,6 +1,7 @@ syntax = "proto3"; import "google/api/annotations.proto"; +import "google/protobuf/duration.proto"; package api; @@ -46,6 +47,15 @@ service Poet { get: "/v1/info" }; } + + /** + GetProof returns the generated proof for given round id. + */ + rpc GetProof(GetProofRequest) returns (GetProofResponse) { + option (google.api.http) = { + get: "/v1/proofs/{round_id}" + }; + } } message StartRequest { @@ -76,6 +86,7 @@ message SubmitRequest { message SubmitResponse { string roundId = 1; bytes hash = 2; + google.protobuf.Duration round_end = 3; } message GetInfoRequest { @@ -93,8 +104,23 @@ message MembershipProof { repeated bytes proof = 3; } +message MerkleProof { + bytes root = 1; + repeated bytes proven_leaves = 2; + repeated bytes proof_nodes = 3; +} + message PoetProof { - bytes phi = 1; - repeated bytes provenLeaves = 2; - repeated bytes proofNodes = 3; + MerkleProof proof = 1; + repeated bytes members = 2; + uint64 leaves = 3; +} + +message GetProofRequest { + string round_id = 1; +} + +message GetProofResponse { + PoetProof proof = 1; + bytes pubkey = 2; } diff --git a/rpc/rpcserver.go b/rpc/rpcserver.go index cbedebe1..d23f4c52 100644 --- a/rpc/rpcserver.go +++ b/rpc/rpcserver.go @@ -7,12 +7,13 @@ import ( "sync" "github.com/spacemeshos/smutil/log" + "golang.org/x/exp/slices" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" "github.com/spacemeshos/poet/config" "github.com/spacemeshos/poet/gateway" - "github.com/spacemeshos/poet/gateway/broadcaster" "github.com/spacemeshos/poet/gateway/challenge_verifier" "github.com/spacemeshos/poet/logging" "github.com/spacemeshos/poet/release/proto/go/rpc/api" @@ -21,6 +22,7 @@ import ( // rpcServer is a gRPC, RPC front end to poet. type rpcServer struct { + proofsDb *service.ProofsDatabase s *service.Service gtwManager *gateway.Manager cfg config.Config @@ -32,13 +34,13 @@ type rpcServer struct { var _ api.PoetServer = (*rpcServer)(nil) // NewServer creates and returns a new instance of the rpcServer. -func NewServer(service *service.Service, gtwManager *gateway.Manager, cfg config.Config) *rpcServer { - server := &rpcServer{ - s: service, +func NewServer(svc *service.Service, proofsDb *service.ProofsDatabase, gtwManager *gateway.Manager, cfg config.Config) *rpcServer { + return &rpcServer{ + proofsDb: proofsDb, + s: svc, cfg: cfg, gtwManager: gtwManager, } - return server } func (r *rpcServer) Start(ctx context.Context, in *api.StartRequest) (*api.StartResponse, error) { @@ -54,11 +56,6 @@ func (r *rpcServer) Start(ctx context.Context, in *api.StartRequest) (*api.Start connAcks = 1 } - broadcastAcks := in.BroadcastAcksThreshold - if broadcastAcks < 1 { - broadcastAcks = 1 - } - gtwConnCtx, cancel := context.WithTimeout(ctx, r.cfg.GtwConnTimeout) defer cancel() gtwManager, err := gateway.NewManager(gtwConnCtx, in.GatewayAddresses, connAcks) @@ -70,27 +67,18 @@ func (r *rpcServer) Start(ctx context.Context, in *api.StartRequest) (*api.Start logging.FromContext(ctx).With().Warning("failed to close GRPC connections", log.Err(err)) } }() - b, err := broadcaster.New( - gtwManager.Connections(), - in.DisableBroadcast, - broadcaster.DefaultBroadcastTimeout, - uint(broadcastAcks), - ) - if err != nil { - return nil, err - } verifier, err := service.CreateChallengeVerifier(gtwManager.Connections()) if err != nil { return nil, fmt.Errorf("failed to create challenge verifier: %w", err) } + if err = r.s.Start(ctx, verifier); err != nil { + return nil, err + } // Swap the new and old gateway managers. // The old one will be closed in defer. r.gtwManager, gtwManager = gtwManager, r.gtwManager - if err := r.s.Start(b, verifier); err != nil { - return nil, fmt.Errorf("failed to start service: %w", err) - } return &api.StartResponse{}, nil } @@ -108,11 +96,6 @@ func (r *rpcServer) UpdateGateway(ctx context.Context, in *api.UpdateGatewayRequ connAcks = 1 } - broadcastAcks := in.BroadcastAcksThreshold - if broadcastAcks < 1 { - broadcastAcks = 1 - } - gtwConnCtx, cancel := context.WithTimeout(ctx, r.cfg.GtwConnTimeout) defer cancel() gtwManager, err := gateway.NewManager(gtwConnCtx, in.GatewayAddresses, connAcks) @@ -124,15 +107,6 @@ func (r *rpcServer) UpdateGateway(ctx context.Context, in *api.UpdateGatewayRequ logging.FromContext(ctx).With().Warning("failed to close GRPC connections", log.Err(err)) } }() - b, err := broadcaster.New( - gtwManager.Connections(), - in.DisableBroadcast, - broadcaster.DefaultBroadcastTimeout, - uint(broadcastAcks), - ) - if err != nil { - return nil, err - } verifier, err := service.CreateChallengeVerifier(gtwManager.Connections()) if err != nil { @@ -142,14 +116,14 @@ func (r *rpcServer) UpdateGateway(ctx context.Context, in *api.UpdateGatewayRequ // Swap the new and old gateway managers. // The old one will be closed in defer. r.gtwManager, gtwManager = gtwManager, r.gtwManager - r.s.SetBroadcaster(b) r.s.SetChallengeVerifier(verifier) return &api.UpdateGatewayResponse{}, nil } +// Submit implements api.Submit. func (r *rpcServer) Submit(ctx context.Context, in *api.SubmitRequest) (*api.SubmitResponse, error) { - round, hash, err := r.s.Submit(ctx, in.Challenge, in.Signature) + result, err := r.s.Submit(ctx, in.Challenge, in.Signature) switch { case errors.Is(err, service.ErrNotStarted): return nil, status.Error(codes.FailedPrecondition, "cannot submit a challenge because poet service is not started") @@ -163,11 +137,13 @@ func (r *rpcServer) Submit(ctx context.Context, in *api.SubmitRequest) (*api.Sub } out := new(api.SubmitResponse) - out.RoundId = round - out.Hash = hash + out.RoundId = result.Round + out.Hash = result.Hash + out.RoundEnd = durationpb.New(result.RoundEnd) return out, nil } +// GetInfo implements api.GetInfo. func (r *rpcServer) GetInfo(ctx context.Context, in *api.GetInfoRequest) (*api.GetInfoResponse, error) { info, err := r.s.Info(ctx) if err != nil { @@ -184,3 +160,35 @@ func (r *rpcServer) GetInfo(ctx context.Context, in *api.GetInfoRequest) (*api.G return out, nil } + +// GetProof implements api.PoetServer. +func (r *rpcServer) GetProof(ctx context.Context, in *api.GetProofRequest) (*api.GetProofResponse, error) { + if info, err := r.s.Info(ctx); err == nil { + if info.OpenRoundID == in.RoundId || slices.Contains(info.ExecutingRoundsIds, in.RoundId) { + return nil, status.Error(codes.Unavailable, "round is not finished yet") + } + } + + proof, err := r.proofsDb.Get(ctx, in.RoundId) + switch { + case errors.Is(err, service.ErrNotFound): + return nil, status.Error(codes.NotFound, "proof not found") + case err == nil: + out := api.GetProofResponse{ + Proof: &api.PoetProof{ + Proof: &api.MerkleProof{ + Root: proof.Root, + ProvenLeaves: proof.ProvenLeaves, + ProofNodes: proof.ProofNodes, + }, + Members: proof.Members, + Leaves: proof.NumLeaves, + }, + Pubkey: proof.ServicePubKey, + } + + return &out, nil + default: + return nil, status.Error(codes.Internal, err.Error()) + } +} diff --git a/server/server.go b/server/server.go index d153dfd2..c523350e 100644 --- a/server/server.go +++ b/server/server.go @@ -2,10 +2,12 @@ package server import ( "context" + "errors" "fmt" "net" "net/http" "os" + "path/filepath" "time" "github.com/google/uuid" @@ -20,15 +22,60 @@ import ( "github.com/spacemeshos/poet/config" "github.com/spacemeshos/poet/gateway" - "github.com/spacemeshos/poet/gateway/broadcaster" "github.com/spacemeshos/poet/logging" "github.com/spacemeshos/poet/release/proto/go/rpc/api" "github.com/spacemeshos/poet/rpc" "github.com/spacemeshos/poet/service" ) -// startServer starts the RPC server. -func StartServer(ctx context.Context, cfg *config.Config) error { +type Server struct { + svc *service.Service + cfg config.Config + rpcListener net.Listener + restListener net.Listener +} + +func New(cfg config.Config) (*Server, error) { + rpcListener, err := net.Listen(cfg.RPCListener.Network(), cfg.RPCListener.String()) + if err != nil { + return nil, fmt.Errorf("failed to listen: %v", err) + } + + restListener, err := net.Listen(cfg.RESTListener.Network(), cfg.RESTListener.String()) + if err != nil { + return nil, fmt.Errorf("failed to listen: %v", err) + } + + if _, err := os.Stat(cfg.DataDir); os.IsNotExist(err) { + if err := os.Mkdir(cfg.DataDir, 0o700); err != nil { + return nil, err + } + } + + svc, err := service.NewService(cfg.Service, cfg.DataDir) + if err != nil { + return nil, fmt.Errorf("failed to create Service: %v", err) + } + + return &Server{ + svc: svc, + cfg: cfg, + rpcListener: rpcListener, + restListener: restListener, + }, nil +} + +func (s *Server) Close() { + s.rpcListener.Close() + s.restListener.Close() +} + +func (s *Server) RpcAddr() net.Addr { + return s.rpcListener.Addr() +} + +// Start starts the RPC server. +func (s *Server) Start(ctx context.Context) error { ctx, stop := context.WithCancel(ctx) defer stop() serverGroup, ctx := errgroup.WithContext(ctx) @@ -53,33 +100,23 @@ func StartServer(ctx context.Context, cfg *config.Config) error { }), } - if _, err := os.Stat(cfg.DataDir); os.IsNotExist(err) { - if err := os.Mkdir(cfg.DataDir, 0o700); err != nil { - return err - } - } - - svc, err := service.NewService(cfg.Service, cfg.DataDir) - defer svc.Shutdown() + proofsDbPath := filepath.Join(s.cfg.DataDir, "proofs") + proofsDb, err := service.NewProofsDatabase(proofsDbPath, s.svc.ProofsChan()) if err != nil { - return err + return fmt.Errorf("failed to create proofs DB: %w", err) } - gtwConnCtx, cancel := context.WithTimeout(ctx, cfg.GtwConnTimeout) + serverGroup.Go(func() error { + return proofsDb.Run(ctx) + }) + + serverGroup.Go(func() error { + return s.svc.Run(ctx) + }) + + gtwConnCtx, cancel := context.WithTimeout(ctx, s.cfg.GtwConnTimeout) defer cancel() - gtwManager, err := gateway.NewManager(gtwConnCtx, cfg.Service.GatewayAddresses, cfg.Service.ConnAcksThreshold) + gtwManager, err := gateway.NewManager(gtwConnCtx, s.cfg.Service.GatewayAddresses, s.cfg.Service.ConnAcksThreshold) if err == nil { - broadcaster, err := broadcaster.New( - gtwManager.Connections(), - cfg.Service.DisableBroadcast, - broadcaster.DefaultBroadcastTimeout, - cfg.Service.BroadcastAcksThreshold, - ) - if err != nil { - if err := gtwManager.Close(); err != nil { - logger.With().Warning("failed to close GRPC connections", log.Err(err)) - } - return err - } verifier, err := service.CreateChallengeVerifier(gtwManager.Connections()) if err != nil { if err := gtwManager.Close(); err != nil { @@ -87,8 +124,7 @@ func StartServer(ctx context.Context, cfg *config.Config) error { } return fmt.Errorf("failed to create challenge verifier: %w", err) } - - if err := svc.Start(broadcaster, verifier); err != nil { + if err := s.svc.Start(ctx, verifier); err != nil { return err } } else { @@ -96,37 +132,35 @@ func StartServer(ctx context.Context, cfg *config.Config) error { gtwManager = &gateway.Manager{} } - rpcServer := rpc.NewServer(svc, gtwManager, *cfg) + rpcServer := rpc.NewServer(s.svc, proofsDb, gtwManager, s.cfg) grpcServer = grpc.NewServer(options...) api.RegisterPoetServer(grpcServer, rpcServer) proxyRegstr = append(proxyRegstr, api.RegisterPoetHandlerFromEndpoint) // Start the gRPC server listening for HTTP/2 connections. - lis, err := net.Listen(cfg.RPCListener.Network(), cfg.RPCListener.String()) - if err != nil { - return fmt.Errorf("failed to listen: %v", err) - } - defer lis.Close() - serverGroup.Go(func() error { - logger.Info("RPC server listening on %s", lis.Addr()) - return grpcServer.Serve(lis) + logger.Info("RPC server listening on %s", s.rpcListener.Addr()) + return grpcServer.Serve(s.rpcListener) }) // Start the REST proxy for the gRPC server above. mux := proxy.NewServeMux() for _, r := range proxyRegstr { - err := r(ctx, mux, cfg.RPCListener.String(), []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}) + err := r(ctx, mux, s.rpcListener.Addr().String(), []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}) if err != nil { return err } } - server := &http.Server{Addr: cfg.RESTListener.String(), Handler: mux} + server := &http.Server{Handler: mux} serverGroup.Go(func() error { - logger.Info("REST proxy start listening on %s", cfg.RESTListener.String()) - return server.ListenAndServe() + logger.Info("REST proxy starts listening on %s", s.restListener.Addr()) + err := server.Serve(s.restListener) + if errors.Is(err, http.ErrServerClosed) { + return nil + } + return err }) // Wait for the server to shut down gracefully diff --git a/server/server_test.go b/server/server_test.go new file mode 100644 index 00000000..c24e7f2a --- /dev/null +++ b/server/server_test.go @@ -0,0 +1,180 @@ +package server_test + +// End to end tests running a Poet server and interacting with it via +// its GRPC API. + +import ( + "context" + "fmt" + "testing" + "time" + + pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + "github.com/spacemeshos/merkle-tree" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/spacemeshos/poet/config" + "github.com/spacemeshos/poet/gateway" + "github.com/spacemeshos/poet/hash" + "github.com/spacemeshos/poet/release/proto/go/rpc/api" + "github.com/spacemeshos/poet/server" + "github.com/spacemeshos/poet/shared" + "github.com/spacemeshos/poet/verifier" +) + +const randomHost = "localhost:0" + +type gatewayService struct { + pb.UnimplementedGatewayServiceServer +} + +func (*gatewayService) VerifyChallenge(ctx context.Context, req *pb.VerifyChallengeRequest) (*pb.VerifyChallengeResponse, error) { + return &pb.VerifyChallengeResponse{ + Hash: []byte("hash"), + NodeId: []byte("nodeID"), + }, nil +} + +func spawnMockGateway(t *testing.T) (target string) { + t.Helper() + server := gateway.NewMockGrpcServer(t) + pb.RegisterGatewayServiceServer(server.Server, &gatewayService{}) + + var eg errgroup.Group + t.Cleanup(func() { require.NoError(t, eg.Wait()) }) + + eg.Go(server.Serve) + t.Cleanup(server.Stop) + + return server.Target() +} + +func spawnPoet(ctx context.Context, t *testing.T, cfg config.Config) (*server.Server, api.PoetClient) { + t.Helper() + req := require.New(t) + + _, err := config.SetupConfig(&cfg) + req.NoError(err) + + srv, err := server.New(cfg) + req.NoError(err) + + conn, err := grpc.DialContext( + context.Background(), + srv.RpcAddr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials())) + req.NoError(err) + t.Cleanup(func() { conn.Close() }) + + return srv, api.NewPoetClient(conn) +} + +// Test poet service startup. +func TestPoetStart(t *testing.T) { + t.Parallel() + req := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + + gtw := spawnMockGateway(t) + + cfg := config.DefaultConfig() + cfg.PoetDir = t.TempDir() + cfg.RawRPCListener = randomHost + cfg.RawRESTListener = randomHost + cfg.Service.GatewayAddresses = []string{gtw} + + srv, client := spawnPoet(ctx, t, *cfg) + + var eg errgroup.Group + eg.Go(func() error { + return srv.Start(ctx) + }) + + resp, err := client.GetInfo(context.Background(), &api.GetInfoRequest{}) + req.NoError(err) + req.Equal("0", resp.OpenRoundId) + + cancel() + req.NoError(eg.Wait()) +} + +// Test submitting a challenge followed by proof generation and getting the proof via GRPC. +func TestSubmitAndGetProof(t *testing.T) { + t.Parallel() + req := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + + gtw := spawnMockGateway(t) + + cfg := config.DefaultConfig() + cfg.PoetDir = t.TempDir() + cfg.Service.Genesis = time.Now().Add(time.Second).Format(time.RFC3339) + cfg.Service.EpochDuration = time.Second + cfg.Service.PhaseShift = 0 + cfg.Service.CycleGap = 0 + cfg.RawRPCListener = randomHost + cfg.RawRESTListener = randomHost + cfg.Service.GatewayAddresses = []string{gtw} + + srv, client := spawnPoet(ctx, t, *cfg) + + var eg errgroup.Group + eg.Go(func() error { + return srv.Start(ctx) + }) + + // Submit a challenge + resp, err := client.Submit(context.Background(), &api.SubmitRequest{}) + req.NoError(err) + req.Equal([]byte("hash"), resp.Hash) + + roundEnd := resp.RoundEnd.AsDuration() + req.NotZero(roundEnd) + + // Wait for round to end + <-time.After(roundEnd) + + // Query for the proof + var proof *api.GetProofResponse + req.Eventually(func() bool { + proof, err = client.GetProof(context.Background(), &api.GetProofRequest{RoundId: resp.RoundId}) + return err == nil + }, time.Second, time.Millisecond*100) + + req.NotZero(proof.Proof.Leaves) + req.Len(proof.Proof.Members, 1) + req.Contains(proof.Proof.Members, []byte("hash")) + cancel() + + merkleProof := shared.MerkleProof{ + Root: proof.Proof.Proof.Root, + ProvenLeaves: proof.Proof.Proof.ProvenLeaves, + ProofNodes: proof.Proof.Proof.ProofNodes, + } + + root, err := calcRoot(proof.Proof.Members) + req.NoError(err) + + labelHashFunc := hash.GenLabelHashFunc(root) + merkleHashFunc := hash.GenMerkleHashFunc(root) + req.NoError(verifier.Validate(merkleProof, labelHashFunc, merkleHashFunc, proof.Proof.Leaves, shared.T)) + + req.NoError(eg.Wait()) +} + +func calcRoot(leaves [][]byte) ([]byte, error) { + tree, err := merkle.NewTree() + if err != nil { + return nil, fmt.Errorf("failed to generate tree: %w", err) + } + for _, member := range leaves { + err := tree.AddLeaf(member) + if err != nil { + return nil, fmt.Errorf("failed to add leaf: %w", err) + } + } + return tree.Root(), nil +} diff --git a/service/db.go b/service/db.go index 034a7123..0d026521 100644 --- a/service/db.go +++ b/service/db.go @@ -1,48 +1,76 @@ package service import ( + "bytes" + "context" "fmt" + "github.com/spacemeshos/go-scale" + "github.com/spacemeshos/smutil/log" "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" + + "github.com/spacemeshos/poet/shared" ) -type LevelDB struct { - *leveldb.DB - wo *opt.WriteOptions - ro *opt.ReadOptions +var ErrNotFound = leveldb.ErrNotFound + +type ProofsDatabase struct { + db *leveldb.DB + proofs <-chan shared.ProofMessage } -func NewLevelDbStore(path string, wo *opt.WriteOptions, ro *opt.ReadOptions) *LevelDB { - blocks, err := leveldb.OpenFile(path, nil) +func (db *ProofsDatabase) Get(ctx context.Context, roundID string) (*shared.ProofMessage, error) { + data, err := db.db.Get([]byte(roundID), nil) if err != nil { - panic(fmt.Errorf("failed to open db file (%v): %v", path, err)) + return nil, fmt.Errorf("get proof for %s from DB: %w", roundID, err) } - return &LevelDB{blocks, wo, ro} -} - -func (db *LevelDB) Close() error { - return db.DB.Close() + proof := &shared.ProofMessage{} + if _, err := proof.DecodeScale(scale.NewDecoder(bytes.NewReader(data))); err != nil { + return nil, fmt.Errorf("failed to get deserialize proof: %w", err) + } + return proof, nil } -func (db *LevelDB) Put(key, value []byte) error { - return db.DB.Put(key, value, db.wo) -} +func NewProofsDatabase(dbPath string, proofs <-chan shared.ProofMessage) (*ProofsDatabase, error) { + db, err := leveldb.OpenFile(dbPath, nil) + if err != nil { + return nil, fmt.Errorf("failed to open database @ %s: %w", dbPath, err) + } -func (db *LevelDB) Has(key []byte) (bool, error) { - return db.DB.Has(key, db.ro) + return &ProofsDatabase{db, proofs}, nil } -func (db *LevelDB) Get(key []byte) (value []byte, err error) { - return db.DB.Get(key, db.ro) +func (db *ProofsDatabase) Run(ctx context.Context) error { + logger := log.AppLog.WithName("proofs-db") + for { + select { + case proof := <-db.proofs: + serialized, err := serializeProofMsg(proof) + if err != nil { + return fmt.Errorf("failed serializing proof: %w", err) + } + if err := db.db.Put([]byte(proof.RoundID), serialized, &opt.WriteOptions{Sync: true}); err != nil { + logger.With().Error("failed storing proof in DB", log.Err(err)) + } else { + logger.With().Info("Proof saved in DB", + log.String("round", proof.RoundID), + log.Int("members", len(proof.Members)), + log.Uint64("leaves", proof.NumLeaves)) + } + case <-ctx.Done(): + logger.Info("shutting down proofs db") + return db.db.Close() + } + } } -func (db *LevelDB) Delete(key []byte) error { - return db.DB.Delete(key, db.wo) -} +func serializeProofMsg(proof shared.ProofMessage) ([]byte, error) { + var dataBuf bytes.Buffer + if _, err := proof.EncodeScale(scale.NewEncoder(&dataBuf)); err != nil { + return nil, fmt.Errorf("failed to marshal proof message for round %v: %v", proof.RoundID, err) + } -func (db *LevelDB) Iterator() iterator.Iterator { - return db.DB.NewIterator(nil, db.ro) + return dataBuf.Bytes(), nil } diff --git a/service/round.go b/service/round.go index d921a0ed..3313a740 100644 --- a/service/round.go +++ b/service/round.go @@ -12,6 +12,7 @@ import ( "github.com/spacemeshos/merkle-tree" "github.com/spacemeshos/merkle-tree/cache" "github.com/spacemeshos/smutil/log" + "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/spacemeshos/poet/hash" @@ -50,7 +51,7 @@ type round struct { datadir string ID string - challengesDb *LevelDB + challengesDb *leveldb.DB execution *executionState opened time.Time @@ -69,7 +70,7 @@ func (r *round) Epoch() uint32 { return r.execution.Epoch } -func newRound(ctx context.Context, datadir string, epoch uint32) *round { +func newRound(ctx context.Context, datadir string, epoch uint32) (*round, error) { r := new(round) r.ID = strconv.FormatUint(uint64(epoch), 10) r.datadir = filepath.Join(datadir, r.ID) @@ -79,8 +80,11 @@ func newRound(ctx context.Context, datadir string, epoch uint32) *round { r.broadcastedChan = make(chan struct{}) r.teardownChan = make(chan struct{}) - wo := &opt.WriteOptions{Sync: true} - r.challengesDb = NewLevelDbStore(filepath.Join(r.datadir, "challengesDb"), wo, nil) // This creates the datadir if it doesn't exist already. + db, err := leveldb.OpenFile(filepath.Join(r.datadir, "challengesDb"), nil) + if err != nil { + return nil, err + } + r.challengesDb = db r.execution = new(executionState) r.execution.Epoch = epoch @@ -103,7 +107,7 @@ func newRound(ctx context.Context, datadir string, epoch uint32) *round { log.Info("Round %v torn down (cleanup %v)", r.ID, cleanup) }() - return r + return r, nil } func (r *round) open() error { @@ -139,16 +143,16 @@ func (r *round) submit(key, challenge []byte) error { return errors.New("round is not open") } - if has, err := r.challengesDb.Has(key); err != nil { + if has, err := r.challengesDb.Has(key, nil); err != nil { return err } else if has { return fmt.Errorf("%w: key: %X", ErrChallengeAlreadySubmitted, key) } - return r.challengesDb.Put(key, challenge) + return r.challengesDb.Put(key, challenge, &opt.WriteOptions{Sync: true}) } func (r *round) numChallenges() int { - iter := r.challengesDb.Iterator() + iter := r.challengesDb.NewIterator(nil, nil) defer iter.Release() var num int @@ -160,7 +164,7 @@ func (r *round) numChallenges() int { } func (r *round) isEmpty() bool { - iter := r.challengesDb.Iterator() + iter := r.challengesDb.NewIterator(nil, nil) defer iter.Release() return !iter.Next() } @@ -296,7 +300,9 @@ func (r *round) proof(wait bool) (*PoetProof, error) { }, nil } -func (r *round) broadcasted() { +// Close closes the round. +// FIXME(brozansk) Close it synchronously (https://github.com/spacemeshos/poet/issues/181) +func (r *round) Close() { close(r.broadcastedChan) } @@ -332,7 +338,7 @@ func (r *round) calcMembersAndStatement() ([][]byte, []byte, error) { } members := make([][]byte, 0) - iter := r.challengesDb.Iterator() + iter := r.challengesDb.NewIterator(nil, nil) defer iter.Release() for iter.Next() { challenge := iter.Value() diff --git a/service/round_test.go b/service/round_test.go index f83d0172..27c71fd6 100644 --- a/service/round_test.go +++ b/service/round_test.go @@ -44,7 +44,8 @@ func TestRound_Recovery(t *testing.T) { req.NoError(err) // Execute r1 as a reference round. - r1 := newRound(ctx, tmpdir, 0) + r1, err := newRound(ctx, tmpdir, 0) + req.NoError(err) req.NoError(r1.open()) req.Equal(0, r1.numChallenges()) req.True(r1.isEmpty()) @@ -58,7 +59,8 @@ func TestRound_Recovery(t *testing.T) { req.NoError(r1.execute(ctx, time.Now().Add(duration), prover.LowestMerkleMinMemoryLayer)) // Execute r2, and request shutdown before completion. - r2 := newRound(ctx, tmpdir, 1) + r2, err := newRound(ctx, tmpdir, 1) + req.NoError(err) req.NoError(r2.open()) req.Equal(0, r2.numChallenges()) req.True(r2.isEmpty()) @@ -75,7 +77,8 @@ func TestRound_Recovery(t *testing.T) { // Recover r2 execution, and request shutdown before completion. ctx, stop = context.WithCancel(context.Background()) - r2recovery1 := newRound(ctx, tmpdir, 1) + r2recovery1, err := newRound(ctx, tmpdir, 1) + req.NoError(err) req.Equal(len(challenges), r2recovery1.numChallenges()) req.False(r2recovery1.isEmpty()) @@ -88,7 +91,8 @@ func TestRound_Recovery(t *testing.T) { // Recover r2 execution again, and let it complete. ctx, stop = context.WithCancel(context.Background()) - r2recovery2 := newRound(ctx, tmpdir, 1) + r2recovery2, err := newRound(ctx, tmpdir, 1) + req.NoError(err) req.Equal(len(challenges), r2recovery2.numChallenges()) req.False(r2recovery2.isEmpty()) state, err = r2recovery2.state() @@ -108,11 +112,12 @@ func TestRound_State(t *testing.T) { tempdir := t.TempDir() // Create a new round. - r := newRound(ctx, tempdir, 0) + r, err := newRound(ctx, tempdir, 0) + req.NoError(err) req.True(!r.isOpen()) req.True(r.opened.IsZero()) req.True(r.executionStarted.IsZero()) - _, err := r.proof(false) + _, err = r.proof(false) req.EqualError(err, "round wasn't open") req.Nil(r.stateCache) @@ -182,7 +187,8 @@ func TestRound_State(t *testing.T) { // Create a new round instance of the same round. ctx, stop = context.WithCancel(context.Background()) - r = newRound(ctx, tempdir, 0) + r, err = newRound(ctx, tempdir, 0) + req.NoError(err) req.False(r.isOpen()) req.True(r.opened.IsZero()) req.True(r.executionStarted.IsZero()) @@ -214,7 +220,7 @@ func TestRound_State(t *testing.T) { req.Equal(r.execution, state.Execution) // Trigger cleanup. - r.broadcasted() + r.Close() req.NoError(r.waitTeardown(context.Background())) // Verify cleanup. diff --git a/service/service.go b/service/service.go index c8239d4b..40dceeac 100644 --- a/service/service.go +++ b/service/service.go @@ -1,7 +1,6 @@ package service import ( - "bytes" "context" "crypto/ed25519" "encoding/hex" @@ -10,11 +9,9 @@ import ( "os" "path/filepath" "strconv" - "sync" "sync/atomic" "time" - "github.com/spacemeshos/go-scale" mshared "github.com/spacemeshos/merkle-tree/shared" "github.com/spacemeshos/smutil/log" "golang.org/x/sync/errgroup" @@ -27,19 +24,15 @@ import ( ) type Config struct { - Genesis string `long:"genesis-time" description:"Genesis timestamp"` - EpochDuration time.Duration `long:"epoch-duration" description:"Epoch duration"` - PhaseShift time.Duration `long:"phase-shift"` - CycleGap time.Duration `long:"cycle-gap"` - MemoryLayers uint `long:"memory" description:"Number of top Merkle tree layers to cache in-memory"` - NoRecovery bool `long:"norecovery" description:"whether to disable a potential recovery procedure"` - Reset bool `long:"reset" description:"whether to reset the service state by deleting the datadir"` - GatewayAddresses []string `long:"gateway" description:"list of Spacemesh gateway nodes RPC listeners (host:port) for broadcasting of proofs"` - DisableBroadcast bool `long:"disablebroadcast" description:"whether to disable broadcasting of proofs"` - ConnAcksThreshold uint `long:"conn-acks" description:"number of required successful connections to Spacemesh gateway nodes"` - BroadcastAcksThreshold uint `long:"broadcast-acks" description:"number of required successful broadcasts via Spacemesh gateway nodes"` - BroadcastNumRetries uint `long:"broadcast-num-retries" description:"number of broadcast retries"` - BroadcastRetriesInterval time.Duration `long:"broadcast-retries-interval" description:"duration interval between broadcast retries"` + Genesis string `long:"genesis-time" description:"Genesis timestamp"` + EpochDuration time.Duration `long:"epoch-duration" description:"Epoch duration"` + PhaseShift time.Duration `long:"phase-shift"` + CycleGap time.Duration `long:"cycle-gap"` + MemoryLayers uint `long:"memory" description:"Number of top Merkle tree layers to cache in-memory"` + NoRecovery bool `long:"norecovery" description:"whether to disable a potential recovery procedure"` + Reset bool `long:"reset" description:"whether to reset the service state by deleting the datadir"` + GatewayAddresses []string `long:"gateway" description:"list of Spacemesh gateway nodes RPC listeners (host:port) for broadcasting of proofs"` + ConnAcksThreshold uint `long:"conn-acks" description:"number of required successful connections to Spacemesh gateway nodes"` } // estimatedLeavesPerSecond is used to computed estimated height of the proving tree @@ -48,43 +41,31 @@ const estimatedLeavesPerSecond = 1 << 17 const ChallengeVerifierCacheSize = 1024 -// ServiceClient is an interface for interacting with the Service actor. -// It is created when the Service is started. -type ServiceClient struct { - serviceStarted *atomic.Bool - command chan<- Command - challengeVerifier atomic.Value // holds challenge_verifier.Verifier -} - -// Service orchestrates rounds functionality; each responsible for accepting challenges, -// generating a proof from their hash digest, and broadcasting the result to the Spacemesh network. +// Service orchestrates rounds functionality +// It is responsible for accepting challenges, generating a proof from their hash digest and persisting it. // -// Service is single-use, meaning it can be started with `Start()` and then stopped with `Shutdown()` -// but it cannot be restarted. A new instance of `Service` must be created. +// `Service` is single-use, meaning it can be started with `Service::Run`. +// It is stopped by canceling the context provided to `Service::Run`. +// It mustn't be restarted. A new instance of `Service` must be created. type Service struct { - commands <-chan Command - ServiceClient - - runningGroup errgroup.Group - stop context.CancelFunc + started atomic.Bool + proofs chan shared.ProofMessage + commands chan Command + timer <-chan time.Time cfg *Config datadir string genesis time.Time minMemoryLayer uint - started atomic.Bool // openRound is the round which is currently open for accepting challenges registration from miners. // At any given time there is one single open round. - openRound *round - executingRounds map[string]struct{} + openRound *round + executingRounds map[string]struct{} + challengeVerifier atomic.Value // holds challenge_verifier.Verifier PubKey ed25519.PublicKey privKey ed25519.PrivateKey - - broadcaster Broadcaster - - sync.Mutex } // Command is a function that will be run in the main Service loop. @@ -107,33 +88,11 @@ var ( ErrNotStarted = errors.New("service not started") ErrAlreadyStarted = errors.New("already started") ErrChallengeAlreadySubmitted = errors.New("challenge is already submitted") + ErrRoundNotFinished = errors.New("round is not finished yet") ) -type Broadcaster interface { - BroadcastProof(msg []byte, roundID string, members [][]byte) error -} - -type GossipPoetProof struct { - // The actual proof. - shared.MerkleProof - - // Members is the ordered list of miners challenges which are included - // in the proof (by using the list hash digest as the proof generation input (the statement)). - Members [][]byte - - // NumLeaves is the width of the proof-generation tree. - NumLeaves uint64 -} - -//go:generate scalegen -types PoetProofMessage,GossipPoetProof - -type PoetProofMessage struct { - GossipPoetProof - ServicePubKey []byte - RoundID string - Signature []byte -} - +// NewService creates a new instance of Poet Service. +// It should be started with `Service::Run`. func NewService(cfg *Config, datadir string) (*Service, error) { genesis, err := time.Parse(time.RFC3339, cfg.Genesis) if err != nil { @@ -169,15 +128,20 @@ func NewService(cfg *Config, datadir string) (*Service, error) { return nil, fmt.Errorf("failed to save state: %w", err) } } - cmds := make(chan Command, 1) privateKey := ed25519.NewKeyFromSeed(state.PrivKey[:32]) + + roundsDir := filepath.Join(datadir, "rounds") + if _, err := os.Stat(roundsDir); os.IsNotExist(err) { + if err := os.Mkdir(roundsDir, 0o700); err != nil { + return nil, err + } + } + s := &Service{ - commands: cmds, - ServiceClient: ServiceClient{ - command: cmds, - }, + proofs: make(chan shared.ProofMessage, 1), + commands: cmds, cfg: cfg, minMemoryLayer: uint(minMemoryLayer), genesis: genesis, @@ -186,7 +150,6 @@ func NewService(cfg *Config, datadir string) (*Service, error) { privKey: privateKey, PubKey: privateKey.Public().(ed25519.PublicKey), } - s.ServiceClient.serviceStarted = &s.started log.Info("Service public key: %x", s.PubKey) @@ -198,13 +161,30 @@ type roundResult struct { err error } -func (s *Service) loop(ctx context.Context, roundsToResume []*round) { - var eg errgroup.Group - defer eg.Wait() +func (s *Service) ProofsChan() <-chan shared.ProofMessage { + return s.proofs +} - logger := log.AppLog.WithName("worker") +func (s *Service) loop(ctx context.Context, roundsToResume []*round) error { + logger := logging.FromContext(ctx).WithName("worker") ctx = logging.NewContext(ctx, logger) + // Make sure there is an open round + if s.openRound == nil { + epoch := uint32(0) + if d := time.Since(s.genesis); d > 0 { + epoch = uint32(d / s.cfg.EpochDuration) + } + newRound, err := s.newRound(ctx, uint32(epoch)) + if err != nil { + return fmt.Errorf("failed to open round the first round: %w", err) + } + s.openRound = newRound + } + + var eg errgroup.Group + defer eg.Wait() + roundResults := make(chan roundResult, 1) // Resume recovered rounds @@ -219,8 +199,6 @@ func (s *Service) loop(ctx context.Context, roundsToResume []*round) { }) } - timer := s.scheduleRound(ctx, s.openRound) - for { select { case cmd := <-s.commands: @@ -228,16 +206,20 @@ func (s *Service) loop(ctx context.Context, roundsToResume []*round) { case result := <-roundResults: if result.err == nil { - broadcaster := s.broadcaster - go broadcastProof(s, result.round, result.round.execution, broadcaster) + s.reportNewProof(result.round.ID, result.round.execution) } else { - logger.With().Warning("round execution failed", log.Err(result.err), log.String("round", result.round.ID)) + logger.With().Error("round execution failed", log.Err(result.err), log.String("round", result.round.ID)) } delete(s.executingRounds, result.round.ID) + result.round.Close() - case <-timer: + case <-s.timer: round := s.openRound - s.openRound = s.newRound(ctx, round.Epoch()+1) + newRound, err := s.newRound(ctx, round.Epoch()+1) + if err != nil { + return fmt.Errorf("failed to open new round: %w", err) + } + s.openRound = newRound s.executingRounds[round.ID] = struct{}{} end := s.roundEndTime(round) @@ -249,11 +231,11 @@ func (s *Service) loop(ctx context.Context, roundsToResume []*round) { }) // schedule the next round - timer = s.scheduleRound(ctx, s.openRound) + s.timer = s.scheduleRound(ctx, s.openRound) case <-ctx.Done(): logger.Info("service shutting down") - return + return nil } } } @@ -275,20 +257,9 @@ func (s *Service) scheduleRound(ctx context.Context, round *round) <-chan time.T return timer } -func (s *Service) Start(b Broadcaster, verifier challenge_verifier.Verifier) error { - s.Lock() - defer s.Unlock() - if s.Started() { - return ErrAlreadyStarted - } - - // Create a context for the running Service. - // This context will be canceled when Service is stopped. - ctx, stop := context.WithCancel(context.Background()) - s.stop = stop - - s.broadcaster = b - +// Run starts the Service's actor event loop. +// It stops when the `ctx` is canceled. +func (s *Service) Run(ctx context.Context) error { var toResume []*round if s.cfg.NoRecovery { log.Info("Recovery is disabled") @@ -300,47 +271,39 @@ func (s *Service) Start(b Broadcaster, verifier challenge_verifier.Verifier) err } } - now := time.Now() - epoch := time.Duration(0) - if d := now.Sub(s.genesis); d > 0 { - epoch = d / s.cfg.EpochDuration - } - if s.openRound == nil { - s.openRound = s.newRound(ctx, uint32(epoch)) - } - - s.ServiceClient.SetChallengeVerifier(verifier) - - s.runningGroup.Go(func() error { - s.loop(ctx, toResume) - return nil - }) - s.started.Store(true) - return nil + return s.loop(ctx, toResume) } -// Shutdown gracefully stops running Service and waits -// for all processing to stop. -func (s *Service) Shutdown() error { - if !s.Started() { - return ErrNotStarted +// Start starts proofs generation. +func (s *Service) Start(ctx context.Context, verifier challenge_verifier.Verifier) error { + resp := make(chan error) + s.commands <- func(s *Service) { + defer close(resp) + if s.Started() { + resp <- ErrAlreadyStarted + } + s.SetChallengeVerifier(verifier) + s.timer = s.scheduleRound(ctx, s.openRound) + s.started.Store(true) + } + select { + case err := <-resp: + return err + case <-ctx.Done(): + return ctx.Err() } - log.Info("service shutting down") - s.stop() - err := s.runningGroup.Wait() - s.started.Store(false) - log.Info("service shutdown complete") - return err } +// Started returns whether the `Service` is generating proofs. func (s *Service) Started() bool { return s.started.Load() } func (s *Service) recover(ctx context.Context) (open *round, executing []*round, err error) { + roundsDir := filepath.Join(s.datadir, "rounds") logger := log.AppLog.WithName("recovery") logger.With().Info("Recovering service state", log.String("datadir", s.datadir)) - entries, err := os.ReadDir(s.datadir) + entries, err := os.ReadDir(roundsDir) if err != nil { return nil, nil, err } @@ -355,22 +318,25 @@ func (s *Service) recover(ctx context.Context) (open *round, executing []*round, if err != nil { return nil, nil, fmt.Errorf("entry is not a uint32 %s", entry.Name()) } - r := newRound(ctx, s.datadir, uint32(epoch)) + r, err := newRound(ctx, roundsDir, uint32(epoch)) + if err != nil { + return nil, nil, fmt.Errorf("failed to create round: %w", err) + } + state, err := r.state() if err != nil { - return nil, nil, fmt.Errorf("invalid round state: %v", err) + return nil, nil, fmt.Errorf("invalid round state: %w", err) } if state.isExecuted() { - logger.Info("found round %v in executed state. broadcasting...", r.ID) - go broadcastProof(s, r, state.Execution, s.broadcaster) + s.reportNewProof(r.ID, state.Execution) continue } if state.isOpen() { logger.Info("found round %v in open state.", r.ID) if err := r.open(); err != nil { - return nil, nil, fmt.Errorf("failed to open round: %v", err) + return nil, nil, fmt.Errorf("failed to open round: %w", err) } // Keep the last open round as openRound (multiple open rounds state is possible @@ -386,24 +352,19 @@ func (s *Service) recover(ctx context.Context) (open *round, executing []*round, return open, executing, nil } -func (s *ServiceClient) SetBroadcaster(b Broadcaster) { - // No need to wait for the Command to execute. - s.command <- func(s *Service) { - old := s.broadcaster - s.broadcaster = b - if old != nil { - log.Info("Service broadcaster updated") - } - } +func (s *Service) SetChallengeVerifier(provider challenge_verifier.Verifier) { + s.challengeVerifier.Store(provider) } -func (s *ServiceClient) SetChallengeVerifier(provider challenge_verifier.Verifier) { - s.challengeVerifier.Store(provider) +type SubmitResult struct { + Round string + Hash []byte + RoundEnd time.Duration } -func (s *ServiceClient) Submit(ctx context.Context, challenge, signature []byte) (string, []byte, error) { - if !s.serviceStarted.Load() { - return "", nil, ErrNotStarted +func (s *Service) Submit(ctx context.Context, challenge, signature []byte) (*SubmitResult, error) { + if !s.Started() { + return nil, ErrNotStarted } logger := logging.FromContext(ctx) @@ -413,19 +374,23 @@ func (s *ServiceClient) Submit(ctx context.Context, challenge, signature []byte) result, err := verifier.Verify(ctx, challenge, signature) if err != nil { logger.With().Debug("challenge verification failed", log.Err(err)) - return "", nil, err + return nil, err } - logger.With().Debug("verified challenge", log.String("hash", hex.EncodeToString(result.Hash)), log.String("node_id", hex.EncodeToString(result.NodeId))) + logger.With().Debug("verified challenge", + log.String("hash", hex.EncodeToString(result.Hash)), + log.String("node_id", hex.EncodeToString(result.NodeId))) type response struct { round string err error + end time.Time } done := make(chan response, 1) - s.command <- func(s *Service) { + s.commands <- func(s *Service) { done <- response{ round: s.openRound.ID, err: s.openRound.submit(result.NodeId, result.Hash), + end: s.roundEndTime(s.openRound), } close(done) } @@ -433,25 +398,25 @@ func (s *ServiceClient) Submit(ctx context.Context, challenge, signature []byte) select { case resp := <-done: switch { + case resp.err == nil: + logger.With().Debug("submitted challenge for round", log.String("round", resp.round)) case errors.Is(resp.err, ErrChallengeAlreadySubmitted): - return resp.round, result.Hash, nil - case err != nil: - return "", nil, resp.err + case resp.err != nil: + return nil, err } - logger.With().Debug("submitted challenge for round", log.String("round", resp.round)) - return resp.round, result.Hash, nil + return &SubmitResult{ + Round: resp.round, + Hash: result.Hash, + RoundEnd: time.Until(resp.end), + }, nil case <-ctx.Done(): - return "", nil, ctx.Err() + return nil, ctx.Err() } } -func (s *ServiceClient) Info(ctx context.Context) (*InfoResponse, error) { - if !s.serviceStarted.Load() { - return nil, ErrNotStarted - } - +func (s *Service) Info(ctx context.Context) (*InfoResponse, error) { resp := make(chan *InfoResponse, 1) - s.command <- func(s *Service) { + s.commands <- func(s *Service) { defer close(resp) ids := make([]string, 0, len(s.executingRounds)) for id := range s.executingRounds { @@ -471,52 +436,30 @@ func (s *ServiceClient) Info(ctx context.Context) (*InfoResponse, error) { } // newRound creates a new round with the given epoch. -func (s *Service) newRound(ctx context.Context, epoch uint32) *round { - r := newRound(ctx, s.datadir, epoch) - if err := r.open(); err != nil { - panic(fmt.Errorf("failed to open round: %v", err)) - } - - log.With().Info("Round opened", log.String("ID", r.ID)) - return r -} - -func broadcastProof(s *Service, r *round, execution *executionState, broadcaster Broadcaster) { - msg, err := serializeProofMsg(s.PubKey, r.ID, execution) +func (s *Service) newRound(ctx context.Context, epoch uint32) (*round, error) { + roundsDir := filepath.Join(s.datadir, "rounds") + r, err := newRound(ctx, roundsDir, epoch) if err != nil { - log.Error(err.Error()) - return + return nil, fmt.Errorf("failed to create a new round: %w", err) } - - bindFunc := func() error { return broadcaster.BroadcastProof(msg, r.ID, r.execution.Members) } - logger := func(msg string) { log.Error("Round %v: %v", r.ID, msg) } - - if err := shared.Retry(bindFunc, int(s.cfg.BroadcastNumRetries), s.cfg.BroadcastRetriesInterval, logger); err != nil { - log.Error("Round %v proof broadcast failure: %v", r.ID, err) - return + if err := r.open(); err != nil { + return nil, fmt.Errorf("failed to open round: %w", err) } - r.broadcasted() + log.With().Info("Round opened", log.String("ID", r.ID)) + return r, nil } -func serializeProofMsg(servicePubKey []byte, roundID string, execution *executionState) ([]byte, error) { - proofMessage := PoetProofMessage{ - GossipPoetProof: GossipPoetProof{ +func (s *Service) reportNewProof(round string, execution *executionState) { + s.proofs <- shared.ProofMessage{ + Proof: shared.Proof{ MerkleProof: *execution.NIP, Members: execution.Members, NumLeaves: execution.NumLeaves, }, - ServicePubKey: servicePubKey, - RoundID: roundID, - Signature: nil, + ServicePubKey: s.PubKey, + RoundID: round, } - - var dataBuf bytes.Buffer - if _, err := proofMessage.EncodeScale(scale.NewEncoder(&dataBuf)); err != nil { - return nil, fmt.Errorf("failed to marshal proof message for round %v: %v", roundID, err) - } - - return dataBuf.Bytes(), nil } // CreateChallengeVerifier creates a verifier connected to provided gateways. diff --git a/service/service_scale.go b/service/service_scale.go deleted file mode 100644 index bb8a261a..00000000 --- a/service/service_scale.go +++ /dev/null @@ -1,127 +0,0 @@ -// Code generated by github.com/spacemeshos/go-scale/scalegen. DO NOT EDIT. - -// nolint -package service - -import ( - "github.com/spacemeshos/go-scale" -) - -func (t *PoetProofMessage) EncodeScale(enc *scale.Encoder) (total int, err error) { - { - n, err := t.GossipPoetProof.EncodeScale(enc) - if err != nil { - return total, err - } - total += n - } - { - n, err := scale.EncodeByteSlice(enc, t.ServicePubKey) - if err != nil { - return total, err - } - total += n - } - { - n, err := scale.EncodeString(enc, string(t.RoundID)) - if err != nil { - return total, err - } - total += n - } - { - n, err := scale.EncodeByteSlice(enc, t.Signature) - if err != nil { - return total, err - } - total += n - } - return total, nil -} - -func (t *PoetProofMessage) DecodeScale(dec *scale.Decoder) (total int, err error) { - { - n, err := t.GossipPoetProof.DecodeScale(dec) - if err != nil { - return total, err - } - total += n - } - { - field, n, err := scale.DecodeByteSlice(dec) - if err != nil { - return total, err - } - total += n - t.ServicePubKey = field - } - { - field, n, err := scale.DecodeString(dec) - if err != nil { - return total, err - } - total += n - t.RoundID = string(field) - } - { - field, n, err := scale.DecodeByteSlice(dec) - if err != nil { - return total, err - } - total += n - t.Signature = field - } - return total, nil -} - -func (t *GossipPoetProof) EncodeScale(enc *scale.Encoder) (total int, err error) { - { - n, err := t.MerkleProof.EncodeScale(enc) - if err != nil { - return total, err - } - total += n - } - { - n, err := scale.EncodeSliceOfByteSlice(enc, t.Members) - if err != nil { - return total, err - } - total += n - } - { - n, err := scale.EncodeCompact64(enc, uint64(t.NumLeaves)) - if err != nil { - return total, err - } - total += n - } - return total, nil -} - -func (t *GossipPoetProof) DecodeScale(dec *scale.Decoder) (total int, err error) { - { - n, err := t.MerkleProof.DecodeScale(dec) - if err != nil { - return total, err - } - total += n - } - { - field, n, err := scale.DecodeSliceOfByteSlice(dec) - if err != nil { - return total, err - } - total += n - t.Members = field - } - { - field, n, err := scale.DecodeCompact64(dec) - if err != nil { - return total, err - } - total += n - t.NumLeaves = uint64(field) - } - return total, nil -} diff --git a/service/service_test.go b/service/service_test.go index 28ebcf57..ecfd4eae 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -1,16 +1,13 @@ package service_test import ( - "bytes" "context" "crypto/rand" - "fmt" "strconv" "testing" "time" "github.com/golang/mock/gomock" - "github.com/spacemeshos/go-scale" "github.com/stretchr/testify/require" "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" @@ -20,15 +17,6 @@ import ( "github.com/spacemeshos/poet/service" ) -type MockBroadcaster struct { - receivedMessages chan []byte -} - -func (b *MockBroadcaster) BroadcastProof(msg []byte, roundID string, members [][]byte) error { - b.receivedMessages <- msg - return nil -} - type challenge struct { data []byte nodeID []byte @@ -36,12 +24,10 @@ type challenge struct { func TestService_Recovery(t *testing.T) { req := require.New(t) - broadcaster := &MockBroadcaster{receivedMessages: make(chan []byte)} cfg := &service.Config{ Genesis: time.Now().Add(time.Second).Format(time.RFC3339), - EpochDuration: time.Second, - PhaseShift: time.Second / 2, - CycleGap: time.Second / 4, + EpochDuration: time.Second * 2, + PhaseShift: time.Second, } ctrl := gomock.NewController(t) @@ -52,8 +38,12 @@ func TestService_Recovery(t *testing.T) { // Create a new service instance. s, err := service.NewService(cfg, tempdir) req.NoError(err) - err = s.Start(broadcaster, verifier) - req.NoError(err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var eg errgroup.Group + eg.Go(func() error { return s.Run(ctx) }) + req.NoError(s.Start(context.Background(), verifier)) // Generate 4 groups of random challenges. challengeGroupSize := 10 @@ -73,10 +63,10 @@ func TestService_Recovery(t *testing.T) { submitChallenges := func(roundID string, challenges []challenge) { for _, challenge := range challenges { verifier.EXPECT().Verify(gomock.Any(), challenge.data, nil).Return(&challenge_verifier.Result{Hash: challenge.data, NodeId: challenge.nodeID}, nil) - round, hash, err := s.Submit(context.Background(), challenge.data, nil) + result, err := s.Submit(context.Background(), challenge.data, nil) req.NoError(err) - req.Equal(challenge.data, hash) - req.Equal(roundID, round) + req.Equal(challenge.data, result.Hash) + req.Equal(roundID, result.Round) } } @@ -93,14 +83,19 @@ func TestService_Recovery(t *testing.T) { // Submit challenges to open round (1). submitChallenges("1", challengeGroups[1]) - req.NoError(s.Shutdown()) + cancel() + req.NoError(eg.Wait()) // Create a new service instance. s, err = service.NewService(cfg, tempdir) req.NoError(err) - err = s.Start(broadcaster, verifier) + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + eg = errgroup.Group{} + eg.Go(func() error { return s.Run(ctx) }) req.NoError(err) + req.NoError(s.Start(context.Background(), verifier)) // Service instance should recover 2 rounds: round 0 in executing state, and round 1 in open state. info, err := s.Info(context.Background()) @@ -120,52 +115,17 @@ func TestService_Recovery(t *testing.T) { submitChallenges("2", challengeGroups[2]) for i := 0; i < len(challengeGroups); i++ { - msg := <-broadcaster.receivedMessages - proofMsg := service.PoetProofMessage{} - dec := scale.NewDecoder(bytes.NewReader(msg)) - _, err := proofMsg.DecodeScale(dec) - req.NoError(err) - - req.Equal(strconv.Itoa(i), proofMsg.RoundID) + proof := <-s.ProofsChan() + req.Equal(strconv.Itoa(i), proof.RoundID) // Verify the submitted challenges. - req.Len(proofMsg.Members, len(challengeGroups[i]), "round: %v i: %d", proofMsg.RoundID, i) + req.Len(proof.Members, len(challengeGroups[i]), "round: %v i: %d", proof.RoundID, i) for _, ch := range challengeGroups[i] { - req.Contains(proofMsg.Members, ch.data, "round: %v, i: %d", proofMsg.RoundID, i) + req.Contains(proof.Members, ch.data, "round: %v, i: %d", proof.RoundID, i) } } - req.NoError(s.Shutdown()) -} - -func TestConcurrentServiceStartAndShutdown(t *testing.T) { - t.Parallel() - req := require.New(t) - - cfg := service.Config{ - Genesis: time.Now().Add(2 * time.Second).Format(time.RFC3339), - EpochDuration: time.Second, - PhaseShift: time.Second / 2, - CycleGap: time.Second / 4, - } - ctrl := gomock.NewController(t) - verifier := mocks.NewMockVerifier(ctrl) - - for i := 0; i < 100; i += 1 { - t.Run(fmt.Sprintf("iteration %d", i), func(t *testing.T) { - t.Parallel() - s, err := service.NewService(&cfg, t.TempDir()) - req.NoError(err) - - var eg errgroup.Group - eg.Go(func() error { - proofBroadcaster := &MockBroadcaster{receivedMessages: make(chan []byte)} - req.NoError(s.Start(proofBroadcaster, verifier)) - return nil - }) - req.Eventually(func() bool { return s.Shutdown() == nil }, time.Second, time.Millisecond*10) - eg.Wait() - }) - } + cancel() + req.NoError(eg.Wait()) } func TestNewService(t *testing.T) { @@ -174,17 +134,19 @@ func TestNewService(t *testing.T) { cfg := new(service.Config) cfg.Genesis = time.Now().Add(time.Second).Format(time.RFC3339) - cfg.EpochDuration = time.Second - cfg.PhaseShift = time.Second / 2 - cfg.CycleGap = time.Second / 4 + cfg.EpochDuration = time.Second * 2 + cfg.PhaseShift = time.Second s, err := service.NewService(cfg, tempdir) req.NoError(err) - proofBroadcaster := &MockBroadcaster{receivedMessages: make(chan []byte)} ctrl := gomock.NewController(t) verifier := mocks.NewMockVerifier(ctrl) - err = s.Start(proofBroadcaster, verifier) - req.NoError(err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var eg errgroup.Group + eg.Go(func() error { return s.Run(ctx) }) + req.NoError(s.Start(context.Background(), verifier)) challengesCount := 8 challenges := make([]challenge, challengesCount) @@ -205,10 +167,10 @@ func TestNewService(t *testing.T) { // Submit challenges. for i := 0; i < len(challenges); i++ { verifier.EXPECT().Verify(gomock.Any(), challenges[i].data, nil).Return(&challenge_verifier.Result{Hash: challenges[i].data, NodeId: challenges[i].nodeID}, nil) - round, hash, err := s.Submit(context.Background(), challenges[i].data, nil) + result, err := s.Submit(context.Background(), challenges[i].data, nil) req.NoError(err) - req.Equal(challenges[i].data, hash) - req.Equal(currentRound, round) + req.Equal(challenges[i].data, result.Hash) + req.Equal(currentRound, result.Round) } // Verify that round is still open. @@ -240,11 +202,7 @@ func TestNewService(t *testing.T) { }, time.Second, time.Millisecond*20) // Wait for proof message broadcast. - msg := <-proofBroadcaster.receivedMessages - proof := service.PoetProofMessage{} - dec := scale.NewDecoder(bytes.NewReader(msg)) - _, err = proof.DecodeScale(dec) - req.NoError(err) + proof := <-s.ProofsChan() req.Equal(currentRound, proof.RoundID) // Verify the submitted challenges. @@ -253,7 +211,8 @@ func TestNewService(t *testing.T) { req.Contains(proof.Members, ch.data) } - req.NoError(s.Shutdown()) + cancel() + req.NoError(eg.Wait()) } func TestSubmitIdempotency(t *testing.T) { @@ -270,21 +229,25 @@ func TestSubmitIdempotency(t *testing.T) { s, err := service.NewService(&cfg, t.TempDir()) req.NoError(err) - proofBroadcaster := &MockBroadcaster{receivedMessages: make(chan []byte)} verifier := mocks.NewMockVerifier(gomock.NewController(t)) verifier.EXPECT().Verify(gomock.Any(), challenge, signature).Times(2).Return(&challenge_verifier.Result{Hash: []byte("hash")}, nil) - err = s.Start(proofBroadcaster, verifier) - req.NoError(err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var eg errgroup.Group + eg.Go(func() error { return s.Run(ctx) }) + req.NoError(s.Start(context.Background(), verifier)) // Submit challenge - _, hash, err := s.Submit(context.Background(), challenge, signature) + result, err := s.Submit(context.Background(), challenge, signature) req.NoError(err) - req.Equal(hash, []byte("hash")) + req.Equal(result.Hash, []byte("hash")) // Try again - it should return the same result - _, hash, err = s.Submit(context.Background(), challenge, signature) + result, err = s.Submit(context.Background(), challenge, signature) req.NoError(err) - req.Equal(hash, []byte("hash")) + req.Equal(result.Hash, []byte("hash")) - req.NoError(s.Shutdown()) + cancel() + req.NoError(eg.Wait()) } diff --git a/shared/shared.go b/shared/shared.go index 668b8048..3571bf47 100644 --- a/shared/shared.go +++ b/shared/shared.go @@ -2,16 +2,13 @@ package shared import ( "encoding/binary" - "errors" - "fmt" "os" - "time" "github.com/minio/sha256-simd" "github.com/spacemeshos/merkle-tree" ) -//go:generate scalegen -types MerkleProof +//go:generate scalegen -types MerkleProof,ProofMessage,Proof const ( // T is the security param which determines the number of leaves @@ -71,26 +68,26 @@ func MakeLabelFunc() func(hash func(data []byte) []byte, labelID uint64, leftSib } } -type MerkleProof struct { - Root []byte - ProvenLeaves [][]byte - ProofNodes [][]byte -} +type Proof struct { + // The actual proof. + MerkleProof -// Retry provides generic capability for retryable function execution. -func Retry(retryable func() error, numRetries int, interval time.Duration, logger func(msg string)) error { - err := retryable() - if err == nil { - return nil - } + // Members is the ordered list of miners challenges which are included + // in the proof (by using the list hash digest as the proof generation input (the statement)). + Members [][]byte - if numRetries < 1 { - logger(err.Error()) - return errors.New("number of retries exceeded") - } + // NumLeaves is the width of the proof-generation tree. + NumLeaves uint64 +} - logger(fmt.Sprintf("%v | retrying in %v...", err, interval)) - time.Sleep(interval) +type ProofMessage struct { + Proof + ServicePubKey []byte + RoundID string +} - return Retry(retryable, numRetries-1, interval, logger) +type MerkleProof struct { + Root []byte + ProvenLeaves [][]byte + ProofNodes [][]byte } diff --git a/shared/shared_scale.go b/shared/shared_scale.go index 7d302736..0b6fb1c7 100644 --- a/shared/shared_scale.go +++ b/shared/shared_scale.go @@ -59,3 +59,107 @@ func (t *MerkleProof) DecodeScale(dec *scale.Decoder) (total int, err error) { } return total, nil } + +func (t *ProofMessage) EncodeScale(enc *scale.Encoder) (total int, err error) { + { + n, err := t.Proof.EncodeScale(enc) + if err != nil { + return total, err + } + total += n + } + { + n, err := scale.EncodeByteSlice(enc, t.ServicePubKey) + if err != nil { + return total, err + } + total += n + } + { + n, err := scale.EncodeString(enc, string(t.RoundID)) + if err != nil { + return total, err + } + total += n + } + return total, nil +} + +func (t *ProofMessage) DecodeScale(dec *scale.Decoder) (total int, err error) { + { + n, err := t.Proof.DecodeScale(dec) + if err != nil { + return total, err + } + total += n + } + { + field, n, err := scale.DecodeByteSlice(dec) + if err != nil { + return total, err + } + total += n + t.ServicePubKey = field + } + { + field, n, err := scale.DecodeString(dec) + if err != nil { + return total, err + } + total += n + t.RoundID = string(field) + } + return total, nil +} + +func (t *Proof) EncodeScale(enc *scale.Encoder) (total int, err error) { + { + n, err := t.MerkleProof.EncodeScale(enc) + if err != nil { + return total, err + } + total += n + } + { + n, err := scale.EncodeSliceOfByteSlice(enc, t.Members) + if err != nil { + return total, err + } + total += n + } + { + n, err := scale.EncodeCompact64(enc, uint64(t.NumLeaves)) + if err != nil { + return total, err + } + total += n + } + return total, nil +} + +func (t *Proof) DecodeScale(dec *scale.Decoder) (total int, err error) { + { + n, err := t.MerkleProof.DecodeScale(dec) + if err != nil { + return total, err + } + total += n + } + { + field, n, err := scale.DecodeSliceOfByteSlice(dec) + if err != nil { + return total, err + } + total += n + t.Members = field + } + { + field, n, err := scale.DecodeCompact64(dec) + if err != nil { + return total, err + } + total += n + t.NumLeaves = uint64(field) + } + return total, nil +}