Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions boxd/eventbus/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,40 @@ const (
TopicGetNetworkID = "rpc:getnetworkid"
// TopicGetAddressBook is topic for listing p2p peer status
TopicGetAddressBook = "rpc:getaddressbook"

////////////////////////////// p2p /////////////////////////////

//TopicP2PPeerAddr is a event topic for new peer addr found or peer addr updated
TopicP2PPeerAddr = "p2p:peeraddr"
//TopicP2PAddPeer is a event topic for adding peer addr to peer store
TopicP2PAddPeer = "p2p:addpeer"
// TopicConnEvent is a event topic of events for score updated
TopicConnEvent = "p2p:connevent"

////////////////////////////// dpos /////////////////////////////

// TopicMiner is topic for miner
TopicMiner = "dpos:miner"
// TopicMiners is topic for replying current miners
TopicMiners = "dpos:miners"
// TopicAddrs is topic for replying current addrs
TopicAddrs = "dpos:addrs"
// TopicValidateMiner is topic for replying wheather it is a miner now
TopicValidateMiner = "dpos:validateminer"
// TopicMinerPubkey is topic for miner pubkey
TopicMinerPubkey = "dpos:minerpubkey"
// TopicSignature is topic for sign a []byte
TopicSignature = "dpos:signature"

////////////////////////////// chain /////////////////////////////

// TopicChainUpdate is topic for notifying that the chain is updated,
// either chain reorg, or chain extended.
TopicChainUpdate = "chain:update"
// TopicBlacklistBlockConfirmResult is topic for confirmed result
TopicBlacklistBlockConfirmResult = "chain:blacklistblockresult"
// TopicBlacklistTxConfirmResult is topic for confirmed result
TopicBlacklistTxConfirmResult = "chain:blacklisttxresult"

// TopicUtxoUpdate is topic for notifying that chain utxo is changed
TopicUtxoUpdate = "chain:utxoupdate"
Expand All @@ -37,4 +59,9 @@ const (
TopicGetDatabaseValue = "rpc:database:get"
// TopicRPCSendNewBlock is topic for sending new block to explorer
TopicRPCSendNewBlock = "rpc:newblock:send"

////////////////////////////// tx /////////////////////////////

// TopicGenerateTx is topic to generate tx
TopicGenerateTx = "tx:generate"
)
13 changes: 9 additions & 4 deletions boxd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
"sync"
"time"

"github.com/BOXFoundation/boxd/wallet/walletserver"

"github.com/BOXFoundation/boxd/blocksync"
"github.com/BOXFoundation/boxd/boxd/eventbus"
"github.com/BOXFoundation/boxd/boxd/service"
config "github.com/BOXFoundation/boxd/config"
"github.com/BOXFoundation/boxd/consensus/dpos"
"github.com/BOXFoundation/boxd/core/chain"
ctl "github.com/BOXFoundation/boxd/core/controller"
"github.com/BOXFoundation/boxd/core/txgenerator"
"github.com/BOXFoundation/boxd/core/txpool"
"github.com/BOXFoundation/boxd/log"
"github.com/BOXFoundation/boxd/metrics"
Expand All @@ -31,6 +31,7 @@ import (
storage "github.com/BOXFoundation/boxd/storage"
_ "github.com/BOXFoundation/boxd/storage/memdb" // init memdb
_ "github.com/BOXFoundation/boxd/storage/rocksdb" // init rocksdb
"github.com/BOXFoundation/boxd/wallet/walletserver"
"github.com/jbenet/goprocess"
)

Expand Down Expand Up @@ -162,14 +163,17 @@ func (server *Server) Prepare() {

// prepare grpc server.
if cfg.RPC.Enabled {
server.grpcsvr = grpcserver.NewServer(txPool.Proc(), &cfg.RPC, blockChain, txPool, server.wallet, server.bus)
server.grpcsvr = grpcserver.NewServer(txPool.Proc(), &cfg.RPC, blockChain, txPool, server.wallet, server.bus, ctl.Default())
}

// prepare sync manager.
syncManager := blocksync.NewSyncManager(blockChain, peer, consensus, blockChain.Proc())
server.syncManager = syncManager
server.blockChain.Setup(consensus, syncManager)

if _, err := txgenerator.New(database, blockChain, txPool, peer); err != nil {
logger.Fatalf("Failed to new txgenerator. Err: %v", err)
}
}

var _ service.Server = (*Server)(nil)
Expand Down Expand Up @@ -220,9 +224,10 @@ func (server *Server) Run() error {

if cfg.RPC.Enabled {
server.grpcsvr = grpcserver.NewServer(server.txPool.Proc(), &cfg.RPC, server.blockChain,
server.txPool, server.wallet, server.bus)
server.txPool, server.wallet, server.bus, ctl.Default())
server.grpcsvr.Run()
}
txgenerator.Default().Run()

// goprocesses dependencies
// root
Expand Down
16 changes: 16 additions & 0 deletions commands/box/ctl/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ to quickly create a Cobra application.`,
fmt.Println("getnetworkinfo called")
},
},
&cobra.Command{
Use: "getblacklist",
Short: "Get the blacklist of the main chain",
Run: getBlacklistCmdFunc,
},
&cobra.Command{
Use: "getrawtx [txhash]",
Short: "Get the raw transaction for a txid",
Expand Down Expand Up @@ -288,6 +293,17 @@ func getBlockHeaderCmdFunc(cmd *cobra.Command, args []string) {
}
}

func getBlacklistCmdFunc(cmd *cobra.Command, args []string) {
fmt.Println("getblacklist called")
conn := client.NewConnectionWithViper(viper.GetViper())
defer conn.Close()
blacklist, err := client.GetBlacklist(conn)
if err != nil {
fmt.Println(err)
}
fmt.Println("Current Blacklist: ", blacklist)
}

func getRawTxCmdFunc(cmd *cobra.Command, args []string) {
fmt.Println("getrawtx called")
if len(args) < 1 {
Expand Down
51 changes: 47 additions & 4 deletions consensus/dpos/dpos.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"sync/atomic"
"time"

"github.com/hashicorp/golang-lru"

"github.com/BOXFoundation/boxd/boxd/eventbus"
"github.com/BOXFoundation/boxd/boxd/service"
"github.com/BOXFoundation/boxd/core"
"github.com/BOXFoundation/boxd/core/chain"
ctl "github.com/BOXFoundation/boxd/core/controller"
"github.com/BOXFoundation/boxd/core/txpool"
"github.com/BOXFoundation/boxd/core/types"
"github.com/BOXFoundation/boxd/crypto"
Expand All @@ -24,6 +24,7 @@ import (
"github.com/BOXFoundation/boxd/storage"
"github.com/BOXFoundation/boxd/util"
"github.com/BOXFoundation/boxd/wallet"
"github.com/hashicorp/golang-lru"
"github.com/jbenet/goprocess"
)

Expand Down Expand Up @@ -118,6 +119,7 @@ func (dpos *Dpos) Run() error {
return err
}
dpos.bftservice = bftService
dpos.subscribeBlacklist()
bftService.Start()
dpos.proc.Go(dpos.loop)

Expand Down Expand Up @@ -441,9 +443,8 @@ func (dpos *Dpos) BroadcastEternalMsgToMiners(block *types.Block) error {
eternalBlockMsg.hash = *hash
eternalBlockMsg.signature = signature
eternalBlockMsg.timestamp = block.Header.TimeStamp
miners := dpos.context.periodContext.periodPeers

return dpos.net.BroadcastToMiners(p2p.EternalBlockMsg, eternalBlockMsg, miners)
return dpos.net.BroadcastToMiners(p2p.EternalBlockMsg, eternalBlockMsg)
}

// StorePeriodContext store period context
Expand Down Expand Up @@ -674,3 +675,45 @@ func (dpos *Dpos) TryToUpdateEternalBlock(src *types.Block) {
}
dpos.bftservice.updateEternal(block)
}

func (dpos *Dpos) subscribeBlacklist() {

ctl.SetPeriodSize(PeriodSize)

dpos.chain.Bus().Reply(eventbus.TopicMiners, func(out chan<- []string) {
out <- dpos.context.periodContext.periodPeers
}, false)

dpos.chain.Bus().Reply(eventbus.TopicMiner, func(out chan<- *wallet.Account) {
out <- dpos.miner
}, false)

dpos.chain.Bus().Reply(eventbus.TopicAddrs, func(out chan<- []types.AddressHash) {
out <- dpos.context.periodContext.periodAddrs
}, false)

dpos.chain.Bus().Reply(eventbus.TopicMinerPubkey, func(out chan<- []byte) {
out <- dpos.miner.PubKeyHash()
}, false)

dpos.chain.Bus().Reply(eventbus.TopicValidateMiner, func(pid string, addr types.AddressHash, out chan<- bool) {
if util.InArray(pid, dpos.bftservice.consensus.context.periodContext.periodPeers) {
for _, v := range dpos.bftservice.consensus.context.periodContext.period {
if addr == v.addr && pid == v.peerID {
out <- true
}
}
}
out <- false
}, false)

dpos.chain.Bus().Reply(eventbus.TopicSignature, func(digest []byte, out chan<- []byte) {
signature, err := crypto.SignCompact(dpos.miner.PrivateKey(), digest[:])
if err == nil {
out <- signature
} else {
logger.Warnf("SignCompact err: %v", err)
out <- nil
}
}, false)
}
39 changes: 36 additions & 3 deletions core/chain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/BOXFoundation/boxd/boxd/eventbus"
"github.com/BOXFoundation/boxd/boxd/service"
"github.com/BOXFoundation/boxd/core"
ctl "github.com/BOXFoundation/boxd/core/controller"
"github.com/BOXFoundation/boxd/core/metrics"
"github.com/BOXFoundation/boxd/core/pb"
"github.com/BOXFoundation/boxd/core/types"
Expand Down Expand Up @@ -146,8 +147,9 @@ var _ service.Server = (*BlockChain)(nil)
// Run launch blockchain.
func (chain *BlockChain) Run() error {
chain.subscribeMessageNotifiee()
chain.subscribeBlacklistMsg()
chain.proc.Go(chain.loop)

ctl.NewBlacklistWrap(chain.notifiee, chain.bus, chain.db, chain.proc)
return nil
}

Expand Down Expand Up @@ -180,6 +182,13 @@ func (chain *BlockChain) subscribeMessageNotifiee() {
chain.notifiee.Subscribe(p2p.NewNotifiee(p2p.NewBlockMsg, chain.newblockMsgCh))
}

func (chain *BlockChain) subscribeBlacklistMsg() {
chain.bus.Reply(eventbus.TopicBlacklistBlockConfirmResult, func(block *types.Block, messageFrom peer.ID, resultCh chan error) {
err := chain.ProcessBlock(block, core.DefaultMode, false, messageFrom)
resultCh <- err
}, false)
}

func (chain *BlockChain) loop(p goprocess.Process) {
logger.Info("Waitting for new block message...")
metricsTicker := time.NewTicker(metricsLoopInterval)
Expand Down Expand Up @@ -223,14 +232,33 @@ func (chain *BlockChain) processBlockMsg(msg p2p.Message) error {
}

// process block
if err := chain.ProcessBlock(block, core.RelayMode, true, msg.From()); err != nil && util.InArray(err, core.EvilBehavior) {
chain.Bus().Publish(eventbus.TopicConnEvent, msg.From(), eventbus.BadBlockEvent)
if err := chain.ProcessBlock(block, core.RelayMode, true, msg.From()); err != nil {
chain.checkEvilBehavior(msg.From(), block, err)
return err
}

// TODO: test
go func() {
if pubkey, ok := crypto.RecoverCompact(block.BlockHash()[:], block.Signature); ok {
ctl.Default().SceneCh <- &ctl.Evidence{PubKey: pubkey.Serialize(), Block: block, Type: ctl.BlockEvidence, Err: core.ErrBlockExists.Error(), Ts: time.Now().Unix()}
}
}()

chain.Bus().Publish(eventbus.TopicConnEvent, msg.From(), eventbus.NewBlockEvent)
return nil
}

func (chain *BlockChain) checkEvilBehavior(pid peer.ID, block *types.Block, err error) {
if util.InArray(err, core.EvilBehavior) {
chain.Bus().Publish(eventbus.TopicConnEvent, pid, eventbus.BadBlockEvent)
go func() {
if pubkey, ok := crypto.RecoverCompact(block.BlockHash()[:], block.Signature); ok {
ctl.Default().SceneCh <- &ctl.Evidence{PubKey: pubkey.Serialize(), Block: block, Type: ctl.BlockEvidence, Err: err.Error(), Ts: time.Now().Unix()}
}
}()
}
}

// ProcessBlock is used to handle new blocks.
func (chain *BlockChain) ProcessBlock(block *types.Block, transferMode core.TransferMode, fastConfirm bool, messageFrom peer.ID) error {

Expand Down Expand Up @@ -657,6 +685,11 @@ func (chain *BlockChain) applyBlock(block *types.Block, utxoSet *UtxoSet, batch
return err
}

// save candidate context
if err := ctl.Default().StoreContext(block, batch); err != nil {
return err
}

// save tx index
if err := chain.WriteTxIndex(block, batch); err != nil {
return err
Expand Down
11 changes: 11 additions & 0 deletions core/chain/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func validateBlockScripts(utxoSet *UtxoSet, block *types.Block) error {
// Coinbase tx will not reach here
func ValidateTxScripts(utxoSet *UtxoSet, tx *types.Transaction) error {
txHash, _ := tx.TxHash()
var checkBlackList bool
for txInIdx, txIn := range tx.Vin {
// Ensure the referenced input transaction exists and is not spent.
utxo := utxoSet.FindUtxo(txIn.PrevOutPoint)
Expand All @@ -177,6 +178,16 @@ func ValidateTxScripts(utxoSet *UtxoSet, tx *types.Transaction) error {
if err := script.Validate(scriptSig, prevScriptPubKey, tx, txInIdx); err != nil {
return err
}

// Ensure public key not in black list
if !checkBlackList {
// if checksum, ok := prevScriptPubKey.GetPubKeyChecksum(); ok {
// if _, ok := bl.Default().Details.Load(checksum); ok {
// return core.ErrPubKeyInBlackList
// }
// }
// checkBlackList = true
}
}

return nil
Expand Down
Loading