Skip to content

Commit 81ec8a0

Browse files
authored
Merge pull request #542 from AVSystem/reactive-publisher-extensions
Reactive Publisher to Monix extensions for typed Mongo
2 parents f9d1aa1 + c493ed7 commit 81ec8a0

File tree

3 files changed

+39
-14
lines changed

3 files changed

+39
-14
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.avsystem.commons
2+
package mongo.reactive
3+
4+
import monix.eval.Task
5+
import monix.reactive.Observable
6+
import org.reactivestreams.Publisher
7+
8+
trait ReactiveMongoExtensions {
9+
import ReactiveMongoExtensions._
10+
11+
implicit final def publisherOps[T](publisher: Publisher[T]): PublisherOps[T] = new PublisherOps(publisher)
12+
}
13+
object ReactiveMongoExtensions extends ReactiveMongoExtensions {
14+
/**
15+
* Extensions for converting [[Publisher]] to [[Task]]/[[Observable]] Monix types
16+
*/
17+
final class PublisherOps[T](private val publisher: Publisher[T]) extends AnyVal {
18+
def asMonix: Observable[T] = Observable.fromReactivePublisher(publisher)
19+
20+
// prefer using the family of methods below for observables which are intended to only return a single document,
21+
// mongo observable implementation sometimes logs an error on server-closed cursors
22+
private def singleObservable: Task[Option[T]] = Task.fromReactivePublisher(publisher)
23+
24+
// handles both an empty Publisher and and a single null item
25+
def headOptionL: Task[Option[T]] = singleObservable.map(_.filterNot(_ == null))
26+
def headOptL: Task[Opt[T]] = headOptionL.map(_.toOpt)
27+
// can return null if Mongo driver for some reason returns null, intentionally don't want to report failure
28+
def headL: Task[T] = singleObservable.map(_.get)
29+
def completedL: Task[Unit] = headOptionL.void
30+
}
31+
}

mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoCollection.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class TypedMongoCollection[E <: BaseMongoEntity] private(
9191
* to be used for database operations not covered directly by [[TypedMongoCollection]].
9292
*/
9393
def multiResultNativeOp[T](operation: MongoCollection[E] => Publisher[T]): Observable[T] =
94-
Observable.fromReactivePublisher(operation(nativeCollection))
94+
multi(operation(nativeCollection))
9595

9696
def drop(): Task[Unit] =
9797
empty(optionalizeFirstArg(nativeCollection.drop(sessionOrNull)))
@@ -155,7 +155,7 @@ class TypedMongoCollection[E <: BaseMongoEntity] private(
155155
}
156156

157157
def toObservable[X](publisher: FindPublisher[X]): Observable[X] =
158-
Observable.fromReactivePublisher(setupPublisher(publisher))
158+
multi(setupPublisher(publisher))
159159

160160
projection match {
161161
case SelfRef =>
@@ -242,15 +242,14 @@ class TypedMongoCollection[E <: BaseMongoEntity] private(
242242
filter: MongoDocumentFilter[E] = MongoFilter.empty,
243243
setupOptions: DistinctPublisher[Any] => DistinctPublisher[Any] = identity,
244244
): Observable[T] = {
245-
246245
val publisher =
247246
optionalizeFirstArg(nativeCollection.distinct(sessionOrNull, property.rawPath, classOf[BsonValue]))
248247
.filter(filter.toFilterBson(Opt.Empty, property.projectionRefs))
249248

250249
val publisherWithOptions =
251250
setupOptions(publisher.asInstanceOf[DistinctPublisher[Any]]).asInstanceOf[DistinctPublisher[BsonValue]]
252251

253-
Observable.fromReactivePublisher(publisherWithOptions).map(property.format.readBson)
252+
multi(publisherWithOptions).map(property.format.readBson)
254253
}
255254

256255
def insertOne(

mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoUtils.scala

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,13 @@ import monix.reactive.Observable
77
import org.reactivestreams.Publisher
88

99
trait TypedMongoUtils {
10-
protected final def empty(publisher: Publisher[Void]): Task[Unit] =
11-
Observable.fromReactivePublisher(publisher, 1).completedL
12-
13-
protected final def single[T](publisher: Publisher[T]): Task[T] =
14-
Observable.fromReactivePublisher(publisher, 1).firstL
10+
import com.avsystem.commons.mongo.reactive.ReactiveMongoExtensions._
1511

12+
protected final def empty(publisher: Publisher[Void]): Task[Unit] = publisher.completedL
13+
protected final def single[T](publisher: Publisher[T]): Task[T] = publisher.headL
1614
// handles both an empty Publisher and and a single null item
17-
protected final def singleOpt[T](publisher: Publisher[T]): Task[Option[T]] =
18-
Observable.fromReactivePublisher(publisher, 1).filter(_ != null).firstOptionL
19-
20-
protected final def multi[T](publisher: Publisher[T]): Observable[T] =
21-
Observable.fromReactivePublisher(publisher)
15+
protected final def singleOpt[T](publisher: Publisher[T]): Task[Option[T]] = publisher.headOptionL
16+
protected final def multi[T](publisher: Publisher[T]): Observable[T] = publisher.asMonix
2217

2318
/**
2419
* Transforms an expression `method(nullableArg, moreArgs)` into

0 commit comments

Comments
 (0)