-
Notifications
You must be signed in to change notification settings - Fork 529
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add raceN / firstCompletedOf recipe #3565
base: series/3.4.x
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5,7 +5,7 @@ title: Recipes | |||||||||||||||||
|
||||||||||||||||||
## Start a supervised task that outlives the creating scope | ||||||||||||||||||
|
||||||||||||||||||
If you need to run an action in a fiber in a "start-and-forget" manner, you'll want to use [Supervisor](std/supervisor.md). | ||||||||||||||||||
If you need to run an action in a fiber in a "start-and-forget" manner, you'll want to use [Supervisor](std/supervisor.md). | ||||||||||||||||||
This lets you safely evaluate an effect in the background without waiting for it to complete and ensuring that the fiber and all its resources are cleaned up at the end. | ||||||||||||||||||
You can configure a`Supervisor` to wait for all supervised fibers to complete at the end its lifecycle, or to simply cancel any remaining active fibers. | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -74,7 +74,7 @@ import cats.effect.std.Supervisor | |||||||||||||||||
object Server extends IOApp.Simple { | ||||||||||||||||||
|
||||||||||||||||||
def handler(supervisor: Supervisor[IO]): Request => IO[Response] = { | ||||||||||||||||||
case Request("start", params) => | ||||||||||||||||||
case Request("start", params) => | ||||||||||||||||||
supervisor.supervise(longRunningTask(params)).void >> IO.pure(Ok("started")) | ||||||||||||||||||
case Request(_, _) => IO.pure(NotFound) | ||||||||||||||||||
} | ||||||||||||||||||
|
@@ -93,8 +93,8 @@ The server returns to the client without waiting for the task to finish. | |||||||||||||||||
|
||||||||||||||||||
## Atomically update a Ref with result of an effect | ||||||||||||||||||
|
||||||||||||||||||
Cats Effect provides [Ref](std/ref.md), that we can use to model mutable concurrent reference. | ||||||||||||||||||
However, if we want to update our ref using result of an effect `Ref` will usually not be enough and we need a more powerful construct to achieve that. | ||||||||||||||||||
Cats Effect provides [Ref](std/ref.md), that we can use to model mutable concurrent reference. | ||||||||||||||||||
However, if we want to update our ref using result of an effect `Ref` will usually not be enough and we need a more powerful construct to achieve that. | ||||||||||||||||||
In cases like that we can use the [AtomicCell](std/atomic-cell.md) that can be viewed as a synchronized `Ref`. | ||||||||||||||||||
|
||||||||||||||||||
The most typical example for `Ref` is the concurrent counter, but what if the update function for our counter would be effectful? | ||||||||||||||||||
|
@@ -144,7 +144,7 @@ Now, say that we want to have a cache that holds the highest exchange rate that | |||||||||||||||||
```scala mdoc:silent | ||||||||||||||||||
class MaxProxy(atomicCell: AtomicCell[IO, Double], requestService: Service) { | ||||||||||||||||||
|
||||||||||||||||||
def queryCache(): IO[ServiceResponse] = | ||||||||||||||||||
def queryCache(): IO[ServiceResponse] = | ||||||||||||||||||
atomicCell evalModify { current => | ||||||||||||||||||
requestService.query() map { result => | ||||||||||||||||||
if (result.exchangeRate > current) | ||||||||||||||||||
|
@@ -153,10 +153,66 @@ class MaxProxy(atomicCell: AtomicCell[IO, Double], requestService: Service) { | |||||||||||||||||
(current, result) | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
def getHistoryMax(): IO[Double] = atomicCell.get | ||||||||||||||||||
} | ||||||||||||||||||
``` | ||||||||||||||||||
|
||||||||||||||||||
## How to implement raceN / firstCompletedOf | ||||||||||||||||||
|
||||||||||||||||||
Assume you have a bunch of `IOs` that you want to run all in parallel and get the first success; | ||||||||||||||||||
canceling all the others. | ||||||||||||||||||
|
||||||||||||||||||
```scala mdoc:silent | ||||||||||||||||||
def raceN[A](ios: List[IO[A]]): IO[A] = | ||||||||||||||||||
Deferred[IO, A].flatMap { d => | ||||||||||||||||||
ios.parTraverse_(io => io.flatMap(d.complete)).background.surround(d.get) | ||||||||||||||||||
} | ||||||||||||||||||
Comment on lines
+168
to
+170
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Playing more golf:
Suggested change
Though maybe I'm putting the suggestion in the wrong spot, since this isn't in any way the simplest implementation. |
||||||||||||||||||
``` | ||||||||||||||||||
|
||||||||||||||||||
This is the simplest implementation which may work well for many cases but not all. | ||||||||||||||||||
But we can scale this one to consider more complex situations: | ||||||||||||||||||
|
||||||||||||||||||
**The list was empty or all ios fail** | ||||||||||||||||||
|
||||||||||||||||||
In that case the returned `IO` will be blocked forever, | ||||||||||||||||||
we can avoid that using an `Option` like this: | ||||||||||||||||||
|
||||||||||||||||||
```scala mdoc:silent | ||||||||||||||||||
def raceN[A](ios: List[IO[A]]): IO[Option[A]] = | ||||||||||||||||||
Deferred[IO, Option[A]].flatMap { d => | ||||||||||||||||||
val runAll = ios.parTraverse_ { io => | ||||||||||||||||||
io.flatMap(a => d.complete(Some(a))) | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
val fallbackResult = d.complete(None) | ||||||||||||||||||
|
||||||||||||||||||
(runAll >> fallbackResult).background.surround(d.get) | ||||||||||||||||||
BalmungSan marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
} | ||||||||||||||||||
``` | ||||||||||||||||||
|
||||||||||||||||||
**We want the first completed instead of the first success** | ||||||||||||||||||
|
||||||||||||||||||
What if instead of waiting for the first success, | ||||||||||||||||||
you want the first completed `IO`, even if it completes with an error. | ||||||||||||||||||
Similar that the previous case, we can just use an `Either` & `attempt`: | ||||||||||||||||||
|
||||||||||||||||||
```scala mdoc:silent | ||||||||||||||||||
def raceN[A](ios: List[IO[A]]): IO[Either[Throwable, A]] = | ||||||||||||||||||
Deferred[IO, Either[Throwable, A]].flatMap { d => | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should stop using this type generally and prefer the use of Outcome. This ends in a lot of weird deadlocks if folks use this without considering cancelation. So generally I say outcome or not at all. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, the example mentions that the hypothetical user only cares about either success or error. |
||||||||||||||||||
val runAll = ios.parTraverse_ { io => | ||||||||||||||||||
io.attempt.flatMap(d.complete) | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// You may use an especial custom exception, or combine the Either + Option. | ||||||||||||||||||
val fallbackResult = d.complete(new Exception("No IO completed")) | ||||||||||||||||||
|
||||||||||||||||||
(runAll >> fallbackResult).background.surround(d.get) | ||||||||||||||||||
BalmungSan marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
} | ||||||||||||||||||
``` | ||||||||||||||||||
|
||||||||||||||||||
You can also adapt the code to only complete the `Deferred` if the value satisfy a property, | ||||||||||||||||||
or any other custom condition that you may have. | ||||||||||||||||||
|
||||||||||||||||||
> As you can see the general idea is very flexible and you can adapt it to your needs. | ||||||||||||||||||
> That is the power of having composable programs and high level structures like `Deferred`! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can I throw one more implementation into the ring?
Edit: fixed 😇
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this was OP's original implementation (or something similar)
It really doesn't feel natural, at least to me.