diff --git a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java index 40deb57b59..22d93e52b3 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/util/ExceptionUtils.java @@ -48,4 +48,18 @@ public static String stringifyException(@Nullable Throwable throwable) { return e.getClass().getName() + " (error while printing stack trace)"; } } + + @FunctionalInterface + public interface WrapperRuntimeExceptionHandler { + O handle(I input) throws Exception; + } + + public static O wrapRuntimeException( + I input, WrapperRuntimeExceptionHandler handler) { + try { + return handler.handle(input); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala index f8e3a21e64..fcba212d71 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala @@ -96,6 +96,14 @@ object Utils extends Logger { new JarInputStream(new BufferedInputStream(new FileInputStream(jarFile))).getManifest } + def getJarManClass(jarFile: File): String = { + val manifest = getJarManifest(jarFile) + manifest.getMainAttributes.getValue("Main-Class") match { + case null => manifest.getMainAttributes.getValue("program-class") + case v => v + } + } + def copyProperties(original: Properties, target: Properties): Unit = original.foreach(x => target.put(x._1, x._2)) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java index 95883a07e9..36a1d05cc1 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java @@ -78,7 +78,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.jar.Manifest; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -498,8 +497,7 @@ public String getMain(Application appParam) { project.getDistHome().getAbsolutePath().concat("/").concat(appParam.getModule()); jarFile = new File(modulePath, appParam.getJar()); } - Manifest manifest = Utils.getJarManifest(jarFile); - return manifest.getMainAttributes().getValue("Main-Class"); + return Utils.getJarManClass(jarFile); } @Override diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java index b89e57a9ab..b8df85f2bf 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java @@ -45,10 +45,10 @@ import org.apache.streampark.flink.packer.maven.MavenTool; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.factories.Factory; -import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; @@ -56,6 +56,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.ImmutableMap; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -68,7 +69,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.Serializable; -import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; import java.time.Duration; @@ -77,12 +77,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Scanner; import java.util.ServiceLoader; import java.util.jar.JarEntry; import java.util.jar.JarFile; -import java.util.jar.Manifest; import java.util.stream.Collectors; @Slf4j @@ -95,7 +95,9 @@ public class ResourceServiceImpl extends ServiceImpl public static final String EXCEPTION = "exception"; @Autowired private ApplicationManageService applicationManageService; + @Autowired private CommonService commonService; + @Autowired private FlinkSqlService flinkSqlService; @Override @@ -138,26 +140,28 @@ public void addResource(Resource resource) throws Exception { ApiAlertException.throwIfTrue( jars.size() + poms.size() > 1, "Please do not add multi dependency at one time."); - if (resource.getResourceType() != ResourceTypeEnum.CONNECTOR) { - ApiAlertException.throwIfNull(resource.getResourceName(), "The resourceName is required."); - } else { + if (resource.getResourceType() == ResourceTypeEnum.CONNECTOR) { String connector = resource.getConnector(); ApiAlertException.throwIfTrue(connector == null, "the flink connector is null."); FlinkConnector connectorResource = JacksonUtils.read(connector, FlinkConnector.class); resource.setResourceName(connectorResource.getFactoryIdentifier()); - if (connectorResource.getRequiredOptions() != null) { - resource.setConnectorRequiredOptions( - JacksonUtils.write(connectorResource.getRequiredOptions())); - } - if (connectorResource.getOptionalOptions() != null) { - resource.setConnectorOptionalOptions( - JacksonUtils.write(connectorResource.getOptionalOptions())); - } + Optional.ofNullable(connectorResource.getRequiredOptions()) + .ifPresent( + v -> + resource.setConnectorRequiredOptions( + ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write))); + Optional.ofNullable(connectorResource.getOptionalOptions()) + .ifPresent( + v -> + resource.setConnectorOptionalOptions( + ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write))); + } else { + ApiAlertException.throwIfNull(resource.getResourceName(), "The resourceName is required."); } ApiAlertException.throwIfTrue( this.findByResourceName(resource.getTeamId(), resource.getResourceName()) != null, - String.format("Sorry, the resource %s already exists.", resource.getResourceName())); + String.format("the resource %s already exists, please check.", resource.getResourceName())); if (!jars.isEmpty()) { String resourcePath = jars.get(0); @@ -251,15 +255,8 @@ public void changeOwnership(Long userId, Long targetUserId) { @Override public String upload(MultipartFile file) throws IOException { File temp = WebUtils.getAppTempDir(); - - String name = file.getOriginalFilename(); - String suffix = name.substring(name.lastIndexOf(".")); - - String sha256Hex = DigestUtils.sha256Hex(file.getInputStream()); - String fileName = sha256Hex.concat(suffix); - + String fileName = FilenameUtils.getName(Objects.requireNonNull(file.getOriginalFilename())); File saveFile = new File(temp, fileName); - if (!saveFile.exists()) { // save file to temp dir try { @@ -268,114 +265,93 @@ public String upload(MultipartFile file) throws IOException { throw new ApiDetailException(e); } } - return saveFile.getAbsolutePath(); } @Override public RestResponse checkResource(Resource resourceParam) throws JsonProcessingException { ResourceTypeEnum type = resourceParam.getResourceType(); - Map resp = new HashMap<>(0); - resp.put(STATE, 0); switch (type) { case FLINK_APP: - // check main. - File jarFile; - try { - jarFile = getResourceJar(resourceParam); - } catch (Exception e) { - // get jarFile error - resp.put(STATE, 1); - resp.put(EXCEPTION, ExceptionUtils.stringifyException(e)); - return RestResponse.success().data(resp); - } - if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) { - return RestResponse.success().data(resp); - } - Manifest manifest = Utils.getJarManifest(jarFile); - String mainClass = manifest.getMainAttributes().getValue("Main-Class"); - - if (mainClass == null) { - // main class is null - resp.put(STATE, 2); - return RestResponse.success().data(resp); - } - return RestResponse.success().data(resp); + return checkFlinkApp(resourceParam); case CONNECTOR: - // 1) get connector id - FlinkConnector connectorResource; - - ApiAlertException.throwIfFalse( - ResourceTypeEnum.CONNECTOR == resourceParam.getResourceType(), - "getConnectorId method error, resource not flink connector."); + return checkConnector(resourceParam); + } + return RestResponse.success().data(ImmutableMap.of(STATE, 0)); + } - List jars; - File connector = null; - List factories; + private RestResponse checkConnector(Resource resourceParam) throws JsonProcessingException { + // 1) get connector jar + FlinkConnector connectorResource; + List jars; + File connector; + List factories; + try { + File file = getResourceJar(resourceParam); + connector = file; + jars = Collections.singletonList(file); + } catch (Exception e) { + // get jarFile error + return buildExceptResponse(e, 1); + } - Dependency dependency = Dependency.toDependency(resourceParam.getResource()); + // 2) parse connector Factory + try { + factories = getConnectorFactory(connector); + } catch (Exception e) { + // flink connector invalid + return buildExceptResponse(e, 2); + } - // 1) get connector jar - if (!dependency.getPom().isEmpty()) { - Artifact artifact = dependency.toArtifact().get(0); - try { - jars = MavenTool.resolveArtifacts(artifact); - } catch (Exception e) { - // connector download is null - resp.put(STATE, 1); - resp.put(EXCEPTION, ExceptionUtils.stringifyException(e)); - return RestResponse.success().data(resp); - } - String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version()); - Optional file = jars.stream().filter(x -> x.getName().equals(fileName)).findFirst(); - if (file.isPresent()) { - connector = file.get(); - } - } else { - // 2) jar - String jar = dependency.getJar().get(0).split(":")[1]; - File file = new File(jar); - connector = file; - jars = Collections.singletonList(file); - } + // 3) get connector resource + connectorResource = getConnectorResource(jars, factories); + if (connectorResource == null) { + // connector is null + return buildExceptResponse(new RuntimeException("connector is null"), 3); + } - // 2) parse connector Factory - try { - factories = getConnectorFactory(connector); - } catch (Exception e) { - // flink connector invalid - resp.put(STATE, 2); - resp.put(EXCEPTION, ExceptionUtils.stringifyException(e)); - return RestResponse.success().data(resp); - } + // 2) check connector exists + boolean exists = + existsFlinkConnector(resourceParam.getId(), connectorResource.getFactoryIdentifier()); + if (exists) { + return buildExceptResponse(new RuntimeException("connector is already exists"), 4); + } - // 3) get connector resource - connectorResource = getConnectorResource(jars, factories); - if (connectorResource == null) { - // connector is null - resp.put(STATE, 3); - return RestResponse.success().data(resp); - } + if (resourceParam.getId() != null + && !(getById(resourceParam.getId()) + .getResourceName() + .equals(connectorResource.getFactoryIdentifier()))) { + return buildExceptResponse( + new RuntimeException("resource name different with FactoryIdentifier"), 5); + } + return RestResponse.success() + .data(ImmutableMap.of(STATE, 0, "connector", JacksonUtils.write(connectorResource))); + } - // 2) check connector exists - boolean exists = - existsFlinkConnector(resourceParam.getId(), connectorResource.getFactoryIdentifier()); - if (exists) { - resp.put(STATE, 4); - resp.put("name", connectorResource.getFactoryIdentifier()); - return RestResponse.success(resp); - } + private static RestResponse buildExceptResponse(Exception e, int code) { + return RestResponse.success() + .data(ImmutableMap.of(STATE, code, EXCEPTION, ExceptionUtils.stringifyException(e))); + } - if (resourceParam.getId() != null) { - Resource resource = getById(resourceParam.getId()); - if (!resource.getResourceName().equals(connectorResource.getFactoryIdentifier())) { - resp.put(STATE, 5); - return RestResponse.success().data(resp); - } - } - resp.put(STATE, 0); - resp.put("connector", JacksonUtils.write(connectorResource)); - return RestResponse.success().data(resp); + private RestResponse checkFlinkApp(Resource resourceParam) { + // check main. + File jarFile; + try { + jarFile = getResourceJar(resourceParam); + } catch (Exception e) { + // get jarFile error + return buildExceptResponse(e, 1); + } + ApiAlertException.throwIfTrue(jarFile == null, "flink app jar must exist."); + Map resp = new HashMap<>(0); + resp.put(STATE, 0); + if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) { + return RestResponse.success().data(resp); + } + String mainClass = Utils.getJarManClass(jarFile); + if (mainClass == null) { + // main class is null + return buildExceptResponse(new RuntimeException("main class is null"), 2); } return RestResponse.success().data(resp); } @@ -394,13 +370,7 @@ private FlinkConnector getConnectorResource(List jars, List factor URL[] array = jars.stream() .map( - x -> { - try { - return x.toURI().toURL(); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - }) + file -> ExceptionUtils.wrapRuntimeException(file, handle -> handle.toURI().toURL())) .toArray(URL[]::new); try (URLClassLoader urlClassLoader = URLClassLoader.newInstance(array)) { @@ -458,9 +428,9 @@ private File getResourceJar(Resource resource) throws Exception { String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version()); Optional jarFile = files.stream().filter(x -> x.getName().equals(fileName)).findFirst(); - if (jarFile.isPresent()) { - return jarFile.get(); - } + jarFile.ifPresent( + file -> transferTeamResource(resource.getTeamId(), file.getAbsolutePath())); + return jarFile.orElse(null); } return null; }