Skip to content

Commit

Permalink
Merge branch 'topic/async-canary' into update/cats-effect-3.5.0-RC1
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Feb 14, 2023
2 parents 87716c8 + 1c0be5c commit 0d3df0b
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 49 deletions.
5 changes: 3 additions & 2 deletions io/js/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ private[fs2] trait ioplatform {
val end =
if (endAfterUse)
Stream.exec {
F.async_[Unit] { cb =>
writable.end(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))
F.async[Unit] { cb =>
F.delay(writable.end(e => cb(e.toLeft(()).leftMap(js.JavaScriptException))))
.as(Some(F.unit))
}
}
else Stream.empty
Expand Down
125 changes: 78 additions & 47 deletions io/jvm-native/src/main/scala/fs2/io/net/SocketGroupPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import java.nio.channels.{
}
import java.nio.channels.AsynchronousChannelGroup
import cats.syntax.all._
import cats.effect.syntax.all._
import cats.effect.kernel.{Async, Resource}
import com.comcast.ip4s.{Host, IpAddress, Port, SocketAddress}
import fs2.concurrent.Channel

private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
private[fs2] def unsafe[F[_]: Async](channelGroup: AsynchronousChannelGroup): SocketGroup[F] =
Expand All @@ -57,17 +59,21 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>

def connect(ch: AsynchronousSocketChannel): F[AsynchronousSocketChannel] =
to.resolve[F].flatMap { ip =>
Async[F].async_[AsynchronousSocketChannel] { cb =>
ch.connect(
ip.toInetSocketAddress,
null,
new CompletionHandler[Void, Void] {
def completed(result: Void, attachment: Void): Unit =
cb(Right(ch))
def failed(rsn: Throwable, attachment: Void): Unit =
cb(Left(rsn))
Async[F].async[AsynchronousSocketChannel] { cb =>
Async[F]
.delay {
ch.connect(
ip.toInetSocketAddress,
null,
new CompletionHandler[Void, Void] {
def completed(result: Void, attachment: Void): Unit =
cb(Right(ch))
def failed(rsn: Throwable, attachment: Void): Unit =
cb(Left(rsn))
}
)
}
)
.as(Some(Async[F].delay(ch.close())))
}
}

Expand All @@ -80,69 +86,94 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
options: List[SocketOption]
): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = {

val setup: Resource[F, AsynchronousServerSocketChannel] =
val setup: Resource[
F,
(AsynchronousServerSocketChannel, Channel[F, Either[Throwable, AsynchronousSocketChannel]])
] =
Resource.eval(address.traverse(_.resolve[F])).flatMap { addr =>
Resource
.make(
Async[F].delay(
AsynchronousServerSocketChannel.open(channelGroup)
)
)(sch => Async[F].delay(if (sch.isOpen) sch.close()))
.evalTap(ch =>
.evalTap { sch =>
Async[F].delay(
ch.bind(
sch.bind(
new InetSocketAddress(
addr.map(_.toInetAddress).orNull,
port.map(_.value).getOrElse(0)
)
)
)
)
}
}
.mproduct { sch =>
def acceptChannel: F[AsynchronousSocketChannel] =
Async[F].async[AsynchronousSocketChannel] { cb =>
Async[F]
.delay {
sch.accept(
null,
new CompletionHandler[AsynchronousSocketChannel, Void] {
def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit =
cb(Right(ch))
def failed(rsn: Throwable, attachment: Void): Unit =
cb(Left(rsn))
}
)
}
.as(Some(Async[F].delay(sch.close())))
}

def acceptIncoming(
sch: AsynchronousServerSocketChannel
): Stream[F, Socket[F]] = {
def go: Stream[F, Socket[F]] = {
def acceptChannel: F[AsynchronousSocketChannel] =
Async[F].async_[AsynchronousSocketChannel] { cb =>
sch.accept(
null,
new CompletionHandler[AsynchronousSocketChannel, Void] {
def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit =
cb(Right(ch))
def failed(rsn: Throwable, attachment: Void): Unit =
cb(Left(rsn))
Resource
.make(Channel.synchronous[F, Either[Throwable, AsynchronousSocketChannel]]) {
accepted =>
accepted.close *>
accepted.stream
.foreach(_.traverse_(ch => Async[F].delay(ch.close())))
.compile
.drain
}
.flatTap { accepted =>
Stream
.repeatEval(acceptChannel.attempt)
.through(accepted.sendAll)
.compile
.drain
.background
}
)
}

def setOpts(ch: AsynchronousSocketChannel) =
Async[F].delay {
options.foreach(o => ch.setOption(o.key, o.value))
}
}

Stream.eval(acceptChannel.attempt).flatMap {
def acceptIncoming(sch: AsynchronousServerSocketChannel)(
incoming: Stream[F, Either[Throwable, AsynchronousSocketChannel]]
): Stream[F, Socket[F]] = {
def setOpts(ch: AsynchronousSocketChannel) =
Async[F].delay {
options.foreach(o => ch.setOption(o.key, o.value))
}

incoming
.flatMap {
case Left(_) => Stream.empty[F]
case Right(accepted) =>
Stream.resource(Socket.forAsync(accepted).evalTap(_ => setOpts(accepted)))
} ++ go
}

go.handleErrorWith {
case err: AsynchronousCloseException =>
Stream.eval(Async[F].delay(sch.isOpen)).flatMap { isOpen =>
if (isOpen) Stream.raiseError[F](err)
else Stream.empty
}
case err => Stream.raiseError[F](err)
}
}
.handleErrorWith {
case err: AsynchronousCloseException =>
Stream.eval(Async[F].delay(sch.isOpen)).flatMap { isOpen =>
if (isOpen) Stream.raiseError[F](err)
else Stream.empty
}
case err => Stream.raiseError[F](err)
}
}

setup.map { sch =>
setup.map { case (sch, incoming) =>
val jLocalAddress = sch.getLocalAddress.asInstanceOf[java.net.InetSocketAddress]
val localAddress = SocketAddress.fromInetSocketAddress(jLocalAddress)
(localAddress, acceptIncoming(sch))
(localAddress, incoming.stream.through(acceptIncoming(sch)(_)))
}
}
}
Expand Down

0 comments on commit 0d3df0b

Please sign in to comment.