From 95e58fd5bbacc40fe42556344e58bdedd30f75be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= <johan@markatta.com> Date: Fri, 15 Dec 2023 13:16:56 +0100 Subject: [PATCH 01/10] feat: Periodic client discovery refresh #1152 --- project/Dependencies.scala | 2 +- runtime/src/main/resources/reference.conf | 8 ++ .../scala/akka/grpc/GrpcClientSettings.scala | 46 ++++++++-- .../internal/AkkaDiscoveryNameResolver.scala | 85 +++++++++++++++---- .../AkkaDiscoveryNameResolverProvider.scala | 22 ++++- .../akka/grpc/internal/ChannelUtils.scala | 2 +- .../akka/grpc/internal/NettyClientUtils.scala | 7 +- 7 files changed, 143 insertions(+), 29 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7a24c253a..09b3dae00 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { // We don't force Akka updates because downstream projects can upgrade // themselves. For more information see // https://doc.akka.io//docs/akka/current/project/downstream-upgrade-strategy.html - val akka = "2.9.0" + val akka = "2.9.1-M1+11-809c4920-SNAPSHOT" val akkaBinary = "2.9" val akkaHttp = "10.6.0" val akkaHttpBinary = "10.6" diff --git a/runtime/src/main/resources/reference.conf b/runtime/src/main/resources/reference.conf index 25d8b47ac..c3729a3ba 100644 --- a/runtime/src/main/resources/reference.conf +++ b/runtime/src/main/resources/reference.conf @@ -17,6 +17,14 @@ akka.grpc.client."*" { # timeout for service discovery resolving resolve-timeout = 1s + + # Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries + # if the discovery mechanism supports that. Expected use is for client side load-balancing, to detect new services + # to load balance across. The default value "off" disables periodic refresh and instead only does refresh when + # the client implementation decides to. + # + # Currently only supported by the Netty client backend. + refresh-interval = off } # port to use if service-discovery-mechism is static or service discovery does not return a port diff --git a/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala b/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala index bd9742bc2..fd6fd2928 100644 --- a/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala +++ b/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala @@ -15,10 +15,12 @@ import com.typesafe.config.{ Config, ConfigValueFactory } import io.grpc.CallCredentials import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider -import javax.net.ssl.{ SSLContext, TrustManager } +import java.util.Optional +import javax.net.ssl.{ SSLContext, TrustManager } import scala.collection.immutable import scala.concurrent.duration.{ Duration, _ } +import scala.jdk.OptionConverters.RichOptional object GrpcClientSettings { @@ -148,7 +150,9 @@ object GrpcClientSettings { getOptionalString(clientConfiguration, "user-agent"), clientConfiguration.getBoolean("use-tls"), getOptionalString(clientConfiguration, "load-balancing-policy"), - clientConfiguration.getString("backend")) + clientConfiguration.getString("backend"), + identity, + getOptionalDuration(clientConfiguration, "discovery.refresh-interval")) private def getOptionalString(config: Config, path: String): Option[String] = config.getString(path) match { @@ -162,6 +166,12 @@ object GrpcClientSettings { case other => Some(other) } + private def getOptionalDuration(config: Config, path: String): Option[FiniteDuration] = + config.getString(path) match { + case "off" => None + case _ => Some(config.getDuration(path).asScala) + } + private def getPotentiallyInfiniteDuration(underlying: Config, path: String): Duration = Helpers.toRootLowerCase(underlying.getString(path)) match { case "infinite" => Duration.Inf @@ -196,7 +206,8 @@ final class GrpcClientSettings private ( val useTls: Boolean, val loadBalancingPolicy: Option[String], val backend: String, - val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity) { + val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity, + val discoveryRefreshInterval: Option[FiniteDuration]) { require( sslContext.isEmpty || trustManager.isEmpty, "Configuring the sslContext or the trustManager is mutually exclusive") @@ -284,6 +295,28 @@ final class GrpcClientSettings private ( def withBackend(value: String): GrpcClientSettings = copy(backend = value) + /** + * Scala API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries + * if the discovery mechanism supports that. The default value `None` disables periodic refresh and instead + * only does refresh when the client implementation decides to. + * + * Currently only supported by the Netty client backend. + */ + @ApiMayChange + def withDiscoveryRefreshInterval(refreshInterval: Option[FiniteDuration]): GrpcClientSettings = + copy(discoveryRefreshInterval = refreshInterval) + + /** + * Java API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries + * if the discovery mechanism supports that. The default value `None` disables periodic refresh and instead + * only does refresh when the client implementation decides to. + * + * Currently only supported by the Netty client backend. + */ + @ApiMayChange + def withDiscoveryRefreshInterval(refreshInterval: Optional[java.time.Duration]): GrpcClientSettings = + copy(discoveryRefreshInterval = refreshInterval.map(_.asScala).toScala) + private def copy( serviceName: String = serviceName, servicePortName: Option[String] = servicePortName, @@ -301,8 +334,8 @@ final class GrpcClientSettings private ( connectionAttempts: Option[Int] = connectionAttempts, loadBalancingPolicy: Option[String] = loadBalancingPolicy, backend: String = backend, - channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides) - : GrpcClientSettings = + channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides, + discoveryRefreshInterval: Option[FiniteDuration] = discoveryRefreshInterval): GrpcClientSettings = new GrpcClientSettings( callCredentials = callCredentials, serviceDiscovery = serviceDiscovery, @@ -321,5 +354,6 @@ final class GrpcClientSettings private ( connectionAttempts = connectionAttempts, loadBalancingPolicy = loadBalancingPolicy, backend = backend, - channelBuilderOverrides = channelBuilderOverrides) + channelBuilderOverrides = channelBuilderOverrides, + discoveryRefreshInterval = discoveryRefreshInterval) } diff --git a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala index c299ddf3e..949dd6033 100644 --- a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala +++ b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala @@ -4,55 +4,101 @@ package akka.grpc.internal -import java.net.{ InetAddress, InetSocketAddress, UnknownHostException } +import akka.actor.ActorSystem +import akka.actor.Cancellable +import akka.annotation.InternalApi +import java.net.{ InetAddress, InetSocketAddress, UnknownHostException } import akka.discovery.ServiceDiscovery.ResolvedTarget import akka.discovery.{ Lookup, ServiceDiscovery } +import akka.event.Logging import akka.grpc.GrpcClientSettings import io.grpc.{ Attributes, EquivalentAddressGroup, NameResolver, Status } import io.grpc.NameResolver.Listener +import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ ExecutionContext, Promise } import scala.util.{ Failure, Success } -class AkkaDiscoveryNameResolver( +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class AkkaDiscoveryNameResolver( discovery: ServiceDiscovery, defaultPort: Int, serviceName: String, portName: Option[String], protocol: Option[String], - resolveTimeout: FiniteDuration)(implicit val ec: ExecutionContext) + resolveTimeout: FiniteDuration, + refreshInterval: Option[FiniteDuration])(implicit val ec: ExecutionContext, system: ActorSystem) extends NameResolver { + + private final val log = Logging(system, getClass) + override def getServiceAuthority: String = serviceName - val listener: Promise[Listener] = Promise() + private val listener: Promise[Listener] = Promise() + + // initialized after first resolve if needed + private val refreshTask = new AtomicReference[Cancellable] override def start(l: Listener): Unit = { + log.debug("Name resolver for {} started", serviceName) listener.trySuccess(l) - lookup(l) + lookup(l, evict = false) } - override def refresh(): Unit = + override def refresh(): Unit = refresh(false) + + private def refresh(evict: Boolean): Unit = listener.future.onComplete { - case Success(l) => lookup(l) + case Success(l) => + log.debug("Name resolver for {} refreshing", serviceName) + lookup(l, evict) case Failure(_) => // We never fail this promise } - def lookup(listener: Listener): Unit = { - discovery.lookup(Lookup(serviceName, portName, protocol), resolveTimeout).onComplete { + def lookup(listener: Listener, evict: Boolean): Unit = { + val request = { + val l = Lookup(serviceName, portName, protocol) + if (evict) l.withDiscardCache + else l + } + val result = discovery.lookup(request, resolveTimeout) + + result.onComplete { case Success(result) => try { + if (log.isDebugEnabled) + log.debug( + "Successful service discovery for service {}, found addresses: {}", + serviceName, + result.addresses.mkString(", ")) listener.onAddresses(addresses(result.addresses), Attributes.EMPTY) } catch { case e: UnknownHostException => - // TODO at least log + log.warning(e, s"Unknown host for service $serviceName") listener.onError(Status.UNKNOWN.withDescription(e.getMessage)) } case Failure(e) => - // TODO at least log + log.warning(e, s"Service discovery failed for service $serviceName") listener.onError(Status.UNKNOWN.withDescription(e.getMessage)) } + + // initialize refresh timer after first lookup, if configured + if (refreshInterval.isDefined && refreshTask.get() == null) { + result.onComplete { _ => + refreshInterval.foreach { interval => + val cancellable = system.scheduler.scheduleWithFixedDelay(interval, interval)(() => refresh(evict = true)) + if (!refreshTask.compareAndSet(null, cancellable)) { + // concurrent update beat us to it, there already is a scheduled task + cancellable.cancel() + } + } + } + } } @throws[UnknownHostException] @@ -67,16 +113,25 @@ class AkkaDiscoveryNameResolver( .asJava } - override def shutdown(): Unit = () + override def shutdown(): Unit = { + val refreshCancellable = refreshTask.get() + if (refreshCancellable ne null) refreshCancellable.cancel() + } } -object AkkaDiscoveryNameResolver { - def apply(settings: GrpcClientSettings)(implicit ec: ExecutionContext): AkkaDiscoveryNameResolver = +/** + * INTERNAL API + */ +@InternalApi +private[akka] object AkkaDiscoveryNameResolver { + def apply( + settings: GrpcClientSettings)(implicit ec: ExecutionContext, system: ActorSystem): AkkaDiscoveryNameResolver = new AkkaDiscoveryNameResolver( settings.serviceDiscovery, settings.defaultPort, settings.serviceName, settings.servicePortName, settings.serviceProtocol, - settings.resolveTimeout) + settings.resolveTimeout, + settings.discoveryRefreshInterval) } diff --git a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala index 995e98cfc..02f5609cb 100644 --- a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala +++ b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala @@ -4,21 +4,28 @@ package akka.grpc.internal -import java.net.URI +import akka.actor.ActorSystem +import akka.annotation.InternalApi +import java.net.URI import akka.discovery.ServiceDiscovery import io.grpc.{ NameResolver, NameResolverProvider } import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration -class AkkaDiscoveryNameResolverProvider( +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class AkkaDiscoveryNameResolverProvider( discovery: ServiceDiscovery, defaultPort: Int, serviceName: String, portName: Option[String], protocol: Option[String], - resolveTimeout: FiniteDuration)(implicit ec: ExecutionContext) + resolveTimeout: FiniteDuration, + refreshInterval: Option[FiniteDuration])(implicit ec: ExecutionContext, system: ActorSystem) extends NameResolverProvider { override def isAvailable: Boolean = true @@ -27,6 +34,13 @@ class AkkaDiscoveryNameResolverProvider( override def getDefaultScheme: String = "http" override def newNameResolver(targetUri: URI, args: NameResolver.Args): AkkaDiscoveryNameResolver = { - new AkkaDiscoveryNameResolver(discovery, defaultPort, serviceName, portName, protocol, resolveTimeout) + new AkkaDiscoveryNameResolver( + discovery, + defaultPort, + serviceName, + portName, + protocol, + resolveTimeout, + refreshInterval) } } diff --git a/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala b/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala index 4922e4a5a..bd7da7c0f 100644 --- a/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala +++ b/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala @@ -38,7 +38,7 @@ object ChannelUtils { implicit sys: ClassicActorSystemProvider): InternalChannel = { settings.backend match { case "netty" => - NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher) + NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher, sys.classicSystem) case "akka-http" => AkkaHttpClientUtils.createChannel(settings, log) case _ => throw new IllegalArgumentException(s"Unexpected backend [${settings.backend}]") diff --git a/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala b/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala index c89997b05..55f594485 100644 --- a/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala +++ b/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala @@ -14,6 +14,7 @@ import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source import akka.Done import akka.NotUsed +import akka.actor.ActorSystem import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts import io.grpc.netty.shaded.io.grpc.netty.NegotiationType import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder @@ -48,7 +49,8 @@ object NettyClientUtils { */ @InternalApi def createChannel(settings: GrpcClientSettings, log: LoggingAdapter)( - implicit ec: ExecutionContext): InternalChannel = { + implicit ec: ExecutionContext, + system: ActorSystem): InternalChannel = { @nowarn("msg=deprecated") var builder = @@ -66,7 +68,8 @@ object NettyClientUtils { settings.serviceName, settings.servicePortName, settings.serviceProtocol, - settings.resolveTimeout)) + settings.resolveTimeout, + settings.discoveryRefreshInterval)) if (!settings.useTls) builder = builder.usePlaintext() From cda9739cca15bf30202cbe84ac7a95088413424c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= <johan@markatta.com> Date: Tue, 19 Dec 2023 15:09:18 +0100 Subject: [PATCH 02/10] bump: Akka 2.9.1 for the piercing lookup API --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 09b3dae00..66c37101c 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { // We don't force Akka updates because downstream projects can upgrade // themselves. For more information see // https://doc.akka.io//docs/akka/current/project/downstream-upgrade-strategy.html - val akka = "2.9.1-M1+11-809c4920-SNAPSHOT" + val akka = "2.9.1" val akkaBinary = "2.9" val akkaHttp = "10.6.0" val akkaHttpBinary = "10.6" From ae0dedc341d3fa64340bc3c836c8ab3caef9f9d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= <johan@markatta.com> Date: Tue, 19 Dec 2023 15:24:00 +0100 Subject: [PATCH 03/10] Mima, wrong setting name --- .../periodic-client-discovery.excludes | 14 ++++++++++++++ .../main/scala/akka/grpc/GrpcClientSettings.scala | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 runtime/src/main/mima-filters/2.4.0.backwards.excludes/periodic-client-discovery.excludes diff --git a/runtime/src/main/mima-filters/2.4.0.backwards.excludes/periodic-client-discovery.excludes b/runtime/src/main/mima-filters/2.4.0.backwards.excludes/periodic-client-discovery.excludes new file mode 100644 index 000000000..fce56a646 --- /dev/null +++ b/runtime/src/main/mima-filters/2.4.0.backwards.excludes/periodic-client-discovery.excludes @@ -0,0 +1,14 @@ +# private +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.GrpcClientSettings.this") + +# internal +ProblemFilters.exclude[FinalClassProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.listener") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.lookup") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.apply") +ProblemFilters.exclude[FinalClassProblem]("akka.grpc.internal.AkkaDiscoveryNameResolverProvider") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolverProvider.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.NettyClientUtils.createChannel") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.NettyClientUtils.createChannel") \ No newline at end of file diff --git a/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala b/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala index fd6fd2928..47c683bf0 100644 --- a/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala +++ b/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala @@ -152,7 +152,7 @@ object GrpcClientSettings { getOptionalString(clientConfiguration, "load-balancing-policy"), clientConfiguration.getString("backend"), identity, - getOptionalDuration(clientConfiguration, "discovery.refresh-interval")) + getOptionalDuration(clientConfiguration, "service-discovery.refresh-interval")) private def getOptionalString(config: Config, path: String): Option[String] = config.getString(path) match { From 31b4f413741899ca86530f0182f0062d31588866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= <johan@markatta.com> Date: Tue, 19 Dec 2023 15:43:29 +0100 Subject: [PATCH 04/10] Good if all tests compile also --- .../grpc/internal/AkkaDiscoveryNameResolverProviderSpec.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runtime/src/test/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProviderSpec.scala b/runtime/src/test/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProviderSpec.scala index dfae3e42d..c78a2afa9 100644 --- a/runtime/src/test/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProviderSpec.scala +++ b/runtime/src/test/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProviderSpec.scala @@ -54,7 +54,8 @@ class AkkaDiscoveryNameResolverProviderSpec serviceName = serviceName, portName = None, protocol = None, - resolveTimeout = 3.seconds) + resolveTimeout = 3.seconds, + None) val resolver = provider.newNameResolver(new URI("//" + serviceName), null) From c528b90a4b9eeda3b933a0f31c0c30ac8892533d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= <johan@markatta.com> Date: Tue, 19 Dec 2023 15:50:19 +0100 Subject: [PATCH 05/10] String logsource to make Scala 3 happy --- .../scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala index 949dd6033..c7def5a8a 100644 --- a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala +++ b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala @@ -35,7 +35,7 @@ private[akka] final class AkkaDiscoveryNameResolver( refreshInterval: Option[FiniteDuration])(implicit val ec: ExecutionContext, system: ActorSystem) extends NameResolver { - private final val log = Logging(system, getClass) + private final val log = Logging(system, "akka.grpc.internal.AkkaDiscoveryNameResolver") override def getServiceAuthority: String = serviceName From 3e559a093ae098a6a3f5c75a539c946531cc1c13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= <johan@markatta.com> Date: Tue, 19 Dec 2023 16:38:50 +0100 Subject: [PATCH 06/10] Akka 2.9.1 in all the places --- plugin-tester-java/build.gradle | 2 +- plugin-tester-java/pom.xml | 2 +- plugin-tester-scala/build.gradle | 2 +- plugin-tester-scala/pom.xml | 2 +- .../gen-scala-server/07-gen-basic-server-with-akka-27/build.sbt | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/plugin-tester-java/build.gradle b/plugin-tester-java/build.gradle index 683de2a6d..a6d647236 100644 --- a/plugin-tester-java/build.gradle +++ b/plugin-tester-java/build.gradle @@ -37,7 +37,7 @@ repositories { def scalaVersion = org.gradle.util.VersionNumber.parse(System.getenv("TRAVIS_SCALA_VERSION") ?: "2.13") def scalaBinaryVersion = "${scalaVersion.major}.${scalaVersion.minor}" -def akkaVersion = "2.9.0" +def akkaVersion = "2.9.1" dependencies { implementation group: 'ch.megard', name: "akka-http-cors_${scalaBinaryVersion}", version: '1.1.3' diff --git a/plugin-tester-java/pom.xml b/plugin-tester-java/pom.xml index 0df034bcf..37e9b61d8 100644 --- a/plugin-tester-java/pom.xml +++ b/plugin-tester-java/pom.xml @@ -15,7 +15,7 @@ <maven-dependency-plugin.version>3.1.2</maven-dependency-plugin.version> <maven-exec-plugin.version>3.0.0</maven-exec-plugin.version> <akka.http.cors.version>1.1.0</akka.http.cors.version> - <akka.version>2.9.0</akka.version> + <akka.version>2.9.1</akka.version> <grpc.version>1.60.2</grpc.version> <!-- checked synced by VersionSyncCheckPlugin --> <project.encoding>UTF-8</project.encoding> <build-helper-maven-plugin>3.3.0</build-helper-maven-plugin> diff --git a/plugin-tester-scala/build.gradle b/plugin-tester-scala/build.gradle index 1856234d0..e7bac98eb 100644 --- a/plugin-tester-scala/build.gradle +++ b/plugin-tester-scala/build.gradle @@ -33,7 +33,7 @@ repositories { def scalaVersion = org.gradle.util.VersionNumber.parse(System.getenv("TRAVIS_SCALA_VERSION") ?: "2.13") def scalaBinaryVersion = "${scalaVersion.major}.${scalaVersion.minor}" -def akkaVersion = "2.9.0" +def akkaVersion = "2.9.1" dependencies { implementation group: 'ch.megard', name: "akka-http-cors_${scalaBinaryVersion}", version: '1.1.3' diff --git a/plugin-tester-scala/pom.xml b/plugin-tester-scala/pom.xml index 8de9abba2..69b794613 100644 --- a/plugin-tester-scala/pom.xml +++ b/plugin-tester-scala/pom.xml @@ -12,7 +12,7 @@ <properties> <maven.compiler.release>11</maven.compiler.release> - <akka.version>2.9.0</akka.version> + <akka.version>2.9.1</akka.version> <akka.http.cors.version>0.4.2</akka.http.cors.version> <grpc.version>1.60.2</grpc.version> <!-- checked synced by VersionSyncCheckPlugin --> <project.encoding>UTF-8</project.encoding> diff --git a/sbt-plugin/src/sbt-test/gen-scala-server/07-gen-basic-server-with-akka-27/build.sbt b/sbt-plugin/src/sbt-test/gen-scala-server/07-gen-basic-server-with-akka-27/build.sbt index 5ec738ad3..c4c0da9f4 100644 --- a/sbt-plugin/src/sbt-test/gen-scala-server/07-gen-basic-server-with-akka-27/build.sbt +++ b/sbt-plugin/src/sbt-test/gen-scala-server/07-gen-basic-server-with-akka-27/build.sbt @@ -4,7 +4,7 @@ resolvers += "Akka library repository".at("https://repo.akka.io/maven") enablePlugins(AkkaGrpcPlugin) -dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % "2.9.0" +dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % "2.9.1" assembly / assemblyMergeStrategy := { // https://github.com/akka/akka/issues/29456 From f9b88bcf19d94c6317097e8743375479623a668c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= <johan@markatta.com> Date: Mon, 19 Feb 2024 08:36:12 +0100 Subject: [PATCH 07/10] unused import --- native-image-tests/grpc-scala/build.sbt | 34 ++++++ .../grpc-scala/project/build.properties | 1 + .../grpc-scala/project/plugins.sbt | 4 + .../grpc-scala/src/main/java/Empty.java | 4 + .../src/main/protobuf/helloworld.proto | 31 ++++++ .../src/main/resources/application.conf | 8 ++ .../src/main/resources/certs/ca.pem | 15 +++ .../src/main/resources/certs/server1.key | 16 +++ .../src/main/resources/certs/server1.pem | 16 +++ .../grpc-scala/src/main/resources/logback.xml | 21 ++++ .../example/helloworld/GreeterClient.scala | 78 +++++++++++++ .../example/helloworld/GreeterServer.scala | 103 ++++++++++++++++++ .../helloworld/GreeterServiceImpl.scala | 41 +++++++ .../akka/grpc/internal/NettyClientUtils.scala | 14 +-- 14 files changed, 379 insertions(+), 7 deletions(-) create mode 100644 native-image-tests/grpc-scala/build.sbt create mode 100644 native-image-tests/grpc-scala/project/build.properties create mode 100644 native-image-tests/grpc-scala/project/plugins.sbt create mode 100644 native-image-tests/grpc-scala/src/main/java/Empty.java create mode 100644 native-image-tests/grpc-scala/src/main/protobuf/helloworld.proto create mode 100644 native-image-tests/grpc-scala/src/main/resources/application.conf create mode 100644 native-image-tests/grpc-scala/src/main/resources/certs/ca.pem create mode 100644 native-image-tests/grpc-scala/src/main/resources/certs/server1.key create mode 100644 native-image-tests/grpc-scala/src/main/resources/certs/server1.pem create mode 100644 native-image-tests/grpc-scala/src/main/resources/logback.xml create mode 100644 native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterClient.scala create mode 100644 native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServer.scala create mode 100644 native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServiceImpl.scala diff --git a/native-image-tests/grpc-scala/build.sbt b/native-image-tests/grpc-scala/build.sbt new file mode 100644 index 000000000..338f5af3d --- /dev/null +++ b/native-image-tests/grpc-scala/build.sbt @@ -0,0 +1,34 @@ +name := "akka-grpc-quickstart-scala" + +version := "1.0" + +scalaVersion := "2.13.12" + +lazy val akkaVersion = sys.props.getOrElse("akka.version", "2.9.1") +lazy val akkaGrpcVersion = sys.props.getOrElse("akka.grpc.version", "2.4.0") + +enablePlugins(AkkaGrpcPlugin) + +// Run in a separate JVM, to make sure sbt waits until all threads have +// finished before returning. +// If you want to keep the application running while executing other +// sbt tasks, consider https://github.com/spray/sbt-revolver/ +fork := true + +resolvers += "Akka library repository".at("https://repo.akka.io/maven") + +// GraalVM native image build +enablePlugins(NativeImagePlugin) +nativeImageJvm := "graalvm-community" +nativeImageVersion := "21.0.2" +nativeImageOptions := Seq("--no-fallback", "--verbose") + +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion, + "com.typesafe.akka" %% "akka-stream" % akkaVersion, + "com.typesafe.akka" %% "akka-discovery" % akkaVersion, + "com.typesafe.akka" %% "akka-pki" % akkaVersion, + "ch.qos.logback" % "logback-classic" % "1.2.3", + "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test, + "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test, + "org.scalatest" %% "scalatest" % "3.2.12" % Test) diff --git a/native-image-tests/grpc-scala/project/build.properties b/native-image-tests/grpc-scala/project/build.properties new file mode 100644 index 000000000..8cf07b7c2 --- /dev/null +++ b/native-image-tests/grpc-scala/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.9.8 \ No newline at end of file diff --git a/native-image-tests/grpc-scala/project/plugins.sbt b/native-image-tests/grpc-scala/project/plugins.sbt new file mode 100644 index 000000000..15aa671e5 --- /dev/null +++ b/native-image-tests/grpc-scala/project/plugins.sbt @@ -0,0 +1,4 @@ +resolvers += "Akka library repository".at("https://repo.akka.io/maven") + +addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.4.0") +addSbtPlugin("org.scalameta" % "sbt-native-image" % "0.3.4") diff --git a/native-image-tests/grpc-scala/src/main/java/Empty.java b/native-image-tests/grpc-scala/src/main/java/Empty.java new file mode 100644 index 000000000..220a89df9 --- /dev/null +++ b/native-image-tests/grpc-scala/src/main/java/Empty.java @@ -0,0 +1,4 @@ +public class Empty { + // Just an empty class + // because gradle complained about not having any Java files to compile? +} diff --git a/native-image-tests/grpc-scala/src/main/protobuf/helloworld.proto b/native-image-tests/grpc-scala/src/main/protobuf/helloworld.proto new file mode 100644 index 000000000..c444c4ea3 --- /dev/null +++ b/native-image-tests/grpc-scala/src/main/protobuf/helloworld.proto @@ -0,0 +1,31 @@ +//#service-request-reply +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "com.example.helloworld"; +option java_outer_classname = "HelloWorldProto"; + +// The greeting service definition. +service GreeterService { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} + //#service-request-reply + //#service-stream + // The stream of incoming HelloRequest messages are + // sent out as corresponding HelloReply. From + // all clients to all clients, like a chat room. + rpc SayHelloToAll (stream HelloRequest) returns (stream HelloReply) {} + //#service-stream + //#service-request-reply +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} +//#service-request-reply diff --git a/native-image-tests/grpc-scala/src/main/resources/application.conf b/native-image-tests/grpc-scala/src/main/resources/application.conf new file mode 100644 index 000000000..0cf9442ee --- /dev/null +++ b/native-image-tests/grpc-scala/src/main/resources/application.conf @@ -0,0 +1,8 @@ +akka.grpc.client { + "helloworld.GreeterService" { + host = 127.0.0.1 + port = 8080 + override-authority = foo.test.google.fr + trusted = /certs/ca.pem + } +} diff --git a/native-image-tests/grpc-scala/src/main/resources/certs/ca.pem b/native-image-tests/grpc-scala/src/main/resources/certs/ca.pem new file mode 100644 index 000000000..6c8511a73 --- /dev/null +++ b/native-image-tests/grpc-scala/src/main/resources/certs/ca.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla +Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0 +YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT +BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7 ++L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu +g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd +Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV +HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau +sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m +oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG +Dfcog5wrJytaQ6UA0wE= +-----END CERTIFICATE----- diff --git a/native-image-tests/grpc-scala/src/main/resources/certs/server1.key b/native-image-tests/grpc-scala/src/main/resources/certs/server1.key new file mode 100644 index 000000000..143a5b876 --- /dev/null +++ b/native-image-tests/grpc-scala/src/main/resources/certs/server1.key @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD +M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf +3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY +AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm +V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY +tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p +dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q +K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR +81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff +DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd +aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2 +ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3 +XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe +F98XJ7tIFfJq +-----END PRIVATE KEY----- diff --git a/native-image-tests/grpc-scala/src/main/resources/certs/server1.pem b/native-image-tests/grpc-scala/src/main/resources/certs/server1.pem new file mode 100644 index 000000000..f3d43fcc5 --- /dev/null +++ b/native-image-tests/grpc-scala/src/main/resources/certs/server1.pem @@ -0,0 +1,16 @@ +-----BEGIN CERTIFICATE----- +MIICnDCCAgWgAwIBAgIBBzANBgkqhkiG9w0BAQsFADBWMQswCQYDVQQGEwJBVTET +MBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQ +dHkgTHRkMQ8wDQYDVQQDEwZ0ZXN0Y2EwHhcNMTUxMTA0MDIyMDI0WhcNMjUxMTAx +MDIyMDI0WjBlMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV +BAcTB0NoaWNhZ28xFTATBgNVBAoTDEV4YW1wbGUsIENvLjEaMBgGA1UEAxQRKi50 +ZXN0Lmdvb2dsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAOHDFSco +LCVJpYDDM4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1Bg +zkWF+slf3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd +9N8YwbBYAckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAGjazBpMAkGA1UdEwQCMAAw +CwYDVR0PBAQDAgXgME8GA1UdEQRIMEaCECoudGVzdC5nb29nbGUuZnKCGHdhdGVy +em9vaS50ZXN0Lmdvb2dsZS5iZYISKi50ZXN0LnlvdXR1YmUuY29thwTAqAEDMA0G +CSqGSIb3DQEBCwUAA4GBAJFXVifQNub1LUP4JlnX5lXNlo8FxZ2a12AFQs+bzoJ6 +hM044EDjqyxUqSbVePK0ni3w1fHQB5rY9yYC5f8G7aqqTY1QOhoUk8ZTSTRpnkTh +y4jjdvTZeLDVBlueZUTDRmy2feY5aZIU18vFDK08dTG0A87pppuv1LNIR3loveU8 +-----END CERTIFICATE----- diff --git a/native-image-tests/grpc-scala/src/main/resources/logback.xml b/native-image-tests/grpc-scala/src/main/resources/logback.xml new file mode 100644 index 000000000..203596da1 --- /dev/null +++ b/native-image-tests/grpc-scala/src/main/resources/logback.xml @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <!-- This is a development logging configuration that logs to standard out, for an example of a production + logging config, see the Akka docs: https://doc.akka.io/docs/akka/2.6/typed/logging.html#logback --> + <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern> + </encoder> + </appender> + + <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> + <queueSize>1024</queueSize> + <neverBlock>true</neverBlock> + <appender-ref ref="STDOUT" /> + </appender> + + <root level="INFO"> + <appender-ref ref="ASYNC"/> + </root> + +</configuration> diff --git a/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterClient.scala b/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterClient.scala new file mode 100644 index 000000000..fc1f83431 --- /dev/null +++ b/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterClient.scala @@ -0,0 +1,78 @@ +package com.example.helloworld + +//#import +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Failure +import scala.util.Success +import akka.Done +import akka.NotUsed +import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.Behaviors +import akka.grpc.GrpcClientSettings +import akka.stream.scaladsl.Source + +//#import + +//#client-request-reply +object GreeterClient { + + def main(args: Array[String]): Unit = { + implicit val sys: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty[Nothing], "GreeterClient") + implicit val ec: ExecutionContext = sys.executionContext + + val client = GreeterServiceClient(GrpcClientSettings.fromConfig("helloworld.GreeterService")) + + val names = + if (args.isEmpty) List("Alice", "Bob") + else args.toList + + names.foreach(singleRequestReply) + + //#client-request-reply + if (args.nonEmpty) + names.foreach(streamingBroadcast) + //#client-request-reply + + def singleRequestReply(name: String): Unit = { + println(s"Performing request: $name") + val reply = client.sayHello(HelloRequest(name)) + reply.onComplete { + case Success(msg) => + println(msg) + case Failure(e) => + println(s"Error: $e") + } + } + + //#client-request-reply + //#client-stream + def streamingBroadcast(name: String): Unit = { + println(s"Performing streaming requests: $name") + + val requestStream: Source[HelloRequest, NotUsed] = + Source + .tick(1.second, 1.second, "tick") + .zipWithIndex + .map { case (_, i) => i } + .map(i => HelloRequest(s"$name-$i")) + .mapMaterializedValue(_ => NotUsed) + + val responseStream: Source[HelloReply, NotUsed] = client.sayHelloToAll(requestStream) + val done: Future[Done] = + responseStream.runForeach(reply => println(s"$name got streaming reply: ${reply.message}")) + + done.onComplete { + case Success(_) => + println("streamingBroadcast done") + case Failure(e) => + println(s"Error streamingBroadcast: $e") + } + } + //#client-stream + //#client-request-reply + + } + +} +//#client-request-reply diff --git a/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServer.scala b/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServer.scala new file mode 100644 index 000000000..10d367cc3 --- /dev/null +++ b/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServer.scala @@ -0,0 +1,103 @@ +package com.example.helloworld + +//#import + + +import java.security.KeyStore +import java.security.SecureRandom +import java.security.cert.Certificate +import java.security.cert.CertificateFactory + +import scala.io.Source + +import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.Behaviors +import akka.http.scaladsl.ConnectionContext +import akka.http.scaladsl.Http +import akka.http.scaladsl.HttpsConnectionContext +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.HttpResponse +import akka.pki.pem.DERPrivateKeyLoader +import akka.pki.pem.PEMDecoder +import com.typesafe.config.ConfigFactory +import javax.net.ssl.KeyManagerFactory +import javax.net.ssl.SSLContext + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.util.Failure +import scala.util.Success +import scala.concurrent.duration._ +//#import + + +//#server +object GreeterServer { + + def main(args: Array[String]): Unit = { + // important to enable HTTP/2 in ActorSystem's config + val conf = ConfigFactory.parseString("akka.http.server.enable-http2 = on") + .withFallback(ConfigFactory.defaultApplication()) + val system = ActorSystem[Nothing](Behaviors.empty[Nothing], "GreeterServer", conf) + new GreeterServer(system).run() + } +} + +class GreeterServer(system: ActorSystem[_]) { + + def run(): Future[Http.ServerBinding] = { + implicit val sys = system + implicit val ec: ExecutionContext = system.executionContext + + val service: HttpRequest => Future[HttpResponse] = + GreeterServiceHandler(new GreeterServiceImpl(system)) + + val bound: Future[Http.ServerBinding] = Http()(system) + .newServerAt(interface = "127.0.0.1", port = 8080) + .enableHttps(serverHttpContext) + .bind(service) + .map(_.addToCoordinatedShutdown(hardTerminationDeadline = 10.seconds)) + + bound.onComplete { + case Success(binding) => + val address = binding.localAddress + println(s"gRPC server bound to ${address.getHostString}:${address.getPort}") + case Failure(ex) => + println("Failed to bind gRPC endpoint, terminating system") + ex.printStackTrace() + system.terminate() + } + + bound + } + //#server + + + private def serverHttpContext: HttpsConnectionContext = { + val privateKey = + DERPrivateKeyLoader.load(PEMDecoder.decode(readPrivateKeyPem())) + val fact = CertificateFactory.getInstance("X.509") + val cer = fact.generateCertificate( + classOf[GreeterServer].getResourceAsStream("/certs/server1.pem") + ) + val ks = KeyStore.getInstance("PKCS12") + ks.load(null) + ks.setKeyEntry( + "private", + privateKey, + new Array[Char](0), + Array[Certificate](cer) + ) + val keyManagerFactory = KeyManagerFactory.getInstance("SunX509") + keyManagerFactory.init(ks, null) + val context = SSLContext.getInstance("TLS") + context.init(keyManagerFactory.getKeyManagers, null, new SecureRandom) + ConnectionContext.httpsServer(context) + } + + private def readPrivateKeyPem(): String = + Source.fromResource("certs/server1.key").mkString + //#server + +} +//#server diff --git a/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServiceImpl.scala b/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServiceImpl.scala new file mode 100644 index 000000000..b9ed19a84 --- /dev/null +++ b/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServiceImpl.scala @@ -0,0 +1,41 @@ +package com.example.helloworld + +//#import +import scala.concurrent.Future + +import akka.NotUsed +import akka.actor.typed.ActorSystem +import akka.stream.scaladsl.BroadcastHub +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.MergeHub +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +//#import + +//#service-request-reply +//#service-stream +class GreeterServiceImpl(system: ActorSystem[_]) extends GreeterService { + private implicit val sys: ActorSystem[_] = system + + //#service-request-reply + val (inboundHub: Sink[HelloRequest, NotUsed], outboundHub: Source[HelloReply, NotUsed]) = + MergeHub.source[HelloRequest] + .map(request => HelloReply(s"Hello, ${request.name}")) + .toMat(BroadcastHub.sink[HelloReply])(Keep.both) + .run() + //#service-request-reply + + override def sayHello(request: HelloRequest): Future[HelloReply] = { + Future.successful(HelloReply(s"Hello, ${request.name}")) + } + + //#service-request-reply + override def sayHelloToAll(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = { + in.runWith(inboundHub) + outboundHub + } + //#service-request-reply +} +//#service-stream +//#service-request-reply diff --git a/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala b/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala index 55f594485..c880ea7be 100644 --- a/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala +++ b/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala @@ -4,6 +4,9 @@ package akka.grpc.internal +import akka.Done +import akka.NotUsed +import akka.actor.ActorSystem import akka.annotation.InternalApi import akka.event.LoggingAdapter import akka.grpc.GrpcClientSettings @@ -12,29 +15,26 @@ import akka.grpc.GrpcSingleResponse import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source -import akka.Done -import akka.NotUsed -import akka.actor.ActorSystem +import io.grpc.CallOptions +import io.grpc.MethodDescriptor import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts import io.grpc.netty.shaded.io.grpc.netty.NegotiationType import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder +import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig.Protocol import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig.SelectedListenerFailureBehavior import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig.SelectorFailureBehavior -import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolConfig import io.grpc.netty.shaded.io.netty.handler.ssl.ApplicationProtocolNames import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder -import io.grpc.CallOptions -import io.grpc.MethodDescriptor import java.util.concurrent.TimeUnit import javax.net.ssl.SSLContext import scala.annotation.nowarn -import scala.concurrent.duration.FiniteDuration import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise +import scala.concurrent.duration.FiniteDuration import scala.util.Failure import scala.util.Success From f0784af4282f368169bb7d7e8ef53decd981def3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= <johan@markatta.com> Date: Mon, 19 Feb 2024 15:07:55 +0100 Subject: [PATCH 08/10] Accidental commit of native image stuff --- native-image-tests/grpc-scala/build.sbt | 34 ------ .../grpc-scala/project/build.properties | 1 - .../grpc-scala/project/plugins.sbt | 4 - .../grpc-scala/src/main/java/Empty.java | 4 - .../src/main/protobuf/helloworld.proto | 31 ------ .../src/main/resources/application.conf | 8 -- .../src/main/resources/certs/ca.pem | 15 --- .../src/main/resources/certs/server1.key | 16 --- .../src/main/resources/certs/server1.pem | 16 --- .../grpc-scala/src/main/resources/logback.xml | 21 ---- .../example/helloworld/GreeterClient.scala | 78 ------------- .../example/helloworld/GreeterServer.scala | 103 ------------------ .../helloworld/GreeterServiceImpl.scala | 41 ------- 13 files changed, 372 deletions(-) delete mode 100644 native-image-tests/grpc-scala/build.sbt delete mode 100644 native-image-tests/grpc-scala/project/build.properties delete mode 100644 native-image-tests/grpc-scala/project/plugins.sbt delete mode 100644 native-image-tests/grpc-scala/src/main/java/Empty.java delete mode 100644 native-image-tests/grpc-scala/src/main/protobuf/helloworld.proto delete mode 100644 native-image-tests/grpc-scala/src/main/resources/application.conf delete mode 100644 native-image-tests/grpc-scala/src/main/resources/certs/ca.pem delete mode 100644 native-image-tests/grpc-scala/src/main/resources/certs/server1.key delete mode 100644 native-image-tests/grpc-scala/src/main/resources/certs/server1.pem delete mode 100644 native-image-tests/grpc-scala/src/main/resources/logback.xml delete mode 100644 native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterClient.scala delete mode 100644 native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServer.scala delete mode 100644 native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServiceImpl.scala diff --git a/native-image-tests/grpc-scala/build.sbt b/native-image-tests/grpc-scala/build.sbt deleted file mode 100644 index 338f5af3d..000000000 --- a/native-image-tests/grpc-scala/build.sbt +++ /dev/null @@ -1,34 +0,0 @@ -name := "akka-grpc-quickstart-scala" - -version := "1.0" - -scalaVersion := "2.13.12" - -lazy val akkaVersion = sys.props.getOrElse("akka.version", "2.9.1") -lazy val akkaGrpcVersion = sys.props.getOrElse("akka.grpc.version", "2.4.0") - -enablePlugins(AkkaGrpcPlugin) - -// Run in a separate JVM, to make sure sbt waits until all threads have -// finished before returning. -// If you want to keep the application running while executing other -// sbt tasks, consider https://github.com/spray/sbt-revolver/ -fork := true - -resolvers += "Akka library repository".at("https://repo.akka.io/maven") - -// GraalVM native image build -enablePlugins(NativeImagePlugin) -nativeImageJvm := "graalvm-community" -nativeImageVersion := "21.0.2" -nativeImageOptions := Seq("--no-fallback", "--verbose") - -libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion, - "com.typesafe.akka" %% "akka-stream" % akkaVersion, - "com.typesafe.akka" %% "akka-discovery" % akkaVersion, - "com.typesafe.akka" %% "akka-pki" % akkaVersion, - "ch.qos.logback" % "logback-classic" % "1.2.3", - "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test, - "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test, - "org.scalatest" %% "scalatest" % "3.2.12" % Test) diff --git a/native-image-tests/grpc-scala/project/build.properties b/native-image-tests/grpc-scala/project/build.properties deleted file mode 100644 index 8cf07b7c2..000000000 --- a/native-image-tests/grpc-scala/project/build.properties +++ /dev/null @@ -1 +0,0 @@ -sbt.version=1.9.8 \ No newline at end of file diff --git a/native-image-tests/grpc-scala/project/plugins.sbt b/native-image-tests/grpc-scala/project/plugins.sbt deleted file mode 100644 index 15aa671e5..000000000 --- a/native-image-tests/grpc-scala/project/plugins.sbt +++ /dev/null @@ -1,4 +0,0 @@ -resolvers += "Akka library repository".at("https://repo.akka.io/maven") - -addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.4.0") -addSbtPlugin("org.scalameta" % "sbt-native-image" % "0.3.4") diff --git a/native-image-tests/grpc-scala/src/main/java/Empty.java b/native-image-tests/grpc-scala/src/main/java/Empty.java deleted file mode 100644 index 220a89df9..000000000 --- a/native-image-tests/grpc-scala/src/main/java/Empty.java +++ /dev/null @@ -1,4 +0,0 @@ -public class Empty { - // Just an empty class - // because gradle complained about not having any Java files to compile? -} diff --git a/native-image-tests/grpc-scala/src/main/protobuf/helloworld.proto b/native-image-tests/grpc-scala/src/main/protobuf/helloworld.proto deleted file mode 100644 index c444c4ea3..000000000 --- a/native-image-tests/grpc-scala/src/main/protobuf/helloworld.proto +++ /dev/null @@ -1,31 +0,0 @@ -//#service-request-reply -syntax = "proto3"; - -option java_multiple_files = true; -option java_package = "com.example.helloworld"; -option java_outer_classname = "HelloWorldProto"; - -// The greeting service definition. -service GreeterService { - // Sends a greeting - rpc SayHello (HelloRequest) returns (HelloReply) {} - //#service-request-reply - //#service-stream - // The stream of incoming HelloRequest messages are - // sent out as corresponding HelloReply. From - // all clients to all clients, like a chat room. - rpc SayHelloToAll (stream HelloRequest) returns (stream HelloReply) {} - //#service-stream - //#service-request-reply -} - -// The request message containing the user's name. -message HelloRequest { - string name = 1; -} - -// The response message containing the greetings -message HelloReply { - string message = 1; -} -//#service-request-reply diff --git a/native-image-tests/grpc-scala/src/main/resources/application.conf b/native-image-tests/grpc-scala/src/main/resources/application.conf deleted file mode 100644 index 0cf9442ee..000000000 --- a/native-image-tests/grpc-scala/src/main/resources/application.conf +++ /dev/null @@ -1,8 +0,0 @@ -akka.grpc.client { - "helloworld.GreeterService" { - host = 127.0.0.1 - port = 8080 - override-authority = foo.test.google.fr - trusted = /certs/ca.pem - } -} diff --git a/native-image-tests/grpc-scala/src/main/resources/certs/ca.pem b/native-image-tests/grpc-scala/src/main/resources/certs/ca.pem deleted file mode 100644 index 6c8511a73..000000000 --- a/native-image-tests/grpc-scala/src/main/resources/certs/ca.pem +++ /dev/null @@ -1,15 +0,0 @@ ------BEGIN CERTIFICATE----- -MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV -BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX -aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla -Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0 -YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT -BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7 -+L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu -g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd -Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV -HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau -sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m -oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG -Dfcog5wrJytaQ6UA0wE= ------END CERTIFICATE----- diff --git a/native-image-tests/grpc-scala/src/main/resources/certs/server1.key b/native-image-tests/grpc-scala/src/main/resources/certs/server1.key deleted file mode 100644 index 143a5b876..000000000 --- a/native-image-tests/grpc-scala/src/main/resources/certs/server1.key +++ /dev/null @@ -1,16 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD -M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf -3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY -AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm -V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY -tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p -dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q -K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR -81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff -DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd -aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2 -ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3 -XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe -F98XJ7tIFfJq ------END PRIVATE KEY----- diff --git a/native-image-tests/grpc-scala/src/main/resources/certs/server1.pem b/native-image-tests/grpc-scala/src/main/resources/certs/server1.pem deleted file mode 100644 index f3d43fcc5..000000000 --- a/native-image-tests/grpc-scala/src/main/resources/certs/server1.pem +++ /dev/null @@ -1,16 +0,0 @@ ------BEGIN CERTIFICATE----- -MIICnDCCAgWgAwIBAgIBBzANBgkqhkiG9w0BAQsFADBWMQswCQYDVQQGEwJBVTET -MBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQ -dHkgTHRkMQ8wDQYDVQQDEwZ0ZXN0Y2EwHhcNMTUxMTA0MDIyMDI0WhcNMjUxMTAx -MDIyMDI0WjBlMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV -BAcTB0NoaWNhZ28xFTATBgNVBAoTDEV4YW1wbGUsIENvLjEaMBgGA1UEAxQRKi50 -ZXN0Lmdvb2dsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAOHDFSco -LCVJpYDDM4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1Bg -zkWF+slf3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd -9N8YwbBYAckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAGjazBpMAkGA1UdEwQCMAAw -CwYDVR0PBAQDAgXgME8GA1UdEQRIMEaCECoudGVzdC5nb29nbGUuZnKCGHdhdGVy -em9vaS50ZXN0Lmdvb2dsZS5iZYISKi50ZXN0LnlvdXR1YmUuY29thwTAqAEDMA0G -CSqGSIb3DQEBCwUAA4GBAJFXVifQNub1LUP4JlnX5lXNlo8FxZ2a12AFQs+bzoJ6 -hM044EDjqyxUqSbVePK0ni3w1fHQB5rY9yYC5f8G7aqqTY1QOhoUk8ZTSTRpnkTh -y4jjdvTZeLDVBlueZUTDRmy2feY5aZIU18vFDK08dTG0A87pppuv1LNIR3loveU8 ------END CERTIFICATE----- diff --git a/native-image-tests/grpc-scala/src/main/resources/logback.xml b/native-image-tests/grpc-scala/src/main/resources/logback.xml deleted file mode 100644 index 203596da1..000000000 --- a/native-image-tests/grpc-scala/src/main/resources/logback.xml +++ /dev/null @@ -1,21 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<configuration> - <!-- This is a development logging configuration that logs to standard out, for an example of a production - logging config, see the Akka docs: https://doc.akka.io/docs/akka/2.6/typed/logging.html#logback --> - <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern> - </encoder> - </appender> - - <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> - <queueSize>1024</queueSize> - <neverBlock>true</neverBlock> - <appender-ref ref="STDOUT" /> - </appender> - - <root level="INFO"> - <appender-ref ref="ASYNC"/> - </root> - -</configuration> diff --git a/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterClient.scala b/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterClient.scala deleted file mode 100644 index fc1f83431..000000000 --- a/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterClient.scala +++ /dev/null @@ -1,78 +0,0 @@ -package com.example.helloworld - -//#import -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} -import scala.util.Failure -import scala.util.Success -import akka.Done -import akka.NotUsed -import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.Behaviors -import akka.grpc.GrpcClientSettings -import akka.stream.scaladsl.Source - -//#import - -//#client-request-reply -object GreeterClient { - - def main(args: Array[String]): Unit = { - implicit val sys: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty[Nothing], "GreeterClient") - implicit val ec: ExecutionContext = sys.executionContext - - val client = GreeterServiceClient(GrpcClientSettings.fromConfig("helloworld.GreeterService")) - - val names = - if (args.isEmpty) List("Alice", "Bob") - else args.toList - - names.foreach(singleRequestReply) - - //#client-request-reply - if (args.nonEmpty) - names.foreach(streamingBroadcast) - //#client-request-reply - - def singleRequestReply(name: String): Unit = { - println(s"Performing request: $name") - val reply = client.sayHello(HelloRequest(name)) - reply.onComplete { - case Success(msg) => - println(msg) - case Failure(e) => - println(s"Error: $e") - } - } - - //#client-request-reply - //#client-stream - def streamingBroadcast(name: String): Unit = { - println(s"Performing streaming requests: $name") - - val requestStream: Source[HelloRequest, NotUsed] = - Source - .tick(1.second, 1.second, "tick") - .zipWithIndex - .map { case (_, i) => i } - .map(i => HelloRequest(s"$name-$i")) - .mapMaterializedValue(_ => NotUsed) - - val responseStream: Source[HelloReply, NotUsed] = client.sayHelloToAll(requestStream) - val done: Future[Done] = - responseStream.runForeach(reply => println(s"$name got streaming reply: ${reply.message}")) - - done.onComplete { - case Success(_) => - println("streamingBroadcast done") - case Failure(e) => - println(s"Error streamingBroadcast: $e") - } - } - //#client-stream - //#client-request-reply - - } - -} -//#client-request-reply diff --git a/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServer.scala b/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServer.scala deleted file mode 100644 index 10d367cc3..000000000 --- a/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServer.scala +++ /dev/null @@ -1,103 +0,0 @@ -package com.example.helloworld - -//#import - - -import java.security.KeyStore -import java.security.SecureRandom -import java.security.cert.Certificate -import java.security.cert.CertificateFactory - -import scala.io.Source - -import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.Behaviors -import akka.http.scaladsl.ConnectionContext -import akka.http.scaladsl.Http -import akka.http.scaladsl.HttpsConnectionContext -import akka.http.scaladsl.model.HttpRequest -import akka.http.scaladsl.model.HttpResponse -import akka.pki.pem.DERPrivateKeyLoader -import akka.pki.pem.PEMDecoder -import com.typesafe.config.ConfigFactory -import javax.net.ssl.KeyManagerFactory -import javax.net.ssl.SSLContext - -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.util.Failure -import scala.util.Success -import scala.concurrent.duration._ -//#import - - -//#server -object GreeterServer { - - def main(args: Array[String]): Unit = { - // important to enable HTTP/2 in ActorSystem's config - val conf = ConfigFactory.parseString("akka.http.server.enable-http2 = on") - .withFallback(ConfigFactory.defaultApplication()) - val system = ActorSystem[Nothing](Behaviors.empty[Nothing], "GreeterServer", conf) - new GreeterServer(system).run() - } -} - -class GreeterServer(system: ActorSystem[_]) { - - def run(): Future[Http.ServerBinding] = { - implicit val sys = system - implicit val ec: ExecutionContext = system.executionContext - - val service: HttpRequest => Future[HttpResponse] = - GreeterServiceHandler(new GreeterServiceImpl(system)) - - val bound: Future[Http.ServerBinding] = Http()(system) - .newServerAt(interface = "127.0.0.1", port = 8080) - .enableHttps(serverHttpContext) - .bind(service) - .map(_.addToCoordinatedShutdown(hardTerminationDeadline = 10.seconds)) - - bound.onComplete { - case Success(binding) => - val address = binding.localAddress - println(s"gRPC server bound to ${address.getHostString}:${address.getPort}") - case Failure(ex) => - println("Failed to bind gRPC endpoint, terminating system") - ex.printStackTrace() - system.terminate() - } - - bound - } - //#server - - - private def serverHttpContext: HttpsConnectionContext = { - val privateKey = - DERPrivateKeyLoader.load(PEMDecoder.decode(readPrivateKeyPem())) - val fact = CertificateFactory.getInstance("X.509") - val cer = fact.generateCertificate( - classOf[GreeterServer].getResourceAsStream("/certs/server1.pem") - ) - val ks = KeyStore.getInstance("PKCS12") - ks.load(null) - ks.setKeyEntry( - "private", - privateKey, - new Array[Char](0), - Array[Certificate](cer) - ) - val keyManagerFactory = KeyManagerFactory.getInstance("SunX509") - keyManagerFactory.init(ks, null) - val context = SSLContext.getInstance("TLS") - context.init(keyManagerFactory.getKeyManagers, null, new SecureRandom) - ConnectionContext.httpsServer(context) - } - - private def readPrivateKeyPem(): String = - Source.fromResource("certs/server1.key").mkString - //#server - -} -//#server diff --git a/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServiceImpl.scala b/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServiceImpl.scala deleted file mode 100644 index b9ed19a84..000000000 --- a/native-image-tests/grpc-scala/src/main/scala/com/example/helloworld/GreeterServiceImpl.scala +++ /dev/null @@ -1,41 +0,0 @@ -package com.example.helloworld - -//#import -import scala.concurrent.Future - -import akka.NotUsed -import akka.actor.typed.ActorSystem -import akka.stream.scaladsl.BroadcastHub -import akka.stream.scaladsl.Keep -import akka.stream.scaladsl.MergeHub -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source - -//#import - -//#service-request-reply -//#service-stream -class GreeterServiceImpl(system: ActorSystem[_]) extends GreeterService { - private implicit val sys: ActorSystem[_] = system - - //#service-request-reply - val (inboundHub: Sink[HelloRequest, NotUsed], outboundHub: Source[HelloReply, NotUsed]) = - MergeHub.source[HelloRequest] - .map(request => HelloReply(s"Hello, ${request.name}")) - .toMat(BroadcastHub.sink[HelloReply])(Keep.both) - .run() - //#service-request-reply - - override def sayHello(request: HelloRequest): Future[HelloReply] = { - Future.successful(HelloReply(s"Hello, ${request.name}")) - } - - //#service-request-reply - override def sayHelloToAll(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = { - in.runWith(inboundHub) - outboundHub - } - //#service-request-reply -} -//#service-stream -//#service-request-reply From 9043dd5744bf3e0cebc41e36edd65d62b098f666 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= <johan@markatta.com> Date: Mon, 19 Feb 2024 15:53:12 +0100 Subject: [PATCH 09/10] Review feedback --- .../scala/akka/grpc/GrpcClientSettings.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala b/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala index 47c683bf0..896dd16fe 100644 --- a/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala +++ b/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala @@ -16,11 +16,9 @@ import io.grpc.CallCredentials import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider -import java.util.Optional import javax.net.ssl.{ SSLContext, TrustManager } import scala.collection.immutable import scala.concurrent.duration.{ Duration, _ } -import scala.jdk.OptionConverters.RichOptional object GrpcClientSettings { @@ -167,7 +165,7 @@ object GrpcClientSettings { } private def getOptionalDuration(config: Config, path: String): Option[FiniteDuration] = - config.getString(path) match { + Helpers.toRootLowerCase(config.getString(path)) match { case "off" => None case _ => Some(config.getDuration(path).asScala) } @@ -297,25 +295,25 @@ final class GrpcClientSettings private ( /** * Scala API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries - * if the discovery mechanism supports that. The default value `None` disables periodic refresh and instead - * only does refresh when the client implementation decides to. + * if the discovery mechanism supports that. The default is no periodic refresh and instead + * * only does refresh when the client implementation decides to. * * Currently only supported by the Netty client backend. */ @ApiMayChange - def withDiscoveryRefreshInterval(refreshInterval: Option[FiniteDuration]): GrpcClientSettings = - copy(discoveryRefreshInterval = refreshInterval) + def withDiscoveryRefreshInterval(refreshInterval: FiniteDuration): GrpcClientSettings = + copy(discoveryRefreshInterval = Some(refreshInterval)) /** * Java API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries - * if the discovery mechanism supports that. The default value `None` disables periodic refresh and instead + * if the discovery mechanism supports that. The default is no periodic refresh and instead * only does refresh when the client implementation decides to. * * Currently only supported by the Netty client backend. */ @ApiMayChange - def withDiscoveryRefreshInterval(refreshInterval: Optional[java.time.Duration]): GrpcClientSettings = - copy(discoveryRefreshInterval = refreshInterval.map(_.asScala).toScala) + def withDiscoveryRefreshInterval(refreshInterval: java.time.Duration): GrpcClientSettings = + copy(discoveryRefreshInterval = Some(refreshInterval.asScala)) private def copy( serviceName: String = serviceName, From d8e04ebc263dbabb198c2a12d39bc3bc42b92002 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= <johan@markatta.com> Date: Wed, 28 Feb 2024 16:17:58 +0100 Subject: [PATCH 10/10] Proper logging --- .../scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala index c7def5a8a..dac3f9f74 100644 --- a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala +++ b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala @@ -79,11 +79,11 @@ private[akka] final class AkkaDiscoveryNameResolver( listener.onAddresses(addresses(result.addresses), Attributes.EMPTY) } catch { case e: UnknownHostException => - log.warning(e, s"Unknown host for service $serviceName") + log.warning(e, "Unknown host for service {}", serviceName) listener.onError(Status.UNKNOWN.withDescription(e.getMessage)) } case Failure(e) => - log.warning(e, s"Service discovery failed for service $serviceName") + log.warning(e, "Service discovery failed for service {}", serviceName) listener.onError(Status.UNKNOWN.withDescription(e.getMessage)) }