Skip to content

Commit

Permalink
Replace SubstreamCancelStrategy with SupervisionDecider for Split
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Mar 18, 2023
1 parent 33583e0 commit fcf46b5
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.pekko
import pekko.NotUsed
import pekko.stream._
import pekko.stream.StreamSubscriptionTimeoutTerminationMode
import pekko.stream.Supervision.resumingDecider
import pekko.stream.Supervision.{ resumingDecider, Decider }
import pekko.stream.impl.SubscriptionTimeoutException
import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.TestPublisher
Expand Down Expand Up @@ -58,13 +58,16 @@ class FlowSplitAfterSpec extends StreamSpec("""
def cancel(): Unit = subscription.cancel()
}

class SubstreamsSupport(
splitAfter: Int = 3,
class SubstreamsSupport(splitAfter: Int = 3,
elementCount: Int = 6,
substreamCancelStrategy: SubstreamCancelStrategy = SubstreamCancelStrategy.drain) {
decider: Decider = Supervision.resumingDecider) {

val source = Source(1 to elementCount)
val groupStream = source.splitAfter(substreamCancelStrategy)(_ == splitAfter).lift.runWith(Sink.asPublisher(false))
val groupStream = source
.splitAfter(_ == splitAfter)
.lift
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.asPublisher(false))
val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]()

groupStream.subscribe(masterSubscriber)
Expand Down Expand Up @@ -272,7 +275,7 @@ class FlowSplitAfterSpec extends StreamSpec("""
}

"support eager cancellation of master stream on cancelling substreams" in {
new SubstreamsSupport(splitAfter = 5, elementCount = 8, SubstreamCancelStrategy.propagate) {
new SubstreamsSupport(splitAfter = 5, elementCount = 8, Supervision.stoppingDecider) {
val s1 = StreamPuppet(expectSubFlow().runWith(Sink.asPublisher(false)))
s1.cancel()
masterSubscriber.expectComplete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.pekko
import pekko.Done
import pekko.NotUsed
import pekko.stream._
import pekko.stream.Supervision.resumingDecider
import pekko.stream.Supervision.Decider
import pekko.stream.impl.SubscriptionTimeoutException
import pekko.stream.impl.fusing.Split
import pekko.stream.testkit._
Expand Down Expand Up @@ -51,13 +51,16 @@ class FlowSplitWhenSpec extends StreamSpec("""
def cancel(): Unit = subscription.cancel()
}

class SubstreamsSupport(
splitWhen: Int = 3,
class SubstreamsSupport(splitWhen: Int = 3,
elementCount: Int = 6,
substreamCancelStrategy: SubstreamCancelStrategy = SubstreamCancelStrategy.drain) {
decider: Decider = Supervision.resumingDecider) {

val source = Source(1 to elementCount)
val groupStream = source.splitWhen(substreamCancelStrategy)(_ == splitWhen).lift.runWith(Sink.asPublisher(false))
val groupStream = source
.splitWhen(_ == splitWhen)
.lift
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.asPublisher(false))
val masterSubscriber = TestSubscriber.manualProbe[Source[Int, _]]()

groupStream.subscribe(masterSubscriber)
Expand Down Expand Up @@ -162,6 +165,7 @@ class FlowSplitWhenSpec extends StreamSpec("""
.fromPublisher(inputs)
.splitWhen(_ == 2)
.lift
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.map(_.runWith(Sink.fromSubscriber(substream)))
.runWith(Sink.fromSubscriber(masterStream))

Expand All @@ -176,15 +180,26 @@ class FlowSplitWhenSpec extends StreamSpec("""
inputs.expectCancellation()

val inputs2 = TestPublisher.probe[Int]()
Source.fromPublisher(inputs2).splitWhen(_ == 2).lift.map(_.runWith(Sink.cancelled)).runWith(Sink.cancelled)
Source
.fromPublisher(inputs2)
.splitWhen(_ == 2)
.lift
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.map(_.runWith(Sink.cancelled))
.runWith(Sink.cancelled)

inputs2.expectCancellation()

val inputs3 = TestPublisher.probe[Int]()

val masterStream3 = TestSubscriber.probe[Source[Int, Any]]()

Source.fromPublisher(inputs3).splitWhen(_ == 2).lift.runWith(Sink.fromSubscriber(masterStream3))
Source
.fromPublisher(inputs3)
.splitWhen(_ == 2)
.lift
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.fromSubscriber(masterStream3))

masterStream3.request(1)
inputs3.sendNext(1)
Expand Down Expand Up @@ -267,6 +282,7 @@ class FlowSplitWhenSpec extends StreamSpec("""
Source(1 to 100)
.splitWhen(_ => true)
.lift
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.mapAsync(1)(_.runWith(Sink.head)) // Please note that this line *also* implicitly asserts nonempty substreams
.grouped(200)
.runWith(Sink.head),
Expand All @@ -281,7 +297,7 @@ class FlowSplitWhenSpec extends StreamSpec("""
// `lift` doesn't cut here because it will prevent the behavior we'd like to see.
// In fact, this test is somewhat useless, as a user cannot trigger double materialization using
// the public splitWhen => SubFlow API.
.via(Split.when(_ => true, SubstreamCancelStrategy.drain))
.via(Split.when(_ => true))
.map { source =>
// run twice, but make sure we return the result of the materialization that ran second
source.runWith(Sink.ignore).flatMap(_ => source.runWith(Sink.ignore))
Expand Down Expand Up @@ -320,7 +336,7 @@ class FlowSplitWhenSpec extends StreamSpec("""
.fromPublisher(publisherProbeProbe)
.splitWhen(elem => if (elem == 3) throw exc else elem % 3 == 0)
.lift
.withAttributes(ActorAttributes.supervisionStrategy(resumingDecider))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.asPublisher(false))
val subscriber = TestSubscriber.manualProbe[Source[Int, NotUsed]]()
publisher.subscribe(subscriber)
Expand Down Expand Up @@ -374,7 +390,7 @@ class FlowSplitWhenSpec extends StreamSpec("""
}

"support eager cancellation of master stream on cancelling substreams" in {
new SubstreamsSupport(splitWhen = 5, elementCount = 8, SubstreamCancelStrategy.propagate) {
new SubstreamsSupport(splitWhen = 5, elementCount = 8, Supervision.stoppingDecider) {
val s1 = StreamPuppet(getSubFlow().runWith(Sink.asPublisher(false)))
s1.cancel()
masterSubscriber.expectComplete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.Attributes.SourceLocation
import pekko.stream.impl.{ Buffer => BufferImpl }
import pekko.stream.Supervision.Decider
import pekko.stream.impl.ActorSubscriberMessage
import pekko.stream.impl.ActorSubscriberMessage.OnError
import pekko.stream.impl.Stages.DefaultAttributes
Expand Down Expand Up @@ -467,49 +468,53 @@ import pekko.util.ccompat.JavaConverters._
/** Splits after the current element. The current element will be the last element in the current substream. */
case object SplitAfter extends SplitDecision

def when[T](
p: T => Boolean,
substreamCancelStrategy: SubstreamCancelStrategy): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] =
new Split(Split.SplitBefore, p, substreamCancelStrategy)
def cancelStrategyToDecider(substreamCancelStrategy: SubstreamCancelStrategy): Decider =
substreamCancelStrategy match {
case SubstreamCancelStrategies.Propagate => Supervision.stoppingDecider
case SubstreamCancelStrategies.Drain => Supervision.resumingDecider
}

def when[T](p: T => Boolean): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] = {
new Split(Split.SplitBefore, p)
}

def after[T](
p: T => Boolean,
substreamCancelStrategy: SubstreamCancelStrategy): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] =
new Split(Split.SplitAfter, p, substreamCancelStrategy)
def after[T](p: T => Boolean): Graph[FlowShape[T, Source[T, NotUsed]], NotUsed] =
new Split(Split.SplitAfter, p)
}

/**
* INTERNAL API
*/
@InternalApi private[pekko] final class Split[T](
val decision: Split.SplitDecision,
val p: T => Boolean,
val substreamCancelStrategy: SubstreamCancelStrategy)
@InternalApi private[pekko] final class Split[T](val decision: Split.SplitDecision, val p: T => Boolean)
extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {

val in: Inlet[T] = Inlet("Split.in")
val out: Outlet[Source[T, NotUsed]] = Outlet("Split.out")

override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)

private val propagateSubstreamCancel = substreamCancelStrategy match {
case SubstreamCancelStrategies.Propagate => true
case SubstreamCancelStrategies.Drain => false
}

override protected def initialAttributes: Attributes = DefaultAttributes.split and SourceLocation.forLambda(p)
override def initialAttributes: Attributes = DefaultAttributes.split and SourceLocation.forLambda(p)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
import Split._

private val SubscriptionTimer = "SubstreamSubscriptionTimer"

private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

private val timeout: FiniteDuration =
inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
private var substreamSource: SubSourceOutlet[T] = null
private var substreamWaitingToBePushed = false
private var substreamCancelled = false

def propagateSubstreamCancel(ex: Throwable): Boolean =
decider(ex) match {
case Supervision.Stop => true
case Supervision.Resume => false
case Supervision.Restart => false
}

setHandler(
out,
new OutHandler {
Expand Down Expand Up @@ -606,7 +611,7 @@ import pekko.util.ccompat.JavaConverters._

override def onDownstreamFinish(cause: Throwable): Unit = {
substreamCancelled = true
if (isClosed(in) || propagateSubstreamCancel) {
if (isClosed(in) || propagateSubstreamCancel(cause)) {
cancelStage(cause)
} else {
// Start draining
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2337,6 +2337,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* @see [[#splitWhen]]
*/
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitWhen(substreamCancelStrategy)(p.test))

Expand Down Expand Up @@ -2395,6 +2398,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* @see [[#splitAfter]]
*/
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.splitAfter(substreamCancelStrategy)(p.test))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3827,6 +3827,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* @see [[#splitWhen]]
*/
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitWhen(substreamCancelStrategy)(p.test))

Expand Down Expand Up @@ -3884,6 +3887,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* @see [[#splitAfter]]
*/
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "2.6.19")
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(p: function.Predicate[Out]): SubSource[Out, Mat] =
new SubSource(delegate.splitAfter(substreamCancelStrategy)(p.test))

Expand Down
56 changes: 46 additions & 10 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2246,15 +2246,25 @@ trait FlowOps[+Out, +Mat] {
*
* See also [[FlowOps.splitAfter]].
*/
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(
p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed] = {
val merge = new SubFlowImpl.MergeBack[Out, Repr] {
override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
via(Split.when(p, substreamCancelStrategy)).map(_.via(flow)).via(new FlattenMerge(breadth))
via(Split
.when(p)
.withAttributes(ActorAttributes.supervisionStrategy(Split.cancelStrategyToDecider(substreamCancelStrategy))))
.map(_.via(flow))
.via(new FlattenMerge(breadth))
}

val finish: (Sink[Out, NotUsed]) => Closed = s =>
via(Split.when(p, substreamCancelStrategy))
val finish: Sink[Out, NotUsed] => Closed = s =>
via(
Split
.when(p)
.withAttributes(ActorAttributes.supervisionStrategy(Split.cancelStrategyToDecider(substreamCancelStrategy))))
.to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))

new SubFlowImpl(Flow[Out], merge, finish)
Expand All @@ -2267,8 +2277,17 @@ trait FlowOps[+Out, +Mat] {
*
* @see [[#splitWhen]]
*/
def splitWhen(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed] =
splitWhen(SubstreamCancelStrategy.drain)(p)
def splitWhen(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed] = {
val merge = new SubFlowImpl.MergeBack[Out, Repr] {
override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
via(Split.when(p)).map(_.via(flow)).via(new FlattenMerge(breadth))
}

val finish: Sink[Out, NotUsed] => Closed = s =>
via(Split.when(p)).to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))

new SubFlowImpl(Flow[Out], merge, finish)
}

/**
* This operation applies the given predicate to all incoming elements and
Expand Down Expand Up @@ -2315,14 +2334,24 @@ trait FlowOps[+Out, +Mat] {
*
* See also [[FlowOps.splitWhen]].
*/
@deprecated(
"Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy",
since = "1.1.0")
def splitAfter(substreamCancelStrategy: SubstreamCancelStrategy)(
p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed] = {
val merge = new SubFlowImpl.MergeBack[Out, Repr] {
override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
via(Split.after(p, substreamCancelStrategy)).map(_.via(flow)).via(new FlattenMerge(breadth))
via(Split
.after(p)
.withAttributes(ActorAttributes.supervisionStrategy(Split.cancelStrategyToDecider(substreamCancelStrategy))))
.map(_.via(flow))
.via(new FlattenMerge(breadth))
}
val finish: (Sink[Out, NotUsed]) => Closed = s =>
via(Split.after(p, substreamCancelStrategy))
val finish: Sink[Out, NotUsed] => Closed = s =>
via(
Split
.after(p)
.withAttributes(ActorAttributes.supervisionStrategy(Split.cancelStrategyToDecider(substreamCancelStrategy))))
.to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))
new SubFlowImpl(Flow[Out], merge, finish)
}
Expand All @@ -2334,8 +2363,15 @@ trait FlowOps[+Out, +Mat] {
*
* @see [[#splitAfter]]
*/
def splitAfter(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed] =
splitAfter(SubstreamCancelStrategy.drain)(p)
def splitAfter(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed] = {
val merge = new SubFlowImpl.MergeBack[Out, Repr] {
override def apply[T](flow: Flow[Out, T, NotUsed], breadth: Int): Repr[T] =
via(Split.after(p)).map(_.via(flow)).via(new FlattenMerge(breadth))
}
val finish: Sink[Out, NotUsed] => Closed = s =>
via(Split.after(p)).to(Sink.foreach(_.runWith(s)(GraphInterpreter.currentInterpreter.materializer)))
new SubFlowImpl(Flow[Out], merge, finish)
}

/**
* Transform each input element into a `Source` of output elements that is
Expand Down

0 comments on commit fcf46b5

Please sign in to comment.