Skip to content

Commit

Permalink
add Observable.fromPublisher(java.util.concurrent.Flow.Publisher)
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman committed Jan 29, 2024
1 parent 8cb48ea commit 3090e7e
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package colibri
import cats._
import cats.implicits._
import colibri.effect.RunEffect
import cats.effect.{Sync, SyncIO, Async, IO, Resource}
import cats.effect.{Async, IO, Resource, Sync, SyncIO}

import java.util.concurrent.Flow
import scala.scalajs.js
import scala.scalajs.js.timers
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -133,6 +134,29 @@ object Observable {
}
}

def fromPublisher[A](publisher: Flow.Publisher[A]): Observable[A] = Observable.create { observer =>
var subscription: Flow.Subscription = null
var isComplete = false
val subscriber = new Flow.Subscriber[A] {
def onNext(a: A) = observer.unsafeOnNext(a)
def onError(t: Throwable) = observer.unsafeOnError(t)
def onComplete() = isComplete = true
def onSubscribe(s: Flow.Subscription) = {
subscription = s
s.request(Long.MaxValue)
}
}

publisher.subscribe(subscriber)

Cancelable.withIsEmpty(isComplete)(() => {
if (subscription != null) {
subscription.cancel()
subscription = null
}
})
}

@inline def create[A](produce: Observer[A] => Cancelable): Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = produce(sink)
}
Expand Down

0 comments on commit 3090e7e

Please sign in to comment.