Skip to content

Commit

Permalink
PR comments pt.3
Browse files Browse the repository at this point in the history
  • Loading branch information
GoMati-MU committed May 29, 2024
1 parent 2c54bf2 commit d373f76
Show file tree
Hide file tree
Showing 20 changed files with 183 additions and 139 deletions.
20 changes: 0 additions & 20 deletions java-connectors/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,6 @@ task prepareRelease(dependsOn: [collectFatJar]) {
dependsOn subprojects.collectFatJar
}

task testModuleList() {
def nonTestModules = ["java-reactor"]

def modulesFile = new File("gradle-test-modules.txt")
modulesFile.delete()
modulesFile.createNewFile()

def modulesBuilder = new StringBuilder("[")
allprojects.name.stream()
.filter {moduleName -> !nonTestModules.contains(moduleName)}
.forEach {moduleName -> modulesBuilder.append("\"" + moduleName + "\",") }
modulesBuilder.deleteCharAt(modulesBuilder.lastIndexOf(",")).append("]")
modulesFile.append(modulesBuilder)
}

task releaseModuleList() {
def nonReleaseModules = ["java-reactor", "kafka-connect-cloud-common",
"kafka-connect-common", "kafka-connect-query-language"]
Expand All @@ -199,8 +184,3 @@ task releaseModuleList() {
modulesBuilder.deleteCharAt(modulesBuilder.lastIndexOf(",")).append("]")
modulesFile.append(modulesBuilder)
}

task prepareModuleList() {
dependsOn testModuleList
dependsOn releaseModuleList
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
public class AzureEventHubsSourceConnector extends SourceConnector {

private final JarManifest jarManifest =
new JarManifest(getClass().getProtectionDomain().getCodeSource().getLocation());
JarManifest.produceFromClass(getClass());
private Map<String, String> configProperties;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class AzureEventHubsSourceTask extends SourceTask {
private BlockingQueueProducerProvider blockingQueueProducerProvider;

public AzureEventHubsSourceTask() {
jarManifest = new JarManifest(getClass().getProtectionDomain().getCodeSource().getLocation());
jarManifest = JarManifest.produceFromClass(getClass());
}

public AzureEventHubsSourceTask(JarManifest jarManifest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
Expand All @@ -37,7 +37,7 @@
public class AzureServiceBusSourceConnector extends SourceConnector {

private final JarManifest jarManifest =
new JarManifest(getClass().getProtectionDomain().getCodeSource().getLocation());
JarManifest.produceFromClass(getClass());
private Map<String, String> configProperties;

@Override
Expand All @@ -60,9 +60,10 @@ public Class<? extends Task> taskClass() {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
log.info("Setting task configurations for {} workers.", maxTasks);
Map<String, String> immutableProps = Map.copyOf(configProperties);

return Stream.generate(() -> Map.copyOf(configProperties))
.limit(maxTasks)
return IntStream.range(0, maxTasks)
.mapToObj(i -> immutableProps)
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -38,7 +39,7 @@ public class AzureServiceBusSourceTask extends SourceTask {
private TaskToReceiverBridge taskToReceiverBridge;

public AzureServiceBusSourceTask() {
this.jarManifest = new JarManifest(getClass().getProtectionDomain().getCodeSource().getLocation());
this.jarManifest = JarManifest.produceFromClass(getClass());
}

@Override
Expand All @@ -55,17 +56,20 @@ public void start(Map<String, String> props) {
List<Kcql> kcqls =
KcqlConfigBusMapper.mapKcqlsFromConfig(props.get(AzureServiceBusConfigConstants.KCQL_CONFIG));

ArrayBlockingQueue<ServiceBusMessageHolder> recordsQueue =
final ArrayBlockingQueue<ServiceBusMessageHolder> recordsQueue =
new ArrayBlockingQueue<>(recordsQueueSize);

final Map<String, ServiceBusReceiverFacade> receiversMap =
ServiceBusReceiverFacadeInitializer.initializeReceiverFacades(recordsQueue, kcqls, connectionString);

TaskToReceiverBridge serviceBusReceiverBridge =
new TaskToReceiverBridge(connectionString, kcqls, recordsQueue);
new TaskToReceiverBridge(recordsQueue, receiversMap, Executors.newFixedThreadPool(kcqls.size() * 2));

initialize(serviceBusReceiverBridge);
}

void initialize(TaskToReceiverBridge serviceBusReceiverFacade) {
taskToReceiverBridge = serviceBusReceiverFacade;
void initialize(TaskToReceiverBridge taskToReceiverBridge) {
this.taskToReceiverBridge = taskToReceiverBridge;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.azure.servicebus.source;

import io.lenses.kcql.Kcql;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* Class that initializes {@link ServiceBusReceiverFacade}s.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ServiceBusReceiverFacadeInitializer {

private static final String FACADE_CLASS_SIMPLE_NAME = ServiceBusReceiverFacade.class.getSimpleName();

/**
* Initializes map of {@link ServiceBusReceiverFacade}s from given input.
*
* @param recordsQueue Queue that receiver can write records to.
* @param kcqls {@link Kcql}s with mappings.
* @param connectionString ServiceBus connection string.
* @return map of receiverIDs to {@link ServiceBusReceiverFacade} object.
*/
static Map<String, ServiceBusReceiverFacade> initializeReceiverFacades(
BlockingQueue<ServiceBusMessageHolder> recordsQueue,
List<Kcql> kcqls,
String connectionString
) {
return kcqls.stream()
.map(kcql -> new ServiceBusReceiverFacade(kcql, recordsQueue, connectionString,
FACADE_CLASS_SIMPLE_NAME + UUID.randomUUID()))
.collect(Collectors.toMap(ServiceBusReceiverFacade::getReceiverId, e -> e));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
*/
package io.lenses.streamreactor.connect.azure.servicebus.source;

import io.lenses.kcql.Kcql;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -36,28 +33,11 @@
public class TaskToReceiverBridge {

private static final int INITIAL_RECORDS_TO_COMMIT_SIZE = 500;
private static final String FACADE_CLASS_SIMPLE_NAME = ServiceBusReceiverFacade.class.getSimpleName();
private final BlockingQueue<ServiceBusMessageHolder> recordsQueue;
private final ExecutorService sentMessagesExecutors;
private Map<String, ServiceBusReceiverFacade> receivers;
private final Map<String, ServiceBusReceiverFacade> receivers;
private final Map<String, ServiceBusMessageHolder> recordsToCommitMap;

/**
* Creates Bridge between Receivers and Connector's Task for Azure Service Bus.
*
* @param connectionString Service Bus connection string
* @param kcqls list of KCQLs to initiate Receivers for.
* @param recordsQueue records queue used to store received messages.
*/
public TaskToReceiverBridge(String connectionString, List<Kcql> kcqls,
BlockingQueue<ServiceBusMessageHolder> recordsQueue) {
this.recordsQueue = recordsQueue;
sentMessagesExecutors = Executors.newFixedThreadPool(kcqls.size() * 2);
recordsToCommitMap = new ConcurrentHashMap<>(INITIAL_RECORDS_TO_COMMIT_SIZE);

initiateReceivers(recordsQueue, kcqls, connectionString);
}

/**
* Creates Bridge between Receivers and Connector's Task for Azure Service Bus.
*
Expand All @@ -74,16 +54,6 @@ public TaskToReceiverBridge(String connectionString, List<Kcql> kcqls,
recordsToCommitMap = new ConcurrentHashMap<>(INITIAL_RECORDS_TO_COMMIT_SIZE);
}

private void initiateReceivers(BlockingQueue<ServiceBusMessageHolder> recordsQueue,
List<Kcql> kcqls, String connectionString) {
receivers = new ConcurrentHashMap<>(kcqls.size());

kcqls.forEach(kcql -> {
String receiverId = FACADE_CLASS_SIMPLE_NAME + UUID.randomUUID();
receivers.put(receiverId, new ServiceBusReceiverFacade(kcql, recordsQueue, connectionString, receiverId));
});
}

/**
* Unsubscribes from all subscriptions and closes the Receivers.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,38 +62,52 @@ public static List<Kcql> mapKcqlsFromConfig(String kcqlString) {
Map<String, String> inputToOutputTopics = new HashMap<>(kcqls.size());

for (Kcql kcql : kcqls) {
String inputTopic = kcql.getSource();
String outputTopic = kcql.getTarget();
validateTopicNames(kcql);
validateTopicMappings(inputToOutputTopics, kcql);
validateKcqlProperties(kcql);

if (!azureNameMatchesAgainstRegex(inputTopic, MAX_BUS_NAME_LENGTH)) {
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Input", inputTopic));
}
if (!azureNameMatchesAgainstRegex(outputTopic, MAX_BUS_NAME_LENGTH)) {
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Output", outputTopic));
}
if (inputToOutputTopics.containsKey(inputTopic)) {
throw new ConfigException(String.format("Input %s cannot be mapped twice.", inputTopic));
}
if (inputToOutputTopics.containsValue(outputTopic)) {
throw new ConfigException(String.format("Output %s cannot be mapped twice.", outputTopic));
}
inputToOutputTopics.put(kcql.getSource(), kcql.getTarget());
}

List<ServiceBusKcqlProperties> notSatisfiedProperties = checkForNecessaryKcqlProperties(kcql);
if (!notSatisfiedProperties.isEmpty()) {
String missingPropertiesError =
notSatisfiedProperties.stream()
.map(ServiceBusKcqlProperties::getPropertyName)
.collect(Collectors.joining(","));
throw new ConfigException(
String.format("Following non-optional properties missing in KCQL: %s", missingPropertiesError));
}
return kcqls;
}

private static void validateTopicNames(Kcql kcql) {
String inputTopic = kcql.getSource();
String outputTopic = kcql.getTarget();

checkForValidPropertyValues(kcql.getProperties());
if (!azureNameMatchesAgainstRegex(inputTopic, MAX_BUS_NAME_LENGTH)) {
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Input", inputTopic));
}
if (!azureNameMatchesAgainstRegex(outputTopic, MAX_BUS_NAME_LENGTH)) {
throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Output", outputTopic));
}
}

inputToOutputTopics.put(inputTopic, outputTopic);
public static void validateTopicMappings(Map<String, String> inputToOutputTopics, Kcql kcql) {
String inputTopic = kcql.getSource();
String outputTopic = kcql.getTarget();

if (inputToOutputTopics.containsKey(inputTopic)) {
throw new ConfigException(String.format("Input '%s' cannot be mapped twice.", inputTopic));
}
if (inputToOutputTopics.containsValue(outputTopic)) {
throw new ConfigException(String.format("Output '%s' cannot be mapped twice.", outputTopic));
}
}

return kcqls;
public static void validateKcqlProperties(Kcql kcql) {
List<ServiceBusKcqlProperties> notSatisfiedProperties = checkForNecessaryKcqlProperties(kcql);
if (!notSatisfiedProperties.isEmpty()) {
String missingPropertiesError =
notSatisfiedProperties.stream()
.map(ServiceBusKcqlProperties::getPropertyName)
.collect(Collectors.joining(","));
throw new ConfigException(String.format("Following non-optional properties are missing in KCQL: %s",
missingPropertiesError));
}

checkForValidPropertyValues(kcql.getProperties());
}

private static void checkForValidPropertyValues(Map<String, String> properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ void getOffsetShouldReturnEmptyOptionalIfOffsetNotPresent() {

//then
verify(offsetStorageReader).offset(any(AzureServiceBusPartitionKey.class));
assertThat(offset).isNotNull();
assertThat(offset.isEmpty()).isTrue();
assertThat(offset).isEmpty();
}

@Test
Expand All @@ -76,8 +75,6 @@ void getOffsetShouldReturnEmptyOptionalOffsetIfPresent() {

//then
verify(offsetStorageReader).offset(any(AzureServiceBusPartitionKey.class));
assertThat(offset).isNotNull();
assertThat(offset.isEmpty()).isFalse();
assertThat(offset.get().getOffsetValue()).isEqualTo(exampleOffset);
assertThat(offset).contains(new AzureServiceBusOffsetMarker(exampleOffset));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
package io.lenses.streamreactor.connect.azure.servicebus.source;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.lenses.streamreactor.common.util.JarManifest;
import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusConfigConstants;
import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusSourceConfig;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
Expand Down Expand Up @@ -69,7 +69,7 @@ void taskClassShouldReturnTaskClass() {
Class<? extends Task> taskClass = testObj.taskClass();

//then
AzureServiceBusSourceTask.class.equals(taskClass);
assertEquals(AzureServiceBusSourceTask.class, taskClass);
}

@Test
Expand All @@ -94,7 +94,7 @@ void configShouldReturnConfigClass() {
ConfigDef configDef = testObj.config();

//then
AzureServiceBusSourceConfig.getConfigDefinition().equals(configDef);
assertEquals(AzureServiceBusSourceConfig.getConfigDefinition(), configDef);
}

@Test
Expand All @@ -110,12 +110,10 @@ void versionShouldReturnJarManifestsVersion() {
}

private Map<String, String> buildValidProperties() {
HashMap<String, String> propertyMap = new HashMap<>();

propertyMap.put(AzureServiceBusConfigConstants.CONNECTOR_NAME, CONFIG_CONNECTOR_NAME);
propertyMap.put(AzureServiceBusConfigConstants.CONNECTION_STRING, EMPTY_STRING);
propertyMap.put(AzureServiceBusConfigConstants.KCQL_CONFIG, VALID_KCQL);

return propertyMap;
return Map.of(
AzureServiceBusConfigConstants.CONNECTOR_NAME, CONFIG_CONNECTOR_NAME,
AzureServiceBusConfigConstants.CONNECTION_STRING, EMPTY_STRING,
AzureServiceBusConfigConstants.KCQL_CONFIG, VALID_KCQL
);
}
}
Loading

0 comments on commit d373f76

Please sign in to comment.