From 5743979d1c1489e94cc2439f56374bb0568f42be Mon Sep 17 00:00:00 2001 From: zaki Date: Mon, 28 Oct 2024 17:25:18 +0800 Subject: [PATCH] feat: Define and standardize common parameters for Source. --- .../common/config/connector/Constants.java | 17 +++++++ .../common/config/connector/PollConfig.java | 43 ++++++++++++++++ .../common/config/connector/SourceConfig.java | 3 ++ .../connector/http/SourceConnectorConfig.java | 8 +-- .../mq/kafka/SourceConnectorConfig.java | 1 - .../connector/CanalSourceCheckConnector.java | 8 ++- .../connector/CanalSourceFullConnector.java | 7 ++- .../connector/ChatGPTSourceConnector.java | 22 ++++++--- .../common/SynchronizedCircularFifoQueue.java | 1 - .../http/source/HttpSourceConnector.java | 49 ++++++++++++------- .../http/source/protocol/Protocol.java | 5 +- .../protocol/impl/CloudEventProtocol.java | 5 +- .../source/protocol/impl/CommonProtocol.java | 4 +- .../source/protocol/impl/GitHubProtocol.java | 4 +- .../src/main/resources/source-config.yml | 2 - .../src/test/resources/source-config.yml | 2 - .../jdbc/source/JdbcSourceConnector.java | 9 ++-- .../jdbc/source/TaskManagerCoordinator.java | 29 ++++++----- .../connector/KafkaSourceConnector.java | 6 +-- .../connector/MongodbSourceConnector.java | 24 ++++++--- .../OpenFunctionSourceConnector.java | 30 +++++++++--- .../connector/PravegaSourceConnector.java | 24 ++++++--- .../connector/RabbitMQSourceConnector.java | 24 ++++++--- .../connector/RedisSourceConnector.java | 24 ++++++--- .../connector/SpringSourceConnector.java | 28 ++++++++--- 25 files changed, 267 insertions(+), 112 deletions(-) create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/PollConfig.java diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java index 74576e843a..817efb6d3a 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/Constants.java @@ -30,4 +30,21 @@ public class Constants { public static final int DEFAULT_ATTEMPT = 3; public static final int DEFAULT_PORT = 8080; + + // ======================== Source Constants ======================== + /** + * Default capacity + */ + public static final int DEFAULT_CAPACITY = 1024; + + /** + * Default poll batch size + */ + public static final int DEFAULT_POLL_BATCH_SIZE = 10; + + /** + * Default poll timeout (unit: ms) + */ + public static final long DEFAULT_POLL_TIMEOUT = 5000L; + } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/PollConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/PollConfig.java new file mode 100644 index 0000000000..cf3f06be91 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/PollConfig.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config.connector; + +import lombok.Data; + +/** + * Source Poll Config + */ +@Data +public class PollConfig { + + /** + * Capacity of the poll queue + */ + private int capacity = Constants.DEFAULT_CAPACITY; + + /** + * Max batch size of the poll + */ + private int maxBatchSize = Constants.DEFAULT_POLL_BATCH_SIZE; + + /** + * Max wait time of the poll + */ + private long maxWaitTime = Constants.DEFAULT_POLL_TIMEOUT; + +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java index 7630631258..f7bc42970c 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/SourceConfig.java @@ -30,4 +30,7 @@ public abstract class SourceConfig extends Config { private OffsetStorageConfig offsetStorageConfig; + // Polling configuration, e.g. capacity, batch size, wait time, etc. + private PollConfig pollConfig = new PollConfig(); + } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java index 58d910bf2d..282f883332 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java @@ -44,13 +44,7 @@ public class SourceConnectorConfig { */ private int maxFormAttributeSize = 1024 * 1024; - // max size of the queue, default 1000 - private int maxStorageSize = 1000; - - // batch size, default 10 - private int batchSize = 10; - - // protocol, default CloudEvent + // protocol, default Common private String protocol = "Common"; // extra config, e.g. GitHub secret diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java index 21fb18eb23..eb7406f664 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/mq/kafka/SourceConnectorConfig.java @@ -32,5 +32,4 @@ public class SourceConnectorConfig { private String enableAutoCommit = "false"; private String sessionTimeoutMS = "10000"; private String maxPollRecords = "1000"; - private int pollTimeOut = 100; } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java index 841c9a4814..bd85f03240 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java @@ -50,12 +50,14 @@ @Slf4j public class CanalSourceCheckConnector extends AbstractComponent implements Source, ConnectorCreateService { + private CanalSourceFullConfig config; private CanalFullPositionMgr positionMgr; private RdbTableMgr tableMgr; private ThreadPoolExecutor executor; - private final BlockingQueue> queue = new LinkedBlockingQueue<>(); + private BlockingQueue> queue; private final AtomicBoolean flag = new AtomicBoolean(true); + private long maxPollWaitTime; @Override protected void run() throws Exception { @@ -140,6 +142,8 @@ private void init() { DatabaseConnection.initSourceConnection(); this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource); this.positionMgr = new CanalFullPositionMgr(config, tableMgr); + this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime(); + this.queue = new LinkedBlockingQueue<>(config.getPollConfig().getCapacity()); } @Override @@ -168,7 +172,7 @@ public void onException(ConnectRecord record) { public List poll() { while (flag.get()) { try { - List records = queue.poll(5, TimeUnit.SECONDS); + List records = queue.poll(maxPollWaitTime, TimeUnit.MILLISECONDS); if (records == null || records.isEmpty()) { continue; } diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java index c2632ee472..09e2e0dcf7 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java @@ -56,8 +56,9 @@ public class CanalSourceFullConnector extends AbstractComponent implements Sourc private CanalFullPositionMgr positionMgr; private RdbTableMgr tableMgr; private ThreadPoolExecutor executor; - private final BlockingQueue> queue = new LinkedBlockingQueue<>(); + private BlockingQueue> queue; private final AtomicBoolean flag = new AtomicBoolean(true); + private long maxPollWaitTime; @Override protected void run() throws Exception { @@ -137,6 +138,8 @@ private void init() { DatabaseConnection.initSourceConnection(); this.tableMgr = new RdbTableMgr(config.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource); this.positionMgr = new CanalFullPositionMgr(config, tableMgr); + this.maxPollWaitTime = config.getPollConfig().getMaxWaitTime(); + this.queue = new LinkedBlockingQueue<>(config.getPollConfig().getCapacity()); } @Override @@ -166,7 +169,7 @@ public void onException(ConnectRecord record) { public List poll() { while (flag.get()) { try { - List records = queue.poll(5, TimeUnit.SECONDS); + List records = queue.poll(maxPollWaitTime, TimeUnit.MILLISECONDS); if (records == null || records.isEmpty()) { continue; } diff --git a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java index 6b122087e5..1b6955feb2 100644 --- a/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java @@ -61,8 +61,6 @@ @Slf4j public class ChatGPTSourceConnector implements Source { - private static final int DEFAULT_BATCH_SIZE = 10; - private ChatGPTSourceConfig sourceConfig; private BlockingQueue queue; private HttpServer server; @@ -79,6 +77,9 @@ public class ChatGPTSourceConnector implements Source { private static final String APPLICATION_JSON = "application/json"; private static final String TEXT_PLAIN = "text/plain"; + private int maxBatchSize; + private long maxPollWaitTime; + @Override public Class configClass() { @@ -129,7 +130,9 @@ private void doInit() { if (StringUtils.isNotEmpty(parsePromptTemplateStr)) { this.parseHandler = new ParseHandler(openaiManager, parsePromptTemplateStr); } - this.queue = new LinkedBlockingQueue<>(1024); + this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize(); + this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime(); + this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity()); final Vertx vertx = Vertx.vertx(); final Router router = Router.router(vertx); router.route().path(this.sourceConfig.connectorConfig.getPath()).method(HttpMethod.POST).handler(BodyHandler.create()).handler(ctx -> { @@ -239,14 +242,21 @@ public void stop() { @Override public List poll() { - List connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE); - for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) { + long startTime = System.currentTimeMillis(); + long remainingTime = maxPollWaitTime; + + List connectRecords = new ArrayList<>(maxBatchSize); + for (int i = 0; i < maxBatchSize; i++) { try { - CloudEvent event = queue.poll(3, TimeUnit.SECONDS); + CloudEvent event = queue.poll(remainingTime, TimeUnit.MILLISECONDS); if (event == null) { break; } connectRecords.add(CloudEventUtil.convertEventToRecord(event)); + + // calculate elapsed time and update remaining time for next poll + long elapsedTime = System.currentTimeMillis() - startTime; + remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0; } catch (InterruptedException e) { break; } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java index 0564e58734..9989552d1e 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java @@ -142,7 +142,6 @@ public synchronized List fetchRange(int start, int end, boolean removed) { count++; } return items; - } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java index 2b2a01a9dd..6c78badaf4 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java @@ -20,7 +20,6 @@ import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig; import org.apache.eventmesh.common.exception.EventMeshException; -import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue; import org.apache.eventmesh.connector.http.source.protocol.Protocol; import org.apache.eventmesh.connector.http.source.protocol.ProtocolFactory; import org.apache.eventmesh.openconnect.api.ConnectorCreateService; @@ -30,8 +29,9 @@ import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import io.netty.handler.codec.http.HttpResponseStatus; @@ -50,9 +50,11 @@ public class HttpSourceConnector implements Source, ConnectorCreateService queue; + private BlockingQueue queue; - private int batchSize; + private int maxBatchSize; + + private long maxPollWaitTime; private Route route; @@ -92,11 +94,11 @@ public void init(ConnectorContext connectorContext) { private void doInit() { // init queue - int maxQueueSize = this.sourceConfig.getConnectorConfig().getMaxStorageSize(); - this.queue = new SynchronizedCircularFifoQueue<>(maxQueueSize); + this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity()); - // init batch size - this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize(); + // init poll batch size and timeout + this.maxBatchSize = this.sourceConfig.getPollConfig().getMaxBatchSize(); + this.maxPollWaitTime = this.sourceConfig.getPollConfig().getMaxWaitTime(); // init protocol String protocolName = this.sourceConfig.getConnectorConfig().getProtocol(); @@ -183,20 +185,29 @@ public void stop() { @Override public List poll() { - // if queue is empty, return empty list - if (queue.isEmpty()) { - return Collections.emptyList(); - } + // record current time + long startTime = System.currentTimeMillis(); + long remainingTime = maxPollWaitTime; + // poll from queue - List connectRecords = new ArrayList<>(batchSize); - for (int i = 0; i < batchSize; i++) { - Object obj = queue.poll(); - if (obj == null) { + List connectRecords = new ArrayList<>(maxBatchSize); + for (int i = 0; i < maxBatchSize; i++) { + try { + Object obj = queue.poll(remainingTime, TimeUnit.MILLISECONDS); + if (obj == null) { + break; + } + // convert to ConnectRecord + ConnectRecord connectRecord = protocol.convertToConnectRecord(obj); + connectRecords.add(connectRecord); + + // calculate elapsed time and update remaining time for next poll + long elapsedTime = System.currentTimeMillis() - startTime; + remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0; + } catch (Exception e) { + log.error("Failed to poll from queue.", e); break; } - // convert to ConnectRecord - ConnectRecord connectRecord = protocol.convertToConnectRecord(obj); - connectRecords.add(connectRecord); } return connectRecords; } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java index b671383e54..c5a22139e0 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java @@ -18,9 +18,10 @@ package org.apache.eventmesh.connector.http.source.protocol; import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig; -import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; +import java.util.concurrent.BlockingQueue; + import io.vertx.ext.web.Route; @@ -45,7 +46,7 @@ public interface Protocol { * @param route route * @param queue queue info */ - void setHandler(Route route, SynchronizedCircularFifoQueue queue); + void setHandler(Route route, BlockingQueue queue); /** diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java index 4906e920f2..a44ed0e90c 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java @@ -18,12 +18,13 @@ package org.apache.eventmesh.connector.http.source.protocol.impl; import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig; -import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue; import org.apache.eventmesh.connector.http.source.data.CommonResponse; import org.apache.eventmesh.connector.http.source.protocol.Protocol; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.eventmesh.openconnect.util.CloudEventUtil; +import java.util.concurrent.BlockingQueue; + import io.cloudevents.CloudEvent; import io.cloudevents.http.vertx.VertxMessageFactory; import io.netty.handler.codec.http.HttpResponseStatus; @@ -60,7 +61,7 @@ public void initialize(SourceConnectorConfig sourceConnectorConfig) { * @param queue queue info */ @Override - public void setHandler(Route route, SynchronizedCircularFifoQueue queue) { + public void setHandler(Route route, BlockingQueue queue) { route.method(HttpMethod.POST) .handler(ctx -> VertxMessageFactory.createReader(ctx.request()) .map(reader -> { diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java index 0761170ac0..e831dc9723 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java @@ -20,7 +20,6 @@ import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig; import org.apache.eventmesh.common.utils.JsonUtils; -import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue; import org.apache.eventmesh.connector.http.source.data.CommonResponse; import org.apache.eventmesh.connector.http.source.data.WebhookRequest; import org.apache.eventmesh.connector.http.source.protocol.Protocol; @@ -28,6 +27,7 @@ import java.util.Base64; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.stream.Collectors; import io.netty.handler.codec.http.HttpResponseStatus; @@ -66,7 +66,7 @@ public void initialize(SourceConnectorConfig sourceConnectorConfig) { * @param queue queue info */ @Override - public void setHandler(Route route, SynchronizedCircularFifoQueue queue) { + public void setHandler(Route route, BlockingQueue queue) { route.method(HttpMethod.POST) .handler(BodyHandler.create()) .handler(ctx -> { diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java index fac8c0d801..e1edbd0faf 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java @@ -20,7 +20,6 @@ import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig; import org.apache.eventmesh.common.exception.EventMeshException; -import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue; import org.apache.eventmesh.connector.http.source.data.CommonResponse; import org.apache.eventmesh.connector.http.source.data.WebhookRequest; import org.apache.eventmesh.connector.http.source.protocol.Protocol; @@ -31,6 +30,7 @@ import org.apache.commons.lang3.StringUtils; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.stream.Collectors; import javax.crypto.Mac; @@ -90,7 +90,7 @@ public void initialize(SourceConnectorConfig sourceConnectorConfig) { * @param queue queue info */ @Override - public void setHandler(Route route, SynchronizedCircularFifoQueue queue) { + public void setHandler(Route route, BlockingQueue queue) { route.method(HttpMethod.POST) .handler(BodyHandler.create()) .handler(ctx -> { diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml index b1edc084ff..0a73e627b0 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml @@ -30,8 +30,6 @@ connectorConfig: port: 3755 idleTimeout: 5000 # timeunit: ms maxFormAttributeSize: 1048576 # timeunit: byte, default: 1048576(1MB). This applies only when handling form data submissions. - maxStorageSize: 1000 # max storage size, default: 1000 - batchSize: 10 # batch size, default: 10 protocol: CloudEvent # Case insensitive, default: CloudEvent, options: CloudEvent, GitHub, Common extraConfig: # extra config for different protocol, e.g. GitHub secret secret: xxxxxxx # GitHub secret diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml index 735d3b01d7..336bb2cb5e 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml @@ -30,8 +30,6 @@ connectorConfig: port: 3755 idleTimeout: 5000 # timeunit: ms maxFormAttributeSize: 1048576 # timeunit: byte, default: 1048576(1MB). This applies only when handling form data submissions. - maxStorageSize: 1000 # max storage size, default: 1000 - batchSize: 10 # batch size, default: 10 protocol: CloudEvent # Case insensitive, default: CloudEvent, options: CloudEvent, GitHub, Common extraConfig: # extra config for different protocol, e.g. GitHub secret secret: xxxxxxx # GitHub secret diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java index 810a59e723..ecc5a44154 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java @@ -142,7 +142,9 @@ private void doInit() { this.dispatcher = new EventDispatcher(this.sourceJdbcTaskManager); - this.taskManagerCoordinator = new TaskManagerCoordinator(); + this.taskManagerCoordinator = new TaskManagerCoordinator(sourceConfig.getPollConfig().getCapacity(), + sourceConfig.getPollConfig().getMaxBatchSize(), + sourceConfig.getPollConfig().getMaxWaitTime()); this.taskManagerCoordinator.registerTaskManager(SourceJdbcTaskManager.class.getName(), sourceJdbcTaskManager); this.taskManagerCoordinator.init(); } @@ -209,9 +211,6 @@ public void stop() throws Exception { @Override public List poll() { - - List connectRecords = this.taskManagerCoordinator.poll(); - - return connectRecords; + return this.taskManagerCoordinator.poll(); } } diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java index c299fbc531..8efb8cbc71 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/TaskManagerCoordinator.java @@ -40,16 +40,16 @@ @Slf4j public class TaskManagerCoordinator { - private static final int BATCH_MAX = 10; - private static final int DEFAULT_QUEUE_SIZE = 1 << 13; + private final BlockingQueue recordBlockingQueue; + private final Map taskManagerCache = new HashMap<>(8); + private final int maxBatchSize; + private final long maxPollTimeout; - private BlockingQueue recordBlockingQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE); - private Map taskManagerCache = new HashMap<>(8); - /** - * Constructs a new TaskManagerCoordinator. - */ - public TaskManagerCoordinator() { + public TaskManagerCoordinator(int capacity, int maxBatchSize, long maxPollTimeout) { + this.recordBlockingQueue = new LinkedBlockingQueue<>(capacity); + this.maxBatchSize = maxBatchSize; + this.maxPollTimeout = maxPollTimeout; } /** @@ -96,10 +96,13 @@ public void start() { * @return A list of ConnectRecords, up to the maximum batch size defined by BATCH_MAX. */ public List poll() { - List records = new ArrayList<>(BATCH_MAX); - for (int index = 0; index < BATCH_MAX; ++index) { + long startTime = System.currentTimeMillis(); + long remainingTime = maxPollTimeout; + + List records = new ArrayList<>(maxBatchSize); + for (int index = 0; index < maxBatchSize; ++index) { try { - ConnectRecord record = recordBlockingQueue.poll(3, TimeUnit.SECONDS); + ConnectRecord record = recordBlockingQueue.poll(remainingTime, TimeUnit.MILLISECONDS); if (Objects.isNull(record)) { break; } @@ -107,6 +110,10 @@ public List poll() { log.debug("record:{}", JsonUtils.toJSONString(record)); } records.add(record); + + // calculate elapsed time and update remaining time for next poll + long elapsedTime = System.currentTimeMillis() - startTime; + remainingTime = maxPollTimeout > elapsedTime ? maxPollTimeout - elapsedTime : 0; } catch (InterruptedException e) { break; } diff --git a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java index d573126934..f771e907cb 100644 --- a/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java @@ -45,7 +45,7 @@ public class KafkaSourceConnector implements Source { private KafkaConsumer kafkaConsumer; - private int pollTimeOut = 100; + private long maxPollWaitTime; @Override public Class configClass() { @@ -75,7 +75,7 @@ private void doInit() { props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, sourceConfig.getConnectorConfig().getMaxPollRecords()); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, sourceConfig.getConnectorConfig().getAutoCommitIntervalMS()); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sourceConfig.getConnectorConfig().getSessionTimeoutMS()); - this.pollTimeOut = sourceConfig.getConnectorConfig().getPollTimeOut(); + this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime(); this.kafkaConsumer = new KafkaConsumer<>(props); } @@ -106,7 +106,7 @@ public void stop() { @Override public List poll() { - ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(pollTimeOut)); + ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(maxPollWaitTime)); List connectRecords = new ArrayList<>(records.count()); for (ConsumerRecord record : records) { Long timestamp = System.currentTimeMillis(); diff --git a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java index df3f66d6a6..1d1dcc1843 100644 --- a/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-mongodb/src/main/java/org/apache/eventmesh/connector/mongodb/source/connector/MongodbSourceConnector.java @@ -42,10 +42,12 @@ public class MongodbSourceConnector implements Source { private MongodbSourceConfig sourceConfig; - private static final int DEFAULT_BATCH_SIZE = 10; - private BlockingQueue queue; + private int maxBatchSize; + + private long maxPollWaitTime; + private MongodbSourceClient client; @Override @@ -67,7 +69,9 @@ public void init(ConnectorContext connectorContext) throws Exception { } private void doInit() { - this.queue = new LinkedBlockingQueue<>(1000); + this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize(); + this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime(); + this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity()); String connectorType = sourceConfig.getConnectorConfig().getConnectorType(); if (connectorType.equals(ClusterType.STANDALONE.name())) { this.client = new MongodbStandaloneSourceClient(sourceConfig.getConnectorConfig(), queue); @@ -105,15 +109,21 @@ public void stop() throws Exception { @Override public List poll() { - List connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE); - for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) { + long startTime = System.currentTimeMillis(); + long remainingTime = maxPollWaitTime; + + List connectRecords = new ArrayList<>(maxBatchSize); + for (int count = 0; count < maxBatchSize; ++count) { try { - CloudEvent event = queue.poll(3, TimeUnit.SECONDS); + CloudEvent event = queue.poll(remainingTime, TimeUnit.MILLISECONDS); if (event == null) { break; } - connectRecords.add(CloudEventUtil.convertEventToRecord(event)); + + // calculate elapsed time and update remaining time for next poll + long elapsedTime = System.currentTimeMillis() - startTime; + remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0; } catch (InterruptedException e) { break; } diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java index 534ecfb79d..e40c451ff8 100644 --- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java @@ -35,12 +35,14 @@ @Slf4j public class OpenFunctionSourceConnector implements Source { - private static final int DEFAULT_BATCH_SIZE = 10; - private OpenFunctionSourceConfig sourceConfig; private BlockingQueue queue; + private int maxBatchSize; + + private long maxPollWaitTime; + @Override public Class configClass() { return OpenFunctionSourceConfig.class; @@ -50,7 +52,7 @@ public Class configClass() { public void init(Config config) throws Exception { // init config for openfunction source connector this.sourceConfig = (OpenFunctionSourceConfig) config; - this.queue = new LinkedBlockingQueue<>(1000); + doInit(); } @Override @@ -58,7 +60,14 @@ public void init(ConnectorContext connectorContext) throws Exception { SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext; // init config for openfunction source connector this.sourceConfig = (OpenFunctionSourceConfig) sourceConnectorContext.getSourceConfig(); - this.queue = new LinkedBlockingQueue<>(1000); + doInit(); + } + + private void doInit() { + // init config for openfunction source connector + this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity()); + this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize(); + this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime(); } @Override @@ -92,16 +101,21 @@ public BlockingQueue queue() { @Override public List poll() { + long startTime = System.currentTimeMillis(); + long remainingTime = maxPollWaitTime; - List connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE); - - for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) { + List connectRecords = new ArrayList<>(maxBatchSize); + for (int count = 0; count < maxBatchSize; ++count) { try { - ConnectRecord connectRecord = queue.poll(3, TimeUnit.SECONDS); + ConnectRecord connectRecord = queue.poll(remainingTime, TimeUnit.MILLISECONDS); if (connectRecord == null) { break; } connectRecords.add(connectRecord); + + // calculate elapsed time and update remaining time for next poll + long elapsedTime = System.currentTimeMillis() - startTime; + remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0; } catch (InterruptedException e) { Thread currentThread = Thread.currentThread(); log.warn("[OpenFunctionSourceConnector] Interrupting thread {} due to exception {}", diff --git a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java index 836779dbcf..4b5e4751b3 100644 --- a/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-pravega/src/main/java/org/apache/eventmesh/connector/pravega/source/connector/PravegaSourceConnector.java @@ -57,8 +57,6 @@ public class PravegaSourceConnector implements Source { private static final AtomicBoolean started = new AtomicBoolean(false); - private static final int DEFAULT_BATCH_SIZE = 10; - private PravegaSourceConfig sourceConfig; private StreamManager streamManager; @@ -71,6 +69,10 @@ public class PravegaSourceConnector implements Source { private BlockingQueue queue; + private int maxBatchSize; + + private long maxPollWaitTime; + private final ThreadPoolExecutor executor = ThreadPoolFactory.createThreadPoolExecutor( Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2, @@ -89,7 +91,9 @@ public void init(Config config) throws Exception { public void init(ConnectorContext connectorContext) throws Exception { SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext; this.sourceConfig = (PravegaSourceConfig) sourceConnectorContext.getSourceConfig(); - this.queue = new LinkedBlockingQueue<>(1000); + this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity()); + this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize(); + this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime(); streamManager = StreamManager.create(sourceConfig.getConnectorConfig().getControllerURI()); ClientConfig.ClientConfigBuilder clientConfigBuilder = @@ -168,15 +172,21 @@ public void stop() { @Override public List poll() { - List connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE); - for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) { + long startTime = System.currentTimeMillis(); + long remainingTime = maxPollWaitTime; + + List connectRecords = new ArrayList<>(maxBatchSize); + for (int count = 0; count < maxBatchSize; ++count) { try { - CloudEvent event = queue.poll(3, TimeUnit.SECONDS); + CloudEvent event = queue.poll(remainingTime, TimeUnit.MILLISECONDS); if (event == null) { break; } - connectRecords.add(CloudEventUtil.convertEventToRecord(event)); + + // calculate elapsed time and update remaining time for next poll + long elapsedTime = System.currentTimeMillis() - startTime; + remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0; } catch (InterruptedException e) { break; } diff --git a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java index 0b7e726bda..a19b159c1c 100644 --- a/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rabbitmq/src/main/java/org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.java @@ -54,10 +54,12 @@ public class RabbitMQSourceConnector implements Source { private volatile boolean started = false; - private static final int DEFAULT_BATCH_SIZE = 10; - private BlockingQueue queue; + private int maxBatchSize; + + private long maxPollWaitTime; + private final RabbitmqConnectionFactory rabbitmqConnectionFactory = new RabbitmqConnectionFactory(); private RabbitMQSourceHandler rabbitMQSourceHandler; @@ -84,7 +86,9 @@ public void init(Config config) throws Exception { @Override public void init(ConnectorContext connectorContext) throws Exception { - this.queue = new LinkedBlockingQueue<>(1000); + this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity()); + this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize(); + this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime(); this.sourceConfig = (RabbitMQSourceConfig) ((SourceConnectorContext) connectorContext).getSourceConfig(); this.rabbitmqClient = new RabbitmqClient(rabbitmqConnectionFactory); this.connection = rabbitmqClient.getConnection(sourceConfig.getConnectorConfig().getHost(), @@ -139,15 +143,21 @@ public void stop() { @Override public List poll() { - List connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE); - for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) { + long startTime = System.currentTimeMillis(); + long remainingTime = maxPollWaitTime; + + List connectRecords = new ArrayList<>(maxBatchSize); + for (int count = 0; count < maxBatchSize; ++count) { try { - CloudEvent event = queue.poll(3, TimeUnit.SECONDS); + CloudEvent event = queue.poll(remainingTime, TimeUnit.MILLISECONDS); if (event == null) { break; } - connectRecords.add(CloudEventUtil.convertEventToRecord(event)); + + // calculate elapsed time and update remaining time for next poll + long elapsedTime = System.currentTimeMillis() - startTime; + remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0; } catch (InterruptedException e) { break; } diff --git a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java index 868639c205..5b858afa30 100644 --- a/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java @@ -40,8 +40,6 @@ public class RedisSourceConnector implements Source { - private static final int DEFAULT_BATCH_SIZE = 10; - private RTopic topic; private RedisSourceConfig sourceConfig; @@ -50,6 +48,10 @@ public class RedisSourceConnector implements Source { private BlockingQueue queue; + private int maxBatchSize; + + private long maxPollWaitTime; + @Override public Class configClass() { return RedisSourceConfig.class; @@ -73,7 +75,9 @@ private void doInit() { redisConfig.useSingleServer().setAddress(sourceConfig.connectorConfig.getServer()); redisConfig.setCodec(CloudEventCodec.getInstance()); this.redissonClient = Redisson.create(redisConfig); - this.queue = new LinkedBlockingQueue<>(1000); + this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity()); + this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize(); + this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime(); } @Override @@ -107,15 +111,21 @@ public void stop() throws Exception { @Override public List poll() { - List connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE); - for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) { + long startTime = System.currentTimeMillis(); + long remainingTime = maxPollWaitTime; + + List connectRecords = new ArrayList<>(maxBatchSize); + for (int count = 0; count < maxBatchSize; ++count) { try { - CloudEvent event = queue.poll(3, TimeUnit.SECONDS); + CloudEvent event = queue.poll(remainingTime, TimeUnit.MILLISECONDS); if (event == null) { break; } - connectRecords.add(CloudEventUtil.convertEventToRecord(event)); + + // calculate elapsed time and update remaining time for next poll + long elapsedTime = System.currentTimeMillis() - startTime; + remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0; } catch (InterruptedException e) { break; } diff --git a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java index db286eb609..6efed2db3c 100644 --- a/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-spring/src/main/java/org/apache/eventmesh/connector/spring/source/connector/SpringSourceConnector.java @@ -52,14 +52,16 @@ public class SpringSourceConnector implements Source, MessageSendingOperations, private static final String CONNECTOR_PROPERTY_PREFIX = "eventmesh.connector."; - private static final int DEFAULT_BATCH_SIZE = 10; - private ApplicationContext applicationContext; private SpringSourceConfig sourceConfig; private BlockingQueue queue; + private int maxBatchSize; + + private long maxPollWaitTime; + @Override public Class configClass() { return SpringSourceConfig.class; @@ -69,7 +71,7 @@ public Class configClass() { public void init(Config config) throws Exception { // init config for spring source connector this.sourceConfig = (SpringSourceConfig) config; - this.queue = new LinkedBlockingQueue<>(1000); + doInit(); } @Override @@ -77,7 +79,13 @@ public void init(ConnectorContext connectorContext) throws Exception { SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext; // init config for spring source connector this.sourceConfig = (SpringSourceConfig) sourceConnectorContext.getSourceConfig(); - this.queue = new LinkedBlockingQueue<>(1000); + doInit(); + } + + private void doInit() { + this.queue = new LinkedBlockingQueue<>(sourceConfig.getPollConfig().getCapacity()); + this.maxBatchSize = sourceConfig.getPollConfig().getMaxBatchSize(); + this.maxPollWaitTime = sourceConfig.getPollConfig().getMaxWaitTime(); } @Override @@ -107,15 +115,21 @@ public void stop() throws Exception { @Override public List poll() { - List connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE); + long startTime = System.currentTimeMillis(); + long remainingTime = maxPollWaitTime; - for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) { + List connectRecords = new ArrayList<>(maxBatchSize); + for (int count = 0; count < maxBatchSize; ++count) { try { - ConnectRecord connectRecord = queue.poll(3, TimeUnit.SECONDS); + ConnectRecord connectRecord = queue.poll(remainingTime, TimeUnit.MILLISECONDS); if (connectRecord == null) { break; } connectRecords.add(connectRecord); + + // calculate elapsed time and update remaining time for next poll + long elapsedTime = System.currentTimeMillis() - startTime; + remainingTime = maxPollWaitTime > elapsedTime ? maxPollWaitTime - elapsedTime : 0; } catch (InterruptedException e) { Thread currentThread = Thread.currentThread(); log.warn("[SpringSourceConnector] Interrupting thread {} due to exception {}",