diff --git a/zio-http/shared/src/main/scala/zio/http/ServerSentEvent.scala b/zio-http/shared/src/main/scala/zio/http/ServerSentEvent.scala index dfe8c4c2d..c7c4b4987 100644 --- a/zio-http/shared/src/main/scala/zio/http/ServerSentEvent.scala +++ b/zio-http/shared/src/main/scala/zio/http/ServerSentEvent.scala @@ -45,12 +45,17 @@ final case class ServerSentEvent[T]( retry: Option[Duration] = None, ) { - def encode(implicit binaryCodec: BinaryCodec[T]): String = { - val dataString: String = - data match { - case s: String => s - case _ => binaryCodec.encode(data).asString(Charsets.Utf8) - } + def encode(implicit binaryCodec: BinaryCodec[T]): String = + encodeWith { + case s: String => s + case data => binaryCodec.encode(data).asString(Charsets.Utf8) + } + + def encoded(implicit ev: T <:< String): String = + encodeWith(ev) + + def encodeWith(f: T => String): String = { + val dataString: String = f(data) val dataLines: Array[String] = dataString.split("\n") val isComment = dataString.startsWith(":") @@ -101,6 +106,7 @@ final case class ServerSentEvent[T]( } sb.append('\n').toString } + } object ServerSentEvent { @@ -128,17 +134,18 @@ object ServerSentEvent { implicit def defaultBinaryCodec[T](implicit schema: Schema[T]): BinaryCodec[ServerSentEvent[T]] = defaultContentCodec(schema).defaultCodec - implicit def binaryCodec[T](implicit binaryCodec: BinaryCodec[T]): BinaryCodec[ServerSentEvent[T]] = - new BinaryCodec[ServerSentEvent[T]] { - override def decode(whole: Chunk[Byte]): Either[DecodeError, ServerSentEvent[T]] = { + val rawEventLines: BinaryCodec[ServerSentEvent[Chunk[String]]] = + new BinaryCodec[ServerSentEvent[Chunk[String]]] { + + override def decode(whole: Chunk[Byte]): Either[DecodeError, ServerSentEvent[Chunk[String]]] = { val event = processEvent(Chunk.fromArray(whole.asString(Charsets.Utf8).split("\n"))) if (event.data.isEmpty && event.retry.isEmpty) Left(DecodeError.EmptyContent("Neither 'data' nor 'retry' fields specified")) else - decodeDataField(event) + Right(event) } - override def streamDecoder: ZPipeline[Any, DecodeError, Byte, ServerSentEvent[T]] = + override def streamDecoder: ZPipeline[Any, DecodeError, Byte, ServerSentEvent[Chunk[String]]] = ZPipeline.utf8Decode.orDie >>> ZPipeline.splitOn("\n") >>> ZPipeline @@ -151,7 +158,6 @@ object ServerSentEvent { .filter { case (completed, event) => completed && event.nonEmpty } .map { case (_, lines) => processEvent(lines) } .filter(event => event.data.nonEmpty || event.retry.nonEmpty) - .mapZIO(event => ZIO.fromEither(decodeDataField(event))) private def processEvent(lines: Chunk[String]): ServerSentEvent[Chunk[String]] = lines.foldLeft(ServerSentEvent(data = Chunk.empty[String])) { case (event, line) => @@ -172,6 +178,40 @@ object ServerSentEvent { } } + override def encode(value: ServerSentEvent[Chunk[String]]): Chunk[Byte] = + Chunk.fromArray(value.encodeWith(_.mkString("\n")).getBytes(Charsets.Utf8)) + + override def streamEncoder: ZPipeline[Any, Nothing, ServerSentEvent[Chunk[String]], Byte] = + ZPipeline.mapChunks(value => value.flatMap(c => c.encodeWith(_.mkString("\n")).getBytes(Charsets.Utf8))) + + } + + val rawEvent: BinaryCodec[ServerSentEvent[String]] = + new BinaryCodec[ServerSentEvent[String]] { + + override def decode(whole: Chunk[Byte]): Either[DecodeError, ServerSentEvent[String]] = + rawEventLines.decode(whole).map { e => e.copy(data = e.data.asString) } + + override def streamDecoder: ZPipeline[Any, DecodeError, Byte, ServerSentEvent[String]] = + rawEventLines.streamDecoder.map { e => e.copy(data = e.data.asString) } + + override def encode(value: ServerSentEvent[String]): Chunk[Byte] = + Chunk.fromArray(value.encoded.getBytes(Charsets.Utf8)) + + override def streamEncoder: ZPipeline[Any, Nothing, ServerSentEvent[String], Byte] = + ZPipeline.mapChunks(value => value.flatMap(c => c.encoded.getBytes(Charsets.Utf8))) + + } + + implicit def binaryCodec[T](implicit binaryCodec: BinaryCodec[T]): BinaryCodec[ServerSentEvent[T]] = + new BinaryCodec[ServerSentEvent[T]] { + + override def decode(whole: Chunk[Byte]): Either[DecodeError, ServerSentEvent[T]] = + rawEventLines.decode(whole).flatMap(decodeDataField) + + override def streamDecoder: ZPipeline[Any, DecodeError, Byte, ServerSentEvent[T]] = + rawEventLines.streamDecoder.mapZIO(event => ZIO.fromEither(decodeDataField(event))) + private def decodeDataField(event: ServerSentEvent[Chunk[String]]): Either[DecodeError, ServerSentEvent[T]] = binaryCodec .decode(Chunk.fromArray(event.data.mkString("\n").getBytes(Charsets.Utf8))) @@ -182,6 +222,7 @@ object ServerSentEvent { override def streamEncoder: ZPipeline[Any, Nothing, ServerSentEvent[T], Byte] = ZPipeline.mapChunks(value => value.flatMap(c => c.encode.getBytes(Charsets.Utf8))) + } implicit def contentCodec[T](implicit