diff --git a/stream/src/main/scala-2.12/org/apache/pekko/stream/javadsl/CollectionUtil.scala b/stream/src/main/scala-2.12/org/apache/pekko/stream/javadsl/CollectionUtil.scala new file mode 100644 index 0000000000..01c33e7b18 --- /dev/null +++ b/stream/src/main/scala-2.12/org/apache/pekko/stream/javadsl/CollectionUtil.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream +package javadsl + +import scala.collection.immutable + +import org.apache.pekko +import pekko.util.ccompat.JavaConverters._ + +/** + * INTERNAL API + */ +private[javadsl] object CollectionUtil { + def toSeq[T](jlist: java.util.List[T]): immutable.Seq[T] = + jlist.asScala.toList +} diff --git a/stream/src/main/scala-2.13+/org/apache/pekko/stream/javadsl/CollectionUtil.scala b/stream/src/main/scala-2.13+/org/apache/pekko/stream/javadsl/CollectionUtil.scala new file mode 100644 index 0000000000..27ce069473 --- /dev/null +++ b/stream/src/main/scala-2.13+/org/apache/pekko/stream/javadsl/CollectionUtil.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream +package javadsl + +import scala.collection.immutable + +import org.apache.pekko +import pekko.util.ccompat.JavaConverters._ + +/** + * INTERNAL API + */ +private[javadsl] object CollectionUtil { + def toSeq[T](jlist: java.util.List[T]): immutable.Seq[T] = + jlist.asScala.toSeq +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 245d7a2d1e..91b0385e49 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -34,7 +34,6 @@ import pekko.annotation.ApiMayChange import pekko.dispatch.ExecutionContexts import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.Pair -import pekko.japi.Util import pekko.japi.function import pekko.japi.function.Creator import pekko.stream.{ javadsl, _ } @@ -3245,7 +3244,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], segmentSize: Int, eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = { - val seq = if (those != null) Util.immutableSeq(those).collect { + val seq = if (those != null) CollectionUtil.toSeq(those).collect { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -3327,7 +3326,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = { - val seq = if (those != null) Util.immutableSeq(those).collect { + val seq = if (those != null) CollectionUtil.toSeq(those).collect { case source: Source[Out @unchecked, _] => source.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala index f4f8806340..998e303391 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala @@ -21,7 +21,7 @@ import scala.annotation.unchecked.uncheckedVariance import org.apache.pekko import pekko.annotation.ApiMayChange import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } -import pekko.japi.{ function, Pair, Util } +import pekko.japi.{ function, Pair } import pekko.stream._ import pekko.util.ConstantFun import pekko.util.FutureConverters._ @@ -273,7 +273,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( */ def mapConcat[Out2]( f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = - viaScala(_.mapConcat(elem => Util.immutableSeq(f.apply(elem)))) + viaScala(_.mapConcat(elem => f.apply(elem).asScala)) /** * Apply the given function to each context element (leaving the data elements unchanged). diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index 4bd056d358..b43ff064b8 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -28,7 +28,7 @@ import org.apache.pekko import pekko._ import pekko.actor.{ ActorRef, ClassicActorSystemProvider, Status } import pekko.dispatch.ExecutionContexts -import pekko.japi.{ function, Util } +import pekko.japi.function import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.LinearTraversalBuilder @@ -461,7 +461,7 @@ object Sink { sinks: java.util.List[_ <: Graph[SinkShape[U], M]], fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]]) : Sink[T, java.util.List[M]] = { - val seq = if (sinks != null) Util.immutableSeq(sinks).collect { + val seq = if (sinks != null) CollectionUtil.toSeq(sinks).collect { case sink: Sink[U @unchecked, M @unchecked] => sink.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 5fdefb5081..bcd289b785 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -26,16 +26,16 @@ import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag import org.apache.pekko -import org.apache.pekko.stream.impl.fusing.ArraySource import pekko.{ Done, NotUsed } import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } import pekko.annotation.ApiMayChange import pekko.dispatch.ExecutionContexts import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } -import pekko.japi.{ function, JavaPartialFunction, Pair, Util } +import pekko.japi.{ function, JavaPartialFunction, Pair } import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } +import pekko.stream.impl.fusing.ArraySource import pekko.util.{ unused, _ } import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -718,7 +718,7 @@ object Source { @deprecatedName(Symbol("strategy")) fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]]) : Source[U, NotUsed] = { - val seq = if (rest != null) Util.immutableSeq(rest).map(_.asScala) else immutable.Seq() + val seq = if (rest != null) CollectionUtil.toSeq(rest).map(_.asScala) else immutable.Seq() new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num => fanInStrategy.apply(num))) } @@ -745,12 +745,11 @@ object Source { sources: java.util.List[_ <: Graph[SourceShape[T], M]], fanInStrategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], NotUsed]]) : Source[U, java.util.List[M]] = { - val seq = if (sources != null) Util.immutableSeq(sources).collect { + val seq = if (sources != null) CollectionUtil.toSeq(sources).collect { case source: Source[T @unchecked, M @unchecked] => source.asScala case other => other } else immutable.Seq() - import org.apache.pekko.util.ccompat.JavaConverters._ new Source(scaladsl.Source.combine(seq)(size => fanInStrategy(size)).mapMaterializedValue(_.asJava)) } @@ -758,7 +757,7 @@ object Source { * Combine the elements of multiple streams into a stream of lists. */ def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = { - val seq = if (sources != null) Util.immutableSeq(sources).map(_.asScala) else immutable.Seq() + val seq = if (sources != null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq() new Source(scaladsl.Source.zipN(seq).map(_.asJava)) } @@ -768,7 +767,7 @@ object Source { def zipWithN[T, O]( zipper: function.Function[java.util.List[T], O], sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = { - val seq = if (sources != null) Util.immutableSeq(sources).map(_.asScala) else immutable.Seq() + val seq = if (sources != null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq() new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq)) } @@ -980,7 +979,7 @@ object Source { eagerComplete: Boolean): javadsl.Source[T, NotUsed] = { val seq = if (sourcesAndPriorities != null) - Util.immutableSeq(sourcesAndPriorities).map(pair => (pair.first.asScala, pair.second.intValue())) + CollectionUtil.toSeq(sourcesAndPriorities).map(pair => (pair.first.asScala, pair.second.intValue())) else immutable.Seq() new Source(scaladsl.Source.mergePrioritizedN(seq, eagerComplete)) @@ -1743,7 +1742,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], segmentSize: Int, eagerClose: Boolean): javadsl.Source[Out, Mat] = { - val seq = if (those != null) Util.immutableSeq(those).collect { + val seq = if (those != null) CollectionUtil.toSeq(those).collect { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -1823,7 +1822,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): javadsl.Source[Out, Mat] = { - val seq = if (those != null) Util.immutableSeq(those).collect { + val seq = if (those != null) CollectionUtil.toSeq(those).collect { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -2517,7 +2516,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * '''Cancels when''' downstream cancels */ def mapConcat[T](f: function.Function[Out, _ <: java.lang.Iterable[T]]): javadsl.Source[T, Mat] = - new Source(delegate.mapConcat(elem => Util.immutableSeq(f.apply(elem)))) + new Source(delegate.mapConcat(elem => f.apply(elem).asScala)) /** * Transform each stream element with the help of a state. @@ -2670,7 +2669,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Source[T, Mat] = new Source(delegate.statefulMapConcat { () => val fun = f.create() - elem => Util.immutableSeq(fun(elem)) + elem => fun(elem).asScala }) /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala index 8478835b52..53d1449883 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala @@ -23,7 +23,6 @@ import pekko.actor.ClassicActorSystemProvider import pekko.annotation.ApiMayChange import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.Pair -import pekko.japi.Util import pekko.japi.function import pekko.stream._ import pekko.util.ConstantFun @@ -267,7 +266,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon * @see [[pekko.stream.javadsl.Source.mapConcat]] */ def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): SourceWithContext[Out2, Ctx, Mat] = - viaScala(_.mapConcat(elem => Util.immutableSeq(f.apply(elem)))) + viaScala(_.mapConcat(elem => f.apply(elem).asScala)) /** * Apply the given function to each context element (leaving the data elements unchanged). diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index addcc8b07b..20f73c4f01 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -27,7 +27,7 @@ import org.apache.pekko import pekko.NotUsed import pekko.annotation.ApiMayChange import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } -import pekko.japi.{ function, Pair, Util } +import pekko.japi.{ function, Pair } import pekko.stream._ import pekko.util.ConstantFun import pekko.util.FutureConverters._ @@ -202,7 +202,7 @@ class SubFlow[In, Out, Mat]( */ def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubFlow[In, T, Mat] = new SubFlow(delegate.mapConcat { elem => - Util.immutableSeq(f(elem)) + f(elem).asScala }) /** @@ -356,7 +356,7 @@ class SubFlow[In, Out, Mat]( def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubFlow[In, T, Mat] = new SubFlow(delegate.statefulMapConcat { () => val fun = f.create() - elem => Util.immutableSeq(fun(elem)) + elem => fun(elem).asScala }) /** @@ -2048,7 +2048,7 @@ class SubFlow[In, Out, Mat]( def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): SubFlow[In, Out, Mat] = { - val seq = if (those != null) Util.immutableSeq(those).collect { + val seq = if (those != null) CollectionUtil.toSeq(those).collect { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -2106,7 +2106,7 @@ class SubFlow[In, Out, Mat]( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], segmentSize: Int, eagerClose: Boolean): SubFlow[In, Out, Mat] = { - val seq = if (those != null) Util.immutableSeq(those).collect { + val seq = if (those != null) CollectionUtil.toSeq(those).collect { case source: Source[Out @unchecked, _] => source.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index ad68e3579e..9ac6f9c714 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -27,7 +27,7 @@ import org.apache.pekko import pekko.NotUsed import pekko.annotation.ApiMayChange import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } -import pekko.japi.{ function, Pair, Util } +import pekko.japi.{ function, Pair } import pekko.stream._ import pekko.util.ConstantFun import pekko.util.FutureConverters._ @@ -193,7 +193,7 @@ class SubSource[Out, Mat]( */ def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubSource[T, Mat] = new SubSource(delegate.mapConcat { elem => - Util.immutableSeq(f(elem)) + f(elem).asScala }) /** @@ -347,7 +347,7 @@ class SubSource[Out, Mat]( def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubSource[T, Mat] = new SubSource(delegate.statefulMapConcat { () => val fun = f.create() - elem => Util.immutableSeq(fun(elem)) + elem => fun(elem).asScala }) /** @@ -2022,7 +2022,7 @@ class SubSource[Out, Mat]( def mergeAll( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], eagerComplete: Boolean): SubSource[Out, Mat] = { - val seq = if (those != null) Util.immutableSeq(those).collect { + val seq = if (those != null) CollectionUtil.toSeq(those).collect { case source: Source[Out @unchecked, _] => source.asScala case other => other } @@ -2081,7 +2081,7 @@ class SubSource[Out, Mat]( those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]], segmentSize: Int, eagerClose: Boolean): SubSource[Out, Mat] = { - val seq = if (those != null) Util.immutableSeq(those).collect { + val seq = if (those != null) CollectionUtil.toSeq(those).collect { case source: Source[Out @unchecked, _] => source.asScala case other => other } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala index 3a82dd001c..fed4044733 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala @@ -37,7 +37,6 @@ import pekko.actor.ExtensionId import pekko.actor.ExtensionIdProvider import pekko.annotation.InternalApi import pekko.io.Inet.SocketOption -import pekko.japi.Util.immutableSeq import pekko.stream.Materializer import pekko.stream.SystemMaterializer import pekko.stream.TLSClosing @@ -182,7 +181,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { idleTimeout: Optional[java.time.Duration]): Source[IncomingConnection, CompletionStage[ServerBinding]] = Source.fromGraph( delegate - .bind(interface, port, backlog, immutableSeq(options), halfClose, optionalDurationToScala(idleTimeout)) + .bind(interface, port, backlog, CollectionUtil.toSeq(options), halfClose, optionalDurationToScala(idleTimeout)) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava)) @@ -264,7 +263,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { .outgoingConnection( remoteAddress, localAddress.toScala, - immutableSeq(options), + CollectionUtil.toSeq(options), halfClose, optionalDurationToScala(connectTimeout), optionalDurationToScala(idleTimeout)) @@ -369,7 +368,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { sslContext, negotiateNewSession, localAddress.toScala, - immutableSeq(options), + CollectionUtil.toSeq(options), connectTimeout, idleTimeout) .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava)) @@ -417,7 +416,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { remoteAddress, createSSLEngine = () => createSSLEngine.get(), localAddress.toScala, - immutableSeq(options), + CollectionUtil.toSeq(options), optionalDurationToScala(connectTimeout), optionalDurationToScala(idleTimeout), session => @@ -454,7 +453,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { idleTimeout: Duration): Source[IncomingConnection, CompletionStage[ServerBinding]] = Source.fromGraph( delegate - .bindTls(interface, port, sslContext, negotiateNewSession, backlog, immutableSeq(options), idleTimeout) + .bindTls(interface, port, sslContext, negotiateNewSession, backlog, CollectionUtil.toSeq(options), idleTimeout) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava)) @@ -518,7 +517,7 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { port, createSSLEngine = () => createSSLEngine.get(), backlog, - immutableSeq(options), + CollectionUtil.toSeq(options), optionalDurationToScala(idleTimeout), session => verifySession.apply(session).toScala match {