Skip to content

Commit

Permalink
Remove limiting condition
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed Jan 3, 2024
1 parent 4b8428e commit ba2ec27
Show file tree
Hide file tree
Showing 13 changed files with 30 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,6 @@ abstract class CoreSinkTaskTestCases[SM <: FileMetadata, SI <: StorageInterface[

}

unitUnderTest should "throw error if prefix contains a slash" in {

val task = createSinkTask()

val prefixWithSlashes = "my/prefix/that/is/a/path"
val props = (defaultProps
+ (
s"$prefix.kcql" -> s"insert into $BucketName:$prefixWithSlashes select * from $TopicName WITH_FLUSH_INTERVAL = 1",
)).asJava

val intercepted = intercept[IllegalArgumentException] {
task.start(props)
}

intercepted.getMessage should be("Nested prefix not currently supported")

}

unitUnderTest should "flush for every record when configured flush count size of 1" in {

val props =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ import scala.util.Try

object S3LocationValidator extends CloudLocationValidator {

def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation] =
def validate(location: CloudLocation): Validated[Throwable, CloudLocation] =
Validated.fromEither(
for {
_ <- validateBucketName(location.bucket).toEither
_ <- validatePrefix(allowSlash, location.prefix).toEither
} yield location,
)

Expand All @@ -39,12 +38,4 @@ object S3LocationValidator extends CloudLocationValidator {
},
)

private def validatePrefix(allowSlash: Boolean, prefix: Option[String]): Validated[Throwable, Unit] =
Validated.fromEither(
Either.cond(
allowSlash || (!allowSlash && !prefix.exists(_.contains("/"))),
(),
new IllegalArgumentException("Nested prefix not currently supported"),
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ object SourceBucketOptions {
config.getKCQL.map {
kcql: Kcql =>
for {
source <- CloudLocation.splitAndValidate(kcql.getSource, allowSlash = true)
source <- CloudLocation.splitAndValidate(kcql.getSource)
format <- FormatSelection.fromKcql(kcql, CloudSourcePropsSchema.schema)
sourceProps = CloudSourceProps.fromKcql(kcql)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,26 @@ import org.scalatest.matchers.should.Matchers
class CloudLocationTest extends AnyFlatSpec with Matchers with EitherValues {
implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator

"bucketAndPrefix" should "reject prefixes with slashes" in {
expectException(
CloudLocation.splitAndValidate("bucket:/slash", allowSlash = false),
"Nested prefix not currently supported",
)
"bucketAndPrefix" should "accept prefixes with slashes" in {

CloudLocation.splitAndValidate("bucket:/slash").value should be(CloudLocation("bucket", "/slash".some))

}

"bucketAndPrefix" should "split the bucket and prefix" in {
CloudLocation.splitAndValidate("bucket:prefix", allowSlash = false).value should be(CloudLocation("bucket",
"prefix".some,
))
CloudLocation.splitAndValidate("bucket:prefix").value should be(CloudLocation("bucket", "prefix".some))
}

"bucketAndPrefix" should "fail if given too many components to split" in {
expectException(
CloudLocation.splitAndValidate("bucket:path:whatIsThis", false),
CloudLocation.splitAndValidate("bucket:path:whatIsThis"),
"Invalid number of arguments provided to create BucketAndPrefix",
)
}

"bucketAndPrefix" should "fail if not a valid bucket name" in {
expectException(
CloudLocation.splitAndValidate("bucket-police-refu$e-this-name:path", allowSlash = true),
CloudLocation.splitAndValidate("bucket-police-refu$e-this-name:path"),
"Bucket name should not contain '$'",
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
object DatalakeLocationValidator extends CloudLocationValidator {
private val ContainerNamePattern = "^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$".r

def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation] =
def validate(location: CloudLocation): Validated[Throwable, CloudLocation] =
Validated.fromEither(
for {
_ <- validateBucketName(location.bucket).toEither
_ <- validatePrefix(allowSlash, location.prefix).toEither
} yield location,
)

Expand All @@ -37,12 +36,4 @@ object DatalakeLocationValidator extends CloudLocationValidator {
Validated.Invalid(new IllegalArgumentException("Nested prefix not currently supported"))
}

private def validatePrefix(allowSlash: Boolean, prefix: Option[String]): Validated[Throwable, Unit] =
Validated.fromEither(
Either.cond(
allowSlash || (!allowSlash && !prefix.exists(_.contains("/"))),
(),
new IllegalArgumentException("Nested prefix not currently supported"),
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,19 @@ class DatalakeLocationValidatorTest extends AnyFunSuite with Matchers {

test("DatalakeLocationValidator should validate a valid bucket name") {
val location = CloudLocation("valid-bucket-name", none, "valid-path".some)
val result = DatalakeLocationValidator.validate(location, allowSlash = false)
val result = DatalakeLocationValidator.validate(location)
result shouldBe Validated.Valid(location)
}

test("DatalakeLocationValidator should return an error for an invalid bucket name") {
val location = CloudLocation("invalid_bucket_name", none, "valid-path".some)
val result = DatalakeLocationValidator.validate(location, allowSlash = false)
val result = DatalakeLocationValidator.validate(location)
result shouldBe a[Validated.Invalid[_]]
}

test("DatalakeLocationValidator should return an error if prefix contains a slash when not allowed") {
test("DatalakeLocationValidator should allow prefix with a slash in") {
val location = CloudLocation("valid-bucket-name", "prefix/".some, "valid-path".some)
val result = DatalakeLocationValidator.validate(location, allowSlash = false)
result shouldBe a[Validated.Invalid[_]]
}

test("DatalakeLocationValidator should allow prefix with a slash when allowed") {
val location = CloudLocation("valid-bucket-name", "prefix/".some, "valid-path".some)
val result = DatalakeLocationValidator.validate(location, allowSlash = true)
val result = DatalakeLocationValidator.validate(location)
result shouldBe Validated.Valid(location)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ case class CloudLocation(

def prefixOrDefault(): String = prefix.getOrElse("")

private def validate(allowSlash: Boolean): Validated[Throwable, CloudLocation] =
cloudLocationValidator.validate(this, allowSlash)
private def validate(): Validated[Throwable, CloudLocation] =
cloudLocationValidator.validate(this)

override def toString: String = {
val prefixStr = prefix.map(p => s"$p/").getOrElse("")
Expand All @@ -69,16 +69,15 @@ case class CloudLocation(
case object CloudLocation {
def splitAndValidate(
bucketAndPrefix: String,
allowSlash: Boolean,
)(
implicit
validator: CloudLocationValidator,
): Either[Throwable, CloudLocation] =
bucketAndPrefix.split(":") match {
case Array(bucket) =>
CloudLocation(bucket, None).validate(allowSlash).toEither
CloudLocation(bucket, None).validate().toEither
case Array(bucket, path) =>
CloudLocation(bucket, Some(path)).validate(allowSlash).toEither
CloudLocation(bucket, Some(path)).validate().toEither
case _ => new IllegalArgumentException("Invalid number of arguments provided to create BucketAndPrefix").asLeft
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ package io.lenses.streamreactor.connect.cloud.common.model.location
import cats.data.Validated

trait CloudLocationValidator {
def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation]
def validate(location: CloudLocation): Validated[Throwable, CloudLocation]

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object CloudSinkBucketOptions extends LazyLogging {
}
keyNamer = CloudKeyNamer(formatSelection, partitionSelection, fileNamer, paddingService)
stagingArea <- config.getLocalStagingArea()
target <- CloudLocation.splitAndValidate(kcql.getTarget, allowSlash = false)
target <- CloudLocation.splitAndValidate(kcql.getTarget)
storageSettings <- DataStorageSettings.from(sinkProps)
_ <- validateEnvelopeAndFormat(formatSelection, storageSettings)
commitPolicy = config.commitPolicy(kcql)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters.MapHasAsScala

object SampleData extends Matchers {

implicit val cloudLocationValidator: CloudLocationValidator = (s3Location: CloudLocation, allowSlash: Boolean) =>
implicit val cloudLocationValidator: CloudLocationValidator = (s3Location: CloudLocation) =>
Validated.fromEither(Right(s3Location))

val topic: Topic = Topic("niceTopic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,6 @@ abstract class CoreSinkTaskTestCases[SM <: FileMetadata, SI <: StorageInterface[

}

unitUnderTest should "throw error if prefix contains a slash" in {

val task = createSinkTask()

val prefixWithSlashes = "my/prefix/that/is/a/path"
val props = (defaultProps
+ (
s"$prefix.kcql" -> s"insert into $BucketName:$prefixWithSlashes select * from $TopicName WITH_FLUSH_INTERVAL = 1",
)).asJava

val intercepted = intercept[IllegalArgumentException] {
task.start(props)
}

intercepted.getMessage should be("Nested prefix not currently supported")

}

unitUnderTest should "flush for every record when configured flush count size of 1" in {

val props =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
object GCPStorageLocationValidator extends CloudLocationValidator {
private val ContainerNamePattern = "^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$".r

def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation] =
def validate(location: CloudLocation): Validated[Throwable, CloudLocation] =
Validated.fromEither(
for {
_ <- validateBucketName(location.bucket).toEither
_ <- validatePrefix(allowSlash, location.prefix).toEither
} yield location,
)

Expand All @@ -37,12 +36,4 @@ object GCPStorageLocationValidator extends CloudLocationValidator {
Validated.Invalid(new IllegalArgumentException("Nested prefix not currently supported"))
}

private def validatePrefix(allowSlash: Boolean, prefix: Option[String]): Validated[Throwable, Unit] =
Validated.fromEither(
Either.cond(
allowSlash || (!allowSlash && !prefix.exists(_.contains("/"))),
(),
new IllegalArgumentException("Nested prefix not currently supported"),
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,35 @@ class GCPStorageLocationValidatorTest extends AnyFunSuite with Matchers with Val

test("validate should succeed for a valid CloudLocation") {
val validLocation = CloudLocation("valid-bucket", Some("valid-prefix"))
val result = GCPStorageLocationValidator.validate(validLocation, allowSlash = true)
val result = GCPStorageLocationValidator.validate(validLocation)
result.value should be(validLocation)
}

test("validate should fail for an invalid bucket name") {
val invalidLocation = CloudLocation("invalid@bucket", Some("valid-prefix"))
val result: Validated[Throwable, CloudLocation] =
GCPStorageLocationValidator.validate(invalidLocation, allowSlash = true)
GCPStorageLocationValidator.validate(invalidLocation)
result.leftValue.getMessage should be("Nested prefix not currently supported")
}

test("validate should fail for an invalid prefix with slashes not allowed") {
val invalidLocation = CloudLocation("valid-bucket", Some("invalid/prefix"))
test("validate should fail for a prefix with slashes") {
val invalidLocation = CloudLocation("valid-bucket", Some("slash/prefix"))
val result: Validated[Throwable, CloudLocation] =
GCPStorageLocationValidator.validate(invalidLocation, allowSlash = false)
result.leftValue.getMessage should be("Nested prefix not currently supported")
GCPStorageLocationValidator.validate(invalidLocation)
result.value should be(CloudLocation("valid-bucket", Some("slash/prefix")))
}

test("validate should succeed for a valid prefix with slashes not allowed") {
val validLocation = CloudLocation("valid-bucket", Some("valid-prefix"))
val result: Validated[Throwable, CloudLocation] =
GCPStorageLocationValidator.validate(validLocation, allowSlash = false)
GCPStorageLocationValidator.validate(validLocation)
result.value should be(validLocation)
}

test("validate should succeed for a valid prefix with slashes allowed") {
val validLocation = CloudLocation("valid-bucket", Some("valid-prefix"))
val result: Validated[Throwable, CloudLocation] =
GCPStorageLocationValidator.validate(validLocation, allowSlash = true)
GCPStorageLocationValidator.validate(validLocation)
result.value should be(validLocation)
}
}

0 comments on commit ba2ec27

Please sign in to comment.