@@ -2,13 +2,16 @@ package eventconsumer
22
33import (
44 "context"
5+ "encoding/json"
6+ "errors"
57 "fmt"
68 "time"
79
810 "github.com/fystack/mpcium/pkg/event"
911 "github.com/fystack/mpcium/pkg/logger"
1012 "github.com/fystack/mpcium/pkg/messaging"
1113 "github.com/fystack/mpcium/pkg/mpc"
14+ "github.com/fystack/mpcium/pkg/types"
1215 "github.com/google/uuid"
1316 "github.com/nats-io/nats.go"
1417 "github.com/nats-io/nats.go/jetstream"
@@ -31,22 +34,30 @@ type KeygenConsumer interface {
3134
3235// keygenConsumer implements KeygenConsumer.
3336type keygenConsumer struct {
34- natsConn * nats.Conn
35- pubsub messaging.PubSub
36- jsBroker messaging.MessageBroker
37- peerRegistry mpc.PeerRegistry
37+ natsConn * nats.Conn
38+ pubsub messaging.PubSub
39+ jsBroker messaging.MessageBroker
40+ peerRegistry mpc.PeerRegistry
41+ keygenResultQueue messaging.MessageQueue
3842
3943 // jsSub holds the JetStream subscription, so it can be cleaned up during Close().
4044 jsSub messaging.MessageSubscription
4145}
4246
4347// NewKeygenConsumer returns a new instance of KeygenConsumer.
44- func NewKeygenConsumer (natsConn * nats.Conn , jsBroker messaging.MessageBroker , pubsub messaging.PubSub , peerRegistry mpc.PeerRegistry ) KeygenConsumer {
48+ func NewKeygenConsumer (
49+ natsConn * nats.Conn ,
50+ jsBroker messaging.MessageBroker ,
51+ pubsub messaging.PubSub ,
52+ peerRegistry mpc.PeerRegistry ,
53+ keygenResultQueue messaging.MessageQueue ,
54+ ) KeygenConsumer {
4555 return & keygenConsumer {
46- natsConn : natsConn ,
47- pubsub : pubsub ,
48- jsBroker : jsBroker ,
49- peerRegistry : peerRegistry ,
56+ natsConn : natsConn ,
57+ pubsub : pubsub ,
58+ jsBroker : jsBroker ,
59+ peerRegistry : peerRegistry ,
60+ keygenResultQueue : keygenResultQueue ,
5061 }
5162}
5263
@@ -60,6 +71,9 @@ func (sc *keygenConsumer) waitForAllPeersReadyToGenKey(ctx context.Context) erro
6071 for {
6172 select {
6273 case <- ctx .Done ():
74+ if ctx .Err () == context .Canceled {
75+ return nil
76+ }
6377 return ctx .Err ()
6478 case <- ticker .C :
6579 allPeersReady := sc .peerRegistry .ArePeersReady ()
@@ -80,6 +94,9 @@ func (sc *keygenConsumer) waitForAllPeersReadyToGenKey(ctx context.Context) erro
8094func (sc * keygenConsumer ) Run (ctx context.Context ) error {
8195 // Wait for sufficient peers before starting to consume messages
8296 if err := sc .waitForAllPeersReadyToGenKey (ctx ); err != nil {
97+ if err == context .Canceled {
98+ return nil
99+ }
83100 return fmt .Errorf ("failed to wait for sufficient peers: %w" , err )
84101 }
85102
@@ -104,9 +121,22 @@ func (sc *keygenConsumer) Run(ctx context.Context) error {
104121}
105122
106123func (sc * keygenConsumer ) handleKeygenEvent (msg jetstream.Msg ) {
124+ raw := msg .Data ()
125+ var keygenMsg types.GenerateKeyMessage
126+ sessionID := msg .Headers ().Get ("SessionID" )
127+
128+ err := json .Unmarshal (raw , & keygenMsg )
129+ if err != nil {
130+ logger .Error ("SigningConsumer: Failed to unmarshal keygen message" , err )
131+ sc .handleKeygenError (keygenMsg , event .ErrorCodeUnmarshalFailure , err , sessionID )
132+ _ = msg .Ack ()
133+ return
134+ }
107135
108136 if ! sc .peerRegistry .ArePeersReady () {
109- logger .Warn ("KeygenConsumer: Not all peers are ready to sign, skipping message processing" )
137+ logger .Warn ("KeygenConsumer: Not all peers are ready to gen key, skipping message processing" )
138+ sc .handleKeygenError (keygenMsg , event .ErrorCodeClusterNotReady , errors .New ("not all peers are ready" ), sessionID )
139+ _ = msg .Ack ()
110140 return
111141 }
112142
@@ -161,6 +191,33 @@ func (sc *keygenConsumer) handleKeygenEvent(msg jetstream.Msg) {
161191 _ = msg .Nak ()
162192}
163193
194+ func (sc * keygenConsumer ) handleKeygenError (keygenMsg types.GenerateKeyMessage , errorCode event.ErrorCode , err error , sessionID string ) {
195+ keygenResult := event.KeygenResultEvent {
196+ ResultType : event .ResultTypeError ,
197+ ErrorCode : string (errorCode ),
198+ WalletID : keygenMsg .WalletID ,
199+ ErrorReason : err .Error (),
200+ }
201+
202+ keygenResultBytes , err := json .Marshal (keygenResult )
203+ if err != nil {
204+ logger .Error ("Failed to marshal keygen result event" , err ,
205+ "walletID" , keygenResult .WalletID ,
206+ )
207+ return
208+ }
209+
210+ topic := fmt .Sprintf (mpc .TypeGenerateWalletResultFmt , keygenResult .WalletID )
211+ err = sc .keygenResultQueue .Enqueue (topic , keygenResultBytes , & messaging.EnqueueOptions {
212+ IdempotententKey : buildIdempotentKey (keygenMsg .WalletID , sessionID , mpc .TypeGenerateWalletResultFmt ),
213+ })
214+ if err != nil {
215+ logger .Error ("Failed to enqueue keygen result event" , err ,
216+ "walletID" , keygenMsg .WalletID ,
217+ )
218+ }
219+ }
220+
164221// Close unsubscribes from the JetStream subject and cleans up resources.
165222func (sc * keygenConsumer ) Close () error {
166223 if sc .jsSub != nil {
0 commit comments