Skip to content

Commit 0f066fb

Browse files
committed
consensus/istanbul: implement gossip network
1 parent 38f82a8 commit 0f066fb

File tree

6 files changed

+69
-5
lines changed

6 files changed

+69
-5
lines changed

consensus/istanbul/backend.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,12 @@ type Backend interface {
3434
// EventMux returns the event mux in backend
3535
EventMux() *event.TypeMux
3636

37-
// Broadcast sends a message to all validators
37+
// Broadcast sends a message to all validators (include self)
3838
Broadcast(valSet ValidatorSet, payload []byte) error
3939

40+
// Gossip sends a message to all validators (exclude self)
41+
Gossip(valSet ValidatorSet, payload []byte) error
42+
4043
// Commit delivers an approved proposal to backend.
4144
// The delivered proposal will be put into blockchain.
4245
Commit(proposal Proposal, seals [][]byte) error

consensus/istanbul/backend/backend.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import (
3939
func New(config *istanbul.Config, eventMux *event.TypeMux, privateKey *ecdsa.PrivateKey, db ethdb.Database) consensus.Istanbul {
4040
// Allocate the snapshot caches and create the engine
4141
recents, _ := lru.NewARC(inmemorySnapshots)
42+
recentMessages, _ := lru.NewARC(inmemoryPeers)
43+
knownMessages, _ := lru.NewARC(inmemoryMessages)
4244
backend := &backend{
4345
config: config,
4446
eventMux: eventMux,
@@ -51,6 +53,8 @@ func New(config *istanbul.Config, eventMux *event.TypeMux, privateKey *ecdsa.Pri
5153
recents: recents,
5254
candidates: make(map[common.Address]bool),
5355
coreStarted: false,
56+
recentMessages: recentMessages,
57+
knownMessages: knownMessages,
5458
}
5559
backend.core = istanbulCore.New(backend, backend.config)
5660
return backend
@@ -87,6 +91,9 @@ type backend struct {
8791
// event subscription for ChainHeadEvent event
8892
eventSub *event.TypeMuxSubscription
8993
broadcaster consensus.Broadcaster
94+
95+
recentMessages *lru.ARCCache // the cache of peer's messages
96+
knownMessages *lru.ARCCache // the cache of self messages
9097
}
9198

9299
// Address implements istanbul.Backend.Address
@@ -111,7 +118,11 @@ func (sb *backend) Broadcast(valSet istanbul.ValidatorSet, payload []byte) error
111118
return nil
112119
}
113120

121+
// Broadcast implements istanbul.Backend.Gossip
114122
func (sb *backend) Gossip(valSet istanbul.ValidatorSet, payload []byte) error {
123+
hash := istanbul.RLPHash(payload)
124+
sb.knownMessages.Add(hash, true)
125+
115126
targets := make(map[common.Address]bool)
116127
for _, val := range valSet.List() {
117128
if val.Address() != sb.Address() {
@@ -121,7 +132,22 @@ func (sb *backend) Gossip(valSet istanbul.ValidatorSet, payload []byte) error {
121132

122133
if sb.broadcaster != nil && len(targets) > 0 {
123134
ps := sb.broadcaster.FindPeers(targets)
124-
for _, p := range ps {
135+
for addr, p := range ps {
136+
ms, ok := sb.recentMessages.Get(addr)
137+
var m *lru.ARCCache
138+
if ok {
139+
m, _ = ms.(*lru.ARCCache)
140+
if _, k := m.Get(hash); k {
141+
// This peer had this event, skip it
142+
continue
143+
}
144+
} else {
145+
m, _ = lru.NewARC(inmemoryMessages)
146+
}
147+
148+
m.Add(hash, true)
149+
sb.recentMessages.Add(addr, m)
150+
125151
go p.Send(istanbulMsg, payload)
126152
}
127153
}

consensus/istanbul/backend/engine.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ import (
4242
const (
4343
checkpointInterval = 1024 // Number of blocks after which to save the vote snapshot to the database
4444
inmemorySnapshots = 128 // Number of recent vote snapshots to keep in memory
45+
inmemoryPeers = 40
46+
inmemoryMessages = 100
4547
)
4648

4749
var (

consensus/istanbul/backend/handler.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/ethereum/go-ethereum/core"
2626
"github.com/ethereum/go-ethereum/core/types"
2727
"github.com/ethereum/go-ethereum/p2p"
28+
lru "github.com/hashicorp/golang-lru"
2829
)
2930

3031
const (
@@ -60,6 +61,25 @@ func (sb *backend) HandleMsg(addr common.Address, msg p2p.Msg) (bool, error) {
6061
return true, errDecodeFailed
6162
}
6263

64+
hash := istanbul.RLPHash(data)
65+
66+
// Mark peer's message
67+
ms, ok := sb.recentMessages.Get(addr)
68+
var m *lru.ARCCache
69+
if ok {
70+
m, _ = ms.(*lru.ARCCache)
71+
} else {
72+
m, _ = lru.NewARC(inmemoryMessages)
73+
sb.recentMessages.Add(addr, m)
74+
}
75+
m.Add(hash, true)
76+
77+
// Mark self known message
78+
if _, ok := sb.knownMessages.Get(hash); ok {
79+
return true, nil
80+
}
81+
sb.knownMessages.Add(hash, true)
82+
6383
go sb.istanbulEventMux.Post(istanbul.MessageEvent{
6484
Payload: data,
6585
})

consensus/istanbul/core/handler.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,19 @@ func (c *core) handleEvents() {
9595
c.storeRequestMsg(r)
9696
}
9797
case istanbul.MessageEvent:
98-
c.handleMsg(ev.Payload)
98+
if err := c.handleMsg(ev.Payload); err == nil {
99+
c.backend.Gossip(c.valSet, ev.Payload)
100+
}
99101
case backlogEvent:
100102
// No need to check signature for internal messages
101-
c.handleCheckedMsg(ev.msg, ev.src)
102-
103+
if err := c.handleCheckedMsg(ev.msg, ev.src); err == nil {
104+
p, err := ev.msg.Payload()
105+
if err != nil {
106+
c.logger.Warn("Get message payload failed", "err", err)
107+
continue
108+
}
109+
c.backend.Gossip(c.valSet, p)
110+
}
103111
}
104112
case _, ok := <-c.timeoutSub.Chan():
105113
if !ok {

consensus/istanbul/core/testbackend_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ func (self *testSystemBackend) Broadcast(valSet istanbul.ValidatorSet, message [
8787
return nil
8888
}
8989

90+
func (self *testSystemBackend) Gossip(valSet istanbul.ValidatorSet, message []byte) error {
91+
testLogger.Warn("not sign any data")
92+
return nil
93+
}
94+
9095
func (self *testSystemBackend) NextRound() error {
9196
testLogger.Warn("nothing to happen")
9297
return nil

0 commit comments

Comments
 (0)