Skip to content

Commit ab26645

Browse files
authored
Merge pull request #2 from nann-cheng/robust-ecdh-error-handling
Enhanced MPC Cluster Resilience and Error Handling
2 parents ea1d761 + d330687 commit ab26645

File tree

8 files changed

+348
-186
lines changed

8 files changed

+348
-186
lines changed

cmd/mpcium/main.go

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ func runNode(ctx context.Context, c *cli.Command) error {
129129
if err != nil {
130130
logger.Fatal("Failed to connect to NATS", err)
131131
}
132-
defer natsConn.Close()
133132

134133
pubsub := messaging.NewNATSPubSub(natsConn)
135134
keygenBroker, err := messaging.NewJetStreamBroker(ctx, natsConn, event.KeygenBrokerStream, []string{
@@ -162,7 +161,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
162161
logger.Info("Node is running", "ID", nodeID, "name", nodeName)
163162

164163
peerNodeIDs := GetPeerIDs(peers)
165-
peerRegistry := mpc.NewRegistry(nodeID, peerNodeIDs, consulClient.KV(), directMessaging)
164+
peerRegistry := mpc.NewRegistry(nodeID, peerNodeIDs, consulClient.KV(), directMessaging, pubsub, identityStore)
166165

167166
mpcNode := mpc.NewNode(
168167
nodeID,
@@ -176,9 +175,6 @@ func runNode(ctx context.Context, c *cli.Command) error {
176175
)
177176
defer mpcNode.Close()
178177

179-
// ECDH session for DH key exchange
180-
ecdhSession := mpcNode.GetECDHSession()
181-
182178
eventConsumer := eventconsumer.NewEventConsumer(
183179
mpcNode,
184180
pubsub,
@@ -197,21 +193,16 @@ func runNode(ctx context.Context, c *cli.Command) error {
197193

198194
timeoutConsumer.Run()
199195
defer timeoutConsumer.Close()
200-
keygenConsumer := eventconsumer.NewKeygenConsumer(natsConn, keygenBroker, pubsub, peerRegistry)
201-
signingConsumer := eventconsumer.NewSigningConsumer(natsConn, signingBroker, pubsub, peerRegistry)
196+
keygenConsumer := eventconsumer.NewKeygenConsumer(natsConn, keygenBroker, pubsub, peerRegistry, genKeyResultQueue)
197+
signingConsumer := eventconsumer.NewSigningConsumer(natsConn, signingBroker, pubsub, peerRegistry, singingResultQueue)
202198

203199
// Make the node ready before starting the signing consumer
204200
if err := peerRegistry.Ready(); err != nil {
205201
logger.Error("Failed to mark peer registry as ready", err)
206202
}
207203
logger.Info("[READY] Node is ready", "nodeID", nodeID)
208204

209-
logger.Info("Waiting for ECDH key exchange to complete...", "nodeID", nodeID)
210-
if err := ecdhSession.WaitForExchangeComplete(); err != nil {
211-
logger.Fatal("ECDH exchange failed", err)
212-
}
213-
214-
logger.Info("ECDH key exchange completed successfully, starting consumers...", "nodeID", nodeID)
205+
logger.Info("Starting consumers", "nodeID", nodeID)
215206
appContext, cancel := context.WithCancel(context.Background())
216207
//Setup signal handling to cancel context on termination signals.
217208
go func() {
@@ -221,6 +212,11 @@ func runNode(ctx context.Context, c *cli.Command) error {
221212
logger.Warn("Shutdown signal received, canceling context...")
222213
cancel()
223214

215+
// Resign from peer registry first (before closing NATS)
216+
if err := peerRegistry.Resign(); err != nil {
217+
logger.Error("Failed to resign from peer registry", err)
218+
}
219+
224220
// Gracefully close consumers
225221
if err := keygenConsumer.Close(); err != nil {
226222
logger.Error("Failed to close keygen consumer", err)
@@ -229,10 +225,6 @@ func runNode(ctx context.Context, c *cli.Command) error {
229225
logger.Error("Failed to close signing consumer", err)
230226
}
231227

232-
if err := ecdhSession.Close(); err != nil {
233-
logger.Error("Failed to close ECDH session", err)
234-
}
235-
236228
err := natsConn.Drain()
237229
if err != nil {
238230
logger.Error("Failed to drain NATS connection", err)
@@ -264,21 +256,6 @@ func runNode(ctx context.Context, c *cli.Command) error {
264256
logger.Info("Signing consumer finished successfully")
265257
}()
266258

267-
go func() {
268-
for {
269-
select {
270-
case <-appContext.Done():
271-
return
272-
case err := <-ecdhSession.ErrChan():
273-
if err != nil {
274-
logger.Error("ECDH session error", err)
275-
errChan <- fmt.Errorf("ecdh session error: %w", err)
276-
return
277-
}
278-
}
279-
}
280-
}()
281-
282259
go func() {
283260
wg.Wait()
284261
logger.Info("All consumers have finished")

pkg/event/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ const (
9191
// Context and cancellation errors
9292
ErrorCodeContextCancelled ErrorCode = "ERROR_CONTEXT_CANCELLED"
9393
ErrorCodeOperationAborted ErrorCode = "ERROR_OPERATION_ABORTED"
94+
ErrorCodeNotMajority ErrorCode = "ERROR_NOT_MAJORITY"
95+
ErrorCodeClusterNotReady ErrorCode = "ERROR_CLUSTER_NOT_READY"
9496
)
9597

9698
// GetErrorCodeFromError attempts to categorize a generic error into a specific error code

pkg/eventconsumer/keygen_consumer.go

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package eventconsumer
22

33
import (
44
"context"
5+
"encoding/json"
6+
"errors"
57
"fmt"
68
"time"
79

810
"github.com/fystack/mpcium/pkg/event"
911
"github.com/fystack/mpcium/pkg/logger"
1012
"github.com/fystack/mpcium/pkg/messaging"
1113
"github.com/fystack/mpcium/pkg/mpc"
14+
"github.com/fystack/mpcium/pkg/types"
1215
"github.com/google/uuid"
1316
"github.com/nats-io/nats.go"
1417
"github.com/nats-io/nats.go/jetstream"
@@ -31,22 +34,30 @@ type KeygenConsumer interface {
3134

3235
// keygenConsumer implements KeygenConsumer.
3336
type keygenConsumer struct {
34-
natsConn *nats.Conn
35-
pubsub messaging.PubSub
36-
jsBroker messaging.MessageBroker
37-
peerRegistry mpc.PeerRegistry
37+
natsConn *nats.Conn
38+
pubsub messaging.PubSub
39+
jsBroker messaging.MessageBroker
40+
peerRegistry mpc.PeerRegistry
41+
keygenResultQueue messaging.MessageQueue
3842

3943
// jsSub holds the JetStream subscription, so it can be cleaned up during Close().
4044
jsSub messaging.MessageSubscription
4145
}
4246

4347
// NewKeygenConsumer returns a new instance of KeygenConsumer.
44-
func NewKeygenConsumer(natsConn *nats.Conn, jsBroker messaging.MessageBroker, pubsub messaging.PubSub, peerRegistry mpc.PeerRegistry) KeygenConsumer {
48+
func NewKeygenConsumer(
49+
natsConn *nats.Conn,
50+
jsBroker messaging.MessageBroker,
51+
pubsub messaging.PubSub,
52+
peerRegistry mpc.PeerRegistry,
53+
keygenResultQueue messaging.MessageQueue,
54+
) KeygenConsumer {
4555
return &keygenConsumer{
46-
natsConn: natsConn,
47-
pubsub: pubsub,
48-
jsBroker: jsBroker,
49-
peerRegistry: peerRegistry,
56+
natsConn: natsConn,
57+
pubsub: pubsub,
58+
jsBroker: jsBroker,
59+
peerRegistry: peerRegistry,
60+
keygenResultQueue: keygenResultQueue,
5061
}
5162
}
5263

@@ -60,6 +71,9 @@ func (sc *keygenConsumer) waitForAllPeersReadyToGenKey(ctx context.Context) erro
6071
for {
6172
select {
6273
case <-ctx.Done():
74+
if ctx.Err() == context.Canceled {
75+
return nil
76+
}
6377
return ctx.Err()
6478
case <-ticker.C:
6579
allPeersReady := sc.peerRegistry.ArePeersReady()
@@ -80,6 +94,9 @@ func (sc *keygenConsumer) waitForAllPeersReadyToGenKey(ctx context.Context) erro
8094
func (sc *keygenConsumer) Run(ctx context.Context) error {
8195
// Wait for sufficient peers before starting to consume messages
8296
if err := sc.waitForAllPeersReadyToGenKey(ctx); err != nil {
97+
if err == context.Canceled {
98+
return nil
99+
}
83100
return fmt.Errorf("failed to wait for sufficient peers: %w", err)
84101
}
85102

@@ -104,9 +121,22 @@ func (sc *keygenConsumer) Run(ctx context.Context) error {
104121
}
105122

106123
func (sc *keygenConsumer) handleKeygenEvent(msg jetstream.Msg) {
124+
raw := msg.Data()
125+
var keygenMsg types.GenerateKeyMessage
126+
sessionID := msg.Headers().Get("SessionID")
127+
128+
err := json.Unmarshal(raw, &keygenMsg)
129+
if err != nil {
130+
logger.Error("SigningConsumer: Failed to unmarshal keygen message", err)
131+
sc.handleKeygenError(keygenMsg, event.ErrorCodeUnmarshalFailure, err, sessionID)
132+
_ = msg.Ack()
133+
return
134+
}
107135

108136
if !sc.peerRegistry.ArePeersReady() {
109-
logger.Warn("KeygenConsumer: Not all peers are ready to sign, skipping message processing")
137+
logger.Warn("KeygenConsumer: Not all peers are ready to gen key, skipping message processing")
138+
sc.handleKeygenError(keygenMsg, event.ErrorCodeClusterNotReady, errors.New("not all peers are ready"), sessionID)
139+
_ = msg.Ack()
110140
return
111141
}
112142

@@ -161,6 +191,33 @@ func (sc *keygenConsumer) handleKeygenEvent(msg jetstream.Msg) {
161191
_ = msg.Nak()
162192
}
163193

194+
// handleKeygenError reports a failed key-generation request by building an
// error-typed KeygenResultEvent, marshalling it to JSON, and enqueueing it on
// sc.keygenResultQueue under the wallet's result topic.
//
// keygenMsg supplies the WalletID placed in the event and the topic; errorCode
// is stored as the event's ErrorCode; err supplies ErrorReason via err.Error();
// sessionID is passed to buildIdempotentKey for the enqueue options.
//
// The function returns nothing: marshal and enqueue failures are only logged.
// NOTE(review): err must be non-nil, since err.Error() is called
// unconditionally; the callers visible in this diff always pass a non-nil error.
func (sc *keygenConsumer) handleKeygenError(keygenMsg types.GenerateKeyMessage, errorCode event.ErrorCode, err error, sessionID string) {
195+
keygenResult := event.KeygenResultEvent{
196+
ResultType: event.ResultTypeError,
197+
ErrorCode: string(errorCode),
198+
WalletID: keygenMsg.WalletID,
199+
ErrorReason: err.Error(),
200+
}
201+
202+
// The err parameter is reused from here on: its message has already been
// captured in ErrorReason above, so overwriting it loses nothing.
keygenResultBytes, err := json.Marshal(keygenResult)
203+
if err != nil {
204+
logger.Error("Failed to marshal keygen result event", err,
205+
"walletID", keygenResult.WalletID,
206+
)
207+
// Nothing can be published without a serialized payload.
return
208+
}
209+
210+
// The result topic is derived from the wallet ID using the shared format string.
topic := fmt.Sprintf(mpc.TypeGenerateWalletResultFmt, keygenResult.WalletID)
211+
err = sc.keygenResultQueue.Enqueue(topic, keygenResultBytes, &messaging.EnqueueOptions{
212+
// The key is built from wallet ID, session ID, and the topic format;
// presumably used by the queue for de-duplication — confirm against
// messaging.EnqueueOptions.
IdempotententKey: buildIdempotentKey(keygenMsg.WalletID, sessionID, mpc.TypeGenerateWalletResultFmt),
213+
})
214+
if err != nil {
215+
// Enqueue failure is logged only; no retry or error propagation happens here.
logger.Error("Failed to enqueue keygen result event", err,
216+
"walletID", keygenMsg.WalletID,
217+
)
218+
}
219+
}
220+
164221
// Close unsubscribes from the JetStream subject and cleans up resources.
165222
func (sc *keygenConsumer) Close() error {
166223
if sc.jsSub != nil {

0 commit comments

Comments
 (0)