Skip to content

Commit 839c89d

Browse files
committed
Add Monix task utilities
1 parent 465e549 commit 839c89d

File tree

3 files changed

+88
-0
lines changed

3 files changed

+88
-0
lines changed

core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ object ObservableExtensions extends ObservableExtensions {
2222
*/
2323
def headOptL: Task[Opt[T]] = obs.headOptionL.map(_.toOpt)
2424

25+
/**
26+
* Returns a [[monix.eval.Task Task]] which emits the first item for which the predicate holds.
27+
*/
28+
def findOptL(p: T => Boolean): Task[Opt[T]] = obs.findL(p).map(_.toOpt)
29+
2530
/** Suppress the duplicate elements emitted by the source Observable.
2631
*
2732
* WARNING: this requires unbounded buffering.
@@ -79,5 +84,15 @@ object ObservableExtensions extends ObservableExtensions {
7984
obs
8085
.foldLeftL(factory.newBuilder)(_ += _)
8186
.map(_.result())
87+
88+
/** Returns a [[monix.eval.Task Task]] that upon evaluation
89+
* will collect all items from the source into a [[Map]] instance
90+
* using provided functions to compute keys and values.
91+
*
92+
* WARNING: for infinite streams the process will eventually blow up
93+
* with an out of memory error.
94+
*/
95+
def mkMapL[K, V](keyFun: T => K, valueFun: T => V): Task[Map[K, V]] =
96+
obs.foldLeftL(Map.newBuilder[K, V])({ case (res, a) => res += ((keyFun(a), valueFun(a))) }).map(_.result())
8297
}
8398
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.avsystem.commons
2+
package concurrent
3+
4+
import com.avsystem.commons.concurrent.TaskExtensions.{TaskCompanionOps, TaskOps}
5+
import com.avsystem.commons.misc.Timestamp
6+
import monix.eval.Task
7+
8+
import java.util.concurrent.TimeUnit
9+
import scala.concurrent.TimeoutException
10+
import scala.concurrent.duration.FiniteDuration
11+
12+
trait TaskExtensions {
13+
implicit def taskOps[T](task: Task[T]): TaskOps[T] = new TaskOps(task)
14+
15+
implicit def taskCompanionOps(task: Task.type): TaskCompanionOps.type = TaskCompanionOps
16+
}
17+
18+
object TaskExtensions extends TaskExtensions {
19+
final class TaskOps[T](private val task: Task[T]) extends AnyVal {
20+
/**
21+
* Like regular `timeout` but [[TimeoutException]] is created lazily (for performance).
22+
*/
23+
def lazyTimeout(after: FiniteDuration, msg: => String): Task[T] =
24+
task.timeoutTo(after, Task.raiseError(new TimeoutException(msg)))
25+
26+
/**
27+
* Similar to [[Task.tapEval]], accepts simple consumer function as an argument
28+
*/
29+
def tapL(f: T => Unit): Task[T] =
30+
task.map(_.setup(f))
31+
32+
/**
33+
* Similar to [[Task.tapError]], accepts [[PartialFunction]] as an argument
34+
*/
35+
def tapErrorL[B](f: PartialFunction[Throwable, B]): Task[T] =
36+
task.tapError(t => Task(f.applyOpt(t)))
37+
}
38+
39+
object TaskCompanionOps {
40+
/** A [[Task]] of [[Opt.Empty]] */
41+
def optEmpty[A]: Task[Opt[A]] = Task.pure(Opt.Empty)
42+
43+
def traverseOpt[A, B](opt: Opt[A])(f: A => Task[B]): Task[Opt[B]] =
44+
opt.fold(Task.optEmpty[B])(a => f(a).map(_.opt))
45+
46+
def fromOpt[A](maybeTask: Opt[Task[A]]): Task[Opt[A]] = maybeTask match {
47+
case Opt(task) => task.map(_.opt)
48+
case Opt.Empty => optEmpty
49+
}
50+
51+
def currentTimestamp: Task[Timestamp] =
52+
Task.clock.realTime(TimeUnit.MILLISECONDS).map(Timestamp(_))
53+
54+
def usingNow[T](useNow: Timestamp => Task[T]): Task[T] =
55+
currentTimestamp.flatMap(useNow)
56+
}
57+
}

core/src/test/scala/com/avsystem/commons/concurrent/ObservableExtensionsTest.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,19 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
2020
Observable.fromIterable(ints).headOptL.runToFuture.futureValue shouldBe ints.headOpt
2121
}
2222
}
23+
24+
test("findOptL") {
25+
forAll { ints: List[Int] =>
26+
Observable.fromIterable(ints).findOptL(_ > 1).runToFuture.futureValue shouldBe ints.findOpt(_ > 1)
27+
}
28+
}
29+
2330
test("distinct") {
2431
forAll { ints: List[Int] =>
2532
Observable.fromIterable(ints).distinct.toListL.runToFuture.futureValue shouldBe ints.distinct
2633
}
2734
}
35+
2836
test("distinctBy") {
2937
forAll { ints: List[Int] =>
3038
val f: Int => Int = _ % 256
@@ -33,17 +41,20 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
3341
ints.foldLeft(MLinkedHashMap.empty[Int, Int])((map, v) => f(v) |> (key => map.applyIf(!_.contains(key))(_ += key -> v))).valuesIterator.toList
3442
}
3543
}
44+
3645
test("sortedL") {
3746
forAll { ints: List[Int] =>
3847
Observable.fromIterable(ints).sortedL.runToFuture.futureValue shouldBe ints.sorted
3948
}
4049
}
50+
4151
test("sortedByL") {
4252
forAll { ints: List[Int] =>
4353
val f: Int => Int = _ % 256
4454
Observable.fromIterable(ints).sortedByL(f).runToFuture.futureValue shouldBe ints.sortBy(f)
4555
}
4656
}
57+
4758
test("toL") {
4859
forAll { ints: List[(Int, Int)] =>
4960
def testFactory[T](factory: Factory[(Int, Int), T])(implicit position: Position) =
@@ -78,4 +89,9 @@ class ObservableExtensionsTest extends AnyFunSuite with Matchers
7889
}
7990
}
8091

92+
test("mkMapL") {
93+
forAll { ints: List[Int] =>
94+
Observable.fromIterable(ints).mkMapL(_ % 3, _ + 2).runToFuture.futureValue shouldBe ints.mkMap(_ % 3, _ + 2)
95+
}
96+
}
8197
}

0 commit comments

Comments
 (0)