Skip to content

Commit

Permalink
Simplification
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed Jan 16, 2024
1 parent 10094b1 commit 350b347
Show file tree
Hide file tree
Showing 19 changed files with 193 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig.fromJson
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef.configProp
import io.lenses.streamreactor.connect.http.sink.tpl.templates.ProcessedTemplate
import io.lenses.streamreactor.connect.http.sink.tpl.templates.RawTemplate
import io.lenses.streamreactor.connect.http.sink.tpl.RawTemplate
import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionError
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask
import org.http4s.client.Client
Expand Down Expand Up @@ -91,11 +92,17 @@ class HttpSinkTask extends SinkTask {
(maybeConfig, maybeTemplate) match {
case (Some(config), Some(template)) =>
val sender = new HttpRequestSender(config.authentication, config.method)
val processed: ProcessedTemplate = template.process(sinkRecords)
sender.sendHttpRequest(
client,
processed,
)
template.process(sinkRecords) match {
case Left(subsError: SubstitutionError) =>
throw new ConnectException(s"Error during record substitution: ${subsError.msg}",
subsError.throwable.orNull,
)
case Right(processedRecords) => sender.sendHttpRequest(
client,
processedRecords,
)
}

()

case _ => throw new IllegalArgumentException("Config or template not set")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.lenses.streamreactor.connect.http.sink.client

import cats.effect.IO
import io.lenses.streamreactor.connect.http.sink.tpl.templates.ProcessedTemplate
import io.lenses.streamreactor.connect.http.sink.tpl.ProcessedTemplate
import org.http4s._
import org.http4s.client.Client
import org.http4s.headers.Authorization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.tpl.templates
package io.lenses.streamreactor.connect.http.sink.tpl

import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionError

object ProcessedTemplate {
def apply(
endpointOrError: Either[SubstitutionError, String],
contentOrError: Either[SubstitutionError, String],
headersOrError: Either[SubstitutionError, Seq[(String, String)]],
): Either[SubstitutionError, ProcessedTemplate] =
for {
ep <- endpointOrError
ct <- contentOrError
hd <- headersOrError
} yield {
ProcessedTemplate(ep, ct, hd)
}
}
case class ProcessedTemplate(
endpoint: String,
content: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.tpl.templates
package io.lenses.streamreactor.connect.http.sink.tpl

import io.lenses.streamreactor.connect.http.sink.tpl.templates.RawTemplate.innerTemplatePattern
import io.lenses.streamreactor.connect.http.sink.tpl.templates.RawTemplate.templatePattern
import cats.implicits._
import io.lenses.streamreactor.connect.http.sink.tpl.RawTemplate.innerTemplatePattern
import io.lenses.streamreactor.connect.http.sink.tpl.RawTemplate.templatePattern
import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionError
import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionType
import org.apache.kafka.connect.sink.SinkRecord

import scala.util.matching.Regex
Expand All @@ -40,70 +43,72 @@ case class RawTemplate(
) {

// Method to render a single data entry with a template
private def render(data: SinkRecord, tplText: String): String =
templatePattern.replaceAllIn(tplText,
matchTag => {
val tag = matchTag.group(1).trim
getValue(tag, data)
},
private def render(data: SinkRecord, tplText: String): Either[SubstitutionError, String] =
Either.catchOnly[SubstitutionError](
templatePattern
.replaceAllIn(
tplText,
matchTag => {
val tag = matchTag.group(1).trim
getValue(tag, data)
.leftMap(throw _)
.merge
},
),
)

private def renderInnerTpl(data: Seq[SinkRecord], innerTpl: String): String =
data
.map(rec =>
templatePattern.replaceAllIn(innerTpl,
matchTag => {
val tag = matchTag.group(1).trim
getValue(tag, rec)
},
),
).mkString("")
private def renderInnerTpl(data: Seq[SinkRecord], innerTpl: String): Either[SubstitutionError, String] =
data.map(rec => render(rec, innerTpl)).sequence.map(_.mkString)

// Method to render a single data entry with a template
private def renderMulti(data: Seq[SinkRecord], tplText: String): String = {
val tplTxt = innerTemplate.map(renderInnerTpl(data, _)) match {
case Some(renderedInnerTpl) =>
innerTemplatePattern.replaceFirstIn(tplText, renderedInnerTpl)
case None => tplText
private def renderMulti(data: Seq[SinkRecord], tplText: String): Either[SubstitutionError, String] = {

val innerTpl: Either[SubstitutionError, String] = innerTemplate match {
case Some(tpl) => renderInnerTpl(data, tpl).map {
innerTemplatePattern.replaceFirstIn(tplText, _)
}
case None => tplText.asRight
}

templatePattern.replaceAllIn(tplTxt,
matchTag => {
val tag = matchTag.group(1).trim
getValue(tag, data.head)
},
)
innerTpl match {
case Left(value) => value.asLeft[String]
case Right(value) => render(data.head, value)
}

}

def process(
sinkRecords: Seq[SinkRecord],
): ProcessedTemplate = {
require(sinkRecords.nonEmpty) // TODO err handling
): Either[SubstitutionError, ProcessedTemplate] =
ProcessedTemplate(
render(sinkRecords.head, endpoint),
renderMulti(sinkRecords.toList, content),
sinkRecords.flatMap {
sinkRecord =>
headers.map {
case (keyTpl, valTpl) =>
render(sinkRecord, keyTpl) -> render(sinkRecord, valTpl)
}.distinct
},
renderSinkRecordHeaders(sinkRecords),
)
}

private def renderSinkRecordHeaders(sinkRecords: Seq[SinkRecord]): Either[SubstitutionError, Seq[(String, String)]] =
sinkRecords.flatMap {
sinkRecord =>
headers.map {
case (keyTpl, valTpl) =>
for {
key <- render(sinkRecord, keyTpl)
value <- render(sinkRecord, valTpl)
} yield key -> value
}.distinct
}.sequence

// Helper method to get the value for a given tag from data
private def getValue(tag: String, data: SinkRecord): String = {
private def getValue(tag: String, data: SinkRecord): Either[SubstitutionError, String] = {
val locs = tag.split("\\.", 2)
(locs(0).toLowerCase, locs.lift(1)) match {
case ("#message", _) => ""
case ("/message", _) => ""
case ("#message", _) => "".asRight
case ("/message", _) => "".asRight
case (key: String, locator: Option[String]) =>
val sType = SubstitutionType.withNameInsensitiveOption(key).getOrElse(
throw new IllegalArgumentException(s"Couldn't find $key SubstitutionType"),
)
sType.toBinding(locator = locator).get(data).toString
sType.get(locator, data).map(_.toString)
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.tpl.binding
package io.lenses.streamreactor.connect.http.sink.tpl.substitutions

import cats.implicits.catsSyntaxEitherId
import org.apache.kafka.connect.sink.SinkRecord

class KafkaConnectOffsetBinding() extends KafkaConnectBinding() {
def get(sinkRecord: SinkRecord): AnyRef =
Long.box(sinkRecord.kafkaOffset())
case object Header extends SubstitutionType {
def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
locator match {
case Some(loc) => sinkRecord.headers().lastWithName(loc).value().asRight
case None => SubstitutionError("Invalid locator for path").asLeft
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.tpl.binding
package io.lenses.streamreactor.connect.http.sink.tpl.substitutions

import cats.implicits.toBifunctorOps
import io.lenses.streamreactor.connect.cloud.common.sink.extractors.KafkaConnectExtractor
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord

class KafkaConnectKeyBinding(name: Option[String]) extends KafkaConnectBinding() {
def get(sinkRecord: SinkRecord): AnyRef = KafkaConnectExtractor.extractFromKey(sinkRecord, name).leftMap(e =>
throw new ConnectException(s"unable to extract field $name for template, ", e),
).merge
case object Key extends SubstitutionType {

def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
KafkaConnectExtractor.extractFromKey(sinkRecord, locator).leftMap(e =>
SubstitutionError(s"unable to extract field $locator for template, ", e),
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.tpl.binding
package io.lenses.streamreactor.connect.http.sink.tpl.substitutions

import cats.implicits.catsSyntaxEitherId
import org.apache.kafka.connect.sink.SinkRecord

class KafkaConnectHeaderBinding(name: String) extends KafkaConnectBinding() {
def get(sinkRecord: SinkRecord): AnyRef =
sinkRecord.headers().lastWithName(name).value()
case object Offset extends SubstitutionType {
override def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
Long.box(sinkRecord.kafkaOffset()).asRight
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.tpl.binding
package io.lenses.streamreactor.connect.http.sink.tpl.substitutions

import cats.implicits.catsSyntaxEitherId
import org.apache.kafka.connect.sink.SinkRecord

class KafkaConnectTopicBinding() extends KafkaConnectBinding() {
def get(sinkRecord: SinkRecord): AnyRef =
sinkRecord.topic()
case object Partition extends SubstitutionType {
override def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
sinkRecord.kafkaPartition().asRight

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.tpl.substitutions

import cats.implicits.catsSyntaxOptionId

object SubstitutionError {
def apply(msg: String): SubstitutionError = SubstitutionError(msg, Option.empty)
def apply(msg: String, throwable: Throwable): SubstitutionError = SubstitutionError(msg, throwable.some)
}
case class SubstitutionError(msg: String, throwable: Option[Throwable]) extends Throwable
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.tpl.substitutions

import enumeratum.CirceEnum
import enumeratum.Enum
import enumeratum.EnumEntry
import org.apache.kafka.connect.sink.SinkRecord

trait SubstitutionType extends EnumEntry {
def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef]
}

case object SubstitutionType extends Enum[SubstitutionType] with CirceEnum[SubstitutionType] {
override def values: IndexedSeq[SubstitutionType] =
IndexedSeq(Header, Key, Offset, Partition, Timestamp, Topic, Value)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.tpl.binding
package io.lenses.streamreactor.connect.http.sink.tpl.substitutions

import cats.implicits.catsSyntaxEitherId
import org.apache.kafka.connect.sink.SinkRecord

class KafkaConnectPartitionBinding() extends KafkaConnectBinding() {
def get(sinkRecord: SinkRecord): AnyRef =
sinkRecord.kafkaPartition()
object Timestamp extends SubstitutionType {
override def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
Long.box(sinkRecord.timestamp()).asRight

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.tpl.binding
package io.lenses.streamreactor.connect.http.sink.tpl.substitutions

import cats.implicits.catsSyntaxEitherId
import org.apache.kafka.connect.sink.SinkRecord

trait KafkaConnectBinding {
def get(sinkRecord: SinkRecord): AnyRef

case object Topic extends SubstitutionType {
override def get(locator: Option[String], sinkRecord: SinkRecord): Either[SubstitutionError, AnyRef] =
sinkRecord.topic().asRight
}
Loading

0 comments on commit 350b347

Please sign in to comment.