Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parEvalMap* runs resource finaliser before usage #3305

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
107 changes: 88 additions & 19 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
def concurrently[F2[x] >: F[x], O2](
that: Stream[F2, O2]
)(implicit F: Concurrent[F2]): Stream[F2, O] =
concurrentlyAux(that).flatMap { case (startBack, fore) => startBack >> fore }
Stream.eval(F.unit.map(_ => println("DEBUG: Invoked concurrently"))) *> concurrentlyAux(that)
.flatMap { case (startBack, fore) =>
startBack >> fore
}

private def concurrentlyAux[F2[x] >: F[x], O2](
that: Stream[F2, O2]
Expand All @@ -545,20 +548,59 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
interrupt <- F.deferred[Unit]
backResult <- F.deferred[Either[Throwable, Unit]]
} yield {
def watch[A](str: Stream[F2, A]) = str.interruptWhen(interrupt.get.attempt)
def watch[A](str: Stream[F2, A]) = str.interruptWhen(interrupt.get.attempt) <* Stream.eval(
F.unit.map(_ => println("DEBUG: Inside 'concurrently' watch is invoked"))
)

val compileBack: F2[Unit] = watch(that).compile.drain.guaranteeCase {
val compileBack: F2[Unit] = F.unit.map(_ =>
println("DEBUG: Inside 'concurrently' compileBack started (starting resource)")
) *> watch(that).compile.drain.guaranteeCase {
// Pass the result of backstream completion in the backResult deferred.
// IF result of back-stream was failed, interrupt fore. Otherwise, let it be
case Outcome.Errored(t) => backResult.complete(Left(t)) >> interrupt.complete(()).void
case _ => backResult.complete(Right(())).void
case Outcome.Errored(t) =>
F.unit.map(_ =>
println("DEBUG: Inside 'concurrently' compileBack ERRORED case")
) *> backResult.complete(Left(t)) >> interrupt.complete(()).void
case Outcome.Canceled() =>
backResult.complete(Right(())).void *> F.unit.map(_ =>
println("DEBUG: Inside 'concurrently' compileBack CANCELED case")
)
case Outcome.Succeeded(fa) =>
F.unit.map(_ =>
println(s"DEBUG: Inside 'concurrently' compileBack SUCCESS case with value: $fa")
) *>
backResult.complete(Right(())).void *> F.unit.map(_ =>
println("DEBUG: Inside 'concurrently' compileBack SUCCESS case")
)
}.voidError

// stop background process but await for it to finalise with a result
// We use F.fromEither to bring errors from the back into the fore
val stopBack: F2[Unit] = interrupt.complete(()) >> backResult.get.flatMap(F.fromEither)
val stopBack: F2[Unit] =
interrupt.complete(()) >> backResult.get.flatMap(F.fromEither) *> F.unit.map(_ =>
println("DEBUG: Inside 'concurrently' AFTER stopBack completed")
)

val tmp = Stream.resource(
Resource.make(compileBack.start)(_ =>
F.unit.map(_ =>
println("DEBUG: Inside 'concurrently' stopBack invoked (closing resource)")
) *> stopBack
)
)

(Stream.bracket(compileBack.start)(_ => stopBack), watch(this))
(tmp, watch(this))

/*
(
Stream.bracket(compileBack.start)(_ =>
F.unit.map(_ =>
println("DEBUG: Inside 'concurrently' stopBack invoked (closing resource)")
) *> stopBack
),
watch(this)
)
*/
}

Stream.eval(fstream)
Expand Down Expand Up @@ -2251,19 +2293,33 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
val releaseAndCheckCompletion =
semaphore.release *>
semaphore.available.flatMap {
case `concurrency` => channel.close *> end.complete(()).void
case _ => F.unit
case `concurrency` =>
F.unit.map(_ =>
println("DEBUG: inside releaseAndCheckCompletion 'concurrency' case ")
) *>
channel.close *> end.complete(()).void
case _ =>
F.unit.map(_ => println("DEBUG: inside releaseAndCheckCompletion 'other' case ")) *>
F.unit
}

def forkOnElem(el: O): F2[Unit] =
F.uncancelable { poll =>
poll(semaphore.acquire) <*
Deferred[F2, Unit].flatMap { pushed =>
val init = initFork(pushed.complete(()).void)
poll(init).onCancel(releaseAndCheckCompletion).flatMap { send =>
val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get
F.start(stop.get.race(action) *> releaseAndCheckCompletion)
}
F.unit.map(_ => println("DEBUG: Inside forkOnElem")) *>
poll(init).onCancel(releaseAndCheckCompletion).flatMap { send =>
val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get
F.unit
.map(_ => println("DEBUG: Inside forkOnElem and BEFORE action invocation")) *>
// F.start(stop.get.race(action) *> releaseAndCheckCompletion)
F.start(
stop.get.race(action) *> F.unit.map(_ =>
println("DEBUG: Inside forkOnElem when stop vs action race ends")
) *> releaseAndCheckCompletion
)
}
}
}

Expand All @@ -2272,11 +2328,21 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
interruptWhen(stop.get.map(_.asRight[Throwable]))
.foreach(forkOnElem)
.onFinalizeCase {
case ExitCase.Succeeded => releaseAndCheckCompletion
case _ => stop.complete(()) *> releaseAndCheckCompletion
case ExitCase.Succeeded =>
F.unit.map(_ => println("DEBUG: inside background SUCCESS case")) *>
releaseAndCheckCompletion
case _ =>
F.unit.map(_ => println("DEBUG: inside background OTHER case")) *>
stop.complete(()) *> releaseAndCheckCompletion
}

val foreground = channel.stream.evalMap(_.rethrow)
val foreground = channel.stream
.evalTap { x =>
F.unit.map(_ => println("DEBUG: Inside foreground evalTap")) *>
x
}
.evalMap(_.rethrow)

foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background)
}

Expand Down Expand Up @@ -3845,13 +3911,16 @@ object Stream extends StreamLowPriority {
case Resource.Allocate(resource) =>
Stream
.bracketFullWeak(resource) { case ((_, release), exit) =>
release(exit)
F.unit.map(_ => println("DEBUG: inside resourceWeak Allocate case CALLING release")) *>
release(exit)
}
.mapNoScope(_._1)
case Resource.Bind(source, f) =>
resourceWeak(source).flatMap(o => resourceWeak(f(o)))
case Resource.Eval(fo) => Stream.eval(fo)
case Resource.Pure(o) => Stream.emit(o)
case Resource.Eval(fo) =>
Stream.eval(fo)
case Resource.Pure(o) =>
Stream.emit(o)
}

/** Same as [[resourceWeak]], but expressed as a FunctionK. */
Expand Down
51 changes: 51 additions & 0 deletions core/shared/src/test/scala/fs2/StreamSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1042,4 +1042,55 @@ class StreamSuite extends Fs2Suite {
}
}

test("parEvalMap works correctly") {
/*
Stream
.resource(
Resource.make(IO.println("acquire"))(_ => IO.println("release"))
)
*/
/*
Stream
.resource(
Resource.make(IO.println("Creating Resource") *> IO.ref(true))(r =>
IO.println("Closing Resource") *> r.set(false)
)
)
.parEvalMap(2)(ref =>
ref.get.flatMap(x => IO.println(s"before sleep: ${x}")) >> IO.sleep(1.second) >>
ref.get.flatMap(x => IO.println(s"after sleep: ${x}"))
)
.compile
.drain
*/

Stream
.resource(
Resource.make(IO.println("Creating Resource") *> IO.ref(true))(r =>
IO.println("Closing Resource") *> r.set(false)
)
)
.parEvalMap(2) { ref =>
ref.get.flatMap(x => IO.println(s"before sleep: ${x}")) >>
IO.sleep(1 second) >>
ref.get.flatMap(x => IO.println(s"after sleep: ${x}"))
}
.compile
.drain
/*
Stream
.resource(Resource.make(IO.println("acquire"))(_ => IO.println("release")))
.parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use"))
.compile
.drain

Stream.range(0, 60)
.covary[IO]
.parEvalMap(60)(_ => IO.sleep(1.second))
.compile
.drain
*/

}

}
Loading