Skip to content

Commit

Permalink
Query poet proofs instead of relying on broadcasting (#3865)
Browse files Browse the repository at this point in the history
## Motivation
Part of spacemeshos/pm#173

Closes #3746 
Closes #3814 

## Changes
- removed broadcasting method from `GatewayService`
- removed p2p listeners for broadcasted poet proofs
- changed `NIPostBuilder` to query poets for proofs after the rounds end

## Test Plan
- added a system test in which nodes use different poets, to verify that poet proofs are properly propagated between nodes

## TODO
- [ ] Bump poet to a released version in go.mod after spacemeshos/poet#187 is merged

## DevOps Notes
- [x] This PR does not require configuration changes (e.g., environment variables, GitHub secrets, VM resources)
- [ ] ~This PR does not affect public APIs~ Proof broadcasting was removed
- [ ] ~This PR does not rely on a new version of external services (PoET, elasticsearch, etc.)~ - It relies on a new Poet version
- [ ] ~This PR does not make changes to log messages (which monitoring infrastructure may rely on)~


Co-authored-by: moshababo <[email protected]>
  • Loading branch information
poszu and moshababo committed Dec 28, 2022
1 parent 40cf75d commit 5c7f486
Show file tree
Hide file tree
Showing 36 changed files with 681 additions and 588 deletions.
4 changes: 3 additions & 1 deletion activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,9 @@ func (b *Builder) createAtx(ctx context.Context) (*types.ActivationTx, error) {
challenge.InitialPost = b.initialPost
challenge.InitialPostMetadata = b.initialPostMeta
}
nipost, postDuration, err := b.nipostBuilder.BuildNIPost(ctx, &challenge, poetProofDeadline)
buildingNipostCtx, cancel := context.WithDeadline(ctx, nextPoetRoundStart)
defer cancel()
nipost, postDuration, err := b.nipostBuilder.BuildNIPost(buildingNipostCtx, &challenge, poetProofDeadline)
if err != nil {
return nil, fmt.Errorf("failed to build NIPost: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1408,7 +1408,7 @@ func TestBuilder_UpdatePoets(t *testing.T) {
atxHdlr := newAtxHandler(t, cdb)
b := newBuilder(t, cdb, atxHdlr, WithPoETClientInitializer(func(string) PoetProvingServiceClient {
poet := NewMockPoetProvingServiceClient(gomock.NewController(t))
poet.EXPECT().PoetServiceID(gomock.Any()).Times(1).Return([]byte("poetid"), nil)
poet.EXPECT().PoetServiceID(gomock.Any()).AnyTimes().Return([]byte("poetid"), nil)
return poet
}))

Expand All @@ -1430,7 +1430,7 @@ func TestBuilder_UpdatePoetsUnstable(t *testing.T) {
atxHdlr := newAtxHandler(t, cdb)
b := newBuilder(t, cdb, atxHdlr, WithPoETClientInitializer(func(string) PoetProvingServiceClient {
poet := NewMockPoetProvingServiceClient(gomock.NewController(t))
poet.EXPECT().PoetServiceID(gomock.Any()).Times(1).Return([]byte("poetid"), errors.New("ERROR"))
poet.EXPECT().PoetServiceID(gomock.Any()).AnyTimes().Return([]byte("poetid"), errors.New("ERROR"))
return poet
}))

Expand Down
2 changes: 1 addition & 1 deletion activation/challenge_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (v *challengeVerifier) Verify(ctx context.Context, challengeBytes, signatur
}

func (v *challengeVerifier) verifyChallenge(ctx context.Context, challenge *types.PoetChallenge, nodeID types.NodeID) error {
log.With().Info("verifying challenge", log.Object("challenge", challenge))
log.GetLogger().WithContext(ctx).With().Info("Verifying challenge", log.Object("challenge", challenge))

if err := validateNumUnits(&v.cfg, challenge.NumUnits); err != nil {
return fmt.Errorf("%w: %v", ErrChallengeInvalid, err)
Expand Down
6 changes: 0 additions & 6 deletions activation/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ type atxReceiver interface {
OnAtx(*types.ActivationTxHeader)
}

type poetValidatorPersister interface {
HasProof(types.PoetProofRef) bool
Validate(types.PoetProof, []byte, string, []byte) error
StoreProof(context.Context, types.PoetProofRef, *types.PoetProofMessage) error
}

type nipostValidator interface {
Validate(nodeId types.NodeID, atxId types.ATXID, NIPost *types.NIPost, expectedChallenge types.Hash32, numUnits uint32) (uint64, error)
ValidatePost(nodeId types.NodeID, atxId types.ATXID, Post *types.Post, PostMetadata *types.PostMetadata, numUnits uint32) error
Expand Down
65 changes: 0 additions & 65 deletions activation/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

163 changes: 110 additions & 53 deletions activation/nipost.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ type PoetProvingServiceClient interface {
Submit(ctx context.Context, challenge []byte, signature []byte) (*types.PoetRound, error)

// PoetServiceID returns the public key of the PoET proving service.
PoetServiceID(context.Context) ([]byte, error)
PoetServiceID(context.Context) (types.PoetServiceID, error)

GetProof(ctx context.Context, roundID string) (*types.PoetProofMessage, error)
}

func (nb *NIPostBuilder) load(challenge types.Hash32) {
Expand Down Expand Up @@ -62,9 +64,8 @@ type NIPostBuilder struct {
}

type poetDbAPI interface {
GetMembershipMap(proofRef types.PoetProofRef) (map[types.Hash32]bool, error)
GetProof(types.PoetProofRef) (*types.PoetProof, error)
GetProofRef(poetID []byte, roundID string) (types.PoetProofRef, error)
ValidateAndStore(ctx context.Context, proofMessage *types.PoetProofMessage) error
}

// NewNIPostBuilder returns a NIPostBuilder.
Expand Down Expand Up @@ -129,10 +130,10 @@ func (nb *NIPostBuilder) BuildNIPost(ctx context.Context, challenge *types.PoetC

validPoetRequests := make([]types.PoetRequest, 0, len(poetRequests))
for _, req := range poetRequests {
if !bytes.Equal(req.PoetRound.ChallengeHash, challengeHash[:]) {
if !bytes.Equal(req.PoetRound.ChallengeHash[:], challengeHash[:]) {
nb.log.With().Info(
"poet returned invalid challenge hash",
log.Binary("hash", req.PoetRound.ChallengeHash),
req.PoetRound.ChallengeHash,
log.String("poet_id", hex.EncodeToString(req.PoetServiceID)),
)
} else {
Expand All @@ -148,17 +149,16 @@ func (nb *NIPostBuilder) BuildNIPost(ctx context.Context, challenge *types.PoetC
nb.persist()
}

// Phase 1: receive proofs from PoET services
// Phase 1: query PoET services for proofs
if nb.state.PoetProofRef == nil {
select {
case <-time.After(time.Until(poetProofDeadline)):
case <-ctx.Done():
return nil, 0, ctx.Err()
getProofsCtx, cancel := context.WithDeadline(ctx, poetProofDeadline)
defer cancel()
poetProofRef, err := nb.getBestProof(getProofsCtx, challengeHash)
if err != nil {
return nil, 0, &PoetSvcUnstableError{msg: "getBestProof failed", source: err}
}
poetProofRef := nb.getBestProof(ctx, challengeHash)
if poetProofRef == nil {
// Time is up - ATX challenge is expired.
return nil, 0, ErrPoetProofNotReceived
return nil, 0, &PoetSvcUnstableError{source: ErrPoetProofNotReceived}
}
nb.state.PoetProofRef = poetProofRef
nb.persist()
Expand Down Expand Up @@ -194,26 +194,20 @@ func (nb *NIPostBuilder) BuildNIPost(ctx context.Context, challenge *types.PoetC
}

// Submit the challenge to a single PoET.
func submitPoetChallenge(ctx context.Context, logger log.Log, poet PoetProvingServiceClient, challenge []byte, signature []byte) (*types.PoetRequest, error) {
func (nb *NIPostBuilder) submitPoetChallenge(ctx context.Context, poet PoetProvingServiceClient, challenge []byte, signature []byte) (*types.PoetRequest, error) {
poetServiceID, err := poet.PoetServiceID(ctx)
if err != nil {
return nil, &PoetSvcUnstableError{msg: "failed to get PoET service ID", source: err}
}

logger.With().Debug("submitting challenge to poet proving service",
log.String("poet_id", hex.EncodeToString(poetServiceID)))
logger := nb.log.WithFields(log.String("poet_id", hex.EncodeToString(poetServiceID)))
logger.Debug("submitting challenge to poet proving service")

round, err := poet.Submit(ctx, challenge, signature)
if err != nil {
logger.With().Error("failed to submit challenge to poet proving service",
log.String("poet_id", hex.EncodeToString(poetServiceID)),
log.Err(err))
return nil, &PoetSvcUnstableError{msg: "failed to submit challenge to poet service", source: err}
}

logger.With().Info("challenge submitted to poet proving service",
log.String("poet_id", hex.EncodeToString(poetServiceID)),
log.String("round_id", round.ID))
logger.With().Info("challenge submitted to poet proving service", log.String("round", round.ID))

return &types.PoetRequest{
PoetRound: round,
Expand All @@ -228,7 +222,7 @@ func (nb *NIPostBuilder) submitPoetChallenges(ctx context.Context, challenge []b
for _, poetProver := range nb.poetProvers {
poet := poetProver
g.Go(func() error {
if poetRequest, err := submitPoetChallenge(ctx, nb.log, poet, challenge, signature); err == nil {
if poetRequest, err := nb.submitPoetChallenge(ctx, poet, challenge, signature); err == nil {
poetRequestsChannel <- *poetRequest
} else {
nb.log.With().Warning("failed to submit challenge to PoET", log.Err(err))
Expand All @@ -246,47 +240,110 @@ func (nb *NIPostBuilder) submitPoetChallenges(ctx context.Context, challenge []b
return poetRequests
}

func (nb *NIPostBuilder) getBestProof(ctx context.Context, challenge *types.Hash32) types.PoetProofRef {
type poetProof struct {
ref types.PoetProofRef
leafCount uint64
func (nb *NIPostBuilder) getPoetClient(ctx context.Context, id types.PoetServiceID) PoetProvingServiceClient {
for _, client := range nb.poetProvers {
if clientId, err := client.PoetServiceID(ctx); err == nil && bytes.Equal(id, clientId) {
return client
}
}
var bestProof *poetProof
return nil
}

for _, poetSubmission := range nb.state.PoetRequests {
ref, err := nb.poetDB.GetProofRef(poetSubmission.PoetServiceID, poetSubmission.PoetRound.ID)
if err != nil {
continue
func membersContain(members [][]byte, challenge *types.Hash32) bool {
for _, member := range members {
if bytes.Equal(member, challenge.Bytes()) {
return true
}
// We are interested only in proofs that we are members of
membership, err := nb.poetDB.GetMembershipMap(ref)
if err != nil {
nb.log.With().Panic("failed to fetch membership for poet proof", log.Binary("challenge", challenge[:]))
}
return false
}

func (nb *NIPostBuilder) getProofWithRetry(ctx context.Context, client PoetProvingServiceClient, roundID string, retryInterval time.Duration) (*types.PoetProofMessage, error) {
for {
proof, err := client.GetProof(ctx, roundID)
switch {
case err == nil:
return proof, nil
case errors.Is(err, ErrUnavailable) || errors.Is(err, ErrNotFound):
nb.log.With().Debug("Proof not found, retrying", log.Duration("interval", retryInterval))
select {
case <-ctx.Done():
return nil, fmt.Errorf("retry was canceled: %w", ctx.Err())
case <-time.After(retryInterval):
}
default:
return nil, err
}
if !membership[*challenge] {
nb.log.With().Debug("poet proof membership doesn't contain the challenge", log.Binary("challenge", challenge[:]))
}
}

func (nb *NIPostBuilder) getBestProof(ctx context.Context, challenge *types.Hash32) (types.PoetProofRef, error) {
proofs := make(chan *types.PoetProofMessage, len(nb.state.PoetRequests))

var eg errgroup.Group
for _, r := range nb.state.PoetRequests {
logger := nb.log.WithFields(log.String("poet_id", hex.EncodeToString(r.PoetServiceID)), log.String("round", r.PoetRound.ID))
client := nb.getPoetClient(ctx, r.PoetServiceID)
if client == nil {
logger.Warning("Poet client not found")
continue
}
proof, err := nb.poetDB.GetProof(ref)
if err != nil {
nb.log.Panic("Inconsistent state of poetDB. Received poetProofRef which doesn't exist in poetDB.")
}
nb.log.With().Info("Got a new PoET proof", log.Uint64("leafCount", proof.LeafCount), log.Binary("ref", ref))
round := r.PoetRound.ID
// Time to wait before querying for the proof.
// The additional second is an optimization to be nicer to poet
// and to avoid accidentally asking it too soon and having to retry.
waitTime := time.Until(r.PoetRound.End.IntoTime()) + time.Second
eg.Go(func() error {
logger.With().Info("Waiting till poet round end", log.Duration("wait time", waitTime))
select {
case <-ctx.Done():
logger.With().Info("Waiting interrupted", log.Err(ctx.Err()))
return ctx.Err()
case <-time.After(waitTime):
}
proof, err := nb.getProofWithRetry(ctx, client, round, time.Second)
if err != nil {
logger.With().Warning("Failed to get proof from Poet", log.Err(err))
return nil
}

if bestProof == nil || bestProof.leafCount < proof.LeafCount {
bestProof = &poetProof{
ref: ref,
leafCount: proof.LeafCount,
if err := nb.poetDB.ValidateAndStore(ctx, proof); err != nil && !errors.Is(err, ErrObjectExists) {
logger.With().Warning("Failed to validate and store proof", log.Err(err), log.Object("proof", proof))
return nil
}

// We are interested only in proofs that we are members of
if !membersContain(proof.Members, challenge) {
logger.With().Warning("poet proof membership doesn't contain the challenge", challenge)
return nil
}

proofs <- proof
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, fmt.Errorf("querying for proofs failed: %w", err)
}
close(proofs)

var bestProof *types.PoetProofMessage

for proof := range proofs {
nb.log.With().Info("Got a new PoET proof", log.Uint64("leafCount", proof.LeafCount))
if bestProof == nil || bestProof.LeafCount < proof.LeafCount {
bestProof = proof
}
}

if bestProof != nil {
nb.log.With().Debug("Selected the best PoET proof",
log.Uint64("leafCount", bestProof.leafCount),
log.Binary("ref", bestProof.ref))
return bestProof.ref
ref, err := bestProof.Ref()
if err != nil {
return nil, fmt.Errorf("failed to get proof ref: %w", err)
}
nb.log.With().Info("Selected the best proof", log.Uint64("leafCount", bestProof.LeafCount), log.Binary("ref", ref))
return ref, nil
}

return nil
return nil, ErrPoetProofNotReceived
}
Loading

0 comments on commit 5c7f486

Please sign in to comment.