Skip to content

Commit 54185b0

Browse files
committed
Rework node relay FSM flow
We refactor `NodeRelay.scala` to re-order some steps, without making meaningful functional changes. The steps are: 1. Fully receive the incoming payment 2. Resolve the next node (unwrap blinded paths if needed) 3. Wake-up the next node if necessary (mobile wallet) 4. Relay outgoing payment Note that we introduce a wake-up step, that will be enriched in future commits and can be extended to include mobile notifications. The file is now also easier to follow, as steps are done linearly by simply scrolling down.
1 parent cfb6c9c commit 54185b0

File tree

10 files changed

+116
-228
lines changed

10 files changed

+116
-228
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ class Setup(val datadir: File,
360360
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
361361
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
362362
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
363-
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
363+
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
364364
_ = relayer ! PostRestartHtlcCleaner.Init(channels)
365365
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
366366
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.

eclair-core/src/main/scala/fr/acinq/eclair/io/PeerReadyNotifier.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,11 @@ object PeerReadyNotifier {
8888
context.log.error("no switchboard found")
8989
replyTo ! PeerUnavailable(remoteNodeId)
9090
Behaviors.stopped
91-
}
91+
}
92+
case Timeout =>
93+
context.log.info("timed out finding switchboard actor")
94+
replyTo ! PeerUnavailable(remoteNodeId)
95+
Behaviors.stopped
9296
}
9397
}
9498

eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala

Lines changed: 87 additions & 88 deletions
Large diffs are not rendered by default.

eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package fr.acinq.eclair.payment.relay
1818

19-
import akka.actor.typed
2019
import akka.actor.typed.scaladsl.Behaviors
2120
import akka.actor.typed.{ActorRef, Behavior}
2221
import fr.acinq.bitcoin.scalacompat.ByteVector32
@@ -58,7 +57,7 @@ object NodeRelayer {
5857
* NB: the payment secret used here is different from the invoice's payment secret and ensures we can
5958
* group together HTLCs that the previous trampoline node sent in the same MPP.
6059
*/
61-
def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], router: akka.actor.ActorRef, children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
60+
def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, router: akka.actor.ActorRef, children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
6261
Behaviors.setup { context =>
6362
Behaviors.withMdc(Logs.mdc(category_opt = Some(Logs.LogCategory.PAYMENT)), mdc) {
6463
Behaviors.receiveMessage {
@@ -73,15 +72,15 @@ object NodeRelayer {
7372
case None =>
7473
val relayId = UUID.randomUUID()
7574
context.log.debug(s"spawning a new handler with relayId=$relayId")
76-
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory, triggerer, router), relayId.toString)
75+
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory, router), relayId.toString)
7776
context.log.debug("forwarding incoming htlc #{} from channel {} to new handler", htlcIn.id, htlcIn.channelId)
7877
handler ! NodeRelay.Relay(nodeRelayPacket, originNode)
79-
apply(nodeParams, register, outgoingPaymentFactory, triggerer, router, children + (childKey -> handler))
78+
apply(nodeParams, register, outgoingPaymentFactory, router, children + (childKey -> handler))
8079
}
8180
case RelayComplete(childHandler, paymentHash, paymentSecret) =>
8281
// we do a back-and-forth between parent and child before stopping the child to prevent a race condition
8382
childHandler ! NodeRelay.Stop
84-
apply(nodeParams, register, outgoingPaymentFactory, triggerer, router, children - PaymentKey(paymentHash, paymentSecret))
83+
apply(nodeParams, register, outgoingPaymentFactory, router, children - PaymentKey(paymentHash, paymentSecret))
8584
case GetPendingPayments(replyTo) =>
8685
replyTo ! children
8786
Behaviors.same

eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ import scala.util.Random
4949
* It also receives channel HTLC events (fulfill / failed) and relays those to the appropriate handlers.
5050
* It also maintains an up-to-date view of local channel balances.
5151
*/
52-
class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging {
52+
class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging {
5353

5454
import Relayer._
5555

@@ -58,7 +58,7 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym
5858

5959
private val postRestartCleaner = context.actorOf(PostRestartHtlcCleaner.props(nodeParams, register, initialized), "post-restart-htlc-cleaner")
6060
private val channelRelayer = context.spawn(Behaviors.supervise(ChannelRelayer(nodeParams, register)).onFailure(SupervisorStrategy.resume), "channel-relayer")
61-
private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), triggerer, router)).onFailure(SupervisorStrategy.resume), name = "node-relayer")
61+
private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), router)).onFailure(SupervisorStrategy.resume), name = "node-relayer")
6262

6363
def receive: Receive = {
6464
case init: PostRestartHtlcCleaner.Init => postRestartCleaner forward init
@@ -120,8 +120,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym
120120

121121
object Relayer extends Logging {
122122

123-
def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], initialized: Option[Promise[Done]] = None): Props =
124-
Props(new Relayer(nodeParams, router, register, paymentHandler, triggerer, initialized))
123+
def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, initialized: Option[Promise[Done]] = None): Props =
124+
Props(new Relayer(nodeParams, router, register, paymentHandler, initialized))
125125

126126
// @formatter:off
127127
case class RelayFees(feeBase: MilliSatoshi, feeProportionalMillionths: Long) {

eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ class FuzzySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Channe
6666
val bobRegister = system.actorOf(Props(new TestRegister()))
6767
val alicePaymentHandler = system.actorOf(Props(new PaymentHandler(aliceParams, aliceRegister, TestProbe().ref)))
6868
val bobPaymentHandler = system.actorOf(Props(new PaymentHandler(bobParams, bobRegister, TestProbe().ref)))
69-
val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler, TestProbe().ref))
70-
val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler, TestProbe().ref))
69+
val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler))
70+
val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler))
7171
val wallet = new DummyOnChainWallet()
7272
val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(aliceParams, wallet, bobParams.nodeId, alice2blockchain.ref, aliceRelayer, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref)
7373
val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(bobParams, wallet, aliceParams.nodeId, bob2blockchain.ref, bobRelayer, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref)

eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,12 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat
9090
val bitcoinClient = new TestBitcoinCoreClient()
9191
val wallet = new SingleKeyOnChainWallet()
9292
val watcher = TestProbe("watcher")
93-
val triggerer = TestProbe("payment-triggerer")
9493
val watcherTyped = watcher.ref.toTyped[ZmqWatcher.Command]
9594
val register = system.actorOf(Register.props(), "register")
9695
val router = system.actorOf(Router.props(nodeParams, watcherTyped), "router")
9796
val offerManager = system.spawn(OfferManager(nodeParams, router, 1 minute), "offer-manager")
9897
val paymentHandler = system.actorOf(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler")
99-
val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler, triggerer.ref.toTyped), "relayer")
98+
val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler), "relayer")
10099
val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcherTyped, bitcoinClient)
101100
val channelFactory = Peer.SimpleChannelFactory(nodeParams, watcherTyped, relayer, wallet, txPublisherFactory)
102101
val pendingChannelsRateLimiter = system.spawnAnonymous(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, Seq())).onFailure(typed.SupervisorStrategy.resume))

eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit
5757

5858
case class FixtureParam(nodeParams: NodeParams, register: TestProbe, sender: TestProbe, eventListener: TestProbe) {
5959
def createRelayer(nodeParams1: NodeParams): (ActorRef, ActorRef) = {
60-
val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref, TestProbe().ref.toTyped))
60+
val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref))
6161
// we need ensure the post-htlc-restart child actor is initialized
6262
sender.send(relayer, Relayer.GetChildActors(sender.ref))
6363
(relayer, sender.expectMsgType[Relayer.ChildActors].postRestartCleaner)

0 commit comments

Comments
 (0)