Commit
PR comments pt.2
GoMati-MU committed May 28, 2024
1 parent 634953a commit a745853
Showing 22 changed files with 801 additions and 128 deletions.
3 changes: 2 additions & 1 deletion java-connectors/.gitignore
@@ -1,5 +1,6 @@
.gradle
build/
target/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
@@ -45,4 +46,4 @@ bin/
### Lenses-specific ###
release/
gradle-modules.txt
**/src/main/gen/*
gradle-test-modules.txt
29 changes: 26 additions & 3 deletions java-connectors/build.gradle
@@ -29,6 +29,7 @@ allprojects {
jUnitVersion = '5.9.1'
mockitoJupiterVersion = '5.10.0'
apacheToConfluentVersionAxis = ["2.8.1": "6.2.2", "3.3.0": "7.3.1"]
caffeineVersion = '3.1.8'

//Other Manifest Info
mainClassName = ''
@@ -62,7 +63,8 @@ allprojects {

//tests
testImplementation group: 'org.mockito', name: 'mockito-core', version: mockitoJupiterVersion
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: mockitoJupiterVersion
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: mockitoJupiterVersion
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: jUnitVersion
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.25.3'

}
@@ -92,9 +94,10 @@ allprojects {
}

shadowJar {
def artifactVersion = gitTag?.trim() ? gitTag : project.version

manifest {
attributes("StreamReactor-Version": project.version,
attributes("StreamReactor-Version": artifactVersion,
"Kafka-Version": kafkaVersion,
"Created-By": "Lenses",
"Created-At": new Date().format("YYYYMMDDHHmm"),
@@ -105,7 +108,7 @@
)
}
configurations = [project.configurations.compileClasspath]
//archiveBaseName = "${project.name}-${project.version}-${kafkaVersion}-all"
archiveFileName = "${project.name}-${artifactVersion}-all.jar"
zip64 true

mergeServiceFiles {
@@ -166,6 +169,21 @@ task prepareRelease(dependsOn: [collectFatJar]) {
dependsOn subprojects.collectFatJar
}

task testModuleList() {
def nonTestModules = ["java-reactor"]

def modulesFile = new File("gradle-test-modules.txt")
modulesFile.delete()
modulesFile.createNewFile()

def modulesBuilder = new StringBuilder("[")
allprojects.name.stream()
.filter {moduleName -> !nonTestModules.contains(moduleName)}
.forEach {moduleName -> modulesBuilder.append("\"" + moduleName + "\",") }
modulesBuilder.deleteCharAt(modulesBuilder.lastIndexOf(",")).append("]")
modulesFile.append(modulesBuilder)
}

task releaseModuleList() {
def nonReleaseModules = ["java-reactor", "kafka-connect-cloud-common",
"kafka-connect-common", "kafka-connect-query-language"]
@@ -181,3 +199,8 @@ task releaseModuleList() {
modulesBuilder.deleteCharAt(modulesBuilder.lastIndexOf(",")).append("]")
modulesFile.append(modulesBuilder)
}

task prepareModuleList() {
dependsOn testModuleList
dependsOn releaseModuleList
}
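Both module-list tasks above build a JSON-style array by appending a trailing comma and deleting it afterwards. A minimal Java sketch of the same filtering logic (module names here are hypothetical) shows that Collectors.joining handles the brackets and commas in one step:

import java.util.List;
import java.util.stream.Collectors;

class ModuleListSketch {

  public static void main(String[] args) {
    // Hypothetical module names; the real task iterates allprojects.name.
    List<String> allModules = List.of("java-reactor", "kafka-connect-azure-servicebus");
    List<String> nonTestModules = List.of("java-reactor");

    String moduleArray = allModules.stream()
        .filter(moduleName -> !nonTestModules.contains(moduleName))
        .map(moduleName -> "\"" + moduleName + "\"")
        .collect(Collectors.joining(",", "[", "]"));

    System.out.println(moduleArray); // prints ["kafka-connect-azure-servicebus"]
  }
}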
@@ -35,7 +35,7 @@ public class AzureServiceBusConfigConstants {
public static final String KCQL_DOC =
"KCQL expression describing field selection and data routing to the target.";

public static final String TASK_RECORDS_QUEUE_SIZE = "task.records.queue.size";
public static final String TASK_RECORDS_QUEUE_SIZE = SOURCE_CONNECTOR_PREFIX + "task.records.queue.size";
public static final String TASK_RECORDS_QUEUE_SIZE_DOC = "Task's records queue size.";
public static final int TASK_RECORDS_QUEUE_SIZE_DEFAULT = 20;

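Prefixing TASK_RECORDS_QUEUE_SIZE with SOURCE_CONNECTOR_PREFIX changes the property name users put in their connector config. A sketch of the resulting key, assuming the prefix resolves to "connect.servicebus.source." — the constant's value is not shown in this diff:

import java.util.Map;

class QueueSizeConfigSketch {

  public static void main(String[] args) {
    // Assumed full key; the "connect.servicebus.source." prefix is illustrative only.
    Map<String, String> connectorProps = Map.of(
        "connect.servicebus.source.task.records.queue.size", "20"); // 20 is the documented default

    System.out.println(connectorProps);
  }
}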
@@ -24,40 +24,39 @@
*/
public class ServiceBusValueSchemaField {

private static final int NUMBER_OF_FIELDS = 17;
public static final Field DELIVERY_COUNT =
static final Field DELIVERY_COUNT =
new Field(SchemaFieldConstants.DELIVERY_COUNT, 0, Schema.INT64_SCHEMA);
public static final Field ENQUEUED_TIME_UTC =
static final Field ENQUEUED_TIME_UTC =
new Field(SchemaFieldConstants.ENQUEUED_TIME_UTC, 1, Schema.INT64_SCHEMA);
public static final Field CONTENT_TYPE =
static final Field CONTENT_TYPE =
new Field(SchemaFieldConstants.CONTENT_TYPE, 2, Schema.STRING_SCHEMA);
public static final Field LABEL =
static final Field LABEL =
new Field(SchemaFieldConstants.LABEL, 3, Schema.STRING_SCHEMA);
public static final Field CORRELATION_ID =
static final Field CORRELATION_ID =
new Field(SchemaFieldConstants.CORRELATION_ID, 4, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field MESSAGE_PROPERTIES =
static final Field MESSAGE_PROPERTIES =
new Field(SchemaFieldConstants.MESSAGE_PROPERTIES, 5, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field PARTITION_KEY =
static final Field PARTITION_KEY =
new Field(SchemaFieldConstants.PARTITION_KEY, 6, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field REPLY_TO =
static final Field REPLY_TO =
new Field(SchemaFieldConstants.REPLY_TO, 7, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field REPLY_TO_SESSION_ID =
static final Field REPLY_TO_SESSION_ID =
new Field(SchemaFieldConstants.REPLY_TO_SESSION_ID, 8, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field DEAD_LETTER_SOURCE =
static final Field DEAD_LETTER_SOURCE =
new Field(SchemaFieldConstants.DEAD_LETTER_SOURCE, 9, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field TIME_TO_LIVE =
static final Field TIME_TO_LIVE =
new Field(SchemaFieldConstants.TIME_TO_LIVE, 10, Schema.INT64_SCHEMA);
public static final Field LOCKED_UNTIL_UTC =
static final Field LOCKED_UNTIL_UTC =
new Field(SchemaFieldConstants.LOCKED_UNTIL_UTC, 11, Schema.OPTIONAL_INT64_SCHEMA);
public static final Field SEQUENCE_NUMBER =
static final Field SEQUENCE_NUMBER =
new Field(SchemaFieldConstants.SEQUENCE_NUMBER, 12, Schema.OPTIONAL_INT64_SCHEMA);
public static final Field SESSION_ID =
static final Field SESSION_ID =
new Field(SchemaFieldConstants.SESSION_ID, 13, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field LOCK_TOKEN =
static final Field LOCK_TOKEN =
new Field(SchemaFieldConstants.LOCK_TOKEN, 14, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field MESSAGE_BODY =
static final Field MESSAGE_BODY =
new Field(SchemaFieldConstants.MESSAGE_BODY, 15, Schema.BYTES_SCHEMA);
public static final Field GET_TO =
static final Field GET_TO =
new Field(SchemaFieldConstants.GET_TO, 16, Schema.OPTIONAL_STRING_SCHEMA);

private static final List<Field> ALL_FIELDS =
@@ -87,7 +86,7 @@ public static List<Field> getAllFields() {
return ALL_FIELDS;
}

private class SchemaFieldConstants {
private static class SchemaFieldConstants {

private static final String DELIVERY_COUNT = "deliveryCount";
private static final String ENQUEUED_TIME_UTC = "enqueuedTimeUtc";
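Narrowing the Field constants to package-private keeps getAllFields() as the public surface. For orientation, a sketch of how a struct schema can be assembled from such fields with Kafka Connect's SchemaBuilder — an abbreviated field set, and not necessarily how this class builds its schema:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

class ValueSchemaSketch {

  // Hypothetical schema name and three of the seventeen fields shown above.
  static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
      .name("ServiceBusValue")
      .field("deliveryCount", Schema.INT64_SCHEMA)
      .field("messageBody", Schema.BYTES_SCHEMA)
      .field("lockToken", Schema.OPTIONAL_STRING_SCHEMA)
      .build();
}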
@@ -21,10 +21,10 @@
import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusConfigConstants;
import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusSourceConfig;
import io.lenses.streamreactor.connect.azure.servicebus.util.KcqlConfigBusMapper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
@@ -60,10 +60,10 @@ public Class<? extends Task> taskClass() {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
log.info("Setting task configurations for {} workers.", maxTasks);
List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);

IntStream.range(0, maxTasks).forEach(task -> taskConfigs.add(configProperties));
return taskConfigs;
return Stream.generate(() -> Map.copyOf(configProperties))
.limit(maxTasks)
.collect(Collectors.toList());
}

@Override
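The rewritten taskConfigs() swaps the IntStream-plus-mutable-list version for a stream pipeline, and Map.copyOf gives every task an immutable snapshot of the connector properties rather than a shared mutable reference. A self-contained sketch of the pattern:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TaskConfigsSketch {

  static List<Map<String, String>> taskConfigs(Map<String, String> configProperties, int maxTasks) {
    // Each task receives its own immutable copy of the connector properties.
    return Stream.generate(() -> Map.copyOf(configProperties))
        .limit(maxTasks)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(taskConfigs(Map.of("connector.class", "Example"), 3).size()); // 3
  }
}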
@@ -15,8 +15,6 @@
*/
package io.lenses.streamreactor.connect.azure.servicebus.source;

import static java.util.Optional.ofNullable;

import io.lenses.kcql.Kcql;
import io.lenses.streamreactor.common.util.JarManifest;
import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusConfigConstants;
Expand All @@ -29,7 +27,6 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.OffsetStorageReader;

/**
* Implementation of {@link SourceTask} for Microsoft Azure Service Bus.
@@ -51,22 +48,18 @@ public String version() {

@Override
public void start(Map<String, String> props) {
int recordsQueueDefaultSize =
int recordsQueueSize =
new AzureServiceBusSourceConfig(props)
.getInt(AzureServiceBusConfigConstants.TASK_RECORDS_QUEUE_SIZE);
String connectionString = props.get(AzureServiceBusConfigConstants.CONNECTION_STRING);
List<Kcql> kcqls =
KcqlConfigBusMapper.mapKcqlsFromConfig(props.get(AzureServiceBusConfigConstants.KCQL_CONFIG));

OffsetStorageReader offsetStorageReader =
ofNullable(this.context).flatMap(
context -> ofNullable(context.offsetStorageReader())).orElseThrow();

ArrayBlockingQueue<ServiceBusMessageHolder> recordsQueue =
new ArrayBlockingQueue<>(recordsQueueDefaultSize);
new ArrayBlockingQueue<>(recordsQueueSize);

TaskToReceiverBridge serviceBusReceiverBridge =
new TaskToReceiverBridge(connectionString, kcqls, recordsQueue, offsetStorageReader);
new TaskToReceiverBridge(connectionString, kcqls, recordsQueue);

initialize(serviceBusReceiverBridge);
}
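start() now reads the queue capacity into an accurately named recordsQueueSize and no longer pulls an OffsetStorageReader from the context. The bounded ArrayBlockingQueue is what applies back-pressure between receiver threads and poll(); a small sketch with hypothetical values:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueSketch {

  public static void main(String[] args) throws InterruptedException {
    // Capacity as configured by task.records.queue.size (default 20).
    ArrayBlockingQueue<String> recordsQueue = new ArrayBlockingQueue<>(20);

    // A full queue makes offer() wait, slowing the producing side down.
    boolean accepted = recordsQueue.offer("message", 5, TimeUnit.SECONDS);
    System.out.println(accepted);
  }
}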
@@ -65,14 +65,6 @@ public AzureServiceBusPartitionKey(String topic, String partition) {
this.put(BUS_KEY, topic);
this.put(PARTITION_KEY, partition);
}

public String getTopic() {
return get(BUS_KEY);
}

public Integer getPartition() {
return Integer.valueOf(get(PARTITION_KEY));
}
}
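The getters can be deleted because Kafka Connect only reads a source partition as a plain Map; nothing calls them. A sketch of the equivalent map — the key names are assumptions, since the diff shows the BUS_KEY and PARTITION_KEY constants in use but not their values:

import java.util.Map;

class PartitionKeySketch {

  public static void main(String[] args) {
    Map<String, String> sourcePartition = Map.of(
        "bus", "my-service-bus-queue", // assumed BUS_KEY value
        "partition", "0");             // assumed PARTITION_KEY value

    System.out.println(sourcePartition);
  }
}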

/**
@@ -149,7 +149,7 @@ private static Consumer<ServiceBusReceivedMessage> onSuccessfulMessage(
offer = recordsQueue.offer(serviceBusMessageHolder, FIVE_SECONDS_TIMEOUT, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
log.info("{} has been interrupted on offering", receiverId);
log.warn("{} has been interrupted on offering", receiverId);
}
};
}
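Raising the log level to warn fits here, since an interrupted offer can drop a message. A common fuller idiom — not something this diff changes — also restores the thread's interrupt flag so callers can observe the interruption; a sketch:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class OfferWithInterruptSketch {

  public static void main(String[] args) {
    BlockingQueue<String> recordsQueue = new ArrayBlockingQueue<>(20);
    try {
      boolean offered = recordsQueue.offer("message", 5, TimeUnit.SECONDS);
      System.out.println(offered);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the flag for upstream callers
      System.err.println("interrupted on offering");
    }
  }
}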
@@ -16,8 +16,6 @@
package io.lenses.streamreactor.connect.azure.servicebus.source;

import io.lenses.kcql.Kcql;
import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusConfigConstants;
import io.lenses.streamreactor.connect.azure.servicebus.util.KcqlConfigBusMapper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -30,7 +28,6 @@
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.OffsetStorageReader;

/**
* Bridge between Receivers and Connector's Task for Azure Service Bus.
@@ -39,44 +36,22 @@
public class TaskToReceiverBridge {

private static final int INITIAL_RECORDS_TO_COMMIT_SIZE = 500;
private static final String FACADE_CLASS_SIMPLE_NAME = ServiceBusReceiverFacade.class.getSimpleName();
private final BlockingQueue<ServiceBusMessageHolder> recordsQueue;
private final ServiceBusPartitionOffsetProvider offsetProvider;
private final ExecutorService sentMessagesExecutors;
private Map<String, ServiceBusReceiverFacade> receivers;
private final Map<String, ServiceBusMessageHolder> recordsToCommitMap;

/**
* Creates Bridge between Receivers and Connector's Task for Azure Service Bus.
*
* @param properties all properties for {@link org.apache.kafka.connect.source.SourceTask}.
* @param recordsQueue records queue used to store received messages.
* @param offsetStorageReader offset storage reader from Task.
*/
public TaskToReceiverBridge(Map<String, String> properties,
BlockingQueue<ServiceBusMessageHolder> recordsQueue, OffsetStorageReader offsetStorageReader) {
this.recordsQueue = recordsQueue;
this.offsetProvider = new ServiceBusPartitionOffsetProvider(offsetStorageReader);
String connectionString = properties.get(AzureServiceBusConfigConstants.CONNECTION_STRING);
List<Kcql> kcqls =
KcqlConfigBusMapper.mapKcqlsFromConfig(properties.get(AzureServiceBusConfigConstants.KCQL_CONFIG));
sentMessagesExecutors = Executors.newFixedThreadPool(kcqls.size() * 2);
recordsToCommitMap = new ConcurrentHashMap<>(INITIAL_RECORDS_TO_COMMIT_SIZE);

initiateReceivers(recordsQueue, kcqls, connectionString);
}

/**
* Creates Bridge between Receivers and Connector's Task for Azure Service Bus.
*
* @param connectionString Service Bus connection string
* @param kcqls list of KCQLs to initiate Receivers for.
* @param recordsQueue records queue used to store received messages.
* @param offsetStorageReader offset storage reader from Task.
* @param connectionString Service Bus connection string
* @param kcqls list of KCQLs to initiate Receivers for.
* @param recordsQueue records queue used to store received messages.
*/
public TaskToReceiverBridge(String connectionString, List<Kcql> kcqls,
BlockingQueue<ServiceBusMessageHolder> recordsQueue, OffsetStorageReader offsetStorageReader) {
BlockingQueue<ServiceBusMessageHolder> recordsQueue) {
this.recordsQueue = recordsQueue;
this.offsetProvider = new ServiceBusPartitionOffsetProvider(offsetStorageReader);
sentMessagesExecutors = Executors.newFixedThreadPool(kcqls.size() * 2);
recordsToCommitMap = new ConcurrentHashMap<>(INITIAL_RECORDS_TO_COMMIT_SIZE);

@@ -87,15 +62,13 @@ public TaskToReceiverBridge(String connectionString, List<Kcql> kcqls,
* Creates Bridge between Receivers and Connector's Task for Azure Service Bus.
*
* @param recordsQueue records queue used to store received messages.
* @param offsetStorageReader offset storage reader from Task.
* @param receivers map of {@link ServiceBusReceiverFacade} receivers.
* @param sentMessagesExecutors {@link ExecutorService} that handles sent messages.
*/
TaskToReceiverBridge(BlockingQueue<ServiceBusMessageHolder> recordsQueue,
OffsetStorageReader offsetStorageReader, Map<String, ServiceBusReceiverFacade> receivers,
Map<String, ServiceBusReceiverFacade> receivers,
ExecutorService sentMessagesExecutors) {
this.recordsQueue = recordsQueue;
this.offsetProvider = new ServiceBusPartitionOffsetProvider(offsetStorageReader);
this.sentMessagesExecutors = sentMessagesExecutors;
this.receivers = receivers;
recordsToCommitMap = new ConcurrentHashMap<>(INITIAL_RECORDS_TO_COMMIT_SIZE);
Expand All @@ -105,8 +78,8 @@ private void initiateReceivers(BlockingQueue<ServiceBusMessageHolder> recordsQue
List<Kcql> kcqls, String connectionString) {
receivers = new ConcurrentHashMap<>(kcqls.size());

kcqls.stream().forEach(kcql -> {
String receiverId = ServiceBusReceiverFacade.class.getSimpleName() + UUID.randomUUID();
kcqls.forEach(kcql -> {
String receiverId = FACADE_CLASS_SIMPLE_NAME + UUID.randomUUID();
receivers.put(receiverId, new ServiceBusReceiverFacade(kcql, recordsQueue, connectionString, receiverId));
});
}
@@ -120,21 +93,19 @@ public void closeReceivers() {

/**
* Polls for Consumer Records from the queue.
*
*
* @return List of {@link SourceRecord} or empty list if no new messages received.
*/
public List<SourceRecord> poll() {
List<ServiceBusMessageHolder> recordsFromQueue = new ArrayList<>(recordsQueue.size());

recordsQueue.drainTo(recordsFromQueue);

List<SourceRecord> recordsToSend =
recordsFromQueue.stream()
.map(messageHolder -> {
recordsToCommitMap.put(messageHolder.getOriginalRecord().getMessageId(), messageHolder);
return messageHolder.getTranslatedRecord();
}).collect(Collectors.toList());
return recordsToSend;
return recordsFromQueue.stream()
.map(messageHolder -> {
recordsToCommitMap.put(messageHolder.getOriginalRecord().getMessageId(), messageHolder);
return messageHolder.getTranslatedRecord();
}).collect(Collectors.toList());
}

void commitRecordInServiceBus(SourceRecord sourceRecord) {
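With offset tracking moved out of the bridge, poll() is now a plain drain-and-translate step. A compact sketch of the drainTo pattern it uses — one non-blocking bulk transfer instead of repeated single-element polls:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class DrainToSketch {

  public static void main(String[] args) {
    BlockingQueue<String> recordsQueue = new ArrayBlockingQueue<>(20);
    recordsQueue.offer("m1");
    recordsQueue.offer("m2");

    List<String> batch = new ArrayList<>(recordsQueue.size());
    recordsQueue.drainTo(batch); // bulk transfer of everything currently queued

    System.out.println(batch); // [m1, m2]
  }
}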