Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added fs2.text.linesWithEndings #2418

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions core/shared/src/main/scala/fs2/text.scala
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,65 @@ object text {
def utf8EncodeC[F[_]]: Pipe[F, String, Chunk[Byte]] =
encodeC(utf8Charset)

/** The terminator attached to each line emitted by `linesWithEndings`.
  *
  * @param value the literal text of the terminator (empty for `EOF`)
  */
sealed abstract class LineEnding(val value: String)
object LineEnding {
// A CR-only case (`case object CR extends LineEnding("\r")`) is intentionally left out for now:
// judging by the existing line-splitting code, a lone "\r" is never produced as a line ending.
case object LF extends LineEnding("\n")
case object CRLF extends LineEnding("\r\n")
case object EOF extends LineEnding("")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I think that EOF is technically not a LineEnding, but I can't come up with an appropriate name for the group (LF, CRLF, EOF).
  2. lines is not only about files, but about any Stream[F, String]. Thus socket streams and pure streams can also be consumed by the .lines operator.

}

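/** Transforms a stream of `String` such that each emitted element is a line from the input
  * paired with the `LineEnding` that terminated it (`LF` or `CRLF`). Once the input ends,
  * whatever text follows the last line break (possibly empty) is emitted with `EOF`;
  * an input stream with no chunks emits nothing.
  */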
def linesWithEndings[F[_]]: Pipe[F, String, (String,LineEnding)] = {
import LineEnding._
/** Scans `string`, appending every completed line (with its terminator) to `linesBuffer`
  * and leaving any unterminated tail in `stringBuilder` for the next call.
  *
  * @param stringBuilder accumulator holding the text of the line currently being built
  * @param linesBuffer   receives one `(line, ending)` pair per terminator found
  * @param string        the next piece of input to scan
  */
def fillBuffers(
    stringBuilder: StringBuilder,
    linesBuffer: ArrayBuffer[(String, LineEnding)],
    string: String
): Unit = {
  // Emits the accumulated text as a finished line and resets the accumulator.
  def flush(ending: LineEnding): Unit = {
    linesBuffer += (stringBuilder.result() -> ending)
    stringBuilder.clear()
  }

  val pending = stringBuilder.length
  val total = string.length

  // A "\r\n" pair may be split across two inputs: the '\r' was buffered by the
  // previous call and this input opens with the matching '\n'.
  val splitCrLf =
    pending > 0 && total > 0 && stringBuilder(pending - 1) == '\r' && string.charAt(0) == '\n'

  if (splitCrLf) {
    stringBuilder.deleteCharAt(pending - 1)
    flush(CRLF)
  }

  // Skip the leading '\n' when it was consumed as the second half of a split pair.
  var pos = if (splitCrLf) 1 else 0
  while (pos < total) {
    val ch = string.charAt(pos)
    if (ch == '\n')
      flush(LF)
    else if (ch == '\r' && pos + 1 < total && string.charAt(pos + 1) == '\n') {
      flush(CRLF)
      pos += 1 // step over the '\n' of the pair
    } else
      // Ordinary character, or a '\r' not (yet) known to be followed by '\n'.
      stringBuilder.append(ch)
    pos += 1
  }
}

def go(
stream: Stream[F, String],
stringBuilder: StringBuilder,
first: Boolean
): Pull[F, (String, LineEnding), Unit] =
stream.pull.uncons.flatMap {
case None => if (first) Pull.done else Pull.output1(stringBuilder.result() -> LineEnding.EOF)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the EOF remains, I think we should replace Pull.done with Pull.output1("" -> LineEnding.EOF). Otherwise an input stream with no chunks would produce no EOF element at all, which breaks the "EOF at the end of the stream" consistency.

case Some((chunk, stream)) =>
val linesBuffer = ArrayBuffer.empty[(String,LineEnding)]
chunk.foreach { string =>
fillBuffers(stringBuilder, linesBuffer, string)
}
Pull.output(Chunk.buffer(linesBuffer)) >> go(stream, stringBuilder, first = false)
}

s => Stream.suspend(go(s, new StringBuilder(), first = true).stream)
}

/** Transforms a stream of `String` such that each emitted `String` is a line from the input. */
def lines[F[_]]: Pipe[F, String, String] = {
def fillBuffers(
Expand Down
47 changes: 46 additions & 1 deletion core/shared/src/test/scala/fs2/TextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class TextSuite extends Fs2Suite {
}
}

property("grouped in 3 characater chunks") {
property("grouped in 3 character chunks") {
forAll { (lines0: Stream[Pure, String]) =>
val lines = lines0.map(escapeCrLf)
val s = lines.intersperse("\r\n").toList.mkString.grouped(3).toList
Expand All @@ -280,6 +280,51 @@ class TextSuite extends Fs2Suite {
}
}

group("linesWithEndings") {
  // Replaces every line terminator inside a generated string with a visible
  // placeholder, so the only terminators in the input are the ones each
  // property inserts on purpose.
  def escapeCrLf(s: String): String = {
    val withoutCrLf = s.replaceAll("\r\n", "<CRLF>")
    val withoutLf = withoutCrLf.replaceAll("\n", "<LF>")
    withoutLf.replaceAll("\r", "<CR>")
  }

  // The output expected when `originalLines` are joined by `ending`:
  // every line except the last carries `ending`, the last one carries `EOF`.
  def expected(
      ending: LineEnding,
      originalLines: Stream[Pure, String]
  ): List[(String, LineEnding)] =
    originalLines.toList match {
      case Nil => Nil
      case all =>
        val finalLine: (String, LineEnding) = all.last -> LineEnding.EOF
        all.init.map(_ -> ending) :+ finalLine
    }

  property("newlines appear in between chunks") {
    forAll { (lines0: Stream[Pure, String]) =>
      val lines = lines0.map(escapeCrLf)
      // Checked in the same order as before: CRLF first, then LF.
      List[LineEnding](LineEnding.CRLF, LineEnding.LF).foreach { ending =>
        val actual = lines.intersperse(ending.value).through(text.linesWithEndings).toList
        assertEquals(actual, expected(ending, lines))
      }
    }
  }

  property("single string") {
    forAll { (lines0: Stream[Pure, String]) =>
      val lines = lines0.map(escapeCrLf)
      // An empty line list has nothing to join, so there is nothing to check.
      if (lines.toList.nonEmpty) {
        val joined = lines.intersperse("\r\n").toList.mkString
        val actual = Stream.emit(joined).through(text.linesWithEndings).toList
        assertEquals(actual, expected(LineEnding.CRLF, lines))
      }
    }
  }

  property("grouped in 3 character chunks") {
    forAll { (lines0: Stream[Pure, String]) =>
      val lines = lines0.map(escapeCrLf)
      val pieces = lines.intersperse("\r\n").toList.mkString.grouped(3).toList
      if (pieces.nonEmpty) {
        val expectedLines = expected(LineEnding.CRLF, lines)
        // Same input, once with the original chunking and once one element per chunk.
        assertEquals(Stream.emits(pieces).through(text.linesWithEndings).toList, expectedLines)
        assertEquals(
          Stream.emits(pieces).unchunk.through(text.linesWithEndings).toList,
          expectedLines
        )
      } else
        // No pieces means no chunks reach the pipe, so it must emit nothing.
        assertEquals(Stream.emits(pieces).through(text.linesWithEndings).toList, Nil)
    }
  }
}

property("base64Encode") {
forAll { (bs: List[Array[Byte]]) =>
assertEquals(
Expand Down