Skip to content

Commit

Permalink
LC-187 Avoid adapting the partition fields value (#1137) (#1169)
Browse files Browse the repository at this point in the history
* LC-187 Avoid adapting the partition fields value

At the moment an SMT can control the format of the key. For example it can set it to: yyyy-MM-dd/HH. But the connector replaces the `/` and thus breaks the key.

The change removes the code to replace the value and adds a test for it.

* Address the PR comments

---------

Co-authored-by: stheppi <[email protected]>
  • Loading branch information
stheppi and stheppi authored Apr 23, 2024
1 parent da04836 commit 75b2aed
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,8 @@ class CloudKeyNamer(
}
}

private def getFieldStringValue(struct: SinkData, partitionName: Option[PartitionNamePath]) =
adaptErrorResponse(SinkDataExtractor.extractPathFromSinkData(struct)(partitionName)).fold(Option.empty[String])(
fieldVal =>
Option(fieldVal
.replace("/", "-")
.replace("\\", "-")),
)
private def getFieldStringValue(struct: SinkData, partitionName: Option[PartitionNamePath]): Option[String] =
adaptErrorResponse(SinkDataExtractor.extractPathFromSinkData(struct)(partitionName))

private def getPartitionValueFromSinkData(sinkData: SinkData, partitionName: PartitionNamePath): String =
getFieldStringValue(sinkData, Option(partitionName)).getOrElse("[missing]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ package io.lenses.streamreactor.connect.cloud.common.sink.naming
import cats.implicits.none
import io.lenses.streamreactor.connect.cloud.common.config.FormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.JsonFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StringSinkData
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.sink.SinkError
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionDisplay.Values
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionSelection.defaultPartitionSelection
import io.lenses.streamreactor.connect.cloud.common.sink.config._
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.LeftPadPaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionPartitionField
import io.lenses.streamreactor.connect.cloud.common.sink.config.PartitionSelection
import io.lenses.streamreactor.connect.cloud.common.sink.config.TopicPartitionField
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData
import org.mockito.ArgumentMatchers.anyString
import org.mockito.MockitoSugar
Expand Down Expand Up @@ -69,6 +70,26 @@ class CloudKeyNamerTest extends AnyFunSuite with Matchers with OptionValues with

private val s3KeyNamer = CloudKeyNamer(formatSelection, partitionSelection, fileNamer, paddingService)

test("the partition values do not replace / or \\ characters") {
val partitionSelection =
PartitionSelection(isCustom = false, List(HeaderPartitionField(PartitionNamePath("h"))), Values)
val keyNamer = CloudKeyNamer(formatSelection, partitionSelection, fileNamer, paddingService)
val either: Either[SinkError, Map[PartitionField, String]] = keyNamer.processPartitionValues(
MessageDetail(
NullSinkData(None),
NullSinkData(None),
Map("h" -> StringSinkData("val1/val2")),
None,
topicPartition.topic,
topicPartition.partition,
topicPartition.offset,
),
topicPartition.toTopicPartition,
)

either.value shouldBe Map(HeaderPartitionField(PartitionNamePath("h")) -> "val1/val2")
}

test("stagingFile should generate the correct staging file path with no prefix") {
val stagingDirectory = Files.createTempDirectory("myTempDir").toFile

Expand Down

0 comments on commit 75b2aed

Please sign in to comment.