Skip to content

Commit

Permalink
feat(other): add zeromq server with configration (pactus-project#1660)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ja7ad authored Jan 8, 2025
1 parent 2ae3504 commit 5f05122
Show file tree
Hide file tree
Showing 19 changed files with 654 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/semantic-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ jobs:
http
jsonrpc
nanomsg
zeromq
windows
linux
macos
Expand Down
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pactus-project/pactus/www/http"
"github.com/pactus-project/pactus/www/jsonrpc"
"github.com/pactus-project/pactus/www/nanomsg"
"github.com/pactus-project/pactus/www/zmq"
"github.com/pelletier/go-toml/v2"
)

Expand Down Expand Up @@ -47,6 +48,7 @@ type Config struct {
HTTP *http.Config `toml:"http"`
WalletManager *wallet.Config `toml:"-"`
Nanomsg *nanomsg.Config `toml:"nanomsg"`
ZeroMq *zmq.Config `toml:"zeromq"`
}

type BootstrapInfo struct {
Expand Down Expand Up @@ -99,6 +101,7 @@ func defaultConfig() *Config {
JSONRPC: jsonrpc.DefaultConfig(),
HTTP: http.DefaultConfig(),
Nanomsg: nanomsg.DefaultConfig(),
ZeroMq: zmq.DefaultConfig(),
WalletManager: wallet.DefaultConfig(),
}

Expand Down Expand Up @@ -219,6 +222,11 @@ func DefaultConfigLocalnet() *Config {
conf.HTTP.EnablePprof = true
conf.Nanomsg.Enable = true
conf.Nanomsg.Listen = "tcp://[::]:40799"
conf.ZeroMq.ZmqPubBlockInfo = "tcp://127.0.0.1:28332"
conf.ZeroMq.ZmqPubTxInfo = "tcp://127.0.0.1:28333"
conf.ZeroMq.ZmqPubRawBlock = "tcp://127.0.0.1:28334"
conf.ZeroMq.ZmqPubRawTx = "tcp://127.0.0.1:28335"
conf.ZeroMq.ZmqPubHWM = 1000

return conf
}
Expand Down Expand Up @@ -296,6 +304,9 @@ func (conf *Config) BasicCheck() error {
if err := conf.GRPC.BasicCheck(); err != nil {
return err
}
if err := conf.ZeroMq.BasicCheck(); err != nil {
return err
}

return conf.HTTP.BasicCheck()
}
30 changes: 30 additions & 0 deletions config/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@
_pool = 'error'
_state = 'info'
_sync = 'error'
_zmq = 'info'
default = 'info'

# `grpc` contains configuration of the gRPC module.
Expand Down Expand Up @@ -246,3 +247,32 @@

# `listen` is the address for incoming connections to the nanomsg server.
listen = 'tcp://127.0.0.1:40899'

# ZeroMQ configuration.
[zeromq]

# `zmqpubblockinfo` specifies the address for publishing block info notifications.
# Example: 'tcp://127.0.0.1:28332'
# Default is '', meaning the topic is disabled
zmqpubblockinfo = ''

# `zmqpubtxinfo` specifies the address for publishing transaction info notifications.
# Example: 'tcp://127.0.0.1:28332'
# Default is '', meaning the topic is disabled
zmqpubtxinfo = ''

# `zmqpubrawblock` specifies the address for publishing raw block notifications.
# Example: 'tcp://127.0.0.1:28332'
# Default is '', meaning the topic is disabled
zmqpubrawblock = ''

# `zmqpubrawtx` specifies the address for publishing raw transaction notifications.
# Example: 'tcp://127.0.0.1:28332'
# Default is '', meaning the topic is disabled
zmqpubrawtx = ''

# `zmqpubhwm` defines the High Watermark (HWM) for ZeroMQ message pipes.
# This parameter determines the maximum number of messages ZeroMQ can buffer before blocking the publishing of further messages.
# The watermark is applied uniformly to all active topics.
# Default is 1000
zmqpubhwm = 1000
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/c-bata/go-prompt v0.2.6
github.com/consensys/gnark-crypto v0.14.0
github.com/fxamacker/cbor/v2 v2.7.0
github.com/go-zeromq/zmq4 v0.17.0
github.com/gofrs/flock v0.12.1
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.6.0
Expand Down Expand Up @@ -68,6 +69,7 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/go-zeromq/goczmq/v4 v4.2.2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U=
github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE=
github.com/go-zeromq/zmq4 v0.17.0 h1:r12/XdqPeRbuaF4C3QZJeWCt7a5vpJbslDH1rTXF+Kc=
github.com/go-zeromq/zmq4 v0.17.0/go.mod h1:EQxjJD92qKnrsVMzAnx62giD6uJIPi1dMGZ781iCDtY=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
Expand Down
1 change: 1 addition & 0 deletions util/logger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func DefaultConfig() *Config {
conf.Levels["_grpc"] = "info"
conf.Levels["_nonomsg"] = "info"
conf.Levels["_jsonrpc"] = "info"
conf.Levels["_zmq"] = "info"
conf.Levels["_firewall"] = "warn"

return conf
Expand Down
1 change: 1 addition & 0 deletions util/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func getLoggersInst() *logger {
conf.Levels["_pool"] = "debug"
conf.Levels["_http"] = "debug"
conf.Levels["_grpc"] = "debug"
conf.Levels["_zmq"] = "debug"
conf.Levels["_firewall"] = "debug"
globalInst = &logger{
config: conf,
Expand Down
10 changes: 10 additions & 0 deletions util/testsuite/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package testsuite
import (
"encoding/hex"
"math/rand"
"net"
"testing"
"time"

Expand Down Expand Up @@ -866,3 +867,12 @@ func (*TestSuite) HelperSignTransaction(prv crypto.PrivateKey, trx *tx.Tx) {
trx.SetSignature(sig)
trx.SetPublicKey(prv.PublicKey())
}

func (*TestSuite) FindFreePort() int {
listener, _ := net.Listen("tcp", "localhost:0")
defer func() {
_ = listener.Close()
}()

return listener.Addr().(*net.TCPAddr).Port
}
26 changes: 26 additions & 0 deletions www/zmq/block_info_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package zmq

import (
"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/logger"
)

type blockInfoPub struct {
basePub
}

func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
return &blockInfoPub{
basePub: basePub{
topic: BlockInfo,
zmqSocket: socket,
logger: logger,
},
}
}

func (*blockInfoPub) onNewBlock(_ *block.Block) {
// TODO implement me
panic("implement me")
}
92 changes: 92 additions & 0 deletions www/zmq/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package zmq

import (
"errors"
"fmt"
"net/url"
"strings"
"time"
)

type Config struct {
ZmqPubBlockInfo string `toml:"zmqpubblockinfo"`
ZmqPubTxInfo string `toml:"zmqpubtxinfo"`
ZmqPubRawBlock string `toml:"zmqpubrawblock"`
ZmqPubRawTx string `toml:"zmqpubrawtx"`
ZmqPubHWM int `toml:"zmqpubhwm"`

// Private config
ZmqAutomaticReconnect bool `toml:"-"`
ZmqDialerRetryTime time.Duration `toml:"-"`
ZmqDialerMaxRetries int `toml:"-"`
}

func DefaultConfig() *Config {
return &Config{
ZmqAutomaticReconnect: true,
ZmqDialerMaxRetries: 10,
ZmqDialerRetryTime: 250 * time.Millisecond,
ZmqPubHWM: 1000,
}
}

func (c *Config) BasicCheck() error {
if c.ZmqPubBlockInfo != "" {
if err := validateTopicSocket(c.ZmqPubBlockInfo); err != nil {
return err
}
}

if c.ZmqPubTxInfo != "" {
if err := validateTopicSocket(c.ZmqPubTxInfo); err != nil {
return err
}
}

if c.ZmqPubRawBlock != "" {
if err := validateTopicSocket(c.ZmqPubRawBlock); err != nil {
return err
}
}

if c.ZmqPubRawTx != "" {
if err := validateTopicSocket(c.ZmqPubRawTx); err != nil {
return err
}
}

if c.ZmqPubHWM < 0 {
return fmt.Errorf("invalid publisher hwm %d", c.ZmqPubHWM)
}

return nil
}

func validateTopicSocket(socket string) error {
addr, err := url.Parse(socket)
if err != nil {
return errors.New("failed to parse ZmqPub value: " + err.Error())
}

if addr.Scheme != "tcp" {
return errors.New("invalid scheme: zeromq socket schema")
}

if addr.Host == "" {
return errors.New("invalid host: host is empty")
}

parts := strings.Split(addr.Host, ":")
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return errors.New("invalid host: missing or malformed host/port")
}

port := parts[1]
for _, r := range port {
if r < '0' || r > '9' {
return errors.New("invalid port: non-numeric characters detected")
}
}

return nil
}
82 changes: 82 additions & 0 deletions www/zmq/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package zmq

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDefaultConfig(t *testing.T) {
cfg := DefaultConfig()

assert.NotNil(t, cfg, "DefaultConfig should not return nil")
assert.Equal(t, "", cfg.ZmqPubBlockInfo, "ZmqPubBlockInfo should be empty")
assert.Equal(t, "", cfg.ZmqPubTxInfo, "ZmqPubTxInfo should be empty")
assert.Equal(t, "", cfg.ZmqPubRawBlock, "ZmqPubRawBlock should be empty")
assert.Equal(t, "", cfg.ZmqPubRawTx, "ZmqPubRawTx should be empty")
assert.Equal(t, 1000, cfg.ZmqPubHWM, "ZmqPubHWM should default to 1000")
}

func TestBasicCheck(t *testing.T) {
testCases := []struct {
name string
config *Config
expectErr bool
}{
{
name: "Valid configuration",
config: &Config{
ZmqPubBlockInfo: "tcp://127.0.0.1:28332",
ZmqPubTxInfo: "tcp://127.0.0.1:28333",
ZmqPubRawBlock: "tcp://127.0.0.1:28334",
ZmqPubRawTx: "tcp://127.0.0.1:28335",
ZmqPubHWM: 1000,
},
expectErr: false,
},
{
name: "Invalid scheme",
config: &Config{
ZmqPubBlockInfo: "udp://127.0.0.1:28332",
},
expectErr: true,
},
{
name: "Missing port",
config: &Config{
ZmqPubBlockInfo: "tcp://127.0.0.1",
},
expectErr: true,
},
{
name: "Empty host",
config: &Config{
ZmqPubBlockInfo: "tcp://:28332",
},
expectErr: true,
},
{
name: "Negative ZmqPubHWM",
config: &Config{
ZmqPubHWM: -1,
},
expectErr: true,
},
{
name: "Empty configuration",
config: DefaultConfig(),
expectErr: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.config.BasicCheck()
if tc.expectErr {
assert.Error(t, err, "BasicCheck should return an error")
} else {
assert.NoError(t, err, "BasicCheck should not return an error")
}
})
}
}
28 changes: 28 additions & 0 deletions www/zmq/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package zmq

import (
"github.com/go-zeromq/zmq4"
"github.com/pactus-project/pactus/types/block"
"github.com/pactus-project/pactus/util/logger"
)

type Publisher interface {
Address() string
TopicName() string

onNewBlock(blk *block.Block)
}

type basePub struct {
topic Topic
zmqSocket zmq4.Socket
logger *logger.SubLogger
}

func (b *basePub) Address() string {
return b.zmqSocket.Addr().String()
}

func (b *basePub) TopicName() string {
return b.topic.String()
}
Loading

0 comments on commit 5f05122

Please sign in to comment.