Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Periodic client discovery refresh #1152 #1886

Merged
merged 10 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugin-tester-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ repositories {

def scalaVersion = org.gradle.util.VersionNumber.parse(System.getenv("TRAVIS_SCALA_VERSION") ?: "2.13")
def scalaBinaryVersion = "${scalaVersion.major}.${scalaVersion.minor}"
def akkaVersion = "2.9.0"
def akkaVersion = "2.9.1"

dependencies {
implementation group: 'ch.megard', name: "akka-http-cors_${scalaBinaryVersion}", version: '1.1.3'
Expand Down
2 changes: 1 addition & 1 deletion plugin-tester-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<maven-dependency-plugin.version>3.1.2</maven-dependency-plugin.version>
<maven-exec-plugin.version>3.0.0</maven-exec-plugin.version>
<akka.http.cors.version>1.1.0</akka.http.cors.version>
<akka.version>2.9.0</akka.version>
<akka.version>2.9.1</akka.version>
<grpc.version>1.60.2</grpc.version> <!-- checked synced by VersionSyncCheckPlugin -->
<project.encoding>UTF-8</project.encoding>
<build-helper-maven-plugin>3.3.0</build-helper-maven-plugin>
Expand Down
2 changes: 1 addition & 1 deletion plugin-tester-scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ repositories {

def scalaVersion = org.gradle.util.VersionNumber.parse(System.getenv("TRAVIS_SCALA_VERSION") ?: "2.13")
def scalaBinaryVersion = "${scalaVersion.major}.${scalaVersion.minor}"
def akkaVersion = "2.9.0"
def akkaVersion = "2.9.1"

dependencies {
implementation group: 'ch.megard', name: "akka-http-cors_${scalaBinaryVersion}", version: '1.1.3'
Expand Down
2 changes: 1 addition & 1 deletion plugin-tester-scala/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<properties>
<maven.compiler.release>11</maven.compiler.release>
<akka.version>2.9.0</akka.version>
<akka.version>2.9.1</akka.version>
<akka.http.cors.version>0.4.2</akka.http.cors.version>
<grpc.version>1.60.2</grpc.version> <!-- checked synced by VersionSyncCheckPlugin -->
<project.encoding>UTF-8</project.encoding>
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Dependencies {
// We don't force Akka updates because downstream projects can upgrade
// themselves. For more information see
// https://doc.akka.io//docs/akka/current/project/downstream-upgrade-strategy.html
val akka = "2.9.0"
val akka = "2.9.1"
val akkaBinary = "2.9"
val akkaHttp = "10.6.0"
val akkaHttpBinary = "10.6"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# private
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.GrpcClientSettings.this")

# internal
ProblemFilters.exclude[FinalClassProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.listener")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.lookup")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.apply")
ProblemFilters.exclude[FinalClassProblem]("akka.grpc.internal.AkkaDiscoveryNameResolverProvider")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolverProvider.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.NettyClientUtils.createChannel")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.NettyClientUtils.createChannel")
8 changes: 8 additions & 0 deletions runtime/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ akka.grpc.client."*" {

# timeout for service discovery resolving
resolve-timeout = 1s

# Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
# if the discovery mechanism supports that. Expected use is for client side load-balancing, to detect new services
# to load balance across. The default value "off" disables periodic refresh and instead only does refresh when
# the client implementation decides to.
#
# Currently only supported by the Netty client backend.
refresh-interval = off
}

# port to use if service-discovery-mechism is static or service discovery does not return a port
Expand Down
44 changes: 38 additions & 6 deletions runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import com.typesafe.config.{ Config, ConfigValueFactory }
import io.grpc.CallCredentials
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider
import javax.net.ssl.{ SSLContext, TrustManager }

import javax.net.ssl.{ SSLContext, TrustManager }
import scala.collection.immutable
import scala.concurrent.duration.{ Duration, _ }

Expand Down Expand Up @@ -148,7 +148,9 @@ object GrpcClientSettings {
getOptionalString(clientConfiguration, "user-agent"),
clientConfiguration.getBoolean("use-tls"),
getOptionalString(clientConfiguration, "load-balancing-policy"),
clientConfiguration.getString("backend"))
clientConfiguration.getString("backend"),
identity,
getOptionalDuration(clientConfiguration, "service-discovery.refresh-interval"))

private def getOptionalString(config: Config, path: String): Option[String] =
config.getString(path) match {
Expand All @@ -162,6 +164,12 @@ object GrpcClientSettings {
case other => Some(other)
}

private def getOptionalDuration(config: Config, path: String): Option[FiniteDuration] =
Helpers.toRootLowerCase(config.getString(path)) match {
case "off" => None
case _ => Some(config.getDuration(path).asScala)
}

private def getPotentiallyInfiniteDuration(underlying: Config, path: String): Duration =
Helpers.toRootLowerCase(underlying.getString(path)) match {
case "infinite" => Duration.Inf
Expand Down Expand Up @@ -196,7 +204,8 @@ final class GrpcClientSettings private (
val useTls: Boolean,
val loadBalancingPolicy: Option[String],
val backend: String,
val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity) {
val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity,
val discoveryRefreshInterval: Option[FiniteDuration]) {
require(
sslContext.isEmpty || trustManager.isEmpty,
"Configuring the sslContext or the trustManager is mutually exclusive")
Expand Down Expand Up @@ -284,6 +293,28 @@ final class GrpcClientSettings private (
def withBackend(value: String): GrpcClientSettings =
copy(backend = value)

/**
* Scala API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
* if the discovery mechanism supports that. The default is no periodic refresh and instead
* * only does refresh when the client implementation decides to.
*
* Currently only supported by the Netty client backend.
*/
@ApiMayChange
def withDiscoveryRefreshInterval(refreshInterval: FiniteDuration): GrpcClientSettings =
copy(discoveryRefreshInterval = Some(refreshInterval))

/**
* Java API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
* if the discovery mechanism supports that. The default is no periodic refresh and instead
* only does refresh when the client implementation decides to.
*
* Currently only supported by the Netty client backend.
*/
@ApiMayChange
def withDiscoveryRefreshInterval(refreshInterval: java.time.Duration): GrpcClientSettings =
copy(discoveryRefreshInterval = Some(refreshInterval.asScala))

private def copy(
serviceName: String = serviceName,
servicePortName: Option[String] = servicePortName,
Expand All @@ -301,8 +332,8 @@ final class GrpcClientSettings private (
connectionAttempts: Option[Int] = connectionAttempts,
loadBalancingPolicy: Option[String] = loadBalancingPolicy,
backend: String = backend,
channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides)
: GrpcClientSettings =
channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides,
discoveryRefreshInterval: Option[FiniteDuration] = discoveryRefreshInterval): GrpcClientSettings =
new GrpcClientSettings(
callCredentials = callCredentials,
serviceDiscovery = serviceDiscovery,
Expand All @@ -321,5 +352,6 @@ final class GrpcClientSettings private (
connectionAttempts = connectionAttempts,
loadBalancingPolicy = loadBalancingPolicy,
backend = backend,
channelBuilderOverrides = channelBuilderOverrides)
channelBuilderOverrides = channelBuilderOverrides,
discoveryRefreshInterval = discoveryRefreshInterval)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,101 @@

package akka.grpc.internal

import java.net.{ InetAddress, InetSocketAddress, UnknownHostException }
import akka.actor.ActorSystem
import akka.actor.Cancellable
import akka.annotation.InternalApi

import java.net.{ InetAddress, InetSocketAddress, UnknownHostException }
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.discovery.{ Lookup, ServiceDiscovery }
import akka.event.Logging
import akka.grpc.GrpcClientSettings
import io.grpc.{ Attributes, EquivalentAddressGroup, NameResolver, Status }
import io.grpc.NameResolver.Listener

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Promise }
import scala.util.{ Failure, Success }

class AkkaDiscoveryNameResolver(
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class AkkaDiscoveryNameResolver(
discovery: ServiceDiscovery,
defaultPort: Int,
serviceName: String,
portName: Option[String],
protocol: Option[String],
resolveTimeout: FiniteDuration)(implicit val ec: ExecutionContext)
resolveTimeout: FiniteDuration,
refreshInterval: Option[FiniteDuration])(implicit val ec: ExecutionContext, system: ActorSystem)
extends NameResolver {

private final val log = Logging(system, "akka.grpc.internal.AkkaDiscoveryNameResolver")

override def getServiceAuthority: String = serviceName

val listener: Promise[Listener] = Promise()
private val listener: Promise[Listener] = Promise()

// initialized after first resolve if needed
private val refreshTask = new AtomicReference[Cancellable]

override def start(l: Listener): Unit = {
log.debug("Name resolver for {} started", serviceName)
listener.trySuccess(l)
lookup(l)
lookup(l, evict = false)
}

override def refresh(): Unit =
override def refresh(): Unit = refresh(false)

private def refresh(evict: Boolean): Unit =
listener.future.onComplete {
case Success(l) => lookup(l)
case Success(l) =>
log.debug("Name resolver for {} refreshing", serviceName)
lookup(l, evict)
case Failure(_) => // We never fail this promise
}

def lookup(listener: Listener): Unit = {
discovery.lookup(Lookup(serviceName, portName, protocol), resolveTimeout).onComplete {
def lookup(listener: Listener, evict: Boolean): Unit = {
val request = {
val l = Lookup(serviceName, portName, protocol)
if (evict) l.withDiscardCache
else l
}
val result = discovery.lookup(request, resolveTimeout)

result.onComplete {
case Success(result) =>
try {
if (log.isDebugEnabled)
log.debug(
"Successful service discovery for service {}, found addresses: {}",
serviceName,
result.addresses.mkString(", "))
listener.onAddresses(addresses(result.addresses), Attributes.EMPTY)
} catch {
case e: UnknownHostException =>
// TODO at least log
log.warning(e, "Unknown host for service {}", serviceName)
listener.onError(Status.UNKNOWN.withDescription(e.getMessage))
}
case Failure(e) =>
// TODO at least log
log.warning(e, "Service discovery failed for service {}", serviceName)
listener.onError(Status.UNKNOWN.withDescription(e.getMessage))
}

// initialize refresh timer after first lookup, if configured
if (refreshInterval.isDefined && refreshTask.get() == null) {
result.onComplete { _ =>
refreshInterval.foreach { interval =>
val cancellable = system.scheduler.scheduleWithFixedDelay(interval, interval)(() => refresh(evict = true))
if (!refreshTask.compareAndSet(null, cancellable)) {
// concurrent update beat us to it, there already is a scheduled task
cancellable.cancel()
}
}
}
}
}

@throws[UnknownHostException]
Expand All @@ -67,16 +113,25 @@ class AkkaDiscoveryNameResolver(
.asJava
}

override def shutdown(): Unit = ()
override def shutdown(): Unit = {
val refreshCancellable = refreshTask.get()
if (refreshCancellable ne null) refreshCancellable.cancel()
}
}

object AkkaDiscoveryNameResolver {
def apply(settings: GrpcClientSettings)(implicit ec: ExecutionContext): AkkaDiscoveryNameResolver =
/**
* INTERNAL API
*/
@InternalApi
private[akka] object AkkaDiscoveryNameResolver {
def apply(
settings: GrpcClientSettings)(implicit ec: ExecutionContext, system: ActorSystem): AkkaDiscoveryNameResolver =
new AkkaDiscoveryNameResolver(
settings.serviceDiscovery,
settings.defaultPort,
settings.serviceName,
settings.servicePortName,
settings.serviceProtocol,
settings.resolveTimeout)
settings.resolveTimeout,
settings.discoveryRefreshInterval)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,28 @@

package akka.grpc.internal

import java.net.URI
import akka.actor.ActorSystem
import akka.annotation.InternalApi

import java.net.URI
import akka.discovery.ServiceDiscovery
import io.grpc.{ NameResolver, NameResolverProvider }

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

class AkkaDiscoveryNameResolverProvider(
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class AkkaDiscoveryNameResolverProvider(
discovery: ServiceDiscovery,
defaultPort: Int,
serviceName: String,
portName: Option[String],
protocol: Option[String],
resolveTimeout: FiniteDuration)(implicit ec: ExecutionContext)
resolveTimeout: FiniteDuration,
refreshInterval: Option[FiniteDuration])(implicit ec: ExecutionContext, system: ActorSystem)
extends NameResolverProvider {
override def isAvailable: Boolean = true

Expand All @@ -27,6 +34,13 @@ class AkkaDiscoveryNameResolverProvider(
override def getDefaultScheme: String = "http"

override def newNameResolver(targetUri: URI, args: NameResolver.Args): AkkaDiscoveryNameResolver = {
new AkkaDiscoveryNameResolver(discovery, defaultPort, serviceName, portName, protocol, resolveTimeout)
new AkkaDiscoveryNameResolver(
discovery,
defaultPort,
serviceName,
portName,
protocol,
resolveTimeout,
refreshInterval)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object ChannelUtils {
implicit sys: ClassicActorSystemProvider): InternalChannel = {
settings.backend match {
case "netty" =>
NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher)
NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher, sys.classicSystem)
case "akka-http" =>
AkkaHttpClientUtils.createChannel(settings, log)
case _ => throw new IllegalArgumentException(s"Unexpected backend [${settings.backend}]")
Expand Down
Loading
Loading