Skip to content

Commit

Permalink
Merge pull request #3205 from djspiewak/experiment/uncancelable-async
Browse files Browse the repository at this point in the history
Change `async_` to be uncancelable
  • Loading branch information
djspiewak authored Nov 27, 2022
2 parents 0c6abf3 + 4b87497 commit e8d17df
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 18 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ ThisBuild / git.gitUncommittedChanges := {
}
}

ThisBuild / tlBaseVersion := "3.4"
ThisBuild / tlBaseVersion := "3.5"
ThisBuild / tlUntaggedAreSnapshots := false

ThisBuild / organization := "org.typelevel"
Expand Down
4 changes: 2 additions & 2 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
G.uncancelable { poll =>
lift(k(resume)) flatMap {
case Some(fin) => G.onCancel(poll(get), lift(fin))
case None => poll(get)
case None => get
}
}
}
Expand Down Expand Up @@ -1238,7 +1238,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
def async_[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = {
val body = new Cont[IO, A, A] {
def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) =>
G.uncancelable { poll => lift(IO.delay(k(resume))).flatMap(_ => poll(get)) }
G.uncancelable(_ => lift(IO.delay(k(resume))).flatMap(_ => get))
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private final class IOFiber[A](
val handle = registerListener(oc => cb(Right(oc)))

if (handle == null)
None /* we were already invoked, so no `CallbackStack` needs to be managed */
Some(IO.unit) /* we were already invoked, so no `CallbackStack` needs to be managed */
else
Some(IO(handle.clearCurrent()))
}
Expand Down
4 changes: 2 additions & 2 deletions kernel/shared/src/main/scala/cats/effect/kernel/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
lift(k(resume)) flatMap {
case Right(a) => G.pure(a)
case Left(Some(fin)) => G.onCancel(poll(get), lift(fin))
case Left(None) => poll(get)
case Left(None) => get
}
}
}
Expand Down Expand Up @@ -156,7 +156,7 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
* Polymorphic so it can be used in situations where an arbitrary effect is expected eg
* [[Fiber.joinWithNever]]
*/
def never[A]: F[A] = async(_ => pure(none[F[Unit]]))
def never[A]: F[A] = async(_ => pure(Some(unit)))

/**
* Shift execution of the effect `fa` to the execution context `ec`. Execution is shifted back
Expand Down
19 changes: 9 additions & 10 deletions kernel/shared/src/main/scala/cats/effect/kernel/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -996,23 +996,22 @@ object Resource extends ResourceFOInstances0 with ResourceHOInstances0 with Reso
val nt2 = new (Resource[F, *] ~> D) {
def apply[A](rfa: Resource[F, A]) =
Kleisli { r =>
nt(rfa.allocatedCase) flatMap {
case (a, fin) =>
r.update(f => (ec: ExitCase) => f(ec) !> (F.unit >> fin(ec))).as(a)
G uncancelable { poll =>
poll(nt(rfa.allocatedCase)) flatMap {
case (a, fin) =>
r.update(f => (ec: ExitCase) => f(ec) !> (F.unit >> fin(ec))).as(a)
}
}
}
}

for {
r <- nt(F.ref((_: ExitCase) => F.unit).map(_.mapK(nt)))

a <- G.guaranteeCase(body[D].apply(cb, Kleisli.liftF(ga), nt2).run(r)) {
nt(F.ref((_: ExitCase) => F.unit).map(_.mapK(nt))) flatMap { r =>
G.guaranteeCase(
(body[D].apply(cb, Kleisli.liftF(ga), nt2).run(r), r.get).tupled) {
case Outcome.Succeeded(_) => G.unit
case oc => r.get.flatMap(fin => nt(fin(ExitCase.fromOutcome(oc))))
}

fin <- r.get
} yield (a, fin)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ trait AsyncLaws[F[_]] extends GenTemporalLaws[F, Throwable] with SyncLaws[F] {
F.async[A](k => F.delay(k(Left(e))) >> F.pure(Some(fu))) <-> F.raiseError(e)

def neverIsDerivedFromAsync[A] =
F.never[A] <-> F.async[A](_ => F.pure(None))
F.never[A] <-> F.async[A](_ => F.pure(Some(F.unit)))

def executionContextCommutativity[A](fa: F[A]) =
(fa *> F.executionContext) <-> (F.executionContext <* fa)
Expand Down
2 changes: 1 addition & 1 deletion tests/shared/src/test/scala/cats/effect/ResourceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ class ResourceSpec extends BaseSpec with ScalaCheck with Discipline {
"Resource[IO, *]",
AsyncTests[Resource[IO, *]].async[Int, Int, Int](10.millis)
) /*(Parameters(seed =
Some(Seed.fromBase64("75d9nzLIEobZ3mfn0DvzUkMv-Jt7o7IyQyIvjqwkeVJ=").get)))*/
Some(Seed.fromBase64("0FaZxJyh_xN_NL3i_y7bNaLpaWuhO9qUPXmfxxgLIIN=").get)))*/
}

{
Expand Down

0 comments on commit e8d17df

Please sign in to comment.