From 3090e7e5f002a36e5dd7eb9cf09641d5fee0b87a Mon Sep 17 00:00:00 2001 From: johannes karoff <johannes@karoff.net> Date: Mon, 29 Jan 2024 18:23:32 +0100 Subject: [PATCH] add Observable.fromPublisher(java.util.concurrent.Flow.Publisher) --- .../src/main/scala/colibri/Observable.scala | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/colibri/src/main/scala/colibri/Observable.scala b/colibri/src/main/scala/colibri/Observable.scala index 23a49016..3dfd7aab 100644 --- a/colibri/src/main/scala/colibri/Observable.scala +++ b/colibri/src/main/scala/colibri/Observable.scala @@ -3,8 +3,9 @@ package colibri import cats._ import cats.implicits._ import colibri.effect.RunEffect -import cats.effect.{Sync, SyncIO, Async, IO, Resource} +import cats.effect.{Async, IO, Resource, Sync, SyncIO} +import java.util.concurrent.Flow import scala.scalajs.js import scala.scalajs.js.timers import scala.concurrent.{ExecutionContext, Future} @@ -133,6 +134,29 @@ object Observable { } } + def fromPublisher[A](publisher: Flow.Publisher[A]): Observable[A] = Observable.create { observer => + var subscription: Flow.Subscription = null + var isComplete = false + val subscriber = new Flow.Subscriber[A] { + def onNext(a: A) = observer.unsafeOnNext(a) + def onError(t: Throwable) = observer.unsafeOnError(t) + def onComplete() = isComplete = true + def onSubscribe(s: Flow.Subscription) = { + subscription = s + s.request(Long.MaxValue) + } + } + + publisher.subscribe(subscriber) + + Cancelable.withIsEmpty(isComplete)(() => { + if (subscription != null) { + subscription.cancel() + subscription = null + } + }) + } + @inline def create[A](produce: Observer[A] => Cancelable): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = produce(sink) }