diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 7109e304dc..593206dad1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -309,7 +309,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId val periodicRefreshInitialDelay = Helpers.nextChannelUpdateRefresh(channelUpdate1.timestamp) context.system.scheduler.scheduleWithFixedDelay(initialDelay = periodicRefreshInitialDelay, delay = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh)) - goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate1) + goto(OFFLINE) using normal.copy(channelUpdate = channelUpdate1, channelUpdateBeforeRestore_opt = Some(normal.channelUpdate)) case funding: DATA_WAIT_FOR_FUNDING_CONFIRMED => watchFundingTx(funding.commitments) @@ -666,7 +666,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId val initialChannelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, shortChannelId, nodeParams.expiryDelta, d.commitments.remoteParams.htlcMinimum, fees.feeBase, fees.feeProportionalMillionths, commitments.capacity.toMilliSatoshi, enable = Helpers.aboveReserve(d.commitments)) // we need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network context.system.scheduler.scheduleWithFixedDelay(initialDelay = REFRESH_CHANNEL_UPDATE_INTERVAL, delay = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh)) - goto(NORMAL) using DATA_NORMAL(commitments.copy(remoteNextCommitInfo = Right(nextPerCommitmentPoint)), shortChannelId, buried = false, None, initialChannelUpdate, None, None) storing() + goto(NORMAL) using DATA_NORMAL(commitments.copy(remoteNextCommitInfo = Right(nextPerCommitmentPoint)), shortChannelId, buried = false, None, initialChannelUpdate, None, None, None) storing() case Event(remoteAnnSigs: AnnouncementSignatures, d: DATA_WAIT_FOR_FUNDING_LOCKED) if d.commitments.announceChannel => log.debug("received remote announcement signatures, delaying") @@ -1879,24 +1879,23 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case _ => () } - val previousChannelUpdate_opt = stateData match { - case data: DATA_NORMAL => Some(data.channelUpdate) - case _ => None - } - (state, nextState, stateData, nextStateData) match { // ORDER MATTERS! case (WAIT_FOR_INIT_INTERNAL, OFFLINE, _, normal: DATA_NORMAL) => Logs.withMdc(diagLog)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) { log.debug("re-emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags)) } - context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments)) + context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, normal.channelUpdateBeforeRestore_opt, normal.commitments)) case (_, _, d1: DATA_NORMAL, d2: DATA_NORMAL) if d1.channelUpdate == d2.channelUpdate && d1.channelAnnouncement == d2.channelAnnouncement => // don't do anything if neither the channel_update nor the channel_announcement didn't change () case (WAIT_FOR_FUNDING_LOCKED | NORMAL | OFFLINE | SYNCING, NORMAL | OFFLINE, _, normal: DATA_NORMAL) => // when we do WAIT_FOR_FUNDING_LOCKED->NORMAL or NORMAL->NORMAL or SYNCING->NORMAL or NORMAL->OFFLINE, we send out the new channel_update (most of the time it will just be to enable/disable the channel) log.info("emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags)) + val previousChannelUpdate_opt = stateData match { + case data: DATA_NORMAL => Some(data.channelUpdate) + case _ => None + } context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, previousChannelUpdate_opt, normal.commitments)) case (_, _, _: DATA_NORMAL, _: DATA_NORMAL) => // in any other case (e.g. OFFLINE->SYNCING) we do nothing diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala index 05c9716792..5236f74613 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala @@ -421,6 +421,7 @@ final case class DATA_NORMAL(commitments: Commitments, buried: Boolean, channelAnnouncement: Option[ChannelAnnouncement], channelUpdate: ChannelUpdate, + channelUpdateBeforeRestore_opt: Option[ChannelUpdate], localShutdown: Option[Shutdown], remoteShutdown: Option[Shutdown]) extends ChannelData with HasCommitments final case class DATA_SHUTDOWN(commitments: Commitments, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index 7b09d84050..87f4a7168d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -26,6 +26,7 @@ import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.payment.Monitoring.{Metrics => PaymentMetrics, Tags => PaymentTags} import fr.acinq.eclair.payment._ +import fr.acinq.eclair.router.Announcements /** * This actor sits at the interface between our event stream and the database. @@ -119,15 +120,9 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging { case u: LocalChannelUpdate => u.previousChannelUpdate_opt match { - case Some(previous) if - u.channelUpdate.feeBaseMsat == previous.feeBaseMsat && - u.channelUpdate.feeProportionalMillionths == previous.feeProportionalMillionths && - u.channelUpdate.cltvExpiryDelta == previous.cltvExpiryDelta && - u.channelUpdate.htlcMinimumMsat == previous.htlcMinimumMsat && - u.channelUpdate.htlcMaximumMsat == previous.htlcMaximumMsat => () + case Some(previous) if Announcements.areSameWithoutFlags(previous, u.channelUpdate) => () // channel update hasn't changed case _ => auditDb.addChannelUpdate(u) } - } override def unhandled(message: Any): Unit = log.warning(s"unhandled msg=$message") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Announcements.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Announcements.scala index d3ee33472c..7f5726e310 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Announcements.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Announcements.scala @@ -126,6 +126,9 @@ object Announcements { def areSame(u1: ChannelUpdate, u2: ChannelUpdate): Boolean = u1.copy(signature = ByteVector64.Zeroes, timestamp = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0) + def areSameWithoutFlags(u1: ChannelUpdate, u2: ChannelUpdate): Boolean = + u1.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0, messageFlags = 1, channelFlags = 0) + def makeMessageFlags(hasOptionChannelHtlcMax: Boolean): Byte = BitVector.bits(hasOptionChannelHtlcMax :: Nil).padLeft(8).toByte() def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte() diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelCodecs0.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelCodecs0.scala index e623df31ac..fd373837d8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelCodecs0.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelCodecs0.scala @@ -26,7 +26,7 @@ import fr.acinq.eclair.transactions._ import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0.{HtlcTxAndSigs, PublishableTxs} import fr.acinq.eclair.wire.protocol.CommonCodecs._ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._ -import fr.acinq.eclair.wire.protocol.UpdateMessage +import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage} import scodec.bits.{BitVector, ByteVector} import scodec.codecs._ import scodec.{Attempt, Codec} @@ -309,6 +309,7 @@ private[channel] object ChannelCodecs0 { ("buried" | bool) :: ("channelAnnouncement" | optional(bool, variableSizeBytes(noUnknownFieldsChannelAnnouncementSizeCodec, channelAnnouncementCodec))) :: ("channelUpdate" | variableSizeBytes(noUnknownFieldsChannelUpdateSizeCodec, channelUpdateCodec)) :: + ("channelUpdateBeforeRestore_opt" | provide[Option[ChannelUpdate]](None)) :: ("localShutdown" | optional(bool, shutdownCodec)) :: ("remoteShutdown" | optional(bool, shutdownCodec))).as[DATA_NORMAL].decodeOnly @@ -318,6 +319,7 @@ private[channel] object ChannelCodecs0 { ("buried" | bool) :: ("channelAnnouncement" | optional(bool, variableSizeBytes(uint16, channelAnnouncementCodec))) :: ("channelUpdate" | variableSizeBytes(uint16, channelUpdateCodec)) :: + ("channelUpdateBeforeRestore_opt" | provide[Option[ChannelUpdate]](None)) :: ("localShutdown" | optional(bool, shutdownCodec)) :: ("remoteShutdown" | optional(bool, shutdownCodec))).as[DATA_NORMAL].decodeOnly diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version1/ChannelCodecs1.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version1/ChannelCodecs1.scala index d8e4293c92..2091b7dfb2 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version1/ChannelCodecs1.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version1/ChannelCodecs1.scala @@ -27,7 +27,7 @@ import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0 import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0.{HtlcTxAndSigs, PublishableTxs} import fr.acinq.eclair.wire.protocol.CommonCodecs._ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._ -import fr.acinq.eclair.wire.protocol.UpdateMessage +import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage} import scodec.bits.ByteVector import scodec.codecs._ import scodec.{Attempt, Codec} @@ -247,6 +247,7 @@ private[channel] object ChannelCodecs1 { ("buried" | bool8) :: ("channelAnnouncement" | optional(bool8, lengthDelimited(channelAnnouncementCodec))) :: ("channelUpdate" | lengthDelimited(channelUpdateCodec)) :: + ("channelUpdateBeforeRestore_opt" | provide[Option[ChannelUpdate]](None)) :: ("localShutdown" | optional(bool8, lengthDelimited(shutdownCodec))) :: ("remoteShutdown" | optional(bool8, lengthDelimited(shutdownCodec)))).as[DATA_NORMAL] diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version2/ChannelCodecs2.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version2/ChannelCodecs2.scala index 0fabb04cca..d63b4c44e8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version2/ChannelCodecs2.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version2/ChannelCodecs2.scala @@ -27,7 +27,7 @@ import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0 import fr.acinq.eclair.wire.internal.channel.version0.ChannelTypes0.{HtlcTxAndSigs, PublishableTxs} import fr.acinq.eclair.wire.protocol.CommonCodecs._ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._ -import fr.acinq.eclair.wire.protocol.UpdateMessage +import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage} import scodec.bits.ByteVector import scodec.codecs._ import scodec.{Attempt, Codec} @@ -282,6 +282,7 @@ private[channel] object ChannelCodecs2 { ("buried" | bool8) :: ("channelAnnouncement" | optional(bool8, lengthDelimited(channelAnnouncementCodec))) :: ("channelUpdate" | lengthDelimited(channelUpdateCodec)) :: + ("channelUpdateBeforeRestore_opt" | provide[Option[ChannelUpdate]](None)) :: ("localShutdown" | optional(bool8, lengthDelimited(shutdownCodec))) :: ("remoteShutdown" | optional(bool8, lengthDelimited(shutdownCodec)))).as[DATA_NORMAL] diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelCodecs3.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelCodecs3.scala index dc213c2cc2..5132ff13e2 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelCodecs3.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelCodecs3.scala @@ -24,7 +24,7 @@ import fr.acinq.eclair.transactions.Transactions._ import fr.acinq.eclair.transactions.{CommitmentSpec, DirectedHtlc, IncomingHtlc, OutgoingHtlc} import fr.acinq.eclair.wire.protocol.CommonCodecs._ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._ -import fr.acinq.eclair.wire.protocol.UpdateMessage +import fr.acinq.eclair.wire.protocol.{ChannelUpdate, UpdateMessage} import fr.acinq.eclair.{FeatureSupport, Features, MilliSatoshi} import scodec.bits.{BitVector, ByteVector} import scodec.codecs._ @@ -302,6 +302,7 @@ private[channel] object ChannelCodecs3 { ("buried" | bool8) :: ("channelAnnouncement" | optional(bool8, lengthDelimited(channelAnnouncementCodec))) :: ("channelUpdate" | lengthDelimited(channelUpdateCodec)) :: + ("channelUpdateBeforeRestore_opt" | provide[Option[ChannelUpdate]](None)) :: ("localShutdown" | optional(bool8, lengthDelimited(shutdownCodec))) :: ("remoteShutdown" | optional(bool8, lengthDelimited(shutdownCodec)))).as[DATA_NORMAL] diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/RecoverySpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/RecoverySpec.scala index abe96276a7..2ed421d1e5 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/RecoverySpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/RecoverySpec.scala @@ -1,13 +1,17 @@ package fr.acinq.eclair.channel -import akka.testkit.TestProbe +import akka.actor.typed.scaladsl.adapter.actorRefAdapter +import akka.testkit.{TestFSMRef, TestProbe} import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.bitcoin._ -import fr.acinq.eclair.TestConstants.Alice +import fr.acinq.eclair.TestConstants.{Alice, Bob} import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.WatchFundingSpentTriggered import fr.acinq.eclair.channel.states.ChannelStateTestsBase +import fr.acinq.eclair.channel.states.ChannelStateTestsHelperMethods.FakeTxPublisherFactory import fr.acinq.eclair.crypto.Generators import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager +import fr.acinq.eclair.payment.relay.Relayer.{RelayFees, RelayParams} +import fr.acinq.eclair.router.Announcements import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.transactions.Transactions.{ClaimP2WPKHOutputTx, DefaultCommitmentFormat, InputInfo, TxOwner} import fr.acinq.eclair.wire.protocol.{ChannelReestablish, CommitSig, Error, Init, RevokeAndAck} @@ -122,4 +126,66 @@ class RecoverySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Cha val tx1 = tx.updateWitness(0, ScriptWitness(Scripts.der(sig) :: ourToRemotePubKey.value :: Nil)) Transaction.correctlySpends(tx1, bobCommitTx :: Nil, ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS) } + + test("restore channel without configuration change") { f => + import f._ + val sender = TestProbe() + + // we start by storing the current state + assert(alice.stateData.isInstanceOf[DATA_NORMAL]) + val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL] + + // we simulate a disconnection + sender.send(alice, INPUT_DISCONNECTED) + sender.send(bob, INPUT_DISCONNECTED) + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + + // we restart Alice + val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(TestConstants.Alice.nodeParams, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref) + newAlice ! INPUT_RESTORED(oldStateData) + newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit) + bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit) + alice2bob.expectMsgType[ChannelReestablish] + bob2alice.expectMsgType[ChannelReestablish] + alice2bob.forward(bob) + bob2alice.forward(newAlice) + awaitCond(newAlice.stateName == NORMAL) + val u = channelUpdateListener.expectMsgType[LocalChannelUpdate] + assert(u.previousChannelUpdate_opt.nonEmpty) + assert(Announcements.areSameWithoutFlags(u.previousChannelUpdate_opt.get, u.channelUpdate)) + assert(Announcements.areSameWithoutFlags(u.previousChannelUpdate_opt.get, oldStateData.channelUpdate)) + } + + test("restore channel with configuration change") { f => + import f._ + val sender = TestProbe() + + // we start by storing the current state + assert(alice.stateData.isInstanceOf[DATA_NORMAL]) + val oldStateData = alice.stateData.asInstanceOf[DATA_NORMAL] + + // we simulate a disconnection + sender.send(alice, INPUT_DISCONNECTED) + sender.send(bob, INPUT_DISCONNECTED) + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + + // we restart Alice with a different configuration + val newFees = RelayFees(765 msat, 2345) + val newConfig = TestConstants.Alice.nodeParams.copy(relayParams = RelayParams(newFees, newFees, newFees)) + val newAlice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(newConfig, wallet, Bob.nodeParams.nodeId, alice2blockchain.ref, relayerA.ref, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref) + newAlice ! INPUT_RESTORED(oldStateData) + newAlice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit) + bob ! INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit) + alice2bob.expectMsgType[ChannelReestablish] + bob2alice.expectMsgType[ChannelReestablish] + alice2bob.forward(bob) + bob2alice.forward(newAlice) + awaitCond(newAlice.stateName == NORMAL) + val u = channelUpdateListener.expectMsgType[LocalChannelUpdate] + assert(u.previousChannelUpdate_opt.nonEmpty) + assert(!Announcements.areSameWithoutFlags(u.previousChannelUpdate_opt.get, u.channelUpdate)) + assert(Announcements.areSameWithoutFlags(u.previousChannelUpdate_opt.get, oldStateData.channelUpdate)) + } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala index eb76615543..b9ca920cb7 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala @@ -325,7 +325,7 @@ object ChannelCodecsSpec { commitInput = commitmentInput, remotePerCommitmentSecrets = ShaChain.init) - DATA_NORMAL(commitments, ShortChannelId(42), buried = true, None, channelUpdate, None, None) + DATA_NORMAL(commitments, ShortChannelId(42), buried = true, None, channelUpdate, None, None, None) } }