diff --git a/build.sbt b/build.sbt index 79dd7b613..696811f9c 100644 --- a/build.sbt +++ b/build.sbt @@ -18,6 +18,7 @@ lazy val iterateeVersion = "0.18.0" lazy val refinedVersion = "0.9.4" lazy val catsEffectVersion = "1.2.0" lazy val fs2Version = "1.0.4" +lazy val monixVersion = "3.0.0-RC2" lazy val compilerOptions = Seq( "-deprecation", @@ -236,7 +237,7 @@ lazy val finch = project.in(file(".")) "io.circe" %% "circe-generic" % circeVersion )) .aggregate( - core, fs2, iteratee, generic, argonaut, circe, benchmarks, test, jsonTest, examples, refined + core, fs2, iteratee, monix, generic, argonaut, circe, benchmarks, test, jsonTest, examples, refined ) .dependsOn(core, iteratee, generic, circe) @@ -264,6 +265,16 @@ lazy val fs2 = project ) .dependsOn(core % "compile->compile;test->test") +lazy val monix = project + .settings(moduleName := "finchx-monix") + .settings(allSettings) + .settings( + libraryDependencies ++= Seq( + "io.monix" %% "monix" % monixVersion + ) + ) + .dependsOn(core % "compile->compile;test->test") + lazy val generic = project .settings(moduleName := "finchx-generic") .settings(allSettings) diff --git a/monix/src/main/scala/io/finch/monix/package.scala b/monix/src/main/scala/io/finch/monix/package.scala new file mode 100644 index 000000000..2615ddccb --- /dev/null +++ b/monix/src/main/scala/io/finch/monix/package.scala @@ -0,0 +1,85 @@ +package io.finch + +import _root_.monix.tail.Iterant +import cats.effect.{Effect, IO} +import com.twitter.io.{Buf, Pipe, Reader} +import com.twitter.util.Future +import io.finch.internal._ +import java.nio.charset.Charset + +package object monix extends IterantInstances { + + implicit def iterantLiftReader[F[_]](implicit + F: Effect[F], + TE: ToAsync[Future, F] + ): LiftReader[Iterant, F] = + new LiftReader[Iterant, F] { + final def apply[A](reader: Reader[Buf], process: Buf => A): Iterant[F, A] = { + def loop(): Iterant[F, A] = { + Iterant + .liftF(F.suspend(TE(reader.read()))) + .flatMap { + case None => Iterant.empty + case Some(buf) => Iterant.eval(process(buf)) ++ loop() + } + } + + loop().guarantee(F.delay(reader.discard())) + } + } + + implicit def encodeJsonIterant[F[_]: Effect, A](implicit + A: Encode.Json[A] + ): EncodeStream.Json[F, Iterant, A] = + new EncodeNewLineDelimitedIterant[F, A, Application.Json] + + implicit def encodeSseIterant[F[_]: Effect, A](implicit + A: Encode.Aux[A, Text.EventStream] + ): EncodeStream.Aux[F, Iterant, A, Text.EventStream] = + new EncodeNewLineDelimitedIterant[F, A, Text.EventStream] + + implicit def encodeTextIterant[F[_]: Effect, A](implicit + A: Encode.Text[A] + ): EncodeStream.Text[F, Iterant, A] = + new EncodeIterant[F, A, Text.Plain] { + override protected def encodeChunk(chunk: A, cs: Charset): Buf = A(chunk, cs) + } +} + +trait IterantInstances { + + protected final class EncodeNewLineDelimitedIterant[F[_]: Effect, A, CT <: String](implicit + A: Encode.Aux[A, CT] + ) extends EncodeIterant[F, A, CT] { + protected def encodeChunk(chunk: A, cs: Charset): Buf = + A(chunk, cs).concat(newLine(cs)) + } + + protected abstract class EncodeIterant[F[_], A, CT <: String](implicit + F: Effect[F], + TE: ToAsync[Future, F] + ) extends EncodeStream[F, Iterant, A] with (Either[Throwable, Unit] => IO[Unit]) { + + type ContentType = CT + + protected def encodeChunk(chunk: A, cs: Charset): Buf + + def apply(cb: Either[Throwable, Unit]): IO[Unit] = IO.unit + + def apply(s: Iterant[F, A], cs: Charset): F[Reader[Buf]] = { + val p = new Pipe[Buf] + val run = s + .map(chunk => encodeChunk(chunk, cs)) + .mapEval(chunk => TE(p.write(chunk))) + .guarantee(F.suspend(TE(p.close()))) + .completedL + + F.productR(F.runAsync(run)(this).to[F])(F.pure(p)) + } + } + + implicit def encodeBufIterant[F[_]: Effect, CT <: String]: EncodeStream.Aux[F, Iterant, Buf, CT] = + new EncodeIterant[F, Buf, CT] { + protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk + } +} diff --git a/monix/src/test/scala/io/finch/monix/MonixStreamingSpec.scala b/monix/src/test/scala/io/finch/monix/MonixStreamingSpec.scala new file mode 100644 index 000000000..a8ebc3772 --- /dev/null +++ b/monix/src/test/scala/io/finch/monix/MonixStreamingSpec.scala @@ -0,0 +1,16 @@ +package io.finch.monix + +import _root_.monix.tail.Iterant +import cats.effect.IO +import com.twitter.io.Buf +import io.finch._ +import org.scalatest.prop.GeneratorDrivenPropertyChecks + +class MonixStreamingSpec extends FinchSpec with GeneratorDrivenPropertyChecks { + + checkAll("Iterant.streamBody", StreamingLaws[Iterant, IO]( + list => Iterant.fromList(list), + _.map(array => Buf.ByteArray.Owned(array)).toListL.unsafeRunSync() + ).all) + +}