diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala index e14a27506..25c0db682 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala @@ -37,6 +37,8 @@ object PropsKeyEnum extends Enum[PropsKeyEntry] { case object ReadStartLine extends PropsKeyEntry("read.text.start.line") case object ReadEndLine extends PropsKeyEntry("read.text.end.line") + case object ReadLastEndLineMissing extends PropsKeyEntry("read.text.last.end.line.missing") + case object ReadTrimLine extends PropsKeyEntry("read.text.trim") case object StoreEnvelope extends PropsKeyEntry(DataStorageSettings.StoreEnvelopeKey) diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/ReadTextMode.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/ReadTextMode.scala index eb6af562f..379b1834e 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/ReadTextMode.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/ReadTextMode.scala @@ -56,6 +56,9 @@ object ReadTextMode { startLine <- props.getString(PropsKeyEnum.ReadStartLine) endLine <- props.getString(PropsKeyEnum.ReadEndLine) trim <- props.getOptionalBoolean(PropsKeyEnum.ReadTrimLine).toOption.flatten.orElse(Some(false)) + lastEndLineMissing <- props.getOptionalBoolean(PropsKeyEnum.ReadLastEndLineMissing).toOption.flatten.orElse( + Some(false), + ) } yield StartEndLineReadTextMode(startLine, endLine, trim) case None => Option.empty } @@ -76,7 +79,8 @@ case class StartEndTagReadTextMode(startTag: String, endTag: String, buffer: Int } } -case class StartEndLineReadTextMode(startLine: String, endLine: String, trim: Boolean) extends ReadTextMode { +case class StartEndLineReadTextMode(startLine: String, endLine: String, trim: Boolean, lastEndLineMissing: Boolean) + extends ReadTextMode { override def createStreamReader( input: InputStream, ): CloudDataIterator[String] = { @@ -85,6 +89,7 @@ case class StartEndLineReadTextMode(startLine: String, endLine: String, trim: Bo startLine, endLine, trim, + lastEndLineMissing, ) new CustomTextStreamReader(() => lineReader.next(), () => lineReader.close()) diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/CloudSourcePropsSchema.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/CloudSourcePropsSchema.scala index d2718404f..7db5dc772 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/CloudSourcePropsSchema.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/source/config/kcqlprops/CloudSourcePropsSchema.scala @@ -26,15 +26,16 @@ import scala.jdk.CollectionConverters.MapHasAsScala object CloudSourcePropsSchema { private[source] val keys = Map[PropsKeyEntry, PropsSchema]( - ReadTextMode -> EnumPropsSchema(ReadTextModeEnum), - ReadRegex -> StringPropsSchema, - ReadStartTag -> StringPropsSchema, - ReadEndTag -> StringPropsSchema, - ReadStartLine -> StringPropsSchema, - ReadEndLine -> StringPropsSchema, - BufferSize -> IntPropsSchema, - ReadTrimLine -> BooleanPropsSchema, - StoreEnvelope -> BooleanPropsSchema, + ReadTextMode -> EnumPropsSchema(ReadTextModeEnum), + ReadRegex -> StringPropsSchema, + ReadStartTag -> StringPropsSchema, + ReadEndTag -> StringPropsSchema, + ReadStartLine -> StringPropsSchema, + ReadEndLine -> StringPropsSchema, + ReadLastEndLineMissing -> BooleanPropsSchema, + BufferSize -> IntPropsSchema, + ReadTrimLine -> BooleanPropsSchema, + StoreEnvelope -> BooleanPropsSchema, ) val schema = KcqlPropsSchema(PropsKeyEnum, keys) diff --git a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/io/text/LineStartLineEndReader.scala b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/io/text/LineStartLineEndReader.scala index 1d7a5ae12..68cb608de 100644 --- a/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/io/text/LineStartLineEndReader.scala +++ b/kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/io/text/LineStartLineEndReader.scala @@ -24,11 +24,19 @@ import java.io.InputStreamReader * end is found. The start and end lines are included in the record. * If the file ends and there is no end, the record is ignored * - * @param input - * @param start - * @param end + * @param input the input stream + * @param start the record is considered to start when a line matching start is found + * @param end the record is considered complete when a line matching end is found + * @param trim if true, the record is trimmed + * @param lastEndLineMissing if true, the record is considered complete when end of file is reached */ -class LineStartLineEndReader(input: InputStream, start: String, end: String, trim: Boolean = false) extends LineReader { +class LineStartLineEndReader( + input: InputStream, + start: String, + end: String, + trim: Boolean = false, + lastEndLineMissing: Boolean = false, +) extends LineReader { private val br = new BufferedReader(new InputStreamReader(input)) //Returns the next record or None if there are no more @@ -60,10 +68,19 @@ class LineStartLineEndReader(input: InputStream, start: String, end: String, tri builder.append(line) line = br.readLine() } - Option(line).map { _ => - builder.append(System.lineSeparator()) - builder.append(end) - builder.toString() + Option(line) match { + case Some(_) => + builder.append(System.lineSeparator()) + builder.append(end) + Some(builder.toString()) + case None => + if (lastEndLineMissing) { + builder.append(System.lineSeparator()) + builder.append(end) + Some(builder.toString()) + } else { + None + } } } } diff --git a/kafka-connect-common/src/test/scala/io/lenses/streamreactor/connect/io/text/LineStartLineEndReaderTest.scala b/kafka-connect-common/src/test/scala/io/lenses/streamreactor/connect/io/text/LineStartLineEndReaderTest.scala index 3d5893602..d78faf6a7 100644 --- a/kafka-connect-common/src/test/scala/io/lenses/streamreactor/connect/io/text/LineStartLineEndReaderTest.scala +++ b/kafka-connect-common/src/test/scala/io/lenses/streamreactor/connect/io/text/LineStartLineEndReaderTest.scala @@ -234,5 +234,58 @@ class LineStartLineEndReaderTest extends AnyFunSuite with Matchers { |x""".stripMargin, ) } + + test("when lastEndLineMissing=true, return the record if the end line is missing") { + val reader = new LineStartLineEndReader(createInputStream( + """ + |start + |a + |b + |c + | + |start + |x""".stripMargin, + ), + "start", + "", + trim = true, + lastEndLineMissing = true, + ) + reader.next() shouldBe Some( + """start + |a + |b + |c""".stripMargin, + ) + reader.next() shouldBe Some( + """start + |x""".stripMargin, + ) + } + + test("when lastEndLineMissing=true, return the record if the end line is missing all file is a message") { + val reader = new LineStartLineEndReader(createInputStream( + """ + |start + |a + |b + |c + |start + |x""".stripMargin, + ), + "start", + "", + trim = true, + lastEndLineMissing = true, + ) + reader.next() shouldBe Some( + """start + |a + |b + |c + |start + |x""".stripMargin, + ) + } private def createInputStream(data: String): InputStream = new ByteArrayInputStream(data.getBytes) }