Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ eclair {
}
}
}
// The router parallelize route requests with multiple worker actors. The router creates the workers at statup and
// forwads route request to them. This parameter controls the number of the workers.
//
// The default value is 0, which means the number of CPU cores.
number-of-workers = 0
}

socks5 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ object NodeParams extends Logging {
pathFindingExperimentConf = getPathFindingExperimentConf(config.getConfig("router.path-finding.experiments")),
messageRouteParams = getMessageRouteParams(config.getConfig("router.message-path-finding")),
balanceEstimateHalfLife = FiniteDuration(config.getDuration("router.balance-estimate-half-life").getSeconds, TimeUnit.SECONDS),
numberOfWorkers = config.getInt("router.number-of-workers")
),
socksProxy_opt = socksProxy_opt,
maxPaymentAttempts = config.getInt("max-payment-attempts"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ object EclairInternalsSerializer {
("channelQueryChunkSize" | int32) ::
("pathFindingExperimentConf" | pathFindingExperimentConfCodec) ::
("messageRouteParams" | messageRouteParamsCodec) ::
("balanceEstimateHalfLife" | finiteDurationCodec)).as[RouterConf]
("balanceEstimateHalfLife" | finiteDurationCodec) ::
("numberOfRouterWorkers" | int32)).as[RouterConf]

val overrideFeaturesListCodec: Codec[List[(PublicKey, Features[Feature])]] = listOfN(uint16, publicKey ~ lengthPrefixedFeaturesCodec)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object RouteCalculation {
}
}

def finalizeRoute(d: Data, localNodeId: PublicKey, fr: FinalizeRoute)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
def finalizeRoute(d: Data, localNodeId: PublicKey, fr: FinalizeRoute, replyTo: ActorRef)(implicit log: DiagnosticLoggingAdapter): Data = {
def validateMaxRouteFee(route: Route, maxFee_opt: Option[MilliSatoshi]): Try[Route] = {
val routeFee = route.channelFee(includeLocalChannelCost = false)
maxFee_opt match {
Expand All @@ -64,7 +64,6 @@ object RouteCalculation {
parentPaymentId_opt = fr.paymentContext.map(_.parentId),
paymentId_opt = fr.paymentContext.map(_.id),
paymentHash_opt = fr.paymentContext.map(_.paymentHash))) {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors

val extraEdges = fr.extraEdges.map(ActiveEdge(_))
val g = extraEdges.foldLeft(d.graphWithBalances.graph) { case (g: DirectedGraph, e: ActiveEdge) => g.addEdge(e) }
Expand All @@ -78,12 +77,12 @@ object RouteCalculation {
val selectedEdges = edges.map(es => es.maxBy(e => e.balance_opt.getOrElse(e.capacity.toMilliSatoshi)))
val hops = selectedEdges.map(e => ChannelHop(getEdgeRelayScid(d, localNodeId, e), e.desc.a, e.desc.b, e.params))
validateMaxRouteFee(Route(amount, hops, None), maxFee_opt) match {
case Success(route) => ctx.sender() ! RouteResponse(route :: Nil)
case Failure(f) => ctx.sender() ! Status.Failure(f)
case Success(route) => replyTo ! RouteResponse(route :: Nil)
case Failure(f) => replyTo ! Status.Failure(f)
}
case _ =>
// some nodes in the supplied route aren't connected in our graph
ctx.sender() ! Status.Failure(new IllegalArgumentException("Not all the nodes in the supplied route are connected with public channels"))
replyTo ! Status.Failure(new IllegalArgumentException("Not all the nodes in the supplied route are connected with public channels"))
}
case PredefinedChannelRoute(amount, targetNodeId, shortChannelIds, maxFee_opt) =>
val (end, hops) = shortChannelIds.foldLeft((localNodeId, Seq.empty[ChannelHop])) {
Expand All @@ -107,11 +106,11 @@ object RouteCalculation {
}
}
if (end != targetNodeId || hops.length != shortChannelIds.length) {
ctx.sender() ! Status.Failure(new IllegalArgumentException("The sequence of channels provided cannot be used to build a route to the target node"))
replyTo ! Status.Failure(new IllegalArgumentException("The sequence of channels provided cannot be used to build a route to the target node"))
} else {
validateMaxRouteFee(Route(amount, hops, None), maxFee_opt) match {
case Success(route) => ctx.sender() ! RouteResponse(route :: Nil)
case Failure(f) => ctx.sender() ! Status.Failure(f)
case Success(route) => replyTo ! RouteResponse(route :: Nil)
case Failure(f) => replyTo ! Status.Failure(f)
}
}
}
Expand Down Expand Up @@ -191,13 +190,12 @@ object RouteCalculation {
})
}

def handleRouteRequest(d: Data, currentBlockHeight: BlockHeight, r: RouteRequest)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
def handleRouteRequest(d: Data, currentBlockHeight: BlockHeight, r: RouteRequest, replyTo: ActorRef)(implicit log: DiagnosticLoggingAdapter): Data = {
Logs.withMdc(log)(Logs.mdc(
category_opt = Some(LogCategory.PAYMENT),
parentPaymentId_opt = r.paymentContext.map(_.parentId),
paymentId_opt = r.paymentContext.map(_.id),
paymentHash_opt = r.paymentContext.map(_.paymentHash))) {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors

val ignoredEdges = r.ignore.channels ++ d.excludedChannels.keySet
val (targetNodeId, amountToSend, maxFee, extraEdges) = computeTarget(r, ignoredEdges)
Expand All @@ -220,26 +218,26 @@ object RouteCalculation {
// trampoline nodes).
Metrics.RouteResults.withTags(tags).record(routes.length)
routes.foreach(route => Metrics.RouteLength.withTags(tags).record(route.hops.length))
ctx.sender() ! RouteResponse(routes)
replyTo ! RouteResponse(routes)
case Failure(failure: InfiniteLoop) =>
log.error(s"found infinite loop ${failure.path.map(edge => edge.desc).mkString(" -> ")}")
Metrics.FindRouteErrors.withTags(tags.withTag(Tags.Error, "InfiniteLoop")).increment()
ctx.sender() ! Status.Failure(failure)
replyTo ! Status.Failure(failure)
case Failure(failure: NegativeProbability) =>
log.error(s"computed negative probability: edge=${failure.edge}, weight=${failure.weight}, heuristicsConstants=${failure.heuristicsConstants}")
Metrics.FindRouteErrors.withTags(tags.withTag(Tags.Error, "NegativeProbability")).increment()
ctx.sender() ! Status.Failure(failure)
replyTo ! Status.Failure(failure)
case Failure(t) =>
val failure = if (isNeighborBalanceTooLow(d.graphWithBalances.graph, r.source, targetNodeId, amountToSend)) BalanceTooLow else t
Metrics.FindRouteErrors.withTags(tags.withTag(Tags.Error, failure.getClass.getSimpleName)).increment()
ctx.sender() ! Status.Failure(failure)
replyTo ! Status.Failure(failure)
}
}
d
}
}

def handleMessageRouteRequest(d: Data, currentBlockHeight: BlockHeight, r: MessageRouteRequest, routeParams: MessageRouteParams)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
def handleMessageRouteRequest(d: Data, currentBlockHeight: BlockHeight, r: MessageRouteRequest, routeParams: MessageRouteParams)(implicit log: DiagnosticLoggingAdapter): Data = {
val boundaries: MessagePath.RichWeight => Boolean = { weight =>
weight.length <= routeParams.maxRouteLength && weight.length <= ROUTE_MAX_LENGTH
}
Expand Down
Loading