Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 53 additions & 12 deletions zio-http/shared/src/main/scala/zio/http/ServerSentEvent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,17 @@ final case class ServerSentEvent[T](
retry: Option[Duration] = None,
) {

def encode(implicit binaryCodec: BinaryCodec[T]): String = {
val dataString: String =
data match {
case s: String => s
case _ => binaryCodec.encode(data).asString(Charsets.Utf8)
}
def encode(implicit binaryCodec: BinaryCodec[T]): String =
encodeWith {
case s: String => s
case data => binaryCodec.encode(data).asString(Charsets.Utf8)
}

def encoded(implicit ev: T <:< String): String =
encodeWith(ev)

def encodeWith(f: T => String): String = {
val dataString: String = f(data)

val dataLines: Array[String] = dataString.split("\n")
val isComment = dataString.startsWith(":")
Expand Down Expand Up @@ -101,6 +106,7 @@ final case class ServerSentEvent[T](
}
sb.append('\n').toString
}

}

object ServerSentEvent {
Expand Down Expand Up @@ -128,17 +134,18 @@ object ServerSentEvent {
implicit def defaultBinaryCodec[T](implicit schema: Schema[T]): BinaryCodec[ServerSentEvent[T]] =
defaultContentCodec(schema).defaultCodec

implicit def binaryCodec[T](implicit binaryCodec: BinaryCodec[T]): BinaryCodec[ServerSentEvent[T]] =
new BinaryCodec[ServerSentEvent[T]] {
override def decode(whole: Chunk[Byte]): Either[DecodeError, ServerSentEvent[T]] = {
val rawEventLines: BinaryCodec[ServerSentEvent[Chunk[String]]] =
new BinaryCodec[ServerSentEvent[Chunk[String]]] {

override def decode(whole: Chunk[Byte]): Either[DecodeError, ServerSentEvent[Chunk[String]]] = {
val event = processEvent(Chunk.fromArray(whole.asString(Charsets.Utf8).split("\n")))
if (event.data.isEmpty && event.retry.isEmpty)
Left(DecodeError.EmptyContent("Neither 'data' nor 'retry' fields specified"))
else
decodeDataField(event)
Right(event)
}

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, ServerSentEvent[T]] =
override def streamDecoder: ZPipeline[Any, DecodeError, Byte, ServerSentEvent[Chunk[String]]] =
ZPipeline.utf8Decode.orDie >>>
ZPipeline.splitOn("\n") >>>
ZPipeline
Expand All @@ -151,7 +158,6 @@ object ServerSentEvent {
.filter { case (completed, event) => completed && event.nonEmpty }
.map { case (_, lines) => processEvent(lines) }
.filter(event => event.data.nonEmpty || event.retry.nonEmpty)
.mapZIO(event => ZIO.fromEither(decodeDataField(event)))

private def processEvent(lines: Chunk[String]): ServerSentEvent[Chunk[String]] =
lines.foldLeft(ServerSentEvent(data = Chunk.empty[String])) { case (event, line) =>
Expand All @@ -172,6 +178,40 @@ object ServerSentEvent {
}
}

override def encode(value: ServerSentEvent[Chunk[String]]): Chunk[Byte] =
Chunk.fromArray(value.encodeWith(_.mkString("\n")).getBytes(Charsets.Utf8))

override def streamEncoder: ZPipeline[Any, Nothing, ServerSentEvent[Chunk[String]], Byte] =
ZPipeline.mapChunks(value => value.flatMap(c => c.encodeWith(_.mkString("\n")).getBytes(Charsets.Utf8)))

}

val rawEvent: BinaryCodec[ServerSentEvent[String]] =
new BinaryCodec[ServerSentEvent[String]] {

override def decode(whole: Chunk[Byte]): Either[DecodeError, ServerSentEvent[String]] =
rawEventLines.decode(whole).map { e => e.copy(data = e.data.asString) }

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, ServerSentEvent[String]] =
rawEventLines.streamDecoder.map { e => e.copy(data = e.data.asString) }

override def encode(value: ServerSentEvent[String]): Chunk[Byte] =
Chunk.fromArray(value.encoded.getBytes(Charsets.Utf8))

override def streamEncoder: ZPipeline[Any, Nothing, ServerSentEvent[String], Byte] =
ZPipeline.mapChunks(value => value.flatMap(c => c.encoded.getBytes(Charsets.Utf8)))

}

implicit def binaryCodec[T](implicit binaryCodec: BinaryCodec[T]): BinaryCodec[ServerSentEvent[T]] =
new BinaryCodec[ServerSentEvent[T]] {

override def decode(whole: Chunk[Byte]): Either[DecodeError, ServerSentEvent[T]] =
rawEventLines.decode(whole).flatMap(decodeDataField)

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, ServerSentEvent[T]] =
rawEventLines.streamDecoder.mapZIO(event => ZIO.fromEither(decodeDataField(event)))

private def decodeDataField(event: ServerSentEvent[Chunk[String]]): Either[DecodeError, ServerSentEvent[T]] =
binaryCodec
.decode(Chunk.fromArray(event.data.mkString("\n").getBytes(Charsets.Utf8)))
Expand All @@ -182,6 +222,7 @@ object ServerSentEvent {

override def streamEncoder: ZPipeline[Any, Nothing, ServerSentEvent[T], Byte] =
ZPipeline.mapChunks(value => value.flatMap(c => c.encode.getBytes(Charsets.Utf8)))

}

implicit def contentCodec[T](implicit
Expand Down
Loading