Skip to content

Commit 16039cd

Browse files
committedFeb 16, 2024··
Accelerate the efficiency of block synchronization
1 parent 63f442f commit 16039cd

File tree

14 files changed

+1319
-957
lines changed

14 files changed

+1319
-957
lines changed
 

‎consensus/model/notify.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
// and rpc server.
1414
type Notify interface {
1515
AnnounceNewTransactions(newTxs []*types.TxDesc, filters []peer.ID)
16-
RelayInventory(data interface{}, filters []peer.ID)
16+
RelayInventory(block *types.SerializedBlock, flags uint32, source *peer.ID)
1717
BroadcastMessage(data interface{})
1818
TransactionConfirmed(tx *types.Tx)
1919
AddRebroadcastInventory(newTxs []*types.TxDesc)

‎core/blockchain/behaviorflags.go

+3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ const (
2121
// Add block from RPC interface
2222
BFRPCAdd
2323

24+
// Add block from broadcast interface
25+
BFBroadcast
26+
2427
// BFNone is a convenience value to specifically indicate no flags.
2528
BFNone BehaviorFlags = 0
2629
)

‎core/blockchain/notifications.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/Qitmeer/qng/common/hash"
1212
"github.com/Qitmeer/qng/core/event"
1313
"github.com/Qitmeer/qng/core/types"
14+
"github.com/libp2p/go-libp2p/core/peer"
1415
"time"
1516
)
1617

@@ -69,7 +70,8 @@ type BlockAcceptedNotifyData struct {
6970
Block *types.SerializedBlock
7071
Height uint64
7172

72-
Flags BehaviorFlags
73+
Flags BehaviorFlags
74+
Source *peer.ID
7375
}
7476

7577
// ReorganizationNotifyData is the structure for data indicating information

‎core/blockchain/orphan.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (b *BlockChain) processOrphans(flags BehaviorFlags) error {
258258
continue
259259
}
260260
b.RemoveOrphanBlock(cur)
261-
b.maybeAcceptBlock(cur.block, flags)
261+
b.maybeAcceptBlock(cur.block, flags, nil)
262262
}
263263
return nil
264264
}

‎core/blockchain/process.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package blockchain
88
import (
99
"container/list"
1010
"fmt"
11+
"github.com/libp2p/go-libp2p/core/peer"
1112
"time"
1213

1314
"github.com/Qitmeer/qng/common/hash"
@@ -36,12 +37,12 @@ import (
3637
//
3738
// This function is safe for concurrent access.
3839
// return IsOrphan,error
39-
func (b *BlockChain) ProcessBlock(block *types.SerializedBlock, flags BehaviorFlags) (meerdag.IBlock, bool, error) {
40+
func (b *BlockChain) ProcessBlock(block *types.SerializedBlock, flags BehaviorFlags, source *peer.ID) (meerdag.IBlock, bool, error) {
4041
if b.IsShutdown() {
4142
return nil, false, fmt.Errorf("block chain is shutdown")
4243
}
4344
block.Reset()
44-
msg := processMsg{block: block, flags: flags, result: make(chan *processResult)}
45+
msg := processMsg{block: block, flags: flags, result: make(chan *processResult), source: source}
4546
b.msgChan <- &msg
4647
result := <-msg.result
4748
return result.block, result.isOrphan, result.err
@@ -54,7 +55,7 @@ out:
5455
select {
5556
case msg := <-b.msgChan:
5657
start := time.Now()
57-
ib, isOrphan, err := b.processBlock(msg.block, msg.flags)
58+
ib, isOrphan, err := b.processBlock(msg.block, msg.flags, msg.source)
5859
blockProcessTimer.Update(time.Since(start))
5960
msg.result <- &processResult{isOrphan: isOrphan, err: err, block: ib}
6061
case <-b.quit:
@@ -75,14 +76,14 @@ cleanup:
7576
log.Trace("BlockChain handler done")
7677
}
7778

78-
func (b *BlockChain) processBlock(block *types.SerializedBlock, flags BehaviorFlags) (meerdag.IBlock, bool, error) {
79+
func (b *BlockChain) processBlock(block *types.SerializedBlock, flags BehaviorFlags, source *peer.ID) (meerdag.IBlock, bool, error) {
7980
isorphan, err := b.preProcessBlock(block, flags)
8081
if err != nil || isorphan {
8182
return nil, isorphan, err
8283
}
8384
// The block has passed all context independent checks and appears sane
8485
// enough to potentially accept it into the block chain.
85-
ib, err := b.maybeAcceptBlock(block, flags)
86+
ib, err := b.maybeAcceptBlock(block, flags, source)
8687
if err != nil {
8788
return nil, false, err
8889
}
@@ -193,7 +194,7 @@ func (b *BlockChain) preProcessBlock(block *types.SerializedBlock, flags Behavio
193194
// their documentation for how the flags modify their behavior.
194195
//
195196
// This function MUST be called with the chain state lock held (for writes).
196-
func (b *BlockChain) maybeAcceptBlock(block *types.SerializedBlock, flags BehaviorFlags) (meerdag.IBlock, error) {
197+
func (b *BlockChain) maybeAcceptBlock(block *types.SerializedBlock, flags BehaviorFlags, source *peer.ID) (meerdag.IBlock, error) {
197198
if onEnd := l.LogAndMeasureExecutionTime(log, "BlockChain.maybeAcceptBlock"); onEnd != nil {
198199
defer onEnd()
199200
}
@@ -293,6 +294,7 @@ func (b *BlockChain) maybeAcceptBlock(block *types.SerializedBlock, flags Behavi
293294
Block: block,
294295
Flags: flags,
295296
Height: uint64(ib.GetHeight()),
297+
Source: source,
296298
})
297299
if b.Acct != nil {
298300
err = b.Acct.Commit()
@@ -304,7 +306,7 @@ func (b *BlockChain) maybeAcceptBlock(block *types.SerializedBlock, flags Behavi
304306
}
305307

306308
func (b *BlockChain) FastAcceptBlock(block *types.SerializedBlock, flags BehaviorFlags) (meerdag.IBlock, error) {
307-
return b.maybeAcceptBlock(block, flags)
309+
return b.maybeAcceptBlock(block, flags, nil)
308310
}
309311

310312
// connectBestChain handles connecting the passed block to the chain while
@@ -788,6 +790,7 @@ type processMsg struct {
788790
block *types.SerializedBlock
789791
flags BehaviorFlags
790792
result chan *processResult
793+
source *peer.ID
791794
}
792795

793796
type processResult struct {

‎core/protocol/protocol.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ const (
2121
// Support continue block sync for DAG search
2222
ConSyncDAGProtocolVersion uint32 = 44
2323

24+
// Support continue block sync for DAG search
25+
BroadcastblockProtocolVersion uint32 = 45
26+
2427
// ProtocolVersion is the latest protocol version this package supports.
25-
ProtocolVersion uint32 = ConSyncDAGProtocolVersion
28+
ProtocolVersion uint32 = BroadcastblockProtocolVersion
2629
)
2730

2831
// Network represents which qitmeer network a message belongs to.

‎p2p/proto/v1/getblockdatas.pb.go

+199-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎p2p/proto/v1/getblockdatas.proto

+4
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,8 @@ message BlockDatas {
1515

1616
message BlockData {
1717
bytes blockBytes = 100 [(gogoproto.moretags) = "ssz-max:\"1048576\""];
18+
}
19+
20+
message BroadcastBlock {
21+
BlockData block =1;
1822
}

‎p2p/proto/v1/messages.ssz.go

+1,002-934
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎p2p/service.go

+25
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/Qitmeer/qng/core/event"
1212
"github.com/Qitmeer/qng/core/json"
1313
pv "github.com/Qitmeer/qng/core/protocol"
14+
"github.com/Qitmeer/qng/core/types"
1415
"github.com/Qitmeer/qng/node/service"
1516
"github.com/Qitmeer/qng/p2p/common"
1617
"github.com/Qitmeer/qng/p2p/discover"
@@ -479,6 +480,30 @@ func (s *Service) BroadcastMessage(data interface{}) {
479480

480481
}
481482

483+
func (s *Service) BroadcastBlock(block *types.SerializedBlock, source *peer.ID) error {
484+
for _, pe := range s.Peers().CanSyncPeers() {
485+
if source != nil {
486+
if pe.GetID() == *source {
487+
continue
488+
}
489+
}
490+
if pe.ChainState().ProtocolVersion < uint32(pv.BroadcastblockProtocolVersion) {
491+
continue
492+
}
493+
go func(pe *peers.Peer) {
494+
blockBytes, err := block.Bytes()
495+
if err != nil {
496+
log.Error(err.Error())
497+
return
498+
}
499+
if _, err := s.sy.Send(pe, synch.RPCBroadcastBlock, &pb.BroadcastBlock{Block: &pb.BlockData{BlockBytes: blockBytes}}); err != nil {
500+
log.Error(err.Error())
501+
}
502+
}(pe)
503+
}
504+
return nil
505+
}
506+
482507
func (s *Service) GetBanlist() map[peer.ID][]*json.BadResponse {
483508
result := map[peer.ID][]*json.BadResponse{}
484509
bads := s.Peers().Bad()

‎p2p/synch/getblockdatas.go

+48-1
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func (ps *PeerSync) processGetBlockDatas(pe *peers.Peer, blocks []*hash.Hash) *P
231231
block = b.Block
232232
}
233233
//
234-
_, IsOrphan, err := ps.sy.p2p.BlockChain().ProcessBlock(block, behaviorFlags)
234+
_, IsOrphan, err := ps.sy.p2p.BlockChain().ProcessBlock(block, behaviorFlags, nil)
235235
if err != nil {
236236
log.Error(fmt.Sprintf("Failed to process block:hash=%s err=%s", block.Hash(), err), "processID", ps.processID)
237237
continue
@@ -333,3 +333,50 @@ func (ps *PeerSync) OnGetData(sp *peers.Peer, invList []*pb.InvVect) error {
333333
}
334334
return nil
335335
}
336+
337+
func (s *Sync) sendBroadcastBlockRequest(stream network.Stream, pe *peers.Peer) *common.Error {
338+
e := ReadRspCode(stream, s.p2p)
339+
if !e.Code.IsSuccess() {
340+
e.Add("get block date request rsp")
341+
return e
342+
}
343+
msg := new(uint64)
344+
if err := DecodeMessage(stream, s.p2p, msg); err != nil {
345+
return common.NewError(common.ErrStreamRead, err)
346+
}
347+
if *msg != 0 {
348+
log.Trace("broadcast block is added")
349+
}
350+
return nil
351+
}
352+
353+
func (s *Sync) broadcastBlockHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream, pe *peers.Peer) *common.Error {
354+
m, ok := msg.(*pb.BroadcastBlock)
355+
if !ok {
356+
err := fmt.Errorf("message is not type *pb.Hash")
357+
return ErrMessage(err)
358+
}
359+
block, err := types.NewBlockFromBytes(m.Block.BlockBytes)
360+
if err != nil {
361+
return ErrMessage(err)
362+
}
363+
ret := uint64(0)
364+
if s.p2p.BlockChain().BlockDAG().HasBlock(block.Hash()) ||
365+
s.p2p.BlockChain().IsOrphan(block.Hash()) ||
366+
s.p2p.BlockChain().HasBlockInDB(block.Hash()) {
367+
return s.EncodeResponseMsg(stream, ret)
368+
}
369+
for _, ph := range block.Block().Parents {
370+
if !s.p2p.BlockChain().BlockDAG().HasBlock(ph) {
371+
return s.EncodeResponseMsg(stream, ret)
372+
}
373+
}
374+
peid := pe.GetID()
375+
_, _, err = s.p2p.BlockChain().ProcessBlock(block, blockchain.BFBroadcast, &peid)
376+
if err != nil {
377+
log.Error("Failed to process block", "hash", block.Hash(), "error", err)
378+
return s.EncodeResponseMsg(stream, ret)
379+
}
380+
ret = 1
381+
return s.EncodeResponseMsg(stream, ret)
382+
}

‎p2p/synch/sync.go

+10
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ const (
6262
RPCGetData = "/qitmeer/req/getdata/1"
6363
// RPCStateRoot defines the topic for the stateroot rpc method.
6464
RPCStateRoot = "/qitmeer/req/stateroot/1"
65+
// RPCBroadcastBlock defines the topic for the broadcast block rpc method.
66+
RPCBroadcastBlock = "/qitmeer/req/broadcastblock/1"
6567
)
6668

6769
// Time to first byte timeout. The maximum time to wait for first byte of
@@ -236,6 +238,12 @@ func (s *Sync) registerRPCHandlers() {
236238
&pb.StateRootReq{},
237239
s.stateRootHandler,
238240
)
241+
242+
s.registerRPC(
243+
RPCBroadcastBlock,
244+
&pb.BroadcastBlock{},
245+
s.broadcastBlockHandler,
246+
)
239247
}
240248

241249
// registerRPC for a given topic with an expected protobuf message type.
@@ -290,6 +298,8 @@ func (s *Sync) Send(pe *peers.Peer, protocol string, message interface{}) (inter
290298
ret, e = s.sendTxRequest(stream, pe)
291299
case RPCStateRoot:
292300
ret, e = s.sendStateRootRequest(stream, pe)
301+
case RPCBroadcastBlock:
302+
e = s.sendBroadcastBlockRequest(stream, pe)
293303
default:
294304
return nil, fmt.Errorf("Can't support:%s", protocol)
295305
}

‎services/miner/miner.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ func (m *Miner) submitBlock(block *types.SerializedBlock) (interface{}, error) {
606606
}
607607
// Process this block using the same rules as blocks coming from other
608608
// nodes. This will in turn relay it to the network like normal.
609-
ib, IsOrphan, err := m.consensus.BlockChain().(*blockchain.BlockChain).ProcessBlock(block, blockchain.BFRPCAdd)
609+
ib, IsOrphan, err := m.consensus.BlockChain().(*blockchain.BlockChain).ProcessBlock(block, blockchain.BFRPCAdd, nil)
610610
if err != nil {
611611
// Anything other than a rule violation is an unexpected error,
612612
// so log that error as an internal error.

‎services/notifymgr/notifymgr.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,16 @@ func (ntmgr *NotifyMgr) AnnounceNewTransactions(newTxs []*types.TxDesc, filters
6565

6666
// RelayInventory relays the passed inventory vector to all connected peers
6767
// that are not already known to have it.
68-
func (ntmgr *NotifyMgr) RelayInventory(data interface{}, filters []peer.ID) {
68+
func (ntmgr *NotifyMgr) RelayInventory(block *types.SerializedBlock, flags uint32, source *peer.ID) {
6969
if ntmgr.IsShutdown() {
7070
return
7171
}
72-
_, ok := data.(types.BlockHeader)
73-
if !ok {
74-
log.Warn(fmt.Sprintf("No support relay data:%v", data))
75-
return
72+
fs := blockchain.BehaviorFlags(flags)
73+
if fs.Has(blockchain.BFRPCAdd) || fs.Has(blockchain.BFBroadcast) {
74+
err := ntmgr.Server.BroadcastBlock(block, source)
75+
if err != nil {
76+
log.Error(err.Error())
77+
}
7678
}
7779
ntmgr.Server.PeerSync().RelayGraphState()
7880
}
@@ -207,7 +209,7 @@ func (ntmgr *NotifyMgr) handleNotifyMsg(notification *blockchain.Notification) {
207209
return
208210
}
209211
log.Trace("we are current, can do relay")
210-
ntmgr.RelayInventory(block.Block().Header, nil)
212+
ntmgr.RelayInventory(block, uint32(band.Flags), band.Source)
211213

212214
// A block has been connected to the main block chain.
213215
case blockchain.BlockConnected:

0 commit comments

Comments
 (0)
Please sign in to comment.