diff --git a/core/shared/src/main/scala/fs2/text.scala b/core/shared/src/main/scala/fs2/text.scala index 7ad15714fd..430c713857 100644 --- a/core/shared/src/main/scala/fs2/text.scala +++ b/core/shared/src/main/scala/fs2/text.scala @@ -184,6 +184,65 @@ object text { def utf8EncodeC[F[_]]: Pipe[F, String, Chunk[Byte]] = encodeC(utf8Charset) + sealed abstract class LineEnding(val value: String) + object LineEnding { +// case object CR extends LineEnding("\r") looking at the existing code this won't be produced so leaving it out for now + case object LF extends LineEnding("\n") + case object CRLF extends LineEnding("\r\n") + case object EOF extends LineEnding("") + } + + /** Transforms a stream of `String` such that each emitted `String` is a line from the input. */ + def linesWithEndings[F[_]]: Pipe[F, String, (String,LineEnding)] = { + import LineEnding._ + def fillBuffers( + stringBuilder: StringBuilder, + linesBuffer: ArrayBuffer[(String,LineEnding)], + string: String + ): Unit = { + val l = stringBuilder.length + var i = + if (l > 0 && stringBuilder(l - 1) == '\r' && string.nonEmpty && string(0) == '\n') { + stringBuilder.deleteCharAt(l - 1) + linesBuffer += (stringBuilder.result() -> CRLF) + stringBuilder.clear() + 1 + } else 0 + + while (i < string.size) { + string(i) match { + case '\n' => + linesBuffer += (stringBuilder.result() -> LF) + stringBuilder.clear() + case '\r' if i + 1 < string.size && string(i + 1) == '\n' => + linesBuffer += (stringBuilder.result() -> CRLF) + stringBuilder.clear() + i += 1 + case other => + stringBuilder.append(other) + } + i += 1 + } + } + + def go( + stream: Stream[F, String], + stringBuilder: StringBuilder, + first: Boolean + ): Pull[F, (String, LineEnding), Unit] = + stream.pull.uncons.flatMap { + case None => if (first) Pull.done else Pull.output1(stringBuilder.result() -> LineEnding.EOF) + case Some((chunk, stream)) => + val linesBuffer = ArrayBuffer.empty[(String,LineEnding)] + chunk.foreach { string => + fillBuffers(stringBuilder, linesBuffer, string) + } + Pull.output(Chunk.buffer(linesBuffer)) >> go(stream, stringBuilder, first = false) + } + + s => Stream.suspend(go(s, new StringBuilder(), first = true).stream) + } + /** Transforms a stream of `String` such that each emitted `String` is a line from the input. */ def lines[F[_]]: Pipe[F, String, String] = { def fillBuffers( diff --git a/core/shared/src/test/scala/fs2/TextSuite.scala b/core/shared/src/test/scala/fs2/TextSuite.scala index 063fc8718c..f72d82aebf 100644 --- a/core/shared/src/test/scala/fs2/TextSuite.scala +++ b/core/shared/src/test/scala/fs2/TextSuite.scala @@ -266,7 +266,7 @@ class TextSuite extends Fs2Suite { } } - property("grouped in 3 characater chunks") { + property("grouped in 3 character chunks") { forAll { (lines0: Stream[Pure, String]) => val lines = lines0.map(escapeCrLf) val s = lines.intersperse("\r\n").toList.mkString.grouped(3).toList @@ -280,6 +280,51 @@ class TextSuite extends Fs2Suite { } } + group("linesWithEndings") { + def escapeCrLf(s: String): String = + s.replaceAll("\r\n", "").replaceAll("\n", "").replaceAll("\r", "") + + def expected(ending: LineEnding, originalLines: Stream[Pure, String]): List[(String,LineEnding)] = { + val v = originalLines.toVector + (v.dropRight(1).map(_ -> ending) ++ v.takeRight(1).map(_ -> LineEnding.EOF)).toList + } + + property("newlines appear in between chunks") { + forAll { (lines0: Stream[Pure, String]) => + val lines = lines0.map(escapeCrLf) + def assertEnding(ending: LineEnding): Unit = { + assertEquals(lines.intersperse(ending.value).through(text.linesWithEndings).toList, expected(ending, lines)) + } + assertEnding(LineEnding.CRLF) + assertEnding(LineEnding.LF) + } + } + + property("single string") { + forAll { (lines0: Stream[Pure, String]) => + val lines = lines0.map(escapeCrLf) + if (lines.toList.nonEmpty) { + val s = lines.intersperse("\r\n").toList.mkString + assertEquals(Stream.emit(s).through(text.linesWithEndings).toList, expected(LineEnding.CRLF, lines)) + } + } + } + + property("grouped in 3 character chunks") { + forAll { (lines0: Stream[Pure, String]) => + val lines = lines0.map(escapeCrLf) + val s = lines.intersperse("\r\n").toList.mkString.grouped(3).toList + if (s.isEmpty) + assertEquals(Stream.emits(s).through(text.linesWithEndings).toList, Nil) + else { + val expectedLines = expected(LineEnding.CRLF, lines) + assertEquals(Stream.emits(s).through(text.linesWithEndings).toList, expectedLines) + assertEquals(Stream.emits(s).unchunk.through(text.linesWithEndings).toList, expectedLines) + } + } + } + } + property("base64Encode") { forAll { (bs: List[Array[Byte]]) => assertEquals(