@@ -23,90 +23,92 @@ package fs2
2323package grpc
2424package server
2525
26- import scala .concurrent .duration ._
2726import cats .effect ._
2827import cats .effect .std .Dispatcher
2928import cats .effect .testkit .TestContext
3029import cats .effect .testkit .TestControl
3130import io .grpc ._
31+ import scala .concurrent .duration ._
3232
3333class ServerSuite extends Fs2GrpcSuite {
3434
3535 private val compressionOps =
3636 ServerOptions .default.configureCallOptions(_.withServerCompressor(Some (GzipCompressor )))
3737
38- private def startCall [ R ] (
39- sync : Fs2ServerCallHandler [IO ] => ServerCallHandler [String , Int ],
38+ private def startCall (
39+ implement : Fs2ServerCallHandler [IO ] => ServerCallHandler [String , Int ],
4040 serverOptions : ServerOptions = ServerOptions .default
41- )(f : (ServerCall .Listener [String ], DummyServerCall ) => IO [R ]): IO [R ] =
42- TestControl .executeEmbed(
43- Dispatcher [IO ]
44- .map(Fs2ServerCallHandler [IO ](_, serverOptions))
45- .use(h =>
46- IO .defer {
47- val dummy = new DummyServerCall
48- f(sync(h).startCall(dummy, new Metadata ()), dummy)
49- }
50- )
51- )
41+ )(call : ServerCall [String , Int ], thunk : ServerCall .Listener [String ] => IO [Unit ]): IO [Unit ] =
42+ for {
43+ releaseRef <- IO .ref[IO [Unit ]](IO .unit)
44+ startBarrier <- Deferred [IO , Unit ]
45+ tc <- TestControl .execute {
46+ for {
47+ allocated <- Dispatcher [IO ].map(Fs2ServerCallHandler [IO ](_, serverOptions)).allocated
48+ (handler, release) = allocated
49+ _ <- releaseRef.set(release)
50+ listener <- IO (implement(handler).startCall(call, new Metadata ()))
51+ _ <- startBarrier.get
52+ _ <- IO .defer(thunk(listener))
53+ } yield ()
54+ }
55+ _ <- tc.tick
56+ _ <- startBarrier.complete(())
57+ _ <- tc.tickAll
58+ _ <- releaseRef.get
59+ } yield ()
60+
61+ private def syncCall (
62+ fs : (ServerCall .Listener [String ] => Unit )*
63+ ): ServerCall .Listener [String ] => IO [Unit ] =
64+ listener => IO (fs.foreach(_.apply(listener)))
5265
5366 test(" unaryToUnary with compression" ) {
54- startCall(_.unaryToUnaryCall((req, _) => IO (req.length)), compressionOps) { (_, dummy) =>
55- IO {
56- assertEquals(dummy.explicitCompressor, Some (" gzip" ))
57- }
58- }
67+ testCompression(_.unaryToUnaryCall((req, _) => IO (req.length)))
5968 }
6069
6170 test(" unaryToStream with compression" ) {
62- startCall(_.unaryToStreamingCall((req, _) => Stream .emit(req.length)), compressionOps) { (_, dummy) =>
63- IO {
64- assertEquals(dummy.explicitCompressor, Some (" gzip" ))
65- }
66- }
71+ testCompression(_.unaryToStreamingCall((req, _) => Stream .emit(req.length).repeatN(5 )))
6772 }
6873
6974 test(" streamToUnary with compression" ) {
70- startCall(_.streamingToUnaryCall((req, _) => req.compile.count.map(_.toInt)), compressionOps) { (_, dummy) =>
71- IO {
72- assertEquals(dummy.explicitCompressor, Some (" gzip" ))
73- }
74- }
75+ testCompression(_.streamingToUnaryCall((req, _) => req.compile.foldMonoid.map(_.length)))
7576 }
7677
77- test(" streamToStream with compression" ) {
78- startCall(_.streamingToStreamingCall((req, _) => req.map(_.length)), compressionOps) { (_, dummy) =>
79- IO {
80- assertEquals(dummy.explicitCompressor, Some (" gzip" ))
81- }
78+ test(" streamToStream with compression" )(
79+ testCompression(_.streamingToStreamingCall((req, _) => req.map(_.length)))
80+ )
81+
82+ private def testCompression (sync : Fs2ServerCallHandler [IO ] => ServerCallHandler [String , Int ]): IO [Unit ] = {
83+ val dummy = new DummyServerCall
84+ startCall(sync, compressionOps)(dummy, _ => IO .unit) >> IO {
85+ assertEquals(dummy.explicitCompressor, Some (" gzip" ))
8286 }
8387 }
8488
8589 test(" single message to unaryToUnary" ) {
86- startCall(_.unaryToUnaryCall((req, _) => IO (req.length))) { (listener, dummy) =>
87- listener.onMessage( " 123 " )
88- listener.onHalfClose()
89- IO {
90- assertEquals(dummy.explicitCompressor, None )
91- assertEquals(dummy.messages.size, 1 )
92- assertEquals(dummy.messages( 0 ), 3 )
93- assertEquals(dummy.currentStatus.isDefined, true )
94- assertEquals(dummy.currentStatus.get.isOk , true )
95- }.delayBy( 1 .seconds )
90+ val dummy = new DummyServerCall
91+ startCall(_.unaryToUnaryCall((req, _) => IO (req.length)))(
92+ dummy,
93+ syncCall(_.onMessage( " 123 " ), _.onHalfClose())
94+ ) >> IO {
95+ assertEquals(dummy.explicitCompressor, None )
96+ assertEquals(dummy.messages.size, 1 )
97+ assertEquals(dummy.messages( 0 ), 3 )
98+ assertEquals(dummy.currentStatus.isDefined , true )
99+ assertEquals(dummy.currentStatus.get.isOk, true )
96100 }
97101 }
98102
99- runTest (" cancellation for unaryToUnary" ) { (tc, d) =>
103+ test (" cancellation for unaryToUnary" ) {
100104 val dummy = new DummyServerCall
101- val listener = Fs2ServerCallHandler [IO ](d, ServerOptions .default)
102- .unaryToUnaryCall[String , Int ]((req, _) => IO (req.length))
103- .startCall(dummy, new Metadata ())
104-
105- listener.onCancel()
106- tc.tick()
107-
108- assertEquals(dummy.currentStatus, None )
109- assertEquals(dummy.messages.length, 0 )
105+ startCall(_.unaryToUnaryCall((req, _) => IO (req.length)))(
106+ dummy,
107+ syncCall(_.onCancel())
108+ ) >> IO {
109+ assertEquals(dummy.currentStatus, None )
110+ assertEquals(dummy.messages.length, 0 )
111+ }
110112 }
111113
112114 runTest(" cancellation on the fly for unaryToUnary" ) { (tc, d) =>
@@ -145,13 +147,12 @@ class ServerSuite extends Fs2GrpcSuite {
145147 }
146148
147149 test(" no messages to unaryToUnary" ) {
148- startCall(
149- _.unaryToUnaryCall((req, _) => IO (req.length))
150- ) { (listener, dummy) =>
151- listener.onHalfClose()
152- IO {
153- assertEquals(dummy.currentStatus.map(_.getCode), Some (Status .Code .INTERNAL ))
154- }.delayBy(1 .seconds)
150+ val dummy = new DummyServerCall
151+ startCall(_.unaryToUnaryCall((req, _) => IO (req.length)))(
152+ dummy,
153+ syncCall(_.onHalfClose())
154+ ) >> IO {
155+ assertEquals(dummy.currentStatus.map(_.getCode), Some (Status .Code .INTERNAL ))
155156 }
156157 }
157158
@@ -204,14 +205,14 @@ class ServerSuite extends Fs2GrpcSuite {
204205 }
205206
206207 test(" cancellation for streamingToStreaming" ) {
208+ val dummy = new DummyServerCall
207209 startCall(
208210 _.streamingToStreamingCall((_, _) => Stream .emit(3 ).repeat.take(5 ).zipLeft(Stream .awakeDelay[IO ](1 .seconds)))
209- ) { (listener, dummy) =>
210- IO {
211- listener.onCancel() // wait onCancel for stream is started
212- }.delayBy(2 .seconds) >> IO {
213- assertEquals(dummy.currentStatus.map(_.getCode), Some (Status .Code .CANCELLED ))
214- }.delayBy(1 .seconds) // wait for stream cancellation is completed
211+ )(
212+ dummy,
213+ syncCall(_.onCancel())
214+ ) >> IO {
215+ assertEquals(dummy.currentStatus.map(_.getCode), Some (Status .Code .CANCELLED ))
215216 }
216217 }
217218
0 commit comments