From d30c75a2af8dd4b4787cf32cbea096cb6f13f4bd Mon Sep 17 00:00:00 2001 From: Stefan Bocutiu Date: Wed, 24 Apr 2024 10:10:03 +0100 Subject: [PATCH] LC-195: Introduce an 8.0 Compatibility Setting to Streamline Future Migrations (#1172) ome customers are onboarding on 7.0 which will bring changes to the KCQL statement. For 8.0 there will be a new property to address faster seeks on the source. By incorporating this property now, users can seamlessly integrate it into their workflows, preempting the need for a subsequent migration to the upcoming version 8.0. Co-authored-by: stheppi --- .../config/kcqlprops/KeyNamerVersion.scala | 43 +++++++++++++++++++ .../config/kcqlprops/PropsKeyEnum.scala | 2 + .../config/kcqlprops/SinkPropsSchema.scala | 1 + 3 files changed, 46 insertions(+) create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/KeyNamerVersion.scala diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/KeyNamerVersion.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/KeyNamerVersion.scala new file mode 100644 index 000000000..87714c6b2 --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/KeyNamerVersion.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.config.kcqlprops + +import enumeratum.Enum +import enumeratum.EnumEntry +import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.KeyNameFormatVersion +import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties + +sealed trait KeyNamerVersion extends EnumEntry + +object KeyNamerVersion extends Enum[KeyNamerVersion] { + + case object V0 extends KeyNamerVersion + + case object V1 extends KeyNamerVersion + + def apply( + props: KcqlProperties[PropsKeyEntry, PropsKeyEnum.type], + default: KeyNamerVersion, + ): KeyNamerVersion = fromProps(props).getOrElse(default) + + private def fromProps(props: KcqlProperties[PropsKeyEntry, PropsKeyEnum.type]): Option[KeyNamerVersion] = + props.getOptionalInt(KeyNameFormatVersion).collect { + case 0 => V0 + case 1 => V1 + } + + override def values: IndexedSeq[KeyNamerVersion] = findValues +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala index 7997dc1b4..e14a27506 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/kcqlprops/PropsKeyEnum.scala @@ -58,4 +58,6 @@ object PropsKeyEnum extends Enum[PropsKeyEntry] { case object FlushCount extends PropsKeyEntry("flush.count") case object FlushInterval extends PropsKeyEntry("flush.interval") + + case object KeyNameFormatVersion extends PropsKeyEntry("key.name.format.version") } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/kcqlprops/SinkPropsSchema.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/kcqlprops/SinkPropsSchema.scala index 91a93b1e8..5186415c8 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/kcqlprops/SinkPropsSchema.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/kcqlprops/SinkPropsSchema.scala @@ -39,6 +39,7 @@ object SinkPropsSchema { FlushCount -> LongPropsSchema, FlushSize -> LongPropsSchema, FlushInterval -> IntPropsSchema, + KeyNameFormatVersion -> IntPropsSchema, ) val schema: KcqlPropsSchema[PropsKeyEntry, PropsKeyEnum.type] =