From 56acb54b26abe5b17834089c638b7c25304e48c9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 05:37:25 +0000 Subject: [PATCH 1/2] Add 'fromFutureCancelable` and friends --- .../cats/effect/IOCompanionPlatform.scala | 6 ++++ .../src/main/scala/cats/effect/IO.scala | 8 ++++- .../cats/effect/kernel/AsyncPlatform.scala | 36 ++++++++++++------- .../main/scala/cats/effect/kernel/Async.scala | 14 ++++++++ .../src/test/scala/cats/effect/IOSpec.scala | 10 ++++++ 5 files changed, 61 insertions(+), 13 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/IOCompanionPlatform.scala b/core/js/src/main/scala/cats/effect/IOCompanionPlatform.scala index 19bbc46421..137f255b21 100644 --- a/core/js/src/main/scala/cats/effect/IOCompanionPlatform.scala +++ b/core/js/src/main/scala/cats/effect/IOCompanionPlatform.scala @@ -43,9 +43,15 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type => def fromThenable[A](iot: IO[Thenable[A]]): IO[A] = asyncForIO.fromThenable(iot) + def fromThenableCancelable[A](iot: IO[(Thenable[A], IO[Unit])]): IO[A] = + asyncForIO.fromThenableCancelable(iot) + def fromPromise[A](iop: IO[Promise[A]]): IO[A] = asyncForIO.fromPromise(iop) + def fromPromiseCancelable[A](iop: IO[(Promise[A], IO[Unit])]): IO[A] = + asyncForIO.fromPromiseCancelable(iop) + def realTimeDate: IO[js.Date] = asyncForIO.realTimeDate @deprecated("Not implemented for Scala.js. On Node.js consider using fs2.io.stdin.", "3.4.0") diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 954c3d5619..7d0e1ca090 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -1486,11 +1486,17 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits { * }}} * * @see - * [[IO#unsafeToFuture]] + * [[IO#unsafeToFuture]], [[fromFutureCancelable]] */ def fromFuture[A](fut: IO[Future[A]]): IO[A] = asyncForIO.fromFuture(fut) + /** + * Like [[fromFuture]], but is cancelable via the provided finalizer. + */ + def fromFutureCancelable[A](fut: IO[(Future[A], IO[Unit])]): IO[A] = + asyncForIO.fromFutureCancelable(fut) + /** * Run two IO tasks concurrently, and return the first to finish, either in success or error. * The loser of the race is canceled. diff --git a/kernel/js/src/main/scala/cats/effect/kernel/AsyncPlatform.scala b/kernel/js/src/main/scala/cats/effect/kernel/AsyncPlatform.scala index 398544aa1e..2770143925 100644 --- a/kernel/js/src/main/scala/cats/effect/kernel/AsyncPlatform.scala +++ b/kernel/js/src/main/scala/cats/effect/kernel/AsyncPlatform.scala @@ -22,24 +22,36 @@ private[kernel] trait AsyncPlatform[F[_]] { this: Async[F] => def fromPromise[A](iop: F[Promise[A]]): F[A] = fromThenable(widen(iop)) + def fromPromiseCancelable[A](iop: F[(Promise[A], F[Unit])]): F[A] = + fromThenableCancelable(widen(iop)) + def fromThenable[A](iot: F[Thenable[A]]): F[A] = flatMap(iot) { t => async_[A] { cb => - val onFulfilled: Function1[A, Unit | Thenable[Unit]] = - (v: A) => cb(Right(v)): Unit | Thenable[Unit] - - val onRejected: Function1[Any, Unit | Thenable[Unit]] = { (a: Any) => - val e = a match { - case th: Throwable => th - case _ => JavaScriptException(a) - } + t.`then`[Unit](mkOnFulfilled(cb), defined(mkOnRejected(cb))) + () + } + } - cb(Left(e)): Unit | Thenable[Unit] + def fromThenableCancelable[A](iot: F[(Thenable[A], F[Unit])]): F[A] = + flatMap(iot) { + case (t, fin) => + async[A] { cb => + as(delay(t.`then`[Unit](mkOnFulfilled(cb), defined(mkOnRejected(cb)))), Some(fin)) } + } - t.`then`[Unit](onFulfilled, defined(onRejected)) + @inline private[this] def mkOnFulfilled[A]( + cb: Either[Throwable, A] => Unit): Function1[A, Unit | Thenable[Unit]] = + (v: A) => cb(Right(v)): Unit | Thenable[Unit] - () - } + @inline private[this] def mkOnRejected[A]( + cb: Either[Throwable, A] => Unit): Function1[Any, Unit | Thenable[Unit]] = { (a: Any) => + val e = a match { + case th: Throwable => th + case _ => JavaScriptException(a) } + + cb(Left(e)): Unit | Thenable[Unit] + } } diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala index c042060bdb..2291fab4b5 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala @@ -207,6 +207,9 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { /** * Lifts a [[scala.concurrent.Future]] into an `F` effect. + * + * @see + * [[fromFutureCancelable]] for a cancelable version */ def fromFuture[A](fut: F[Future[A]]): F[A] = flatMap(fut) { f => @@ -215,6 +218,17 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] { } } + /** + * Like [[fromFuture]], but is cancelable via the provided finalizer. + */ + def fromFutureCancelable[A](futCancel: F[(Future[A], F[Unit])]): F[A] = + flatMap(futCancel) { + case (fut, fin) => + flatMap(executionContext) { implicit ec => + async[A](cb => as(delay(fut.onComplete(t => cb(t.toEither))), Some(fin))) + } + } + /** * Translates this `F[A]` into a `G` value which, when evaluated, runs the original `F` to its * completion, the `limit` number of stages, or until the first stage that cannot be expressed diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index 26f99ec688..586257ed45 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -1416,6 +1416,16 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { } } + "round trip cancelable through s.c.Future" in ticked { implicit ticker => + forAll { (ioa: IO[Int]) => + ioa eqv IO.fromFutureCancelable( + IO(ioa.unsafeToFutureCancelable()).map { + case (fut, fin) => (fut, IO.fromFuture(IO(fin()))) + } + ) + } + } + "canceled through s.c.Future is errored" in ticked { implicit ticker => val test = IO.fromFuture(IO(IO.canceled.as(-1).unsafeToFuture())).handleError(_ => 42) From df7fac5df14bd881fb7d2ad73926c1097beb4bdf Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 18 Jan 2023 06:40:03 +0000 Subject: [PATCH 2/2] Fix test --- .../shared/src/test/scala/cats/effect/IOSpec.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index 586257ed45..c7fda152d4 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -28,7 +28,7 @@ import cats.syntax.all._ import org.scalacheck.Prop import org.typelevel.discipline.specs2.mutable.Discipline -import scala.concurrent.{ExecutionContext, TimeoutException} +import scala.concurrent.{CancellationException, ExecutionContext, TimeoutException} import scala.concurrent.duration._ import Prop.forAll @@ -1418,11 +1418,13 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { "round trip cancelable through s.c.Future" in ticked { implicit ticker => forAll { (ioa: IO[Int]) => - ioa eqv IO.fromFutureCancelable( - IO(ioa.unsafeToFutureCancelable()).map { - case (fut, fin) => (fut, IO.fromFuture(IO(fin()))) - } - ) + ioa eqv IO + .fromFutureCancelable( + IO(ioa.unsafeToFutureCancelable()).map { + case (fut, fin) => (fut, IO.fromFuture(IO(fin()))) + } + ) + .recoverWith { case _: CancellationException => IO.canceled *> IO.never[Int] } } }