Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Upgrade a JS socket to TLS directly if possible #3341

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ lazy val io = crossProject(JVMPlatform, JSPlatform, NativePlatform)
.dependsOn(core % "compile->compile;test->test")
.jsSettings(
mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.net.tls.TLSSocket.forAsync"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.package.stdinUtf8"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.package.stdoutLines"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.package.stdout"),
Expand Down
2 changes: 2 additions & 0 deletions io/js/src/main/scala/fs2/io/internal/facade/events.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import scala.scalajs.js
@nowarn212("cat=unused")
private[io] trait EventEmitter extends js.Object {

protected[io] def on(eventName: String, listener: js.Function0[Unit]): this.type = js.native

protected[io] def on[E](eventName: String, listener: js.Function1[E, Unit]): this.type = js.native

protected[io] def on[E, F](eventName: String, listener: js.Function2[E, F, Unit]): this.type =
Expand Down
140 changes: 96 additions & 44 deletions io/js/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2.concurrent.Channel
import fs2.io.internal.MicrotaskExecutor
import fs2.io.internal.facade

import java.nio.charset.Charset
Expand All @@ -58,53 +57,106 @@ private[fs2] trait ioplatform {
def suspendReadableAndRead[F[_], R <: Readable](
destroyIfNotEnded: Boolean = true,
destroyIfCanceled: Boolean = true
)(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] =
(for {
dispatcher <- Dispatcher.sequential[F]
channel <- Channel.unbounded[F, Unit].toResource
error <- F.deferred[Throwable].toResource
readableResource = for {
readable <- Resource.makeCase(F.delay(thunk)) {
case (readable, Resource.ExitCase.Succeeded) =>
)(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] = {

final class Listener {
private[this] var readableCounter = 0
private[this] var error: Either[Throwable, Boolean] = null
private[this] var ended = false
private[this] var callback: Either[Throwable, Boolean] => Unit = null

def handleReadable(): Unit =
if (callback eq null) {
readableCounter += 1
} else {
callback(Right(true))
callback = null
}

def handleEnd(): Unit = {
ended = true
if (readableCounter == 0 && (callback ne null)) {
callback(Right(false))
}
}

def handleError(e: js.Error): Unit = {
error = Left(js.JavaScriptException(e))
if (callback ne null) {
callback(error)
}
}

private[this] def next: F[Boolean] = F.async { cb =>
F.delay {
if (error ne null) {
cb(error)
None
} else if (readableCounter > 0) {
cb(Right(true))
readableCounter -= 1
None
} else if (ended) {
cb(Right(false))
None
} else {
callback = cb
Some(F.delay { callback = null })
}
}
}

def readableEvents: Stream[F, Unit] = {
def go: Pull[F, Unit, Unit] =
Pull.eval(next).flatMap { continue =>
if (continue)
Pull.outUnit >> go
else
Pull.done
}

go.streamNoScope
}

}

Resource
.eval(F.delay(new Listener))
.flatMap { listener =>
Resource
.makeCase {
F.delay {
if (!readable.readableEnded & destroyIfNotEnded)
readable.destroy()
val readable = thunk
readable.on("readable", () => listener.handleReadable())
readable.once("error", listener.handleError(_))
readable.once("end", () => listener.handleEnd())
readable
}
case (readable, Resource.ExitCase.Errored(_)) =>
// tempting, but don't propagate the error!
// that would trigger an unhandled Node.js error that circumvents FS2/CE error channels
F.delay(readable.destroy())
case (readable, Resource.ExitCase.Canceled) =>
if (destroyIfCanceled)
} {
case (readable, Resource.ExitCase.Succeeded) =>
F.delay {
if (!readable.readableEnded & destroyIfNotEnded)
readable.destroy()
}
case (readable, Resource.ExitCase.Errored(_)) =>
// tempting, but don't propagate the error!
// that would trigger an unhandled Node.js error that circumvents FS2/CE error channels
F.delay(readable.destroy())
else
F.unit
}
_ <- readable.registerListener[F, Any]("readable", dispatcher)(_ => channel.send(()).void)
_ <- readable.registerListener[F, Any]("end", dispatcher)(_ => channel.close.void)
_ <- readable.registerListener[F, Any]("close", dispatcher)(_ => channel.close.void)
_ <- readable.registerListener[F, js.Error]("error", dispatcher) { e =>
error.complete(js.JavaScriptException(e)).void
}
} yield readable
// Implementation note: why run on the MicrotaskExecutor?
// In many cases creating a `Readable` starts async side-effects (e.g. negotiating TLS handshake or opening a file handle).
// Furthermore, these side-effects will invoke the listeners we register to the `Readable`.
// Therefore, it is critical that the listeners are registered to the `Readable` _before_ these async side-effects occur:
// in other words, before we next yield (cede) to the event loop. Because an arbitrary effect `F` (particularly `IO`) may cede at any time,
// our only recourse is to run the entire creation/listener registration process on the microtask executor.
readable <- readableResource.evalOn(MicrotaskExecutor)
stream =
(channel.stream
.concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) >>
Stream
.evalUnChunk(
F.delay(
Option(readable.read())
.fold(Chunk.empty[Byte])(Chunk.uint8Array)
case (readable, Resource.ExitCase.Canceled) =>
if (destroyIfCanceled)
F.delay(readable.destroy())
else
F.unit
}
.fproduct { readable =>
listener.readableEvents.adaptError { case IOException(ex) => ex } >>
Stream.evalUnChunk(
F.delay(Option(readable.read()).fold(Chunk.empty[Byte])(Chunk.uint8Array(_)))
)
)).adaptError { case IOException(ex) => ex }
} yield (readable, stream)).adaptError { case IOException(ex) => ex }
}
}
.adaptError { case IOException(ex) => ex }
}

/** `Pipe` that converts a stream of bytes to a stream that will emit a single `Readable`,
* that ends whenever the resulting stream terminates.
Expand Down
2 changes: 1 addition & 1 deletion io/js/src/main/scala/fs2/io/net/SocketPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private[net] trait SocketCompanionPlatform {
}
}

private[net] class AsyncSocket[F[_]](
private[net] case class AsyncSocket[F[_]](
sock: facade.net.Socket,
readStream: SuspendedStream[F, Byte]
Comment on lines +56 to 58
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been thinking about this. Do we know what happens to this SuspendedStream after the sock is directly upgraded?

What I mean is that this readStream has already established listeners on the sock. So if events are firing on those listeners as sock is being used, but nobody is consuming from the SuspendedStream, then I am concerned that this is actually a memory leak.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question.
As far as I can tell the listeners are all on control events, rather than data, i.e. they trigger changes of behaviour rather than push any data anywhere.

The read loop boils down to stream pull -> readable.read(), so if the stream isn't being consumed then read() is never being called and there is no data to leak.

This also means that we're relying on the JavaScript runtime to propagate any error or close events, which I think is a reasonable assumption, but we could add additional listeners for them if we feel that it's needed

Copy link
Member

@armanbilge armanbilge Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell the listeners are all on control events, rather than data, i.e. they trigger changes of behaviour rather than push any data anywhere.

Right, sure. But those triggers still invoke callbacks, that typically put things in Queues. If they are not consumed, they accumulate and leak.

_ <- readable.registerListener[F, Any]("readable", dispatcher)(_ => channel.send(()).void)

Copy link
Author

@mikeshepherd mikeshepherd Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we're going to leak any data anywhere, but I guess we will end up with a channel full of ().

The only Queue that I can see in the process is synchronous so won't accumulate anything.

It seems like it ought to be possible for that channel not to be a channel since it's only acting as an indicator that something can be read or not (that is we need to communicate that something can be read, but duplicate events prior to a read are meaningless)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but I guess we will end up with a channel full of ().

Yes, sorry, this is what I meant by "memory leak". Not specifically a data leak, as in bytes of data.

It seems like it ought to be possible for that channel not to be a channel since it's only acting as an indicator that something can be read or not (that is we need to communicate that something can be read, but duplicate events prior to a read are meaningless)

This is a great point. I wonder if we can replace it with a Signal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only Queue that I can see in the process is synchronous so wont accumulate anything.

Ah, this is a misconception. Unless whoever is pushing to the synchronous queue respects backpressure and stops trying to push, the attempts to push will queue up unboundedly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes sorry, I actually thought that we wouldn't leak anything (including ()). Had to reread the Node docs to correct myself about how the readable event actually works (I thought it was only fired once, rather than repeatedly).

I'm happy to switch over to a Signal here but I wonder if it's worth it with #3348 now which should solve the issue as well. Don't know if you've got any preference for handling related PRs like these?

Unless whoever is pushing to the synchronous queue respect backpressure

I admit I'd assumed that the internals of fs2 would get that right 😉

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to reread the Node docs to correct myself about how the readable event actually works (I thought it was only fired once, rather than repeatedly).

It is very confusing. While working on #3348 I discovered that the readable event may be fired multiple times in a row (honestly it seemed a bit buggy) without read()s in between, which is why we needed a counter for readable events and not simply a boolean toggle ...

I think if you merge #3348 into your PR, then my concerns about memory leaks would be addressed. But I need to think about this more. The "dangling" listeners are difficult to reason about.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readable event may be fired multiple times in a row

I would agree that sounds like a bug. I thought that there wouldn't be a new readable event until you've done a read(). I guess maybe it can half-fill the internal buffer, emit readable, then fill it and emit another readable? 🤷
At the worst the second read() will just always return null and then be flattened away immediately.

Will do that merge 👍 We're happy running off snapshot builds for a while so we've got time to get it right

)(implicit F: Async[F])
Expand Down
62 changes: 41 additions & 21 deletions io/js/src/main/scala/fs2/io/net/tls/TLSContextPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,40 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type =>
clientMode: Boolean,
params: TLSParameters,
logger: TLSLogger[F]
): Resource[F, TLSSocket[F]] = (Dispatcher.sequential[F], Dispatcher.parallel[F])
.flatMapN { (seqDispatcher, parDispatcher) =>
if (clientMode) {
Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { handshake =>
): Resource[F, TLSSocket[F]] = {

final class Listener {
private[this] var value: Either[Throwable, Unit] = null
private[this] var callback: Either[Throwable, Unit] => Unit = null

def complete(value: Either[Throwable, Unit]): Unit =
if (callback ne null) {
callback(value)
callback = null
} else {
this.value = value
}

def get: F[Unit] = F.async { cb =>
F.delay {
if (value ne null) {
cb(value)
None
} else {
callback = cb
Some(F.delay { callback = null })
}
}
}
}

(Dispatcher.parallel[F], Resource.eval(F.delay(new Listener)))
.flatMapN { (parDispatcher, listener) =>
if (clientMode) {
TLSSocket
.forAsync(
socket,
clientMode,
sock => {
val options = params.toTLSConnectOptions(parDispatcher)
options.secureContext = context
Expand All @@ -79,25 +106,21 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type =>
val tlsSock = facade.tls.connect(options)
tlsSock.once(
"secureConnect",
() => seqDispatcher.unsafeRunAndForget(handshake.complete(Either.unit))
() => listener.complete(Either.unit)
)
tlsSock.once[js.Error](
"error",
e =>
seqDispatcher.unsafeRunAndForget(
handshake.complete(Left(new js.JavaScriptException(e)))
)
e => listener.complete(Left(new js.JavaScriptException(e)))
)
tlsSock
}
)
.evalTap(_ => handshake.get.rethrow)
}
} else {
Resource.eval(F.deferred[Either[Throwable, Unit]]).flatMap { verifyError =>
.evalTap(_ => listener.get)
} else {
TLSSocket
.forAsync(
socket,
clientMode,
sock => {
val options = params.toTLSSocketOptions(parDispatcher)
options.secureContext = context
Expand All @@ -117,24 +140,21 @@ private[tls] trait TLSContextCompanionPlatform { self: TLSContext.type =>
.map(e => new JavaScriptSSLException(js.JavaScriptException(e)))
.toLeft(())
else Either.unit
seqDispatcher.unsafeRunAndForget(verifyError.complete(result))
listener.complete(result)
}
)
tlsSock.once[js.Error](
"error",
e =>
seqDispatcher.unsafeRunAndForget(
verifyError.complete(Left(new js.JavaScriptException(e)))
)
e => listener.complete(Left(new js.JavaScriptException(e)))
)
tlsSock
}
)
.evalTap(_ => verifyError.get.rethrow)
.evalTap(_ => listener.get)
}
}
}
.adaptError { case IOException(ex) => ex }
.adaptError { case IOException(ex) => ex }
}
}

def fromSecureContext(context: SecureContext): TLSContext[F] =
Expand Down
27 changes: 20 additions & 7 deletions io/js/src/main/scala/fs2/io/net/tls/TLSSocketPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,29 @@ private[tls] trait TLSSocketCompanionPlatform { self: TLSSocket.type =>

private[tls] def forAsync[F[_]](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename this forAsyncServer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or couldn't we just pass clientMode in forAsync so we don't need separate methods? I don't really care, these are just nitpicks :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, it was bincompat. But this is all private anyway. We can just add the exclusion.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I didn't want to just add the exclusion unilaterally. Since it was fairly easy to avoid, I did avoid it at first, but I've now gone back to what it was before and ignored the binary-compatibility error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it was fairly easy to avoid I did

Yeah, this is good practice so thanks for doing that. But in this case I think its cleaner like this.

socket: Socket[F],
clientMode: Boolean,
upgrade: fs2.io.Duplex => facade.tls.TLSSocket
)(implicit F: Async[F]): Resource[F, TLSSocket[F]] =
for {
duplexOut <- mkDuplex(socket.reads)
(duplex, out) = duplexOut
_ <- out.through(socket.writes).compile.drain.background
tlsSockReadable <- suspendReadableAndRead(
destroyIfNotEnded = false,
destroyIfCanceled = false
)(upgrade(duplex))
tlsSockReadable <- socket match {
case Socket.AsyncSocket(sock, _) if clientMode =>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory the same logic should work for server sockets as well, but it results in a hang. I think this is connected to the pause or readable state of the socket vs duplex but I can't figure out how to unpause the socket at the right moment to allow things to flow.

for {
tlsSockReadable <- suspendReadableAndRead(
destroyIfNotEnded = false,
destroyIfCanceled = false
)(upgrade(sock))
} yield tlsSockReadable
case _ =>
for {
duplexOut <- mkDuplex(socket.reads)
(duplex, out) = duplexOut
_ <- out.through(socket.writes).compile.drain.background
tlsSockReadable <- suspendReadableAndRead(
destroyIfNotEnded = false,
destroyIfCanceled = false
)(upgrade(duplex))
} yield tlsSockReadable
}
(tlsSock, readable) = tlsSockReadable
readStream <- SuspendedStream(readable)
} yield new AsyncTLSSocket(
Expand Down