From ca4b0cbfddb1eae71e990ff1b6472de744b5bb9c Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Thu, 19 Jan 2023 16:27:50 +0000 Subject: [PATCH 01/10] Benchmarks of reader vs requestBody --- .../http4s/servlet/ServletIoBenchmarks.scala | 168 ++++++++++++++++++ build.sbt | 14 ++ project/plugins.sbt | 1 + 3 files changed, 183 insertions(+) create mode 100644 benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala diff --git a/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala b/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala new file mode 100644 index 00000000..9a1cf423 --- /dev/null +++ b/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala @@ -0,0 +1,168 @@ +package org.http4s.servlet + +import cats.effect.IO +import cats.effect.std.Dispatcher +import cats.effect.unsafe.implicits.global + +import org.openjdk.jmh.annotations._ +import org.http4s.servlet.NonBlockingServletIo + +import java.io.ByteArrayInputStream +import java.util.concurrent.TimeUnit +import javax.servlet.{ServletInputStream, ReadListener} +import javax.servlet.http.HttpServletRequest +import scala.util.Random + +/** To do comparative benchmarks between versions: + * + * benchmarks/run-benchmark AsyncBenchmark + * + * This will generate results in `benchmarks/results`. + * + * Or to run the benchmark from within sbt: + * + * Jmh / run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.AsyncBenchmark + * + * Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that + * benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but + * more is better. + */ +@State(Scope.Thread) +@BenchmarkMode(Array(Mode.Throughput)) +@OutputTimeUnit(TimeUnit.SECONDS) +class ServletIoBenchmarks { + + @Param(Array("100000")) + var size: Int = _ + + @Param(Array("1000")) + var iters: Int = _ + + def servletRequest: HttpServletRequest = new HttpServletRequestStub( + new TestServletInputStream(Random.nextBytes(size)) + ) + + @Benchmark + def reader() = { + val req = servletRequest + val servletIo = NonBlockingServletIo[IO](4096) + + def loop(i: Int): IO[Unit] = + if (i == iters) IO.unit else servletIo.reader(req).compile.drain >> loop(i + 1) + + loop(0).unsafeRunSync() + } + + @Benchmark + def requestBody() = { + val req = servletRequest + val servletIo = NonBlockingServletIo[IO](4096) + + Dispatcher.parallel[IO].use { disp => + def loop(i: Int): IO[Unit] = + if (i == iters) IO.unit else servletIo.requestBody(req, disp).compile.drain >> loop(i + 1) + + loop(0) + }.unsafeRunSync() + } + + class TestServletInputStream(body: Array[Byte]) extends ServletInputStream { + private var readListener: ReadListener = null + private val in = new ByteArrayInputStream(body) + + override def isReady: Boolean = true + + override def isFinished: Boolean = in.available() == 0 + + override def setReadListener(readListener: ReadListener): Unit = { + this.readListener = readListener + readListener.onDataAvailable() + } + + override def read(): Int = { + val result = in.read() + if (in.available() == 0) + readListener.onAllDataRead() + result + } + } + + final case class HttpServletRequestStub( + inputStream: ServletInputStream + ) extends HttpServletRequest { + def getInputStream(): ServletInputStream = inputStream + + def authenticate(x$1: javax.servlet.http.HttpServletResponse): Boolean = ??? + def changeSessionId(): String = ??? + def getAuthType(): String = ??? + def getContextPath(): String = ??? + def getCookies(): Array[javax.servlet.http.Cookie] = ??? + def getDateHeader(x$1: String): Long = ??? + def getHeader(x$1: String): String = ??? + def getHeaderNames(): java.util.Enumeration[String] = ??? + def getHeaders(x$1: String): java.util.Enumeration[String] = ??? + def getIntHeader(x$1: String): Int = ??? + def getMethod(): String = ??? + def getPart(x$1: String): javax.servlet.http.Part = ??? + def getParts(): java.util.Collection[javax.servlet.http.Part] = ??? + def getPathInfo(): String = ??? + def getPathTranslated(): String = ??? + def getQueryString(): String = ??? + def getRemoteUser(): String = ??? + def getRequestURI(): String = ??? + def getRequestURL(): StringBuffer = ??? + def getRequestedSessionId(): String = ??? + def getServletPath(): String = ??? + def getSession(): javax.servlet.http.HttpSession = ??? + def getSession(x$1: Boolean): javax.servlet.http.HttpSession = ??? + def getUserPrincipal(): java.security.Principal = ??? + def isRequestedSessionIdFromCookie(): Boolean = ??? + def isRequestedSessionIdFromURL(): Boolean = ??? + def isRequestedSessionIdFromUrl(): Boolean = ??? + def isRequestedSessionIdValid(): Boolean = ??? + def isUserInRole(x$1: String): Boolean = ??? + def login(x$1: String, x$2: String): Unit = ??? + def logout(): Unit = ??? + def upgrade[T <: javax.servlet.http.HttpUpgradeHandler](x$1: Class[T]): T = ??? + def getAsyncContext(): javax.servlet.AsyncContext = ??? + def getAttribute(x$1: String): Object = ??? + def getAttributeNames(): java.util.Enumeration[String] = ??? + def getCharacterEncoding(): String = ??? + def getContentLength(): Int = ??? + def getContentLengthLong(): Long = ??? + def getContentType(): String = ??? + def getDispatcherType(): javax.servlet.DispatcherType = ??? + def getLocalAddr(): String = ??? + def getLocalName(): String = ??? + def getLocalPort(): Int = ??? + def getLocale(): java.util.Locale = ??? + def getLocales(): java.util.Enumeration[java.util.Locale] = ??? + def getParameter(x$1: String): String = ??? + def getParameterMap(): java.util.Map[String, Array[String]] = ??? + def getParameterNames(): java.util.Enumeration[String] = ??? + def getParameterValues(x$1: String): Array[String] = ??? + def getProtocol(): String = ??? + def getReader(): java.io.BufferedReader = ??? + def getRealPath(x$1: String): String = ??? + def getRemoteAddr(): String = ??? + def getRemoteHost(): String = ??? + def getRemotePort(): Int = ??? + def getRequestDispatcher(x$1: String): javax.servlet.RequestDispatcher = ??? + def getScheme(): String = ??? + def getServerName(): String = ??? + def getServerPort(): Int = ??? + def getServletContext(): javax.servlet.ServletContext = ??? + def isAsyncStarted(): Boolean = ??? + def isAsyncSupported(): Boolean = ??? + def isSecure(): Boolean = ??? + def removeAttribute(x$1: String): Unit = ??? + def setAttribute(x$1: String, x$2: Object): Unit = ??? + def setCharacterEncoding(x$1: String): Unit = ??? + def startAsync( + x$1: javax.servlet.ServletRequest, + x$2: javax.servlet.ServletResponse, + ): javax.servlet.AsyncContext = ??? + def startAsync(): javax.servlet.AsyncContext = ??? + } + +} diff --git a/build.sbt b/build.sbt index 76410da4..80d31df9 100644 --- a/build.sbt +++ b/build.sbt @@ -43,6 +43,7 @@ lazy val servlet = project "org.eclipse.jetty" % "jetty-servlet" % jettyVersion % Test, "org.http4s" %% "http4s-dsl" % http4sVersion % Test, "org.http4s" %% "http4s-server" % http4sVersion, + "org.typelevel" %% "cats-effect" % "3.4.5", "org.typelevel" %% "munit-cats-effect-3" % munitCatsEffectVersion % Test, ), ) @@ -64,3 +65,16 @@ lazy val examples = project .dependsOn(servlet) lazy val docs = project.in(file("site")).enablePlugins(TypelevelSitePlugin) + +lazy val benchmarks = project + .in(file("benchmarks")) + .dependsOn(servlet) + .settings( + name := "servlet-benchmarks", + libraryDependencies ++= Seq( + "javax.servlet" % "javax.servlet-api" % servletApiVersion, + ), + javaOptions ++= Seq( + "-Dcats.effect.tracing.mode=none", + "-Dcats.effect.tracing.exceptions.enhanced=false")) + .enablePlugins(NoPublishPlugin, JmhPlugin) diff --git a/project/plugins.sbt b/project/plugins.sbt index ab1a29f0..d8df1722 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,3 @@ addSbtPlugin("com.earldouglas" % "xsbt-web-plugin" % "4.2.4") addSbtPlugin("org.http4s" % "sbt-http4s-org" % "0.14.9") +addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.3") From 2c3970da12b9912c67f1441ebc9b8a33cc08ea81 Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Mon, 23 Jan 2023 17:25:50 +0000 Subject: [PATCH 02/10] Don't use concurrently with single-element stream Concurrently introduces an fs2 interruptible scope which means that each pull from the queue forks a fiber. The GC impact of this is significant. --- servlet/src/main/scala/org/http4s/servlet/ServletIo.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala index 6d179e35..b6ea4ff8 100644 --- a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala +++ b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala @@ -220,7 +220,7 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl Stream.eval(F.delay(servletRequest.getInputStream)).flatMap { in => Stream.eval(Queue.bounded[F, Read](4)).flatMap { q => - val readBody = Stream.exec(F.delay(in.setReadListener(new ReadListener { + val readBody = Stream.eval(F.delay(in.setReadListener(new ReadListener { var buf: Array[Byte] = _ unsafeReplaceBuffer() @@ -268,7 +268,7 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl case Error(t) => Pull.raiseError[F](t) } - pullBody.stream.concurrently(readBody) + readBody.flatMap(_ => pullBody.stream) } } } From 7260d8d16728cdc291e9b96d817a53be2035957e Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Mon, 23 Jan 2023 17:26:48 +0000 Subject: [PATCH 03/10] Optimization: don't emit empty chunk --- servlet/src/main/scala/org/http4s/servlet/ServletIo.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala index b6ea4ff8..d6503b35 100644 --- a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala +++ b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala @@ -239,9 +239,10 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl case len if len == chunkSize => // We used the whole buffer. Replace it new before next read. q.offer(Bytes(Chunk.array(buf))) >> F.delay(unsafeReplaceBuffer()) >> loopIfReady - case len if len >= 0 => + case len if len > 0 => // Got a partial chunk. Copy it, and reuse the current buffer. q.offer(Bytes(Chunk.array(Arrays.copyOf(buf, len)))) >> loopIfReady + case len if len == 0 => loopIfReady case _ => F.unit } From 831675b6b6c1a09700d90b621d0e7c4672d56e41 Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Tue, 24 Jan 2023 14:47:37 +0000 Subject: [PATCH 04/10] Minimize allocations --- .../http4s/servlet/ServletIoBenchmarks.scala | 2 +- .../scala/org/http4s/servlet/ServletIo.scala | 17 +++++++---------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala b/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala index 9a1cf423..1f19642f 100644 --- a/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala +++ b/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala @@ -58,7 +58,7 @@ class ServletIoBenchmarks { val req = servletRequest val servletIo = NonBlockingServletIo[IO](4096) - Dispatcher.parallel[IO].use { disp => + Dispatcher.sequential[IO].use { disp => def loop(i: Int): IO[Unit] = if (i == iters) IO.unit else servletIo.requestBody(req, disp).compile.drain >> loop(i + 1) diff --git a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala index d6503b35..b88ed58b 100644 --- a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala +++ b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala @@ -213,13 +213,10 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl servletRequest: HttpServletRequest, dispatcher: Dispatcher[F], ): Stream[F, Byte] = { - sealed trait Read - final case class Bytes(chunk: Chunk[Byte]) extends Read - case object End extends Read - final case class Error(t: Throwable) extends Read + case object End Stream.eval(F.delay(servletRequest.getInputStream)).flatMap { in => - Stream.eval(Queue.bounded[F, Read](4)).flatMap { q => + Stream.eval(Queue.bounded[F, Any](4)).flatMap { q => val readBody = Stream.eval(F.delay(in.setReadListener(new ReadListener { var buf: Array[Byte] = _ unsafeReplaceBuffer() @@ -238,10 +235,10 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl F.delay(in.read(buf)).flatMap { case len if len == chunkSize => // We used the whole buffer. Replace it new before next read. - q.offer(Bytes(Chunk.array(buf))) >> F.delay(unsafeReplaceBuffer()) >> loopIfReady + q.offer(Chunk.array(buf)) >> F.delay(unsafeReplaceBuffer()) >> loopIfReady case len if len > 0 => // Got a partial chunk. Copy it, and reuse the current buffer. - q.offer(Bytes(Chunk.array(Arrays.copyOf(buf, len)))) >> loopIfReady + q.offer(Chunk.array(Arrays.copyOf(buf, len))) >> loopIfReady case len if len == 0 => loopIfReady case _ => F.unit @@ -254,7 +251,7 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl unsafeRunAndForget(q.offer(End)) def onError(t: Throwable): Unit = - unsafeRunAndForget(q.offer(Error(t))) + unsafeRunAndForget(q.offer(t)) def unsafeRunAndForget[A](fa: F[A]): Unit = dispatcher.unsafeRunAndForget( @@ -264,9 +261,9 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl def pullBody: Pull[F, Byte, Unit] = Pull.eval(q.take).flatMap { - case Bytes(chunk) => Pull.output(chunk) >> pullBody + case chunk: Chunk[Byte] => Pull.output(chunk) >> pullBody case End => Pull.done - case Error(t) => Pull.raiseError[F](t) + case t: Throwable => Pull.raiseError[F](t) } readBody.flatMap(_ => pullBody.stream) From 11797288ad4a3a60eb405075a1e0393dc8edaaf4 Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Tue, 24 Jan 2023 15:58:08 +0000 Subject: [PATCH 05/10] Suppress compiler warning --- servlet/src/main/scala/org/http4s/servlet/ServletIo.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala index b88ed58b..5f6eca40 100644 --- a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala +++ b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala @@ -261,7 +261,7 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl def pullBody: Pull[F, Byte, Unit] = Pull.eval(q.take).flatMap { - case chunk: Chunk[Byte] => Pull.output(chunk) >> pullBody + case chunk: Chunk[Byte] @ unchecked => Pull.output(chunk) >> pullBody case End => Pull.done case t: Throwable => Pull.raiseError[F](t) } From 290eb5ba73688ae3b624287e35bf857717aaa1f9 Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Tue, 24 Jan 2023 16:02:30 +0000 Subject: [PATCH 06/10] Fix compiler warning --- .../http4s/servlet/ServletIoBenchmarks.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala b/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala index 1f19642f..e0601375 100644 --- a/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala +++ b/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala @@ -58,12 +58,15 @@ class ServletIoBenchmarks { val req = servletRequest val servletIo = NonBlockingServletIo[IO](4096) - Dispatcher.sequential[IO].use { disp => - def loop(i: Int): IO[Unit] = - if (i == iters) IO.unit else servletIo.requestBody(req, disp).compile.drain >> loop(i + 1) - - loop(0) - }.unsafeRunSync() + Dispatcher + .sequential[IO] + .use { disp => + def loop(i: Int): IO[Unit] = + if (i == iters) IO.unit else servletIo.requestBody(req, disp).compile.drain >> loop(i + 1) + + loop(0) + } + .unsafeRunSync() } class TestServletInputStream(body: Array[Byte]) extends ServletInputStream { @@ -87,7 +90,7 @@ class ServletIoBenchmarks { } } - final case class HttpServletRequestStub( + case class HttpServletRequestStub( inputStream: ServletInputStream ) extends HttpServletRequest { def getInputStream(): ServletInputStream = inputStream From 9210893109d3c3903eaa20c92a5ad5ee553cac5f Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Wed, 25 Jan 2023 08:51:52 -0500 Subject: [PATCH 07/10] Dispatcher per request in benchmark --- .../org/http4s/servlet/ServletIoBenchmarks.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala b/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala index e0601375..7b165b21 100644 --- a/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala +++ b/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala @@ -58,15 +58,13 @@ class ServletIoBenchmarks { val req = servletRequest val servletIo = NonBlockingServletIo[IO](4096) - Dispatcher - .sequential[IO] - .use { disp => - def loop(i: Int): IO[Unit] = - if (i == iters) IO.unit else servletIo.requestBody(req, disp).compile.drain >> loop(i + 1) - - loop(0) - } - .unsafeRunSync() + def loop(i: Int): IO[Unit] = + if (i == iters) IO.unit + else Dispatcher.sequential[IO].use { dispatcher => + servletIo.requestBody(req, dispatcher).compile.drain + } >> loop(i + 1) + + loop(0).unsafeRunSync() } class TestServletInputStream(body: Array[Byte]) extends ServletInputStream { From ffa5f3c7831e649caae7c6be8db3ca63385347db Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Wed, 25 Jan 2023 09:10:45 -0500 Subject: [PATCH 08/10] Bulk reads are more realistic --- .../org/http4s/servlet/ServletIoBenchmarks.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala b/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala index 7b165b21..bdb9152d 100644 --- a/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala +++ b/benchmarks/src/main/scala/org/http4s/servlet/ServletIoBenchmarks.scala @@ -86,6 +86,20 @@ class ServletIoBenchmarks { readListener.onAllDataRead() result } + + override def read(buf: Array[Byte]) = { + val result = in.read(buf) + if (in.available() == 0) + readListener.onAllDataRead() + result + } + + override def read(buf: Array[Byte], off: Int, len: Int) = { + val result = in.read(buf, off, len) + if (in.available() == 0) + readListener.onAllDataRead() + result + } } case class HttpServletRequestStub( From 20467d6c45688855e8fd008a83b0304fa369d8ca Mon Sep 17 00:00:00 2001 From: "Ross A. Baker" Date: Wed, 25 Jan 2023 10:37:49 -0500 Subject: [PATCH 09/10] Not sure how I broke the workflow, but alright --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 13efaaac..244fe175 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -136,11 +136,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p examples/target target .js/target site/target servlet/target .jvm/target .native/target project/target + run: mkdir -p benchmarks/target examples/target target .js/target site/target servlet/target .jvm/target .native/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar examples/target target .js/target site/target servlet/target .jvm/target .native/target project/target + run: tar cf targets.tar benchmarks/target examples/target target .js/target site/target servlet/target .jvm/target .native/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') From 806fac74ab850e9bbf87fadae138dfc013150ed2 Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Thu, 26 Jan 2023 11:31:01 +0000 Subject: [PATCH 10/10] Scalafmt --- servlet/src/main/scala/org/http4s/servlet/ServletIo.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala index 5f6eca40..19375c5d 100644 --- a/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala +++ b/servlet/src/main/scala/org/http4s/servlet/ServletIo.scala @@ -261,7 +261,7 @@ final case class NonBlockingServletIo[F[_]: Async](chunkSize: Int) extends Servl def pullBody: Pull[F, Byte, Unit] = Pull.eval(q.take).flatMap { - case chunk: Chunk[Byte] @ unchecked => Pull.output(chunk) >> pullBody + case chunk: Chunk[Byte] @unchecked => Pull.output(chunk) >> pullBody case End => Pull.done case t: Throwable => Pull.raiseError[F](t) }