Skip to content

Commit 54a8942

Browse files
committed
Implement Task.traverseMap and unit tests for the new extensions
1 parent 7eb8986 commit 54a8942

File tree

3 files changed

+60
-2
lines changed

3 files changed

+60
-2
lines changed

core/src/main/scala/com/avsystem/commons/concurrent/ObservableExtensions.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,6 @@ object ObservableExtensions extends ObservableExtensions {
9393
* with an out of memory error.
9494
*/
9595
def mkMapL[K, V](keyFun: T => K, valueFun: T => V): Task[Map[K, V]] =
96-
obs.foldLeftL(Map.newBuilder[K, V])({ case (res, a) => res += ((keyFun(a), valueFun(a))) }).map(_.result())
96+
obs.map(v => (keyFun(v), valueFun(v))).toL(Map)
9797
}
9898
}

core/src/main/scala/com/avsystem/commons/concurrent/TaskExtensions.scala

+7-1
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,15 @@ object TaskExtensions extends TaskExtensions {
4545

4646
def fromOpt[A](maybeTask: Opt[Task[A]]): Task[Opt[A]] = maybeTask match {
4747
case Opt(task) => task.map(_.opt)
48-
case Opt.Empty => optEmpty
48+
case Opt.Empty => Task.optEmpty
4949
}
5050

51+
def traverseMap[K, V, A, B](map: Map[K, V])(f: (K, V) => Task[(A, B)]): Task[Map[A, B]] =
52+
Task.traverse(map.toSeq)({ case (key, value) => f(key, value) }).map(_.toMap)
53+
54+
def traverseMapValues[K, A, B](map: Map[K, A])(f: (K, A) => Task[B]): Task[Map[K, B]] =
55+
traverseMap(map)({ case (key, value) => f(key, value).map(key -> _) })
56+
5157
def currentTimestamp: Task[Timestamp] =
5258
Task.clock.realTime(TimeUnit.MILLISECONDS).map(Timestamp(_))
5359

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.avsystem.commons
2+
package concurrent
3+
4+
import monix.eval.Task
5+
import monix.execution.Scheduler
6+
import org.scalatest.concurrent.ScalaFutures
7+
import org.scalatest.funsuite.AnyFunSuite
8+
import org.scalatest.matchers.should.Matchers
9+
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
10+
11+
import scala.concurrent.TimeoutException
12+
import scala.concurrent.duration._
13+
14+
class TaskExtensionsTest extends AnyFunSuite with Matchers with ScalaCheckDrivenPropertyChecks with ScalaFutures {
15+
import com.avsystem.commons.concurrent.TaskExtensions._
16+
17+
private implicit val scheduler: Scheduler = Scheduler(RunNowEC)
18+
19+
test("lazyTimeout") {
20+
val result = Task.never.lazyTimeout(100.millis, "Lazy timeout").runToFuture.failed.futureValue
21+
result shouldBe a[TimeoutException]
22+
result.getMessage shouldBe "Lazy timeout"
23+
}
24+
25+
test("traverseOpt") {
26+
Task.traverseOpt(Opt.empty[Int])(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.Empty
27+
Task.traverseOpt(Opt.some(123))(i => Task.now(i)).runToFuture.futureValue shouldBe Opt.some(123)
28+
}
29+
30+
test("fromOpt") {
31+
Task.fromOpt(Opt.empty[Task[Int]]).runToFuture.futureValue shouldBe Opt.Empty
32+
Task.fromOpt(Opt.some(Task.now(123))).runToFuture.futureValue shouldBe Opt.some(123)
33+
}
34+
35+
test("traverseMap") {
36+
forAll { data: List[(String, Int)] =>
37+
val map = data.toMap
38+
val expected = map.view.map({ case (key, value) => (key + key, value + 2) }).toMap
39+
val result = Task.traverseMap(map)({ case (key, value) => Task((key + key, value + 2)) }).runToFuture.futureValue
40+
result shouldBe expected
41+
}
42+
}
43+
44+
test("traverseMapValues") {
45+
forAll { data: List[(String, Int)] =>
46+
val map = data.toMap
47+
val expected = map.view.mapValues(value => value + 2).toMap
48+
val result = Task.traverseMapValues(map)({ case (key, value) => Task(value + 2) }).runToFuture.futureValue
49+
result shouldBe expected
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)