Skip to content

Commit

Permalink
*: support extra dBFT stage
Browse files Browse the repository at this point in the history
Ref. #112.

Signed-off-by: Anna Shaleva <[email protected]>
  • Loading branch information
AnnaShaleva committed Jul 12, 2024
1 parent de50c7b commit 7ea4a6f
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 2 deletions.
64 changes: 64 additions & 0 deletions check.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,70 @@ func (d *DBFT[H]) checkCommit() {
return
}

if d.isAntiMEVExtensionEnabled() {
d.preBlock = d.CreatePreBlock()
hash := d.preBlock.Hash()

d.Logger.Info("processing PreBlock",
zap.Uint32("height", d.BlockIndex),
zap.Stringer("preBlock hash", hash),
zap.Int("tx_count", len(d.preBlock.Transactions())))

d.preBlockProcessed = true
d.ProcessPreBlock(d.preBlock)

if d.CommitSent() { // TODO: Do we really need to require Commit sent by *self* or M other's Commits is enough to sent CommitAck?
d.sendCommitAck()
d.changeTimer(d.SecondsPerBlock)
d.checkCommitAck()
} else {
d.Logger.Debug("can't send commitAck since self commit not yet sent")
}
return
}

d.lastBlockIndex = d.BlockIndex
d.lastBlockTime = d.Timer.Now()
d.block = d.CreateBlock()
hash := d.block.Hash()

d.Logger.Info("approving block",
zap.Uint32("height", d.BlockIndex),
zap.Stringer("hash", hash),
zap.Int("tx_count", len(d.block.Transactions())),
zap.Stringer("merkle", d.block.MerkleRoot()),
zap.Stringer("prev", d.block.PrevHash()))

d.blockProcessed = true
d.ProcessBlock(d.block)

// Do not initialize consensus process immediately. It's the caller's duty to
// start the new block acceptance process and call Reset at the
// new height.
}

// TODO: check logic
func (d *DBFT[H]) checkCommitAck() {
if !d.hasAllTransactions() {
d.Logger.Debug("check commit: some transactions are missing", zap.Any("hashes", d.MissingTransactions))
return
}

// return if we received commits from other nodes
// before receiving PrepareRequest from Speaker
count := 0

for _, msg := range d.CommitPayloads {
if msg != nil && msg.ViewNumber() == d.ViewNumber {
count++
}
}

if count < d.M() {
d.Logger.Debug("not enough to commit", zap.Int("count", count))
return
}

d.lastBlockIndex = d.BlockIndex
d.lastBlockTime = d.Timer.Now()
d.block = d.CreateBlock()
Expand Down
23 changes: 23 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@ type Config[H Hash] struct {
// if current time is less than that of previous context.
// By default use millisecond precision.
TimestampIncrement uint64
// AntiMEVExtensionEnabled denotes the height starting from which dBFT
// Anti-MEV extensions should be enabled. -1 means no extensions are
// enabled.
AntiMEVExtensionEnabled int64
// GetKeyPair returns an index of the node in the list of validators
// together with it's key pair.
GetKeyPair func([]PublicKey) (int, PrivateKey, PublicKey)
// NewPreBlockFromContext should allocate, fill from Context and return new block.PreBlock.
NewPreBlockFromContext func(ctx *Context[H]) PreBlock[H]

Check failure on line 31 in config.go

View workflow job for this annotation

GitHub Actions / Coverage

undefined: PreBlock

Check failure on line 31 in config.go

View workflow job for this annotation

GitHub Actions / Test (1.21, ubuntu-latest)

undefined: PreBlock

Check failure on line 31 in config.go

View workflow job for this annotation

GitHub Actions / Test (1.22, macos-14)

undefined: PreBlock

Check failure on line 31 in config.go

View workflow job for this annotation

GitHub Actions / Test (1.20, ubuntu-latest)

undefined: PreBlock
// NewBlockFromContext should allocate, fill from Context and return new block.Block.
NewBlockFromContext func(ctx *Context[H]) Block[H]
// RequestTx is a callback which is called when transaction contained
Expand All @@ -40,6 +46,8 @@ type Config[H Hash] struct {
VerifyBlock func(b Block[H]) bool
// Broadcast should broadcast payload m to the consensus nodes.
Broadcast func(m ConsensusPayload[H])
// ProcessBlock is called every time new preBlock is accepted.
ProcessPreBlock func(b Block[H])
// ProcessBlock is called every time new block is accepted.
ProcessBlock func(b Block[H])
// GetBlock should return block with hash.
Expand Down Expand Up @@ -171,6 +179,14 @@ func WithTimestampIncrement[H Hash](u uint64) func(config *Config[H]) {
}
}

// WithNewPreBlockFromContext sets NewPreBlockFromContext.
// TODO: config validation depending on extension.
func WithNewPreBlockFromContext[H Hash](f func(ctx *Context[H]) PreBlock[H]) func(config *Config[H]) {

Check failure on line 184 in config.go

View workflow job for this annotation

GitHub Actions / Coverage

undefined: PreBlock

Check failure on line 184 in config.go

View workflow job for this annotation

GitHub Actions / Test (1.21, ubuntu-latest)

undefined: PreBlock

Check failure on line 184 in config.go

View workflow job for this annotation

GitHub Actions / Test (1.22, macos-14)

undefined: PreBlock

Check failure on line 184 in config.go

View workflow job for this annotation

GitHub Actions / Test (1.20, ubuntu-latest)

undefined: PreBlock
return func(cfg *Config[H]) {
cfg.NewPreBlockFromContext = f
}
}

// WithNewBlockFromContext sets NewBlockFromContext.
func WithNewBlockFromContext[H Hash](f func(ctx *Context[H]) Block[H]) func(config *Config[H]) {
return func(cfg *Config[H]) {
Expand Down Expand Up @@ -227,6 +243,13 @@ func WithProcessBlock[H Hash](f func(b Block[H])) func(config *Config[H]) {
}
}

// WithProcessPreBlock sets ProcessPreBlock.
func WithProcessPreBlock[H Hash](f func(b Block[H])) func(config *Config[H]) {
return func(cfg *Config[H]) {
cfg.ProcessPreBlock = f
}
}

// WithGetBlock sets GetBlock.
func WithGetBlock[H Hash](f func(h H) Block[H]) func(config *Config[H]) {
return func(cfg *Config[H]) {
Expand Down
2 changes: 2 additions & 0 deletions consensus_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type ConsensusMessage[H Hash] interface {
GetPrepareResponse() PrepareResponse[H]
// GetCommit returns payload as if it was Commit.
GetCommit() Commit
// GetCommitAck returns payload as if it was CommitAck.
GetCommitAck() CommitAck

Check failure on line 21 in consensus_message.go

View workflow job for this annotation

GitHub Actions / Coverage

undefined: CommitAck

Check failure on line 21 in consensus_message.go

View workflow job for this annotation

GitHub Actions / Test (1.21, ubuntu-latest)

undefined: CommitAck

Check failure on line 21 in consensus_message.go

View workflow job for this annotation

GitHub Actions / Test (1.22, macos-14)

undefined: CommitAck

Check failure on line 21 in consensus_message.go

View workflow job for this annotation

GitHub Actions / Test (1.20, ubuntu-latest)

undefined: CommitAck
// GetRecoveryRequest returns payload as if it was RecoveryRequest.
GetRecoveryRequest() RecoveryRequest
// GetRecoveryMessage returns payload as if it was RecoveryMessage.
Expand Down
3 changes: 3 additions & 0 deletions consensus_message_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
PrepareRequestType MessageType = 0x20
PrepareResponseType MessageType = 0x21
CommitType MessageType = 0x30
CommitAckType MessageType = 0x31
RecoveryRequestType MessageType = 0x40
RecoveryMessageType MessageType = 0x41
)
Expand All @@ -26,6 +27,8 @@ func (m MessageType) String() string {
return "PrepareResponse"
case CommitType:
return "Commit"
case CommitAckType:
return "CommitAck"
case RecoveryRequestType:
return "RecoveryRequest"
case RecoveryMessageType:
Expand Down
70 changes: 68 additions & 2 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ type Context[H Hash] struct {
// Pub is node's public key.
Pub PublicKey

block Block[H]
header Block[H]
preBlock PreBlock[H]

Check failure on line 26 in context.go

View workflow job for this annotation

GitHub Actions / Coverage

undefined: PreBlock

Check failure on line 26 in context.go

View workflow job for this annotation

GitHub Actions / Test (1.21, ubuntu-latest)

undefined: PreBlock

Check failure on line 26 in context.go

View workflow job for this annotation

GitHub Actions / Test (1.22, macos-14)

undefined: PreBlock

Check failure on line 26 in context.go

View workflow job for this annotation

GitHub Actions / Test (1.20, ubuntu-latest)

undefined: PreBlock
preHeader PreBlock[H]

Check failure on line 27 in context.go

View workflow job for this annotation

GitHub Actions / Coverage

undefined: PreBlock

Check failure on line 27 in context.go

View workflow job for this annotation

GitHub Actions / Test (1.21, ubuntu-latest)

undefined: PreBlock

Check failure on line 27 in context.go

View workflow job for this annotation

GitHub Actions / Test (1.22, macos-14)

undefined: PreBlock

Check failure on line 27 in context.go

View workflow job for this annotation

GitHub Actions / Test (1.20, ubuntu-latest)

undefined: PreBlock
block Block[H]
header Block[H]
// blockProcessed denotes whether Config.ProcessBlock callback was called for the current
// height. If so, then no second call must happen. After new block is received by the user,
// dBFT stops any new transaction or messages processing as far as timeouts handling till
// the next call to Reset.
blockProcessed bool
// TODO: add a comment, t has another meaning than blockProcessed.
preBlockProcessed bool

// BlockIndex is current block index.
BlockIndex uint32
Expand Down Expand Up @@ -65,6 +69,15 @@ type Context[H Hash] struct {
// current round, so it's possible to verify Commit against it) or stored till
// the corresponding PrepareRequest receiving.
CommitPayloads []ConsensusPayload[H]
// CommitAckPayloads stores consensus CommitAck payloads sent through all epochs.
// It is assumed that valid CommitAck payloads can only be sent once by a single
// node per the whole set of consensus epochs for particular block. Invalid
// CommitAck payloads are kicked off this list immediately (if Commit
// [TODO: and PrepareRequest? How do we verify CommitAck payloads: based on
// Commit only or based on PrepareRequest?] was received for the current round,
// so it's possible to verify CommitAck against it) or stored till the
// corresponding Commit receiving.
CommitAckPayloads []ConsensusPayload[H]
// ChangeViewPayloads stores consensus ChangeView payloads for the current epoch.
ChangeViewPayloads []ConsensusPayload[H]
// LastChangeViewPayloads stores consensus ChangeView payloads for the last epoch.
Expand Down Expand Up @@ -149,6 +162,12 @@ func (c *Context[H]) CommitSent() bool {
return !c.WatchOnly() && c.CommitPayloads[c.MyIndex] != nil
}

// CommitAckSent returns true iff CommitAck message was sent for the current epoch
// assuming that the node can't go further than current epoch after commit was sent.
func (c *Context[H]) CommitAckSent() bool {
return !c.WatchOnly() && c.CommitAckPayloads[c.MyIndex] != nil
}

// BlockSent returns true iff block was formed AND sent for the current height.
// Once block is sent, the consensus stops new transactions and messages processing
// as far as timeouts handling.
Expand Down Expand Up @@ -285,24 +304,71 @@ func (c *Context[H]) CreateBlock() Block[H] {
}

c.block.SetTransactions(txx)

// TODO: do we really need this? CreateBlock will be called when all decryption data are available, thus we may
// add all necessary information in MakeHeader or in SetTransactions. For now, I'd skip it.
//if c.isAntiMEVExtensionEnabled() {
// c.block.Finalize()
//}
}

return c.block
}

// CreatePreBlock returns PreBlock for the current epoch.
func (c *Context[H]) CreatePreBlock() PreBlock[H] {

Check failure on line 319 in context.go

View workflow job for this annotation

GitHub Actions / Coverage

undefined: PreBlock

Check failure on line 319 in context.go

View workflow job for this annotation

GitHub Actions / Test (1.21, ubuntu-latest)

undefined: PreBlock

Check failure on line 319 in context.go

View workflow job for this annotation

GitHub Actions / Test (1.22, macos-14)

undefined: PreBlock

Check failure on line 319 in context.go

View workflow job for this annotation

GitHub Actions / Test (1.20, ubuntu-latest)

undefined: PreBlock
if c.preBlock == nil {
if c.preBlock = c.MakePreHeader(); c.preBlock == nil {
return nil
}

txx := make([]Transaction[H], len(c.TransactionHashes))

for i, h := range c.TransactionHashes {
txx[i] = c.Transactions[h]
}

c.preBlock.SetTransactions(txx)
}

return c.preBlock
}

// isAntiMEVExtensionEnabled returns whether Anti-MEV dBFT extension is enabled
// at the currently processing block height.
func (c *Context[H]) isAntiMEVExtensionEnabled() bool {
return c.Config.AntiMEVExtensionEnabled >= 0 && uint32(c.Config.AntiMEVExtensionEnabled) < c.BlockIndex
}

// MakeHeader returns half-filled block for the current epoch.
// All hashable fields will be filled.
func (c *Context[H]) MakeHeader() Block[H] {
if c.header == nil {
if !c.RequestSentOrReceived() {
return nil
}
if c.isAntiMEVExtensionEnabled() && !c.CommitAckSent() {
return nil
}
c.header = c.Config.NewBlockFromContext(c)
}

return c.header
}

// MakePreHeader returns half-filled block for the current epoch.
// All hashable fields will be filled.
func (c *Context[H]) MakePreHeader() Block[H] {
if c.preHeader == nil {
if !c.RequestSentOrReceived() {
return nil
}
c.preHeader = c.Config.NewPreBlockFromContext(c)
}

return c.preHeader
}

// hasAllTransactions returns true iff all transactions were received
// for the proposed block.
func (c *Context[H]) hasAllTransactions() bool {
Expand Down
68 changes: 68 additions & 0 deletions dbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ func (d *DBFT[H]) OnReceive(msg ConsensusPayload[H]) {
d.onPrepareResponse(msg)
case CommitType:
d.onCommit(msg)
case CommitAckType:
if !d.isAntiMEVExtensionEnabled() {
d.Logger.Error(fmt.Sprintf("%s message received but AntiMEVExtension is disabled", CommitAckType),
zap.Uint16("from", msg.ValidatorIndex()),
)
return
}
d.onCommitAck(msg)
case RecoveryRequestType:
d.onRecoveryRequest(msg)
case RecoveryMessageType:
Expand Down Expand Up @@ -497,6 +505,23 @@ func (d *DBFT[H]) onCommit(msg ConsensusPayload[H]) {
if d.ViewNumber == msg.ViewNumber() {
d.Logger.Info("received Commit", zap.Uint("validator", uint(msg.ValidatorIndex())))
d.extendTimer(4)
if d.isAntiMEVExtensionEnabled() {
preHeader := d.MakePreHeader()
if preHeader == nil {
d.CommitPayloads[msg.ValidatorIndex()] = msg
} else {
pub := d.Validators[msg.ValidatorIndex()]
if preHeader.Verify(pub, msg.GetCommit().Signature()) == nil {
d.CommitPayloads[msg.ValidatorIndex()] = msg
d.checkCommit()
} else {
d.Logger.Warn("invalid commit signature",
zap.Uint("validator", uint(msg.ValidatorIndex())),
)
}
}
return
}
header := d.MakeHeader()
if header == nil {
d.CommitPayloads[msg.ValidatorIndex()] = msg
Expand All @@ -522,6 +547,49 @@ func (d *DBFT[H]) onCommit(msg ConsensusPayload[H]) {
d.CommitPayloads[msg.ValidatorIndex()] = msg
}

func (d *DBFT[H]) onCommitAck(msg ConsensusPayload[H]) {
existing := d.CommitAckPayloads[msg.ValidatorIndex()]
if existing != nil {
if existing.Hash() != msg.Hash() {
d.Logger.Warn("rejecting commitAck due to existing",
zap.Uint("validator", uint(msg.ValidatorIndex())),
zap.Uint("existing view", uint(existing.ViewNumber())),
zap.Uint("view", uint(msg.ViewNumber())),
zap.Stringer("existing hash", existing.Hash()),
zap.Stringer("hash", msg.Hash()),
)
}
return
}
if d.ViewNumber == msg.ViewNumber() {
d.Logger.Info("received CommitAck", zap.Uint("validator", uint(msg.ValidatorIndex())))
d.extendTimer(4)
header := d.MakeHeader()
if header == nil {
d.CommitAckPayloads[msg.ValidatorIndex()] = msg
} else {
pub := d.Validators[msg.ValidatorIndex()]
if err := header.Verify(pub, msg.GetCommitAck().Data()); err == nil {
d.CommitAckPayloads[msg.ValidatorIndex()] = msg
d.checkCommitAck()
} else {
d.Logger.Warn("invalid commitAck",
zap.Uint("validator", uint(msg.ValidatorIndex())),
zap.Error(err),
)
}
}

return
}

d.Logger.Info("received commitAck for different view",
zap.Uint("validator", uint(msg.ValidatorIndex())),
zap.Uint("view", uint(msg.ViewNumber())),
)
d.CommitAckPayloads[msg.ValidatorIndex()] = msg
}

func (d *DBFT[H]) onRecoveryRequest(msg ConsensusPayload[H]) {
if !d.CommitSent() {
// Limit recoveries to be sent from no more than F nodes
Expand Down
Loading

0 comments on commit 7ea4a6f

Please sign in to comment.