1616
1717package fr .acinq .eclair .io
1818
19- import akka .actor .typed .Behavior
2019import akka .actor .typed .eventstream .EventStream
2120import akka .actor .typed .scaladsl .adapter .TypedActorRefOps
2221import akka .actor .typed .scaladsl .{ActorContext , Behaviors }
22+ import akka .actor .typed .{Behavior , SupervisorStrategy }
2323import akka .actor .{ActorRef , typed }
2424import fr .acinq .bitcoin .scalacompat .ByteVector32
2525import fr .acinq .bitcoin .scalacompat .Crypto .PublicKey
@@ -34,6 +34,8 @@ import fr.acinq.eclair.router.Router
3434import fr .acinq .eclair .wire .protocol .OnionMessage
3535import fr .acinq .eclair .{EncodedNodeId , Logs , NodeParams , ShortChannelId }
3636
37+ import scala .concurrent .duration .DurationInt
38+
3739object MessageRelay {
3840 // @formatter:off
3941 sealed trait Command
@@ -44,29 +46,18 @@ object MessageRelay {
4446 policy : RelayPolicy ,
4547 replyTo_opt : Option [typed.ActorRef [Status ]]) extends Command
4648 case class WrappedPeerInfo (peerInfo : PeerInfoResponse ) extends Command
47- case class WrappedConnectionResult (result : PeerConnection .ConnectionResult ) extends Command
48- case class WrappedOptionalNodeId (nodeId_opt : Option [PublicKey ]) extends Command
49+ private case class WrappedConnectionResult (result : PeerConnection .ConnectionResult ) extends Command
50+ private case class WrappedOptionalNodeId (nodeId_opt : Option [PublicKey ]) extends Command
51+ private case class WrappedPeerReadyResult (result : PeerReadyNotifier .Result ) extends Command
4952
50- sealed trait Status {
51- val messageId : ByteVector32
52- }
53+ sealed trait Status { val messageId : ByteVector32 }
5354 case class Sent (messageId : ByteVector32 ) extends Status
5455 sealed trait Failure extends Status
55- case class AgainstPolicy (messageId : ByteVector32 , policy : RelayPolicy ) extends Failure {
56- override def toString : String = s " Relay prevented by policy $policy"
57- }
58- case class ConnectionFailure (messageId : ByteVector32 , failure : PeerConnection .ConnectionResult .Failure ) extends Failure {
59- override def toString : String = s " Can't connect to peer: ${failure.toString}"
60- }
61- case class Disconnected (messageId : ByteVector32 ) extends Failure {
62- override def toString : String = " Peer is not connected"
63- }
64- case class UnknownChannel (messageId : ByteVector32 , channelId : ShortChannelId ) extends Failure {
65- override def toString : String = s " Unknown channel: $channelId"
66- }
67- case class DroppedMessage (messageId : ByteVector32 , reason : DropReason ) extends Failure {
68- override def toString : String = s " Message dropped: $reason"
69- }
56+ case class AgainstPolicy (messageId : ByteVector32 , policy : RelayPolicy ) extends Failure { override def toString : String = s " Relay prevented by policy $policy" }
57+ case class ConnectionFailure (messageId : ByteVector32 , failure : PeerConnection .ConnectionResult .Failure ) extends Failure { override def toString : String = s " Can't connect to peer: ${failure.toString}" }
58+ case class Disconnected (messageId : ByteVector32 ) extends Failure { override def toString : String = " Peer is not connected" }
59+ case class UnknownChannel (messageId : ByteVector32 , channelId : ShortChannelId ) extends Failure { override def toString : String = s " Unknown channel: $channelId" }
60+ case class DroppedMessage (messageId : ByteVector32 , reason : DropReason ) extends Failure { override def toString : String = s " Message dropped: $reason" }
7061
7162 sealed trait RelayPolicy
7263 case object RelayChannelsOnly extends RelayPolicy
@@ -106,15 +97,15 @@ private class MessageRelay(nodeParams: NodeParams,
10697 def queryNextNodeId (msg : OnionMessage , nextNode : Either [ShortChannelId , EncodedNodeId ]): Behavior [Command ] = {
10798 nextNode match {
10899 case Left (outgoingChannelId) if outgoingChannelId == ShortChannelId .toSelf =>
109- withNextNodeId(msg, nodeParams.nodeId)
100+ withNextNodeId(msg, EncodedNodeId . WithPublicKey . Plain ( nodeParams.nodeId) )
110101 case Left (outgoingChannelId) =>
111102 register ! Register .GetNextNodeId (context.messageAdapter(WrappedOptionalNodeId ), outgoingChannelId)
112103 waitForNextNodeId(msg, outgoingChannelId)
113104 case Right (EncodedNodeId .ShortChannelIdDir (isNode1, scid)) =>
114105 router ! Router .GetNodeId (context.messageAdapter(WrappedOptionalNodeId ), scid, isNode1)
115106 waitForNextNodeId(msg, scid)
116107 case Right (encodedNodeId : EncodedNodeId .WithPublicKey ) =>
117- withNextNodeId(msg, encodedNodeId.publicKey )
108+ withNextNodeId(msg, encodedNodeId)
118109 }
119110 }
120111
@@ -127,34 +118,39 @@ private class MessageRelay(nodeParams: NodeParams,
127118 Behaviors .stopped
128119 case WrappedOptionalNodeId (Some (nextNodeId)) =>
129120 log.info(" found outgoing node {} for channel {}" , nextNodeId, channelId)
130- withNextNodeId(msg, nextNodeId)
121+ withNextNodeId(msg, EncodedNodeId . WithPublicKey . Plain ( nextNodeId) )
131122 }
132123 }
133124
134- private def withNextNodeId (msg : OnionMessage , nextNodeId : PublicKey ): Behavior [Command ] = {
135- if (nextNodeId == nodeParams.nodeId) {
136- OnionMessages .process(nodeParams.privateKey, msg) match {
137- case OnionMessages .DropMessage (reason) =>
138- Metrics .OnionMessagesNotRelayed .withTag(Tags .Reason , reason.getClass.getSimpleName).increment()
139- replyTo_opt.foreach(_ ! DroppedMessage (messageId, reason))
140- Behaviors .stopped
141- case OnionMessages .SendMessage (nextNode, nextMessage) =>
142- // We need to repeat the process until we identify the (real) next node, or find out that we're the recipient.
143- queryNextNodeId(nextMessage, nextNode)
144- case received : OnionMessages .ReceiveMessage =>
145- context.system.eventStream ! EventStream .Publish (received)
146- replyTo_opt.foreach(_ ! Sent (messageId))
147- Behaviors .stopped
148- }
149- } else {
150- policy match {
151- case RelayChannelsOnly =>
152- switchboard ! GetPeerInfo (context.messageAdapter(WrappedPeerInfo ), prevNodeId)
153- waitForPreviousPeerForPolicyCheck(msg, nextNodeId)
154- case RelayAll =>
155- switchboard ! Peer .Connect (nextNodeId, None , context.messageAdapter(WrappedConnectionResult ).toClassic, isPersistent = false )
156- waitForConnection(msg, nextNodeId)
157- }
125+ private def withNextNodeId (msg : OnionMessage , nextNodeId : EncodedNodeId .WithPublicKey ): Behavior [Command ] = {
126+ nextNodeId match {
127+ case EncodedNodeId .WithPublicKey .Plain (nodeId) if nodeId == nodeParams.nodeId =>
128+ OnionMessages .process(nodeParams.privateKey, msg) match {
129+ case OnionMessages .DropMessage (reason) =>
130+ Metrics .OnionMessagesNotRelayed .withTag(Tags .Reason , reason.getClass.getSimpleName).increment()
131+ replyTo_opt.foreach(_ ! DroppedMessage (messageId, reason))
132+ Behaviors .stopped
133+ case OnionMessages .SendMessage (nextNode, nextMessage) =>
134+ // We need to repeat the process until we identify the (real) next node, or find out that we're the recipient.
135+ queryNextNodeId(nextMessage, nextNode)
136+ case received : OnionMessages .ReceiveMessage =>
137+ context.system.eventStream ! EventStream .Publish (received)
138+ replyTo_opt.foreach(_ ! Sent (messageId))
139+ Behaviors .stopped
140+ }
141+ case EncodedNodeId .WithPublicKey .Plain (nodeId) =>
142+ policy match {
143+ case RelayChannelsOnly =>
144+ switchboard ! GetPeerInfo (context.messageAdapter(WrappedPeerInfo ), prevNodeId)
145+ waitForPreviousPeerForPolicyCheck(msg, nodeId)
146+ case RelayAll =>
147+ switchboard ! Peer .Connect (nodeId, None , context.messageAdapter(WrappedConnectionResult ).toClassic, isPersistent = false )
148+ waitForConnection(msg, nodeId)
149+ }
150+ case EncodedNodeId .WithPublicKey .Wallet (nodeId) =>
151+ val notifier = context.spawnAnonymous(Behaviors .supervise(PeerReadyNotifier (nodeId, timeout_opt = Some (Left (nodeParams.wakeUpTimeout)))).onFailure(SupervisorStrategy .stop))
152+ notifier ! PeerReadyNotifier .NotifyWhenPeerReady (context.messageAdapter(WrappedPeerReadyResult ))
153+ waitForWalletNodeUp(msg, nodeId)
158154 }
159155 }
160156
@@ -197,4 +193,18 @@ private class MessageRelay(nodeParams: NodeParams,
197193 Behaviors .stopped
198194 }
199195 }
196+
197+ private def waitForWalletNodeUp (msg : OnionMessage , nextNodeId : PublicKey ): Behavior [Command ] = {
198+ Behaviors .receiveMessagePartial {
199+ case WrappedPeerReadyResult (r : PeerReadyNotifier .PeerReady ) =>
200+ log.info(" successfully woke up {}: relaying onion message" , nextNodeId)
201+ r.peer ! Peer .RelayOnionMessage (messageId, msg, replyTo_opt)
202+ Behaviors .stopped
203+ case WrappedPeerReadyResult (_ : PeerReadyNotifier .PeerUnavailable ) =>
204+ Metrics .OnionMessagesNotRelayed .withTag(Tags .Reason , Tags .Reasons .ConnectionFailure ).increment()
205+ log.info(" could not wake up {}: onion message cannot be relayed" , nextNodeId)
206+ replyTo_opt.foreach(_ ! Disconnected (messageId))
207+ Behaviors .stopped
208+ }
209+ }
200210}
0 commit comments