diff --git a/README.md b/README.md index 5acc2b7..ae9b6d0 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,24 @@ ### Getting Started -The `kamon-http4s` module brings traces and metrics to your [http4s][4] based applications. +The `kamon-http4s-` module brings traces and metrics to your [http4s][4] based applications. -Kamon kamon-http4s is currently available for Scala 2.12 and 2.13. +It is currently available for Scala 2.12 and 2.13. The current version supports Kamon 2.2 and +is published for http4s 0.22, 0.23 and 1.0. -Supported releases and dependencies are shown below. +| kamon | kamon-http4s | status | jdk | scala | http4s +|:-----:|:------:|:------:|:----:|--------------:|------- +| 2.2.x | 2.2.0 | stable | 8+ | 2.12, 2.13 | 0.22.x +| 2.2.x | 2.2.0 | stable | 8+ | 2.12, 2.13 | 0.23.x +| 2.2.x | 2.2.0 | stable | 8+ | 2.12, 2.13 | 1.0.x + +To get started with sbt, simply add the following to your `build.sbt` file, for instance for http4s 0.23: + +```scala +libraryDependencies += "io.kamon" %% "kamon-http4s-0.23" % "2.2.0" +``` + +The releases and dependencies for the legacy module `kamon-http4s` (without http4s version) are shown below. | kamon-http4s | status | jdk | scala | http4s |:------:|:------:|:----:|--------------:|------- @@ -20,13 +33,6 @@ Supported releases and dependencies are shown below. | 2.0.1 | stable | 8+ | 2.12, 2.13 | 0.21.x -To get started with SBT, simply add the following to your `build.sbt` -file: - -```scala -libraryDependencies += "io.kamon" %% "kamon-http4s" % "2.0.1" -``` - ## Metrics and Tracing for http4s in 2 steps ### The Server diff --git a/build.sbt b/build.sbt index 8d464f8..5c26936 100644 --- a/build.sbt +++ b/build.sbt @@ -13,28 +13,61 @@ * ========================================================================================= */ -val kamonCore = "io.kamon" %% "kamon-core" % "2.1.0" -val kamonTestkit = "io.kamon" %% "kamon-testkit" % "2.1.0" -val kamonCommon = "io.kamon" %% "kamon-instrumentation-common" % "2.1.0" +val kamonCore = "io.kamon" %% "kamon-core" % "2.2.3" +val kamonTestkit = "io.kamon" %% "kamon-testkit" % "2.2.3" +val kamonCommon = "io.kamon" %% "kamon-instrumentation-common" % "2.2.3" -val server = "org.http4s" %% "http4s-blaze-server" % "0.21.3" -val client = "org.http4s" %% "http4s-blaze-client" % "0.21.3" -val dsl = "org.http4s" %% "http4s-dsl" % "0.21.3" +def http4sDeps(version: String) = Seq( + "org.http4s" %% "http4s-client" % version % Provided, + "org.http4s" %% "http4s-server" % version % Provided, + "org.http4s" %% "http4s-blaze-client" % version % Test, + "org.http4s" %% "http4s-blaze-server" % version % Test, + "org.http4s" %% "http4s-dsl" % version % Test +) - -lazy val root = (project in file(".")) - .settings(Seq( - name := "kamon-http4s", - scalaVersion := "2.13.1", - crossScalaVersions := Seq("2.12.11", "2.13.1"))) - .settings(resolvers += Resolver.bintrayRepo("kamon-io", "snapshots")) - .settings(resolvers += Resolver.mavenLocal) - .settings(scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match { +lazy val shared = Seq( + scalaVersion := "2.13.6", + crossScalaVersions := Seq("2.12.14", "2.13.6"), + scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match { case Some((2, 12)) => Seq("-Ypartial-unification", "-language:higherKinds") - case _ => "-language:higherKinds" :: Nil - })) + case _ => "-language:higherKinds" :: Nil + }), + libraryDependencies ++= + compileScope(kamonCore, kamonCommon) ++ + testScope(scalatest, kamonTestkit, logbackClassic) +) + +lazy val `kamon-http4s-0_22` = project + .in(file("modules/0.22")) + .settings( + shared, + name := "kamon-http4s-0.22", + libraryDependencies ++= http4sDeps("0.22.1") + ) + +lazy val `kamon-http4s-0_23` = project + .in(file("modules/0.23")) + .settings( + shared, + name := "kamon-http4s-0.23", + libraryDependencies ++= http4sDeps("0.23.0") + ) + +lazy val `kamon-http4s-1_0` = project + .in(file("modules/1.0")) + .settings( + shared, + name := "kamon-http4s-1.0", + libraryDependencies ++= http4sDeps("1.0.0-M23") + ) + +lazy val root = project + .in(file(".")) .settings( - libraryDependencies ++= - compileScope(kamonCore, kamonCommon) ++ - providedScope(server, client, dsl) ++ - testScope(scalatest, kamonTestkit, logbackClassic)) + shared, + name := "kamon-http4s", + publish / skip := true, + Test / parallelExecution := false, + Global / concurrentRestrictions += Tags.limit(Tags.Test, 1) + ) + .aggregate(`kamon-http4s-0_22`, `kamon-http4s-0_23`, `kamon-http4s-1_0`) diff --git a/src/main/resources/reference.conf b/modules/0.22/src/main/resources/reference.conf similarity index 100% rename from src/main/resources/reference.conf rename to modules/0.22/src/main/resources/reference.conf diff --git a/src/main/scala/kamon/http4s/Http4s.scala b/modules/0.22/src/main/scala/kamon/http4s/Http4s.scala similarity index 100% rename from src/main/scala/kamon/http4s/Http4s.scala rename to modules/0.22/src/main/scala/kamon/http4s/Http4s.scala diff --git a/src/main/scala/kamon/http4s/PathOperationNameGenerator.scala b/modules/0.22/src/main/scala/kamon/http4s/PathOperationNameGenerator.scala similarity index 100% rename from src/main/scala/kamon/http4s/PathOperationNameGenerator.scala rename to modules/0.22/src/main/scala/kamon/http4s/PathOperationNameGenerator.scala diff --git a/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala b/modules/0.22/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala similarity index 84% rename from src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala rename to modules/0.22/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala index ca802f2..c302cfe 100644 --- a/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala +++ b/modules/0.22/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala @@ -38,13 +38,10 @@ object KamonSupport { Kamon.onReconfigure(newConfig => _instrumentation = instrumentation(newConfig)) - - def apply[F[_]](underlying: Client[F])(implicit F:Sync[F]): Client[F] = Client { request => - - for { - ctx <- Resource.liftF(F.delay(Kamon.currentContext())) - k <- kamonClient(underlying)(request)(ctx)(_instrumentation) - } yield k + def apply[F[_]](underlying: Client[F])(implicit F: Sync[F]): Client[F] = Client { request => + // this needs to run on the same thread as the caller, so can't be suspended in F + val ctx = Kamon.currentContext() + kamonClient(underlying)(request)(ctx)(_instrumentation) } @@ -54,9 +51,9 @@ object KamonSupport { (instrumentation: HttpClientInstrumentation) (implicit F:Sync[F]): Resource[F, Response[F]] = for { - requestHandler <- Resource.liftF(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx))) + requestHandler <- Resource.eval(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx))) response <- underlying.run(requestHandler.request).attempt - trackedResponse <- Resource.liftF(handleResponse(response, requestHandler)) + trackedResponse <- Resource.eval(handleResponse(response, requestHandler)) } yield trackedResponse def handleResponse[F[_]]( diff --git a/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala b/modules/0.22/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala similarity index 97% rename from src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala rename to modules/0.22/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala index a47f121..f88ae0f 100644 --- a/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala +++ b/modules/0.22/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala @@ -56,7 +56,7 @@ object KamonSupport { private def getHandler[F[_]](instrumentation: HttpServerInstrumentation)(request: Request[F])(implicit F: Sync[F]): Resource[F, RequestHandler] = for { - handler <- Resource.liftF(F.delay(instrumentation.createHandler(buildRequestMessage(request)))) + handler <- Resource.eval(F.delay(instrumentation.createHandler(buildRequestMessage(request)))) _ <- processRequest(handler) _ <- withContext(handler) } yield handler diff --git a/src/main/scala/kamon/http4s/package.scala b/modules/0.22/src/main/scala/kamon/http4s/package.scala similarity index 71% rename from src/main/scala/kamon/http4s/package.scala rename to modules/0.22/src/main/scala/kamon/http4s/package.scala index f08d786..9da40b5 100644 --- a/src/main/scala/kamon/http4s/package.scala +++ b/modules/0.22/src/main/scala/kamon/http4s/package.scala @@ -3,7 +3,7 @@ package kamon import org.http4s.{Header, Headers, Request, Response, Status} import kamon.instrumentation.http.HttpMessage import kamon.instrumentation.http.HttpMessage.ResponseBuilder -import org.http4s.util.CaseInsensitiveString +import org.typelevel.ci.CIString package object http4s { @@ -11,7 +11,7 @@ package object http4s { def buildRequestMessage[F[_]](inner: Request[F]): HttpMessage.Request = new HttpMessage.Request { override def url: String = inner.uri.toString() - override def path: String = inner.uri.path + override def path: String = inner.uri.path.renderString override def method: String = inner.method.name @@ -19,11 +19,11 @@ package object http4s { override def port: Int = inner.uri.authority.flatMap(_.port).getOrElse(0) - override def read(header: String): Option[String] = inner.headers.get(CaseInsensitiveString(header)).map(_.value) + override def read(header: String): Option[String] = inner.headers.get(CIString(header)).map(_.head.value) override def readAll(): Map[String, String] = { val builder = Map.newBuilder[String, String] - inner.headers.foreach(h => builder += (h.name.value -> h.value)) + inner.headers.foreach(h => builder += (h.name.toString -> h.value)) builder.result() } } @@ -31,7 +31,7 @@ package object http4s { def errorResponseBuilder[F[_]]: HttpMessage.ResponseBuilder[Response[F]] = new ResponseBuilder[Response[F]] { override def write(header: String, value: String): Unit = () override def statusCode: Int = 500 - override def build(): Response[F] = new Response[F](status = Status.InternalServerError) + override def build(): Response[F] = Response[F](status = Status.InternalServerError) } //TODO both of these @@ -39,13 +39,13 @@ package object http4s { private var _headers = Headers.empty override def write(header: String, value: String): Unit = - _headers = _headers.put(Header(header, value)) + _headers = _headers.put(Header.Raw(CIString(header), value)) override def statusCode: Int = 404 - override def build(): Response[F] = new Response[F](status = Status.NotFound, headers = _headers) + override def build(): Response[F] = Response[F](status = Status.NotFound, headers = _headers) } - def getResponseBuilder[F[_]](response: Response[F]) = new HttpMessage.ResponseBuilder[Response[F]] { + def getResponseBuilder[F[_]](response: Response[F]): ResponseBuilder[Response[F]] = new HttpMessage.ResponseBuilder[Response[F]] { private var _headers = response.headers override def statusCode: Int = response.status.code @@ -53,7 +53,7 @@ package object http4s { override def build(): Response[F] = response.withHeaders(_headers) override def write(header: String, value: String): Unit = - _headers = _headers.put(Header(header, value)) + _headers = _headers.put(Header.Raw(CIString(header), value)) } @@ -63,11 +63,11 @@ package object http4s { override def build(): Request[F] = request.withHeaders(_headers) override def write(header: String, value: String): Unit = - _headers = _headers.put(Header(header, value)) + _headers = _headers.put(Header.Raw(CIString(header), value)) override def url: String = request.uri.toString() - override def path: String = request.uri.path + override def path: String = request.uri.path.renderString override def method: String = request.method.name @@ -75,11 +75,11 @@ package object http4s { override def port: Int = request.uri.authority.flatMap(_.port).getOrElse(0) - override def read(header: String): Option[String] = _headers.get(CaseInsensitiveString(header)).map(_.value) + override def read(header: String): Option[String] = _headers.get(CIString(header)).map(_.head.value) override def readAll(): Map[String, String] = { val builder = Map.newBuilder[String, String] - request.headers.foreach(h => builder += (h.name.value -> h.value)) + request.headers.foreach(h => builder += (h.name.toString -> h.value)) builder.result() } } diff --git a/src/test/resources/application.conf b/modules/0.22/src/test/resources/application.conf similarity index 100% rename from src/test/resources/application.conf rename to modules/0.22/src/test/resources/application.conf diff --git a/src/test/resources/logback.xml b/modules/0.22/src/test/resources/logback.xml similarity index 100% rename from src/test/resources/logback.xml rename to modules/0.22/src/test/resources/logback.xml diff --git a/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala b/modules/0.22/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala similarity index 94% rename from src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala rename to modules/0.22/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala index 16b8112..1785e06 100644 --- a/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala +++ b/modules/0.22/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala @@ -16,22 +16,21 @@ package kamon.http4s -import java.net.ConnectException - import cats.effect.{IO, Resource} import kamon.Kamon import kamon.http4s.middleware.client.KamonSupport +import kamon.tag.Lookups.{plain, plainLong} import kamon.testkit.TestSpanReporter import kamon.trace.Span -import org.http4s.{HttpRoutes, Response} import org.http4s.client._ import org.http4s.dsl.io._ import org.http4s.implicits._ +import org.http4s.{HttpRoutes, Response} import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} -import kamon.tag.Lookups.{plainLong, plain} +import java.net.ConnectException class ClientInstrumentationSpec extends WordSpec with Matchers @@ -57,7 +56,7 @@ class ClientInstrumentationSpec extends WordSpec client.expect[String]("/tracing/ok").unsafeRunSync() shouldBe "ok" } - eventually(timeout(2 seconds)) { + eventually(timeout(3 seconds)) { val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/ok" @@ -74,7 +73,7 @@ class ClientInstrumentationSpec extends WordSpec "close and finish a span even if an exception is thrown by the client" in { val okSpan = Kamon.spanBuilder("client-exception").start() val client: Client[IO] = KamonSupport[IO]( - Client(_ => Resource.liftF(IO.raiseError[Response[IO]](new ConnectException("Connection Refused.")))) + Client(_ => Resource.eval(IO.raiseError[Response[IO]](new ConnectException("Connection Refused.")))) ) Kamon.runWithSpan(okSpan) { @@ -83,7 +82,7 @@ class ClientInstrumentationSpec extends WordSpec } } - eventually(timeout(2 seconds)) { + eventually(timeout(3 seconds)) { val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/ok" span.kind shouldBe Span.Kind.Client @@ -102,7 +101,7 @@ class ClientInstrumentationSpec extends WordSpec client.expect[String]("/tracing/not-found").attempt.unsafeRunSync().isLeft shouldBe true } - eventually(timeout(2 seconds)) { + eventually(timeout(3 seconds)) { val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/not-found" span.kind shouldBe Span.Kind.Client @@ -124,7 +123,7 @@ class ClientInstrumentationSpec extends WordSpec client.expect[String]("/tracing/error").attempt.unsafeRunSync().isLeft shouldBe true } - eventually(timeout(2 seconds)) { + eventually(timeout(3 seconds)) { val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/error" diff --git a/src/test/scala/kamon/http4s/HttpMetricsSpec.scala b/modules/0.22/src/test/scala/kamon/http4s/HttpMetricsSpec.scala similarity index 79% rename from src/test/scala/kamon/http4s/HttpMetricsSpec.scala rename to modules/0.22/src/test/scala/kamon/http4s/HttpMetricsSpec.scala index 1509476..fe2556c 100644 --- a/src/test/scala/kamon/http4s/HttpMetricsSpec.scala +++ b/modules/0.22/src/test/scala/kamon/http4s/HttpMetricsSpec.scala @@ -17,22 +17,22 @@ package kamon.http4s import cats.effect._ +import cats.implicits._ +import kamon.http4s.middleware.server.KamonSupport +import kamon.instrumentation.http.HttpServerMetrics import kamon.testkit.InstrumentInspection import org.http4s.HttpRoutes +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.blaze.server.BlazeServerBuilder +import org.http4s.client.Client import org.http4s.dsl.io._ +import org.http4s.implicits._ import org.http4s.server.Server -import org.http4s.server.blaze.BlazeServerBuilder import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar import org.scalatest.{Matchers, OptionValues, WordSpec} -import cats.implicits._ -import kamon.http4s.middleware.server.KamonSupport -import kamon.instrumentation.http.HttpServerMetrics -import org.http4s.client.blaze.BlazeClientBuilder -import org.http4s.client.Client import scala.concurrent.ExecutionContext -import org.http4s.implicits._ class HttpMetricsSpec extends WordSpec with Matchers @@ -42,11 +42,11 @@ class HttpMetricsSpec extends WordSpec with OptionValues { - implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) + implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - val srv = - BlazeServerBuilder[IO] + val srv = + BlazeServerBuilder[IO](ExecutionContext.global) .bindLocal(43567) .withHttpApp(KamonSupport(HttpRoutes.of[IO] { case GET -> Root / "tracing" / "ok" => Ok("ok") @@ -59,13 +59,13 @@ class HttpMetricsSpec extends WordSpec BlazeClientBuilder[IO](ExecutionContext.global).withMaxTotalConnections(10).resource val metrics = - Resource.liftF(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567))) + Resource.eval(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567))) - def withServerAndClient[A](f: (Server[IO], Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A = + def withServerAndClient[A](f: (Server, Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A = (srv, client, metrics).tupled.use(f.tupled).unsafeRunSync() - private def get[F[_]: Sync](path: String)(server: Server[F], client: Client[F]): F[String] = { + private def get[F[_]: Concurrent](path: String)(server: Server, client: Client[F]): F[String] = { client.expect[String](s"http://127.0.0.1:${server.address.getPort}$path") } @@ -88,7 +88,7 @@ class HttpMetricsSpec extends WordSpec "track the response time with status code 2xx" in withServerAndClient { (server, client, serverMetrics) => val requests: IO[Unit] = List.fill(100)(get("/tracing/ok")(server, client)).sequence_ - val test = IO(serverMetrics.requestsSuccessful.value should be >= 0L) + val test = IO(serverMetrics.requestsSuccessful.value() should be >= 0L) requests *> test } @@ -96,7 +96,7 @@ class HttpMetricsSpec extends WordSpec "track the response time with status code 4xx" in withServerAndClient { (server, client, serverMetrics) => val requests: IO[Unit] = List.fill(100)(get("/tracing/not-found")(server, client).attempt).sequence_ - val test = IO(serverMetrics.requestsClientError.value should be >= 0L) + val test = IO(serverMetrics.requestsClientError.value() should be >= 0L) requests *> test } @@ -104,7 +104,7 @@ class HttpMetricsSpec extends WordSpec "track the response time with status code 5xx" in withServerAndClient { (server, client, serverMetrics) => val requests: IO[Unit] = List.fill(100)(get("/tracing/error")(server, client).attempt).sequence_ - val test = IO(serverMetrics.requestsServerError.value should be >= 0L) + val test = IO(serverMetrics.requestsServerError.value() should be >= 0L) requests *> test } diff --git a/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala b/modules/0.22/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala similarity index 82% rename from src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala rename to modules/0.22/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala index ef4e568..a0080d1 100644 --- a/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala +++ b/modules/0.22/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala @@ -16,25 +16,25 @@ package kamon.http4s -import cats.effect.{ContextShift, IO, Sync, Timer} +import cats.effect._ +import cats.implicits._ import kamon.http4s.middleware.server.KamonSupport +import kamon.tag.Lookups.{plain, plainLong} +import kamon.testkit.TestSpanReporter import kamon.trace.Span -import org.http4s.{Headers, HttpRoutes} +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.client.Client -import org.http4s.client.blaze.BlazeClientBuilder import org.http4s.dsl.io._ -import org.http4s.server.{Server} -import org.http4s.server.blaze.BlazeServerBuilder +import org.http4s.implicits._ +import org.http4s.server.Server +import org.http4s.{Headers, HttpRoutes} import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} +import org.typelevel.ci.CIString import scala.concurrent.ExecutionContext -import org.http4s.implicits._ -import cats.implicits._ -import kamon.testkit.TestSpanReporter -import kamon.tag.Lookups.{plain, plainLong} -import org.http4s.util.CaseInsensitiveString class ServerInstrumentationSpec extends WordSpec with Matchers @@ -48,9 +48,8 @@ class ServerInstrumentationSpec extends WordSpec implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) val srv = - BlazeServerBuilder[IO] + BlazeServerBuilder[IO](ExecutionContext.global) .bindAny() - .withExecutionContext(ExecutionContext.global) .withHttpApp(KamonSupport(HttpRoutes.of[IO] { case GET -> Root / "tracing" / "ok" => Ok("ok") case GET -> Root / "tracing" / "error" => InternalServerError("error!") @@ -60,22 +59,21 @@ class ServerInstrumentationSpec extends WordSpec ,"", 0).orNotFound) .resource - val client = - BlazeClientBuilder[IO](ExecutionContext.global).resource + val client = BlazeClientBuilder[IO](ExecutionContext.global).resource - def withServerAndClient[A](f: (Server[IO], Client[IO]) => IO[A]): A = + def withServerAndClient[A](f: (Server, Client[IO]) => IO[A]): A = (srv, client).tupled.use(f.tupled).unsafeRunSync() - private def getResponse[F[_]: Sync](path: String)(server: Server[F], client: Client[F]): F[(String, Headers)] = { - client.get(s"http://127.0.0.1:${server.address.getPort}$path"){ r => - r.bodyAsText.compile.toList.map(_.mkString).map(_ -> r.headers) + private def getResponse[F[_]: Concurrent](path: String)(server: Server, client: Client[F]): F[(String, Headers)] = { + client.get(s"http://127.0.0.1:${server.address.getPort}$path") { r => + r.bodyText.compile.toList.map(_.mkString).map(_ -> r.headers) } } "The Server instrumentation" should { "propagate the current context and respond to the ok action" in withServerAndClient { (server, client) => val request = getResponse("/tracing/ok")(server, client).map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + headers.get(CIString("trace-id")).nonEmpty shouldBe true body should startWith("ok") } @@ -96,12 +94,12 @@ class ServerInstrumentationSpec extends WordSpec "propagate the current context and respond to the not-found action" in withServerAndClient { (server, client) => val request = getResponse("/tracing/not-found")(server, client).map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + headers.get(CIString("trace-id")).nonEmpty shouldBe true } val test = IO { eventually(timeout(5.seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value span.operationName shouldBe "unhandled" span.kind shouldBe Span.Kind.Server @@ -116,13 +114,13 @@ class ServerInstrumentationSpec extends WordSpec "propagate the current context and respond to the error action" in withServerAndClient { (server, client) => val request = getResponse("/tracing/error")(server, client).map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + headers.get(CIString("trace-id")).nonEmpty shouldBe true body should startWith("error!") } val test = IO { eventually(timeout(5.seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/error" span.kind shouldBe Span.Kind.Server @@ -139,13 +137,13 @@ class ServerInstrumentationSpec extends WordSpec val request = getResponse("/tracing/errorinternal")(server, client) /* TODO serviceErrorHandler kicks in and rewrites response, loosing trace information .map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + headers.get(CIString("trace-id")).nonEmpty shouldBe true } */ val test = IO { eventually(timeout(5.seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/errorinternal" span.kind shouldBe Span.Kind.Server @@ -164,7 +162,7 @@ class ServerInstrumentationSpec extends WordSpec getResponse("/tracing/bazz/ok")(server, client) val test = IO { eventually(timeout(5.seconds)) { - val span = testSpanReporter.nextSpan().value + val span = testSpanReporter().nextSpan().value span.operationName shouldBe "/tracing/:name/ok" span.kind shouldBe Span.Kind.Server diff --git a/modules/0.23/src/main/resources/reference.conf b/modules/0.23/src/main/resources/reference.conf new file mode 100644 index 0000000..8d3adc8 --- /dev/null +++ b/modules/0.23/src/main/resources/reference.conf @@ -0,0 +1,254 @@ +# ======================================= # +# Kamon-Http4s Reference Configuration # +# ======================================= # + +kamon.instrumentation.http4s { + + # Settings to control the HTTP Server instrumentation. + # + # IMPORTANT: Besides the "initial-operation-name" and "unhandled-operation-name" settings, the entire configuration of + # the HTTP Server Instrumentation is based on the constructs provided by the Kamon Instrumentation Common library + # which will always fallback to the settings found under the "kamon.instrumentation.http-server.default" path. The + # default settings have been included here to make them easy to find and understand in the context of this project and + # commented out so that any changes to the default settings will actually have effect. + # + server { + + # + # Configuration for HTTP context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + #enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + channel = "default" + + + } + + + # + # Configuration for HTTP server metrics collection. + # + metrics { + + # Enables collection of HTTP server metrics. When enabled the following metrics will be collected, assuming + # that the instrumentation is fully compliant: + # + # - http.server.requets + # - http.server.request.active + # - http.server.request.size + # - http.server.response.size + # - http.server.connection.lifetime + # - http.server.connection.usage + # - http.server.connection.open + # + # All metrics have at least three tags: component, interface and port. Additionally, the http.server.requests + # metric will also have a status_code tag with the status code group (1xx, 2xx and so on). + # + #enabled = yes + } + + + # + # Configuration for HTTP request tracing. + # + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests + # and finish them when the response is sent back to the clients. + #enabled = yes + + # Select a context tag that provides a preferred trace identifier. The preferred trace identifier will be used + # only if all these conditions are met: + # - the context tag is present. + # - there is no parent Span on the incoming context (i.e. this is the first service on the trace). + # - the identifier is valid in accordance to the identity provider. + #preferred-trace-id-tag = "none" + + # Enables collection of span metrics using the `span.processing-time` metric. + #span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + #url = span + + # Use the http.method tag. + #method = metric + + # Use the http.status_code tag. + #status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Server Span created by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + # Controls writing trace and span identifiers to HTTP response headers sent by the instrumented servers. The + # configuration can be set to either "none" to disable writing the identifiers on the response headers or to + # the header name to be used when writing the identifiers. + response-headers { + + # HTTP response header name for the trace identifier, or "none" to disable it. + #trace-id = "trace-id" + + # HTTP response header name for the server span identifier, or "none" to disable it. + #span-id = none + } + + # Custom mappings between routes and operation names. + operations { + + # The default operation name to be used when creating Spans to handle the HTTP server requests. In most + # cases it is not possible to define an operation name right at the moment of starting the HTTP server Span + # and in those cases, this operation name will be initially assigned to the Span. Instrumentation authors + # should do their best effort to provide a suitable operation name or make use of the "mappings" facilities. + default = "http.server.request" + + # The operation name to be assigned when an application cannot find any route/endpoint/controller to handle + # a given request. Depending on the instrumented framework, it might be possible to apply this operation + # name automatically or not, check the frameworks' instrumentation docs for more details. + unhandled = "unhandled" + + # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: + # - default: Uses the set default operation name + # - method: Uses the request HTTP method as the operation name. + # + name-generator = "kamon.http4s.PathOperationNameGenerator" + + # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode + # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. + # For example, with the following configuration: + # mappings { + # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" + # "/events/*/rsvps" = "EventRSVPs" + # } + # + # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have + # the same operation name "/organization/:orgID/user/:userID/profile". + # + # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation + # name "EventRSVPs". + # + # The patterns are expressed as globs and the operation names are free form. + # + mappings { + + } + } + } + } + + # Settings to control the HTTP Client instrumentation + # + # IMPORTANT: The entire configuration of the HTTP Client Instrumentation is based on the constructs provided by the + # Kamon Instrumentation Common library which will always fallback to the settings found under the + # "kamon.instrumentation.http-client.default" path. The default settings have been included here to make them easy to + # find and understand in the context of this project and commented out so that any changes to the default settings + # will actually have effect. + # + client { + + # + # Configuration for HTTP context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + #enabled = yes + + # HTTP propagation channel to be used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + #channel = "default" + } + + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests + # and finish them when the response is received from the server. + #enabled = yes + + # Enables collection of span metrics using the `span.processing-time` metric. + #span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + #url = span + + # Use the http.method tag. + #method = metric + + # Use the http.status_code tag. + #status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Server Span created by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + operations { + + # The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP + # Client instrumentation will always try to use the HTTP Operation Name Generator configured bellow to get + # a name, but if it fails to generate it then this name will be used. + #default = "http.client.request" + + # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: + # - hostname: Uses the request Host as the operation name. + # - method: Uses the request HTTP method as the operation name. + # + name-generator = "kamon.http4s.PathOperationNameGenerator" + } + } + } + + +} + + +kanela.modules { + http4s { + name = "Http4s Instrumentation" + description = "Provides context propagation, distributed tracing and HTTP client and server metrics for Http4s" + + instrumentations = [] + + within = [] + } +} + diff --git a/modules/0.23/src/main/scala/kamon/http4s/Http4s.scala b/modules/0.23/src/main/scala/kamon/http4s/Http4s.scala new file mode 100644 index 0000000..0b34cd7 --- /dev/null +++ b/modules/0.23/src/main/scala/kamon/http4s/Http4s.scala @@ -0,0 +1,62 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.http4s + +import com.typesafe.config.Config +import kamon.util.DynamicAccess +import kamon.Kamon +import kamon.instrumentation.http.{HttpMessage, HttpOperationNameGenerator} + +object Http4s { + @volatile var nameGenerator: HttpOperationNameGenerator = nameGeneratorFromConfig(Kamon.config()) + + private def nameGeneratorFromConfig(config: Config): HttpOperationNameGenerator = { + val dynamic = new DynamicAccess(getClass.getClassLoader) + val nameGeneratorFQCN = config.getString("kamon.instrumentation.http4s.client.tracing.operations.name-generator") + dynamic.createInstanceFor[HttpOperationNameGenerator](nameGeneratorFQCN, Nil) + } + + Kamon.onReconfigure { newConfig => + nameGenerator = nameGeneratorFromConfig(newConfig) + } +} + + +class DefaultNameGenerator extends HttpOperationNameGenerator { + + import java.util.Locale + + import scala.collection.concurrent.TrieMap + + private val localCache = TrieMap.empty[String, String] + private val normalizePattern = """\$([^<]+)<[^>]+>""".r + + + override def name(request: HttpMessage.Request): Option[String] = { + Some( + localCache.getOrElseUpdate(s"${request.method}${request.path}", { + // Convert paths of form GET /foo/bar/$paramname/blah to foo.bar.paramname.blah.get + val p = normalizePattern.replaceAllIn(request.path, "$1").replace('/', '.').dropWhile(_ == '.') + val normalisedPath = { + if (p.lastOption.exists(_ != '.')) s"$p." + else p + } + s"$normalisedPath${request.method.toLowerCase(Locale.ENGLISH)}" + }) + ) + } +} diff --git a/modules/0.23/src/main/scala/kamon/http4s/PathOperationNameGenerator.scala b/modules/0.23/src/main/scala/kamon/http4s/PathOperationNameGenerator.scala new file mode 100644 index 0000000..fb0a3ed --- /dev/null +++ b/modules/0.23/src/main/scala/kamon/http4s/PathOperationNameGenerator.scala @@ -0,0 +1,7 @@ +package kamon.http4s + +import kamon.instrumentation.http.{HttpMessage, HttpOperationNameGenerator} + +class PathOperationNameGenerator extends HttpOperationNameGenerator { + override def name(request: HttpMessage.Request): Option[String] = Some(request.path) +} diff --git a/modules/0.23/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala b/modules/0.23/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala new file mode 100644 index 0000000..c302cfe --- /dev/null +++ b/modules/0.23/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala @@ -0,0 +1,72 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + + +package kamon.http4s +package middleware.client + +import cats.effect.{Sync, Resource} +import cats.implicits._ +import com.typesafe.config.Config +import kamon.Kamon +import kamon.context.Context +import kamon.instrumentation.http.HttpClientInstrumentation +import org.http4s.{Request, Response} +import org.http4s.client.Client + +object KamonSupport { + + private var _instrumentation = instrumentation(Kamon.config()) + + private def instrumentation(kamonConfig: Config): HttpClientInstrumentation = { + val httpClientConfig = kamonConfig.getConfig("kamon.instrumentation.http4s.client") + HttpClientInstrumentation.from(httpClientConfig, "http4s.client") + } + + Kamon.onReconfigure(newConfig => _instrumentation = instrumentation(newConfig)) + + def apply[F[_]](underlying: Client[F])(implicit F: Sync[F]): Client[F] = Client { request => + // this needs to run on the same thread as the caller, so can't be suspended in F + val ctx = Kamon.currentContext() + kamonClient(underlying)(request)(ctx)(_instrumentation) + } + + + private def kamonClient[F[_]](underlying: Client[F]) + (request: Request[F]) + (ctx: Context) + (instrumentation: HttpClientInstrumentation) + (implicit F:Sync[F]): Resource[F, Response[F]] = + for { + requestHandler <- Resource.eval(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx))) + response <- underlying.run(requestHandler.request).attempt + trackedResponse <- Resource.eval(handleResponse(response, requestHandler)) + } yield trackedResponse + + def handleResponse[F[_]]( + response: Either[Throwable, Response[F]], + requestHandler: HttpClientInstrumentation.RequestHandler[Request[F]], + )(implicit F:Sync[F]): F[Response[F]] = + response match { + case Right(res) => + requestHandler.processResponse(getResponseBuilder(res)) + F.delay(res) + case Left(error) => + requestHandler.span.fail(error).finish() + F.raiseError(error) + } + +} diff --git a/modules/0.23/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala b/modules/0.23/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala new file mode 100644 index 0000000..f88ae0f --- /dev/null +++ b/modules/0.23/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala @@ -0,0 +1,89 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.http4s +package middleware.server + +import cats.data.{Kleisli, OptionT} +import cats.effect.{Resource, Sync} +import cats.implicits._ +import kamon.Kamon +import kamon.context.Storage +import kamon.instrumentation.http.HttpServerInstrumentation.RequestHandler +import kamon.instrumentation.http.HttpServerInstrumentation +import org.http4s.{HttpRoutes, Request, Response} + +object KamonSupport { + + def apply[F[_]: Sync](service: HttpRoutes[F], interface: String, port: Int): HttpRoutes[F] = { + val httpServerConfig = Kamon.config().getConfig("kamon.instrumentation.http4s.server") + val instrumentation = HttpServerInstrumentation.from(httpServerConfig, "http4s.server", interface, port) + + Kleisli(kamonService[F](service, instrumentation)(_)) + } + + + private def kamonService[F[_]](service: HttpRoutes[F], instrumentation: HttpServerInstrumentation) + (request: Request[F]) + (implicit F: Sync[F]): OptionT[F, Response[F]] = OptionT { + getHandler(instrumentation)(request).use { handler => + for { + resOrUnhandled <- service(request).value.attempt + respWithContext <- kamonServiceHandler(handler, resOrUnhandled, instrumentation.settings) + } yield respWithContext + } + } + + private def processRequest[F[_]](requestHandler: RequestHandler)(implicit F: Sync[F]): Resource[F, RequestHandler] = + Resource.make(F.delay(requestHandler.requestReceived()))(h => F.delay(h.responseSent())) + + private def withContext[F[_]](requestHandler: RequestHandler)(implicit F: Sync[F]): Resource[F, Storage.Scope] = + Resource.make(F.delay(Kamon.storeContext(requestHandler.context)))( scope => F.delay(scope.close())) + + + private def getHandler[F[_]](instrumentation: HttpServerInstrumentation)(request: Request[F])(implicit F: Sync[F]): Resource[F, RequestHandler] = + for { + handler <- Resource.eval(F.delay(instrumentation.createHandler(buildRequestMessage(request)))) + _ <- processRequest(handler) + _ <- withContext(handler) + } yield handler + + private def kamonServiceHandler[F[_]](requestHandler: RequestHandler, + e: Either[Throwable, Option[Response[F]]], + settings: HttpServerInstrumentation.Settings) + (implicit F: Sync[F]): F[Option[Response[F]]] = + e match { + case Left(e) => + F.delay { + requestHandler.span.fail(e.getMessage) + Some(requestHandler.buildResponse(errorResponseBuilder, requestHandler.context)) + } *> F.raiseError(e) + case Right(None) => + F.delay { + requestHandler.span.name(settings.unhandledOperationName) + val response: Response[F] = requestHandler.buildResponse[Response[F]]( + notFoundResponseBuilder, requestHandler.context + ) + Some(response) + } + case Right(Some(response)) => + F.delay { + val a = requestHandler.buildResponse(getResponseBuilder(response), requestHandler.context) + Some(a) + } + } + +} diff --git a/modules/0.23/src/main/scala/kamon/http4s/package.scala b/modules/0.23/src/main/scala/kamon/http4s/package.scala new file mode 100644 index 0000000..9da40b5 --- /dev/null +++ b/modules/0.23/src/main/scala/kamon/http4s/package.scala @@ -0,0 +1,87 @@ +package kamon + +import org.http4s.{Header, Headers, Request, Response, Status} +import kamon.instrumentation.http.HttpMessage +import kamon.instrumentation.http.HttpMessage.ResponseBuilder +import org.typelevel.ci.CIString + +package object http4s { + + + def buildRequestMessage[F[_]](inner: Request[F]): HttpMessage.Request = new HttpMessage.Request { + override def url: String = inner.uri.toString() + + override def path: String = inner.uri.path.renderString + + override def method: String = inner.method.name + + override def host: String = inner.uri.authority.map(_.host.value).getOrElse("") + + override def port: Int = inner.uri.authority.flatMap(_.port).getOrElse(0) + + override def read(header: String): Option[String] = inner.headers.get(CIString(header)).map(_.head.value) + + override def readAll(): Map[String, String] = { + val builder = Map.newBuilder[String, String] + inner.headers.foreach(h => builder += (h.name.toString -> h.value)) + builder.result() + } + } + + def errorResponseBuilder[F[_]]: HttpMessage.ResponseBuilder[Response[F]] = new ResponseBuilder[Response[F]] { + override def write(header: String, value: String): Unit = () + override def statusCode: Int = 500 + override def build(): Response[F] = Response[F](status = Status.InternalServerError) + } + + //TODO both of these + def notFoundResponseBuilder[F[_]]: HttpMessage.ResponseBuilder[Response[F]] = new ResponseBuilder[Response[F]] { + private var _headers = Headers.empty + + override def write(header: String, value: String): Unit = + _headers = _headers.put(Header.Raw(CIString(header), value)) + + override def statusCode: Int = 404 + override def build(): Response[F] = Response[F](status = Status.NotFound, headers = _headers) + } + + def getResponseBuilder[F[_]](response: Response[F]): ResponseBuilder[Response[F]] = new HttpMessage.ResponseBuilder[Response[F]] { + private var _headers = response.headers + + override def statusCode: Int = response.status.code + + override def build(): Response[F] = response.withHeaders(_headers) + + override def write(header: String, value: String): Unit = + _headers = _headers.put(Header.Raw(CIString(header), value)) + } + + + def getRequestBuilder[F[_]](request: Request[F]): HttpMessage.RequestBuilder[Request[F]] = new HttpMessage.RequestBuilder[Request[F]] { + private var _headers = request.headers + + override def build(): Request[F] = request.withHeaders(_headers) + + override def write(header: String, value: String): Unit = + _headers = _headers.put(Header.Raw(CIString(header), value)) + + override def url: String = request.uri.toString() + + override def path: String = request.uri.path.renderString + + override def method: String = request.method.name + + override def host: String = request.uri.authority.map(_.host.value).getOrElse("") + + override def port: Int = request.uri.authority.flatMap(_.port).getOrElse(0) + + override def read(header: String): Option[String] = _headers.get(CIString(header)).map(_.head.value) + + override def readAll(): Map[String, String] = { + val builder = Map.newBuilder[String, String] + request.headers.foreach(h => builder += (h.name.toString -> h.value)) + builder.result() + } + } + +} diff --git a/modules/0.23/src/test/resources/application.conf b/modules/0.23/src/test/resources/application.conf new file mode 100644 index 0000000..d00dadf --- /dev/null +++ b/modules/0.23/src/test/resources/application.conf @@ -0,0 +1,5 @@ +kamon.instrumentation.http4s.server.tracing.operations{ + mappings { + "/tracing/*/ok" = "/tracing/:name/ok" + } +} diff --git a/modules/0.23/src/test/resources/logback.xml b/modules/0.23/src/test/resources/logback.xml new file mode 100644 index 0000000..a420a01 --- /dev/null +++ b/modules/0.23/src/test/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/modules/0.23/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala b/modules/0.23/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala new file mode 100644 index 0000000..e65ba09 --- /dev/null +++ b/modules/0.23/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala @@ -0,0 +1,145 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.http4s + +import cats.effect.unsafe.implicits.global +import cats.effect.{IO, Resource} +import kamon.Kamon +import kamon.http4s.middleware.client.KamonSupport +import kamon.tag.Lookups.{plain, plainLong} +import kamon.testkit.TestSpanReporter +import kamon.trace.Span +import org.http4s.client._ +import org.http4s.dsl.io._ +import org.http4s.implicits._ +import org.http4s.{HttpRoutes, Response} +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar +import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} + +import java.net.ConnectException + +class ClientInstrumentationSpec extends WordSpec + with Matchers + with Eventually + with SpanSugar + with OptionValues + with TestSpanReporter + with BeforeAndAfterAll { + + val service = HttpRoutes.of[IO] { + case GET -> Root / "tracing" / "ok" => Ok("ok") + case GET -> Root / "tracing" / "not-found" => NotFound("not-found") + case GET -> Root / "tracing" / "error" => InternalServerError("This page will generate an error!") + } + + val client: Client[IO] = KamonSupport[IO](Client.fromHttpApp[IO](service.orNotFound)) + + "The Client instrumentation" should { + "propagate the current context and generate a span inside an action and complete the ws request" in { + val okSpan = Kamon.spanBuilder("ok-operation-span").start() + + Kamon.runWithSpan(okSpan) { + client.expect[String]("/tracing/ok").unsafeRunSync() shouldBe "ok" + } + + eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/ok" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "http4s.client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200 + span.metricTags.get(plain("parentOperation")) shouldBe "ok-operation-span" + + okSpan.id == span.parentId + } + } + + "close and finish a span even if an exception is thrown by the client" in { + val okSpan = Kamon.spanBuilder("client-exception").start() + val client: Client[IO] = KamonSupport[IO]( + Client(_ => Resource.eval(IO.raiseError[Response[IO]](new ConnectException("Connection Refused.")))) + ) + + Kamon.runWithSpan(okSpan) { + a[ConnectException] should be thrownBy { + client.expect[String]("/tracing/ok").unsafeRunSync() + } + } + + eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe "/tracing/ok" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "http4s.client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.hasError shouldBe true + + okSpan.id == span.parentId + } + } + + "propagate the current context and generate a span called not-found and complete the ws request" in { + val notFoundSpan = Kamon.spanBuilder("not-found-operation-span").start() + + Kamon.runWithSpan(notFoundSpan) { + client.expect[String]("/tracing/not-found").attempt.unsafeRunSync().isLeft shouldBe true + } + + eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe "/tracing/not-found" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "http4s.client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 404 + span.metricTags.get(plain("parentOperation")) shouldBe "not-found-operation-span" + + notFoundSpan.id == span.parentId + } + } + + + + "propagate the current context and generate a span with error and complete the ws request" in { + val errorSpan = Kamon.spanBuilder("error-operation-span").start() + + Kamon.runWithSpan(errorSpan) { + client.expect[String]("/tracing/error").attempt.unsafeRunSync().isLeft shouldBe true + } + + eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/error" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "http4s.client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.hasError shouldBe true + span.metricTags.get(plainLong("http.status_code")) shouldBe 500 + span.metricTags.get(plain("parentOperation")) shouldBe "error-operation-span" + + errorSpan.id == span.parentId + } + } + + } + + +} diff --git a/modules/0.23/src/test/scala/kamon/http4s/HttpMetricsSpec.scala b/modules/0.23/src/test/scala/kamon/http4s/HttpMetricsSpec.scala new file mode 100644 index 0000000..c5cb9b0 --- /dev/null +++ b/modules/0.23/src/test/scala/kamon/http4s/HttpMetricsSpec.scala @@ -0,0 +1,108 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.http4s + +import cats.effect._ +import cats.effect.unsafe.implicits.global +import cats.implicits._ +import kamon.http4s.middleware.server.KamonSupport +import kamon.instrumentation.http.HttpServerMetrics +import kamon.testkit.InstrumentInspection +import org.http4s.HttpRoutes +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.blaze.server.BlazeServerBuilder +import org.http4s.client.Client +import org.http4s.dsl.io._ +import org.http4s.implicits._ +import org.http4s.server.Server +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar +import org.scalatest.{Matchers, OptionValues, WordSpec} + +class HttpMetricsSpec extends WordSpec + with Matchers + with Eventually + with SpanSugar + with InstrumentInspection.Syntax + with OptionValues + { + + val srv = + BlazeServerBuilder[IO](global.compute) + .bindLocal(43567) + .withHttpApp(KamonSupport(HttpRoutes.of[IO] { + case GET -> Root / "tracing" / "ok" => Ok("ok") + case GET -> Root / "tracing" / "not-found" => NotFound("not-found") + case GET -> Root / "tracing" / "error" => InternalServerError("This page will generate an error!") + }, "/127.0.0.1", 43567).orNotFound) + .resource + + val client = + BlazeClientBuilder[IO](global.compute).withMaxTotalConnections(10).resource + + val metrics = + Resource.eval(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567))) + + + def withServerAndClient[A](f: (Server, Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A = + (srv, client, metrics).tupled.use(f.tupled).unsafeRunSync() + + private def get[F[_]: Concurrent](path: String)(server: Server, client: Client[F]): F[String] = { + client.expect[String](s"http://127.0.0.1:${server.address.getPort}$path") + } + + "The HttpMetrics" should { + + "track the total of active requests" in withServerAndClient { (server, client, serverMetrics) => + + val requests = List + .fill(100) { + get("/tracing/ok")(server, client) + }.parSequence_ + + val test = IO { + serverMetrics.activeRequests.distribution().max should be > 1L + serverMetrics.activeRequests.distribution().min shouldBe 0L + } + requests *> test + } + + "track the response time with status code 2xx" in withServerAndClient { (server, client, serverMetrics) => + val requests: IO[Unit] = List.fill(100)(get("/tracing/ok")(server, client)).sequence_ + + val test = IO(serverMetrics.requestsSuccessful.value() should be >= 0L) + + requests *> test + } + + "track the response time with status code 4xx" in withServerAndClient { (server, client, serverMetrics) => + val requests: IO[Unit] = List.fill(100)(get("/tracing/not-found")(server, client).attempt).sequence_ + + val test = IO(serverMetrics.requestsClientError.value() should be >= 0L) + + requests *> test + } + + "track the response time with status code 5xx" in withServerAndClient { (server, client, serverMetrics) => + val requests: IO[Unit] = List.fill(100)(get("/tracing/error")(server, client).attempt).sequence_ + + val test = IO(serverMetrics.requestsServerError.value() should be >= 0L) + + requests *> test + } + } +} diff --git a/modules/0.23/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala b/modules/0.23/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala new file mode 100644 index 0000000..04e4446 --- /dev/null +++ b/modules/0.23/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala @@ -0,0 +1,175 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.http4s + +import cats.effect.unsafe.implicits.global +import cats.effect.{Concurrent, IO} +import cats.implicits._ +import kamon.http4s.middleware.server.KamonSupport +import kamon.tag.Lookups.{plain, plainLong} +import kamon.testkit.TestSpanReporter +import kamon.trace.Span +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.blaze.server.BlazeServerBuilder +import org.http4s.client.Client +import org.http4s.dsl.io._ +import org.http4s.implicits._ +import org.http4s.server.Server +import org.http4s.{Headers, HttpRoutes} +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar +import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} +import org.typelevel.ci.CIString + +class ServerInstrumentationSpec extends WordSpec + with Matchers + with Eventually + with SpanSugar + with OptionValues + with TestSpanReporter + with BeforeAndAfterAll { + + val srv = + BlazeServerBuilder[IO](global.compute) + .bindAny() + .withHttpApp(KamonSupport(HttpRoutes.of[IO] { + case GET -> Root / "tracing" / "ok" => Ok("ok") + case GET -> Root / "tracing" / "error" => InternalServerError("error!") + case GET -> Root / "tracing" / "errorinternal" => throw new RuntimeException("ble") + case GET -> Root / "tracing" / name / "ok" => Ok(s"ok $name") + } + ,"", 0).orNotFound) + .resource + + val client = BlazeClientBuilder[IO](global.compute).resource + + def withServerAndClient[A](f: (Server, Client[IO]) => IO[A]): A = + (srv, client).tupled.use(f.tupled).unsafeRunSync() + + private def getResponse[F[_]: Concurrent](path: String)(server: Server, client: Client[F]): F[(String, Headers)] = { + client.get(s"http://127.0.0.1:${server.address.getPort}$path") { r => + r.bodyText.compile.toList.map(_.mkString).map(_ -> r.headers) + } + } + + "The Server instrumentation" should { + "propagate the current context and respond to the ok action" in withServerAndClient { (server, client) => + val request = getResponse("/tracing/ok")(server, client).map { case (body, headers) => + headers.get(CIString("trace-id")).nonEmpty shouldBe true + body should startWith("ok") + } + + val test = IO { + eventually(timeout(5.seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/ok" + span.kind shouldBe Span.Kind.Server + span.metricTags.get(plain("component")) shouldBe "http4s.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200 + } + } + + request *> test + } + + "propagate the current context and respond to the not-found action" in withServerAndClient { (server, client) => + val request = getResponse("/tracing/not-found")(server, client).map { case (body, headers) => + headers.get(CIString("trace-id")).nonEmpty shouldBe true + } + + val test = IO { + eventually(timeout(5.seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "unhandled" + span.kind shouldBe Span.Kind.Server + span.metricTags.get(plain("component")) shouldBe "http4s.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 404 + } + } + + request *> test + } + + "propagate the current context and respond to the error action" in withServerAndClient { (server, client) => + val request = getResponse("/tracing/error")(server, client).map { case (body, headers) => + headers.get(CIString("trace-id")).nonEmpty shouldBe true + body should startWith("error!") + } + + val test = IO { + eventually(timeout(5.seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/error" + span.kind shouldBe Span.Kind.Server + span.hasError shouldBe true + span.metricTags.get(plain("component")) shouldBe "http4s.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 500 + } + } + + request *> test + } + "propagate the current context and respond to the error while processing" in withServerAndClient { (server, client) => + val request = getResponse("/tracing/errorinternal")(server, client) + /* TODO serviceErrorHandler kicks in and rewrites response, loosing trace information + .map { case (body, headers) => + headers.get(CIString("trace-id")).nonEmpty shouldBe true + } + */ + + val test = IO { + eventually(timeout(5.seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/errorinternal" + span.kind shouldBe Span.Kind.Server + span.hasError shouldBe true + span.metricTags.get(plain("component")) shouldBe "http4s.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 500 + } + } + + request *> test + } + + "handle path parameter" in withServerAndClient { (server, client) => + val request: IO[(String, Headers)] = + getResponse("/tracing/bazz/ok")(server, client) + val test = IO { + eventually(timeout(5.seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/:name/ok" + span.kind shouldBe Span.Kind.Server + span.hasError shouldBe false + span.metricTags.get(plain("component")) shouldBe "http4s.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200 + } + } + + request *> test + } + } +} diff --git a/modules/1.0/src/main/resources/reference.conf b/modules/1.0/src/main/resources/reference.conf new file mode 100644 index 0000000..8d3adc8 --- /dev/null +++ b/modules/1.0/src/main/resources/reference.conf @@ -0,0 +1,254 @@ +# ======================================= # +# Kamon-Http4s Reference Configuration # +# ======================================= # + +kamon.instrumentation.http4s { + + # Settings to control the HTTP Server instrumentation. + # + # IMPORTANT: Besides the "initial-operation-name" and "unhandled-operation-name" settings, the entire configuration of + # the HTTP Server Instrumentation is based on the constructs provided by the Kamon Instrumentation Common library + # which will always fallback to the settings found under the "kamon.instrumentation.http-server.default" path. The + # default settings have been included here to make them easy to find and understand in the context of this project and + # commented out so that any changes to the default settings will actually have effect. + # + server { + + # + # Configuration for HTTP context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + #enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + channel = "default" + + + } + + + # + # Configuration for HTTP server metrics collection. + # + metrics { + + # Enables collection of HTTP server metrics. When enabled the following metrics will be collected, assuming + # that the instrumentation is fully compliant: + # + # - http.server.requets + # - http.server.request.active + # - http.server.request.size + # - http.server.response.size + # - http.server.connection.lifetime + # - http.server.connection.usage + # - http.server.connection.open + # + # All metrics have at least three tags: component, interface and port. Additionally, the http.server.requests + # metric will also have a status_code tag with the status code group (1xx, 2xx and so on). + # + #enabled = yes + } + + + # + # Configuration for HTTP request tracing. + # + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests + # and finish them when the response is sent back to the clients. + #enabled = yes + + # Select a context tag that provides a preferred trace identifier. The preferred trace identifier will be used + # only if all these conditions are met: + # - the context tag is present. + # - there is no parent Span on the incoming context (i.e. this is the first service on the trace). + # - the identifier is valid in accordance to the identity provider. + #preferred-trace-id-tag = "none" + + # Enables collection of span metrics using the `span.processing-time` metric. + #span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + #url = span + + # Use the http.method tag. + #method = metric + + # Use the http.status_code tag. + #status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Server Span created by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + # Controls writing trace and span identifiers to HTTP response headers sent by the instrumented servers. The + # configuration can be set to either "none" to disable writing the identifiers on the response headers or to + # the header name to be used when writing the identifiers. + response-headers { + + # HTTP response header name for the trace identifier, or "none" to disable it. + #trace-id = "trace-id" + + # HTTP response header name for the server span identifier, or "none" to disable it. + #span-id = none + } + + # Custom mappings between routes and operation names. + operations { + + # The default operation name to be used when creating Spans to handle the HTTP server requests. In most + # cases it is not possible to define an operation name right at the moment of starting the HTTP server Span + # and in those cases, this operation name will be initially assigned to the Span. Instrumentation authors + # should do their best effort to provide a suitable operation name or make use of the "mappings" facilities. + default = "http.server.request" + + # The operation name to be assigned when an application cannot find any route/endpoint/controller to handle + # a given request. Depending on the instrumented framework, it might be possible to apply this operation + # name automatically or not, check the frameworks' instrumentation docs for more details. + unhandled = "unhandled" + + # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: + # - default: Uses the set default operation name + # - method: Uses the request HTTP method as the operation name. + # + name-generator = "kamon.http4s.PathOperationNameGenerator" + + # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode + # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. + # For example, with the following configuration: + # mappings { + # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" + # "/events/*/rsvps" = "EventRSVPs" + # } + # + # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have + # the same operation name "/organization/:orgID/user/:userID/profile". + # + # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation + # name "EventRSVPs". + # + # The patterns are expressed as globs and the operation names are free form. + # + mappings { + + } + } + } + } + + # Settings to control the HTTP Client instrumentation + # + # IMPORTANT: The entire configuration of the HTTP Client Instrumentation is based on the constructs provided by the + # Kamon Instrumentation Common library which will always fallback to the settings found under the + # "kamon.instrumentation.http-client.default" path. The default settings have been included here to make them easy to + # find and understand in the context of this project and commented out so that any changes to the default settings + # will actually have effect. + # + client { + + # + # Configuration for HTTP context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + #enabled = yes + + # HTTP propagation channel to be used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + #channel = "default" + } + + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests + # and finish them when the response is received from the server. + #enabled = yes + + # Enables collection of span metrics using the `span.processing-time` metric. + #span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + #url = span + + # Use the http.method tag. + #method = metric + + # Use the http.status_code tag. + #status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Server Span created by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + operations { + + # The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP + # Client instrumentation will always try to use the HTTP Operation Name Generator configured bellow to get + # a name, but if it fails to generate it then this name will be used. + #default = "http.client.request" + + # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: + # - hostname: Uses the request Host as the operation name. + # - method: Uses the request HTTP method as the operation name. + # + name-generator = "kamon.http4s.PathOperationNameGenerator" + } + } + } + + +} + + +kanela.modules { + http4s { + name = "Http4s Instrumentation" + description = "Provides context propagation, distributed tracing and HTTP client and server metrics for Http4s" + + instrumentations = [] + + within = [] + } +} + diff --git a/modules/1.0/src/main/scala/kamon/http4s/Http4s.scala b/modules/1.0/src/main/scala/kamon/http4s/Http4s.scala new file mode 100644 index 0000000..0b34cd7 --- /dev/null +++ b/modules/1.0/src/main/scala/kamon/http4s/Http4s.scala @@ -0,0 +1,62 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.http4s + +import com.typesafe.config.Config +import kamon.util.DynamicAccess +import kamon.Kamon +import kamon.instrumentation.http.{HttpMessage, HttpOperationNameGenerator} + +object Http4s { + @volatile var nameGenerator: HttpOperationNameGenerator = nameGeneratorFromConfig(Kamon.config()) + + private def nameGeneratorFromConfig(config: Config): HttpOperationNameGenerator = { + val dynamic = new DynamicAccess(getClass.getClassLoader) + val nameGeneratorFQCN = config.getString("kamon.instrumentation.http4s.client.tracing.operations.name-generator") + dynamic.createInstanceFor[HttpOperationNameGenerator](nameGeneratorFQCN, Nil) + } + + Kamon.onReconfigure { newConfig => + nameGenerator = nameGeneratorFromConfig(newConfig) + } +} + + +class DefaultNameGenerator extends HttpOperationNameGenerator { + + import java.util.Locale + + import scala.collection.concurrent.TrieMap + + private val localCache = TrieMap.empty[String, String] + private val normalizePattern = """\$([^<]+)<[^>]+>""".r + + + override def name(request: HttpMessage.Request): Option[String] = { + Some( + localCache.getOrElseUpdate(s"${request.method}${request.path}", { + // Convert paths of form GET /foo/bar/$paramname/blah to foo.bar.paramname.blah.get + val p = normalizePattern.replaceAllIn(request.path, "$1").replace('/', '.').dropWhile(_ == '.') + val normalisedPath = { + if (p.lastOption.exists(_ != '.')) s"$p." + else p + } + s"$normalisedPath${request.method.toLowerCase(Locale.ENGLISH)}" + }) + ) + } +} diff --git a/modules/1.0/src/main/scala/kamon/http4s/PathOperationNameGenerator.scala b/modules/1.0/src/main/scala/kamon/http4s/PathOperationNameGenerator.scala new file mode 100644 index 0000000..fb0a3ed --- /dev/null +++ b/modules/1.0/src/main/scala/kamon/http4s/PathOperationNameGenerator.scala @@ -0,0 +1,7 @@ +package kamon.http4s + +import kamon.instrumentation.http.{HttpMessage, HttpOperationNameGenerator} + +class PathOperationNameGenerator extends HttpOperationNameGenerator { + override def name(request: HttpMessage.Request): Option[String] = Some(request.path) +} diff --git a/modules/1.0/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala b/modules/1.0/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala new file mode 100644 index 0000000..c302cfe --- /dev/null +++ b/modules/1.0/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala @@ -0,0 +1,72 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + + +package kamon.http4s +package middleware.client + +import cats.effect.{Sync, Resource} +import cats.implicits._ +import com.typesafe.config.Config +import kamon.Kamon +import kamon.context.Context +import kamon.instrumentation.http.HttpClientInstrumentation +import org.http4s.{Request, Response} +import org.http4s.client.Client + +object KamonSupport { + + private var _instrumentation = instrumentation(Kamon.config()) + + private def instrumentation(kamonConfig: Config): HttpClientInstrumentation = { + val httpClientConfig = kamonConfig.getConfig("kamon.instrumentation.http4s.client") + HttpClientInstrumentation.from(httpClientConfig, "http4s.client") + } + + Kamon.onReconfigure(newConfig => _instrumentation = instrumentation(newConfig)) + + def apply[F[_]](underlying: Client[F])(implicit F: Sync[F]): Client[F] = Client { request => + // this needs to run on the same thread as the caller, so can't be suspended in F + val ctx = Kamon.currentContext() + kamonClient(underlying)(request)(ctx)(_instrumentation) + } + + + private def kamonClient[F[_]](underlying: Client[F]) + (request: Request[F]) + (ctx: Context) + (instrumentation: HttpClientInstrumentation) + (implicit F:Sync[F]): Resource[F, Response[F]] = + for { + requestHandler <- Resource.eval(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx))) + response <- underlying.run(requestHandler.request).attempt + trackedResponse <- Resource.eval(handleResponse(response, requestHandler)) + } yield trackedResponse + + def handleResponse[F[_]]( + response: Either[Throwable, Response[F]], + requestHandler: HttpClientInstrumentation.RequestHandler[Request[F]], + )(implicit F:Sync[F]): F[Response[F]] = + response match { + case Right(res) => + requestHandler.processResponse(getResponseBuilder(res)) + F.delay(res) + case Left(error) => + requestHandler.span.fail(error).finish() + F.raiseError(error) + } + +} diff --git a/modules/1.0/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala b/modules/1.0/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala new file mode 100644 index 0000000..f88ae0f --- /dev/null +++ b/modules/1.0/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala @@ -0,0 +1,89 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.http4s +package middleware.server + +import cats.data.{Kleisli, OptionT} +import cats.effect.{Resource, Sync} +import cats.implicits._ +import kamon.Kamon +import kamon.context.Storage +import kamon.instrumentation.http.HttpServerInstrumentation.RequestHandler +import kamon.instrumentation.http.HttpServerInstrumentation +import org.http4s.{HttpRoutes, Request, Response} + +object KamonSupport { + + def apply[F[_]: Sync](service: HttpRoutes[F], interface: String, port: Int): HttpRoutes[F] = { + val httpServerConfig = Kamon.config().getConfig("kamon.instrumentation.http4s.server") + val instrumentation = HttpServerInstrumentation.from(httpServerConfig, "http4s.server", interface, port) + + Kleisli(kamonService[F](service, instrumentation)(_)) + } + + + private def kamonService[F[_]](service: HttpRoutes[F], instrumentation: HttpServerInstrumentation) + (request: Request[F]) + (implicit F: Sync[F]): OptionT[F, Response[F]] = OptionT { + getHandler(instrumentation)(request).use { handler => + for { + resOrUnhandled <- service(request).value.attempt + respWithContext <- kamonServiceHandler(handler, resOrUnhandled, instrumentation.settings) + } yield respWithContext + } + } + + private def processRequest[F[_]](requestHandler: RequestHandler)(implicit F: Sync[F]): Resource[F, RequestHandler] = + Resource.make(F.delay(requestHandler.requestReceived()))(h => F.delay(h.responseSent())) + + private def withContext[F[_]](requestHandler: RequestHandler)(implicit F: Sync[F]): Resource[F, Storage.Scope] = + Resource.make(F.delay(Kamon.storeContext(requestHandler.context)))( scope => F.delay(scope.close())) + + + private def getHandler[F[_]](instrumentation: HttpServerInstrumentation)(request: Request[F])(implicit F: Sync[F]): Resource[F, RequestHandler] = + for { + handler <- Resource.eval(F.delay(instrumentation.createHandler(buildRequestMessage(request)))) + _ <- processRequest(handler) + _ <- withContext(handler) + } yield handler + + private def kamonServiceHandler[F[_]](requestHandler: RequestHandler, + e: Either[Throwable, Option[Response[F]]], + settings: HttpServerInstrumentation.Settings) + (implicit F: Sync[F]): F[Option[Response[F]]] = + e match { + case Left(e) => + F.delay { + requestHandler.span.fail(e.getMessage) + Some(requestHandler.buildResponse(errorResponseBuilder, requestHandler.context)) + } *> F.raiseError(e) + case Right(None) => + F.delay { + requestHandler.span.name(settings.unhandledOperationName) + val response: Response[F] = requestHandler.buildResponse[Response[F]]( + notFoundResponseBuilder, requestHandler.context + ) + Some(response) + } + case Right(Some(response)) => + F.delay { + val a = requestHandler.buildResponse(getResponseBuilder(response), requestHandler.context) + Some(a) + } + } + +} diff --git a/modules/1.0/src/main/scala/kamon/http4s/package.scala b/modules/1.0/src/main/scala/kamon/http4s/package.scala new file mode 100644 index 0000000..9da40b5 --- /dev/null +++ b/modules/1.0/src/main/scala/kamon/http4s/package.scala @@ -0,0 +1,87 @@ +package kamon + +import org.http4s.{Header, Headers, Request, Response, Status} +import kamon.instrumentation.http.HttpMessage +import kamon.instrumentation.http.HttpMessage.ResponseBuilder +import org.typelevel.ci.CIString + +package object http4s { + + + def buildRequestMessage[F[_]](inner: Request[F]): HttpMessage.Request = new HttpMessage.Request { + override def url: String = inner.uri.toString() + + override def path: String = inner.uri.path.renderString + + override def method: String = inner.method.name + + override def host: String = inner.uri.authority.map(_.host.value).getOrElse("") + + override def port: Int = inner.uri.authority.flatMap(_.port).getOrElse(0) + + override def read(header: String): Option[String] = inner.headers.get(CIString(header)).map(_.head.value) + + override def readAll(): Map[String, String] = { + val builder = Map.newBuilder[String, String] + inner.headers.foreach(h => builder += (h.name.toString -> h.value)) + builder.result() + } + } + + def errorResponseBuilder[F[_]]: HttpMessage.ResponseBuilder[Response[F]] = new ResponseBuilder[Response[F]] { + override def write(header: String, value: String): Unit = () + override def statusCode: Int = 500 + override def build(): Response[F] = Response[F](status = Status.InternalServerError) + } + + //TODO both of these + def notFoundResponseBuilder[F[_]]: HttpMessage.ResponseBuilder[Response[F]] = new ResponseBuilder[Response[F]] { + private var _headers = Headers.empty + + override def write(header: String, value: String): Unit = + _headers = _headers.put(Header.Raw(CIString(header), value)) + + override def statusCode: Int = 404 + override def build(): Response[F] = Response[F](status = Status.NotFound, headers = _headers) + } + + def getResponseBuilder[F[_]](response: Response[F]): ResponseBuilder[Response[F]] = new HttpMessage.ResponseBuilder[Response[F]] { + private var _headers = response.headers + + override def statusCode: Int = response.status.code + + override def build(): Response[F] = response.withHeaders(_headers) + + override def write(header: String, value: String): Unit = + _headers = _headers.put(Header.Raw(CIString(header), value)) + } + + + def getRequestBuilder[F[_]](request: Request[F]): HttpMessage.RequestBuilder[Request[F]] = new HttpMessage.RequestBuilder[Request[F]] { + private var _headers = request.headers + + override def build(): Request[F] = request.withHeaders(_headers) + + override def write(header: String, value: String): Unit = + _headers = _headers.put(Header.Raw(CIString(header), value)) + + override def url: String = request.uri.toString() + + override def path: String = request.uri.path.renderString + + override def method: String = request.method.name + + override def host: String = request.uri.authority.map(_.host.value).getOrElse("") + + override def port: Int = request.uri.authority.flatMap(_.port).getOrElse(0) + + override def read(header: String): Option[String] = _headers.get(CIString(header)).map(_.head.value) + + override def readAll(): Map[String, String] = { + val builder = Map.newBuilder[String, String] + request.headers.foreach(h => builder += (h.name.toString -> h.value)) + builder.result() + } + } + +} diff --git a/modules/1.0/src/test/resources/application.conf b/modules/1.0/src/test/resources/application.conf new file mode 100644 index 0000000..d00dadf --- /dev/null +++ b/modules/1.0/src/test/resources/application.conf @@ -0,0 +1,5 @@ +kamon.instrumentation.http4s.server.tracing.operations{ + mappings { + "/tracing/*/ok" = "/tracing/:name/ok" + } +} diff --git a/modules/1.0/src/test/resources/logback.xml b/modules/1.0/src/test/resources/logback.xml new file mode 100644 index 0000000..a420a01 --- /dev/null +++ b/modules/1.0/src/test/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/modules/1.0/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala b/modules/1.0/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala new file mode 100644 index 0000000..e65ba09 --- /dev/null +++ b/modules/1.0/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala @@ -0,0 +1,145 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.http4s + +import cats.effect.unsafe.implicits.global +import cats.effect.{IO, Resource} +import kamon.Kamon +import kamon.http4s.middleware.client.KamonSupport +import kamon.tag.Lookups.{plain, plainLong} +import kamon.testkit.TestSpanReporter +import kamon.trace.Span +import org.http4s.client._ +import org.http4s.dsl.io._ +import org.http4s.implicits._ +import org.http4s.{HttpRoutes, Response} +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar +import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} + +import java.net.ConnectException + +class ClientInstrumentationSpec extends WordSpec + with Matchers + with Eventually + with SpanSugar + with OptionValues + with TestSpanReporter + with BeforeAndAfterAll { + + val service = HttpRoutes.of[IO] { + case GET -> Root / "tracing" / "ok" => Ok("ok") + case GET -> Root / "tracing" / "not-found" => NotFound("not-found") + case GET -> Root / "tracing" / "error" => InternalServerError("This page will generate an error!") + } + + val client: Client[IO] = KamonSupport[IO](Client.fromHttpApp[IO](service.orNotFound)) + + "The Client instrumentation" should { + "propagate the current context and generate a span inside an action and complete the ws request" in { + val okSpan = Kamon.spanBuilder("ok-operation-span").start() + + Kamon.runWithSpan(okSpan) { + client.expect[String]("/tracing/ok").unsafeRunSync() shouldBe "ok" + } + + eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/ok" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "http4s.client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200 + span.metricTags.get(plain("parentOperation")) shouldBe "ok-operation-span" + + okSpan.id == span.parentId + } + } + + "close and finish a span even if an exception is thrown by the client" in { + val okSpan = Kamon.spanBuilder("client-exception").start() + val client: Client[IO] = KamonSupport[IO]( + Client(_ => Resource.eval(IO.raiseError[Response[IO]](new ConnectException("Connection Refused.")))) + ) + + Kamon.runWithSpan(okSpan) { + a[ConnectException] should be thrownBy { + client.expect[String]("/tracing/ok").unsafeRunSync() + } + } + + eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe "/tracing/ok" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "http4s.client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.hasError shouldBe true + + okSpan.id == span.parentId + } + } + + "propagate the current context and generate a span called not-found and complete the ws request" in { + val notFoundSpan = Kamon.spanBuilder("not-found-operation-span").start() + + Kamon.runWithSpan(notFoundSpan) { + client.expect[String]("/tracing/not-found").attempt.unsafeRunSync().isLeft shouldBe true + } + + eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe "/tracing/not-found" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "http4s.client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 404 + span.metricTags.get(plain("parentOperation")) shouldBe "not-found-operation-span" + + notFoundSpan.id == span.parentId + } + } + + + + "propagate the current context and generate a span with error and complete the ws request" in { + val errorSpan = Kamon.spanBuilder("error-operation-span").start() + + Kamon.runWithSpan(errorSpan) { + client.expect[String]("/tracing/error").attempt.unsafeRunSync().isLeft shouldBe true + } + + eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/error" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "http4s.client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.hasError shouldBe true + span.metricTags.get(plainLong("http.status_code")) shouldBe 500 + span.metricTags.get(plain("parentOperation")) shouldBe "error-operation-span" + + errorSpan.id == span.parentId + } + } + + } + + +} diff --git a/modules/1.0/src/test/scala/kamon/http4s/HttpMetricsSpec.scala b/modules/1.0/src/test/scala/kamon/http4s/HttpMetricsSpec.scala new file mode 100644 index 0000000..c5cb9b0 --- /dev/null +++ b/modules/1.0/src/test/scala/kamon/http4s/HttpMetricsSpec.scala @@ -0,0 +1,108 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.http4s + +import cats.effect._ +import cats.effect.unsafe.implicits.global +import cats.implicits._ +import kamon.http4s.middleware.server.KamonSupport +import kamon.instrumentation.http.HttpServerMetrics +import kamon.testkit.InstrumentInspection +import org.http4s.HttpRoutes +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.blaze.server.BlazeServerBuilder +import org.http4s.client.Client +import org.http4s.dsl.io._ +import org.http4s.implicits._ +import org.http4s.server.Server +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar +import org.scalatest.{Matchers, OptionValues, WordSpec} + +class HttpMetricsSpec extends WordSpec + with Matchers + with Eventually + with SpanSugar + with InstrumentInspection.Syntax + with OptionValues + { + + val srv = + BlazeServerBuilder[IO](global.compute) + .bindLocal(43567) + .withHttpApp(KamonSupport(HttpRoutes.of[IO] { + case GET -> Root / "tracing" / "ok" => Ok("ok") + case GET -> Root / "tracing" / "not-found" => NotFound("not-found") + case GET -> Root / "tracing" / "error" => InternalServerError("This page will generate an error!") + }, "/127.0.0.1", 43567).orNotFound) + .resource + + val client = + BlazeClientBuilder[IO](global.compute).withMaxTotalConnections(10).resource + + val metrics = + Resource.eval(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567))) + + + def withServerAndClient[A](f: (Server, Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A = + (srv, client, metrics).tupled.use(f.tupled).unsafeRunSync() + + private def get[F[_]: Concurrent](path: String)(server: Server, client: Client[F]): F[String] = { + client.expect[String](s"http://127.0.0.1:${server.address.getPort}$path") + } + + "The HttpMetrics" should { + + "track the total of active requests" in withServerAndClient { (server, client, serverMetrics) => + + val requests = List + .fill(100) { + get("/tracing/ok")(server, client) + }.parSequence_ + + val test = IO { + serverMetrics.activeRequests.distribution().max should be > 1L + serverMetrics.activeRequests.distribution().min shouldBe 0L + } + requests *> test + } + + "track the response time with status code 2xx" in withServerAndClient { (server, client, serverMetrics) => + val requests: IO[Unit] = List.fill(100)(get("/tracing/ok")(server, client)).sequence_ + + val test = IO(serverMetrics.requestsSuccessful.value() should be >= 0L) + + requests *> test + } + + "track the response time with status code 4xx" in withServerAndClient { (server, client, serverMetrics) => + val requests: IO[Unit] = List.fill(100)(get("/tracing/not-found")(server, client).attempt).sequence_ + + val test = IO(serverMetrics.requestsClientError.value() should be >= 0L) + + requests *> test + } + + "track the response time with status code 5xx" in withServerAndClient { (server, client, serverMetrics) => + val requests: IO[Unit] = List.fill(100)(get("/tracing/error")(server, client).attempt).sequence_ + + val test = IO(serverMetrics.requestsServerError.value() should be >= 0L) + + requests *> test + } + } +} diff --git a/modules/1.0/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala b/modules/1.0/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala new file mode 100644 index 0000000..04e4446 --- /dev/null +++ b/modules/1.0/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala @@ -0,0 +1,175 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.http4s + +import cats.effect.unsafe.implicits.global +import cats.effect.{Concurrent, IO} +import cats.implicits._ +import kamon.http4s.middleware.server.KamonSupport +import kamon.tag.Lookups.{plain, plainLong} +import kamon.testkit.TestSpanReporter +import kamon.trace.Span +import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.blaze.server.BlazeServerBuilder +import org.http4s.client.Client +import org.http4s.dsl.io._ +import org.http4s.implicits._ +import org.http4s.server.Server +import org.http4s.{Headers, HttpRoutes} +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar +import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} +import org.typelevel.ci.CIString + +class ServerInstrumentationSpec extends WordSpec + with Matchers + with Eventually + with SpanSugar + with OptionValues + with TestSpanReporter + with BeforeAndAfterAll { + + val srv = + BlazeServerBuilder[IO](global.compute) + .bindAny() + .withHttpApp(KamonSupport(HttpRoutes.of[IO] { + case GET -> Root / "tracing" / "ok" => Ok("ok") + case GET -> Root / "tracing" / "error" => InternalServerError("error!") + case GET -> Root / "tracing" / "errorinternal" => throw new RuntimeException("ble") + case GET -> Root / "tracing" / name / "ok" => Ok(s"ok $name") + } + ,"", 0).orNotFound) + .resource + + val client = BlazeClientBuilder[IO](global.compute).resource + + def withServerAndClient[A](f: (Server, Client[IO]) => IO[A]): A = + (srv, client).tupled.use(f.tupled).unsafeRunSync() + + private def getResponse[F[_]: Concurrent](path: String)(server: Server, client: Client[F]): F[(String, Headers)] = { + client.get(s"http://127.0.0.1:${server.address.getPort}$path") { r => + r.bodyText.compile.toList.map(_.mkString).map(_ -> r.headers) + } + } + + "The Server instrumentation" should { + "propagate the current context and respond to the ok action" in withServerAndClient { (server, client) => + val request = getResponse("/tracing/ok")(server, client).map { case (body, headers) => + headers.get(CIString("trace-id")).nonEmpty shouldBe true + body should startWith("ok") + } + + val test = IO { + eventually(timeout(5.seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/ok" + span.kind shouldBe Span.Kind.Server + span.metricTags.get(plain("component")) shouldBe "http4s.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200 + } + } + + request *> test + } + + "propagate the current context and respond to the not-found action" in withServerAndClient { (server, client) => + val request = getResponse("/tracing/not-found")(server, client).map { case (body, headers) => + headers.get(CIString("trace-id")).nonEmpty shouldBe true + } + + val test = IO { + eventually(timeout(5.seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "unhandled" + span.kind shouldBe Span.Kind.Server + span.metricTags.get(plain("component")) shouldBe "http4s.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 404 + } + } + + request *> test + } + + "propagate the current context and respond to the error action" in withServerAndClient { (server, client) => + val request = getResponse("/tracing/error")(server, client).map { case (body, headers) => + headers.get(CIString("trace-id")).nonEmpty shouldBe true + body should startWith("error!") + } + + val test = IO { + eventually(timeout(5.seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/error" + span.kind shouldBe Span.Kind.Server + span.hasError shouldBe true + span.metricTags.get(plain("component")) shouldBe "http4s.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 500 + } + } + + request *> test + } + "propagate the current context and respond to the error while processing" in withServerAndClient { (server, client) => + val request = getResponse("/tracing/errorinternal")(server, client) + /* TODO serviceErrorHandler kicks in and rewrites response, loosing trace information + .map { case (body, headers) => + headers.get(CIString("trace-id")).nonEmpty shouldBe true + } + */ + + val test = IO { + eventually(timeout(5.seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/errorinternal" + span.kind shouldBe Span.Kind.Server + span.hasError shouldBe true + span.metricTags.get(plain("component")) shouldBe "http4s.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 500 + } + } + + request *> test + } + + "handle path parameter" in withServerAndClient { (server, client) => + val request: IO[(String, Headers)] = + getResponse("/tracing/bazz/ok")(server, client) + val test = IO { + eventually(timeout(5.seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe "/tracing/:name/ok" + span.kind shouldBe Span.Kind.Server + span.hasError shouldBe false + span.metricTags.get(plain("component")) shouldBe "http4s.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200 + } + } + + request *> test + } + } +} diff --git a/project/build.properties b/project/build.properties index 06703e3..10fd9ee 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.9 +sbt.version=1.5.5 diff --git a/version.sbt b/version.sbt index 3e00daa..854b5c1 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "2.0.3" +ThisBuild / version := "2.2.0"