From 392613280ed996aa1783f8033f720e388d7e3ad3 Mon Sep 17 00:00:00 2001 From: gongzhongqiang <764629910@qq.com> Date: Sun, 22 Oct 2023 22:46:12 +0800 Subject: [PATCH] [Hotfix] Add interface document and polish code (#3268) --- .../console/core/aspect/ConsoleAspect.java | 7 +- .../service/ApplicationBackUpService.java | 42 +++++++++ .../service/ApplicationConfigService.java | 86 +++++++++++++++++-- .../console/core/service/FlinkEnvService.java | 57 +++++++----- .../core/service/SavePointService.java | 52 ++++++++++- .../console/core/service/SettingService.java | 32 +++++++ .../console/core/service/VariableService.java | 64 +++++++++++--- .../service/impl/AppBuildPipeServiceImpl.java | 3 +- .../impl/ApplicationConfigServiceImpl.java | 54 ++++++------ .../service/impl/SavePointServiceImpl.java | 12 +-- .../core/watcher/FlinkAppHttpWatcher.java | 2 +- .../core/watcher/FlinkAppLostWatcher.java | 7 +- .../core/websocket/WebSocketEndpoint.java | 23 ++--- .../system/authentication/JWTUtil.java | 2 +- .../doris/internal/DorisSinkFunction.java | 3 +- 15 files changed, 347 insertions(+), 99 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java index 303730bb94..c500c61d0d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/aspect/ConsoleAspect.java @@ -30,7 +30,6 @@ import org.apache.streampark.console.core.service.application.ApplicationManageService; import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher; import org.apache.streampark.console.system.entity.AccessToken; -import org.apache.streampark.console.system.entity.Member; import org.apache.streampark.console.system.entity.User; import org.apache.streampark.console.system.service.MemberService; @@ -126,17 +125,15 @@ public RestResponse permissionAction(ProceedingJoinPoint joinPoint) throws Throw "Permission denied, only user himself can access this permission"); break; case TEAM: - Member member = memberService.findByUserName(paramId, currentUser.getUsername()); ApiAlertException.throwIfTrue( - member == null, + memberService.findByUserName(paramId, currentUser.getUsername()) == null, "Permission denied, only user belongs to this team can access this permission"); break; case APP: Application app = applicationManageService.getById(paramId); ApiAlertException.throwIfTrue(app == null, "Invalid operation, application is null"); - member = memberService.findByUserName(app.getTeamId(), currentUser.getUsername()); ApiAlertException.throwIfTrue( - member == null, + memberService.findByUserName(app.getTeamId(), currentUser.getUsername()) == null, "Permission denied, only user belongs to this team can access this permission"); break; default: diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java index 6f441e2f1f..e4eed3c1fc 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationBackUpService.java @@ -26,19 +26,61 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.IService; +/** Interface representing a service for application backup operations. */ public interface ApplicationBackUpService extends IService { + /** + * Deletes an object specified by the given ID. + * + * @param id The ID of the object to delete. + * @return true if the object was successfully deleted, false otherwise. + * @throws InternalException if an internal error occurs during the deletion process. + */ Boolean delete(Long id) throws InternalException; + /** + * Performs a backup for the given application and Flink SQL parameters. + * + * @param appParam The application to back up. + * @param flinkSqlParam The Flink SQL to back up. + */ void backup(Application appParam, FlinkSql flinkSqlParam); + /** + * Retrieves a page of {@link ApplicationBackUp} objects based on the provided parameters. + * + * @param bakParam The {@link ApplicationBackUp} object containing the search criteria. + * @param request The {@link RestRequest} object used for pagination and sorting. + * @return An {@link IPage} containing the retrieved {@link ApplicationBackUp} objects. + */ IPage page(ApplicationBackUp bakParam, RestRequest request); + /** + * Rolls back the changes made by the specified application backup. + * + * @param bakParam The ApplicationBackUp object representing the backup to roll back. + */ void rollback(ApplicationBackUp bakParam); + /** + * Revoke the given application. + * + * @param appParam The application to be revoked. + */ void revoke(Application appParam); + /** + * Removes the specified application. + * + * @param appParam the application to be removed + */ void removeApp(Application appParam); + /** + * Rolls back a Flink SQL application to its previous state. + * + * @param appParam The application to rollback. + * @param flinkSqlParam The Flink SQL instance associated with the application. + */ void rollbackFlinkSql(Application appParam, FlinkSql flinkSqlParam); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java index 1fa81cd57e..d3531b67fc 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationConfigService.java @@ -26,27 +26,101 @@ import java.util.List; +/** This interface defines the methods to manage the application configuration. */ public interface ApplicationConfigService extends IService { - void create(Application application, Boolean latest); - - void update(Application application, Boolean latest); - + /** + * Creates a new instance of an Application. + * + * @param appParam The Application object to create. + * @param latest If set to true, sets the created Application as the latest version. + */ + void create(Application appParam, Boolean latest); + + /** + * Updates the given application. + * + * @param appParam the application to be updated + * @param latest a boolean indicating whether to update to the latest version + */ + void update(Application appParam, Boolean latest); + + /** + * Sets the latest or effective flag for a given configuration and application. The latest flag + * determines whether the configuration is the latest version available. The effective flag + * determines whether the configuration is effective for the application. + * + * @param latest a boolean value indicating whether the configuration is the latest version (true) + * or not (false) + * @param configId the ID of the configuration + * @param appId the ID of the application + */ void setLatestOrEffective(Boolean latest, Long configId, Long appId); + /** + * Sets the configuration to effective for the given application and configuration ID. + * + * @param appId The ID of the application + * @param configId The ID of the configuration + */ void toEffective(Long appId, Long configId); + /** + * Returns the latest version of the application configuration for the given application ID. + * + * @param appId The ID of the application + * @return The latest version of the application configuration + */ ApplicationConfig getLatest(Long appId); + /** + * Retrieves the effective ApplicationConfig for the given appId. + * + * @param appId The identifier of the application. + * @return The effective ApplicationConfig. + */ ApplicationConfig getEffective(Long appId); + /** + * Retrieves the ApplicationConfig for the specified ID. + * + * @param id the ID of the ApplicationConfig to retrieve + * @return the ApplicationConfig object corresponding to the specified ID, or null if no + * ApplicationConfig is found + */ ApplicationConfig get(Long id); + /** + * Retrieves a page of ApplicationConfig objects based on the specified ApplicationConfig and + * RestRequest. + * + * @param config the ApplicationConfig object to use as a filter for retrieving the page + * @param request the RestRequest object containing additional parameters and settings for + * retrieving the page + * @return an IPage containing the ApplicationConfig objects that match the filter criteria + * specified in the config object, limited by the settings in the request object + */ IPage page(ApplicationConfig config, RestRequest request); - List history(Application application); - + /** + * Retrieves the history of application configurations for a given application. + * + * @param appParam The application for which to retrieve the history. + * @return The list of application configurations representing the history. + */ + List history(Application appParam); + + /** + * Reads a template from a file or a database. + * + * @return the content of the template as a String + */ String readTemplate(); + /** + * Removes the app with the specified appId. + * + * @param appId The id of the app to be removed. + */ void removeApp(Long appId); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java index c00f934b98..f47e53c953 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java @@ -26,71 +26,84 @@ public interface FlinkEnvService extends IService { /** - * check exists + * Checks if a specific version of Flink exists. * - * @param version - * @return + * @param version The version of Flink to check. + * @return Returns an Integer value indicating the existence of the specified version: - 0 if the + * version exists - 1 if the version does not exist - null if the version is invalid or an + * error occurred during the check */ Integer check(FlinkEnv version); /** - * create new + * Create a new instance. * - * @param version - * @throws IOException + * @param version The version of FlinkEnv to use. + * @throws Exception if an error occurs during the creation process. + * @return true if the instance is successfully created, false otherwise. */ boolean create(FlinkEnv version) throws Exception; /** - * delete flink env + * Deletes a Flink environment with the provided ID. * - * @param id + * @param id the ID of the Flink environment to delete */ void delete(Long id); /** - * update + * Updates the specified version of Flink environment. * - * @param version - * @throws IOException + * @param version the version of Flink environment to update + * @throws IOException if an I/O error occurs during the update process */ void update(FlinkEnv version) throws IOException; /** - * get flink version by appid + * Get flink version by application id. * - * @param appId - * @return + * @param appId the ID of the application + * @return the FlinkEnv object representing the version of Flink associated with the given app ID */ FlinkEnv getByAppId(Long appId); /** - * set a flink version as the default + * Sets the specified Flink version as the default. * - * @param id + * @param id The ID of the Flink version to set as the default. */ void setDefault(Long id); /** - * get default version + * Retrieves the default version of FlinkEnv. * - * @return + * @return the default version of FlinkEnv */ FlinkEnv getDefault(); /** - * get flink version, if null, get default version + * Retrieves a Flink environment by ID, if available. If the ID is null or not found, the method + * returns the default Flink environment. * - * @return + * @param id The ID of the Flink environment to retrieve. If null, the default environment will be + * retrieved. + * @return The Flink environment with the specified ID, or the default environment if the ID is + * null or not found. */ FlinkEnv getByIdOrDefault(Long id); /** - * sycn conf file + * Synchronizes the configuration file for the given id. * - * @param id + * @param id The id of the configuration file to be synchronized. + * @throws IOException If an I/O error occurs while synchronizing the configuration file. */ void syncConf(Long id) throws IOException; + /** + * Checks the validity of the given ID. + * + * @param id The ID to check for validity. + */ void validity(Long id); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java index 684709dc7d..20e772934a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavePointService.java @@ -29,17 +29,63 @@ public interface SavePointService extends IService { + /** + * Expires all savepoints for the specified application. + * + * @param appId the ID of the application to expire + */ void expire(Long appId); + /** + * Retrieves the latest savepoint based on the given id. + * + * @param id the unique identifier of the SavePoint + * @return the latest SavePoint object, or null if not found + */ SavePoint getLatest(Long id); + /** + * Triggers a savepoint for the specified application. + * + * @param appId the ID of the application to trigger the savepoint for + * @param savepointPath the path where the savepoint will be stored, or null if the default path + * should be used + * @param nativeFormat true to store the savepoint in native format, false otherwise + */ void trigger(Long appId, @Nullable String savepointPath, @Nullable Boolean nativeFormat); - Boolean delete(Long id, Application application) throws InternalException; + /** + * Deletes an application with the specified ID. + * + * @param id the ID of the application to be deleted + * @param appParam the application object representing the application to be deleted + * @return true if the application is successfully deleted, false otherwise + * @throws InternalException if there is an internal error during the deletion process + */ + Boolean delete(Long id, Application appParam) throws InternalException; + /** + * Retrieves a page of savepoint objects based on the specified parameters. + * + * @param savePoint The SavePoint object to be used for filtering the page results. + * @param request The RestRequest object containing additional request parameters. + * @return An instance of IPage representing the page of SavePoint objects. + */ IPage page(SavePoint savePoint, RestRequest request); - void removeApp(Application application); + /** + * Removes all savepoints for the specified application. + * + * @param appParam the application to be removed + */ + void removeApp(Application appParam); - String getSavePointPath(Application app) throws Exception; + /** + * Returns the savepoint path for the given application. + * + * @param appParam the application for which to get the save point path + * @return the save point path for the given application + * @throws Exception if an error occurs while getting the save point path + */ + String getSavePointPath(Application appParam) throws Exception; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java index cb8cfb0070..8dafd32a6b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SettingService.java @@ -48,15 +48,47 @@ public interface SettingService extends IService { String KEY_INGRESS_MODE_DEFAULT = "ingress.mode.default"; + /** + * Retrieves the value of the setting associated with the specified key. + * + * @param key the key of the setting to retrieve + * @return the value of the setting if found, null otherwise + */ Setting get(String key); + /** + * Updates the specified Setting. + * + * @param setting the Setting object to update + * @return true if the update is successful, false otherwise + */ boolean update(Setting setting); + /** + * Retrieves the Maven configuration settings. + * + * @return The MavenConfig object containing the Maven configuration settings. + */ MavenConfig getMavenConfig(); + /** + * Retrieves the Docker configuration settings. + * + * @return The DockerConfig object representing the configuration for Docker. + */ DockerConfig getDockerConfig(); + /** + * Retrieves the StreamPark address. + * + * @return a String representing the StreamPark address. + */ String getStreamParkAddress(); + /** + * Retrieves the default ingress mode. + * + * @return The default ingress mode. + */ String getIngressModeDefault(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java index c255b8254f..82a3291516 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/VariableService.java @@ -29,47 +29,85 @@ public interface VariableService extends IService { /** - * find variable + * Find variables based on the given variable and query request. * - * @param variable variable - * @param restRequest queryRequest - * @return IPage + * @param variable The variable to search for. + * @param restRequest The query request containing search filters and pagination options. + * @return An IPage object containing the found Variable objects matching the search criteria. */ IPage page(Variable variable, RestRequest restRequest); /** - * get variables through team + * Retrieves a list of variables based on the team ID. * - * @param teamId - * @return + * @param teamId The ID of the team to filter the variables by. + * @return A list of variables that belong to the specified team. */ List findByTeamId(Long teamId); /** - * Get variables through team and search keywords. + * Retrieve a list of variables based on the team ID and search keywords. * - * @param teamId - * @param keyword Fuzzy search keywords through variable code or description, Nullable. - * @return + * @param teamId The ID of the team for which to retrieve the variables. + * @param keyword The fuzzy search keywords used to filter the variables. This parameter is + * nullable. + * @return A List of Variable objects that match the specified team ID and search keywords. */ List findByTeamId(Long teamId, String keyword); + /** + * Check if a team exists by teamId. + * + * @param teamId the id of the team to check. + * @return true if a team exists with the given teamId, false otherwise. + */ boolean existsByTeamId(Long teamId); /** - * create variable + * Create a variable. * - * @param variable variable + * @param variable The variable to be created. */ void createVariable(Variable variable); + /** + * Deletes a Variable. + * + * @param variable the Variable object to be deleted + */ void deleteVariable(Variable variable); + /** + * Find a Variable by its code and team ID. + * + * @param teamId The ID of the team to search within. + * @param variableCode The code of the variable to find. + * @return The Variable found, or null if no match is found. + */ Variable findByVariableCode(Long teamId, String variableCode); + /** + * Replaces a specified variable in the given string with the corresponding variable value. + * + * @param teamId The identifier of the team. + * @param mixed The string that may contain variables to be replaced. + * @return The modified string after replacing the variables. + */ String replaceVariable(Long teamId, String mixed); + /** + * Retrieves a page of dependent applications based on the given variable and request. + * + * @param variable The variable to use for retrieving dependent applications. + * @param request The REST request containing additional parameters for retrieving the page. + * @return An instance of IPage containing the dependent applications. + */ IPage dependAppsPage(Variable variable, RestRequest request); + /** + * Updates the given variable. + * + * @param variable the variable to be updated + */ void updateVariable(Variable variable); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 539b196d38..1c99847522 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -94,6 +94,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; @@ -175,7 +176,7 @@ public class AppBuildPipeServiceImpl * @return Whether the pipeline was successfully started */ @Override - public boolean buildApplication(Long appId, boolean forceBuild) { + public boolean buildApplication(@NotNull Long appId, boolean forceBuild) { // check the build environment checkBuildEnv(appId, forceBuild); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java index 2428b409b8..4c471c7b1e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationConfigServiceImpl.java @@ -64,15 +64,15 @@ public class ApplicationConfigServiceImpl @Autowired private EffectiveService effectiveService; @Override - public synchronized void create(Application application, Boolean latest) { - String decode = new String(Base64.getDecoder().decode(application.getConfig())); + public synchronized void create(Application appParam, Boolean latest) { + String decode = new String(Base64.getDecoder().decode(appParam.getConfig())); String config = DeflaterUtils.zipString(decode.trim()); ApplicationConfig applicationConfig = new ApplicationConfig(); - applicationConfig.setAppId(application.getId()); + applicationConfig.setAppId(appParam.getId()); - if (application.getFormat() != null) { - ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(application.getFormat()); + if (appParam.getFormat() != null) { + ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(appParam.getFormat()); ApiAlertException.throwIfTrue( fileType == null || ConfigFileTypeEnum.UNKNOWN == fileType, "application' config error. must be (.properties|.yaml|.yml |.conf)"); @@ -82,10 +82,10 @@ public synchronized void create(Application application, Boolean latest) { applicationConfig.setContent(config); applicationConfig.setCreateTime(new Date()); - Integer version = this.baseMapper.getLastVersion(application.getId()); + Integer version = this.baseMapper.getLastVersion(appParam.getId()); applicationConfig.setVersion(version == null ? 1 : version + 1); save(applicationConfig); - this.setLatestOrEffective(latest, applicationConfig.getId(), application.getId()); + this.setLatestOrEffective(latest, applicationConfig.getId(), appParam.getId()); } public void setLatest(Long appId, Long configId) { @@ -99,15 +99,15 @@ public void setLatest(Long appId, Long configId) { } @Override - public synchronized void update(Application application, Boolean latest) { + public synchronized void update(Application appParam, Boolean latest) { // flink sql job - ApplicationConfig latestConfig = getLatest(application.getId()); - if (application.isFlinkSqlJob()) { + ApplicationConfig latestConfig = getLatest(appParam.getId()); + if (appParam.isFlinkSqlJob()) { // get effect config - ApplicationConfig effectiveConfig = getEffective(application.getId()); - if (Utils.isEmpty(application.getConfig())) { + ApplicationConfig effectiveConfig = getEffective(appParam.getId()); + if (Utils.isEmpty(appParam.getConfig())) { if (effectiveConfig != null) { - effectiveService.delete(application.getId(), EffectiveTypeEnum.CONFIG); + effectiveService.delete(appParam.getId(), EffectiveTypeEnum.CONFIG); } } else { // there was no configuration before, is a new configuration @@ -115,48 +115,48 @@ public synchronized void update(Application application, Boolean latest) { if (latestConfig != null) { removeById(latestConfig.getId()); } - this.create(application, latest); + this.create(appParam, latest); } else { - String decode = new String(Base64.getDecoder().decode(application.getConfig())); + String decode = new String(Base64.getDecoder().decode(appParam.getConfig())); String encode = DeflaterUtils.zipString(decode.trim()); // need to diff the two configs are consistent if (!effectiveConfig.getContent().equals(encode)) { if (latestConfig != null) { removeById(latestConfig.getId()); } - this.create(application, latest); + this.create(appParam, latest); } } } } else { // may be re-selected a config file (without config id), or may be based on an original edit // (with config Id). - Long configId = application.getConfigId(); + Long configId = appParam.getConfigId(); // an original edit if (configId != null) { ApplicationConfig config = this.getById(configId); - String decode = new String(Base64.getDecoder().decode(application.getConfig())); + String decode = new String(Base64.getDecoder().decode(appParam.getConfig())); String encode = DeflaterUtils.zipString(decode.trim()); // create... if (!config.getContent().equals(encode)) { if (latestConfig != null) { removeById(latestConfig.getId()); } - this.create(application, latest); + this.create(appParam, latest); } else { - this.setLatestOrEffective(latest, configId, application.getId()); + this.setLatestOrEffective(latest, configId, appParam.getId()); } } else { - ApplicationConfig config = getEffective(application.getId()); + ApplicationConfig config = getEffective(appParam.getId()); if (config != null) { - String decode = new String(Base64.getDecoder().decode(application.getConfig())); + String decode = new String(Base64.getDecoder().decode(appParam.getConfig())); String encode = DeflaterUtils.zipString(decode.trim()); // create... if (!config.getContent().equals(encode)) { - this.create(application, latest); + this.create(appParam, latest); } } else { - this.create(application, latest); + this.create(appParam, latest); } } } @@ -211,14 +211,14 @@ public IPage page(ApplicationConfig config, RestRequest reque } @Override - public List history(Application application) { + public List history(Application appParam) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() - .eq(ApplicationConfig::getAppId, application.getId()) + .eq(ApplicationConfig::getAppId, appParam.getId()) .orderByDesc(ApplicationConfig::getVersion); List configList = this.baseMapper.selectList(queryWrapper); - fillEffectiveField(application.getId(), configList); + fillEffectiveField(appParam.getId(), configList); return configList; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java index 9092e7bae5..60babca273 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java @@ -199,11 +199,11 @@ public void trigger(Long appId, @Nullable String savepointPath, @Nullable Boolea } @Override - public Boolean delete(Long id, Application application) throws InternalException { + public Boolean delete(Long id, Application appParam) throws InternalException { SavePoint savePoint = getById(id); try { if (StringUtils.isNotEmpty(savePoint.getPath())) { - application.getFsOperator().delete(savePoint.getPath()); + appParam.getFsOperator().delete(savePoint.getPath()); } return removeById(id); } catch (Exception e) { @@ -221,17 +221,17 @@ public IPage page(SavePoint savePoint, RestRequest request) { } @Override - public void removeApp(Application application) { - Long appId = application.getId(); + public void removeApp(Application appParam) { + Long appId = appParam.getId(); LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper().eq(SavePoint::getAppId, appId); this.remove(queryWrapper); try { - application + appParam .getFsOperator() - .delete(application.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString())); + .delete(appParam.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString())); } catch (Exception e) { log.error(e.getMessage(), e); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index 8726415dbe..52e9de29ab 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -253,7 +253,7 @@ private void getStateFromFlink(Application application) throws Exception { ? jobsOverview.getJobs().stream() .filter(a -> StringUtils.equals(application.getJobId(), a.getId())) .findFirst() - : jobsOverview.getJobs().stream().findFirst(); + : Optional.empty(); } else { optional = jobsOverview.getJobs().stream() diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java index 30e196a3b5..1ce68b735c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppLostWatcher.java @@ -98,7 +98,7 @@ public void watch(List applications) { .filter(application -> FlinkAppStateEnum.isLost(application.getState())) .collect(Collectors.toList()); updateState(probeApplication); - probeApplication.stream().forEach(this::monitorApplication); + probeApplication.forEach(this::monitorApplication); } private void updateState(List applications) { @@ -118,7 +118,7 @@ private void handleProbeResults() { watch(probeApps); } else { List alertProbeMsgs = generateProbeResults(probeApps); - alertProbeMsgs.stream().forEach(this::alert); + alertProbeMsgs.forEach(this::alert); reset(probeApps); } } @@ -135,7 +135,8 @@ private void reset(List applications) { } private void alert(AlertProbeMsg alertProbeMsg) { - alertProbeMsg.getAlertId().stream() + alertProbeMsg + .getAlertId() .forEach((alterId) -> alertService.alert(alterId, AlertTemplate.of(alertProbeMsg))); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java index 3872b90519..49f58dba75 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.util.Map; +@Getter @Slf4j @Component @ServerEndpoint(value = "/websocket/{id}") @@ -41,27 +42,29 @@ public class WebSocketEndpoint { private static final Map SOCKET_SESSIONS = new CopyOnWriteMap<>(); - @Getter private String id; + private String id; - @Getter private Session session; + private Session session; @OnOpen public void onOpen(Session session, @PathParam("id") String id) { - if (log.isDebugEnabled()) { - log.debug("websocket onOpen...."); - } + log.debug("Websocket onOpen...."); this.id = id; this.session = session; SOCKET_SESSIONS.put(id, session); } @OnClose - public void onClose() throws IOException { - if (log.isDebugEnabled()) { - log.debug("websocket onClose...."); + public void onClose() { + if (SOCKET_SESSIONS.containsKey(this.id)) { + try (Session remove = SOCKET_SESSIONS.remove(this.id)) { + if (remove != null) { + log.debug("Websocket onClose id: {}", this.id); + } + } catch (IOException e) { + log.error("WebSocket onClose error: {}", e.getMessage(), e); + } } - this.session.close(); - SOCKET_SESSIONS.remove(this.id); } @OnError diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java index b3c25aa0d8..9b44eeed83 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java @@ -114,7 +114,7 @@ public static String sign(Long userId, String userName, Long expireTime) { .withExpiresAt(date) .sign(algorithm); } catch (Exception e) { - log.error("error:{}", e); + log.error("error:{}", e.getMessage()); return null; } } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java index 210e92e384..c0ee6ee673 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java @@ -39,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Map; import java.util.Properties; @@ -84,7 +85,7 @@ public void invoke(T value, SinkFunction.Context context) throws Exception { LOGGER.warn( String.format( " row data not fulfilled. {database: %s, table: %s, dataRows: %s}", - data.getDatabase(), data.getTable(), data.getDataRows())); + data.getDatabase(), data.getTable(), Arrays.toString(data.getDataRows()))); return; } dorisSinkWriter.writeRecords(data.getDatabase(), data.getTable(), data.getDataRows());