Add evalFlatten methods for stream of effects #2851

Open · wants to merge 2 commits into base: main
Changes from 1 commit
24 changes: 20 additions & 4 deletions core/shared/src/main/scala/fs2/Stream.scala
@@ -4073,10 +4073,7 @@ object Stream extends StreamLowPriority {
}

def outcomeJoiner: F[Unit] =
-  outcomes.stream
-    .evalMap(identity)
-    .compile
-    .drain
+  outcomes.stream.evalFlatten.compile.drain
.guaranteeCase {
case Outcome.Succeeded(_) =>
stop(None) >> output.close.void
@@ -4117,6 +4114,25 @@ object Stream extends StreamLowPriority {
parJoin(Int.MaxValue)
}

/** Provides syntax for a stream of `F` effects. */
implicit class StreamFOps[F[_], O](private val self: Stream[F, F[O]]) {
Contributor:
For what it's worth, I've regretted using this shape of stream every time. You can run yourself out of memory really quickly if those inner F[A] are nontrivial and you have a lot of them.

I'm uncomfortable with encoding this in the library in a way that makes it easier to use, because I personally think this shape should be discouraged - especially when the implementation here is pretty trivial.

And as a library user, I think it's a lot clearer to the code reader to see an inline evalMap(identity) rather than yet another method they need to learn as part of the API.

So I'm a polite 👎 on the PR for those reasons.
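
For concreteness, a minimal sketch of the shape and the two spellings being compared (assuming fs2 3.x / cats-effect 3; the values are made up):

```scala
import cats.effect.IO
import fs2.Stream

// A stream whose elements are themselves effects.
val lookups: Stream[IO, IO[Int]] =
  Stream(1, 2, 3).covary[IO].map(i => IO(i * 10))

// Today's inline spelling, argued above to be clearer to readers:
val viaEvalMap: Stream[IO, Int] = lookups.evalMap(identity)

// The spelling proposed by this PR:
val viaSyntax: Stream[IO, Int] = lookups.evalFlatten
```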

Member:
Good to know @Daenyth, I'd never have thought it could blow the memory. Would a suspend help resolve that (at least to reduce the overhead of the non-trivial ones)?

Contributor:
Probably not, because the issue is the memory used by holding a large Chunk[IO[A]].

It's also possible my codebase was just doing something really silly and that's why that structure cost so much memory.
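
A rough sketch of that footprint concern (assuming fs2 3.x / cats-effect 3; the handler and sizes are made up): every element of a Chunk[IO[A]] is a suspended closure, so whatever those closures capture stays in memory until each effect actually runs.

```scala
import cats.effect.IO
import fs2.{Chunk, Stream}

// Hypothetical handler: the closure retains `payload` until the IO is evaluated.
def handle(payload: Array[Byte]): IO[Int] =
  IO(payload.length)

// ~1 GiB retained before a single effect runs: 1024 pending IOs, each capturing 1 MiB.
val pending: Chunk[IO[Int]] =
  Chunk.seq((1 to 1024).map(_ => handle(new Array[Byte](1024 * 1024))))

// Turning it into a Stream[IO, IO[Int]] and flattening doesn't change that:
// the chunk (and everything it captures) is already allocated up front.
val flattened: Stream[IO, Int] =
  Stream.chunk(pending).covary[IO].evalMap(identity)
```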


/** Sequences the inner effects into the stream. */
def evalFlatten: Stream[F, O] =
self.evalMap(identity)

/** Evaluates up to `maxConcurrent` inner effects concurrently, emitting
* the results in order.
*/
def parEvalFlatten(
maxConcurrent: Int
)(implicit F: Concurrent[F]) = self.parEvalMap(maxConcurrent)(identity)
(bplommer marked this conversation as resolved.)

/** Evaluates all inner effects concurrently, emitting the results in order.
*/
def parEvalFlattenUnbounded(implicit F: Concurrent[F]) = self.parEvalMapUnbounded(identity)
Contributor (@diesalbla, Mar 22, 2022):
So, just to clarify something: compiling this stream would immediately start a background action that launches all the actions in the stream. The inside of the auxiliary parEvalMapAction function uses an internal queue and a semaphore to control how many items from the source are running, but if the concurrency is unbounded then those do not limit progress. Also, nothing allows the user to control, based on the outputs of the resulting stream, whether any actions from the source are delayed... Is that a desirable mode of operation?

Note that parJoinUnbounded relies on a single-chunk channel (a funnel) to stop streams from advancing before the consumer has pulled the latest chunk, so the semantics of that unbounded operation is to launch all source streams but have them contend for that channel. This unbounded variant, on the other hand, would pull all the F[O] actions from the stream and start all of them. Furthermore, without the consumer being able to control progress of the source, this stream offers no means of back-pressure: if the consumer is too slow, this operation is going to fill that unbounded buffer until it exhausts memory...
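
A rough sketch of the behaviour being questioned (assuming fs2 3.x / cats-effect 3, and that parEvalFlattenUnbounded delegates to parEvalMapUnbounded as in this diff): a slow consumer does not stop the inner effects from being started.

```scala
import cats.effect.{IO, Ref}
import scala.concurrent.duration._
import fs2.Stream

// Counts how many inner effects have started by the time the (deliberately slow)
// consumer has taken a single element.
def startedWhileConsumerIsSlow(n: Int): IO[Long] =
  Ref.of[IO, Long](0L).flatMap { started =>
    Stream
      .range(0, n)
      .covary[IO]
      .map(i => started.update(_ + 1).as(i)) // Stream[IO, IO[Int]]
      .parEvalMapUnbounded(identity)         // what parEvalFlattenUnbounded delegates to
      .metered(1.second)                     // slow consumer
      .take(1)
      .compile
      .drain >> started.get                  // typically far closer to n than to 1
  }
```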

Contributor (@nikiforo, Mar 23, 2022):

I think this method is as unsafe as the other *Unbounded methods. All the concurrent methods - parJoin, parEvalMap and the rest - are effectively push-based, and the only thing that guards users from launching too many computations is a semaphore. In the *Unbounded case the semaphore does nothing, so they should all be considered unsafe in that regard.

However, I see you make a distinction between parJoinUnbounded and parEvalMapUnbounded in the way they block on the Channel. I don't think this behaviour is intended as a protection, and it shouldn't be relied on.

Personally, when I use par*Unbounded methods I assume that the stream won't be parallel enough to overwhelm the consumer, and I choose not to bother restricting the level of parallelism. Otherwise I would have to widen interfaces (functions or constructors) with a parallelism configuration - because when you really want to restrict parallelism, you should make that restriction configurable.
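
A rough sketch of that last point (the flattenResults name and maxInFlight parameter are made up for illustration): if the restriction matters, expose it to the caller and use the bounded variant proposed in this PR.

```scala
import cats.effect.Concurrent
import fs2.Stream

// Caller decides how many inner effects may run at once.
def flattenResults[F[_]: Concurrent, A](
    results: Stream[F, F[A]],
    maxInFlight: Int
): Stream[F, A] =
  results.parEvalFlatten(maxInFlight) // i.e. parEvalMap(maxInFlight)(identity)
```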

Contributor:

> Personally, when I use par*Unbounded methods I assume that the stream won't be parallel enough to overwhelm the consumer and choose not to bother restricting the level of parallelism

There are two mixed concerns here: parallelism, and buffering vs back-pressuring.

parJoinUnbounded takes an outer stream of data sources and launches all of those sources to pull, and push into the queue, in parallel. That outer stream may never end and may keep incorporating new sources, so parJoinUnbounded does not limit parallelism. However, what it does restrict, by means of that output queue, is how many items are pulled from each source into memory.

The problem with this new combinator is that it would prefetch and load all data from its source into local memory, without any feedback from the consumer to stop it. Thus a short slowdown in the consumer would cause this combinator to accrue a lot of data, which can crash the program. A basic reliability guideline is to avoid infinite buffers.

> However, I see you make a distinction between parJoinUnbounded and parEvalMapUnbounded in the way they block on the Channel. I don't think this behaviour is intended as a protection, and it shouldn't be relied on.

Given that back-pressure and laziness are an essential part of fs2 streams, building that check and balance into the pipeline seems to me a crucial part of this combinator.

As an aside, there are FS2 combinators that do fetch a lot of data, like prefetchAll, but those are legacy. Also, some choices of parameters in other combinators can cause trouble, but those cannot be avoided.
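
A rough sketch of the contrast drawn above (assuming fs2 3.x; the sources and consumer delay are made up): parJoinUnbounded runs every inner stream concurrently, but each inner stream ends up waiting on the shared output channel once the consumer stops pulling, so buffering stays bounded even though parallelism is not.

```scala
import cats.effect.IO
import scala.concurrent.duration._
import fs2.Stream

// 100 endless sources, all running in parallel once joined.
val sources: Stream[IO, Stream[IO, Int]] =
  Stream.range(0, 100).covary[IO].map(offset => Stream.iterate(offset)(_ + 1).covary[IO])

// A slow consumer back-pressures the running sources through parJoinUnbounded's
// output channel instead of letting them race ahead and fill memory.
val joined: Stream[IO, Int] =
  sources.parJoinUnbounded.metered(10.millis)
```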

Contributor (PR author):

Sorry for the radio silence on this!

Yes, the use case I have in mind for this is when backpressure is provided upstream - for example in fs2-kafka, where you have an operation that returns F[F[Result]]: the outer F does a backpressured enqueue and then returns the inner F, which is a handle for waiting on the result, so there is no need for backpressure on the inner F.

This should definitely have documentation and an example though, so I'm going to remove parEvalFlattenUnbounded from this PR and maybe put it in a separate one later.
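
A rough sketch of that shape (the Producer, send, and Ack names are made up for illustration; this is not the actual fs2-kafka API). Back-pressure lives in the outer effect's enqueue, so flattening the inner acknowledgements without a further bound - via the unbounded variant deferred above - is the intended usage:

```scala
import cats.effect.Concurrent
import fs2.Stream

// Hypothetical producer: the outer F performs a bounded (back-pressured) enqueue
// and returns an inner F that completes when the result is available.
trait Producer[F[_], A, Ack] {
  def send(a: A): F[F[Ack]]
}

def produceAll[F[_]: Concurrent, A, Ack](
    producer: Producer[F, A, Ack],
    records: Stream[F, A]
): Stream[F, Ack] =
  records
    .evalMap(producer.send)  // back-pressure happens here, on the enqueue
    .parEvalFlattenUnbounded // waiting on the acknowledgements needs no extra limit
```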

}

/** Provides syntax for pure streams. */
implicit final class PureOps[O](private val self: Stream[Pure, O]) extends AnyVal {

@@ -47,7 +47,7 @@ class UnixSocketsSuite extends Fs2Suite with UnixSocketsSuitePlatform {

val clients = (0 until 100).map(b => client(Chunk.singleton(b.toByte)))

-  (Stream.sleep_[IO](1.second) ++ Stream.emits(clients).evalMap(identity))
+  (Stream.sleep_[IO](1.second) ++ Stream.emits(clients).evalFlatten)
.concurrently(server)
.compile
.drain