Skip to content

Commit

Permalink
Handle msg before sending to neighbors to prevent duplicate tx
Browse files Browse the repository at this point in the history
Signed-off-by: Yilun <[email protected]>
  • Loading branch information
yilunzhang committed Nov 6, 2018
1 parent 2c51dc0 commit 59049e2
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 106 deletions.
6 changes: 0 additions & 6 deletions net/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,11 @@ func AllocMsg(t string, length int) Messenger {
case "tx":
var msg trn
copy(msg.msgHdr.CMD[0:len(t)], t)
//if (message.Payload.Length <= 1024 * 1024)
//OnInventoryReceived(Transaction.DeserializeFrom(message.Payload));
return &msg
case "ising":
var msg IsingMessage
copy(msg.msgHdr.CMD[0:len(t)], t)
return &msg
case "txnpool":
var msg txnPool
copy(msg.msgHdr.CMD[0:len(t)], t)
return &msg
case "notfound":
var msg notFound
copy(msg.msgHdr.CMD[0:len(t)], t)
Expand Down
17 changes: 0 additions & 17 deletions net/message/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,20 +145,3 @@ func (msg *trn) Deserialize(r io.Reader) error {

return nil
}

type txnPool struct {
msgHdr
//TBD
}

func ReqTxnPool(node Noder) error {
msg := AllocMsg("txnpool", 0)
buff := bytes.NewBuffer(nil)
err := msg.Serialize(buff)
if err != nil {
return err
}
node.Tx(buff.Bytes())

return nil
}
4 changes: 2 additions & 2 deletions net/node/hashcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
)

const (
HashCacheExpiration = 60 * time.Second
HashCacheCleanupInterval = 1 * time.Second
HashCacheExpiration = 300 * time.Second
HashCacheCleanupInterval = 10 * time.Second
)

type hashCache struct {
Expand Down
167 changes: 86 additions & 81 deletions net/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,109 +175,114 @@ func InitNode(pubKey *crypto.PubKey, nn *nnet.NNet) (Noder, error) {
return true
}))

nn.MustApplyMiddleware(nnetnode.BytesReceived(func(msg, msgID, srcID []byte, remoteNode *nnetnode.RemoteNode) ([]byte, bool) {
nbr := n
if remoteNode != nil {
nbr = n.getNbrByNNetNode(remoteNode)
if nbr == nil {
err := n.AddRemoteNode(remoteNode)
if err != nil {
log.Error("Cannot add remote node:", err)
return msg, true
}
nn.MustApplyMiddleware(routing.RemoteMessageRouted(func(remoteMessage *nnetnode.RemoteMessage, localNode *nnetnode.LocalNode, remoteNodes []*nnetnode.RemoteNode) (*nnetnode.RemoteMessage, *nnetnode.LocalNode, []*nnetnode.RemoteNode, bool) {
if remoteMessage.Msg.MessageType == nnetprotobuf.BYTES {
msgBody := &nnetprotobuf.Bytes{}
err := proto.Unmarshal(remoteMessage.Msg.Message, msgBody)
if err != nil {
log.Error(err)
return nil, nil, nil, false
}

nbr = n.getNbrByNNetNode(remoteNode)
if nbr == nil {
log.Error("Cannot get neighbor node")
return msg, true
if localNode != nil {
nbr := n
if remoteMessage.RemoteNode != nil {
nbr = n.getNbrByNNetNode(remoteMessage.RemoteNode)
if nbr == nil {
err = n.AddRemoteNode(remoteMessage.RemoteNode)
if err != nil {
log.Error("Cannot add remote node:", err)
return nil, nil, nil, false
}

nbr = n.getNbrByNNetNode(remoteMessage.RemoteNode)
if nbr == nil {
log.Error("Cannot get neighbor node")
return nil, nil, nil, false
}
}
}
}
}

err := message.HandleNodeMsg(nbr, msg)
if err != nil {
log.Error(err)
return msg, true
}
err = message.HandleNodeMsg(nbr, msgBody.Data)
if err != nil {
log.Error(err)
return nil, nil, nil, false
}

return msg, true
}))
if len(remoteNodes) == 0 {
return nil, nil, nil, false
}

nn.MustApplyMiddleware(routing.RemoteMessageRouted(func(remoteMessage *nnetnode.RemoteMessage, localNode *nnetnode.LocalNode, remoteNodes []*nnetnode.RemoteNode) (*nnetnode.RemoteMessage, *nnetnode.LocalNode, []*nnetnode.RemoteNode, bool) {
if remoteMessage.Msg.MessageType == nnetprotobuf.BYTES && remoteMessage.Msg.RoutingType == nnetprotobuf.RELAY {
if localNode != nil {
return remoteMessage, localNode, remoteNodes, false
localNode = nil
}

if len(remoteNodes) == 0 {
log.Error("No next hop found")
return nil, nil, nil, false
}
if remoteMessage.Msg.RoutingType == nnetprotobuf.RELAY {
if len(remoteNodes) > 1 {
log.Error("Multiple next hop is not supported yet")
return nil, nil, nil, false
}

if len(remoteNodes) > 1 {
log.Error("Multiple next hop is not supported yet")
return nil, nil, nil, false
}
nextHop := n.getNbrByNNetNode(remoteNodes[0])
if nextHop == nil {
err := n.AddRemoteNode(remoteNodes[0])
if err != nil {
log.Error("Cannot add next hop remote node:", err)
return nil, nil, nil, false
}

nextHop = n.getNbrByNNetNode(remoteNodes[0])
if nextHop == nil {
log.Error("Cannot get next hop neighbor node")
return nil, nil, nil, false
}
}

nextHop := n.getNbrByNNetNode(remoteNodes[0])
if nextHop == nil {
err := n.AddRemoteNode(remoteNodes[0])
msgBody := &nnetprotobuf.Bytes{}
err := proto.Unmarshal(remoteMessage.Msg.Message, msgBody)
if err != nil {
log.Error("Cannot add next hop remote node:", err)
log.Error(err)
return nil, nil, nil, false
}

nextHop = n.getNbrByNNetNode(remoteNodes[0])
if nextHop == nil {
log.Error("Cannot get next hop neighbor node")
msg, err := message.ParseMsg(msgBody.Data)
if err != nil {
log.Error(err)
return nil, nil, nil, false
}
}

msgBody := &nnetprotobuf.Bytes{}
err := proto.Unmarshal(remoteMessage.Msg.Message, msgBody)
if err != nil {
log.Error(err)
return nil, nil, nil, false
}

msg, err := message.ParseMsg(msgBody.Data)
if err != nil {
log.Error(err)
return nil, nil, nil, false
}

relayMsg, ok := msg.(*message.RelayMessage)
if !ok {
log.Error("Msg is not relay message")
return nil, nil, nil, false
}
relayMsg, ok := msg.(*message.RelayMessage)
if !ok {
log.Error("Msg is not relay message")
return nil, nil, nil, false
}

relayPacket := &relayMsg.Packet
err = n.relayer.SignRelayPacket(nextHop, relayPacket)
if err != nil {
log.Error(err)
return nil, nil, nil, false
}
relayPacket := &relayMsg.Packet
err = n.relayer.SignRelayPacket(nextHop, relayPacket)
if err != nil {
log.Error(err)
return nil, nil, nil, false
}

relayMsg, err = message.NewRelayMessage(relayPacket)
if err != nil {
log.Error(err)
return nil, nil, nil, false
}
relayMsg, err = message.NewRelayMessage(relayPacket)
if err != nil {
log.Error(err)
return nil, nil, nil, false
}

msgBody.Data, err = relayMsg.ToBytes()
if err != nil {
log.Error(err)
return nil, nil, nil, false
}
msgBody.Data, err = relayMsg.ToBytes()
if err != nil {
log.Error(err)
return nil, nil, nil, false
}

remoteMessage.Msg.Message, err = proto.Marshal(msgBody)
if err != nil {
log.Error(err)
return nil, nil, nil, false
remoteMessage.Msg.Message, err = proto.Marshal(msgBody)
if err != nil {
log.Error(err)
return nil, nil, nil, false
}
}
}

return remoteMessage, localNode, remoteNodes, true
}))

Expand Down

0 comments on commit 59049e2

Please sign in to comment.