Eventhub connector to support multiple kcqls #1108

Merged
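
This PR teaches the Azure Event Hubs source connector to honour every KCQL statement in its configuration instead of silently using only the first one: the KCQL string is parsed into an input-to-output topic map, the consumer subscribes to all source topics, and each record is routed to the output topic mapped from the topic it arrived on.

As an illustration, a configuration of the following shape now fans out two hubs to two Kafka topics (the property key connect.eventhubs.kcql and the topic names are assumptions for this sketch; the diff only references the property as AzureEventHubsConfigConstants.KCQL_CONFIG):

    connect.eventhubs.kcql=INSERT INTO output-a SELECT * FROM input-hub-a; INSERT INTO output-b SELECT * FROM input-hub-b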
java-connectors/build.gradle (7 changes: 4 additions & 3 deletions)
@@ -8,6 +8,7 @@ allprojects {
 
     group = "io.lenses.streamreactor"
     version = "1.0-SNAPSHOT"
+    description = "stream-reactor"
 
     apply plugin: 'java'
     apply plugin: 'java-library'
@@ -30,7 +31,7 @@ allprojects {
 
         //for jar building
         rootRelease = "${project.rootDir}/release/"
-        versionDir = "${rootRelease}/${project.description}-${project.version}-${kafkaVersion}"
+        versionDir = "${rootRelease}/${project.description}-${project.version}"
        confDir = "${versionDir}/conf"
        libsDir = "${versionDir}/libs"
 
@@ -130,8 +131,8 @@ allprojects {
     task fatJar(dependsOn: [test, jar, shadowJar])
 
     task collectFatJar(type: Copy, dependsOn: [fatJar]) {
-        from("${buildDir}/libs").include("*-all.jar")
-        into(libsDir)
+        from("${buildDir}/libs").include("kafka-connect-*-all.jar")
+            .exclude("*-common-*").into(libsDir)
     }
 }
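
With the tightened patterns, collectFatJar now copies only connector fat jars and skips the common module; for example (jar names hypothetical), kafka-connect-azure-eventhubs-1.0-all.jar would be collected, while kafka-connect-common-1.0-all.jar and third-party-lib-all.jar would not.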

AzureEventHubsSourceConnector.java
@@ -4,9 +4,6 @@
 
 import io.lenses.streamreactor.common.util.JarManifest;
 import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
-import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants;
-import io.lenses.streamreactor.connect.azure.eventhubs.util.KcqlConfigPort;
-import io.lenses.kcql.Kcql;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -44,10 +41,6 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
     log.info("Setting task configurations for {} workers.", maxTasks);
     List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
 
-    Kcql firstKcql = KcqlConfigPort.parseMultipleKcqlStatementsPickingOnlyFirst(
-        configProperties.get(AzureEventHubsConfigConstants.KCQL_CONFIG));
-    configProperties.put(AzureEventHubsConfigConstants.KCQL_CONFIG, firstKcql.getQuery());
-
     IntStream.range(0, maxTasks).forEach(task -> taskConfigs.add(configProperties));
     return taskConfigs;
   }
@@ -71,4 +64,4 @@ public ConfigDef config() {
   public String version() {
     return jarManifest.getVersion();
   }
-}
+}
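
Removing the block above means taskConfigs no longer rewrites the KCQL property down to its first statement; every task now receives the full multi-statement KCQL string and can build the complete input-to-output topic mapping itself.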

AzureEventHubsSourceTask.java
@@ -6,7 +6,6 @@
 import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants;
 import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
 import io.lenses.streamreactor.connect.azure.eventhubs.util.KcqlConfigPort;
-import io.lenses.kcql.Kcql;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
@@ -52,12 +51,13 @@ public void start(Map<String, String> props) {
 
     ArrayBlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue = new ArrayBlockingQueue<>(
         RECORDS_QUEUE_DEFAULT_SIZE);
+    Map<String, String> inputToOutputTopics = KcqlConfigPort.mapInputToOutputsFromConfig(
+        azureEventHubsSourceConfig.getString(AzureEventHubsConfigConstants.KCQL_CONFIG));
     blockingQueueProducerProvider = new BlockingQueueProducerProvider(topicPartitionOffsetProvider);
     KafkaByteBlockingQueuedProducer producer = blockingQueueProducerProvider.createProducer(
-        azureEventHubsSourceConfig, recordsQueue);
-    String outputTopic = getOutputTopicFromConfig(azureEventHubsSourceConfig);
+        azureEventHubsSourceConfig, recordsQueue, inputToOutputTopics);
     EventHubsKafkaConsumerController kafkaConsumerController = new EventHubsKafkaConsumerController(
-        producer, recordsQueue, outputTopic);
+        producer, recordsQueue, inputToOutputTopics);
     initialize(kafkaConsumerController, azureEventHubsSourceConfig);
   }

@@ -90,17 +90,4 @@ public void stop() {
     ofNullable(eventHubsKafkaConsumerController)
         .ifPresent(consumerController -> consumerController.close(closeTimeout));
   }
-
-  /**
-   * Returns output topic (specified in config using KCQL).
-   *
-   * @param azureEventHubsSourceConfig task configuration
-   * @return output topic
-   */
-  private String getOutputTopicFromConfig(
-      AzureEventHubsSourceConfig azureEventHubsSourceConfig) {
-    Kcql kcql = KcqlConfigPort.parseMultipleKcqlStatementsPickingOnlyFirst(
-        azureEventHubsSourceConfig.getString(AzureEventHubsConfigConstants.KCQL_CONFIG));
-    return kcql.getTarget();
-  }
 }
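
start() now derives the input-to-output topic map once and shares it with both collaborators: the producer needs the key set to know which topics to consume, and the controller needs the full mapping to route each record, which is why the single-topic getOutputTopicFromConfig helper is deleted above.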

BlockingQueueProducerProvider.java
@@ -3,9 +3,8 @@
 import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants;
 import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
 import io.lenses.streamreactor.connect.azure.eventhubs.config.SourceDataType.KeyValueTypes;
-import io.lenses.streamreactor.connect.azure.eventhubs.util.KcqlConfigPort;
-import io.lenses.kcql.Kcql;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import lombok.extern.slf4j.Slf4j;
@@ -36,12 +35,14 @@ public BlockingQueueProducerProvider(TopicPartitionOffsetProvider topicPartition
   * Instantiates BlockingQueuedKafkaConsumer from given properties.
    *
    * @param azureEventHubsSourceConfig Config of Task
-   * @param recordBlockingQueue BlockingQueue for ConsumerRecords
+   * @param recordBlockingQueue        BlockingQueue for ConsumerRecords
+   * @param inputToOutputTopics        map of input to output topics
    * @return BlockingQueuedKafkaConsumer instance.
    */
  public KafkaByteBlockingQueuedProducer createProducer(
       AzureEventHubsSourceConfig azureEventHubsSourceConfig,
-      BlockingQueue<ConsumerRecords<byte[], byte[]>> recordBlockingQueue) {
+      BlockingQueue<ConsumerRecords<byte[], byte[]>> recordBlockingQueue,
+      Map<String, String> inputToOutputTopics) {
     String connectorName = azureEventHubsSourceConfig.getString(AzureEventHubsConfigConstants.CONNECTOR_NAME);
     final String clientId = connectorName + "#" + UUID.randomUUID();
     log.info("Attempting to create Client with Id:{}", clientId);
@@ -61,10 +62,10 @@ public KafkaByteBlockingQueuedProducer createProducer(
     KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProperties);
 
     boolean shouldSeekToLatest = shouldConsumerSeekToLatest(azureEventHubsSourceConfig);
-    String topic = getInputTopicFromConfig(azureEventHubsSourceConfig);
+    Set<String> inputTopics = inputToOutputTopics.keySet();
 
     return new KafkaByteBlockingQueuedProducer(topicPartitionOffsetProvider, recordBlockingQueue,
-        kafkaConsumer, keyValueTypes, clientId, topic, shouldSeekToLatest);
+        kafkaConsumer, keyValueTypes, clientId, inputTopics, shouldSeekToLatest);
   }
 
   private boolean shouldConsumerSeekToLatest(AzureEventHubsSourceConfig azureEventHubsSourceConfig) {
@@ -77,17 +78,4 @@ private boolean shouldConsumerSeekToLatest(AzureEvent
     throw new ConfigException(AzureEventHubsConfigConstants.CONSUMER_OFFSET, seekValue,
         CONSUMER_OFFSET_EXCEPTION_MESSAGE);
   }
-
-  /**
-   * Returns input topic (specified in KCQL config).
-   *
-   * @param azureEventHubsSourceConfig task configuration
-   * @return input topic
-   */
-  private String getInputTopicFromConfig(
-      AzureEventHubsSourceConfig azureEventHubsSourceConfig) {
-    Kcql kcql = KcqlConfigPort.parseMultipleKcqlStatementsPickingOnlyFirst(
-        azureEventHubsSourceConfig.getString(AzureEventHubsConfigConstants.KCQL_CONFIG));
-    return kcql.getSource();
-  }
 }
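
The provider no longer parses KCQL itself; it receives the precomputed map and subscribes the consumer to its key set, that is, to all KCQL source topics at once, which is also why the getInputTopicFromConfig helper is deleted.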

EventHubsKafkaConsumerController.java
@@ -9,6 +9,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
@@ -17,33 +18,35 @@
 import org.apache.kafka.connect.source.SourceRecord;
 
 /**
- * Class is a bridge between EventHub KafkaConsumers and AzureEventHubsSourceTask. It verifies the configuration
- * of kafka consumers and instantiates them, then allows AzureEventHubsSourceTask to pull for SourceRecords.
+ * Class is a bridge between EventHub KafkaConsumers and AzureEventHubsSourceTask. It verifies the
+ * configuration of kafka consumers and instantiates them, then allows AzureEventHubsSourceTask to
+ * pull for SourceRecords.
  */
 @Slf4j
 public class EventHubsKafkaConsumerController {
 
   private final BlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue;
   private KafkaByteBlockingQueuedProducer queuedKafkaProducer;
-  private final String outputTopic;
+  private final Map<String, String> inputToOutputTopics;
 
   /**
    * Constructs EventHubsKafkaConsumerController.
    *
    * @param queuedKafkaProducer producer to the recordsQueue
    * @param recordsQueue queue that contains EventHub records
-   * @param outputTopic topics to output to
+   * @param inputToOutputTopics input to output topics
    */
   public EventHubsKafkaConsumerController(KafkaByteBlockingQueuedProducer queuedKafkaProducer,
-      BlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue, String outputTopic) {
+      BlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue,
+      Map<String, String> inputToOutputTopics) {
     this.recordsQueue = recordsQueue;
     this.queuedKafkaProducer = queuedKafkaProducer;
-    this.outputTopic = outputTopic;
+    this.inputToOutputTopics = inputToOutputTopics;
   }
 
   /**
-   * This method leverages BlockingQueue mechanism that BlockingQueuedKafkaConsumer puts EventHub records
-   * into. It tries to poll the queue then returns list of SourceRecords
+   * This method leverages BlockingQueue mechanism that BlockingQueuedKafkaConsumer puts EventHub
+   * records into. It tries to poll the queue then returns list of SourceRecords
    *
    * @param duration how often to poll.
    * @return list of SourceRecords (can be empty if it couldn't poll from queue)
@@ -67,15 +70,18 @@ public List<SourceRecord> poll(Duration duration) throws InterruptedException {
       sourceRecords = new ArrayList<>(consumerRecords.count());
       for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
 
+        String inputTopic = consumerRecord.topic();
         AzureTopicPartitionKey azureTopicPartitionKey = new AzureTopicPartitionKey(
-            consumerRecord.topic(), consumerRecord.partition());
+            inputTopic, consumerRecord.partition());
         AzureOffsetMarker offsetMarker = new AzureOffsetMarker(consumerRecord.offset());
 
-        SourceRecord sourceRecord = mapSourceRecordIncludingHeaders(consumerRecord, azureTopicPartitionKey,
-            offsetMarker, outputTopic, queuedKafkaProducer.getKeyValueTypes().getKeyType().getSchema(),
-            queuedKafkaProducer.getKeyValueTypes().getValueType().getSchema());
+        SourceRecord sourceRecord = mapSourceRecordIncludingHeaders(consumerRecord,
+            azureTopicPartitionKey,
+            offsetMarker, inputToOutputTopics.get(inputTopic),
+            queuedKafkaProducer.getKeyValueTypes().getKeyType().getSchema(),
+            queuedKafkaProducer.getKeyValueTypes().getValueType().getSchema());
 
-        sourceRecords.add(sourceRecord);
+        sourceRecords.add(sourceRecord);
 
       }
     }
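
The output topic is now resolved per record instead of once per task. A minimal sketch of the routing rule (topic names carried over from the hypothetical KCQL example above):

    Map<String, String> inputToOutputTopics =
        Map.of("input-hub-a", "output-a", "input-hub-b", "output-b");
    String inputTopic = consumerRecord.topic();               // e.g. "input-hub-a"
    String targetTopic = inputToOutputTopics.get(inputTopic); // -> "output-a"
    // targetTopic becomes the SourceRecord's topic, so records from different
    // Event Hubs can flow through one task to different Kafka topics.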

KafkaByteBlockingQueuedProducer.java
@@ -3,7 +3,7 @@
 import io.lenses.streamreactor.connect.azure.eventhubs.config.SourceDataType.KeyValueTypes;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
-import java.util.Collections;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -23,7 +23,7 @@ public class KafkaByteBlockingQueuedProducer {
   private final BlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue;
   private final Consumer<byte[], byte[]> consumer;
   private final String clientId;
-  private final String topic;
+  private final Set<String> inputTopics;
   private EventhubsPollingRunnable pollingRunnable;
   private final boolean shouldSeekToLatest;
   @Getter
@@ -42,18 +42,18 @@ public class KafkaByteBlockingQueuedProducer {
    * @param keyValueTypes {@link KeyValueTypes} instance indicating key and value
    *                      types
    * @param clientId consumer client id
-   * @param topic kafka topic to consume from
+   * @param inputTopics kafka inputTopics to consume from
    * @param shouldSeekToLatest informs where should consumer seek when there are no
    *                           offsets committed
    */
   public KafkaByteBlockingQueuedProducer(TopicPartitionOffsetProvider topicPartitionOffsetProvider,
       BlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue, Consumer<byte[], byte[]> consumer,
-      KeyValueTypes keyValueTypes, String clientId, String topic, boolean shouldSeekToLatest) {
+      KeyValueTypes keyValueTypes, String clientId, Set<String> inputTopics, boolean shouldSeekToLatest) {
     this.topicPartitionOffsetProvider = topicPartitionOffsetProvider;
     this.recordsQueue = recordsQueue;
     this.consumer = consumer;
     this.clientId = clientId;
-    this.topic = topic;
+    this.inputTopics = inputTopics;
     this.shouldSeekToLatest = shouldSeekToLatest;
     this.keyValueTypes = keyValueTypes;
 
@@ -82,8 +82,8 @@ private class EventhubsPollingRunnable implements Runnable {
     @Override
     public void run() {
       running.set(true);
-      log.info("Subscribing to topic: {}", topic);
-      consumer.subscribe(Collections.singletonList(topic),
+      log.info("Subscribing to topics: {}", String.join(",", inputTopics));
+      consumer.subscribe(inputTopics,
           new AzureConsumerRebalancerListener(topicPartitionOffsetProvider, consumer, shouldSeekToLatest));
       while (running.get()) {
         ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(DEFAULT_POLL_DURATION);
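
No new collection plumbing is needed here: Set<String> is a Collection, so it can be handed straight to KafkaConsumer.subscribe(Collection<String>, ConsumerRebalanceListener), and the Collections.singletonList wrapper disappears along with the single-topic assumption.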

ProducerProvider.java
@@ -1,6 +1,7 @@
 package io.lenses.streamreactor.connect.azure.eventhubs.source;
 
 import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 
@@ -10,5 +11,6 @@
 public interface ProducerProvider<K, V> {
 
   BlockingQueueProducer createProducer(AzureEventHubsSourceConfig azureEventHubsSourceConfig,
-      BlockingQueue<ConsumerRecords<K, V>> recordBlockingQueue);
+      BlockingQueue<ConsumerRecords<byte[], byte[]>> recordBlockingQueue,
+      Map<String, String> inputToOutputTopics);
 }
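
Note that the queue parameter is now pinned to ConsumerRecords<byte[], byte[]> even though the interface still declares type parameters K and V; after this change the generics no longer appear in the method signature.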

KcqlConfigPort.java
@@ -1,11 +1,56 @@
 package io.lenses.streamreactor.connect.azure.eventhubs.util;
 
 import io.lenses.kcql.Kcql;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.config.ConfigException;
 
 /**
  * Class that represents methods around KCQL handling.
  */
 public class KcqlConfigPort {
 
-  public static Kcql parseMultipleKcqlStatementsPickingOnlyFirst(String kcql) {
-    return Kcql.parseMultiple(kcql).get(0);
+  private static final String TOPIC_NAME_REGEX = "^[\\w][\\w\\-\\_\\.]*$";
+  private static final Pattern TOPIC_NAME_PATTERN = Pattern.compile(TOPIC_NAME_REGEX);
+  public static final String TOPIC_NAME_ERROR_MESSAGE =
+      "%s topic %s, name is not correctly specified (It can contain only letters, numbers and hyphens,"
+          + " underscores and dots and has to start with number or letter";
+
+  /**
+   * This method parses KCQL statements and fetches input and output topics checking against
+   * regex for invalid topic names in input and output.
+   * @param kcqlString string to parse
+   * @return map of input to output topic names
+   */
+  public static Map<String, String> mapInputToOutputsFromConfig(String kcqlString) {
+    List<Kcql> kcqls = Kcql.parseMultiple(kcqlString);
+    Map<String, String> inputToOutputTopics = new HashMap<>(kcqls.size());
+
+    for (Kcql kcql : kcqls) {
+      String inputTopic = kcql.getSource();
+      String outputTopic = kcql.getTarget();
+
+      if (!topicNameMatchesAgainstRegex(inputTopic)) {
+        throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Input", inputTopic));
+      }
+      if (!topicNameMatchesAgainstRegex(outputTopic)) {
+        throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Output", outputTopic));
+      }
+      if (inputToOutputTopics.containsKey(inputTopic)) {
+        throw new ConfigException(String.format("Input %s cannot be mapped twice.", inputTopic));
+      }
+
+      inputToOutputTopics.put(inputTopic, outputTopic);
+    }
+
+    return inputToOutputTopics;
+  }
+
+  private static boolean topicNameMatchesAgainstRegex(String topicName) {
+    final Matcher matcher = TOPIC_NAME_PATTERN.matcher(topicName);
+    return matcher.matches();
+  }
 }
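
A minimal sketch of how the new mapping method behaves (KCQL statements and topic names illustrative):

    Map<String, String> topics = KcqlConfigPort.mapInputToOutputsFromConfig(
        "INSERT INTO output-a SELECT * FROM input-hub-a; "
            + "INSERT INTO output-b SELECT * FROM input-hub-b");
    // topics => {input-hub-a=output-a, input-hub-b=output-b}

Mapping the same input topic twice, or using a name that fails the ^[\w][\w\-\_\.]*$ pattern, now fails fast at configuration time with a ConfigException rather than surfacing later at runtime.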