Skip to content
Open

Dev #98

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/mpcium/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func GetNATSConnection(environment string) (*nats.Conn, error) {
opts := []nats.Option{
nats.MaxReconnects(-1), // retry forever
nats.ReconnectWait(2 * time.Second),
nats.NoEcho(), // Optimization: avoid echoing messages back to the publisher
nats.DisconnectHandler(func(nc *nats.Conn) {
logger.Warn("Disconnected from NATS")
}),
Expand Down
2 changes: 1 addition & 1 deletion examples/generate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func main() {
logger.Init(environment, false)

natsURL := viper.GetString("nats.url")
natsConn, err := nats.Connect(natsURL)
natsConn, err := nats.Connect(natsURL, nats.NoEcho())
if err != nil {
logger.Fatal("Failed to connect to NATS", err)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/eventconsumer/event_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
baseCtx, baseCancel := context.WithTimeout(context.Background(), KeyGenTimeOut)
defer baseCancel()

logger.Info("[KEY GEN] Key generation result")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unecessary logging


raw := natMsg.Data
var msg types.GenerateKeyMessage
if err := json.Unmarshal(raw, &msg); err != nil {
Expand All @@ -167,6 +169,9 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
}

walletID := msg.WalletID

logger.Info("[KEY GEN] Key generation result", "walletID", walletID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unecessary logging


ecdsaSession, err := ec.node.CreateKeyGenSession(mpc.SessionTypeECDSA, walletID, ec.mpcThreshold, ec.genKeyResultQueue)
if err != nil {
ec.handleKeygenSessionError(walletID, err, "Failed to create ECDSA key generation session", natMsg)
Expand Down
12 changes: 7 additions & 5 deletions pkg/eventconsumer/keygen_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (sc *keygenConsumer) waitForAllPeersReadyToGenKey(ctx context.Context) erro
}
}

// Run subscribes to signing events and processes them until the context is canceled.
// Run subscribes to keygen events and processes them until the context is canceled.
func (sc *keygenConsumer) Run(ctx context.Context) error {
// Wait for sufficient peers before starting to consume messages
if err := sc.waitForAllPeersReadyToGenKey(ctx); err != nil {
Expand All @@ -110,7 +110,7 @@ func (sc *keygenConsumer) Run(ctx context.Context) error {
return fmt.Errorf("failed to subscribe to keygen events: %w", err)
}
sc.jsSub = sub
logger.Info("SigningConsumer: Subscribed to keygen events")
logger.Info("KeygenConsumer: Subscribed to keygen events")

// Block until context cancellation.
<-ctx.Done()
Expand Down Expand Up @@ -140,9 +140,11 @@ func (sc *keygenConsumer) handleKeygenEvent(msg jetstream.Msg) {
return
}

// Create a reply inbox to receive the signing event response.
// Create a reply inbox to receive the keygen event response.
replyInbox := nats.NewInbox()

logger.Info("Newreplybox id", "topic", replyInbox)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this debug logging


// Use a synchronous subscription for the reply inbox.
replySub, err := sc.natsConn.SubscribeSync(replyInbox)
if err != nil {
Expand All @@ -156,12 +158,12 @@ func (sc *keygenConsumer) handleKeygenEvent(msg jetstream.Msg) {
}
}()

// Publish the signing event with the reply inbox.
// Publish the keygen event with the reply inbox.
headers := map[string]string{
"SessionID": uuid.New().String(),
}
if err := sc.pubsub.PublishWithReply(MPCGenerateEvent, replyInbox, msg.Data(), headers); err != nil {
logger.Error("KeygenConsumer: Failed to publish signing event with reply", err)
logger.Error("KeygenConsumer: Failed to publish keygen event with reply", err)
_ = msg.Nak()
return
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/messaging/nats_subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package messaging

import (
"fmt"

"github.com/nats-io/nats.go"
)

type Subscription interface {
Unsubscribe() error
}

// a subscription can be made by pubsub or dicrectmessaging
type natsSubscription struct {
subscription *nats.Subscription
topic string
pubSub *natsPubSub
direct *natsDirectMessaging
}

func (ns *natsSubscription) Unsubscribe() error {
if ns.topic == "" {
return fmt.Errorf("cannot cleanup handlers: topic is empty")
}

if ns.pubSub != nil {
ns.pubSub.cleanupHandlers(ns.topic)
}

if ns.direct != nil {
ns.direct.cleanupHandlers(ns.topic)
}
return ns.subscription.Unsubscribe()
}
8 changes: 7 additions & 1 deletion pkg/messaging/point2point.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,11 @@ func (d *natsDirectMessaging) Listen(topic string, handler func(data []byte)) (S
d.handlers[topic] = append(d.handlers[topic], handler)
d.mu.Unlock()

return &natsSubscription{subscription: sub}, nil
return &natsSubscription{subscription: sub, topic: topic, direct: d}, nil
}

func (d *natsDirectMessaging) cleanupHandlers(topic string) {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.handlers, topic)
}
67 changes: 50 additions & 17 deletions pkg/messaging/pubsub.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,49 @@
package messaging

import (
"sync"

"github.com/fystack/mpcium/pkg/logger"
"github.com/nats-io/nats.go"
)

type Subscription interface {
Unsubscribe() error
}

type PubSub interface {
Publish(topic string, message []byte) error
PublishWithReply(topic, reply string, data []byte, headers map[string]string) error
Subscribe(topic string, handler func(msg *nats.Msg)) (Subscription, error)
Subscribe(topic string, handler func(*nats.Msg)) (Subscription, error)
}

type natsPubSub struct {
natsConn *nats.Conn
}

type natsSubscription struct {
subscription *nats.Subscription
}

func (ns *natsSubscription) Unsubscribe() error {
return ns.subscription.Unsubscribe()
handlers map[string][]func(*nats.Msg)
mu sync.Mutex
}

func NewNATSPubSub(natsConn *nats.Conn) PubSub {
return &natsPubSub{natsConn}
return &natsPubSub{
natsConn: natsConn,
handlers: make(map[string][]func(*nats.Msg)),
}
}

func (n *natsPubSub) Publish(topic string, message []byte) error {
logger.Debug("[NATS] Publishing message", "topic", topic)
logger.Info("[NATS] Publishing message", "topic", topic)

// access local handlers for subscribed topics
n.mu.Lock()
defer n.mu.Unlock()

handlers, ok := n.handlers[topic]
if ok && len(handlers) != 0 {
msgNats := &nats.Msg{
Subject: topic, // Required: the topic to publish to
Data: message, // The []byte payload
}
for _, handler := range handlers {
handler(msgNats)
}
}

return n.natsConn.Publish(topic, message)
}

Expand All @@ -46,11 +57,23 @@ func (n *natsPubSub) PublishWithReply(topic, reply string, data []byte, headers
for k, v := range headers {
msg.Header.Set(k, v)
}

// access local handlers for subscribed topics
n.mu.Lock()
defer n.mu.Unlock()

handlers, ok := n.handlers[topic]
if ok && len(handlers) != 0 {
for _, handler := range handlers {
handler(msg)
}
}

err := n.natsConn.PublishMsg(msg)
return err
}

func (n *natsPubSub) Subscribe(topic string, handler func(msg *nats.Msg)) (Subscription, error) {
func (n *natsPubSub) Subscribe(topic string, handler func(*nats.Msg)) (Subscription, error) {
//Handle subscription: handle more fields in msg
sub, err := n.natsConn.Subscribe(topic, func(msg *nats.Msg) {
handler(msg)
Expand All @@ -59,5 +82,15 @@ func (n *natsPubSub) Subscribe(topic string, handler func(msg *nats.Msg)) (Subsc
return nil, err
}

return &natsSubscription{subscription: sub}, nil
n.mu.Lock()
n.handlers[topic] = append(n.handlers[topic], handler)
n.mu.Unlock()

return &natsSubscription{subscription: sub, topic: topic, pubSub: n}, nil
}

func (n *natsPubSub) cleanupHandlers(topic string) {
n.mu.Lock()
defer n.mu.Unlock()
delete(n.handlers, topic)
}
1 change: 1 addition & 0 deletions pkg/mpc/key_exchange_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (e *ecdhSession) ListenKeyExchange() error {
}

if ecdhMsg.From == e.nodeID {
logger.Info("To self message successfully received", "nodeID", e.nodeID)
return
}

Expand Down
22 changes: 10 additions & 12 deletions pkg/mpc/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (s *session) receiveTssMessage(msg *types.TssMessage) {
s.ErrCh <- errors.Wrap(err, "Broken TSS Share")
return
}
logger.Debug(
logger.Info(
"Received message",
"round",
round.RoundMsg,
Expand Down Expand Up @@ -285,17 +285,15 @@ func (s *session) subscribeFromPeersAsync(fromIDs []string) {
}

func (s *session) subscribeBroadcastAsync() {
go func() {
topic := s.topicComposer.ComposeBroadcastTopic()
sub, err := s.pubSub.Subscribe(topic, func(natMsg *nats.Msg) {
s.receiveBroadcastTssMessage(natMsg.Data)
})
if err != nil {
s.ErrCh <- fmt.Errorf("Failed to subscribe to broadcast topic %s: %w", topic, err)
return
}
s.broadcastSub = sub
}()
topic := s.topicComposer.ComposeBroadcastTopic()
sub, err := s.pubSub.Subscribe(topic, func(natMsg *nats.Msg) {
go s.receiveBroadcastTssMessage(natMsg.Data)
})
if err != nil {
s.ErrCh <- fmt.Errorf("Failed to subscribe to broadcast topic %s: %w", topic, err)
return
}
s.broadcastSub = sub
}

func (s *session) ListenToIncomingMessageAsync() {
Expand Down
Loading