Skip to content

Commit 58201b8

Browse files
committed
Cleanup code for server resources
1 parent b0bed91 commit 58201b8

13 files changed

Lines changed: 33 additions & 50 deletions

File tree

client/testserver/src/main/scala/sttp/tapir/client/tests/HttpServer.scala

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,20 @@ import org.http4s._
1616
import org.slf4j.LoggerFactory
1717
import org.typelevel.ci.CIString
1818
import scodec.bits.ByteVector
19-
import sttp.tapir.client.tests.HttpServer._
2019

21-
import scala.concurrent.ExecutionContext
22-
23-
object HttpServer {
20+
object HttpServer extends ResourceApp.Forever {
2421
type Port = Int
2522

26-
def main(args: Array[String]): Unit = {
23+
def run(args: List[String]): Resource[IO, Unit] = {
2724
val port = args.headOption.map(_.toInt).getOrElse(51823)
28-
new HttpServer(port).start()
25+
new HttpServer(port).build
2926
}
3027
}
3128

32-
class HttpServer(port: Port) {
29+
class HttpServer(port: HttpServer.Port) {
3330

3431
private val logger = LoggerFactory.getLogger(getClass)
3532

36-
private var stopServer: IO[Unit] = _
37-
3833
//
3934

4035
private object numParam extends QueryParamDecoderMatcher[Int]("num")
@@ -75,7 +70,7 @@ class HttpServer(port: Port) {
7570
case r @ POST -> Root / "api" / "echo" / "multipart" =>
7671
r.decode[multipart.Multipart[IO]] { mp =>
7772
val parts: Vector[multipart.Part[IO]] = mp.parts
78-
def toString(s: fs2.Stream[IO, Byte]): IO[String] = s.through(fs2.text.utf8Decode).compile.foldMonoid
73+
def toString(s: fs2.Stream[IO, Byte]): IO[String] = s.through(fs2.text.utf8.decode).compile.foldMonoid
7974
def partToString(name: String): IO[String] = parts.find(_.name.contains(name)).map(p => toString(p.body)).getOrElse(IO.pure(""))
8075
partToString("fruit").product(partToString("amount")).flatMap { case (fruit, amount) =>
8176
Ok(s"$fruit=$amount")
@@ -212,23 +207,11 @@ class HttpServer(port: Port) {
212207

213208
//
214209

215-
def start(): Unit = {
216-
val (_, _stopServer) = BlazeServerBuilder[IO]
210+
def build: Resource[IO, server.Server] = BlazeServerBuilder[IO]
217211
.withExecutionContext(ExecutionContext.global)
218212
.bindHttp(port)
219213
.withHttpWebSocketApp(app)
220214
.resource
221-
.map(_.address.getPort)
222-
.allocated
223-
.unsafeRunSync()
224-
225-
stopServer = _stopServer
226-
227-
logger.info(s"Server on port $port started")
228-
}
229-
230-
def close(): Unit = {
231-
stopServer.unsafeRunSync()
232-
logger.info(s"Server on port $port stopped")
233-
}
215+
.evalTap(_ => IO(logger.info(s"Server on port $port started")))
216+
.onFinalize(IO(logger.info(s"Server on port $port stopped")))
234217
}

doc/tutorials/07_cats_effect.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,7 @@ object HelloWorldTapir extends IOApp:
230230
.bindHttp(8080, "localhost")
231231
.withHttpApp(Router("/" -> allRoutes).orNotFound)
232232
.resource
233-
.use(_ => IO.never)
234-
.as(ExitCode.Success)
233+
.useForever
235234
```
236235

237236
Hence, we first generate endpoint descriptions, which correspond to exposing the Swagger UI (containing the generated

examples/src/main/scala/sttp/tapir/examples/streaming/ProxyHttp4sFs2Server.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,4 @@ object ProxyHttp4sFs2Server extends IOApp:
6262
.bindHttp(8080, "localhost")
6363
.withHttpApp(Router("/" -> routes).orNotFound)
6464
.resource
65-
} yield ())
66-
.use { _ => IO.never }
67-
.as(ExitCode.Success)
65+
} yield ()).useForever

examples/src/main/scala/sttp/tapir/examples/streaming/StreamingHttp4sFs2ServerOrError.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,5 +56,4 @@ object StreamingHttp4sFs2ServerOrError extends IOApp:
5656
.bindHttp(8080, "localhost")
5757
.withHttpApp(Router("/" -> userDataRoutes).orNotFound)
5858
.resource
59-
.use { _ => IO.never }
60-
.as(ExitCode.Success)
59+
.useForever

generated-doc/out/tutorials/07_cats_effect.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,7 @@ object HelloWorldTapir extends IOApp:
230230
.bindHttp(8080, "localhost")
231231
.withHttpApp(Router("/" -> allRoutes).orNotFound)
232232
.resource
233-
.use(_ => IO.never)
234-
.as(ExitCode.Success)
233+
.useForever
235234
```
236235

237236
Hence, we first generate endpoint descriptions, which correspond to exposing the Swagger UI (containing the generated

perf-tests/src/main/scala/sttp/tapir/perf/http4s/Http4s.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,9 @@ object server {
115115
.withMaxConnections(maxConnections)
116116
.withConnectorPoolSize(connectorPoolSize)
117117
.resource
118-
.allocated
119-
.map(_._2)
120-
.map(_.flatTap { _ =>
121-
IO.println("Http4s server closed.")
122-
})
118+
.useForever
119+
.start
120+
.map(_.cancel *> IO.println("Http4s server closed."))
123121
}
124122

125123
object TapirServer extends ServerRunner { override def start = server.runServer(Tapir.router(1)) }

server/http4s-server/src/test/scala/sttp/tapir/server/http4s/Http4sServerTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ class Http4sServerTest[R >: Fs2Streams[IO] with WebSockets] extends TestSuite wi
109109
endpoint.out(streamBinaryBody(Fs2Streams[IO])(CodecFormat.OctetStream())),
110110
"streaming should send data according to producer stream rate"
111111
)((_: Unit) =>
112-
IO(Right(fs2.Stream.awakeEvery[IO](1.second).map(_.toString()).through(fs2.text.utf8Encode).interruptAfter(10.seconds)))
112+
IO(Right(fs2.Stream.awakeEvery[IO](1.second).map(_.toString()).through(fs2.text.utf8.encode).interruptAfter(10.seconds)))
113113
) { (backend, baseUri) =>
114114
basicRequest
115115
.response(
116116
asStream(Fs2Streams[IO])(bs => {
117-
bs.through(fs2.text.utf8Decode).mapAccumulate(0)((pings, currentTime) => (pings + 1, currentTime)).compile.last
117+
bs.through(fs2.text.utf8.decode).mapAccumulate(0)((pings, currentTime) => (pings + 1, currentTime)).compile.last
118118
})
119119
)
120120
.get(baseUri)

server/http4s-server/zio/src/test/scala/sttp/tapir/server/http4s/ztapir/ZHttp4sTestServerInterpreter.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ import sttp.tapir.server.tests.TestServerInterpreter
1515
import sttp.tapir.tests._
1616
import sttp.tapir.ztapir.ZServerEndpoint
1717
import zio.{Runtime, Task, Unsafe}
18+
import zio.interop._
1819
import zio.interop.catz._
20+
import zio.interop.catz.implicits._
1921

2022
import scala.concurrent.ExecutionContext
2123
import scala.concurrent.duration.FiniteDuration
@@ -27,7 +29,6 @@ object ZHttp4sTestServerInterpreter {
2729
}
2830

2931
class ZHttp4sTestServerInterpreter extends TestServerInterpreter[Task, ZioStreams with WebSockets, ServerOptions, Routes] {
30-
implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
3132

3233
override def route(es: List[ZServerEndpoint[Any, ZioStreams with WebSockets]], interceptors: Interceptors): Routes = {
3334
val serverOptions: ServerOptions = interceptors(Http4sServerOptions.customiseInterceptors[Task]).options
@@ -49,7 +50,7 @@ class ZHttp4sTestServerInterpreter extends TestServerInterpreter[Task, ZioStream
4950
.map(_.address.getPort)
5051
.mapK(new ~>[Task, IO] {
5152
// Converting a ZIO effect to an Cats Effect IO effect
52-
def apply[B](fa: Task[B]): IO[B] = IO.fromFuture(Unsafe.unsafe(implicit u => IO(Runtime.default.unsafe.runToFuture(fa))))
53+
def apply[B](fa: Task[B]): IO[B] = fa.toEffect[IO]
5354
})
5455
}
5556
}

server/tests/src/main/scala/sttp/tapir/server/tests/CreateServerTest.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,9 @@ class DefaultCreateServerTest[F[_], +R, OPTIONS, ROUTE](
133133
Test(name)(
134134
resources
135135
.use { port =>
136-
runTest(backend, uri"http://localhost:$port").guarantee(IO(logger.info(s"Tests completed on port $port")))
136+
runTest(backend, uri"http://localhost:$port").guaranteeCase(exitCase =>
137+
IO(logger.info(s"Test on port $port: ${exitCase.getClass.getSimpleName}"))
138+
)
137139
}
138140
.unsafeToFuture()
139141
)

server/tests/src/main/scala/sttp/tapir/server/tests/TestServerInterpreter.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package sttp.tapir.server.tests
22

33
import cats.data.NonEmptyList
4-
import cats.effect.{IO, Resource}
4+
import cats.effect.{Deferred, IO, Resource}
55
import sttp.tapir.server.ServerEndpoint
66
import sttp.tapir.server.interceptor.CustomiseInterceptors
77
import sttp.tapir.tests._
@@ -21,8 +21,12 @@ trait TestServerInterpreter[F[_], +R, OPTIONS, ROUTE] {
2121
def serverWithStop(
2222
routes: NonEmptyList[ROUTE],
2323
gracefulShutdownTimeout: Option[FiniteDuration] = None
24-
): Resource[IO, (Port, KillSwitch)] =
25-
Resource.eval(server(routes, gracefulShutdownTimeout).allocated)
24+
): Resource[IO, (Port, KillSwitch)] = for {
25+
stopSignal <- Resource.eval(Deferred[IO, Unit])
26+
portValue <- Resource.eval(Deferred[IO, Port])
27+
_ <- server(routes, gracefulShutdownTimeout).use(port => portValue.complete(port) *> stopSignal.get).background
28+
port <- Resource.eval(portValue.get)
29+
} yield (port, stopSignal.complete(()).void)
2630

2731
def server(routes: NonEmptyList[ROUTE], gracefulShutdownTimeout: Option[FiniteDuration] = None): Resource[IO, Port]
2832
}

0 commit comments

Comments
 (0)