Skip to content

Commit

Permalink
Suggestion for receiver mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed May 28, 2024
1 parent 5295a92 commit a002c39
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -58,8 +60,11 @@ public void start(Map<String, String> props) {
ArrayBlockingQueue<ServiceBusMessageHolder> recordsQueue =
new ArrayBlockingQueue<>(recordsQueueSize);

Map<String, ServiceBusReceiverFacade> receiversMap =
ServiceBusReceiverMapper.mapReceivers(recordsQueue, kcqls, connectionString);

TaskToReceiverBridge serviceBusReceiverBridge =
new TaskToReceiverBridge(connectionString, kcqls, recordsQueue);
new TaskToReceiverBridge(recordsQueue, receiversMap, Executors.newFixedThreadPool(kcqls.size() * 2));

initialize(serviceBusReceiverBridge);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.azure.servicebus.source;

import io.lenses.kcql.Kcql;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

public class ServiceBusReceiverMapper {

private ServiceBusReceiverMapper() {
}

private static final String FACADE_CLASS_SIMPLE_NAME = ServiceBusReceiverFacade.class.getSimpleName();

static Map<String, ServiceBusReceiverFacade> mapReceivers(
BlockingQueue<ServiceBusMessageHolder> recordsQueue,
List<Kcql> kcqls,
String connectionString
) {
return kcqls.stream().map(kcql -> new ServiceBusReceiverFacade(kcql, recordsQueue, connectionString,
FACADE_CLASS_SIMPLE_NAME + UUID.randomUUID()))
.collect(Collectors.toMap(
ServiceBusReceiverFacade::getReceiverId, e -> e
));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
*/
package io.lenses.streamreactor.connect.azure.servicebus.source;

import io.lenses.kcql.Kcql;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -36,54 +33,29 @@
public class TaskToReceiverBridge {

private static final int INITIAL_RECORDS_TO_COMMIT_SIZE = 500;
private static final String FACADE_CLASS_SIMPLE_NAME = ServiceBusReceiverFacade.class.getSimpleName();
private final BlockingQueue<ServiceBusMessageHolder> recordsQueue;
private final ExecutorService sentMessagesExecutors;
private Map<String, ServiceBusReceiverFacade> receivers;
private final Map<String, ServiceBusReceiverFacade> receivers;
private final Map<String, ServiceBusMessageHolder> recordsToCommitMap;

/**
* Creates Bridge between Receivers and Connector's Task for Azure Service Bus.
*
* @param connectionString Service Bus connection string
* @param kcqls list of KCQLs to initiate Receivers for.
* @param recordsQueue records queue used to store received messages.
*/
public TaskToReceiverBridge(String connectionString, List<Kcql> kcqls,
BlockingQueue<ServiceBusMessageHolder> recordsQueue) {
this.recordsQueue = recordsQueue;
sentMessagesExecutors = Executors.newFixedThreadPool(kcqls.size() * 2);
recordsToCommitMap = new ConcurrentHashMap<>(INITIAL_RECORDS_TO_COMMIT_SIZE);

initiateReceivers(recordsQueue, kcqls, connectionString);
}

/**
* Creates Bridge between Receivers and Connector's Task for Azure Service Bus.
*
* @param recordsQueue records queue used to store received messages.
* @param receivers map of {@link ServiceBusReceiverFacade} receivers.
* @param sentMessagesExecutors {@link ExecutorService} that handles sent messages.
*/
TaskToReceiverBridge(BlockingQueue<ServiceBusMessageHolder> recordsQueue,
public TaskToReceiverBridge(
BlockingQueue<ServiceBusMessageHolder> recordsQueue,
Map<String, ServiceBusReceiverFacade> receivers,
ExecutorService sentMessagesExecutors) {
ExecutorService sentMessagesExecutors
) {
this.recordsQueue = recordsQueue;
this.sentMessagesExecutors = sentMessagesExecutors;
this.receivers = receivers;
recordsToCommitMap = new ConcurrentHashMap<>(INITIAL_RECORDS_TO_COMMIT_SIZE);
}

private void initiateReceivers(BlockingQueue<ServiceBusMessageHolder> recordsQueue,
List<Kcql> kcqls, String connectionString) {
receivers = new ConcurrentHashMap<>(kcqls.size());

kcqls.forEach(kcql -> {
String receiverId = FACADE_CLASS_SIMPLE_NAME + UUID.randomUUID();
receivers.put(receiverId, new ServiceBusReceiverFacade(kcql, recordsQueue, connectionString, receiverId));
});
}

/**
* Unsubscribes from all subscriptions and closes the Receivers.
*/
Expand Down

0 comments on commit a002c39

Please sign in to comment.