Skip to content

Commit 9992c56

Browse files
authoredJun 29, 2022
Always convert non-interrupt failures to Outcome.Errored. Define standard Concurrent/Temporal instances only for Throwable error. Define generic Concurrent/Temporal as operating on Cause[E] errors to be able to wrap non-interrupt errors to Outcome.Errored. (#543)
* Fix `toManagedZIO` not propagating restore of outer `uninterruptibleMask` in interpretation of `Resource.Allocated` case Convert `Throwable` Dies to `Outcome.Error` not `Outcome.Succeeded` whenever the error type is `Throwable` Remove `.interruptible` from `zio.interop.fromEffect` * Split off generic cats-effect instances - define generic Temporal and Concurrent instance for any error type, but with a different error semantic - catching all `Cause[E]`, so recovering from defects etc. * fix build after splitting Cause instances from `catz` object * remove accidental monad instance
1 parent d646a8f commit 9992c56

File tree

9 files changed

+281
-70
lines changed

9 files changed

+281
-70
lines changed
 

‎zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpec.scala

+35-5
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,31 @@ class CatsSpec extends ZioSpecBase {
3737
GenTemporalTests[Task, Throwable].temporal[Int, Int, Int](100.millis)
3838
}
3939
)
40-
checkAllAsync("GenSpawn[IO[Int, _], Int]", implicit tc => GenSpawnTests[IO[Int, _], Int].spawn[Int, Int, Int])
41-
checkAllAsync("MonadError[IO[Int, _]]", implicit tc => MonadErrorTests[IO[Int, _], Int].monadError[Int, Int, Int])
40+
41+
locally {
42+
checkAllAsync(
43+
"GenTemporal[IO[Int, _], Cause[Int]]",
44+
{ implicit tc =>
45+
import zio.interop.catz.generic.*
46+
implicit val runtime: Runtime[Clock] = Runtime(environment, platform)
47+
GenTemporalTests[IO[Int, _], Cause[Int]].temporal[Int, Int, Int](100.millis)
48+
}
49+
)
50+
checkAllAsync(
51+
"GenSpawn[IO[Int, _], Cause[Int]]",
52+
{ implicit tc =>
53+
import zio.interop.catz.generic.*
54+
GenSpawnTests[IO[Int, _], Cause[Int]].spawn[Int, Int, Int]
55+
}
56+
)
57+
checkAllAsync(
58+
"MonadCancel[IO[Int, _], Cause[Int]]",
59+
{ implicit tc =>
60+
import zio.interop.catz.generic.*
61+
MonadCancelTests[IO[Int, _], Cause[Int]].monadCancel[Int, Int, Int]
62+
}
63+
)
64+
}
4265
checkAllAsync("MonoidK[IO[Int, _]]", implicit tc => MonoidKTests[IO[Int, _]].monoidK[Int])
4366
checkAllAsync("SemigroupK[IO[Option[Unit], _]]", implicit tc => SemigroupKTests[IO[Option[Unit], _]].semigroupK[Int])
4467
checkAllAsync("SemigroupK[Task]", implicit tc => SemigroupKTests[Task].semigroupK[Int])
@@ -75,9 +98,13 @@ class CatsSpec extends ZioSpecBase {
7598

7699
Async[RIO[ZClock & CBlocking, _]]
77100
Sync[RIO[ZClock & CBlocking, _]]
78-
GenTemporal[ZIO[ZClock, Int, _], Int]
101+
locally {
102+
import zio.interop.catz.generic.*
103+
104+
GenTemporal[ZIO[ZClock, Int, _], Cause[Int]]
105+
GenConcurrent[ZIO[String, Int, _], Cause[Int]]
106+
}
79107
Temporal[RIO[ZClock, _]]
80-
GenConcurrent[ZIO[String, Int, _], Int]
81108
Concurrent[RIO[String, _]]
82109
MonadError[RIO[String, _], Throwable]
83110
Monad[RIO[String, _]]
@@ -95,7 +122,10 @@ class CatsSpec extends ZioSpecBase {
95122

96123
def liftRIO(implicit runtime: IORuntime) = LiftIO[RIO[String, _]]
97124
def liftZManaged(implicit runtime: IORuntime) = LiftIO[ZManaged[String, Throwable, _]]
98-
def runtimeGenTemporal(implicit runtime: Runtime[ZClock]) = GenTemporal[ZIO[Any, Int, _], Int]
125+
def runtimeGenTemporal(implicit runtime: Runtime[ZClock]) = {
126+
import zio.interop.catz.generic.*
127+
GenTemporal[ZIO[Any, Int, _], Cause[Int]]
128+
}
99129
def runtimeTemporal(implicit runtime: Runtime[ZClock]) = Temporal[Task]
100130

101131
// related to issue #173

‎zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala

+28-3
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ private[zio] trait CatsSpecBase
100100
Eq.allEqual
101101

102102
implicit val eqForCauseOfNothing: Eq[Cause[Nothing]] =
103+
eqForCauseOf[Nothing]
104+
105+
implicit def eqForCauseOf[E]: Eq[Cause[E]] =
103106
(x, y) => (x.interrupted && y.interrupted) || x == y
104107

105108
implicit def eqForExitOfNothing[A: Eq]: Eq[Exit[Nothing, A]] = {
@@ -120,14 +123,36 @@ private[zio] trait CatsSpecBase
120123
implicit def eqForURIO[R: Arbitrary, A: Eq](implicit ticker: Ticker): Eq[URIO[R, A]] =
121124
eqForZIO[R, Nothing, A]
122125

123-
implicit def execRIO(rio: RIO[ZEnv, Boolean])(implicit ticker: Ticker): Prop =
124-
rio.provide(environment).toEffect[CIO]
126+
implicit def execZIO[E](rio: ZIO[ZEnv, E, Boolean])(implicit ticker: Ticker): Prop =
127+
rio
128+
.provide(environment)
129+
.mapError {
130+
case t: Throwable => t
131+
case e => FiberFailure(Cause.Fail(e))
132+
}
133+
.toEffect[CIO]
125134

126135
implicit def orderForUIOofFiniteDuration(implicit ticker: Ticker): Order[UIO[FiniteDuration]] =
127136
Order.by(unsafeRun(_).toEither.toOption)
128137

129138
implicit def orderForRIOofFiniteDuration[R: Arbitrary](implicit ticker: Ticker): Order[RIO[R, FiniteDuration]] =
130-
(x, y) => Arbitrary.arbitrary[R].sample.fold(0)(r => x.orDie.provide(r) compare y.orDie.provide(r))
139+
(x, y) =>
140+
Arbitrary
141+
.arbitrary[R]
142+
.sample
143+
.fold(0)(r => orderForUIOofFiniteDuration.compare(x.orDie.provide(r), y.orDie.provide(r)))
144+
145+
implicit def orderForZIOofFiniteDuration[E: Order, R: Arbitrary](implicit
146+
ticker: Ticker
147+
): Order[ZIO[R, E, FiniteDuration]] = {
148+
implicit val orderForIOofFiniteDuration: Order[IO[E, FiniteDuration]] =
149+
Order.by(unsafeRun(_) match {
150+
case Exit.Success(value) => Right(value)
151+
case Exit.Failure(cause) => Left(cause.failureOption)
152+
})
153+
154+
(x, y) => Arbitrary.arbitrary[R].sample.fold(0)(r => x.provide(r) compare y.provide(r))
155+
}
131156

132157
implicit def eqForUManaged[A: Eq](implicit ticker: Ticker): Eq[UManaged[A]] =
133158
zManagedEq[Any, Nothing, A]

‎zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala

+19
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,25 @@ private[interop] trait ZioSpecBase extends CatsSpecBase with ZioSpecBaseLowPrior
2323

2424
implicit val cogenForClockAndBlocking: Cogen[Clock & CBlocking] =
2525
Cogen(_.hashCode.toLong)
26+
27+
implicit def arbitraryCause[E](implicit e: Arbitrary[E]): Arbitrary[Cause[E]] = {
28+
lazy val self: Gen[Cause[E]] =
29+
Gen.oneOf(
30+
e.arbitrary.map(Cause.Fail(_)),
31+
Arbitrary.arbitrary[Throwable].map(Cause.Die(_)),
32+
// Generating interrupt failures causes law failures (`canceled`/`Outcome.Canceled` are ill-defined as of now https://github.com/zio/interop-cats/issues/503#issuecomment-1157101175=)
33+
// Gen.long.flatMap(l1 => Gen.long.map(l2 => Cause.Interrupt(Fiber.Id(l1, l2)))),
34+
Gen.delay(self.map(Cause.Traced(_, ZTrace(Fiber.Id.None, Nil, Nil, None)))),
35+
Gen.delay(self.map(Cause.stackless)),
36+
Gen.delay(self.flatMap(e1 => self.map(e2 => Cause.Both(e1, e2)))),
37+
Gen.delay(self.flatMap(e1 => self.map(e2 => Cause.Then(e1, e2)))),
38+
Gen.const(Cause.empty)
39+
)
40+
Arbitrary(self)
41+
}
42+
43+
implicit def cogenCause[E]: Cogen[Cause[E]] =
44+
Cogen(_.hashCode.toLong)
2645
}
2746

2847
private[interop] trait ZioSpecBaseLowPriority { self: ZioSpecBase =>

‎zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ import zio.{ Promise, RIO, ZIO }
66
import scala.concurrent.{ ExecutionContext, Future }
77

88
private abstract class ZioAsync[R]
9-
extends ZioTemporal[R, Throwable]
9+
extends ZioTemporal[R, Throwable, Throwable]
1010
with Async[RIO[R, _]]
11-
with ZioBlockingEnv[R, Throwable] {
11+
with ZioBlockingEnv[R, Throwable]
12+
with ZioMonadErrorExitThrowable[R] {
1213

1314
override final def evalOn[A](fa: F[A], ec: ExecutionContext): F[A] =
1415
fa.on(ec)

‎zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ import zio.{ Exit, Promise, RIO, ZIO }
77
import scala.concurrent.{ ExecutionContext, Future }
88

99
private abstract class ZioAsync[R]
10-
extends ZioTemporal[R, Throwable]
10+
extends ZioTemporal[R, Throwable, Throwable]
1111
with Async[RIO[R, _]]
12-
with ZioBlockingEnv[R, Throwable] {
12+
with ZioBlockingEnv[R, Throwable]
13+
with ZioMonadErrorExitThrowable[R] {
1314

1415
override final def evalOn[A](fa: F[A], ec: ExecutionContext): F[A] =
1516
fa.on(ec)

‎zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala

+131-38
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,23 @@ object catz extends CatsEffectPlatform {
5050
object implicits {
5151
implicit val rts: Runtime[Clock & CBlocking] = Runtime.default
5252
}
53+
54+
/**
55+
* `import zio.interop.catz.generic._` brings in instances of
56+
* `GenConcurrent` and `GenTemporal`,`MonadCancel` and `MonadError`
57+
* for arbitrary non-Throwable `E` error type.
58+
*
59+
* These instances have somewhat different semantics than the instances
60+
* in `catz` however - they operate on `Cause[E]` errors. Meaning that
61+
* cats `ApplicativeError#handleErrorWith` operation can now recover from
62+
* `ZIO.die` and other non-standard ZIO errors not supported by cats IO.
63+
*
64+
* However, in cases where an instance such as `MonadCancel[F, _]` is
65+
* required by a function, these differences should not normally affect behavior -
66+
* by ignoring the error type, such a function signals that it does not
67+
* inspect the errors, but only uses `bracket` portion of `MonadCancel` for finalization.
68+
*/
69+
object generic extends CatsEffectInstancesCause
5370
}
5471

5572
abstract class CatsEffectPlatform
@@ -82,26 +99,50 @@ abstract class CatsEffectInstances extends CatsZioInstances {
8299
implicit final def asyncInstance[R <: Clock & CBlocking]: Async[RIO[R, _]] =
83100
asyncInstance0.asInstanceOf[Async[RIO[R, _]]]
84101

85-
implicit final def temporalInstance[R <: Clock, E]: GenTemporal[ZIO[R, E, _], E] =
86-
temporalInstance0.asInstanceOf[GenTemporal[ZIO[R, E, _], E]]
102+
implicit final def temporalInstance[R <: Clock]: GenTemporal[ZIO[R, Throwable, _], Throwable] =
103+
temporalInstance0.asInstanceOf[GenTemporal[ZIO[R, Throwable, _], Throwable]]
87104

88-
implicit final def concurrentInstance[R, E]: GenConcurrent[ZIO[R, E, _], E] =
89-
concurrentInstance0.asInstanceOf[GenConcurrent[ZIO[R, E, _], E]]
105+
implicit final def concurrentInstance[R]: GenConcurrent[ZIO[R, Throwable, _], Throwable] =
106+
concurrentInstance0.asInstanceOf[GenConcurrent[ZIO[R, Throwable, _], Throwable]]
90107

91108
implicit final def asyncRuntimeInstance[R](implicit runtime: Runtime[Clock & CBlocking]): Async[RIO[R, _]] =
92109
new ZioRuntimeAsync(runtime.environment)
93110

94-
implicit final def temporalRuntimeInstance[R, E](implicit runtime: Runtime[Clock]): GenTemporal[ZIO[R, E, _], E] =
95-
new ZioRuntimeTemporal(runtime.environment)
111+
implicit final def temporalRuntimeInstance[R](implicit
112+
runtime: Runtime[Clock]
113+
): GenTemporal[ZIO[R, Throwable, _], Throwable] =
114+
new ZioRuntimeTemporal[R, Throwable, Throwable](runtime.environment) with ZioMonadErrorExitThrowable[R]
96115

97116
private[this] val asyncInstance0: Async[RIO[Clock & CBlocking, _]] =
98117
new ZioAsync[Clock & CBlocking] with ZioBlockingEnvIdentity[Clock & CBlocking, Throwable]
99118

100119
private[this] val temporalInstance0: Temporal[RIO[Clock, _]] =
101-
new ZioTemporal[Clock, Throwable] with ZioClockEnvIdentity[Clock, Throwable]
120+
new ZioTemporal[Clock, Throwable, Throwable]
121+
with ZioClockEnvIdentity[Clock, Throwable]
122+
with ZioMonadErrorExitThrowable[Clock]
102123

103124
private[this] val concurrentInstance0: Concurrent[Task] =
104-
new ZioConcurrent[Any, Throwable]
125+
new ZioConcurrent[Any, Throwable, Throwable] with ZioMonadErrorExitThrowable[Any]
126+
}
127+
128+
sealed abstract class CatsEffectInstancesCause extends CatsZioInstances {
129+
130+
implicit final def temporalInstanceCause[R <: Clock, E]: GenTemporal[ZIO[R, E, _], Cause[E]] =
131+
temporalInstance1.asInstanceOf[GenTemporal[ZIO[R, E, _], Cause[E]]]
132+
133+
implicit final def concurrentInstanceCause[R, E]: GenConcurrent[ZIO[R, E, _], Cause[E]] =
134+
concurrentInstance1.asInstanceOf[GenConcurrent[ZIO[R, E, _], Cause[E]]]
135+
136+
implicit final def temporalRuntimeInstanceCause[R, E](implicit
137+
runtime: Runtime[Clock]
138+
): GenTemporal[ZIO[R, E, _], Cause[E]] =
139+
new ZioRuntimeTemporal[R, E, Cause[E]](runtime.environment) with ZioMonadErrorExitCause[R, E]
140+
141+
private[this] val temporalInstance1: GenTemporal[ZIO[Clock, Any, _], Cause[Any]] =
142+
new ZioTemporal[Clock, Any, Cause[Any]] with ZioClockEnvIdentity[Clock, Any] with ZioMonadErrorExitCause[Clock, Any]
143+
144+
private[this] val concurrentInstance1: GenConcurrent[ZIO[Any, Any, _], Cause[Any]] =
145+
new ZioConcurrent[Any, Any, Cause[Any]] with ZioMonadErrorExitCause[Any, Any]
105146
}
106147

107148
abstract class CatsZioInstances extends CatsZioInstances1 {
@@ -181,7 +222,7 @@ sealed abstract class CatsZioInstances2 {
181222
new ZioArrowChoice
182223

183224
private[this] val monadErrorInstance0: MonadError[Task, Throwable] =
184-
new ZioMonadError[Any, Throwable]
225+
new ZioMonadError[Any, Throwable, Throwable] with ZioMonadErrorE[Any, Throwable]
185226
}
186227

187228
private class ZioDefer[R, E] extends Defer[ZIO[R, E, _]] {
@@ -191,27 +232,28 @@ private class ZioDefer[R, E] extends Defer[ZIO[R, E, _]] {
191232
ZIO.effectSuspendTotal(fa)
192233
}
193234

194-
private class ZioConcurrent[R, E] extends ZioMonadError[R, E] with GenConcurrent[ZIO[R, E, _], E] {
235+
private abstract class ZioConcurrent[R, E, E1]
236+
extends ZioMonadErrorExit[R, E, E1]
237+
with GenConcurrent[ZIO[R, E, _], E1] {
195238

196-
private def toPoll(restore: ZIO.InterruptStatusRestore) = new Poll[ZIO[R, E, _]] {
197-
override def apply[T](fa: ZIO[R, E, T]): ZIO[R, E, T] = restore(fa)
239+
private def toFiber[A](fiber: Fiber[E, A]): effect.Fiber[F, E1, A] = new effect.Fiber[F, E1, A] {
240+
override final val cancel: F[Unit] = fiber.interrupt.unit
241+
override final val join: F[Outcome[F, E1, A]] = fiber.await.map(exitToOutcome)
198242
}
199243

200-
private def toFiber[A](fiber: Fiber[E, A]) = new effect.Fiber[F, E, A] {
201-
override final val cancel: F[Unit] = fiber.interrupt.unit
202-
override final val join: F[Outcome[F, E, A]] = fiber.await.map(toOutcome)
203-
}
204-
205-
private def fiberFailure(error: E) =
206-
FiberFailure(Cause.fail(error))
244+
private def toThrowableOrFiberFailure(error: E): Throwable =
245+
error match {
246+
case t: Throwable => t
247+
case _ => FiberFailure(Cause.fail(error))
248+
}
207249

208250
override def ref[A](a: A): F[effect.Ref[F, A]] =
209251
ZRef.make(a).map(new ZioRef(_))
210252

211253
override def deferred[A]: F[Deferred[F, A]] =
212254
Promise.make[E, A].map(new ZioDeferred(_))
213255

214-
override final def start[A](fa: F[A]): F[effect.Fiber[F, E, A]] =
256+
override final def start[A](fa: F[A]): F[effect.Fiber[F, E1, A]] =
215257
fa.interruptible.forkDaemon.map(toFiber)
216258

217259
override def never[A]: F[A] =
@@ -224,31 +266,34 @@ private class ZioConcurrent[R, E] extends ZioMonadError[R, E] with GenConcurrent
224266
fa.foldCauseM(cause => if (cause.interrupted) ZIO.halt(cause) else fb, _ => fb)
225267

226268
override final def uncancelable[A](body: Poll[F] => F[A]): F[A] =
227-
ZIO.uninterruptibleMask(body.compose(toPoll))
269+
ZIO.uninterruptibleMask(restore => body(toPoll(restore)))
228270

229271
override final def canceled: F[Unit] =
230272
ZIO.interrupt
231273

232274
override final def onCancel[A](fa: F[A], fin: F[Unit]): F[A] =
233-
fa.onError(cause => fin.orDieWith(fiberFailure).unless(cause.failed))
275+
fa.onError(cause => fin.orDieWith(toThrowableOrFiberFailure).unless(cause.failed))
234276

235277
override final def memoize[A](fa: F[A]): F[F[A]] =
236278
fa.memoize
237279

238-
override final def racePair[A, B](fa: F[A], fb: F[B]) =
280+
override final def racePair[A, B](
281+
fa: F[A],
282+
fb: F[B]
283+
): ZIO[R, Nothing, Either[(Outcome[F, E1, A], effect.Fiber[F, E1, B]), (effect.Fiber[F, E1, A], Outcome[F, E1, B])]] =
239284
(fa.interruptible raceWith fb.interruptible)(
240-
(exit, fiber) => ZIO.succeedNow(Left((toOutcome(exit), toFiber(fiber)))),
241-
(exit, fiber) => ZIO.succeedNow(Right((toFiber(fiber), toOutcome(exit))))
285+
(exit, fiber) => ZIO.succeedNow(Left((exitToOutcome(exit), toFiber(fiber)))),
286+
(exit, fiber) => ZIO.succeedNow(Right((toFiber(fiber), exitToOutcome(exit))))
242287
)
243288

244289
override final def both[A, B](fa: F[A], fb: F[B]): F[(A, B)] =
245290
fa.interruptible zipPar fb.interruptible
246291

247292
override final def guarantee[A](fa: F[A], fin: F[Unit]): F[A] =
248-
fa.ensuring(fin.orDieWith(fiberFailure))
293+
fa.ensuring(fin.orDieWith(toThrowableOrFiberFailure))
249294

250295
override final def bracket[A, B](acquire: F[A])(use: A => F[B])(release: A => F[Unit]): F[B] =
251-
acquire.bracket(release.andThen(_.orDieWith(fiberFailure)), use)
296+
acquire.bracket(release.andThen(_.orDieWith(toThrowableOrFiberFailure)), use)
252297

253298
override val unique: F[Unique.Token] =
254299
ZIO.effectTotal(new Unique.Token)
@@ -316,9 +361,9 @@ private final class ZioRef[R, E, A](ref: ERef[E, A]) extends effect.Ref[ZIO[R, E
316361
ref.get
317362
}
318363

319-
private abstract class ZioTemporal[R, E]
320-
extends ZioConcurrent[R, E]
321-
with GenTemporal[ZIO[R, E, _], E]
364+
private abstract class ZioTemporal[R, E, E1]
365+
extends ZioConcurrent[R, E, E1]
366+
with GenTemporal[ZIO[R, E, _], E1]
322367
with ZioClockEnv[R, E] {
323368

324369
override final def sleep(time: FiniteDuration): F[Unit] =
@@ -331,7 +376,9 @@ private abstract class ZioTemporal[R, E]
331376
withClock(currentTime(MILLISECONDS).map(FiniteDuration(_, MILLISECONDS)))
332377
}
333378

334-
private class ZioRuntimeTemporal[R, E](environment: Clock) extends ZioTemporal[R, E] with ZioClockEnv[R, E] {
379+
private abstract class ZioRuntimeTemporal[R, E, E1](environment: Clock)
380+
extends ZioTemporal[R, E, E1]
381+
with ZioClockEnv[R, E] {
335382

336383
override protected[this] def withClock[A](fa: ZIO[Clock, E, A]): ZIO[R, E, A] = fa.provide(environment)
337384

@@ -364,7 +411,7 @@ private trait ZioBlockingEnvIdentity[R <: Clock & CBlocking, E]
364411
override protected[this] def withBlocking[A](fa: ZIO[CBlocking, E, A]): ZIO[R, E, A] = fa
365412
}
366413

367-
private class ZioMonadError[R, E] extends MonadError[ZIO[R, E, _], E] {
414+
private abstract class ZioMonadError[R, E, E1] extends MonadError[ZIO[R, E, _], E1] {
368415
type F[A] = ZIO[R, E, A]
369416

370417
override final def pure[A](a: A): F[A] =
@@ -394,6 +441,19 @@ private class ZioMonadError[R, E] extends MonadError[ZIO[R, E, _], E] {
394441
override final def unit: F[Unit] =
395442
ZIO.unit
396443

444+
override final def tailRecM[A, B](a: A)(f: A => F[Either[A, B]]): F[B] = {
445+
def loop(a: A): F[B] = f(a).flatMap {
446+
case Left(a) => loop(a)
447+
case Right(b) => ZIO.succeedNow(b)
448+
}
449+
450+
ZIO.effectSuspendTotal(loop(a))
451+
}
452+
453+
}
454+
455+
private trait ZioMonadErrorE[R, E] extends ZioMonadError[R, E, E] {
456+
397457
override final def handleErrorWith[A](fa: F[A])(f: E => F[A]): F[A] =
398458
fa.catchAll(f)
399459

@@ -408,15 +468,48 @@ private class ZioMonadError[R, E] extends MonadError[ZIO[R, E, _], E] {
408468

409469
override final def adaptError[A](fa: F[A])(pf: PartialFunction[E, E]): F[A] =
410470
fa.mapError(pf.orElse { case error => error })
471+
}
411472

412-
override final def tailRecM[A, B](a: A)(f: A => F[Either[A, B]]): F[B] = {
413-
def loop(a: A): F[B] = f(a).flatMap {
414-
case Left(a) => loop(a)
415-
case Right(b) => ZIO.succeedNow(b)
473+
private trait ZioMonadErrorCause[R, E] extends ZioMonadError[R, E, Cause[E]] {
474+
475+
override final def handleErrorWith[A](fa: F[A])(f: Cause[E] => F[A]): F[A] =
476+
// fa.catchAllCause(f)
477+
fa.catchSomeCause {
478+
// pretend that we can't catch inner interrupt to satisfy `uncancelable canceled associates right over flatMap attempt`
479+
// law since we use a poor definition of `canceled=ZIO.interrupt` right now
480+
// https://github.com/zio/interop-cats/issues/503#issuecomment-1157101175=
481+
case c if !c.interrupted => f(c)
416482
}
417483

418-
ZIO.effectSuspendTotal(loop(a))
419-
}
484+
override final def recoverWith[A](fa: F[A])(pf: PartialFunction[Cause[E], F[A]]): F[A] =
485+
// fa.catchSomeCause(pf)
486+
fa.catchSomeCause(({ case c if !c.interrupted => c }: PartialFunction[Cause[E], Cause[E]]).andThen(pf))
487+
488+
override final def raiseError[A](e: Cause[E]): F[A] =
489+
ZIO.halt(e)
490+
491+
override final def attempt[A](fa: F[A]): F[Either[Cause[E], A]] =
492+
// fa.sandbox.attempt
493+
fa.map(Right(_)).catchSomeCause {
494+
case c if !c.interrupted => ZIO.succeedNow(Left(c))
495+
}
496+
497+
override final def adaptError[A](fa: F[A])(pf: PartialFunction[Cause[E], Cause[E]]): F[A] =
498+
fa.mapErrorCause(pf.orElse { case error => error })
499+
}
500+
501+
private abstract class ZioMonadErrorExit[R, E, E1] extends ZioMonadError[R, E, E1] {
502+
protected def exitToOutcome[A](exit: Exit[E, A]): Outcome[F, E1, A]
503+
}
504+
505+
private trait ZioMonadErrorExitThrowable[R]
506+
extends ZioMonadErrorExit[R, Throwable, Throwable]
507+
with ZioMonadErrorE[R, Throwable] {
508+
override protected def exitToOutcome[A](exit: Exit[Throwable, A]): Outcome[F, Throwable, A] = toOutcomeThrowable(exit)
509+
}
510+
511+
private trait ZioMonadErrorExitCause[R, E] extends ZioMonadErrorExit[R, E, Cause[E]] with ZioMonadErrorCause[R, E] {
512+
override protected def exitToOutcome[A](exit: Exit[E, A]): Outcome[F, Cause[E], A] = toOutcomeCause(exit)
420513
}
421514

422515
private class ZioSemigroupK[R, E] extends SemigroupK[ZIO[R, E, _]] {

‎zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala

+16-7
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import cats.effect.*
2323
import cats.effect.std.Dispatcher
2424
import cats.kernel.{ CommutativeMonoid, CommutativeSemigroup }
2525
import zio.ZManaged.ReleaseMap
26-
import zio.interop.catz.concurrentInstance
26+
import zio.interop.catz.*
2727

2828
trait CatsZManagedSyntax {
2929
import scala.language.implicitConversions
@@ -56,14 +56,20 @@ final class ZIOResourceSyntax[R, E <: Throwable, A](private val resource: Resour
5656
*/
5757
def toManagedZIO: ZManaged[R, E, A] = {
5858
type F[T] = ZIO[R, E, T]
59-
val F = MonadCancel[F, E]
6059

6160
def go[B](resource: Resource[F, B]): ZManaged[R, E, B] =
6261
resource match {
6362
case allocate: Resource.Allocate[F, b] =>
64-
ZManaged.makeReserve[R, E, B](F.uncancelable(allocate.resource).map { case (b, release) =>
65-
Reservation(ZIO.succeedNow(b), error => release(toExitCase(error)).orDie)
66-
})
63+
ZManaged {
64+
ZIO.uninterruptibleMask { restore =>
65+
for {
66+
r <- ZIO.environment[(R, ReleaseMap)]
67+
af <- allocate.resource(toPoll(restore)).provide(r._1)
68+
(a, release) = af
69+
releaseMapEntry <- r._2.add(exit => release(toExitCase(exit)).provide(r._1).orDie)
70+
} yield (releaseMapEntry, a)
71+
}
72+
}
6773

6874
case bind: Resource.Bind[F, a, B] =>
6975
go(bind.source).flatMap(a => go(bind.fs(a)))
@@ -98,10 +104,13 @@ final class ZManagedSyntax[R, E, A](private val managed: ZManaged[R, E, A]) exte
98104
}
99105
}
100106

101-
def toResource[F[_]: Async](implicit R: Runtime[R], ev: E <:< Throwable): Resource[F, A] =
107+
def toResource[F[_]: Async](implicit R: Runtime[R], ev: E <:< Throwable): Resource[F, A] = {
108+
import zio.interop.catz.generic.*
109+
102110
toResourceZIO.mapK(new (ZIO[R, E, _] ~> F) {
103-
override def apply[B](zio: ZIO[R, E, B]) = toEffect(zio.mapError(ev))
111+
override def apply[B](zio: ZIO[R, E, B]): F[B] = toEffect[F, R, B](zio.mapError(ev))
104112
})
113+
}
105114
}
106115

107116
trait CatsEffectZManagedInstances {

‎zio-interop-cats/shared/src/main/scala/zio/interop/package.scala

+37-9
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package zio
1818

19-
import cats.effect.kernel.{ Async, Outcome, Resource }
19+
import cats.effect.kernel.{ Async, Outcome, Poll, Resource }
2020
import cats.effect.std.Dispatcher
2121
import cats.syntax.all.*
2222

@@ -39,16 +39,30 @@ package object interop {
3939
type Hub[F[+_], A] = CHub[F, A, A]
4040
val Hub: CHub.type = CHub
4141

42-
@inline private[interop] def toOutcome[R, E, A](exit: Exit[E, A]): Outcome[ZIO[R, E, _], E, A] =
42+
@inline private[interop] def toOutcomeCause[R, E, A](exit: Exit[E, A]): Outcome[ZIO[R, E, _], Cause[E], A] =
4343
exit match {
4444
case Exit.Success(value) =>
45-
Outcome.Succeeded(ZIO.succeed(value))
45+
Outcome.Succeeded(ZIO.succeedNow(value))
46+
case Exit.Failure(cause) if cause.interrupted =>
47+
Outcome.Canceled()
48+
case Exit.Failure(cause) =>
49+
Outcome.Errored(cause)
50+
}
51+
52+
@inline private[interop] def toOutcomeThrowable[R, A](
53+
exit: Exit[Throwable, A]
54+
): Outcome[ZIO[R, Throwable, _], Throwable, A] =
55+
exit match {
56+
case Exit.Success(value) =>
57+
Outcome.Succeeded(ZIO.succeedNow(value))
4658
case Exit.Failure(cause) if cause.interrupted =>
4759
Outcome.Canceled()
4860
case Exit.Failure(cause) =>
4961
cause.failureOrCause match {
5062
case Left(error) => Outcome.Errored(error)
51-
case Right(cause) => Outcome.Succeeded(ZIO.halt(cause))
63+
case Right(cause) =>
64+
val compositeError = dieCauseToThrowable(cause)
65+
Outcome.Errored(compositeError)
5266
}
5367
}
5468

@@ -59,26 +73,40 @@ package object interop {
5973
case Resource.ExitCase.Errored(error) => Exit.fail(error)
6074
}
6175

62-
@inline private[interop] def toExitCase(exit: Exit[Any, Any]): Resource.ExitCase =
76+
private[interop] def toExitCase(exit: Exit[Any, Any]): Resource.ExitCase =
6377
exit match {
6478
case Exit.Success(_) =>
6579
Resource.ExitCase.Succeeded
6680
case Exit.Failure(cause) if cause.interrupted =>
6781
Resource.ExitCase.Canceled
6882
case Exit.Failure(cause) =>
6983
cause.failureOrCause match {
70-
case Left(error: Throwable) => Resource.ExitCase.Errored(error)
71-
case _ => Resource.ExitCase.Errored(FiberFailure(cause))
84+
case Left(error: Throwable) =>
85+
Resource.ExitCase.Errored(error)
86+
case Left(_) =>
87+
Resource.ExitCase.Errored(FiberFailure(cause))
88+
case Right(cause) =>
89+
val compositeError = dieCauseToThrowable(cause)
90+
Resource.ExitCase.Errored(compositeError)
7291
}
7392
}
7493

94+
private[interop] def toPoll[R, E](restore: ZIO.InterruptStatusRestore): Poll[ZIO[R, E, _]] = new Poll[ZIO[R, E, _]] {
95+
override def apply[T](fa: ZIO[R, E, T]): ZIO[R, E, T] = restore(fa)
96+
}
97+
98+
@inline private def dieCauseToThrowable(cause: Cause[Nothing]): Throwable =
99+
cause.defects match {
100+
case one :: Nil => one
101+
case _ => FiberFailure(cause)
102+
}
103+
75104
@inline def fromEffect[F[_], A](fa: F[A])(implicit F: Dispatcher[F]): Task[A] =
76105
ZIO
77106
.effectTotal(F.unsafeToFutureCancelable(fa))
78107
.flatMap { case (future, cancel) =>
79-
ZIO.fromFuture(_ => future).onInterrupt(ZIO.fromFuture(_ => cancel()).orDie).interruptible
108+
ZIO.fromFuture(_ => future).onInterrupt(ZIO.fromFuture(_ => cancel()).orDie)
80109
}
81-
.uninterruptible
82110

83111
@inline def toEffect[F[_], R, A](rio: RIO[R, A])(implicit R: Runtime[R], F: Async[F]): F[A] =
84112
F.uncancelable { poll =>

‎zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala

+9-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package zio
22
package stream.interop
33

4+
import cats.effect.Resource
45
import fs2.{ Pull, Stream }
5-
import zio.interop.catz.{ concurrentInstance, zManagedSyntax }
6+
import zio.interop.catz.*
67
import zio.stream.{ Take, ZStream }
78

89
import scala.language.implicitConversions
@@ -19,12 +20,16 @@ trait FS2StreamSyntax {
1920
class ZStreamSyntax[R, E, A](private val stream: ZStream[R, E, A]) extends AnyVal {
2021

2122
/** Convert a [[zio.stream.ZStream]] into an [[fs2.Stream]]. */
22-
def toFs2Stream: fs2.Stream[ZIO[R, E, _], A] =
23-
fs2.Stream.resource(stream.process.toResourceZIO).flatMap { pull =>
23+
def toFs2Stream: fs2.Stream[ZIO[R, E, _], A] = {
24+
import zio.interop.catz.generic.*
25+
26+
val resource: Resource[ZIO[R, E, _], ZIO[R, Option[E], Chunk[A]]] = stream.process.toResourceZIO
27+
fs2.Stream.resource(resource).flatMap { pull =>
2428
fs2.Stream.repeatEval(pull.optional).unNoneTerminate.flatMap { chunk =>
2529
fs2.Stream.chunk(fs2.Chunk.indexedSeq(chunk))
2630
}
2731
}
32+
}
2833
}
2934

3035
final class FS2RIOStreamSyntax[R, A](private val stream: Stream[RIO[R, _], A]) {
@@ -55,7 +60,7 @@ final class FS2RIOStreamSyntax[R, A](private val stream: Stream[RIO[R, _], A]) {
5560
val toQueue = ZStream.fromEffect {
5661
integrate(stream, q).stream
5762
.handleErrorWith(e => Stream.eval(q.offer(Take.fail(e))))
58-
.compile
63+
.compile[RIO[R, _], RIO[R, _], Any]
5964
.drain
6065
}
6166

0 commit comments

Comments
 (0)
Please sign in to comment.