diff --git a/java-connectors/.gitignore b/java-connectors/.gitignore index 16453db46..2b169201d 100644 --- a/java-connectors/.gitignore +++ b/java-connectors/.gitignore @@ -1,5 +1,6 @@ .gradle build/ +target/ !gradle/wrapper/gradle-wrapper.jar !**/src/main/**/build/ !**/src/test/**/build/ @@ -45,4 +46,4 @@ bin/ ### Lenses-specific ### release/ gradle-modules.txt -**/src/main/gen/* +gradle-test-modules.txt diff --git a/java-connectors/build.gradle b/java-connectors/build.gradle index 7aa776915..c8270c840 100644 --- a/java-connectors/build.gradle +++ b/java-connectors/build.gradle @@ -29,6 +29,7 @@ allprojects { jUnitVersion = '5.9.1' mockitoJupiterVersion = '5.10.0' apacheToConfluentVersionAxis = ["2.8.1": "6.2.2", "3.3.0": "7.3.1"] + caffeineVersion = '3.1.8' //Other Manifest Info mainClassName = '' @@ -62,7 +63,8 @@ allprojects { //tests testImplementation group: 'org.mockito', name: 'mockito-core', version: mockitoJupiterVersion - testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: mockitoJupiterVersion + testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: mockitoJupiterVersion + testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: jUnitVersion testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.25.3' } @@ -92,9 +94,10 @@ allprojects { } shadowJar { + def artifactVersion = gitTag?.trim() ? gitTag : project.version manifest { - attributes("StreamReactor-Version": project.version, + attributes("StreamReactor-Version": artifactVersion, "Kafka-Version": kafkaVersion, "Created-By": "Lenses", "Created-At": new Date().format("YYYYMMDDHHmm"), @@ -105,7 +108,7 @@ allprojects { ) } configurations = [project.configurations.compileClasspath] - //archiveBaseName = "${project.name}-${project.version}-${kafkaVersion}-all" + archiveFileName = "${project.name}-${artifactVersion}-all.jar" zip64 true mergeServiceFiles { @@ -166,6 +169,21 @@ task prepareRelease(dependsOn: [collectFatJar]) { dependsOn subprojects.collectFatJar } +task testModuleList() { + def nonTestModules = ["java-reactor"] + + def modulesFile = new File("gradle-test-modules.txt") + modulesFile.delete() + modulesFile.createNewFile() + + def modulesBuilder = new StringBuilder("[") + allprojects.name.stream() + .filter {moduleName -> !nonTestModules.contains(moduleName)} + .forEach {moduleName -> modulesBuilder.append("\"" + moduleName + "\",") } + modulesBuilder.deleteCharAt(modulesBuilder.lastIndexOf(",")).append("]") + modulesFile.append(modulesBuilder) +} + task releaseModuleList() { def nonReleaseModules = ["java-reactor", "kafka-connect-cloud-common", "kafka-connect-common", "kafka-connect-query-language"] @@ -181,3 +199,8 @@ task releaseModuleList() { modulesBuilder.deleteCharAt(modulesBuilder.lastIndexOf(",")).append("]") modulesFile.append(modulesBuilder) } + +task prepareModuleList() { + dependsOn testModuleList + dependsOn releaseModuleList +} \ No newline at end of file diff --git a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/config/AzureServiceBusConfigConstants.java b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/config/AzureServiceBusConfigConstants.java index de56feb5c..a758f628b 100644 --- a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/config/AzureServiceBusConfigConstants.java +++ b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/config/AzureServiceBusConfigConstants.java @@ -35,7 +35,7 @@ public class AzureServiceBusConfigConstants { public static final String KCQL_DOC = "KCQL expression describing field selection and data routing to the target."; - public static final String TASK_RECORDS_QUEUE_SIZE = "task.records.queue.size"; + public static final String TASK_RECORDS_QUEUE_SIZE = SOURCE_CONNECTOR_PREFIX + "task.records.queue.size"; public static final String TASK_RECORDS_QUEUE_SIZE_DOC = "Task's records queue size."; public static final int TASK_RECORDS_QUEUE_SIZE_DEFAULT = 20; diff --git a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java index b1fc20736..8bb0f7309 100644 --- a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java +++ b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/mapping/ServiceBusValueSchemaField.java @@ -24,40 +24,39 @@ */ public class ServiceBusValueSchemaField { - private static final int NUMBER_OF_FIELDS = 17; - public static final Field DELIVERY_COUNT = + static final Field DELIVERY_COUNT = new Field(SchemaFieldConstants.DELIVERY_COUNT, 0, Schema.INT64_SCHEMA); - public static final Field ENQUEUED_TIME_UTC = + static final Field ENQUEUED_TIME_UTC = new Field(SchemaFieldConstants.ENQUEUED_TIME_UTC, 1, Schema.INT64_SCHEMA); - public static final Field CONTENT_TYPE = + static final Field CONTENT_TYPE = new Field(SchemaFieldConstants.CONTENT_TYPE, 2, Schema.STRING_SCHEMA); - public static final Field LABEL = + static final Field LABEL = new Field(SchemaFieldConstants.LABEL, 3, Schema.STRING_SCHEMA); - public static final Field CORRELATION_ID = + static final Field CORRELATION_ID = new Field(SchemaFieldConstants.CORRELATION_ID, 4, Schema.OPTIONAL_STRING_SCHEMA); - public static final Field MESSAGE_PROPERTIES = + static final Field MESSAGE_PROPERTIES = new Field(SchemaFieldConstants.MESSAGE_PROPERTIES, 5, Schema.OPTIONAL_STRING_SCHEMA); - public static final Field PARTITION_KEY = + static final Field PARTITION_KEY = new Field(SchemaFieldConstants.PARTITION_KEY, 6, Schema.OPTIONAL_STRING_SCHEMA); - public static final Field REPLY_TO = + static final Field REPLY_TO = new Field(SchemaFieldConstants.REPLY_TO, 7, Schema.OPTIONAL_STRING_SCHEMA); - public static final Field REPLY_TO_SESSION_ID = + static final Field REPLY_TO_SESSION_ID = new Field(SchemaFieldConstants.REPLY_TO_SESSION_ID, 8, Schema.OPTIONAL_STRING_SCHEMA); - public static final Field DEAD_LETTER_SOURCE = + static final Field DEAD_LETTER_SOURCE = new Field(SchemaFieldConstants.DEAD_LETTER_SOURCE, 9, Schema.OPTIONAL_STRING_SCHEMA); - public static final Field TIME_TO_LIVE = + static final Field TIME_TO_LIVE = new Field(SchemaFieldConstants.TIME_TO_LIVE, 10, Schema.INT64_SCHEMA); - public static final Field LOCKED_UNTIL_UTC = + static final Field LOCKED_UNTIL_UTC = new Field(SchemaFieldConstants.LOCKED_UNTIL_UTC, 11, Schema.OPTIONAL_INT64_SCHEMA); - public static final Field SEQUENCE_NUMBER = + static final Field SEQUENCE_NUMBER = new Field(SchemaFieldConstants.SEQUENCE_NUMBER, 12, Schema.OPTIONAL_INT64_SCHEMA); - public static final Field SESSION_ID = + static final Field SESSION_ID = new Field(SchemaFieldConstants.SESSION_ID, 13, Schema.OPTIONAL_STRING_SCHEMA); - public static final Field LOCK_TOKEN = + static final Field LOCK_TOKEN = new Field(SchemaFieldConstants.LOCK_TOKEN, 14, Schema.OPTIONAL_STRING_SCHEMA); - public static final Field MESSAGE_BODY = + static final Field MESSAGE_BODY = new Field(SchemaFieldConstants.MESSAGE_BODY, 15, Schema.BYTES_SCHEMA); - public static final Field GET_TO = + static final Field GET_TO = new Field(SchemaFieldConstants.GET_TO, 16, Schema.OPTIONAL_STRING_SCHEMA); private static final List ALL_FIELDS = @@ -87,7 +86,7 @@ public static List getAllFields() { return ALL_FIELDS; } - private class SchemaFieldConstants { + private static class SchemaFieldConstants { private static final String DELIVERY_COUNT = "deliveryCount"; private static final String ENQUEUED_TIME_UTC = "enqueuedTimeUtc"; diff --git a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnector.java b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnector.java index 138d7fb30..ef37d0edd 100644 --- a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnector.java +++ b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceConnector.java @@ -21,10 +21,10 @@ import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusConfigConstants; import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusSourceConfig; import io.lenses.streamreactor.connect.azure.servicebus.util.KcqlConfigBusMapper; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.stream.IntStream; +import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; @@ -60,10 +60,10 @@ public Class taskClass() { @Override public List> taskConfigs(int maxTasks) { log.info("Setting task configurations for {} workers.", maxTasks); - List> taskConfigs = new ArrayList<>(maxTasks); - IntStream.range(0, maxTasks).forEach(task -> taskConfigs.add(configProperties)); - return taskConfigs; + return Stream.generate(() -> Map.copyOf(configProperties)) + .limit(maxTasks) + .collect(Collectors.toList()); } @Override diff --git a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceTask.java b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceTask.java index 1ae1c38c3..25e1e56be 100644 --- a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceTask.java +++ b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/AzureServiceBusSourceTask.java @@ -15,8 +15,6 @@ */ package io.lenses.streamreactor.connect.azure.servicebus.source; -import static java.util.Optional.ofNullable; - import io.lenses.kcql.Kcql; import io.lenses.streamreactor.common.util.JarManifest; import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusConfigConstants; @@ -29,7 +27,6 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; -import org.apache.kafka.connect.storage.OffsetStorageReader; /** * Implementation of {@link SourceTask} for Microsoft Azure EventHubs. @@ -51,22 +48,18 @@ public String version() { @Override public void start(Map props) { - int recordsQueueDefaultSize = + int recordsQueueSize = new AzureServiceBusSourceConfig(props) .getInt(AzureServiceBusConfigConstants.TASK_RECORDS_QUEUE_SIZE); String connectionString = props.get(AzureServiceBusConfigConstants.CONNECTION_STRING); List kcqls = KcqlConfigBusMapper.mapKcqlsFromConfig(props.get(AzureServiceBusConfigConstants.KCQL_CONFIG)); - OffsetStorageReader offsetStorageReader = - ofNullable(this.context).flatMap( - context -> ofNullable(context.offsetStorageReader())).orElseThrow(); - ArrayBlockingQueue recordsQueue = - new ArrayBlockingQueue<>(recordsQueueDefaultSize); + new ArrayBlockingQueue<>(recordsQueueSize); TaskToReceiverBridge serviceBusReceiverBridge = - new TaskToReceiverBridge(connectionString, kcqls, recordsQueue, offsetStorageReader); + new TaskToReceiverBridge(connectionString, kcqls, recordsQueue); initialize(serviceBusReceiverBridge); } diff --git a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/ServiceBusPartitionOffsetProvider.java b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/ServiceBusPartitionOffsetProvider.java index f561d985e..240fe3555 100644 --- a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/ServiceBusPartitionOffsetProvider.java +++ b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/ServiceBusPartitionOffsetProvider.java @@ -65,14 +65,6 @@ public AzureServiceBusPartitionKey(String topic, String partition) { this.put(BUS_KEY, topic); this.put(PARTITION_KEY, partition); } - - public String getTopic() { - return get(BUS_KEY); - } - - public Integer getPartition() { - return Integer.valueOf(get(PARTITION_KEY)); - } } /** diff --git a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/ServiceBusReceiverFacade.java b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/ServiceBusReceiverFacade.java index 3621a9eba..8b420b7a6 100644 --- a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/ServiceBusReceiverFacade.java +++ b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/ServiceBusReceiverFacade.java @@ -149,7 +149,7 @@ private static Consumer onSuccessfulMessage( offer = recordsQueue.offer(serviceBusMessageHolder, FIVE_SECONDS_TIMEOUT, TimeUnit.SECONDS); } } catch (InterruptedException e) { - log.info("{} has been interrupted on offering", receiverId); + log.warn("{} has been interrupted on offering", receiverId); } }; } diff --git a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java index b6662328e..ec47ed124 100644 --- a/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java +++ b/java-connectors/kafka-connect-azure-servicebus/src/main/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridge.java @@ -16,8 +16,6 @@ package io.lenses.streamreactor.connect.azure.servicebus.source; import io.lenses.kcql.Kcql; -import io.lenses.streamreactor.connect.azure.servicebus.config.AzureServiceBusConfigConstants; -import io.lenses.streamreactor.connect.azure.servicebus.util.KcqlConfigBusMapper; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -30,7 +28,6 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.storage.OffsetStorageReader; /** * Bridge between Receivers and Connector's Task for Azure Service Bus. @@ -39,44 +36,22 @@ public class TaskToReceiverBridge { private static final int INITIAL_RECORDS_TO_COMMIT_SIZE = 500; + private static final String FACADE_CLASS_SIMPLE_NAME = ServiceBusReceiverFacade.class.getSimpleName(); private final BlockingQueue recordsQueue; - private final ServiceBusPartitionOffsetProvider offsetProvider; private final ExecutorService sentMessagesExecutors; private Map receivers; private final Map recordsToCommitMap; - /** - * Creates Bridge between Receivers and Connector's Task for Azure Service Bus. - * - * @param properties all properties for {@link org.apache.kafka.connect.source.SourceTask}. - * @param recordsQueue records queue used to store received messages. - * @param offsetStorageReader offset storage reader from Task. - */ - public TaskToReceiverBridge(Map properties, - BlockingQueue recordsQueue, OffsetStorageReader offsetStorageReader) { - this.recordsQueue = recordsQueue; - this.offsetProvider = new ServiceBusPartitionOffsetProvider(offsetStorageReader); - String connectionString = properties.get(AzureServiceBusConfigConstants.CONNECTION_STRING); - List kcqls = - KcqlConfigBusMapper.mapKcqlsFromConfig(properties.get(AzureServiceBusConfigConstants.KCQL_CONFIG)); - sentMessagesExecutors = Executors.newFixedThreadPool(kcqls.size() * 2); - recordsToCommitMap = new ConcurrentHashMap<>(INITIAL_RECORDS_TO_COMMIT_SIZE); - - initiateReceivers(recordsQueue, kcqls, connectionString); - } - /** * Creates Bridge between Receivers and Connector's Task for Azure Service Bus. * - * @param connectionString Service Bus connection string - * @param kcqls list of KCQLs to initiate Receivers for. - * @param recordsQueue records queue used to store received messages. - * @param offsetStorageReader offset storage reader from Task. + * @param connectionString Service Bus connection string + * @param kcqls list of KCQLs to initiate Receivers for. + * @param recordsQueue records queue used to store received messages. */ public TaskToReceiverBridge(String connectionString, List kcqls, - BlockingQueue recordsQueue, OffsetStorageReader offsetStorageReader) { + BlockingQueue recordsQueue) { this.recordsQueue = recordsQueue; - this.offsetProvider = new ServiceBusPartitionOffsetProvider(offsetStorageReader); sentMessagesExecutors = Executors.newFixedThreadPool(kcqls.size() * 2); recordsToCommitMap = new ConcurrentHashMap<>(INITIAL_RECORDS_TO_COMMIT_SIZE); @@ -87,15 +62,13 @@ public TaskToReceiverBridge(String connectionString, List kcqls, * Creates Bridge between Receivers and Connector's Task for Azure Service Bus. * * @param recordsQueue records queue used to store received messages. - * @param offsetStorageReader offset storage reader from Task. * @param receivers map of {@link ServiceBusReceiverFacade} receivers. * @param sentMessagesExecutors {@link ExecutorService} that handles sent messages. */ TaskToReceiverBridge(BlockingQueue recordsQueue, - OffsetStorageReader offsetStorageReader, Map receivers, + Map receivers, ExecutorService sentMessagesExecutors) { this.recordsQueue = recordsQueue; - this.offsetProvider = new ServiceBusPartitionOffsetProvider(offsetStorageReader); this.sentMessagesExecutors = sentMessagesExecutors; this.receivers = receivers; recordsToCommitMap = new ConcurrentHashMap<>(INITIAL_RECORDS_TO_COMMIT_SIZE); @@ -105,8 +78,8 @@ private void initiateReceivers(BlockingQueue recordsQue List kcqls, String connectionString) { receivers = new ConcurrentHashMap<>(kcqls.size()); - kcqls.stream().forEach(kcql -> { - String receiverId = ServiceBusReceiverFacade.class.getSimpleName() + UUID.randomUUID(); + kcqls.forEach(kcql -> { + String receiverId = FACADE_CLASS_SIMPLE_NAME + UUID.randomUUID(); receivers.put(receiverId, new ServiceBusReceiverFacade(kcql, recordsQueue, connectionString, receiverId)); }); } @@ -120,7 +93,7 @@ public void closeReceivers() { /** * Polls for Consumer Records from the queue. - * + * * @return List of {@link SourceRecord} or empty list if no new messages received. */ public List poll() { @@ -128,13 +101,11 @@ public List poll() { recordsQueue.drainTo(recordsFromQueue); - List recordsToSend = - recordsFromQueue.stream() - .map(messageHolder -> { - recordsToCommitMap.put(messageHolder.getOriginalRecord().getMessageId(), messageHolder); - return messageHolder.getTranslatedRecord(); - }).collect(Collectors.toList()); - return recordsToSend; + return recordsFromQueue.stream() + .map(messageHolder -> { + recordsToCommitMap.put(messageHolder.getOriginalRecord().getMessageId(), messageHolder); + return messageHolder.getTranslatedRecord(); + }).collect(Collectors.toList()); } void commitRecordInServiceBus(SourceRecord sourceRecord) { diff --git a/java-connectors/kafka-connect-azure-servicebus/src/test/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridgeTest.java b/java-connectors/kafka-connect-azure-servicebus/src/test/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridgeTest.java index 4a6c11860..1f508d31e 100644 --- a/java-connectors/kafka-connect-azure-servicebus/src/test/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridgeTest.java +++ b/java-connectors/kafka-connect-azure-servicebus/src/test/java/io/lenses/streamreactor/connect/azure/servicebus/source/TaskToReceiverBridgeTest.java @@ -30,32 +30,27 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.storage.OffsetStorageReader; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +@ExtendWith(MockitoExtension.class) class TaskToReceiverBridgeTest { private static final String RECEIVER_ID_1 = "RECEIVER1"; private static final String RECEIVER_ID_2 = "RECEIVER2"; - ArrayBlockingQueue blockingQueue; - OffsetStorageReader offsetReader; - - TaskToReceiverBridge testObj; - - @BeforeEach - void setUp() { - blockingQueue = mock(ArrayBlockingQueue.class); - offsetReader = mock(OffsetStorageReader.class); - } + @Mock + private ArrayBlockingQueue blockingQueue; + private TaskToReceiverBridge testObj; @Test void closeReceiversShouldCloseAllReceivers() { //given ExecutorService executorService = mock(ExecutorService.class); - ServiceBusReceiverFacade receiver1 = mockReceiver(RECEIVER_ID_1); - ServiceBusReceiverFacade receiver2 = mockReceiver(RECEIVER_ID_2); + ServiceBusReceiverFacade receiver1 = mockReceiver(); + ServiceBusReceiverFacade receiver2 = mockReceiver(); Map receivers = Map.of( RECEIVER_ID_1, receiver1, @@ -63,7 +58,7 @@ void closeReceiversShouldCloseAllReceivers() { ); //when - testObj = new TaskToReceiverBridge(blockingQueue, offsetReader, receivers, executorService); + testObj = new TaskToReceiverBridge(blockingQueue, receivers, executorService); testObj.closeReceivers(); //then @@ -76,31 +71,21 @@ void pollShouldDrainAllMessagesFromQueue() throws InterruptedException { //given int arrayBlockingQueueCapacity = 10; String messageIdTemplate = "MSGID%s"; - ExecutorService executorService = mock(ExecutorService.class); - ServiceBusReceiverFacade receiver1 = mockReceiver(RECEIVER_ID_1); BlockingQueue sourceRecordBlockingQueue = new ArrayBlockingQueue<>(arrayBlockingQueueCapacity); + + ExecutorService executorService = mock(ExecutorService.class); + ServiceBusReceiverFacade receiver1 = mockReceiver(); + Map receivers = Map.of(RECEIVER_ID_1, receiver1); List allSourceRecords = IntStream.range(0, arrayBlockingQueueCapacity).mapToObj(i -> { - SourceRecord sourceRecord = mock(SourceRecord.class); - - ServiceBusReceivedMessage busReceivedMessage = mock(ServiceBusReceivedMessage.class); - when(busReceivedMessage.getMessageId()).thenReturn(String.format(messageIdTemplate, i)); - - ServiceBusMessageHolder mockedRecord = mock(ServiceBusMessageHolder.class); - when(mockedRecord.getTranslatedRecord()).thenReturn(sourceRecord); - when(mockedRecord.getOriginalRecord()).thenReturn(busReceivedMessage); - try { - sourceRecordBlockingQueue.put(mockedRecord); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return sourceRecord; + String formattedMessageId = String.format(messageIdTemplate, i); + return createMockedSourceRecord(formattedMessageId, sourceRecordBlockingQueue); }).collect(Collectors.toList()); //when - testObj = new TaskToReceiverBridge(sourceRecordBlockingQueue, offsetReader, receivers, executorService); + testObj = new TaskToReceiverBridge(sourceRecordBlockingQueue, receivers, executorService); List polled = testObj.poll(); //then @@ -113,7 +98,7 @@ void commitMessageInServiceBusShouldCallExecutorService() { int arrayBlockingQueueCapacity = 10; String messageKey = "MESSAGE_KEY"; ExecutorService executorService = mock(ExecutorService.class); - ServiceBusReceiverFacade receiver1 = mockReceiver(RECEIVER_ID_1); + ServiceBusReceiverFacade receiver1 = mockReceiver(); BlockingQueue sourceRecordBlockingQueue = new ArrayBlockingQueue<>(arrayBlockingQueueCapacity); Map receivers = Map.of(RECEIVER_ID_1, receiver1); @@ -121,17 +106,35 @@ void commitMessageInServiceBusShouldCallExecutorService() { when(sourceRecord.key()).thenReturn(messageKey); //when - testObj = new TaskToReceiverBridge(sourceRecordBlockingQueue, offsetReader, receivers, executorService); + testObj = new TaskToReceiverBridge(sourceRecordBlockingQueue, receivers, executorService); testObj.commitRecordInServiceBus(sourceRecord); //then verify(executorService).submit(any(Runnable.class)); } - private ServiceBusReceiverFacade mockReceiver(String receiverId) { + private ServiceBusReceiverFacade mockReceiver() { ServiceBusReceiverFacade receiverFacade = mock(ServiceBusReceiverFacade.class); - when(receiverFacade.getReceiverId()).thenReturn(receiverId); return receiverFacade; } + + private static SourceRecord createMockedSourceRecord(String format, + BlockingQueue sourceRecordBlockingQueue) { + ServiceBusReceivedMessage busReceivedMessage = mock(ServiceBusReceivedMessage.class); + when(busReceivedMessage.getMessageId()).thenReturn(format); + + SourceRecord sourceRecord = mock(SourceRecord.class); + + ServiceBusMessageHolder mockedRecord = mock(ServiceBusMessageHolder.class); + when(mockedRecord.getOriginalRecord()).thenReturn(busReceivedMessage); + when(mockedRecord.getTranslatedRecord()).thenReturn(sourceRecord); + + try { + sourceRecordBlockingQueue.put(mockedRecord); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return sourceRecord; + } } diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/KcqlSettings.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/KcqlSettings.java new file mode 100644 index 000000000..55db05a61 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/KcqlSettings.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.config.base; + +import java.util.List; + +import lombok.Getter; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import io.lenses.kcql.Kcql; +import io.lenses.streamreactor.common.config.base.model.ConnectorPrefix; +import io.lenses.streamreactor.common.config.source.ConfigSource; +import lombok.val; + +@Getter +public class KcqlSettings implements ConfigSettings> { + + private static final String KCQL_DOC = + "Contains the Kafka Connect Query Language describing data mappings from the source to the target system."; + + private final String kcqlSettingsKey; + + public KcqlSettings( + ConnectorPrefix connectorPrefix) { + kcqlSettingsKey = connectorPrefix.prefixKey("kcql"); + } + + @Override + public ConfigDef withSettings(ConfigDef configDef) { + return configDef.define( + kcqlSettingsKey, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + KCQL_DOC + ); + } + + @Override + public List parseFromConfig(ConfigSource configSource) { + return Kcql.parseMultiple(getKCQLString(configSource)); + } + + private String getKCQLString(ConfigSource configSource) { + val raw = configSource.getString(kcqlSettingsKey); + return raw.orElseThrow(() -> new ConfigException(String.format("Missing [%s]", kcqlSettingsKey))); + } +} diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/intf/Converter.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/intf/Converter.java new file mode 100644 index 000000000..bc6dd1c24 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/intf/Converter.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.config.base.intf; + +import org.apache.kafka.common.config.ConfigException; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Picks out the settings required from KCQL + */ +public abstract class Converter { + + public List convertAll(List source) throws ConfigException { + return source.stream().map(this::convert).collect(Collectors.toList()); + } + + protected abstract T convert(S source) throws ConfigException; +} diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/intf/KcqlConverter.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/intf/KcqlConverter.java new file mode 100644 index 000000000..7c9519d24 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/intf/KcqlConverter.java @@ -0,0 +1,29 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.config.base.intf; + +import io.lenses.kcql.Kcql; +import org.apache.kafka.common.config.ConfigException; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Picks out the settings required from KCQL + */ +public abstract class KcqlConverter extends Converter { + +} diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/source/ConfigWrapperSource.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/source/ConfigWrapperSource.java index 2dccc6177..d0792ddcf 100644 --- a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/source/ConfigWrapperSource.java +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/source/ConfigWrapperSource.java @@ -15,9 +15,11 @@ */ package io.lenses.streamreactor.common.config.source; +import java.util.Map; import java.util.Optional; import lombok.AllArgsConstructor; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.types.Password; /** @@ -27,6 +29,10 @@ @AllArgsConstructor public class ConfigWrapperSource implements ConfigSource { + public static ConfigWrapperSource fromConfigDef(ConfigDef configDef, Map props) { + return new ConfigWrapperSource(new AbstractConfig(configDef, props)); + } + private final AbstractConfig abstractConfig; @Override diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/ListSplitter.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/ListSplitter.java new file mode 100644 index 000000000..d367f64de --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/ListSplitter.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.util; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +/** + * Utility class for List splitting. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ListSplitter { + + /** + * Splits the given list into {@code maxN} sublists of roughly equal size. + * If the list cannot be divided evenly, the remaining elements are distributed + * among the sublists so that the size difference between any two sublists is at most 1. + * + * @param list the list to be split + * @param maxN the number of sublists to create + * @param the type of elements in the list + * @return a list of sublists, where each sublist contains a portion of the original list + * @throws IllegalArgumentException if {@code maxN} is less than or equal to 0 + */ + public static List> splitList(List list, int maxN) { + if (maxN <= 0) { + throw new IllegalArgumentException("Number of parts must be greater than zero."); + } + + int totalSize = list.size(); + int partSize = totalSize / maxN; + int remainder = totalSize % maxN; + + return IntStream.range(0, maxN) + .mapToObj(i -> { + int start = i * partSize + Math.min(i, remainder); + int end = start + partSize + (i < remainder ? 1 : 0); + return list.subList(start, Math.min(end, totalSize)); + }) + .filter(sublist -> !sublist.isEmpty()) + .collect(Collectors.toList()); + } +} diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/MapUtils.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/MapUtils.java new file mode 100644 index 000000000..0c5dd47f6 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/MapUtils.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.util; + +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +/** + * Utility class for map operations. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class MapUtils { + + /** + * Casts a map to a specified key and value type. + * + * @param map the map to cast + * @param targetKeyType the class of the key type + * @param targetValueType the class of the value type + * @param the target key type + * @param the target value type + * @return the casted map + * @throws IllegalArgumentException if the map contains keys or values of incorrect types + */ + @SuppressWarnings("unchecked") + public static Map castMap(Map map, Class targetKeyType, Class targetValueType) { + map.forEach((key, value) -> { + if (!isAssignable(key, targetKeyType) || !isAssignable(value, targetValueType)) { + throw new IllegalArgumentException("Map contains invalid key or value type"); + } + }); + return (Map) map; + } + + /** + * Checks if an object is assignable to a specified type, allowing for null values. + * + * @param obj the object to check + * @param type the target type + * @param the target type + * @return true if the object is null or assignable to the type, false otherwise + */ + private static boolean isAssignable(Object obj, Class type) { + return obj == null || type.isAssignableFrom(obj.getClass()); + } +} diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/TasksSplitter.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/TasksSplitter.java new file mode 100644 index 000000000..3571df3e9 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/TasksSplitter.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.util; + +import static io.lenses.kcql.Kcql.KCQL_MULTI_STATEMENT_SEPARATOR; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import io.lenses.streamreactor.common.config.base.KcqlSettings; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.val; + +/** + * Utility class for splitting tasks based on KCQL statements. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class TasksSplitter { + + /** + * Splits tasks based on the KCQL statements provided in the properties map. + * Each resulting map will contain the original properties and a subset of the KCQL statements. + * + * @param maxTasks the maximum number of tasks to split into + * @param props the original properties map containing KCQL settings + * @param kcqlSettings the KCQL settings object that provides the key for KCQL settings in the properties map + * @return a list of maps, each containing the original properties and a subset of the KCQL statements + */ + public static List> splitByKcqlStatements(int maxTasks, Map props, + KcqlSettings kcqlSettings) { + val kcqlSettingsKey = kcqlSettings.getKcqlSettingsKey(); + val kcqls = + Arrays + .stream(props.get(kcqlSettingsKey).split(KCQL_MULTI_STATEMENT_SEPARATOR)) + .collect(Collectors.toList()); + + return ListSplitter + .splitList(kcqls, maxTasks) + .stream() + .map(kcqlsForTask -> Stream.concat( + props.entrySet().stream(), + Stream.of(Map.entry(kcqlSettingsKey, String.join(";", kcqlsForTask))) + ).collect(Collectors.toUnmodifiableMap( + Map.Entry::getKey, + Map.Entry::getValue, + (existing, replacement) -> replacement + ))) + .collect(Collectors.toUnmodifiableList()); + } + +} diff --git a/java-connectors/kafka-connect-common/src/main/resources/logback.xml b/java-connectors/kafka-connect-common/src/main/resources/logback.xml index 920828a04..a7902682c 100644 --- a/java-connectors/kafka-connect-common/src/main/resources/logback.xml +++ b/java-connectors/kafka-connect-common/src/main/resources/logback.xml @@ -1,7 +1,7 @@ - + diff --git a/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/ListSplitterTest.java b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/ListSplitterTest.java new file mode 100644 index 000000000..4a9954909 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/ListSplitterTest.java @@ -0,0 +1,92 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.util; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; + +class ListSplitterTest { + + private final List list = IntStream.range(1, 11).boxed().collect(Collectors.toList()); + + @Test + void testSplitListIntoEqualParts() { + List> result = ListSplitter.splitList(list, 5); + assertEquals(5, result.size()); + for (List sublist : result) { + assertEquals(2, sublist.size()); + } + } + + @Test + void testSplitListWithRemainder() { + List> result = ListSplitter.splitList(list, 3); + assertEquals(3, result.size()); + assertEquals(4, result.get(0).size()); + assertEquals(3, result.get(1).size()); + assertEquals(3, result.get(2).size()); + } + + @Test + void testSplitListSinglePart() { + List> result = ListSplitter.splitList(list, 1); + assertEquals(1, result.size()); + assertEquals(10, result.get(0).size()); + } + + @Test + void testSplitListMorePartsThanElements() { + List> result = ListSplitter.splitList(list, 12); + assertEquals(12, result.size()); + int nonEmptyLists = (int) result.stream().filter(sublist -> !sublist.isEmpty()).count(); + assertEquals(10, nonEmptyLists); + for (List sublist : result) { + assertTrue(sublist.size() <= 1); + } + } + + @Test + void testSplitEmptyList() { + List emptyList = Collections.emptyList(); + List> result = ListSplitter.splitList(emptyList, 3); + assertEquals(3, result.size()); + for (List sublist : result) { + assertTrue(sublist.isEmpty()); + } + } + + @Test + void testSplitListInvalidParts() { + Executable executable = () -> ListSplitter.splitList(list, 0); + assertThrows(IllegalArgumentException.class, executable); + } + + @Test + void testListSmallerThanMaxNShouldProvideMaxNResults() { + List> result = ListSplitter.splitList(Collections.singletonList(1), 100); + assertEquals(1, result.size()); + for (List sublist : result) { + assertEquals(1, sublist.size()); + } + } +} diff --git a/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/MapUtilsTest.java b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/MapUtilsTest.java new file mode 100644 index 000000000..afd53e296 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/MapUtilsTest.java @@ -0,0 +1,149 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.util; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +class MapUtilsTest { + + @Test + void testCastMap_ValidStringMap_ReturnsTypedMap() { + Map rawMap = + Map.of( + "key1", "value1", + "key2", "value2" + ); + + Map typedMap = MapUtils.castMap(rawMap, String.class, String.class); + + assertEquals("value1", typedMap.get("key1")); + assertEquals("value2", typedMap.get("key2")); + } + + @Test + void testCastMap_NonStringKeyOrValue_ThrowsException() { + Map rawMap = + Map.of( + "key1", "value1", + 123, "value2" // Non-String key + ); + + assertThrows(IllegalArgumentException.class, () -> MapUtils.castMap(rawMap, String.class, String.class)); + } + + @Test + void testCastMap_ValidIntegerMap_ReturnsTypedMap() { + Map rawMap = + Map.of( + 10, 99, + 20, 145 + ); + + Map typedMap = MapUtils.castMap(rawMap, Integer.class, Integer.class); + + assertEquals(99, typedMap.get(10)); + assertEquals(145, typedMap.get(20)); + } + + @Test + void testCastMap_NonIntegerKeyOrValue_ThrowsException() { + Map rawMap = + Map.of( + 10, 99, + 20, "pigeons" // Non-Integer value + ); + + assertThrows(IllegalArgumentException.class, () -> MapUtils.castMap(rawMap, Integer.class, Integer.class)); + } + + @Test + void testCastMap_WithNullValues_ShouldHandleGracefully() { + Map rawMap = new HashMap<>(); + rawMap.put("key1", "value1"); + rawMap.put("key2", null); // Null value + + Map typedMap = MapUtils.castMap(rawMap, String.class, String.class); + + assertEquals("value1", typedMap.get("key1")); + assertNull(typedMap.get("key2")); + } + + @Test + void testCastMap_SuperclassTypeCompatibility_ShouldPass() { + Map rawMap = + Map.of( + "key1", "value1", + "key2", "value2" + ); + + // Cast to Map since String implements CharSequence + Map typedMap = MapUtils.castMap(rawMap, CharSequence.class, CharSequence.class); + + assertEquals("value1", typedMap.get("key1")); + assertEquals("value2", typedMap.get("key2")); + } + + @Test + void testCastMap_InterfaceTypeCompatibility_ShouldPass() { + Map rawMap = + Map.of( + "key1", "value1", + "key2", "value2" + ); + + // Cast to Map since String is an Object + Map typedMap = MapUtils.castMap(rawMap, Object.class, Object.class); + + assertEquals("value1", typedMap.get("key1")); + assertEquals("value2", typedMap.get("key2")); + } + + @Test + void testCastMap_StringToObject_ShouldPass() { + Map rawMap = + Map.of( + "key1", "value1", + "key2", "value2" + ); + + // Cast to Map since String is an Object + Map typedMap = MapUtils.castMap(rawMap, Object.class, Object.class); + + assertEquals("value1", typedMap.get("key1")); + assertEquals("value2", typedMap.get("key2")); + } + + @Test + void testCastMap_StringToCharSequence_ShouldPass() { + Map rawMap = + Map.of( + "key1", "value1", + "key2", "value2" + ); + + // Cast to Map since String implements CharSequence + Map typedMap = MapUtils.castMap(rawMap, CharSequence.class, CharSequence.class); + + assertEquals("value1", typedMap.get("key1")); + assertEquals("value2", typedMap.get("key2")); + } + +} diff --git a/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/StringUtilsTest.java b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/StringUtilsTest.java new file mode 100644 index 000000000..38c38df7c --- /dev/null +++ b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/StringUtilsTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.util; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +class StringUtilsTest { + + @Test + void testIsBlankWithNull() { + assertTrue(StringUtils.isBlank(null), "null should be considered blank"); + } + + @Test + void testIsBlankWithEmptyString() { + assertTrue(StringUtils.isBlank(""), "Empty string should be considered blank"); + } + + @Test + void testIsBlankWithWhitespace() { + assertTrue(StringUtils.isBlank(" "), "String with only whitespace should be considered blank"); + } + + @Test + void testIsBlankWithNonBlankString() { + assertFalse(StringUtils.isBlank("abc"), "Non-blank string should not be considered blank"); + } + + @Test + void testIsBlankWithStringContainingWhitespace() { + assertFalse(StringUtils.isBlank(" abc "), "String with non-whitespace characters should not be considered blank"); + } +} diff --git a/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/TasksSplitterTest.java b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/TasksSplitterTest.java new file mode 100644 index 000000000..ed9cfc156 --- /dev/null +++ b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/TasksSplitterTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.common.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import io.lenses.streamreactor.common.config.base.KcqlSettings; +import lombok.val; + +class TasksSplitterTest { + + private static final String KCQL_SETTINGS_KEY = "connect.some.prefix.kcql"; + + private static final String OTHER_KEY = "key1"; + + private static final String OTHER_VALUE = "value1"; + + @ParameterizedTest + @MethodSource("testCases") + void testSplitTasksByKcqlStatements(String joinedKcqlStatements, int maxTasks, List expectedKcqls) { + Map props = + Map.of( + OTHER_KEY, OTHER_VALUE, + KCQL_SETTINGS_KEY, joinedKcqlStatements + ); + + val kcqlSettings = mock(KcqlSettings.class); + when(kcqlSettings.getKcqlSettingsKey()).thenReturn(KCQL_SETTINGS_KEY); + + val result = TasksSplitter.splitByKcqlStatements(maxTasks, props, kcqlSettings); + + assertEquals(expectedKcqls.size(), result.size()); + for (int i = 0; i < expectedKcqls.size(); i++) { + val taskProps = result.get(i); + assertEquals(OTHER_VALUE, taskProps.get(OTHER_KEY)); + assertEquals(expectedKcqls.get(i), taskProps.get(KCQL_SETTINGS_KEY)); + } + } + + private static Stream testCases() { + return Stream.of( + Arguments.of("INSERT INTO * SELECT * FROM topicA", 1, Collections.singletonList( + "INSERT INTO * SELECT * FROM topicA")), + Arguments.of("INSERT INTO * SELECT * FROM topicA;INSERT INTO * SELECT * FROM topicB", 1, Collections + .singletonList( + "INSERT INTO * SELECT * FROM topicA;INSERT INTO * SELECT * FROM topicB")), + Arguments.of( + "INSERT INTO * SELECT * FROM topicA;INSERT INTO * SELECT * FROM topicB;INSERT INTO * SELECT * FROM topicC", + 2, Arrays.asList( + "INSERT INTO * SELECT * FROM topicA;INSERT INTO * SELECT * FROM topicB", + "INSERT INTO * SELECT * FROM topicC")), + Arguments.of("", 1, Collections.singletonList("")), + Arguments.of("INSERT INTO * SELECT * FROM topicA;INSERT INTO * SELECT * FROM topicB", 3, Arrays.asList( + "INSERT INTO * SELECT * FROM topicA", + "INSERT INTO * SELECT * FROM topicB")) + ); + } +}