Skip to content

Commit

Permalink
[improve] improve resource management logic (#3439)
Browse files Browse the repository at this point in the history
Co-authored-by: monrg <[email protected]>
  • Loading branch information
monrg and monrg authored Dec 28, 2023
1 parent 0533ec3 commit a1a564f
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,18 @@ public static String stringifyException(@Nullable Throwable throwable) {
return e.getClass().getName() + " (error while printing stack trace)";
}
}

@FunctionalInterface
public interface WrapperRuntimeExceptionHandler<I, O> {
O handle(I input) throws Exception;
}

public static <I, O> O wrapRuntimeException(
I input, WrapperRuntimeExceptionHandler<I, O> handler) {
try {
return handler.handle(input);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ object Utils extends Logger {
new JarInputStream(new BufferedInputStream(new FileInputStream(jarFile))).getManifest
}

def getJarManClass(jarFile: File): String = {
val manifest = getJarManifest(jarFile)
manifest.getMainAttributes.getValue("Main-Class") match {
case null => manifest.getMainAttributes.getValue("program-class")
case v => v
}
}

def copyProperties(original: Properties, target: Properties): Unit =
original.foreach(x => target.put(x._1, x._2))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.jar.Manifest;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -498,8 +497,7 @@ public String getMain(Application appParam) {
project.getDistHome().getAbsolutePath().concat("/").concat(appParam.getModule());
jarFile = new File(modulePath, appParam.getJar());
}
Manifest manifest = Utils.getJarManifest(jarFile);
return manifest.getMainAttributes().getValue("Main-Class");
return Utils.getJarManClass(jarFile);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@
import org.apache.streampark.flink.packer.maven.MavenTool;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.Factory;
import org.apache.hadoop.shaded.org.apache.commons.codec.digest.DigestUtils;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand All @@ -68,7 +69,6 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.time.Duration;
Expand All @@ -77,12 +77,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Scanner;
import java.util.ServiceLoader;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
import java.util.stream.Collectors;

@Slf4j
Expand All @@ -95,7 +95,9 @@ public class ResourceServiceImpl extends ServiceImpl<ResourceMapper, Resource>
public static final String EXCEPTION = "exception";

@Autowired private ApplicationManageService applicationManageService;

@Autowired private CommonService commonService;

@Autowired private FlinkSqlService flinkSqlService;

@Override
Expand Down Expand Up @@ -138,26 +140,28 @@ public void addResource(Resource resource) throws Exception {
ApiAlertException.throwIfTrue(
jars.size() + poms.size() > 1, "Please do not add multi dependency at one time.");

if (resource.getResourceType() != ResourceTypeEnum.CONNECTOR) {
ApiAlertException.throwIfNull(resource.getResourceName(), "The resourceName is required.");
} else {
if (resource.getResourceType() == ResourceTypeEnum.CONNECTOR) {
String connector = resource.getConnector();
ApiAlertException.throwIfTrue(connector == null, "the flink connector is null.");
FlinkConnector connectorResource = JacksonUtils.read(connector, FlinkConnector.class);
resource.setResourceName(connectorResource.getFactoryIdentifier());
if (connectorResource.getRequiredOptions() != null) {
resource.setConnectorRequiredOptions(
JacksonUtils.write(connectorResource.getRequiredOptions()));
}
if (connectorResource.getOptionalOptions() != null) {
resource.setConnectorOptionalOptions(
JacksonUtils.write(connectorResource.getOptionalOptions()));
}
Optional.ofNullable(connectorResource.getRequiredOptions())
.ifPresent(
v ->
resource.setConnectorRequiredOptions(
ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
Optional.ofNullable(connectorResource.getOptionalOptions())
.ifPresent(
v ->
resource.setConnectorOptionalOptions(
ExceptionUtils.wrapRuntimeException(v, JacksonUtils::write)));
} else {
ApiAlertException.throwIfNull(resource.getResourceName(), "The resourceName is required.");
}

ApiAlertException.throwIfTrue(
this.findByResourceName(resource.getTeamId(), resource.getResourceName()) != null,
String.format("Sorry, the resource %s already exists.", resource.getResourceName()));
String.format("the resource %s already exists, please check.", resource.getResourceName()));

if (!jars.isEmpty()) {
String resourcePath = jars.get(0);
Expand Down Expand Up @@ -251,15 +255,8 @@ public void changeOwnership(Long userId, Long targetUserId) {
@Override
public String upload(MultipartFile file) throws IOException {
File temp = WebUtils.getAppTempDir();

String name = file.getOriginalFilename();
String suffix = name.substring(name.lastIndexOf("."));

String sha256Hex = DigestUtils.sha256Hex(file.getInputStream());
String fileName = sha256Hex.concat(suffix);

String fileName = FilenameUtils.getName(Objects.requireNonNull(file.getOriginalFilename()));
File saveFile = new File(temp, fileName);

if (!saveFile.exists()) {
// save file to temp dir
try {
Expand All @@ -268,114 +265,93 @@ public String upload(MultipartFile file) throws IOException {
throw new ApiDetailException(e);
}
}

return saveFile.getAbsolutePath();
}

@Override
public RestResponse checkResource(Resource resourceParam) throws JsonProcessingException {
ResourceTypeEnum type = resourceParam.getResourceType();
Map<String, Serializable> resp = new HashMap<>(0);
resp.put(STATE, 0);
switch (type) {
case FLINK_APP:
// check main.
File jarFile;
try {
jarFile = getResourceJar(resourceParam);
} catch (Exception e) {
// get jarFile error
resp.put(STATE, 1);
resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
return RestResponse.success().data(resp);
}
if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
return RestResponse.success().data(resp);
}
Manifest manifest = Utils.getJarManifest(jarFile);
String mainClass = manifest.getMainAttributes().getValue("Main-Class");

if (mainClass == null) {
// main class is null
resp.put(STATE, 2);
return RestResponse.success().data(resp);
}
return RestResponse.success().data(resp);
return checkFlinkApp(resourceParam);
case CONNECTOR:
// 1) get connector id
FlinkConnector connectorResource;

ApiAlertException.throwIfFalse(
ResourceTypeEnum.CONNECTOR == resourceParam.getResourceType(),
"getConnectorId method error, resource not flink connector.");
return checkConnector(resourceParam);
}
return RestResponse.success().data(ImmutableMap.of(STATE, 0));
}

List<File> jars;
File connector = null;
List<String> factories;
private RestResponse checkConnector(Resource resourceParam) throws JsonProcessingException {
// 1) get connector jar
FlinkConnector connectorResource;
List<File> jars;
File connector;
List<String> factories;
try {
File file = getResourceJar(resourceParam);
connector = file;
jars = Collections.singletonList(file);
} catch (Exception e) {
// get jarFile error
return buildExceptResponse(e, 1);
}

Dependency dependency = Dependency.toDependency(resourceParam.getResource());
// 2) parse connector Factory
try {
factories = getConnectorFactory(connector);
} catch (Exception e) {
// flink connector invalid
return buildExceptResponse(e, 2);
}

// 1) get connector jar
if (!dependency.getPom().isEmpty()) {
Artifact artifact = dependency.toArtifact().get(0);
try {
jars = MavenTool.resolveArtifacts(artifact);
} catch (Exception e) {
// connector download is null
resp.put(STATE, 1);
resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
return RestResponse.success().data(resp);
}
String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
Optional<File> file = jars.stream().filter(x -> x.getName().equals(fileName)).findFirst();
if (file.isPresent()) {
connector = file.get();
}
} else {
// 2) jar
String jar = dependency.getJar().get(0).split(":")[1];
File file = new File(jar);
connector = file;
jars = Collections.singletonList(file);
}
// 3) get connector resource
connectorResource = getConnectorResource(jars, factories);
if (connectorResource == null) {
// connector is null
return buildExceptResponse(new RuntimeException("connector is null"), 3);
}

// 2) parse connector Factory
try {
factories = getConnectorFactory(connector);
} catch (Exception e) {
// flink connector invalid
resp.put(STATE, 2);
resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
return RestResponse.success().data(resp);
}
// 2) check connector exists
boolean exists =
existsFlinkConnector(resourceParam.getId(), connectorResource.getFactoryIdentifier());
if (exists) {
return buildExceptResponse(new RuntimeException("connector is already exists"), 4);
}

// 3) get connector resource
connectorResource = getConnectorResource(jars, factories);
if (connectorResource == null) {
// connector is null
resp.put(STATE, 3);
return RestResponse.success().data(resp);
}
if (resourceParam.getId() != null
&& !(getById(resourceParam.getId())
.getResourceName()
.equals(connectorResource.getFactoryIdentifier()))) {
return buildExceptResponse(
new RuntimeException("resource name different with FactoryIdentifier"), 5);
}
return RestResponse.success()
.data(ImmutableMap.of(STATE, 0, "connector", JacksonUtils.write(connectorResource)));
}

// 2) check connector exists
boolean exists =
existsFlinkConnector(resourceParam.getId(), connectorResource.getFactoryIdentifier());
if (exists) {
resp.put(STATE, 4);
resp.put("name", connectorResource.getFactoryIdentifier());
return RestResponse.success(resp);
}
private static RestResponse buildExceptResponse(Exception e, int code) {
return RestResponse.success()
.data(ImmutableMap.of(STATE, code, EXCEPTION, ExceptionUtils.stringifyException(e)));
}

if (resourceParam.getId() != null) {
Resource resource = getById(resourceParam.getId());
if (!resource.getResourceName().equals(connectorResource.getFactoryIdentifier())) {
resp.put(STATE, 5);
return RestResponse.success().data(resp);
}
}
resp.put(STATE, 0);
resp.put("connector", JacksonUtils.write(connectorResource));
return RestResponse.success().data(resp);
private RestResponse checkFlinkApp(Resource resourceParam) {
// check main.
File jarFile;
try {
jarFile = getResourceJar(resourceParam);
} catch (Exception e) {
// get jarFile error
return buildExceptResponse(e, 1);
}
ApiAlertException.throwIfTrue(jarFile == null, "flink app jar must exist.");
Map<String, Serializable> resp = new HashMap<>(0);
resp.put(STATE, 0);
if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
return RestResponse.success().data(resp);
}
String mainClass = Utils.getJarManClass(jarFile);
if (mainClass == null) {
// main class is null
return buildExceptResponse(new RuntimeException("main class is null"), 2);
}
return RestResponse.success().data(resp);
}
Expand All @@ -394,13 +370,7 @@ private FlinkConnector getConnectorResource(List<File> jars, List<String> factor
URL[] array =
jars.stream()
.map(
x -> {
try {
return x.toURI().toURL();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
})
file -> ExceptionUtils.wrapRuntimeException(file, handle -> handle.toURI().toURL()))
.toArray(URL[]::new);

try (URLClassLoader urlClassLoader = URLClassLoader.newInstance(array)) {
Expand Down Expand Up @@ -458,9 +428,9 @@ private File getResourceJar(Resource resource) throws Exception {
String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
Optional<File> jarFile =
files.stream().filter(x -> x.getName().equals(fileName)).findFirst();
if (jarFile.isPresent()) {
return jarFile.get();
}
jarFile.ifPresent(
file -> transferTeamResource(resource.getTeamId(), file.getAbsolutePath()));
return jarFile.orElse(null);
}
return null;
}
Expand Down

0 comments on commit a1a564f

Please sign in to comment.