diff --git a/plugin-tester-java/build.gradle b/plugin-tester-java/build.gradle
index 683de2a6d..a6d647236 100644
--- a/plugin-tester-java/build.gradle
+++ b/plugin-tester-java/build.gradle
@@ -37,7 +37,7 @@ repositories {
def scalaVersion = org.gradle.util.VersionNumber.parse(System.getenv("TRAVIS_SCALA_VERSION") ?: "2.13")
def scalaBinaryVersion = "${scalaVersion.major}.${scalaVersion.minor}"
-def akkaVersion = "2.9.0"
+def akkaVersion = "2.9.1"
dependencies {
implementation group: 'ch.megard', name: "akka-http-cors_${scalaBinaryVersion}", version: '1.1.3'
diff --git a/plugin-tester-java/pom.xml b/plugin-tester-java/pom.xml
index 0df034bcf..37e9b61d8 100644
--- a/plugin-tester-java/pom.xml
+++ b/plugin-tester-java/pom.xml
@@ -15,7 +15,7 @@
3.1.2
3.0.0
1.1.0
- 2.9.0
+ 2.9.1
1.60.2
UTF-8
3.3.0
diff --git a/plugin-tester-scala/build.gradle b/plugin-tester-scala/build.gradle
index 1856234d0..e7bac98eb 100644
--- a/plugin-tester-scala/build.gradle
+++ b/plugin-tester-scala/build.gradle
@@ -33,7 +33,7 @@ repositories {
def scalaVersion = org.gradle.util.VersionNumber.parse(System.getenv("TRAVIS_SCALA_VERSION") ?: "2.13")
def scalaBinaryVersion = "${scalaVersion.major}.${scalaVersion.minor}"
-def akkaVersion = "2.9.0"
+def akkaVersion = "2.9.1"
dependencies {
implementation group: 'ch.megard', name: "akka-http-cors_${scalaBinaryVersion}", version: '1.1.3'
diff --git a/plugin-tester-scala/pom.xml b/plugin-tester-scala/pom.xml
index 8de9abba2..69b794613 100644
--- a/plugin-tester-scala/pom.xml
+++ b/plugin-tester-scala/pom.xml
@@ -12,7 +12,7 @@
11
- 2.9.0
+ 2.9.1
0.4.2
1.60.2
UTF-8
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 7a24c253a..66c37101c 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -17,7 +17,7 @@ object Dependencies {
// We don't force Akka updates because downstream projects can upgrade
// themselves. For more information see
// https://doc.akka.io//docs/akka/current/project/downstream-upgrade-strategy.html
- val akka = "2.9.0"
+ val akka = "2.9.1"
val akkaBinary = "2.9"
val akkaHttp = "10.6.0"
val akkaHttpBinary = "10.6"
diff --git a/runtime/src/main/mima-filters/2.4.0.backwards.excludes/periodic-client-discovery.excludes b/runtime/src/main/mima-filters/2.4.0.backwards.excludes/periodic-client-discovery.excludes
new file mode 100644
index 000000000..fce56a646
--- /dev/null
+++ b/runtime/src/main/mima-filters/2.4.0.backwards.excludes/periodic-client-discovery.excludes
@@ -0,0 +1,14 @@
+# private
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.GrpcClientSettings.this")
+
+# internal
+ProblemFilters.exclude[FinalClassProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.apply")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.listener")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.lookup")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.apply")
+ProblemFilters.exclude[FinalClassProblem]("akka.grpc.internal.AkkaDiscoveryNameResolverProvider")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolverProvider.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.NettyClientUtils.createChannel")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.NettyClientUtils.createChannel")
\ No newline at end of file
diff --git a/runtime/src/main/resources/reference.conf b/runtime/src/main/resources/reference.conf
index 25d8b47ac..c3729a3ba 100644
--- a/runtime/src/main/resources/reference.conf
+++ b/runtime/src/main/resources/reference.conf
@@ -17,6 +17,14 @@ akka.grpc.client."*" {
# timeout for service discovery resolving
resolve-timeout = 1s
+
+ # Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
+ # if the discovery mechanism supports that. Expected use is for client side load-balancing, to detect new services
+ # to load balance across. The default value "off" disables periodic refresh and instead only does refresh when
+ # the client implementation decides to.
+ #
+ # Currently only supported by the Netty client backend.
+ refresh-interval = off
}
# port to use if service-discovery-mechism is static or service discovery does not return a port
diff --git a/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala b/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala
index bd9742bc2..896dd16fe 100644
--- a/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala
+++ b/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala
@@ -15,8 +15,8 @@ import com.typesafe.config.{ Config, ConfigValueFactory }
import io.grpc.CallCredentials
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider
-import javax.net.ssl.{ SSLContext, TrustManager }
+import javax.net.ssl.{ SSLContext, TrustManager }
import scala.collection.immutable
import scala.concurrent.duration.{ Duration, _ }
@@ -148,7 +148,9 @@ object GrpcClientSettings {
getOptionalString(clientConfiguration, "user-agent"),
clientConfiguration.getBoolean("use-tls"),
getOptionalString(clientConfiguration, "load-balancing-policy"),
- clientConfiguration.getString("backend"))
+ clientConfiguration.getString("backend"),
+ identity,
+ getOptionalDuration(clientConfiguration, "service-discovery.refresh-interval"))
private def getOptionalString(config: Config, path: String): Option[String] =
config.getString(path) match {
@@ -162,6 +164,12 @@ object GrpcClientSettings {
case other => Some(other)
}
+ private def getOptionalDuration(config: Config, path: String): Option[FiniteDuration] =
+ Helpers.toRootLowerCase(config.getString(path)) match {
+ case "off" => None
+ case _ => Some(config.getDuration(path).asScala)
+ }
+
private def getPotentiallyInfiniteDuration(underlying: Config, path: String): Duration =
Helpers.toRootLowerCase(underlying.getString(path)) match {
case "infinite" => Duration.Inf
@@ -196,7 +204,8 @@ final class GrpcClientSettings private (
val useTls: Boolean,
val loadBalancingPolicy: Option[String],
val backend: String,
- val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity) {
+ val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity,
+ val discoveryRefreshInterval: Option[FiniteDuration]) {
require(
sslContext.isEmpty || trustManager.isEmpty,
"Configuring the sslContext or the trustManager is mutually exclusive")
@@ -284,6 +293,28 @@ final class GrpcClientSettings private (
def withBackend(value: String): GrpcClientSettings =
copy(backend = value)
+ /**
+ * Scala API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
+ * if the discovery mechanism supports that. The default is no periodic refresh and instead
+ * * only does refresh when the client implementation decides to.
+ *
+ * Currently only supported by the Netty client backend.
+ */
+ @ApiMayChange
+ def withDiscoveryRefreshInterval(refreshInterval: FiniteDuration): GrpcClientSettings =
+ copy(discoveryRefreshInterval = Some(refreshInterval))
+
+ /**
+ * Java API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
+ * if the discovery mechanism supports that. The default is no periodic refresh and instead
+ * only does refresh when the client implementation decides to.
+ *
+ * Currently only supported by the Netty client backend.
+ */
+ @ApiMayChange
+ def withDiscoveryRefreshInterval(refreshInterval: java.time.Duration): GrpcClientSettings =
+ copy(discoveryRefreshInterval = Some(refreshInterval.asScala))
+
private def copy(
serviceName: String = serviceName,
servicePortName: Option[String] = servicePortName,
@@ -301,8 +332,8 @@ final class GrpcClientSettings private (
connectionAttempts: Option[Int] = connectionAttempts,
loadBalancingPolicy: Option[String] = loadBalancingPolicy,
backend: String = backend,
- channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides)
- : GrpcClientSettings =
+ channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides,
+ discoveryRefreshInterval: Option[FiniteDuration] = discoveryRefreshInterval): GrpcClientSettings =
new GrpcClientSettings(
callCredentials = callCredentials,
serviceDiscovery = serviceDiscovery,
@@ -321,5 +352,6 @@ final class GrpcClientSettings private (
connectionAttempts = connectionAttempts,
loadBalancingPolicy = loadBalancingPolicy,
backend = backend,
- channelBuilderOverrides = channelBuilderOverrides)
+ channelBuilderOverrides = channelBuilderOverrides,
+ discoveryRefreshInterval = discoveryRefreshInterval)
}
diff --git a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala
index c299ddf3e..dac3f9f74 100644
--- a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala
+++ b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala
@@ -4,55 +4,101 @@
package akka.grpc.internal
-import java.net.{ InetAddress, InetSocketAddress, UnknownHostException }
+import akka.actor.ActorSystem
+import akka.actor.Cancellable
+import akka.annotation.InternalApi
+import java.net.{ InetAddress, InetSocketAddress, UnknownHostException }
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.discovery.{ Lookup, ServiceDiscovery }
+import akka.event.Logging
import akka.grpc.GrpcClientSettings
import io.grpc.{ Attributes, EquivalentAddressGroup, NameResolver, Status }
import io.grpc.NameResolver.Listener
+import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Promise }
import scala.util.{ Failure, Success }
-class AkkaDiscoveryNameResolver(
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] final class AkkaDiscoveryNameResolver(
discovery: ServiceDiscovery,
defaultPort: Int,
serviceName: String,
portName: Option[String],
protocol: Option[String],
- resolveTimeout: FiniteDuration)(implicit val ec: ExecutionContext)
+ resolveTimeout: FiniteDuration,
+ refreshInterval: Option[FiniteDuration])(implicit val ec: ExecutionContext, system: ActorSystem)
extends NameResolver {
+
+ private final val log = Logging(system, "akka.grpc.internal.AkkaDiscoveryNameResolver")
+
override def getServiceAuthority: String = serviceName
- val listener: Promise[Listener] = Promise()
+ private val listener: Promise[Listener] = Promise()
+
+ // initialized after first resolve if needed
+ private val refreshTask = new AtomicReference[Cancellable]
override def start(l: Listener): Unit = {
+ log.debug("Name resolver for {} started", serviceName)
listener.trySuccess(l)
- lookup(l)
+ lookup(l, evict = false)
}
- override def refresh(): Unit =
+ override def refresh(): Unit = refresh(false)
+
+ private def refresh(evict: Boolean): Unit =
listener.future.onComplete {
- case Success(l) => lookup(l)
+ case Success(l) =>
+ log.debug("Name resolver for {} refreshing", serviceName)
+ lookup(l, evict)
case Failure(_) => // We never fail this promise
}
- def lookup(listener: Listener): Unit = {
- discovery.lookup(Lookup(serviceName, portName, protocol), resolveTimeout).onComplete {
+ def lookup(listener: Listener, evict: Boolean): Unit = {
+ val request = {
+ val l = Lookup(serviceName, portName, protocol)
+ if (evict) l.withDiscardCache
+ else l
+ }
+ val result = discovery.lookup(request, resolveTimeout)
+
+ result.onComplete {
case Success(result) =>
try {
+ if (log.isDebugEnabled)
+ log.debug(
+ "Successful service discovery for service {}, found addresses: {}",
+ serviceName,
+ result.addresses.mkString(", "))
listener.onAddresses(addresses(result.addresses), Attributes.EMPTY)
} catch {
case e: UnknownHostException =>
- // TODO at least log
+ log.warning(e, "Unknown host for service {}", serviceName)
listener.onError(Status.UNKNOWN.withDescription(e.getMessage))
}
case Failure(e) =>
- // TODO at least log
+ log.warning(e, "Service discovery failed for service {}", serviceName)
listener.onError(Status.UNKNOWN.withDescription(e.getMessage))
}
+
+ // initialize refresh timer after first lookup, if configured
+ if (refreshInterval.isDefined && refreshTask.get() == null) {
+ result.onComplete { _ =>
+ refreshInterval.foreach { interval =>
+ val cancellable = system.scheduler.scheduleWithFixedDelay(interval, interval)(() => refresh(evict = true))
+ if (!refreshTask.compareAndSet(null, cancellable)) {
+ // concurrent update beat us to it, there already is a scheduled task
+ cancellable.cancel()
+ }
+ }
+ }
+ }
}
@throws[UnknownHostException]
@@ -67,16 +113,25 @@ class AkkaDiscoveryNameResolver(
.asJava
}
- override def shutdown(): Unit = ()
+ override def shutdown(): Unit = {
+ val refreshCancellable = refreshTask.get()
+ if (refreshCancellable ne null) refreshCancellable.cancel()
+ }
}
-object AkkaDiscoveryNameResolver {
- def apply(settings: GrpcClientSettings)(implicit ec: ExecutionContext): AkkaDiscoveryNameResolver =
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] object AkkaDiscoveryNameResolver {
+ def apply(
+ settings: GrpcClientSettings)(implicit ec: ExecutionContext, system: ActorSystem): AkkaDiscoveryNameResolver =
new AkkaDiscoveryNameResolver(
settings.serviceDiscovery,
settings.defaultPort,
settings.serviceName,
settings.servicePortName,
settings.serviceProtocol,
- settings.resolveTimeout)
+ settings.resolveTimeout,
+ settings.discoveryRefreshInterval)
}
diff --git a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala
index 995e98cfc..02f5609cb 100644
--- a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala
+++ b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala
@@ -4,21 +4,28 @@
package akka.grpc.internal
-import java.net.URI
+import akka.actor.ActorSystem
+import akka.annotation.InternalApi
+import java.net.URI
import akka.discovery.ServiceDiscovery
import io.grpc.{ NameResolver, NameResolverProvider }
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
-class AkkaDiscoveryNameResolverProvider(
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] final class AkkaDiscoveryNameResolverProvider(
discovery: ServiceDiscovery,
defaultPort: Int,
serviceName: String,
portName: Option[String],
protocol: Option[String],
- resolveTimeout: FiniteDuration)(implicit ec: ExecutionContext)
+ resolveTimeout: FiniteDuration,
+ refreshInterval: Option[FiniteDuration])(implicit ec: ExecutionContext, system: ActorSystem)
extends NameResolverProvider {
override def isAvailable: Boolean = true
@@ -27,6 +34,13 @@ class AkkaDiscoveryNameResolverProvider(
override def getDefaultScheme: String = "http"
override def newNameResolver(targetUri: URI, args: NameResolver.Args): AkkaDiscoveryNameResolver = {
- new AkkaDiscoveryNameResolver(discovery, defaultPort, serviceName, portName, protocol, resolveTimeout)
+ new AkkaDiscoveryNameResolver(
+ discovery,
+ defaultPort,
+ serviceName,
+ portName,
+ protocol,
+ resolveTimeout,
+ refreshInterval)
}
}
diff --git a/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala b/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala
index 4922e4a5a..bd7da7c0f 100644
--- a/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala
+++ b/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala
@@ -38,7 +38,7 @@ object ChannelUtils {
implicit sys: ClassicActorSystemProvider): InternalChannel = {
settings.backend match {
case "netty" =>
- NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher)
+ NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher, sys.classicSystem)
case "akka-http" =>
AkkaHttpClientUtils.createChannel(settings, log)
case _ => throw new IllegalArgumentException(s"Unexpected backend [${settings.backend}]")
diff --git a/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala b/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala
index c89997b05..c880ea7be 100644
--- a/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala
+++ b/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala
@@ -4,6 +4,9 @@
package akka.grpc.internal
+import akka.Done
+import akka.NotUsed
+import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.event.LoggingAdapter
import akka.grpc.GrpcClientSettings
@@ -12,28 +15,26 @@ import akka.grpc.GrpcSingleResponse
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source
-import akka.Done
-import akka.NotUsed
+import io.grpc.CallOptions
+import io.grpc.MethodDescriptor
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
+import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig
import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig.Protocol
import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig.SelectedListenerFailureBehavior
import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig.SelectorFailureBehavior
-import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig
import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolNames
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder
-import io.grpc.CallOptions
-import io.grpc.MethodDescriptor
import java.util.concurrent.TimeUnit
import javax.net.ssl.SSLContext
import scala.annotation.nowarn
-import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
+import scala.concurrent.duration.FiniteDuration
import scala.util.Failure
import scala.util.Success
@@ -48,7 +49,8 @@ object NettyClientUtils {
*/
@InternalApi
def createChannel(settings: GrpcClientSettings, log: LoggingAdapter)(
- implicit ec: ExecutionContext): InternalChannel = {
+ implicit ec: ExecutionContext,
+ system: ActorSystem): InternalChannel = {
@nowarn("msg=deprecated")
var builder =
@@ -66,7 +68,8 @@ object NettyClientUtils {
settings.serviceName,
settings.servicePortName,
settings.serviceProtocol,
- settings.resolveTimeout))
+ settings.resolveTimeout,
+ settings.discoveryRefreshInterval))
if (!settings.useTls)
builder = builder.usePlaintext()
diff --git a/runtime/src/test/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProviderSpec.scala b/runtime/src/test/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProviderSpec.scala
index dfae3e42d..c78a2afa9 100644
--- a/runtime/src/test/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProviderSpec.scala
+++ b/runtime/src/test/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProviderSpec.scala
@@ -54,7 +54,8 @@ class AkkaDiscoveryNameResolverProviderSpec
serviceName = serviceName,
portName = None,
protocol = None,
- resolveTimeout = 3.seconds)
+ resolveTimeout = 3.seconds,
+ None)
val resolver = provider.newNameResolver(new URI("//" + serviceName), null)
diff --git a/sbt-plugin/src/sbt-test/gen-scala-server/07-gen-basic-server-with-akka-27/build.sbt b/sbt-plugin/src/sbt-test/gen-scala-server/07-gen-basic-server-with-akka-27/build.sbt
index 5ec738ad3..c4c0da9f4 100644
--- a/sbt-plugin/src/sbt-test/gen-scala-server/07-gen-basic-server-with-akka-27/build.sbt
+++ b/sbt-plugin/src/sbt-test/gen-scala-server/07-gen-basic-server-with-akka-27/build.sbt
@@ -4,7 +4,7 @@ resolvers += "Akka library repository".at("https://repo.akka.io/maven")
enablePlugins(AkkaGrpcPlugin)
-dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % "2.9.0"
+dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % "2.9.1"
assembly / assemblyMergeStrategy := {
// https://github.com/akka/akka/issues/29456