Skip to content

Commit

Permalink
PR comments pt.2 + builds
Browse files Browse the repository at this point in the history
  • Loading branch information
GoMati-MU committed May 24, 2024
1 parent 634953a commit 1c98d81
Show file tree
Hide file tree
Showing 24 changed files with 797 additions and 130 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/java-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ on:
java_modules:
description: "Stream reactor collection of java modules"
value: ${{ jobs.initiate-java-modules.outputs.java_matrix }}
java_test_modules:
description: "Stream reactor collection of java modules for test"
value: ${{ jobs.initiate-java-modules.outputs.java_test_matrix }}

jobs:

Expand All @@ -18,6 +21,7 @@ jobs:
runs-on: ubuntu-latest
outputs:
java_matrix: ${{ steps.java-mods.outputs.java-matrix }}
java_test_matrix: ${{ steps.java-mods.outputs.java-matrix }}
steps:
- uses: actions/checkout@v4
- name: Set up JDK 17
Expand All @@ -27,20 +31,22 @@ jobs:
distribution: 'temurin'
cache: 'gradle'
- name: Generate modules lists
run: cd 'java-connectors' && ./gradlew releaseModuleList
run: cd 'java-connectors' && ./gradlew prepareModuleList
env:
JVM_OPTS: -Xmx512m
- name: Read java modules lists
id: java-mods
run: |
echo "java-matrix=$(cat ./java-connectors/gradle-modules.txt)" >> $GITHUB_OUTPUT
echo "java-test-matrix=$(cat ./java-connectors/gradle-test-modules.txt)" >> $GITHUB_OUTPUT
test:
needs:
- initiate-java-modules
strategy:
matrix:
module: ${{fromJSON(needs.initiate-java-modules.outputs.java_matrix)}}
module: ${{fromJSON(needs.initiate-java-modules.outputs.java_test_matrix)}}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
3 changes: 2 additions & 1 deletion java-connectors/.gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.gradle
build/
target/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
Expand Down Expand Up @@ -45,4 +46,4 @@ bin/
### Lenses-specific ###
release/
gradle-modules.txt
**/src/main/gen/*
gradle-test-modules.txt
29 changes: 26 additions & 3 deletions java-connectors/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ allprojects {
jUnitVersion = '5.9.1'
mockitoJupiterVersion = '5.10.0'
apacheToConfluentVersionAxis = ["2.8.1": "6.2.2", "3.3.0": "7.3.1"]
caffeineVersion = '3.1.8'

//Other Manifest Info
mainClassName = ''
Expand Down Expand Up @@ -62,7 +63,8 @@ allprojects {

//tests
testImplementation group: 'org.mockito', name: 'mockito-core', version: mockitoJupiterVersion
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: mockitoJupiterVersion
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: mockitoJupiterVersion
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: jUnitVersion
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.25.3'

}
Expand Down Expand Up @@ -92,9 +94,10 @@ allprojects {
}

shadowJar {
def artifactVersion = gitTag?.trim() ? gitTag : project.version

manifest {
attributes("StreamReactor-Version": project.version,
attributes("StreamReactor-Version": artifactVersion,
"Kafka-Version": kafkaVersion,
"Created-By": "Lenses",
"Created-At": new Date().format("YYYYMMDDHHmm"),
Expand All @@ -105,7 +108,7 @@ allprojects {
)
}
configurations = [project.configurations.compileClasspath]
//archiveBaseName = "${project.name}-${project.version}-${kafkaVersion}-all"
archiveFileName = "${project.name}-${artifactVersion}-all.jar"
zip64 true

mergeServiceFiles {
Expand Down Expand Up @@ -166,6 +169,21 @@ task prepareRelease(dependsOn: [collectFatJar]) {
dependsOn subprojects.collectFatJar
}

task testModuleList() {
def nonTestModules = ["java-reactor"]

def modulesFile = new File("gradle-test-modules.txt")
modulesFile.delete()
modulesFile.createNewFile()

def modulesBuilder = new StringBuilder("[")
allprojects.name.stream()
.filter {moduleName -> !nonTestModules.contains(moduleName)}
.forEach {moduleName -> modulesBuilder.append("\"" + moduleName + "\",") }
modulesBuilder.deleteCharAt(modulesBuilder.lastIndexOf(",")).append("]")
modulesFile.append(modulesBuilder)
}

task releaseModuleList() {
def nonReleaseModules = ["java-reactor", "kafka-connect-cloud-common",
"kafka-connect-common", "kafka-connect-query-language"]
Expand All @@ -181,3 +199,8 @@ task releaseModuleList() {
modulesBuilder.deleteCharAt(modulesBuilder.lastIndexOf(",")).append("]")
modulesFile.append(modulesBuilder)
}

task prepareModuleList() {
dependsOn testModuleList
dependsOn releaseModuleList
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class AzureServiceBusConfigConstants {
public static final String KCQL_DOC =
"KCQL expression describing field selection and data routing to the target.";

public static final String TASK_RECORDS_QUEUE_SIZE = "task.records.queue.size";
public static final String TASK_RECORDS_QUEUE_SIZE = SOURCE_CONNECTOR_PREFIX + "task.records.queue.size";
public static final String TASK_RECORDS_QUEUE_SIZE_DOC = "Task's records queue size.";
public static final int TASK_RECORDS_QUEUE_SIZE_DEFAULT = 20;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,40 +24,39 @@
*/
public class ServiceBusValueSchemaField {

private static final int NUMBER_OF_FIELDS = 17;
public static final Field DELIVERY_COUNT =
static final Field DELIVERY_COUNT =
new Field(SchemaFieldConstants.DELIVERY_COUNT, 0, Schema.INT64_SCHEMA);
public static final Field ENQUEUED_TIME_UTC =
static final Field ENQUEUED_TIME_UTC =
new Field(SchemaFieldConstants.ENQUEUED_TIME_UTC, 1, Schema.INT64_SCHEMA);
public static final Field CONTENT_TYPE =
static final Field CONTENT_TYPE =
new Field(SchemaFieldConstants.CONTENT_TYPE, 2, Schema.STRING_SCHEMA);
public static final Field LABEL =
static final Field LABEL =
new Field(SchemaFieldConstants.LABEL, 3, Schema.STRING_SCHEMA);
public static final Field CORRELATION_ID =
static final Field CORRELATION_ID =
new Field(SchemaFieldConstants.CORRELATION_ID, 4, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field MESSAGE_PROPERTIES =
static final Field MESSAGE_PROPERTIES =
new Field(SchemaFieldConstants.MESSAGE_PROPERTIES, 5, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field PARTITION_KEY =
static final Field PARTITION_KEY =
new Field(SchemaFieldConstants.PARTITION_KEY, 6, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field REPLY_TO =
static final Field REPLY_TO =
new Field(SchemaFieldConstants.REPLY_TO, 7, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field REPLY_TO_SESSION_ID =
static final Field REPLY_TO_SESSION_ID =
new Field(SchemaFieldConstants.REPLY_TO_SESSION_ID, 8, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field DEAD_LETTER_SOURCE =
static final Field DEAD_LETTER_SOURCE =
new Field(SchemaFieldConstants.DEAD_LETTER_SOURCE, 9, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field TIME_TO_LIVE =
static final Field TIME_TO_LIVE =
new Field(SchemaFieldConstants.TIME_TO_LIVE, 10, Schema.INT64_SCHEMA);
public static final Field LOCKED_UNTIL_UTC =
static final Field LOCKED_UNTIL_UTC =
new Field(SchemaFieldConstants.LOCKED_UNTIL_UTC, 11, Schema.OPTIONAL_INT64_SCHEMA);
public static final Field SEQUENCE_NUMBER =
static final Field SEQUENCE_NUMBER =
new Field(SchemaFieldConstants.SEQUENCE_NUMBER, 12, Schema.OPTIONAL_INT64_SCHEMA);
public static final Field SESSION_ID =
static final Field SESSION_ID =
new Field(SchemaFieldConstants.SESSION_ID, 13, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field LOCK_TOKEN =
static final Field LOCK_TOKEN =
new Field(SchemaFieldConstants.LOCK_TOKEN, 14, Schema.OPTIONAL_STRING_SCHEMA);
public static final Field MESSAGE_BODY =
static final Field MESSAGE_BODY =
new Field(SchemaFieldConstants.MESSAGE_BODY, 15, Schema.BYTES_SCHEMA);
public static final Field GET_TO =
static final Field GET_TO =
new Field(SchemaFieldConstants.GET_TO, 16, Schema.OPTIONAL_STRING_SCHEMA);

private static final List<Field> ALL_FIELDS =
Expand Down Expand Up @@ -87,7 +86,7 @@ public static List<Field> getAllFields() {
return ALL_FIELDS;
}

private class SchemaFieldConstants {
private static class SchemaFieldConstants {

private static final String DELIVERY_COUNT = "deliveryCount";
private static final String ENQUEUED_TIME_UTC = "enqueuedTimeUtc";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusConfigConstants;
import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusSourceConfig;
import io.lenses.streamreactor.connect.azure.servicebus.util.KcqlConfigBusMapper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
Expand Down Expand Up @@ -60,10 +60,10 @@ public Class<? extends Task> taskClass() {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
log.info("Setting task configurations for {} workers.", maxTasks);
List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);

IntStream.range(0, maxTasks).forEach(task -> taskConfigs.add(configProperties));
return taskConfigs;
return Stream.generate(() -> Map.copyOf(configProperties))
.limit(maxTasks)
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.lenses.streamreactor.connect.azure.servicebus.source;

import static java.util.Optional.ofNullable;

import io.lenses.kcql.Kcql;
import io.lenses.streamreactor.common.util.JarManifest;
import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusConfigConstants;
Expand All @@ -29,7 +27,6 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.OffsetStorageReader;

/**
* Implementation of {@link SourceTask} for Microsoft Azure EventHubs.
Expand All @@ -51,22 +48,18 @@ public String version() {

@Override
public void start(Map<String, String> props) {
int recordsQueueDefaultSize =
int recordsQueueSize =
new AzureServiceBusSourceConfig(props)
.getInt(AzureServiceBusConfigConstants.TASK_RECORDS_QUEUE_SIZE);
String connectionString = props.get(AzureServiceBusConfigConstants.CONNECTION_STRING);
List<Kcql> kcqls =
KcqlConfigBusMapper.mapKcqlsFromConfig(props.get(AzureServiceBusConfigConstants.KCQL_CONFIG));

OffsetStorageReader offsetStorageReader =
ofNullable(this.context).flatMap(
context -> ofNullable(context.offsetStorageReader())).orElseThrow();

ArrayBlockingQueue<ServiceBusMessageHolder> recordsQueue =
new ArrayBlockingQueue<>(recordsQueueDefaultSize);
new ArrayBlockingQueue<>(recordsQueueSize);

TaskToReceiverBridge serviceBusReceiverBridge =
new TaskToReceiverBridge(connectionString, kcqls, recordsQueue, offsetStorageReader);
new TaskToReceiverBridge(connectionString, kcqls, recordsQueue);

initialize(serviceBusReceiverBridge);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ public AzureServiceBusPartitionKey(String topic, String partition) {
this.put(BUS_KEY, topic);
this.put(PARTITION_KEY, partition);
}

public String getTopic() {
return get(BUS_KEY);
}

public Integer getPartition() {
return Integer.valueOf(get(PARTITION_KEY));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private static Consumer<ServiceBusReceivedMessage> onSuccessfulMessage(
offer = recordsQueue.offer(serviceBusMessageHolder, FIVE_SECONDS_TIMEOUT, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
log.info("{} has been interrupted on offering", receiverId);
log.warn("{} has been interrupted on offering", receiverId);
}
};
}
Expand Down
Loading

0 comments on commit 1c98d81

Please sign in to comment.