Skip to content

Commit

Permalink
Add raceN / firstCompletedOf recipe
Browse files Browse the repository at this point in the history
  • Loading branch information
BalmungSan committed Apr 26, 2023
1 parent 3f61234 commit 6e7a3c3
Showing 1 changed file with 62 additions and 6 deletions.
68 changes: 62 additions & 6 deletions docs/recipes.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ title: Recipes

## Start a supervised task that outlives the creating scope

If you need to run an action in a fiber in a "start-and-forget" manner, you'll want to use [Supervisor](std/supervisor.md).
If you need to run an action in a fiber in a "start-and-forget" manner, you'll want to use [Supervisor](std/supervisor.md).
This lets you safely evaluate an effect in the background without waiting for it to complete and ensuring that the fiber and all its resources are cleaned up at the end.
You can configure a`Supervisor` to wait for all supervised fibers to complete at the end its lifecycle, or to simply cancel any remaining active fibers.

Expand Down Expand Up @@ -74,7 +74,7 @@ import cats.effect.std.Supervisor
object Server extends IOApp.Simple {

def handler(supervisor: Supervisor[IO]): Request => IO[Response] = {
case Request("start", params) =>
case Request("start", params) =>
supervisor.supervise(longRunningTask(params)).void >> IO.pure(Ok("started"))
case Request(_, _) => IO.pure(NotFound)
}
Expand All @@ -93,8 +93,8 @@ The server returns to the client without waiting for the task to finish.

## Atomically update a Ref with result of an effect

Cats Effect provides [Ref](std/ref.md), that we can use to model mutable concurrent reference.
However, if we want to update our ref using result of an effect `Ref` will usually not be enough and we need a more powerful construct to achieve that.
Cats Effect provides [Ref](std/ref.md), that we can use to model mutable concurrent reference.
However, if we want to update our ref using result of an effect `Ref` will usually not be enough and we need a more powerful construct to achieve that.
In cases like that we can use the [AtomicCell](std/atomic-cell.md) that can be viewed as a synchronized `Ref`.

The most typical example for `Ref` is the concurrent counter, but what if the update function for our counter would be effectful?
Expand Down Expand Up @@ -144,7 +144,7 @@ Now, say that we want to have a cache that holds the highest exchange rate that
```scala mdoc:silent
class MaxProxy(atomicCell: AtomicCell[IO, Double], requestService: Service) {

def queryCache(): IO[ServiceResponse] =
def queryCache(): IO[ServiceResponse] =
atomicCell evalModify { current =>
requestService.query() map { result =>
if (result.exchangeRate > current)
Expand All @@ -153,10 +153,66 @@ class MaxProxy(atomicCell: AtomicCell[IO, Double], requestService: Service) {
(current, result)
}
}

def getHistoryMax(): IO[Double] = atomicCell.get
}
```

## How to implement raceN / firstCompletedOf

Assume you have a bunch of `IOs` that you want to run all in parallel and get the first success;
canceling all the others.

```scala mdoc:silent
def raceN[A](ios: List[IO[A]]): IO[A] =
Deferred[IO, A].flatMap { d =>
ios.parTraverse_(io => io.flatMap(d.complete)).background.surround(d.get)
}
```

This is the simplest implementation which may work well for many cases but not all.
But we can scale this one to consider more complex situations:

**The list was empty or all ios fail**

In that case the returned `IO` will be blocked forever,
we can avoid that using an `Option` like this:

```scala mdoc:silent
def raceN[A](ios: List[IO[A]]): IO[Option[A]] =
Deferred[IO, Option[A]].flatMap { d =>
val runAll = ios.parTraverse_ { io =>
io.flatMap(a => d.complete(Some(a)))
}

val fallbackResult = d.complete(None)

(runAll >> fallbackResult).background.surround(d.get)
}
```

**We want the first completed instead of the first success**

What if instead of waiting for the first success,
you want the first completed `IO`, even if it completes with an error.
Similar that the previous case, we can just use an `Either` & `attempt`:

```scala mdoc:silent
def raceN[A](ios: List[IO[A]]): IO[Either[Throwable, A]] =
Deferred[IO, Either[Throwable, A]].flatMap { d =>
val runAll = ios.parTraverse_ { io =>
io.attempt.flatMap(d.complete)
}

// You may use an especial custom exception, or combine the Either + Option.
val fallbackResult = d.complete(new Exception("No IO completed"))

(runAll >> fallbackResult).background.surround(d.get)
}
```

You can also adapt the code to only complete the `Deferred` if the value satisfy a property,
or any other custom condition that you may have.

> As you can see the general idea is very flexible and you can adapt it to your needs.
> That is the power of having composable programs and high level structures like `Deferred`!

0 comments on commit 6e7a3c3

Please sign in to comment.