Skip to content

Commit

Permalink
Enhancements for Source Line-Start-End Functionality (LC-196)
Browse files Browse the repository at this point in the history
This PR addresses a specific challenge encountered by users of the S3 source functionality. When an external producer terminates a file without the end-of-message marker, the final, unterminated message is dropped and its data is lost.

To mitigate this issue, the PR introduces a new KCQL property that tells the source to treat the unterminated message at the end of a file as a complete record.
  • Loading branch information
stheppi committed Apr 26, 2024
1 parent de38391 commit ddff74a
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ object PropsKeyEnum extends Enum[PropsKeyEntry] {
// Start/end-line text mode: string properties holding the start and end line values.
case object ReadStartLine extends PropsKeyEntry("read.text.start.line")
case object ReadEndLine extends PropsKeyEntry("read.text.end.line")

// Boolean property, treated as false when not set. NOTE(review): presumably lets a final record
// whose end line is missing be emitted at end of file — confirm against the line reader.
case object ReadLastEndLineMissing extends PropsKeyEntry("read.text.last.end.line.missing")

// Boolean property, treated as false when not set; it is passed on as the trim flag of the text mode.
case object ReadTrimLine extends PropsKeyEntry("read.text.trim")

// The key string is defined by DataStorageSettings.StoreEnvelopeKey.
case object StoreEnvelope extends PropsKeyEntry(DataStorageSettings.StoreEnvelopeKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ object ReadTextMode {
startLine <- props.getString(PropsKeyEnum.ReadStartLine)
endLine <- props.getString(PropsKeyEnum.ReadEndLine)
// Both boolean flags are optional and default to false.
trim <- props.getOptionalBoolean(PropsKeyEnum.ReadTrimLine).toOption.flatten.orElse(Some(false))
lastEndLineMissing <- props.getOptionalBoolean(PropsKeyEnum.ReadLastEndLineMissing).toOption.flatten.orElse(
  Some(false),
)
// Forward lastEndLineMissing: StartEndLineReadTextMode takes four parameters, so leaving it
// out would discard the configured value instead of handing it to the reader.
} yield StartEndLineReadTextMode(startLine, endLine, trim, lastEndLineMissing)
case None => Option.empty
}
Expand All @@ -76,7 +79,8 @@ case class StartEndTagReadTextMode(startTag: String, endTag: String, buffer: Int
}
}

case class StartEndLineReadTextMode(startLine: String, endLine: String, trim: Boolean) extends ReadTextMode {
case class StartEndLineReadTextMode(startLine: String, endLine: String, trim: Boolean, lastEndLineMissing: Boolean)
extends ReadTextMode {
override def createStreamReader(
input: InputStream,
): CloudDataIterator[String] = {
Expand All @@ -85,6 +89,7 @@ case class StartEndLineReadTextMode(startLine: String, endLine: String, trim: Bo
startLine,
endLine,
trim,
lastEndLineMissing,
)
new CustomTextStreamReader(() => lineReader.next(), () => lineReader.close())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@ import scala.jdk.CollectionConverters.MapHasAsScala
object CloudSourcePropsSchema {

// Associates each KCQL property key with the schema describing its value type.
// NOTE(review): every key except ReadLastEndLineMissing is listed twice below with the same
// schema; a Map keeps the last occurrence of a repeated key, so the repetition has no effect
// on the resulting map — confirm whether the first run of entries is intentional.
private[source] val keys = Map[PropsKeyEntry, PropsSchema](
ReadTextMode -> EnumPropsSchema(ReadTextModeEnum),
ReadRegex -> StringPropsSchema,
ReadStartTag -> StringPropsSchema,
ReadEndTag -> StringPropsSchema,
ReadStartLine -> StringPropsSchema,
ReadEndLine -> StringPropsSchema,
BufferSize -> IntPropsSchema,
ReadTrimLine -> BooleanPropsSchema,
StoreEnvelope -> BooleanPropsSchema,
ReadTextMode -> EnumPropsSchema(ReadTextModeEnum),
ReadRegex -> StringPropsSchema,
ReadStartTag -> StringPropsSchema,
ReadEndTag -> StringPropsSchema,
ReadStartLine -> StringPropsSchema,
ReadEndLine -> StringPropsSchema,
ReadLastEndLineMissing -> BooleanPropsSchema,
BufferSize -> IntPropsSchema,
ReadTrimLine -> BooleanPropsSchema,
StoreEnvelope -> BooleanPropsSchema,
)

// Schema built from the property key enum and the key-to-schema map above.
val schema = KcqlPropsSchema(PropsKeyEnum, keys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,19 @@ import java.io.InputStreamReader
* end is found. The start and end lines are included in the record.
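* If the file ends and there is no end, the record is ignored unless lastEndLineMissing is true,
* in which case reaching the end of the file completes the record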
*
* @param input
* @param start
* @param end
* @param input the input stream
* @param start the record is considered to start when a line matching start is found
* @param end the record is considered complete when a line matching end is found
* @param trim if true, the record is trimmed
* @param lastEndLineMissing if true, the record is considered complete when end of file is reached
*/
class LineStartLineEndReader(input: InputStream, start: String, end: String, trim: Boolean = false) extends LineReader {
class LineStartLineEndReader(
input: InputStream,
start: String,
end: String,
trim: Boolean = false,
lastEndLineMissing: Boolean = false,
) extends LineReader {
private val br = new BufferedReader(new InputStreamReader(input))

//Returns the next record or None if there are no more
Expand Down Expand Up @@ -60,10 +68,19 @@ class LineStartLineEndReader(input: InputStream, start: String, end: String, tri
builder.append(line)
line = br.readLine()
}
Option(line).map { _ =>
builder.append(System.lineSeparator())
builder.append(end)
builder.toString()
Option(line) match {
case Some(_) =>
builder.append(System.lineSeparator())
builder.append(end)
Some(builder.toString())
case None =>
if (lastEndLineMissing) {
builder.append(System.lineSeparator())
builder.append(end)
Some(builder.toString())
} else {
None
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,5 +234,58 @@ class LineStartLineEndReaderTest extends AnyFunSuite with Matchers {
|x""".stripMargin,
)
}

test("when lastEndLineMissing=true, return the record if the end line is missing") {
  // The input holds two records: the first is followed by an empty line (the configured end),
  // the second reaches the end of the file without one.
  val content =
    """
      |start
      |a
      |b
      |c
      |
      |start
      |x""".stripMargin
  val expectedFirst =
    """start
      |a
      |b
      |c""".stripMargin
  val expectedSecond =
    """start
      |x""".stripMargin

  val reader = new LineStartLineEndReader(
    createInputStream(content),
    start = "start",
    end = "",
    trim = true,
    lastEndLineMissing = true,
  )

  // Records are pulled one at a time and checked in file order.
  reader.next() shouldBe Some(expectedFirst)
  reader.next() shouldBe Some(expectedSecond)
}

test("when lastEndLineMissing=true, return the record if the end line is missing all file is a message") {
  // No empty line (the configured end) appears anywhere in the input, so the whole file is
  // expected back as a single record, including the second "start" line.
  val content =
    """
      |start
      |a
      |b
      |c
      |start
      |x""".stripMargin
  val expectedRecord =
    """start
      |a
      |b
      |c
      |start
      |x""".stripMargin

  val reader = new LineStartLineEndReader(
    createInputStream(content),
    start = "start",
    end = "",
    trim = true,
    lastEndLineMissing = true,
  )

  reader.next() shouldBe Some(expectedRecord)
}
/** Wraps the given text in an in-memory stream, encoded with the platform default charset. */
private def createInputStream(data: String): InputStream = {
  val bytes = data.getBytes
  new ByteArrayInputStream(bytes)
}
}

0 comments on commit ddff74a

Please sign in to comment.