diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java index b50ffe2893..5222c0cfb8 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/AsyncExecutorPoolConfig.java @@ -17,13 +17,18 @@ package org.apache.streampark.console.base.config; +import org.apache.streampark.common.util.ThreadUtils; + import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurerSupport; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; @Configuration public class AsyncExecutorPoolConfig extends AsyncConfigurerSupport { @@ -40,4 +45,104 @@ public Executor taskExecutor() { executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } + + @Bean("triggerSavepointExecutor") + public Executor savepointExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10); + executor.setQueueCapacity(1024); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("trigger-savepoint-executor-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + return executor; + } + + @Bean("flinkRestAPIWatchingExecutor") + public Executor restAPIWatchingExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10); + executor.setQueueCapacity(1024); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("flink-restapi-watching-executor-"); + return executor; + } + + @Bean("flinkClusterWatchingExecutor") + public Executor clusterWatchingExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10); + executor.setQueueCapacity(1024); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("flink-cluster-watching-executor-"); + return executor; + } + + @Bean("streamparkBuildPipelineExecutor") + public ExecutorService pipelineExecutor() { + return new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors() * 5, + Runtime.getRuntime().availableProcessors() * 10, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1024), + ThreadUtils.threadFactory("streampark-build-pipeline-executor"), + new ThreadPoolExecutor.AbortPolicy()); + } + + @Bean("streamparkClusterExecutor") + public ExecutorService clusterExecutor() { + return new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors() * 5, + Runtime.getRuntime().availableProcessors() * 10, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1024), + ThreadUtils.threadFactory("streampark-cluster-executor"), + new ThreadPoolExecutor.AbortPolicy()); + } + + @Bean("streamparkNotifyExecutor") + public Executor notifyExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10); + executor.setQueueCapacity(1024); + executor.setKeepAliveSeconds(20); + executor.setThreadNamePrefix("streampark-notify-executor-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + return executor; + } + + @Bean("streamparkDeployExecutor") + public Executor deployExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10); + executor.setQueueCapacity(1024); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("streampark-deploy-executor-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + return executor; + } + + @Bean("streamparkBuildExecutor") + public Executor buildExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10); + executor.setQueueCapacity(1024); + executor.setKeepAliveSeconds(60); + executor.setThreadNamePrefix("streampark-build-executor-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + return executor; + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java index 963a92d451..86384bab25 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java @@ -31,7 +31,6 @@ import org.apache.streampark.common.util.ExceptionUtils; import org.apache.streampark.common.util.HadoopUtils; import org.apache.streampark.common.util.PropertiesUtils; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApplicationException; @@ -101,6 +100,7 @@ import io.fabric8.kubernetes.client.KubernetesClientException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -117,10 +117,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executor; import static org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.Bridge.toTrackId; import static org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.isKubernetesApp; @@ -131,15 +128,9 @@ public class ApplicationActionServiceImpl extends ServiceImpl implements ApplicationActionService { - private final ExecutorService executorService = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("streampark-deploy-executor"), - new ThreadPoolExecutor.AbortPolicy()); + @Qualifier("streamparkDeployExecutor") + @Autowired + private Executor executorService; @Autowired private ApplicationBackUpService backUpService; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 36111230ec..6d11c21eaa 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -26,7 +26,6 @@ import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.ExceptionUtils; import org.apache.streampark.common.util.FileUtils; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.util.JacksonUtils; @@ -96,6 +95,7 @@ import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -111,8 +111,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -149,15 +147,9 @@ public class AppBuildPipeServiceImpl @Autowired private ResourceService resourceService; - private final ExecutorService executorService = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("streampark-build-pipeline-executor"), - new ThreadPoolExecutor.AbortPolicy()); + @Qualifier("streamparkBuildPipelineExecutor") + @Autowired + private ExecutorService executorService; private static final Cache DOCKER_PULL_PG_SNAPSHOTS = Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index b85313126f..9400da6d9b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -20,7 +20,6 @@ import org.apache.streampark.common.conf.K8sFlinkConfig; import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.FlinkExecutionMode; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.YarnUtils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApiDetailException; @@ -50,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -63,8 +63,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -78,15 +76,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl(1024), - ThreadUtils.threadFactory("streampark-cluster-executor"), - new ThreadPoolExecutor.AbortPolicy()); + @Qualifier("streamparkClusterExecutor") + @Autowired + private ExecutorService executorService; @Autowired private FlinkEnvService flinkEnvService; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java index 55ca1dc53e..b8abe3fc76 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java @@ -23,7 +23,6 @@ import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.FileUtils; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.ResponseCode; import org.apache.streampark.console.base.domain.RestRequest; @@ -50,6 +49,7 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -68,9 +68,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -85,15 +83,9 @@ public class ProjectServiceImpl extends ServiceImpl @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher; - private final ExecutorService executorService = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("streampark-build-executor"), - new ThreadPoolExecutor.AbortPolicy()); + @Qualifier("streamparkBuildExecutor") + @Autowired + private Executor executorService; @Override public RestResponse create(Project project) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java index 6d71af900e..87052ad8fc 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java @@ -20,7 +20,6 @@ import org.apache.streampark.common.enums.FlinkExecutionMode; import org.apache.streampark.common.util.CompletableFutureUtils; import org.apache.streampark.common.util.ExceptionUtils; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.domain.Constant; import org.apache.streampark.console.base.domain.RestRequest; @@ -61,6 +60,7 @@ import com.google.common.annotations.VisibleForTesting; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -74,9 +74,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -105,15 +103,9 @@ public class SavePointServiceImpl extends ServiceImpl(1024), - ThreadUtils.threadFactory("trigger-savepoint-executor"), - new ThreadPoolExecutor.AbortPolicy()); + @Qualifier("triggerSavepointExecutor") + @Autowired + private Executor executorService; @Override public void expire(Long appId) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index 8d200d530c..b4bf9c8b53 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -19,7 +19,6 @@ import org.apache.streampark.common.enums.FlinkExecutionMode; import org.apache.streampark.common.util.HttpClientUtils; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.Utils; import org.apache.streampark.common.util.YarnUtils; import org.apache.streampark.console.base.util.JacksonUtils; @@ -52,6 +51,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -68,9 +68,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -95,6 +93,10 @@ public class FlinkAppHttpWatcher { @Autowired private FlinkClusterWatcher flinkClusterWatcher; + @Qualifier("flinkRestAPIWatchingExecutor") + @Autowired + private Executor executorService; + // track interval every 5 seconds public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5); @@ -158,15 +160,6 @@ public class FlinkAppHttpWatcher { private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0"); - private static final ExecutorService EXECUTOR = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("flink-restapi-watching-executor")); - @PostConstruct public void init() { WATCHING_APPS.clear(); @@ -217,7 +210,7 @@ public void start() { } private void watch(Long id, Application application) { - EXECUTOR.execute( + executorService.execute( () -> { try { // query status from flink rest api diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java index 4216683030..dada561c07 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java @@ -23,7 +23,6 @@ import org.apache.streampark.common.enums.FlinkExecutionMode; import org.apache.streampark.common.util.HadoopUtils; import org.apache.streampark.common.util.HttpClientUtils; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.common.util.YarnUtils; import org.apache.streampark.console.base.util.JacksonUtils; import org.apache.streampark.console.core.bean.AlertTemplate; @@ -43,6 +42,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -53,9 +53,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */ @@ -69,6 +67,10 @@ public class FlinkClusterWatcher { @Autowired private ApplicationInfoService applicationInfoService; + @Qualifier("flinkClusterWatchingExecutor") + @Autowired + private Executor executorService; + private Long lastWatchTime = 0L; // Track interval every 30 seconds @@ -82,16 +84,6 @@ public class FlinkClusterWatcher { private boolean immediateWatch = false; - /** Thread pool for processing status monitoring for each cluster */ - private static final ExecutorService EXECUTOR = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("flink-cluster-watching-executor")); - /** Initialize cluster cache */ @PostConstruct private void init() { @@ -113,7 +105,7 @@ private void start() { immediateWatch = false; WATCHER_CLUSTERS.forEach( (aLong, flinkCluster) -> - EXECUTOR.execute( + executorService.execute( () -> { ClusterState state = getClusterState(flinkCluster); switch (state) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java index 358f356fcf..0284b4346d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkK8sChangeEventListener.java @@ -18,7 +18,6 @@ package org.apache.streampark.console.core.watcher; import org.apache.streampark.common.enums.FlinkExecutionMode; -import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.console.core.bean.AlertTemplate; import org.apache.streampark.console.core.component.FlinkCheckpointProcessor; import org.apache.streampark.console.core.entity.Application; @@ -42,14 +41,12 @@ import com.google.common.eventbus.Subscribe; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import java.util.Date; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executor; import scala.Enumeration; @@ -69,21 +66,16 @@ public class FlinkK8sChangeEventListener { @Lazy @Autowired private ApplicationManageService applicationManageService; + @Autowired private ApplicationInfoService applicationInfoService; @Lazy @Autowired private AlertService alertService; @Lazy @Autowired private FlinkCheckpointProcessor checkpointProcessor; - private final ExecutorService executor = - new ThreadPoolExecutor( - Runtime.getRuntime().availableProcessors() * 5, - Runtime.getRuntime().availableProcessors() * 10, - 20L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024), - ThreadUtils.threadFactory("streampark-notify-executor"), - new ThreadPoolExecutor.AbortPolicy()); + @Qualifier("streamparkNotifyExecutor") + @Autowired + private Executor executorService; /** * Catch FlinkJobStatusChangeEvent then storage it persistently to db. Actually update @@ -110,7 +102,7 @@ public void subscribeJobStatusChange(FlinkJobStatusChangeEvent event) { || FlinkAppStateEnum.LOST == state || FlinkAppStateEnum.RESTARTING == state || FlinkAppStateEnum.FINISHED == state) { - executor.execute( + executorService.execute( () -> { if (app.getProbing()) { log.info("application with id {} is probing, don't send alert", app.getId());