Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove limiting condition #1017

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,6 @@ abstract class CoreSinkTaskTestCases[SM <: FileMetadata, SI <: StorageInterface[

}

unitUnderTest should "throw error if prefix contains a slash" in {

val task = createSinkTask()

val prefixWithSlashes = "my/prefix/that/is/a/path"
val props = (defaultProps
+ (
s"$prefix.kcql" -> s"insert into $BucketName:$prefixWithSlashes select * from $TopicName WITH_FLUSH_INTERVAL = 1",
)).asJava

val intercepted = intercept[IllegalArgumentException] {
task.start(props)
}

intercepted.getMessage should be("Nested prefix not currently supported")

}

unitUnderTest should "flush for every record when configured flush count size of 1" in {

val props =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ import scala.util.Try

object S3LocationValidator extends CloudLocationValidator {

def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation] =
def validate(location: CloudLocation): Validated[Throwable, CloudLocation] =
Validated.fromEither(
for {
_ <- validateBucketName(location.bucket).toEither
_ <- validatePrefix(allowSlash, location.prefix).toEither
} yield location,
)

Expand All @@ -39,12 +38,4 @@ object S3LocationValidator extends CloudLocationValidator {
},
)

private def validatePrefix(allowSlash: Boolean, prefix: Option[String]): Validated[Throwable, Unit] =
Validated.fromEither(
Either.cond(
allowSlash || (!allowSlash && !prefix.exists(_.contains("/"))),
(),
new IllegalArgumentException("Nested prefix not currently supported"),
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ object SourceBucketOptions {
config.getKCQL.map {
kcql: Kcql =>
for {
source <- CloudLocation.splitAndValidate(kcql.getSource, allowSlash = true)
source <- CloudLocation.splitAndValidate(kcql.getSource)
format <- FormatSelection.fromKcql(kcql, CloudSourcePropsSchema.schema)
sourceProps = CloudSourceProps.fromKcql(kcql)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,26 @@ import org.scalatest.matchers.should.Matchers
class CloudLocationTest extends AnyFlatSpec with Matchers with EitherValues {
implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator

"bucketAndPrefix" should "reject prefixes with slashes" in {
expectException(
CloudLocation.splitAndValidate("bucket:/slash", allowSlash = false),
"Nested prefix not currently supported",
)
"bucketAndPrefix" should "accept prefixes with slashes" in {

CloudLocation.splitAndValidate("bucket:/slash").value should be(CloudLocation("bucket", "/slash".some))

}

"bucketAndPrefix" should "split the bucket and prefix" in {
CloudLocation.splitAndValidate("bucket:prefix", allowSlash = false).value should be(CloudLocation("bucket",
"prefix".some,
))
CloudLocation.splitAndValidate("bucket:prefix").value should be(CloudLocation("bucket", "prefix".some))
}

"bucketAndPrefix" should "fail if given too many components to split" in {
expectException(
CloudLocation.splitAndValidate("bucket:path:whatIsThis", false),
CloudLocation.splitAndValidate("bucket:path:whatIsThis"),
"Invalid number of arguments provided to create BucketAndPrefix",
)
}

"bucketAndPrefix" should "fail if not a valid bucket name" in {
expectException(
CloudLocation.splitAndValidate("bucket-police-refu$e-this-name:path", allowSlash = true),
CloudLocation.splitAndValidate("bucket-police-refu$e-this-name:path"),
"Bucket name should not contain '$'",
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
object DatalakeLocationValidator extends CloudLocationValidator {
private val ContainerNamePattern = "^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$".r

def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation] =
def validate(location: CloudLocation): Validated[Throwable, CloudLocation] =
Validated.fromEither(
for {
_ <- validateBucketName(location.bucket).toEither
_ <- validatePrefix(allowSlash, location.prefix).toEither
} yield location,
)

Expand All @@ -37,12 +36,4 @@ object DatalakeLocationValidator extends CloudLocationValidator {
Validated.Invalid(new IllegalArgumentException("Nested prefix not currently supported"))
}

private def validatePrefix(allowSlash: Boolean, prefix: Option[String]): Validated[Throwable, Unit] =
Validated.fromEither(
Either.cond(
allowSlash || (!allowSlash && !prefix.exists(_.contains("/"))),
(),
new IllegalArgumentException("Nested prefix not currently supported"),
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,19 @@ class DatalakeLocationValidatorTest extends AnyFunSuite with Matchers {

test("DatalakeLocationValidator should validate a valid bucket name") {
val location = CloudLocation("valid-bucket-name", none, "valid-path".some)
val result = DatalakeLocationValidator.validate(location, allowSlash = false)
val result = DatalakeLocationValidator.validate(location)
result shouldBe Validated.Valid(location)
}

test("DatalakeLocationValidator should return an error for an invalid bucket name") {
val location = CloudLocation("invalid_bucket_name", none, "valid-path".some)
val result = DatalakeLocationValidator.validate(location, allowSlash = false)
val result = DatalakeLocationValidator.validate(location)
result shouldBe a[Validated.Invalid[_]]
}

test("DatalakeLocationValidator should return an error if prefix contains a slash when not allowed") {
test("DatalakeLocationValidator should allow prefix with a slash in") {
val location = CloudLocation("valid-bucket-name", "prefix/".some, "valid-path".some)
val result = DatalakeLocationValidator.validate(location, allowSlash = false)
result shouldBe a[Validated.Invalid[_]]
}

test("DatalakeLocationValidator should allow prefix with a slash when allowed") {
val location = CloudLocation("valid-bucket-name", "prefix/".some, "valid-path".some)
val result = DatalakeLocationValidator.validate(location, allowSlash = true)
val result = DatalakeLocationValidator.validate(location)
result shouldBe Validated.Valid(location)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ case class CloudLocation(

def prefixOrDefault(): String = prefix.getOrElse("")

private def validate(allowSlash: Boolean): Validated[Throwable, CloudLocation] =
cloudLocationValidator.validate(this, allowSlash)
private def validate(): Validated[Throwable, CloudLocation] =
cloudLocationValidator.validate(this)

override def toString: String = {
val prefixStr = prefix.map(p => s"$p/").getOrElse("")
Expand All @@ -69,16 +69,15 @@ case class CloudLocation(
case object CloudLocation {
def splitAndValidate(
bucketAndPrefix: String,
allowSlash: Boolean,
)(
implicit
validator: CloudLocationValidator,
): Either[Throwable, CloudLocation] =
bucketAndPrefix.split(":") match {
case Array(bucket) =>
CloudLocation(bucket, None).validate(allowSlash).toEither
CloudLocation(bucket, None).validate().toEither
case Array(bucket, path) =>
CloudLocation(bucket, Some(path)).validate(allowSlash).toEither
CloudLocation(bucket, Some(path)).validate().toEither
case _ => new IllegalArgumentException("Invalid number of arguments provided to create BucketAndPrefix").asLeft
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ package io.lenses.streamreactor.connect.cloud.common.model.location
import cats.data.Validated

trait CloudLocationValidator {
def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation]
def validate(location: CloudLocation): Validated[Throwable, CloudLocation]

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object CloudSinkBucketOptions extends LazyLogging {
}
keyNamer = CloudKeyNamer(formatSelection, partitionSelection, fileNamer, paddingService)
stagingArea <- config.getLocalStagingArea()
target <- CloudLocation.splitAndValidate(kcql.getTarget, allowSlash = false)
target <- CloudLocation.splitAndValidate(kcql.getTarget)
storageSettings <- DataStorageSettings.from(sinkProps)
_ <- validateEnvelopeAndFormat(formatSelection, storageSettings)
commitPolicy = config.commitPolicy(kcql)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters.MapHasAsScala

object SampleData extends Matchers {

implicit val cloudLocationValidator: CloudLocationValidator = (s3Location: CloudLocation, allowSlash: Boolean) =>
implicit val cloudLocationValidator: CloudLocationValidator = (s3Location: CloudLocation) =>
Validated.fromEither(Right(s3Location))

val topic: Topic = Topic("niceTopic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,6 @@ abstract class CoreSinkTaskTestCases[SM <: FileMetadata, SI <: StorageInterface[

}

unitUnderTest should "throw error if prefix contains a slash" in {

val task = createSinkTask()

val prefixWithSlashes = "my/prefix/that/is/a/path"
val props = (defaultProps
+ (
s"$prefix.kcql" -> s"insert into $BucketName:$prefixWithSlashes select * from $TopicName WITH_FLUSH_INTERVAL = 1",
)).asJava

val intercepted = intercept[IllegalArgumentException] {
task.start(props)
}

intercepted.getMessage should be("Nested prefix not currently supported")

}

unitUnderTest should "flush for every record when configured flush count size of 1" in {

val props =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
object GCPStorageLocationValidator extends CloudLocationValidator {
private val ContainerNamePattern = "^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$".r

def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation] =
def validate(location: CloudLocation): Validated[Throwable, CloudLocation] =
Validated.fromEither(
for {
_ <- validateBucketName(location.bucket).toEither
_ <- validatePrefix(allowSlash, location.prefix).toEither
} yield location,
)

Expand All @@ -37,12 +36,4 @@ object GCPStorageLocationValidator extends CloudLocationValidator {
Validated.Invalid(new IllegalArgumentException("Nested prefix not currently supported"))
}

private def validatePrefix(allowSlash: Boolean, prefix: Option[String]): Validated[Throwable, Unit] =
Validated.fromEither(
Either.cond(
allowSlash || (!allowSlash && !prefix.exists(_.contains("/"))),
(),
new IllegalArgumentException("Nested prefix not currently supported"),
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,35 @@ class GCPStorageLocationValidatorTest extends AnyFunSuite with Matchers with Val

test("validate should succeed for a valid CloudLocation") {
val validLocation = CloudLocation("valid-bucket", Some("valid-prefix"))
val result = GCPStorageLocationValidator.validate(validLocation, allowSlash = true)
val result = GCPStorageLocationValidator.validate(validLocation)
result.value should be(validLocation)
}

test("validate should fail for an invalid bucket name") {
val invalidLocation = CloudLocation("invalid@bucket", Some("valid-prefix"))
val result: Validated[Throwable, CloudLocation] =
GCPStorageLocationValidator.validate(invalidLocation, allowSlash = true)
GCPStorageLocationValidator.validate(invalidLocation)
result.leftValue.getMessage should be("Nested prefix not currently supported")
}

test("validate should fail for an invalid prefix with slashes not allowed") {
val invalidLocation = CloudLocation("valid-bucket", Some("invalid/prefix"))
test("validate should fail for a prefix with slashes") {
val invalidLocation = CloudLocation("valid-bucket", Some("slash/prefix"))
val result: Validated[Throwable, CloudLocation] =
GCPStorageLocationValidator.validate(invalidLocation, allowSlash = false)
result.leftValue.getMessage should be("Nested prefix not currently supported")
GCPStorageLocationValidator.validate(invalidLocation)
result.value should be(CloudLocation("valid-bucket", Some("slash/prefix")))
}

test("validate should succeed for a valid prefix with slashes not allowed") {
val validLocation = CloudLocation("valid-bucket", Some("valid-prefix"))
val result: Validated[Throwable, CloudLocation] =
GCPStorageLocationValidator.validate(validLocation, allowSlash = false)
GCPStorageLocationValidator.validate(validLocation)
result.value should be(validLocation)
}

test("validate should succeed for a valid prefix with slashes allowed") {
val validLocation = CloudLocation("valid-bucket", Some("valid-prefix"))
val result: Validated[Throwable, CloudLocation] =
GCPStorageLocationValidator.validate(validLocation, allowSlash = true)
GCPStorageLocationValidator.validate(validLocation)
result.value should be(validLocation)
}
}
Loading