From e95d34262b974c4e57c877187cac7fe7aa5a0730 Mon Sep 17 00:00:00 2001
From: Aaruna Godthi
Date: Wed, 13 Apr 2016 20:30:10 +0530
Subject: [PATCH] Fixed #16: Added option to configure mapping of fields in
 SinkRecord to CQL columns

---
 README.md | 21 ++++-
 build.sbt | 18 ++--
 src/it/resources/setup.cql | 7 ++
 .../cassandra/CassandraSinkTaskSpec.scala | 84 ++++++++++++++++-
 .../connect/cassandra/CassandraSinkTask.scala | 4 +-
 .../connect/cassandra/ConnectorSyntax.scala | 9 +-
 .../kafka/connect/cassandra/TaskConfig.scala | 25 ++++--
 .../kafka/connect/cassandra/package.scala | 89 ++++++++++++++++---
 .../connect/cassandra/AbstractSpec.scala | 2 +-
 .../kafka/connect/cassandra/SchemaSpec.scala | 4 +-
 .../connect/cassandra/TaskConfigSpec.scala | 3 +-
 11 files changed, 226 insertions(+), 40 deletions(-)

diff --git a/README.md b/README.md
index f85ae72..93dc5ff 100644
--- a/README.md
+++ b/README.md
@@ -84,7 +84,8 @@ All the others (BLOB, INET, UUID, TIMEUUID, LIST, SET, MAP, CUSTOM, UDT, TUPLE,
 
 ## CassandraSink
 It stores Kafka SinkRecord in Cassandra tables. Currently, we only support STRUCT type in the SinkRecord.
-The STRUCT can have multiple fields with primitive fieldtypes. We assume one-to-one mapping between the column names in the Cassandra sink table and the field names.
+The STRUCT can have multiple fields with primitive field types.
+By default, we assume a one-to-one mapping between the column names in the Cassandra sink table and the field names.
 
 Say, the SinkRecords has the following STRUCT value
 ```
@@ -97,6 +98,23 @@ Say, the SinkRecords has the following STRUCT value
 Then the Cassandra table should have the columns - id, username, text
 
+We also support mapping field names to different column names, using the property `cassandra.sink.field.mapping`.
+Say, the SinkRecord has the following STRUCT value
+```
+{
+  'id': 1,
+  'user': {
+    'id': 123,
+    'name': 'Foo',
+    'email': 'foo@bar.com'
+  },
+  'text': 'This is my first tweet'
+}
+```
+and the property `cassandra.sink.field.mapping` has the value `{"id": "id", "user": {"id": "uid", "name": "username"}, "text": "tweet_text"}`.
+Then the Cassandra table should have the columns - id, uid, username, tweet_text.
+Note that since we did not specify any mapping for 'user.email', it is ignored and not inserted into the Cassandra sink table.
 
 Note: The library does not create the Cassandra tables - users are expected to create those before starting the sink
 
 ## Configuration
@@ -132,6 +150,7 @@ Refer `examples/config` for sample configuration files
 |-------- |----------------------------|-----------------------|
 | cassandra.sink.route.<topic> | The table to write the SinkRecords to, <keyspace>.<table> | |
 | cassandra.sink.consistency | The consistency level for writes to Cassandra. | LOCAL_QUORUM |
+| cassandra.sink.field.mapping | A JSON string mapping field names to column names. | |
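+
+For example, a sink connector properties file (see `examples/config`) could carry the mapping as a single JSON string; the topic, keyspace, and table names below are illustrative only:
+
+```
+cassandra.sink.route.tweets_topic=tweets_ks.tweets
+cassandra.sink.field.mapping={"id": "id", "user": {"id": "uid", "name": "username"}, "text": "tweet_text"}
+```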
 
 ## Building from Source
diff --git a/build.sbt b/build.sbt
index fe5e8ba..85844f9 100644
--- a/build.sbt
+++ b/build.sbt
@@ -33,14 +33,16 @@ libraryDependencies ++= Seq(
   "org.joda" % "joda-convert" % "1.8.1",
   "org.scalatest" %% "scalatest" % "2.2.6" % "test,it",
   "org.mockito" % "mockito-core" % "2.0.34-beta" % "test,it",
-  "ch.qos.logback" % "logback-classic" % "1.1.3" % "test,it",
-  CrossVersion.partialVersion(scalaVersion.value) match {
-    case Some((2, minor)) if minor < 11 =>
-      "org.slf4j" % "slf4j-api" % "1.7.13"
-    case _ =>
-      "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0"
-  }
-)
+  "ch.qos.logback" % "logback-classic" % "1.1.3" % "test,it"
+) ++ (CrossVersion.partialVersion(scalaVersion.value) match {
+  case Some((2, minor)) if minor < 11 => Seq(
+    "org.slf4j" % "slf4j-api" % "1.7.13"
+  )
+  case _ => Seq(
+    "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
+    "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4"
+  )
+})
 
 publishMavenStyle := true
diff --git a/src/it/resources/setup.cql b/src/it/resources/setup.cql
index 7a0dec1..49d345a 100644
--- a/src/it/resources/setup.cql
+++ b/src/it/resources/setup.cql
@@ -26,6 +26,13 @@ CREATE TABLE IF NOT EXISTS test.kv (
   value int,
   PRIMARY KEY (key));
 
+CREATE TABLE IF NOT EXISTS test.fieldmap (
+  new_key text,
+  new_value int,
+  new_nested text,
+  new_dnested text,
+  PRIMARY KEY (new_key));
+
 CREATE TABLE test.playlists (
   id bigint,
   song_order int,
diff --git a/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTaskSpec.scala b/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTaskSpec.scala
index 0210cd7..b073fe8 100644
--- a/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTaskSpec.scala
+++ b/src/it/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTaskSpec.scala
@@ -19,16 +19,17 @@
 package com.tuplejump.kafka.connect.cassandra
 
 import scala.collection.JavaConverters._
+import scala.util.parsing.json.JSONObject
 import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
 import org.apache.kafka.connect.sink.{SinkRecord, SinkTaskContext}
 
 class CassandraSinkTaskSpec extends AbstractFlatSpec {
 
-  val topicName = "test_kv_topic"
-  val tableName = "test.kv"
-  val config = sinkProperties(Map(topicName -> tableName))
-
   it should "start sink task" in {
+    val topicName = "test_kv_topic"
+    val tableName = "test.kv"
+    val config = sinkProperties(Map(topicName -> tableName))
+
     val sinkTask = new CassandraSinkTask()
     val mockContext = mock[SinkTaskContext]
 
@@ -38,6 +39,10 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
   }
 
   it should "save records in cassandra" in {
+    val topicName = "test_kv_topic"
+    val tableName = "test.kv"
+    val config = sinkProperties(Map(topicName -> tableName))
+
     val sinkTask = new CassandraSinkTask()
     val mockContext = mock[SinkTaskContext]
 
@@ -64,5 +69,76 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
     rowCount should be(2)
     cc.shutdown()
   }
+
+
+  it should "save records in cassandra with custom field mapping" in {
+    val topicName = "test_fieldmap_topic"
+    val tableName = "test.fieldmap"
+    val config = sinkProperties(Map(topicName -> tableName))
+
+    val sinkTask = new CassandraSinkTask()
+    val mockContext = mock[SinkTaskContext]
+
+    val fieldMapping: JSONObject = JSONObject(Map(
+      "key" -> "new_key",
+      "value" -> "new_value",
+      "nvalue" -> JSONObject(Map(
+        "blah1" -> "new_nested",
+        "blah2" -> JSONObject(Map(
+          "blah2" -> "new_dnested"
+        ))
+      ))
+    ))
+
+    sinkTask.initialize(mockContext)
+    sinkTask.start((config +
("cassandra.sink.field.mapping" -> fieldMapping.toString())).asJava) + + val doubleNestedSchema = SchemaBuilder.struct.name("dnestedSchema").version(1) + .field("blah1", Schema.STRING_SCHEMA) + .field("blah2", Schema.STRING_SCHEMA).build + val nestedSchema = SchemaBuilder.struct.name("nestedSchema").version(1) + .field("blah1", Schema.STRING_SCHEMA) + .field("blah2", doubleNestedSchema).build + val valueSchema = SchemaBuilder.struct.name("record").version(1) + .field("key", Schema.STRING_SCHEMA) + .field("value", Schema.INT32_SCHEMA) + .field("nvalue", nestedSchema).build + + val dnestedValue1 = new Struct(doubleNestedSchema) + .put("blah1", "dnes_blah1_1") + .put("blah2", "dnes_blah2_1") + val nestedValue1 = new Struct(nestedSchema) + .put("blah1", "nes_blah1_1") + .put("blah2", dnestedValue1) + val value1 = new Struct(valueSchema) + .put("key", "pqr") + .put("value", 15) + .put("nvalue", nestedValue1) + + val dnestedValue2 = new Struct(doubleNestedSchema) + .put("blah1", "dnes_blah1_2") + .put("blah2", "dnes_blah2_2") + val nestedValue2 = new Struct(nestedSchema) + .put("blah1", "nes_blah1_2") + .put("blah2", dnestedValue2) + val value2 = new Struct(valueSchema) + .put("key", "abc") + .put("value", 17) + .put("nvalue", nestedValue2) + + val record1 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value1, 0) + val record2 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value2, 0) + + sinkTask.put(List(record1, record2).asJavaCollection) + + sinkTask.stop() + + val cc = CassandraCluster.local + val session = cc.session + val result = session.execute(s"select count(1) from $tableName").one() + val rowCount = result.getLong(0) + rowCount should be(2) + cc.shutdown() + } } diff --git a/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTask.scala b/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTask.scala index 9f5544a..2a7cdb5 100644 --- a/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTask.scala +++ b/src/main/scala/com/tuplejump/kafka/connect/cassandra/CassandraSinkTask.scala @@ -52,7 +52,7 @@ class CassandraSinkTask extends SinkTask with CassandraTask { private def write(sc: SinkConfig, byTopic: Iterable[SinkRecord]): Unit = { // TODO needs ticket: if (byTopic.size > 1) boundWrite(sc, byTopic) else for (record <- byTopic) { - val query = record.as(sc.schema.namespace) + val query = record.as(sc.schema.namespace, sc.options.fieldMapping) Try(session.executeAsync(query.cql)) recover { case NonFatal(e) => throw new ConnectException( s"Error executing ${byTopic.size} records for schema '${sc.schema}'.", e) @@ -64,7 +64,7 @@ class CassandraSinkTask extends SinkTask with CassandraTask { private def boundWrite(sc: SinkConfig, byTopic: Iterable[SinkRecord]): Unit = { val statement = prepare(session, sc) val futures = for (record <- byTopic) yield { - val query = record.as(sc.schema.namespace) + val query = record.as(sc.schema.namespace, sc.options.fieldMapping) try { val bs = statement.bind(query.cql) session.executeAsync(bs) diff --git a/src/main/scala/com/tuplejump/kafka/connect/cassandra/ConnectorSyntax.scala b/src/main/scala/com/tuplejump/kafka/connect/cassandra/ConnectorSyntax.scala index 2425fa8..080c57f 100644 --- a/src/main/scala/com/tuplejump/kafka/connect/cassandra/ConnectorSyntax.scala +++ b/src/main/scala/com/tuplejump/kafka/connect/cassandra/ConnectorSyntax.scala @@ -107,9 +107,12 @@ private[cassandra] object Syntax { namespace.length >= 3 || namespace.contains(".") } - 
-      val columns = columnNames.mkString(",")
-      SinkQuery(s"INSERT INTO $namespace($columns) VALUES($columnValues)")
+    def apply(namespace: String, columnNamesVsValues: Map[ColumnName, String]): SinkQuery = {
+      // Unzip keeps each column name aligned with its value by position.
+      val (columnNames, columnValues) = columnNamesVsValues.unzip
+      val columns = columnNames.mkString(",")
+      val values = columnValues.mkString(",")
+      SinkQuery(s"INSERT INTO $namespace($columns) VALUES($values)")
     }
   }
diff --git a/src/main/scala/com/tuplejump/kafka/connect/cassandra/TaskConfig.scala b/src/main/scala/com/tuplejump/kafka/connect/cassandra/TaskConfig.scala
index d19e536..6d9db4d 100644
--- a/src/main/scala/com/tuplejump/kafka/connect/cassandra/TaskConfig.scala
+++ b/src/main/scala/com/tuplejump/kafka/connect/cassandra/TaskConfig.scala
@@ -18,8 +18,10 @@
 
 import scala.collection.immutable
 import scala.util.control.NonFatal
+import scala.util.parsing.json.JSON
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.connect.connector.Task
+import org.apache.kafka.connect.errors.DataException
 import com.datastax.driver.core.{TableMetadata, ConsistencyLevel}
 import InternalConfig._
 
@@ -123,6 +125,9 @@ object TaskConfig {
   final val SinkConsistency: Key = "cassandra.sink.consistency"
   final val DefaultSinkConsistency = ConsistencyLevel.LOCAL_QUORUM
 
+  final val FieldMapping: Key = "cassandra.sink.field.mapping"
+  final val DefaultFieldMapping = Map.empty[String, String]
+
   /* **** Task config **** */
 
   final val TaskParallelismStrategy: Key = "cassandra.task.parallelism"
@@ -156,6 +161,10 @@ private[cassandra] object InternalConfig {
   def toInt(a: String): Int = a.toInt
   def toLong(a: String): Long = a.toLong
   def toConsistency(a: String): ConsistencyLevel = ConsistencyLevel.valueOf(a)
+  def toMap(a: String): Map[String, Any] = JSON.parseFull(a) collect {
+    case data: Map[_, _] => data.asInstanceOf[Map[String, Any]]
+  } getOrElse(throw new DataException(s"Unable to parse field mapping '$a' as a JSON object."))
+
 
   /** A Cassandra `keyspace.table` to Kafka topic mapping.
    *
@@ -319,15 +328,21 @@ private[cassandra] object InternalConfig {
   sealed trait ClusterQueryOptions
 
   /** Settings related for individual queries, can be set per keyspace.table. */
-  final case class WriteOptions(consistency: ConsistencyLevel) extends ClusterQueryOptions
+  final case class WriteOptions(consistency: ConsistencyLevel,
+                                fieldMapping: Map[String, Any]) extends ClusterQueryOptions
 
   object WriteOptions {
-    def apply(config: Map[String,String]): WriteOptions =
-      WriteOptions(config.valueOr[ConsistencyLevel](
-        SinkConsistency, toConsistency, DefaultSourceConsistency))
+    def apply(config: Map[String, String]): WriteOptions = {
+      WriteOptions(
+        consistency = config.valueOr[ConsistencyLevel](
+          SinkConsistency, toConsistency, DefaultSourceConsistency),
+        fieldMapping = config.valueOr[Map[String, Any]](
+          FieldMapping, toMap, DefaultFieldMapping
+        )
+      )
+    }
   }
 
-  /** Settings related for individual queries, can be set per keyspace.table. */
   final case class ReadOptions(splitSize: Int,
                                fetchSize: Int,
diff --git a/src/main/scala/com/tuplejump/kafka/connect/cassandra/package.scala b/src/main/scala/com/tuplejump/kafka/connect/cassandra/package.scala
index 30c684e..951eca3 100644
--- a/src/main/scala/com/tuplejump/kafka/connect/cassandra/package.scala
+++ b/src/main/scala/com/tuplejump/kafka/connect/cassandra/package.scala
@@ -16,6 +16,8 @@
 
 package com.tuplejump.kafka.connect
 
+import org.apache.kafka.connect.data.Field
+
 /** Common package operations. */
 package object cassandra {
   import java.util.{List => JList, Map => JMap, Date => JDate}
@@ -69,29 +71,90 @@ package object cassandra {
 
   implicit class SinkRecordOps(record: SinkRecord) {
 
-    def as(namespace: String): SinkQuery = {
+    def as(namespace: String, fieldMapping: Map[String, Any]): SinkQuery = {
+      val colNamesVsValues: Map[String, String] = {
+        if (fieldMapping.isEmpty) {
+          toCqlData()
+        } else {
+          toCqlData(fieldMapping)
+        }
+      }
+      SinkQuery(namespace, colNamesVsValues)
+    }
+
+    def toCqlData(): Map[String, String] = {
       val schema = record.valueSchema
-      val columnNames = schema.asColumnNames
-      val columnValues = schema.`type`() match {
+      schema.`type`() match {
         case STRUCT =>
-          val struct: Struct = record.value.asInstanceOf[Struct]
-          columnNames.map(convert(schema, struct, _)).mkString(",")
-        case other => throw new DataException(
-          s"Unable to create insert statement with unsupported value schema type $other.")
+          schema.fields.asScala.map { field =>
+            field.name -> convert(schema, record.value.asInstanceOf[Struct], field)
+          }.toMap
+        case other =>
+          throw new DataException(
+            s"Unable to create insert statement with unsupported value schema type $other.")
+      }
+    }
+
+    def toCqlData(fieldMapping: Map[String, Any]): Map[String, String] = {
+      record.valueSchema.`type`() match {
+        case STRUCT =>
+          toColNamesVsValues(Map.empty[String, String], record.value.asInstanceOf[Struct], fieldMapping)
+        case other =>
+          throw new DataException(
+            s"Unable to create insert statement with unsupported value schema type $other.")
+      }
+    }
+
+    // scalastyle:off
+    private def toColNamesVsValues(colNameVsValues: Map[String, String],
+        struct: Struct, fieldMapping: Map[String, Any]): Map[String, String] = {
+      lazy val exception = new DataException("Mismatch between field mapping and schema")
+      var result: Map[String, String] = colNameVsValues
+      struct.schema.fields.asScala.foreach { field =>
+        val fieldMappingValue = fieldMapping.get(field.name)
+        field.schema.`type`() match {
+          case STRUCT =>
+            fieldMappingValue match {
+              case Some(value) =>
+                value match {
+                  case newMap: Map[_, _] => // recurse into the nested struct with its nested mapping
+                    result = toColNamesVsValues(
+                      result,
+                      struct.get(field).asInstanceOf[Struct],
+                      newMap.asInstanceOf[Map[String, Any]]
+                    )
+                  case _ =>
+                    throw exception
+                }
+              case None => // no mapping for this nested struct: skip it entirely
+            }
+          case _ =>
+            fieldMappingValue match {
+              case Some(value) =>
+                value match {
+                  case strValue: String =>
+                    result += (strValue -> convert(field.schema, struct, field))
+                  case _ =>
+                    throw exception
+                }
+              case None => // no mapping for this field: skip it
+            }
+        }
       }
-      SinkQuery(namespace, columnNames, columnValues)
+      result
     }
+    // scalastyle:on
 
     /* TODO support all types. */
-    def convert(schema: Schema, result: Struct, col: String): AnyRef =
-      schema.field(col).schema match {
+    def convert(schema: Schema, result: Struct, field: Field): String =
+      field.schema match {
         case x if x.`type`() == Schema.STRING_SCHEMA.`type`() =>
-          s"'${result.get(col).toString}'"
+          s"'${result.get(field).toString}'" // TODO: escape single quotes in string values
         case x if x.name() == Timestamp.LOGICAL_NAME =>
-          val time = Timestamp.fromLogical(x, result.get(col).asInstanceOf[JDate])
+          val time = Timestamp.fromLogical(x, result.get(field).asInstanceOf[JDate])
           s"$time"
         case y =>
-          result.get(col)
+          String.valueOf(result.get(field))
       }
 
     def asColumnNames: List[ColumnName] =
diff --git a/src/test/scala/com/tuplejump/kafka/connect/cassandra/AbstractSpec.scala b/src/test/scala/com/tuplejump/kafka/connect/cassandra/AbstractSpec.scala
index 84d6901..9471401 100644
--- a/src/test/scala/com/tuplejump/kafka/connect/cassandra/AbstractSpec.scala
+++ b/src/test/scala/com/tuplejump/kafka/connect/cassandra/AbstractSpec.scala
@@ -62,7 +62,7 @@ trait ConfigFixture extends {
 
     val route = Route(TaskConfig.SinkRoute + topic, s"$keyspace.$table").get
     val schema = Schema(route, Nil, Nil, Nil, columnNames, "")
-    SinkConfig(schema, PreparedQuery(schema), WriteOptions(DefaultSinkConsistency))
+    SinkConfig(schema, PreparedQuery(schema), WriteOptions(DefaultSinkConsistency, DefaultFieldMapping))
   }
 }
diff --git a/src/test/scala/com/tuplejump/kafka/connect/cassandra/SchemaSpec.scala b/src/test/scala/com/tuplejump/kafka/connect/cassandra/SchemaSpec.scala
index 93ce833..6ef181e 100644
--- a/src/test/scala/com/tuplejump/kafka/connect/cassandra/SchemaSpec.scala
+++ b/src/test/scala/com/tuplejump/kafka/connect/cassandra/SchemaSpec.scala
@@ -39,7 +39,7 @@ class SchemaSpec extends AbstractFlatSpec {
     sc.schema.route.table should be ("tablex")
     sc.schema is record should be (true)
 
-    val query = record.as(sc.schema.namespace)
+    val query = record.as(sc.schema.namespace, Map.empty[String, Any])
     query.cql should be("INSERT INTO keyspacex.tablex(id) VALUES(1)")
   }
 
@@ -60,7 +60,7 @@ class SchemaSpec extends AbstractFlatSpec {
     sc.schema is record should be (true)
     sc.query.cql should be ("INSERT INTO keyspacex.tablex(available,name,age) VALUES(?,?,?)")
 
-    val query = record.as(sc.schema.namespace)
+    val query = record.as(sc.schema.namespace, Map.empty[String, Any])
     query.cql should be("INSERT INTO keyspacex.tablex(available,name,age) VALUES(false,'user',15)")
   }
 
diff --git a/src/test/scala/com/tuplejump/kafka/connect/cassandra/TaskConfigSpec.scala b/src/test/scala/com/tuplejump/kafka/connect/cassandra/TaskConfigSpec.scala
index 3a20087..cca0ec2 100644
--- a/src/test/scala/com/tuplejump/kafka/connect/cassandra/TaskConfigSpec.scala
+++ b/src/test/scala/com/tuplejump/kafka/connect/cassandra/TaskConfigSpec.scala
@@ -67,7 +67,8 @@ class TaskConfigSpec extends AbstractSpec {
       config.source.isEmpty should be (true)
       val consistency = DefaultSourceConsistency
       sinkSchemas.forall{ schema =>
-        val sc = SinkConfig(schema, PreparedQuery(schema), WriteOptions(DefaultSourceConsistency))
+        val sc = SinkConfig(schema, PreparedQuery(schema),
+          WriteOptions(DefaultSourceConsistency, DefaultFieldMapping))
         config.sink.contains(sc)
       } should be (true)
     }
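
For reviewers, a sketch (not part of the patch) of what the new mapping produces for `record1` as built in `CassandraSinkTaskSpec` above, once `cassandra.sink.field.mapping` is parsed by `toMap`. Column order may vary, since the intermediate column-name-to-value structure is an unordered Map:

```
-- CQL equivalent of record1.as("test.fieldmap", mapping).cql
INSERT INTO test.fieldmap(new_key,new_value,new_nested,new_dnested)
VALUES('pqr',15,'nes_blah1_1','dnes_blah2_1');
```

Since 'nvalue.blah2.blah1' has no entry in the mapping, its value ('dnes_blah1_1') is skipped rather than inserted, mirroring the 'user.email' note in the README.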