From f3091ce2d26ae9c60d990634c16779e23a3c6dbc Mon Sep 17 00:00:00 2001 From: Mike Shepherd Date: Mon, 20 Nov 2023 12:04:12 +0000 Subject: [PATCH 1/6] Upgrade a js socket to TLS directly if possible --- .../scala/fs2/io/net/SocketPlatform.scala | 2 +- .../fs2/io/net/tls/TLSContextPlatform.scala | 6 ++-- .../fs2/io/net/tls/TLSSocketPlatform.scala | 29 ++++++++++++++----- 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/io/js/src/main/scala/fs2/io/net/SocketPlatform.scala b/io/js/src/main/scala/fs2/io/net/SocketPlatform.scala index 4f7ecab9ea..e39734ab15 100644 --- a/io/js/src/main/scala/fs2/io/net/SocketPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/SocketPlatform.scala @@ -53,7 +53,7 @@ private[net] trait SocketCompanionPlatform { } } - private[net] class AsyncSocket[F[_]]( + private[net] case class AsyncSocket[F[_]]( sock: facade.net.Socket, readStream: SuspendedStream[F, Byte] )(implicit F: Async[F]) diff --git a/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala b/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala index a506e02f58..3e52d5ba64 100644 --- a/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala @@ -89,7 +89,8 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => ) ) tlsSock - } + }, + clientMode = clientMode ) .evalTap(_ => handshake.get.rethrow) } @@ -128,7 +129,8 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => ) ) tlsSock - } + }, + clientMode = clientMode ) .evalTap(_ => verifyError.get.rethrow) } diff --git a/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala b/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala index 050f9d8353..b0295c1a8f 100644 --- a/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala @@ -38,16 +38,29 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type => private[tls] def forAsync[F[_]]( socket: Socket[F], - upgrade: fs2.io.Duplex => facade.tls.TLSSocket + upgrade: fs2.io.Duplex => facade.tls.TLSSocket, + clientMode: Boolean )(implicit F: Async[F]): Resource[F, TLSSocket[F]] = for { - duplexOut <- mkDuplex(socket.reads) - (duplex, out) = duplexOut - _ <- out.through(socket.writes).compile.drain.background - tlsSockReadable <- suspendReadableAndRead( - destroyIfNotEnded = false, - destroyIfCanceled = false - )(upgrade(duplex)) + tlsSockReadable <- socket match { + case Socket.AsyncSocket(sock, _) if clientMode => + for { + tlsSockReadable <- suspendReadableAndRead( + destroyIfNotEnded = false, + destroyIfCanceled = false + )(upgrade(sock)) + } yield tlsSockReadable + case _ => + for { + duplexOut <- mkDuplex(socket.reads) + (duplex, out) = duplexOut + _ <- out.through(socket.writes).compile.drain.background + tlsSockReadable <- suspendReadableAndRead( + destroyIfNotEnded = false, + destroyIfCanceled = false + )(upgrade(duplex)) + } yield tlsSockReadable + } (tlsSock, readable) = tlsSockReadable readStream <- SuspendedStream(readable) } yield new AsyncTLSSocket( From 40d2113546b0076ae61d977a823278d1c73cf3fb Mon Sep 17 00:00:00 2001 From: Mike Shepherd Date: Mon, 20 Nov 2023 13:05:45 +0000 Subject: [PATCH 2/6] Fix bin compat issue --- .../scala/fs2/io/net/tls/TLSContextPlatform.scala | 8 +++----- .../scala/fs2/io/net/tls/TLSSocketPlatform.scala | 12 ++++++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala b/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala index 3e52d5ba64..5659fd1018 100644 --- a/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala @@ -67,7 +67,7 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => if (clientMode) { Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { handshake => TLSSocket - .forAsync( + .forAsyncClient( socket, sock => { val options = params.toTLSConnectOptions(parDispatcher) @@ -89,8 +89,7 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => ) ) tlsSock - }, - clientMode = clientMode + } ) .evalTap(_ => handshake.get.rethrow) } @@ -129,8 +128,7 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => ) ) tlsSock - }, - clientMode = clientMode + } ) .evalTap(_ => verifyError.get.rethrow) } diff --git a/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala b/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala index b0295c1a8f..f425e8e33a 100644 --- a/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala @@ -37,6 +37,18 @@ private[tls] trait TLSSocketPlatform[F[_]] private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type => private[tls] def forAsync[F[_]]( + socket: Socket[F], + upgrade: fs2.io.Duplex => facade.tls.TLSSocket + )(implicit F: Async[F]): Resource[F, TLSSocket[F]] = + forAsyncInternal(socket, upgrade, clientMode = false) + + private[tls] def forAsyncClient[F[_]]( + socket: Socket[F], + upgrade: fs2.io.Duplex => facade.tls.TLSSocket + )(implicit F: Async[F]): Resource[F, TLSSocket[F]] = + forAsyncInternal(socket, upgrade, clientMode = true) + + private[this] def forAsyncInternal[F[_]]( socket: Socket[F], upgrade: fs2.io.Duplex => facade.tls.TLSSocket, clientMode: Boolean From 7d7d5433072e9ee04e842fb1bcc9d930e773a2d6 Mon Sep 17 00:00:00 2001 From: Mike Shepherd Date: Tue, 21 Nov 2023 15:37:34 +0000 Subject: [PATCH 3/6] Fix bincompat issues by ignoring --- build.sbt | 1 + .../scala/fs2/io/net/tls/TLSContextPlatform.scala | 4 +++- .../scala/fs2/io/net/tls/TLSSocketPlatform.scala | 14 +------------- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/build.sbt b/build.sbt index b285d4e192..abea213307 100644 --- a/build.sbt +++ b/build.sbt @@ -347,6 +347,7 @@ lazy val io = crossProject(JVMPlatform, JSPlatform, NativePlatform) .dependsOn(core % "compile->compile;test->test") .jsSettings( mimaBinaryIssueFilters ++= Seq( + ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.net.tls.TLSSocket.forAsync"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.package.stdinUtf8"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.package.stdoutLines"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.package.stdout"), diff --git a/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala b/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala index 5659fd1018..7c7e35dcb4 100644 --- a/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala @@ -67,8 +67,9 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => if (clientMode) { Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { handshake => TLSSocket - .forAsyncClient( + .forAsync( socket, + clientMode, sock => { val options = params.toTLSConnectOptions(parDispatcher) options.secureContext = context @@ -98,6 +99,7 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => TLSSocket .forAsync( socket, + clientMode, sock => { val options = params.toTLSSocketOptions(parDispatcher) options.secureContext = context diff --git a/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala b/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala index f425e8e33a..d54624f3c4 100644 --- a/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala @@ -38,20 +38,8 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type => private[tls] def forAsync[F[_]]( socket: Socket[F], + clientMode: Boolean, upgrade: fs2.io.Duplex => facade.tls.TLSSocket - )(implicit F: Async[F]): Resource[F, TLSSocket[F]] = - forAsyncInternal(socket, upgrade, clientMode = false) - - private[tls] def forAsyncClient[F[_]]( - socket: Socket[F], - upgrade: fs2.io.Duplex => facade.tls.TLSSocket - )(implicit F: Async[F]): Resource[F, TLSSocket[F]] = - forAsyncInternal(socket, upgrade, clientMode = true) - - private[this] def forAsyncInternal[F[_]]( - socket: Socket[F], - upgrade: fs2.io.Duplex => facade.tls.TLSSocket, - clientMode: Boolean )(implicit F: Async[F]): Resource[F, TLSSocket[F]] = for { tlsSockReadable <- socket match { From 02e788a06481e2c27e9d5c654ced2ef783b3aec9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 25 Nov 2023 08:00:23 +0000 Subject: [PATCH 4/6] Remove `Dispatcher` from `suspendReadableAndRead` --- .../scala/fs2/io/internal/facade/events.scala | 2 + io/js/src/main/scala/fs2/io/ioplatform.scala | 130 ++++++++++++------ 2 files changed, 88 insertions(+), 44 deletions(-) diff --git a/io/js/src/main/scala/fs2/io/internal/facade/events.scala b/io/js/src/main/scala/fs2/io/internal/facade/events.scala index 8cdb1041db..c05fa8d3f9 100644 --- a/io/js/src/main/scala/fs2/io/internal/facade/events.scala +++ b/io/js/src/main/scala/fs2/io/internal/facade/events.scala @@ -34,6 +34,8 @@ import scala.scalajs.js @nowarn212("cat=unused") private[io] trait EventEmitter extends js.Object { + protected[io] def on(eventName: String, listener: js.Function0[Unit]): this.type = js.native + protected[io] def on[E](eventName: String, listener: js.Function1[E, Unit]): this.type = js.native protected[io] def on[E, F](eventName: String, listener: js.Function2[E, F, Unit]): this.type = diff --git a/io/js/src/main/scala/fs2/io/ioplatform.scala b/io/js/src/main/scala/fs2/io/ioplatform.scala index 011f770764..4e887baf74 100644 --- a/io/js/src/main/scala/fs2/io/ioplatform.scala +++ b/io/js/src/main/scala/fs2/io/ioplatform.scala @@ -31,7 +31,6 @@ import cats.effect.std.Queue import cats.effect.syntax.all._ import cats.syntax.all._ import fs2.concurrent.Channel -import fs2.io.internal.MicrotaskExecutor import fs2.io.internal.facade import java.nio.charset.Charset @@ -58,53 +57,96 @@ private[fs2] trait ioplatform { def suspendReadableAndRead[F[_], R <: Readable]( destroyIfNotEnded: Boolean = true, destroyIfCanceled: Boolean = true - )(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] = - (for { - dispatcher <- Dispatcher.sequential[F] - channel <- Channel.unbounded[F, Unit].toResource - error <- F.deferred[Throwable].toResource - readableResource = for { - readable <- Resource.makeCase(F.delay(thunk)) { - case (readable, Resource.ExitCase.Succeeded) => + )(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] = { + + final class Listener { + private[this] var readableCounter = 0 + private[this] var error: Either[Throwable, Option[Unit]] = null + private[this] var ended = false + private[this] var callback: Either[Throwable, Option[Unit]] => Unit = null + + def handleReadable(): Unit = + if (callback eq null) { + readableCounter += 1 + } else { + callback(Right(Some(()))) + callback = null + } + + def handleEnd(): Unit = { + ended = true + if (readableCounter == 0 && (callback ne null)) { + callback(Right(None)) + } + } + + def handleError(e: js.Error): Unit = { + error = Left(js.JavaScriptException(e)) + if (callback ne null) { + callback(error) + } + } + + private[this] def next: F[Option[Unit]] = F.async { cb => + F.delay { + if (error ne null) { + cb(error) + None + } else if (readableCounter > 0) { + cb(Right(Some(()))) + readableCounter -= 1 + None + } else if (ended) { + cb(Right(None)) + None + } else { + callback = cb + Some(F.delay { callback = null }) + } + } + } + + def readableEvents: Stream[F, Unit] = + Stream.repeatEval(next).unNoneTerminate + } + + Resource + .eval(F.delay(new Listener)) + .flatMap { listener => + Resource + .makeCase { F.delay { - if (!readable.readableEnded & destroyIfNotEnded) - readable.destroy() + val readable = thunk + readable.on("readable", () => listener.handleReadable()) + readable.once("error", listener.handleError(_)) + readable.once("end", () => listener.handleEnd()) + readable } - case (readable, Resource.ExitCase.Errored(_)) => - // tempting, but don't propagate the error! - // that would trigger a unhandled Node.js error that circumvents FS2/CE error channels - F.delay(readable.destroy()) - case (readable, Resource.ExitCase.Canceled) => - if (destroyIfCanceled) + } { + case (readable, Resource.ExitCase.Succeeded) => + F.delay { + if (!readable.readableEnded & destroyIfNotEnded) + readable.destroy() + } + case (readable, Resource.ExitCase.Errored(_)) => + // tempting, but don't propagate the error! + // that would trigger a unhandled Node.js error that circumvents FS2/CE error channels F.delay(readable.destroy()) - else - F.unit - } - _ <- readable.registerListener[F, Any]("readable", dispatcher)(_ => channel.send(()).void) - _ <- readable.registerListener[F, Any]("end", dispatcher)(_ => channel.close.void) - _ <- readable.registerListener[F, Any]("close", dispatcher)(_ => channel.close.void) - _ <- readable.registerListener[F, js.Error]("error", dispatcher) { e => - error.complete(js.JavaScriptException(e)).void - } - } yield readable - // Implementation note: why run on the MicrotaskExecutor? - // In many cases creating a `Readable` starts async side-effects (e.g. negotiating TLS handshake or opening a file handle). - // Furthermore, these side-effects will invoke the listeners we register to the `Readable`. - // Therefore, it is critical that the listeners are registered to the `Readable` _before_ these async side-effects occur: - // in other words, before we next yield (cede) to the event loop. Because an arbitrary effect `F` (particularly `IO`) may cede at any time, - // our only recourse is to run the entire creation/listener registration process on the microtask executor. - readable <- readableResource.evalOn(MicrotaskExecutor) - stream = - (channel.stream - .concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) >> - Stream - .evalUnChunk( - F.delay( - Option(readable.read()) - .fold(Chunk.empty[Byte])(Chunk.uint8Array) + case (readable, Resource.ExitCase.Canceled) => + if (destroyIfCanceled) + F.delay(readable.destroy()) + else + F.unit + } + .fproduct { readable => + listener.readableEvents.adaptError { case IOException(ex) => ex } >> + Stream.evalUnChunk( + F.delay(Option(readable.read()).fold(Chunk.empty[Byte])(Chunk.uint8Array(_))) ) - )).adaptError { case IOException(ex) => ex } - } yield (readable, stream)).adaptError { case IOException(ex) => ex } + } + } + .adaptError { case IOException(ex) => ex } + } /** `Pipe` that converts a stream of bytes to a stream that will emit a single `Readable`, * that ends whenever the resulting stream terminates. From 78748430f1e5b5d836ded2617654f7f3002af6aa Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 26 Nov 2023 06:35:37 +0000 Subject: [PATCH 5/6] Don't use `Dispatcher` for TLS init errors --- .../fs2/io/net/tls/TLSContextPlatform.scala | 60 ++++++++++++------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala b/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala index a506e02f58..b7fbbd04c7 100644 --- a/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala @@ -62,10 +62,36 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => clientMode: Boolean, params: TLSParameters, logger: TLSLogger[F] - ): Resource[F, TLSSocket[F]] = (Dispatcher.sequential[F], Dispatcher.parallel[F]) - .flatMapN { (seqDispatcher, parDispatcher) => - if (clientMode) { - Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { handshake => + ): Resource[F, TLSSocket[F]] = { + + final class Listener { + private[this] var value: Either[Throwable, Unit] = null + private[this] var callback: Either[Throwable, Unit] => Unit = null + + def complete(value: Either[Throwable, Unit]): Unit = + if (callback ne null) { + callback(value) + callback = null + } else { + this.value = value + } + + def get: F[Unit] = F.async { cb => + F.delay { + if (value ne null) { + cb(value) + None + } else { + callback = cb + Some(F.delay { callback = null }) + } + } + } + } + + (Dispatcher.parallel[F], Resource.eval(F.delay(new Listener))) + .flatMapN { (parDispatcher, listener) => + if (clientMode) { TLSSocket .forAsync( socket, @@ -79,22 +105,17 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => val tlsSock = facade.tls.connect(options) tlsSock.once( "secureConnect", - () => seqDispatcher.unsafeRunAndForget(handshake.complete(Either.unit)) + () => listener.complete(Either.unit) ) tlsSock.once[js.Error]( "error", - e => - seqDispatcher.unsafeRunAndForget( - handshake.complete(Left(new js.JavaScriptException(e))) - ) + e => listener.complete(Left(new js.JavaScriptException(e))) ) tlsSock } ) - .evalTap(_ => handshake.get.rethrow) - } - } else { - Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { verifyError => + .evalTap(_ => listener.get) + } else { TLSSocket .forAsync( socket, @@ -117,24 +138,21 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type => .map(e => new JavaScriptSSLException(js.JavaScriptException(e))) .toLeft(()) else Either.unit - seqDispatcher.unsafeRunAndForget(verifyError.complete(result)) + listener.complete(result) } ) tlsSock.once[js.Error]( "error", - e => - seqDispatcher.unsafeRunAndForget( - verifyError.complete(Left(new js.JavaScriptException(e))) - ) + e => listener.complete(Left(new js.JavaScriptException(e))) ) tlsSock } ) - .evalTap(_ => verifyError.get.rethrow) + .evalTap(_ => listener.get) } } - } - .adaptError { case IOException(ex) => ex } + .adaptError { case IOException(ex) => ex } + } } def fromSecureContext(context: SecureContext): TLSContext[F] = From 297fc7d8867a5246062d1f64a8ef923849ca4747 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 27 Nov 2023 13:26:54 +0000 Subject: [PATCH 6/6] Optimize --- io/js/src/main/scala/fs2/io/ioplatform.scala | 28 +++++++++++++------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/io/js/src/main/scala/fs2/io/ioplatform.scala b/io/js/src/main/scala/fs2/io/ioplatform.scala index 4e887baf74..3989185608 100644 --- a/io/js/src/main/scala/fs2/io/ioplatform.scala +++ b/io/js/src/main/scala/fs2/io/ioplatform.scala @@ -61,22 +61,22 @@ private[fs2] trait ioplatform { final class Listener { private[this] var readableCounter = 0 - private[this] var error: Either[Throwable, Option[Unit]] = null + private[this] var error: Either[Throwable, Boolean] = null private[this] var ended = false - private[this] var callback: Either[Throwable, Option[Unit]] => Unit = null + private[this] var callback: Either[Throwable, Boolean] => Unit = null def handleReadable(): Unit = if (callback eq null) { readableCounter += 1 } else { - callback(Right(Some(()))) + callback(Right(true)) callback = null } def handleEnd(): Unit = { ended = true if (readableCounter == 0 && (callback ne null)) { - callback(Right(None)) + callback(Right(false)) } } @@ -87,17 +87,17 @@ private[fs2] trait ioplatform { } } - private[this] def next: F[Option[Unit]] = F.async { cb => + private[this] def next: F[Boolean] = F.async { cb => F.delay { if (error ne null) { cb(error) None } else if (readableCounter > 0) { - cb(Right(Some(()))) + cb(Right(true)) readableCounter -= 1 None } else if (ended) { - cb(Right(None)) + cb(Right(false)) None } else { callback = cb @@ -106,8 +106,18 @@ private[fs2] trait ioplatform { } } - def readableEvents: Stream[F, Unit] = - Stream.repeatEval(next).unNoneTerminate + def readableEvents: Stream[F, Unit] = { + def go: Pull[F, Unit, Unit] = + Pull.eval(next).flatMap { continue => + if (continue) + Pull.outUnit >> go + else + Pull.done + } + + go.streamNoScope + } + } Resource