Skip to content

Commit

Permalink
[Improve#3068] Improve streampark-console module base on [3.5 Concurr…
Browse files Browse the repository at this point in the history
…ent Processing] (#3435)

* [Improve] Unified management of thread pools

* [Improve] replace thread pool in ApplicationActionServiceImpl

* [Improve] replace thread pool in FlinkAppHttpWatcher

* [Improve] replace thread pool in FlinkClusterWatcher

* [Improve] replace thread pool in ProjectServiceImpl

* [Improve] replace thread pool in SavePointServiceImpl

* [Improve] replace thread pool in FlinkK8sChangeEventListener

* [Improve] replace thread pool in FlinkClusterServiceImpl AppBuildPipeServiceImpl
  • Loading branch information
zzzk1 authored Dec 30, 2023
1 parent a1a564f commit f71549b
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@

package org.apache.streampark.console.base.config;

import org.apache.streampark.common.util.ThreadUtils;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class AsyncExecutorPoolConfig extends AsyncConfigurerSupport {
Expand All @@ -40,4 +45,104 @@ public Executor taskExecutor() {
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}

@Bean("triggerSavepointExecutor")
public Executor savepointExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("trigger-savepoint-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
}

@Bean("flinkRestAPIWatchingExecutor")
public Executor restAPIWatchingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("flink-restapi-watching-executor-");
return executor;
}

@Bean("flinkClusterWatchingExecutor")
public Executor clusterWatchingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("flink-cluster-watching-executor-");
return executor;
}

@Bean("streamparkBuildPipelineExecutor")
public ExecutorService pipelineExecutor() {
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("streampark-build-pipeline-executor"),
new ThreadPoolExecutor.AbortPolicy());
}

@Bean("streamparkClusterExecutor")
public ExecutorService clusterExecutor() {
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("streampark-cluster-executor"),
new ThreadPoolExecutor.AbortPolicy());
}

@Bean("streamparkNotifyExecutor")
public Executor notifyExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(20);
executor.setThreadNamePrefix("streampark-notify-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
}

@Bean("streamparkDeployExecutor")
public Executor deployExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("streampark-deploy-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
}

@Bean("streamparkBuildExecutor")
public Executor buildExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 5);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);
executor.setQueueCapacity(1024);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("streampark-build-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApplicationException;
Expand Down Expand Up @@ -101,6 +100,7 @@
import io.fabric8.kubernetes.client.KubernetesClientException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -117,10 +117,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executor;

import static org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.Bridge.toTrackId;
import static org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.isKubernetesApp;
Expand All @@ -131,15 +128,9 @@
public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, Application>
implements ApplicationActionService {

private final ExecutorService executorService =
new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("streampark-deploy-executor"),
new ThreadPoolExecutor.AbortPolicy());
@Qualifier("streamparkDeployExecutor")
@Autowired
private Executor executorService;

@Autowired private ApplicationBackUpService backUpService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.util.JacksonUtils;
Expand Down Expand Up @@ -96,6 +95,7 @@
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -111,8 +111,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -149,15 +147,9 @@ public class AppBuildPipeServiceImpl

@Autowired private ResourceService resourceService;

private final ExecutorService executorService =
new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("streampark-build-pipeline-executor"),
new ThreadPoolExecutor.AbortPolicy());
@Qualifier("streamparkBuildPipelineExecutor")
@Autowired
private ExecutorService executorService;

private static final Cache<Long, DockerPullSnapshot> DOCKER_PULL_PG_SNAPSHOTS =
Caffeine.newBuilder().expireAfterWrite(30, TimeUnit.DAYS).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
Expand Down Expand Up @@ -50,6 +49,7 @@
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -63,8 +63,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand All @@ -78,15 +76,9 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
private static final String ERROR_CLUSTER_QUEUE_HINT =
"Queue label '%s' isn't available in database, please add it first.";

private final ExecutorService executorService =
new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("streampark-cluster-executor"),
new ThreadPoolExecutor.AbortPolicy());
@Qualifier("streamparkClusterExecutor")
@Autowired
private ExecutorService executorService;

@Autowired private FlinkEnvService flinkEnvService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.ResponseCode;
import org.apache.streampark.console.base.domain.RestRequest;
Expand All @@ -50,6 +49,7 @@
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -68,9 +68,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -85,15 +83,9 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>

@Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;

private final ExecutorService executorService =
new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("streampark-build-executor"),
new ThreadPoolExecutor.AbortPolicy());
@Qualifier("streamparkBuildExecutor")
@Autowired
private Executor executorService;

@Override
public RestResponse create(Project project) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.Constant;
import org.apache.streampark.console.base.domain.RestRequest;
Expand Down Expand Up @@ -61,6 +60,7 @@
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -74,9 +74,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -105,15 +103,9 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint

@Autowired private CommonServiceImpl commonService;

private final ExecutorService executorService =
new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("trigger-savepoint-executor"),
new ThreadPoolExecutor.AbortPolicy());
@Qualifier("triggerSavepointExecutor")
@Autowired
private Executor executorService;

@Override
public void expire(Long appId) {
Expand Down
Loading

0 comments on commit f71549b

Please sign in to comment.