diff --git a/docs/kafka.md b/docs/kafka.md index 5152cc5..06715c1 100644 --- a/docs/kafka.md +++ b/docs/kafka.md @@ -33,6 +33,7 @@ These Kafka-specific properties, if used, may be specified either at the global |`topicPattern`|A regular expression used to match Kafka topics to dataSource configurations. See "Matching Topics to Data Sources" for details.|{match nothing}, must be provided| |`topicPattern.priority`|If multiple topicPatterns match the same topic name, the highest priority dataSource configuration will be used. A higher number indicates a higher priority. See "Matching Topics to Data Sources" for details.|1| |`useTopicAsDataSource`|Use the Kafka topic as the dataSource name instead of the one provided in the configuration file. Useful when combined with a topicPattern that matches more than one Kafka topic. See "Matching Topics to Data Sources" for details.|false| +|`useInputTopicAsDecodeTopic`|Use the topic which used by kafka consumer as the avro stream decoder's topic. Useful when consume multiply topic with different avro schema.|true| |`reportDropsAsExceptions`|Whether or not dropped messages will cause an exception and terminate the application.|false| ### Running diff --git a/kafka/src/main/java/com/metamx/tranquility/kafka/model/PropertiesBasedKafkaConfig.java b/kafka/src/main/java/com/metamx/tranquility/kafka/model/PropertiesBasedKafkaConfig.java index 9dd59b6..b530dfe 100644 --- a/kafka/src/main/java/com/metamx/tranquility/kafka/model/PropertiesBasedKafkaConfig.java +++ b/kafka/src/main/java/com/metamx/tranquility/kafka/model/PropertiesBasedKafkaConfig.java @@ -60,6 +60,10 @@ public PropertiesBasedKafkaConfig() @Default("false") public abstract Boolean useTopicAsDataSource(); + @Config("useInputTopicAsDecodeTopic") + @Default("false") + public abstract Boolean useInputTopicAsDecodeTopic(); + @Config("topicPattern.priority") @Default("1") public abstract Integer getTopicPatternPriority(); diff --git a/kafka/src/main/java/com/metamx/tranquility/kafka/writer/WriterController.java b/kafka/src/main/java/com/metamx/tranquility/kafka/writer/WriterController.java index 83c0c90..bd1ad7c 100644 --- a/kafka/src/main/java/com/metamx/tranquility/kafka/writer/WriterController.java +++ b/kafka/src/main/java/com/metamx/tranquility/kafka/writer/WriterController.java @@ -24,6 +24,7 @@ import com.metamx.tranquility.config.DataSourceConfig; import com.metamx.tranquility.finagle.FinagleRegistry; import com.metamx.tranquility.finagle.FinagleRegistryConfig; +import com.metamx.tranquility.kafka.KafkaBeamUtils; import com.metamx.tranquility.kafka.model.MessageCounters; import com.metamx.tranquility.kafka.model.PropertiesBasedKafkaConfig; import org.apache.curator.RetryPolicy; @@ -163,7 +164,9 @@ finagleKey, new FinagleRegistry( ) ); } - + if (dataSourceConfig.propertiesBasedConfig().useInputTopicAsDecodeTopic()) { + dataSourceConfig = KafkaBeamUtils.useInputTopicAsDecodeTopic(topic, dataSourceConfig); + } return new TranquilityEventWriter( topic, dataSourceConfig, diff --git a/kafka/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala b/kafka/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala index ac5cb3d..c246fb0 100644 --- a/kafka/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala +++ b/kafka/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala @@ -19,6 +19,7 @@ package com.metamx.tranquility.kafka +import com.metamx.common.scala.untyped.Dict import com.metamx.tranquility.config.DataSourceConfig import com.metamx.tranquility.druid.DruidBeams import com.metamx.tranquility.druid.DruidLocation @@ -28,15 +29,13 @@ import com.metamx.tranquility.tranquilizer.Tranquilizer import org.apache.curator.framework.CuratorFramework import scala.reflect.runtime.universe.typeTag -object KafkaBeamUtils -{ +object KafkaBeamUtils { def createTranquilizer( - topic: String, - config: DataSourceConfig[PropertiesBasedKafkaConfig], - curator: CuratorFramework, - finagleRegistry: FinagleRegistry - ): Tranquilizer[Array[Byte]] = - { + topic: String, + config: DataSourceConfig[PropertiesBasedKafkaConfig], + curator: CuratorFramework, + finagleRegistry: FinagleRegistry + ): Tranquilizer[Array[Byte]] = { DruidBeams.fromConfig(config, typeTag[Array[Byte]]) .location( DruidLocation.create( @@ -48,4 +47,17 @@ object KafkaBeamUtils .finagleRegistry(finagleRegistry) .buildTranquilizer(config.tranquilizerBuilder()) } + + def useInputTopicAsDecodeTopic(topic: String, config: DataSourceConfig[PropertiesBasedKafkaConfig]): DataSourceConfig[PropertiesBasedKafkaConfig] = { + val dataSchema = config.specMap.get("dataSchema").get.asInstanceOf[Dict] + val parser = dataSchema.get("parser").get.asInstanceOf[Dict] + if ("avro_stream".equals(parser.get("type").toString)) { + val avroBytesDecoder = parser.get("avroBytesDecoder").get.asInstanceOf[Dict] + val subjectAndIdConverter = avroBytesDecoder.get("subjectAndIdConverter").get.asInstanceOf[Dict] + val map = config.specMap.updated("dataSchema", dataSchema.updated("parser", parser.updated("avroBytesDecoder", avroBytesDecoder.updated("subjectAndIdConverter", subjectAndIdConverter.updated("topic", topic))))) + config.copy(config.dataSource, config.propertiesBasedConfig, map) + } else { + config + } + } }